Upload file to /

konner 2025-01-03 15:17:26 +08:00
parent 1ae7dbfba7
commit c733c25385


@@ -0,0 +1,37 @@
package com.bigdate

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, split}

/**
 * @author 李烁升
 * @date 2024-12-05 15:36
 */
object kafka_sparkStreaming_mysql {

  def main(args: Array[String]): Unit = {
    // 1- Create the SparkSession
    val spark = SparkSession.builder()
      .config("spark.sql.shuffle.partitions", "1")
      .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1")
      .appName("kafka_stream")
      .master("local[*]")
      .getOrCreate()

    // 2- Read the Kafka data stream
    val kafka_stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "niit-node2:9092")
      .option("subscribePattern", "orders")
      .load()

    // 3- Parse the data: each value is a tab-separated order record
    val parsed_stream = kafka_stream
      .selectExpr("cast(value as string) as value", "timestamp")
      .withColumn("order_id", split(col("value"), "\t")(0))
      .withColumn("order_type", split(col("value"), "\t")(1))
      .withColumn("order_name", split(col("value"), "\t")(2))
      .withColumn("order_quantity", split(col("value"), "\t")(3))
      .withColumn("status", split(col("value"), "\t")(4))
      .drop("value")
  }
}
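
The object name mentions mysql, but the committed file never writes parsed_stream anywhere, so the job produces no output. A minimal sketch of the missing sink, placed at the end of main() after parsed_stream, might use foreachBatch with Spark's JDBC writer. The database URL jdbc:mysql://localhost:3306/orders_db, the orders table, and the credentials below are hypothetical placeholders, not part of this commit, and the MySQL Connector/J driver is assumed to be on the classpath.

    // Hypothetical sink sketch (not part of this commit): write each micro-batch
    // to MySQL through Spark's JDBC writer. URL, table, and credentials are placeholders.
    def writeBatchToMysql(batch: org.apache.spark.sql.DataFrame, batchId: Long): Unit = {
      batch.write
        .format("jdbc")
        .option("url", "jdbc:mysql://localhost:3306/orders_db") // hypothetical database
        .option("dbtable", "orders")                            // hypothetical table
        .option("user", "root")                                 // placeholder credentials
        .option("password", "password")
        .option("driver", "com.mysql.cj.jdbc.Driver")           // assumes MySQL Connector/J on the classpath
        .mode("append")
        .save()
    }

    val query = parsed_stream.writeStream
      .foreachBatch(writeBatchToMysql _)
      .start()

    query.awaitTermination() // keep the streaming query running until stopped

Passing the named function avoids the Scala 2.12 overload ambiguity that can arise when an inline lambda is handed to foreachBatch.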