import pymysql
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


# Write each micro-batch of results to MySQL
def write_to_mysql(batch_df, batch_id):
    # Convert the Spark DataFrame to a Pandas DataFrame
    pandas_df = batch_df.toPandas()

    # Connect to the MySQL database
    connection = pymysql.connect(
        host='43.140.205.103',
        user='kaku',
        password='p4J7fY8mc6hcZfjG',
        database='kaku'
    )
    cursor = connection.cursor()

    # Upsert statement: accumulate the count when the key already exists
    sql = """
        INSERT INTO orders_table (order_id, order_type, order_name, status, count)
        VALUES (%s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE count = count + VALUES(count)
    """

    try:
        # Insert every row of the micro-batch into MySQL
        for _, row in pandas_df.iterrows():
            cursor.execute(sql, (
                row['order_id'],
                row['order_type'],
                row['order_name'],
                row['status'],
                row['count']
            ))
        # Commit the transaction
        connection.commit()
    finally:
        # Close the cursor and connection even if an insert fails
        cursor.close()
        connection.close()


if __name__ == '__main__':
    # 1- Create the SparkSession
    spark = SparkSession.builder \
        .config("spark.sql.shuffle.partitions", 1) \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1") \
        .appName('WRDD_SQL') \
        .master('local[*]') \
        .getOrCreate()

    # 2- Read the Kafka data stream
    kafka_stream = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "niit-node2:9092") \
        .option("subscribePattern", "orders") \
        .load()

    # 3- Parse the Kafka records (tab-separated fields in the message value)
    parsed_stream = kafka_stream.selectExpr("cast(value as string) as value", "timestamp") \
        .withColumn("order_id", F.split(F.col("value"), "\t")[0]) \
        .withColumn("order_name", F.split(F.col("value"), "\t")[1]) \
        .withColumn("course", F.split(F.col("value"), "\t")[2]) \
        .withColumn("order_type", F.split(F.col("value"), "\t")[3]) \
        .withColumn("status", F.split(F.col("value"), "\t")[4]) \
        .drop("value")

    # 4- Aggregate over a 2-second time window:
    #    group by order_id, order_type, order_name, status and count occurrences
    attendance_counts = parsed_stream \
        .groupBy(
            F.window(parsed_stream.timestamp, "2 seconds"),
            parsed_stream.order_id,
            parsed_stream.order_type,
            parsed_stream.order_name,
            parsed_stream.status
        ) \
        .count()

    # 5- Push each micro-batch to MySQL with foreachBatch
    attendance_counts.writeStream \
        .foreachBatch(write_to_mysql) \
        .outputMode("update") \
        .trigger(processingTime="2 seconds") \
        .start()

    # Block until the streaming query terminates
    spark.streams.awaitAnyTermination()
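
# Minimal sketch of the MySQL table that the upsert in write_to_mysql assumes.
# This DDL is not part of the original job; column types and the key are assumptions.
# ON DUPLICATE KEY UPDATE only accumulates counts if the grouping columns form a
# unique key, so this hypothetical schema puts a composite primary key on them.
#
#   CREATE TABLE IF NOT EXISTS orders_table (
#       order_id   VARCHAR(64)  NOT NULL,
#       order_type VARCHAR(64)  NOT NULL,
#       order_name VARCHAR(255) NOT NULL,
#       status     VARCHAR(32)  NOT NULL,
#       count      BIGINT       NOT NULL DEFAULT 0,
#       PRIMARY KEY (order_id, order_type, order_name, status)
#   );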