spark/testkafka.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, window, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

if __name__ == '__main__':
    spark = SparkSession.builder.appName("StreamingApp") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1") \
        .getOrCreate()
    # Kafka connection settings
    kafka_broker = "niit-node2:9092"  # Kafka broker address
    kafka_topic = "orders"  # topic to consume
    # Read the stream from Kafka
    kafka_stream_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_broker) \
        .option("subscribe", kafka_topic) \
        .load()
    # Kafka delivers the value field as raw bytes, so cast it to a string first
    orders_df = kafka_stream_df.selectExpr("CAST(value AS STRING)")
    # Expected record schema; each raw value is one tab-separated line.
    # Note: this schema serves as documentation only; the parsing below
    # splits the string manually instead of applying the schema directly.
    schema = StructType([
        StructField("order_id", StringType(), True),
        StructField("order_type", StringType(), True),
        StructField("order_name", StringType(), True),
        StructField("order_quantity", IntegerType(), True),
        StructField("date", StringType(), True),
        StructField("is_valid", StringType(), True)  # 'Y' = valid, 'N' = invalid
    ])
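    # A sample input line under the assumed format (hypothetical values;
    # the date must be parseable as a timestamp for the window() call below):
    #   1001\tonline\twidget\t3\t2025-01-02 17:32:13\tY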
    # Parse the data by splitting the value on the tab character
    orders_parsed_df = orders_df.select(
        split(orders_df['value'], '\t').alias('cols')
    ).select(
        col('cols')[0].alias('order_id'),
        col('cols')[1].alias('order_type'),
        col('cols')[2].alias('order_name'),
        col('cols')[3].cast(IntegerType()).alias('order_quantity'),
        # window() below requires a timestamp column, not a string, so cast here
        col('cols')[4].cast(TimestampType()).alias('date'),
        col('cols')[5].alias('is_valid')
    )
    # Split the stream into valid and invalid orders
    valid_orders_df = orders_parsed_df.filter(orders_parsed_df['is_valid'] == 'Y')
    invalid_orders_df = orders_parsed_df.filter(orders_parsed_df['is_valid'] == 'N')
    # Count valid and invalid orders over 2-second tumbling windows; the
    # lit() label keeps the two streams distinguishable after the union.
    valid_order_count_df = valid_orders_df.groupBy(
        window(valid_orders_df.date, "2 seconds")).count().withColumn("is_valid", lit("Y"))
    invalid_order_count_df = invalid_orders_df.groupBy(
        window(invalid_orders_df.date, "2 seconds")).count().withColumn("is_valid", lit("N"))
    # Merge the valid and invalid counts into a single result stream
    order_count_df = valid_order_count_df.union(invalid_order_count_df)
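    # Note (assumption, not in the original): without a watermark the
    # aggregation state grows without bound; a long-running job would
    # typically add e.g. .withWatermark("date", "10 seconds") before the
    # groupBy so that old windows can eventually be evicted.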
    # Write the results to the console
    query = order_count_df.writeStream \
        .outputMode("update") \
        .format("console") \
        .option("truncate", "false") \
        .start()
    # Block until the query terminates
    query.awaitTermination()
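
# --- Usage sketch (assumption; not part of the original file) ---
# One way to submit the job; the broker host and package version mirror
# the values configured above, your cluster layout may differ:
#
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 \
#       spark/testkafka.py
#
# Tab-separated records can then be pushed onto the "orders" topic with
# the console producer that ships with Kafka:
#
#   kafka-console-producer.sh --broker-list niit-node2:9092 --topic orders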