# kafka_sparkStreaming_mysql.py
# Author: 李烁升
# Date: 2024-12-05 15:36
# Reads order records from Kafka with Spark Structured Streaming and parses the
# tab-separated value into columns (the MySQL sink named in the file is not shown here).

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# 1- Create the SparkSession
spark = SparkSession.builder \
    .config("spark.sql.shuffle.partitions", 1) \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1") \
    .appName("kafka_stream") \
    .master("local[*]") \
    .getOrCreate()

# 2- Read the Kafka data stream from the "orders" topic
kafka_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "niit-node2:9092") \
    .option("subscribePattern", "orders") \
    .load()

# 3- Parse the data: split the tab-separated Kafka value into order columns
parsed_stream = kafka_stream.selectExpr("cast(value as string) as value", "timestamp") \
    .withColumn("order_id", F.split(F.col("value"), "\t")[0]) \
    .withColumn("order_type", F.split(F.col("value"), "\t")[1]) \
    .withColumn("order_name", F.split(F.col("value"), "\t")[2]) \
    .withColumn("order_quantity", F.split(F.col("value"), "\t")[3]) \
    .withColumn("status", F.split(F.col("value"), "\t")[4]) \
    .drop("value")
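The script name points at a MySQL sink, but this section stops after parsing and never starts the streaming query, so nothing would run as written. Below is a minimal sketch of how the parsed stream could be written out with foreachBatch and a JDBC batch write; the host, database, table, user, and password are placeholder assumptions, not taken from the original, and the MySQL JDBC driver would need to be on the classpath.

# Hedged sketch (not from the original): write each micro-batch to MySQL via JDBC.
# Connection details below are assumed placeholders.
def write_to_mysql(batch_df, batch_id):
    batch_df.write \
        .format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306/orders_db") \
        .option("dbtable", "orders") \
        .option("user", "root") \
        .option("password", "password") \
        .mode("append") \
        .save()

# Start the streaming query and block until it is stopped
query = parsed_stream.writeStream \
    .foreachBatch(write_to_mysql) \
    .outputMode("append") \
    .start()

query.awaitTermination()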