diff --git a/kafka_sparkStreaming_mysql.scala b/kafka_sparkStreaming_mysql.scala
new file mode 100644
index 0000000..1be269d
--- /dev/null
+++ b/kafka_sparkStreaming_mysql.scala
@@ -0,0 +1,38 @@
+package com.bigdate
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.{col, split}
+
+/**
+ * @Author 李烁升
+ * @Date 2024-12-05 15:36
+ */
+object kafka_sparkStreaming_mysql {
+
+  def main(args: Array[String]): Unit = {
+    // 1- Create the SparkSession
+    val spark = SparkSession.builder()
+      .config("spark.sql.shuffle.partitions", 1)
+      .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1")
+      .appName("kafka_stream")
+      .master("local[*]")
+      .getOrCreate()
+
+    // 2- Read the Kafka data stream
+    val kafkaStream = spark.readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", "niit-node2:9092")
+      .option("subscribePattern", "orders")
+      .load()
+
+    // 3- Parse the tab-separated value column into the order fields
+    val parsedStream = kafkaStream
+      .selectExpr("cast(value as string) as value", "timestamp")
+      .withColumn("order_id", split(col("value"), "\t")(0))
+      .withColumn("order_type", split(col("value"), "\t")(1))
+      .withColumn("order_name", split(col("value"), "\t")(2))
+      .withColumn("order_quantity", split(col("value"), "\t")(3))
+      .withColumn("status", split(col("value"), "\t")(4))
+      .drop("value")
+  }
+}
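The file name points at a MySQL sink, but the added code stops after parsing and never starts a streaming query. Below is a minimal sketch of one way the pipeline could be finished, writing each micro-batch of parsedStream to MySQL through foreachBatch and the Spark JDBC writer; the JDBC URL, database, table name, and credentials are assumptions rather than part of the diff, and a MySQL JDBC driver (e.g. mysql-connector-java) would need to be on the classpath. The block would sit at the end of main, after step 3.

import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical MySQL sink: the connection details below are placeholders, not from the diff.
val writeToMySql: (DataFrame, Long) => Unit = (batchDF, batchId) => {
  batchDF.write
    .format("jdbc")
    .option("url", "jdbc:mysql://niit-node2:3306/orders_db") // assumed host and database
    .option("dbtable", "orders")                             // assumed table name
    .option("user", "root")                                  // assumed credentials
    .option("password", "123456")
    .mode(SaveMode.Append)
    .save()
}

// Start the streaming query and block until it is stopped.
val query = parsedStream.writeStream
  .outputMode("append")
  .foreachBatch(writeToMySql)
  .start()

query.awaitTermination()

Binding the batch function to a val with an explicit (DataFrame, Long) => Unit type sidesteps the foreachBatch overload ambiguity seen with inline lambdas on Scala 2.12.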