From c7ed60fd3b86a8933ed0365f46859775d1f84ad2 Mon Sep 17 00:00:00 2001
From: AiLe <1449854570@qq.com>
Date: Fri, 3 Jan 2025 09:41:51 +0800
Subject: [PATCH] Pseudo-RDD version: DF implementation of per-order and
 per-product totals
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 RDD.py      | 86 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 sparksql.py |  2 +-
 2 files changed, 87 insertions(+), 1 deletion(-)
 create mode 100644 RDD.py

diff --git a/RDD.py b/RDD.py
new file mode 100644
index 0000000..95db847
--- /dev/null
+++ b/RDD.py
@@ -0,0 +1,86 @@
+import pymysql
+from pyspark.sql import SparkSession
+import pyspark.sql.functions as F
+
+# Function that writes each micro-batch of results to MySQL
+def write_to_mysql(batch_df, batch_id):
+    # Convert the Spark DataFrame to a Pandas DataFrame
+    pandas_df = batch_df.toPandas()
+
+    # Connect to the MySQL database
+    connection = pymysql.connect(
+        host='43.140.205.103',
+        user='kaku',
+        password='p4J7fY8mc6hcZfjG',
+        database='kaku'
+    )
+
+    cursor = connection.cursor()
+
+    # Insert the rows into MySQL
+    for _, row in pandas_df.iterrows():
+        order_id = row['order_id']
+        order_type = row['order_type']
+        order_name = row['order_name']
+        status = row['status']
+        count = row['count']
+
+        # Upsert statement: add the new count to the existing one on duplicate keys
+        sql = """
+            INSERT INTO orders_table (order_id, order_type, order_name, status, count)
+            VALUES (%s, %s, %s, %s, %s)
+            ON DUPLICATE KEY UPDATE count = count + VALUES(count)
+        """
+        cursor.execute(sql, (order_id, order_type, order_name, status, count))
+
+    # Commit the transaction and close the connection
+    connection.commit()
+    cursor.close()
+    connection.close()
+
+if __name__ == '__main__':
+    # 1- Create the SparkSession
+    spark = SparkSession.builder \
+        .config("spark.sql.shuffle.partitions", 1) \
+        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1")\
+        .appName('WRDD_SQL') \
+        .master('local[*]') \
+        .getOrCreate()
+
+    # 2- Read the Kafka data stream
+    kafka_stream = spark.readStream \
+        .format("kafka") \
+        .option("kafka.bootstrap.servers", "niit-node2:9092") \
+        .option("subscribePattern", "orders") \
+        .load()
+
+    # 3- Parse the Kafka records (tab-separated fields in the message value)
+    parsed_stream = kafka_stream.selectExpr("cast(value as string) as value", "timestamp") \
+        .withColumn("order_id", F.split(F.col("value"), "\t")[0]) \
+        .withColumn("order_name", F.split(F.col("value"), "\t")[1]) \
+        .withColumn("course", F.split(F.col("value"), "\t")[2]) \
+        .withColumn("order_type", F.split(F.col("value"), "\t")[3]) \
+        .withColumn("status", F.split(F.col("value"), "\t")[4]) \
+        .drop("value")
+
+    # 4- Aggregate over a time window:
+    #    group by order_id, order_type, order_name, status and count the rows
+    attendance_counts = parsed_stream \
+        .groupBy(
+            F.window(parsed_stream.timestamp, "2 seconds"),
+            parsed_stream.order_id,
+            parsed_stream.order_type,
+            parsed_stream.order_name,
+            parsed_stream.status
+        ) \
+        .count()
+
+    # 5- Push each micro-batch to MySQL with foreachBatch
+    attendance_counts.writeStream \
+        .foreachBatch(write_to_mysql) \
+        .outputMode("update") \
+        .trigger(processingTime="2 seconds") \
+        .start()
+
+    # Wait for the streaming query to terminate
+    spark.streams.awaitAnyTermination()
diff --git a/sparksql.py b/sparksql.py
index a737818..25b460a 100644
--- a/sparksql.py
+++ b/sparksql.py
@@ -46,7 +46,7 @@ if __name__ == '__main__':
     spark = SparkSession.builder \
         .config("spark.sql.shuffle.partitions", 1) \
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1") \
-        .appName('ss_kafka_push_to_mysql') \
+        .appName('conctroller_sql') \
         .master('local[*]') \
         .getOrCreate()
 
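
Note: the foreachBatch sink in RDD.py issues one INSERT per row of the micro-batch. Below is a minimal sketch of the same upsert done with a single executemany() round trip instead; the host, credentials, orders_table, and column names are copied from the patch, while the batching itself and the write_to_mysql_batched name are illustrative assumptions, not part of the committed code.

    import pymysql

    # Sketch only: same upsert as write_to_mysql in RDD.py, but batched with executemany().
    # Connection settings, table, and columns come from the patch; batching is an assumed
    # optimization for illustration.
    def write_to_mysql_batched(batch_df, batch_id):
        # Collect the micro-batch on the driver as (order_id, order_type, order_name, status, count) tuples
        rows = [
            (r['order_id'], r['order_type'], r['order_name'], r['status'], r['count'])
            for r in batch_df.collect()
        ]
        if not rows:
            return

        connection = pymysql.connect(
            host='43.140.205.103',
            user='kaku',
            password='p4J7fY8mc6hcZfjG',
            database='kaku'
        )
        try:
            with connection.cursor() as cursor:
                # One round trip for the whole micro-batch
                cursor.executemany(
                    """
                    INSERT INTO orders_table (order_id, order_type, order_name, status, count)
                    VALUES (%s, %s, %s, %s, %s)
                    ON DUPLICATE KEY UPDATE count = count + VALUES(count)
                    """,
                    rows
                )
            connection.commit()
        finally:
            connection.close()

It would be wired in the same way as the original sink, e.g. .foreachBatch(write_to_mysql_batched), without changing the rest of the streaming query.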