Structured Streaming测试
This commit is contained in:
parent
0922b0d91b
commit
eeda4a7e9b
36
testStreaming.py
Normal file
36
testStreaming.py
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
from pyspark.sql import SparkSession
|
||||||
|
from pyspark.sql.functions import explode
|
||||||
|
from pyspark.sql.functions import split
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Obtain (or reuse) the Spark session that drives this streaming job.
    spark = (
        SparkSession.builder
        .appName("StructuredNetworkWordCount")
        .getOrCreate()
    )
    print("Spark version: ", spark.version)

    # Streaming DataFrame fed by the "socket" source at niit-node2:9092.
    # The word split below reads its `value` column.
    # NOTE(review): 9092 is commonly used as Kafka's broker port, while the
    # socket source reads from a plain TCP text server -- confirm that such
    # a server is what is listening on this host/port.
    socket_reader = spark.readStream.format("socket")
    socket_reader = socket_reader.option("host", "niit-node2")
    socket_reader = socket_reader.option("port", 9092)
    socket_lines = socket_reader.load()

    # Break every line on single spaces and emit one row per token,
    # exposed under the column name "word".
    tokens = split(socket_lines.value, " ")
    exploded_words = socket_lines.select(explode(tokens).alias("word"))

    # Running count of occurrences for each distinct word.
    word_counts = exploded_words.groupBy("word").count()

    # Prints the DataFrame object itself, not the streamed rows.
    print("wordCounts: ", word_counts)

    # Start the query: the aggregation is written to the console sink
    # using the "complete" output mode.
    streaming_query = (
        word_counts.writeStream
        .outputMode("complete")
        .format("console")
        .start()
    )

    # Block until the streaming query terminates.
    streaming_query.awaitTermination()
|
Loading…
x
Reference in New Issue
Block a user