"""Structured Streaming job: read tab-separated order records from Kafka and
print windowed counts of valid ('Y') vs invalid ('N') orders to the console."""

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, to_timestamp, window
from pyspark.sql.types import IntegerType

if __name__ == '__main__':
    spark = SparkSession.builder.appName("StreamingApp") \
        .config("spark.jars.packages",
                "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1") \
        .getOrCreate()

    # Kafka connection settings.
    kafka_broker = "niit-node2:9092"  # Kafka broker address
    kafka_topic = "orders"            # topic carrying the order records

    # Subscribe to the topic as an unbounded streaming source.
    kafka_stream_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_broker) \
        .option("subscribe", kafka_topic) \
        .load()

    # Kafka delivers the message payload as bytes; decode it to a string.
    orders_df = kafka_stream_df.selectExpr("CAST(value AS STRING)")

    # Each record is one tab-separated line with the layout:
    #   order_id \t order_type \t order_name \t order_quantity \t date \t is_valid
    # where is_valid is 'Y' (valid) or 'N' (invalid).
    orders_parsed_df = orders_df.select(
        split(orders_df['value'], '\t').alias('cols')
    ).select(
        col('cols')[0].alias('order_id'),
        col('cols')[1].alias('order_type'),
        col('cols')[2].alias('order_name'),
        col('cols')[3].cast(IntegerType()).alias('order_quantity'),
        # BUG FIX: window() requires a TimestampType column; the raw field is
        # a string, so cast it here (the original passed the string column
        # straight into window(), which fails query analysis).
        to_timestamp(col('cols')[4]).alias('event_time'),
        col('cols')[5].alias('is_valid')
    )

    # BUG FIX: the original built two separate streaming aggregations (one for
    # 'Y', one for 'N') and union()-ed them. Structured Streaming rejects
    # multiple streaming aggregations in a single query, and the union also
    # dropped the valid/invalid label. A single aggregation grouped by both
    # the 2-second window and the is_valid flag produces the same counts,
    # labelled. The watermark bounds the aggregation state; NOTE(review): it
    # drops events arriving more than 10s late — tune to the data's lateness.
    order_count_df = orders_parsed_df \
        .filter(col('is_valid').isin('Y', 'N')) \
        .withWatermark('event_time', '10 seconds') \
        .groupBy(window(col('event_time'), '2 seconds'), col('is_valid')) \
        .count()

    # Emit updated counts to the console on every trigger.
    query = order_count_df.writeStream \
        .outputMode("update") \
        .format("console") \
        .option("truncate", "false") \
        .start()

    # Block until the streaming query is stopped or fails.
    query.awaitTermination()