from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import os

# Configure the JDK path (raw strings keep the Windows backslashes from being
# read as escape sequences)
os.environ['JAVA_HOME'] = r'D:\CodeDevelopment\DevelopmentEnvironment\Java\jdk1.8.0_351'
# Configure the Hadoop path, i.e. the directory extracted earlier
os.environ['HADOOP_HOME'] = r'D:\CodeDevelopment\DevelopmentEnvironment\hadoop-2.8.1'
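# Note (an assumption beyond the original snippet): when running Spark locally
# on Windows, winutils.exe is typically also required under %HADOOP_HOME%\bin;
# without it the job may fail with "Could not locate executable ... winutils.exe".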
if __name__ == '__main__':
    # 1- Create the SparkSession object
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions", 1)\
        .appName('sparksql_read_kafka_1_topic')\
        .master('local[*]')\
        .getOrCreate()

    # 2- Data input
    # By default, a batch read consumes the Topic from beginning to end
    init_df = spark.read\
        .format("kafka")\
        .option("kafka.bootstrap.servers", "zhao:9092")\
        .option("subscribe", "test")\
        .load()
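    # The batch defaults above are equivalent to setting the Kafka source
    # options "startingOffsets"/"endingOffsets" explicitly (both are real
    # connector options); a sketch of pinning the offset range, with the
    # name ranged_df chosen here for illustration:
    #
    # ranged_df = spark.read\
    #     .format("kafka")\
    #     .option("kafka.bootstrap.servers", "zhao:9092")\
    #     .option("subscribe", "test")\
    #     .option("startingOffsets", "earliest")\
    #     .option("endingOffsets", "latest")\
    #     .load()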
    # 3- Data processing
    # Kafka's value column is binary, so cast it to string before use
    result_df1 = init_df.select(F.expr("cast(value as string) as value"))

    # selectExpr = select + F.expr
    result_df2 = init_df.selectExpr("cast(value as string) as value")

    result_df3 = init_df.withColumn("value", F.expr("cast(value as string)"))
    # 4- Data output
    print("result_df1")
    result_df1.show()

    print("result_df2")
    result_df2.show()

    print("result_df3")
    result_df3.show()

    # 5- Release resources
    spark.stop()
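Note: the Kafka source is not bundled with the core Spark distribution, so this script only runs with the spark-sql-kafka connector on the classpath. A minimal sketch of one way to supply it (the coordinates below assume Spark 3.1.2 built against Scala 2.12, and the script file name is hypothetical; adjust both to your environment):

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 sparksql_read_kafka_1_topic.py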