# spark/sparksql.py

import pymysql
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.streaming import DataStreamWriter

def write_to_mysql(batch_df, batch_id):
    # Convert the micro-batch DataFrame to a Pandas DataFrame
    pandas_df = batch_df.toPandas()
    # Connect to the MySQL database
    connection = pymysql.connect(
        host='43.140.205.103',
        user='kaku',
        password='p4J7fY8mc6hcZfjG',
        database='kaku',
        charset='utf8mb4'  # Use utf8mb4 so all Unicode characters are supported
    )
    cursor = connection.cursor()
    cursor.execute("SET NAMES 'utf8mb4';")
    # Upsert each row into MySQL
    for _, row in pandas_df.iterrows():
        order_name = row['order_name']
        status = row['status']
        count = row['count']
        # Insert, or accumulate the count when (order_name, status) already exists
        sql = """
            INSERT INTO order_name_YN (order_name, status, count)
            VALUES (%s, %s, %s)
            ON DUPLICATE KEY UPDATE count = count + VALUES(count)
        """
        cursor.execute(sql, (order_name, status, count))
    # Commit the transaction and close the connection
    connection.commit()
    cursor.close()
    connection.close()
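
# NOTE (assumption, not part of the original file): the upsert above only accumulates
# counts if the target table has a unique key on (order_name, status). A minimal
# sketch of such a table:
#
#   CREATE TABLE IF NOT EXISTS order_name_YN (
#       order_name VARCHAR(255) NOT NULL,
#       status     VARCHAR(64)  NOT NULL,
#       count      BIGINT       NOT NULL DEFAULT 0,
#       UNIQUE KEY uk_order_name_status (order_name, status)
#   ) DEFAULT CHARSET = utf8mb4;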

if __name__ == '__main__':
    # 1- Create the SparkSession
    spark = SparkSession.builder \
        .config("spark.sql.shuffle.partitions", 1) \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1") \
        .appName('ss_kafka_push_to_mysql') \
        .master('local[*]') \
        .getOrCreate()
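    # NOTE: the spark-sql-kafka connector above targets Spark 3.0.1 / Scala 2.12;
    # it should match the Spark version that actually runs this job.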
    # 2- Read the Kafka data stream
    kafka_stream = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "niit-node2:9092") \
        .option("subscribePattern", "orders") \
        .load()
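    # The Kafka source yields key/value/topic/partition/offset/timestamp columns.
    # The parsing below assumes the value is a tab-separated record in the order
    # order_id, order_type, order_name, order_quantity, status, e.g. a hypothetical
    # message: "1001\tfood\tmilk_tea\t2\tY".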
    # 3- Parse the message value into English column names
    parsed_stream = kafka_stream.selectExpr("cast(value as string) as value", "timestamp") \
        .withColumn("order_id", F.split(F.col("value"), "\t")[0]) \
        .withColumn("order_type", F.split(F.col("value"), "\t")[1]) \
        .withColumn("order_name", F.split(F.col("value"), "\t")[2]) \
        .withColumn("order_quantity", F.split(F.col("value"), "\t")[3]) \
        .withColumn("status", F.split(F.col("value"), "\t")[4]) \
        .drop("value")
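    # (Sketch) the repeated F.split calls could be factored out once, e.g.
    #   parts = F.split(F.col("value"), "\t")
    # and then parts[0] .. parts[4] used in the withColumn calls above.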
    # 4- Aggregate over a tumbling event-time window
    windowed_stream = parsed_stream \
        .groupBy(
            F.window(parsed_stream.timestamp, "2 seconds"),  # 2-second window
            parsed_stream.order_name,
            parsed_stream.status
        ) \
        .agg(F.count("*").alias("count"))
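    # NOTE: no watermark is defined, so the window state is kept indefinitely.
    # For long-running jobs, a bound such as (sketch; the threshold is an assumption)
    #   parsed_stream.withWatermark("timestamp", "10 seconds")
    # applied before the groupBy would allow Spark to drop old window state.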
    # 5- Push each micro-batch to MySQL with foreachBatch
    windowed_stream.writeStream \
        .foreachBatch(write_to_mysql) \
        .outputMode("update") \
        .trigger(processingTime="2 seconds") \
        .start()
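    # NOTE: for fault tolerance a checkpoint directory is usually added, e.g.
    # .option("checkpointLocation", "/path/to/checkpoint") before .start();
    # the path shown is only a placeholder.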
    # Wait for the streaming query to terminate
    spark.streams.awaitAnyTermination()