from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

if __name__ == '__main__':
    spark = SparkSession.builder.appName("StreamingApp")\
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1")\
        .getOrCreate()

    # Kafka connection settings
    kafka_broker = "niit-node2:9092"  # Kafka broker address
    kafka_topic = "orders"            # Kafka topic name

    # Read the stream from Kafka
    kafka_stream_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_broker) \
        .option("subscribe", kafka_topic) \
        .load()

    # The Kafka message value arrives as bytes, so cast it to a string first
    orders_df = kafka_stream_df.selectExpr("CAST(value AS STRING)")

    # Schema of one order record; the tab-separated fields are parsed manually below
    schema = StructType([
        StructField("order_id", StringType(), True),
        StructField("order_type", StringType(), True),
        StructField("order_name", StringType(), True),
        StructField("order_quantity", IntegerType(), True),
        StructField("date", StringType(), True),
        StructField("is_valid", StringType(), True)  # Y = valid, N = invalid
    ])

    # Parse the value column by splitting on \t
    orders_parsed_df = orders_df.select(
        split(orders_df['value'], '\t').alias('cols')
    ).select(
        col('cols')[0].alias('order_id'),
        col('cols')[1].alias('order_type'),
        col('cols')[2].alias('order_name'),
        col('cols')[3].cast(IntegerType()).alias('order_quantity'),
        col('cols')[4].alias('date'),
        col('cols')[5].alias('is_valid')
    )

    # Separate valid and invalid orders
    valid_orders_df = orders_parsed_df.filter(orders_parsed_df['is_valid'] == 'Y')
    invalid_orders_df = orders_parsed_df.filter(orders_parsed_df['is_valid'] == 'N')

    # Count valid and invalid orders in 2-second windows. A streaming query
    # supports only a single aggregation, so the two filtered streams are
    # unioned first and counted once, grouped by window and by the is_valid
    # flag; the date string is cast to a timestamp to serve as the time column.
    all_orders_df = valid_orders_df.union(invalid_orders_df)
    order_count_df = all_orders_df.groupBy(
        window(col('date').cast('timestamp'), "2 seconds"),
        col('is_valid')
    ).count()

    # Write the results to the console
    query = order_count_df.writeStream \
        .outputMode("update") \
        .format("console") \
        .option("truncate", "false") \
        .start()

    # Block until the streaming query is terminated
    query.awaitTermination()
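
# Usage sketch (assumptions: the script is saved as streaming_app.py and the
# Kafka broker at niit-node2:9092 is reachable). Submit the job with the Kafka
# connector package, e.g.:
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 streaming_app.py
# Then push tab-separated test records into the "orders" topic, for example with
# the console producer shipped with Kafka:
#   kafka-console-producer.sh --bootstrap-server niit-node2:9092 --topic orders
#   (use --broker-list instead on older Kafka releases)
# where each line follows the parsed layout, e.g.:
#   1001<TAB>online<TAB>laptop<TAB>2<TAB>2021-05-01 10:00:02<TAB>Y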