package com.bigdate

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, split}

/**
 * @author 李烁升
 * @date 2024-12-05 15:36
 */
object kafka_sparkStreaming_mysql {

  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark = SparkSession.builder()
      .config("spark.sql.shuffle.partitions", "1")
      .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1")
      .appName("kafka_stream")
      .master("local[*]")
      .getOrCreate()

    // 2. Read the Kafka data stream
    val kafkaStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "niit-node2:9092")
      .option("subscribePattern", "orders")
      .load()

    // 3. Parse the data: each value is a tab-separated record of
    //    order_id, order_type, order_name, order_quantity, status
    val parsedStream = kafkaStream
      .selectExpr("cast(value as string) as value", "timestamp")
      .withColumn("order_id", split(col("value"), "\t").getItem(0))
      .withColumn("order_type", split(col("value"), "\t").getItem(1))
      .withColumn("order_name", split(col("value"), "\t").getItem(2))
      .withColumn("order_quantity", split(col("value"), "\t").getItem(3))
      .withColumn("status", split(col("value"), "\t").getItem(4))
      .drop("value")
  }
}