From c733c25385396cda1162f3994a202778fdbb1867 Mon Sep 17 00:00:00 2001
From: konner <2046077688@qq.com>
Date: Fri, 3 Jan 2025 15:17:26 +0800
Subject: [PATCH] Upload file to /
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 kafka_sparkStreaming_mysql.scala | 37 ++++++++++++++++++++++++++++++++
 1 file changed, 37 insertions(+)
 create mode 100644 kafka_sparkStreaming_mysql.scala

diff --git a/kafka_sparkStreaming_mysql.scala b/kafka_sparkStreaming_mysql.scala
new file mode 100644
index 0000000..1be269d
--- /dev/null
+++ b/kafka_sparkStreaming_mysql.scala
@@ -0,0 +1,37 @@
+package com.bigdate
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.{col, split}
+/**
+ * Reads tab-separated order records from Kafka with Structured Streaming and splits them into columns.
+ * @author 李烁升
+ * @date 2024-12-05 15:36
+ */
+object kafka_sparkStreaming_mysql {
+  def main(args: Array[String]): Unit = {
+    // 1- Create the SparkSession
+    val spark = SparkSession.builder()
+      .config("spark.sql.shuffle.partitions", 1)
+      .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1")
+      .appName("kafka_stream")
+      .master("local[*]")
+      .getOrCreate()
+
+    // 2- Read the Kafka data stream from the "orders" topic
+    val kafkaStream = spark.readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", "niit-node2:9092")
+      .option("subscribePattern", "orders")
+      .load()
+
+    // 3- Parse the data: split the tab-separated value field into columns
+    val parsedStream = kafkaStream
+      .selectExpr("cast(value as string) as value", "timestamp")
+      .withColumn("order_id", split(col("value"), "\t")(0))
+      .withColumn("order_type", split(col("value"), "\t")(1))
+      .withColumn("order_name", split(col("value"), "\t")(2))
+      .withColumn("order_quantity", split(col("value"), "\t")(3))
+      .withColumn("status", split(col("value"), "\t")(4))
+      .drop("value")
+  }
+}
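
Note (not part of the patch): despite the file name, the patch stops after parsing and never writes the stream to MySQL. A minimal sketch of what that sink could look like, using foreachBatch with Spark's JDBC writer, is below; the JDBC URL, database, table, driver jar, and credentials are placeholders, not values taken from the patch.

    // Sketch only: write each micro-batch of parsedStream to MySQL over JDBC.
    // Requires the MySQL Connector/J jar on the classpath; all option values are placeholders.
    import org.apache.spark.sql.{DataFrame, SaveMode}

    def writeToMySql(batch: DataFrame, batchId: Long): Unit = {
      batch.write
        .format("jdbc")
        .option("url", "jdbc:mysql://localhost:3306/orders_db") // placeholder
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .option("dbtable", "orders")                            // placeholder
        .option("user", "root")                                 // placeholder
        .option("password", "password")                         // placeholder
        .mode(SaveMode.Append)
        .save()
    }

    val query = parsedStream.writeStream
      .outputMode("append")
      .foreachBatch(writeToMySql _)
      .start()

    query.awaitTermination()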