import os

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# os.environ['JAVA_HOME'] = 'C:\\Program Files\\Java\\jdk1.8.0_351'
# os.environ['HADOOP_HOME'] = 'D:\\CodeDevelopment\\DevelopmentEnvironment\\hadoop-2.8.1'

if __name__ == '__main__':
    # 1 - Create the SparkSession
    spark = SparkSession.builder \
        .config("spark.sql.shuffle.partitions", 1) \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1") \
        .appName('kafka_stream') \
        .master('local[*]') \
        .getOrCreate()

    # 2 - Read the data stream from Kafka
    kafka_stream = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "niit-node2:9092") \
        .option("subscribePattern", "orders") \
        .load()

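    # Each Kafka record value is expected to be a tab-separated string of the form
    # order_id \t order_type \t order_name \t order_quantity \t status,
    # which is what the parsing step below splits apart.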
    # 3 - Parse the data
    parsed_stream = kafka_stream.selectExpr("cast(value as string) as value", "timestamp") \
        .withColumn("order_id", F.split(F.col("value"), "\t")[0]) \
        .withColumn("order_type", F.split(F.col("value"), "\t")[1]) \
        .withColumn("order_name", F.split(F.col("value"), "\t")[2]) \
        .withColumn("order_quantity", F.split(F.col("value"), "\t")[3]) \
        .withColumn("status", F.split(F.col("value"), "\t")[4]) \
        .drop("value")

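    # Note: every parsed column is a string, since it comes from splitting the
    # string-cast Kafka value; order_quantity would need an explicit cast before
    # any numeric aggregation.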
    # 4 - Group and aggregate the data over time windows
    # 4.1 - Order count per status (total number of valid orders)
    orders_summary = parsed_stream.groupBy(
        F.window("timestamp", "10 seconds").alias("time_window"),  # 10-second window
        "status"
    ).count() \
        .drop("time_window")  # drop the time window column

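    # After dropping the window column, orders_summary holds one (status, count)
    # row per window and status value.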
    # 4.2 - Valid order count per order (grouped by order_id and status)
    eachOrders_summary = parsed_stream.groupBy(
        F.window("timestamp", "10 seconds").alias("time_window"),
        "order_id", "status"
    ).count() \
        .drop("time_window")  # drop the time window column

    # 4.3 - Count per order name (course)
    order_name_count = parsed_stream.groupBy(
        F.window("timestamp", "10 seconds").alias("time_window"),
        "order_name"
    ).count() \
        .withColumnRenamed("count", "order_name_count") \
        .drop("time_window")  # drop the time window column

    # 5 - Write each aggregation result to Kafka
    def write_to_kafka(batch_df, batch_id, topic_name):
        batch_df.selectExpr(
            "cast(null as string) as key",   # Kafka message key (not used)
            "to_json(struct(*)) as value"    # serialize all columns as a JSON string
        ).write \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "niit-node2:9092") \
            .option("topic", topic_name) \
            .save()

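    # foreachBatch invokes write_to_kafka once per micro-batch with that batch's
    # DataFrame and id, so each trigger's aggregation result is written to the
    # target topic using the regular batch Kafka writer.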
    # Write each summary to its own Kafka topic
    orders_summary.writeStream \
        .foreachBatch(lambda df, id: write_to_kafka(df, id, "orders_summary")) \
        .outputMode("complete") \
        .trigger(processingTime="10 seconds") \
        .start()

    eachOrders_summary.writeStream \
        .foreachBatch(lambda df, id: write_to_kafka(df, id, "eachOrders_summary")) \
        .outputMode("update") \
        .trigger(processingTime="10 seconds") \
        .start()

    order_name_count.writeStream \
        .foreachBatch(lambda df, id: write_to_kafka(df, id, "order_name_count")) \
        .outputMode("update") \
        .trigger(processingTime="10 seconds") \
        .start()

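    # A minimal debugging sketch (assuming the Kafka sink is not needed locally):
    # write an aggregation to the console sink instead, e.g.
    #
    #   orders_summary.writeStream \
    #       .outputMode("complete") \
    #       .format("console") \
    #       .option("truncate", False) \
    #       .start()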
    # Block until any of the streaming queries terminates
    spark.streams.awaitAnyTermination()