diff --git a/testStreaming.py b/testStreaming.py new file mode 100644 index 0000000..4abf8c6 --- /dev/null +++ b/testStreaming.py @@ -0,0 +1,36 @@ +from pyspark.sql import SparkSession +from pyspark.sql.functions import explode +from pyspark.sql.functions import split + +if __name__ == "__main__": + spark = SparkSession \ + .builder \ + .appName("StructuredNetworkWordCount") \ + .getOrCreate() + print("Spark version: ", spark.version) + # Create DataFrame representing the stream of input lines from connection to localhost:9999 + lines = spark \ + .readStream \ + .format("socket") \ + .option("host", "niit-node2") \ + .option("port", 9092) \ + .load() + # Split the lines into words + words = lines.select( + explode( + split(lines.value, " ") + ).alias("word") + ) + + # Generate running word count + wordCounts = words.groupBy("word").count() + + print("wordCounts: ", wordCounts) + # Start running the query that prints the running counts to the console + query = wordCounts \ + .writeStream \ + .outputMode("complete") \ + .format("console") \ + .start() + + query.awaitTermination() \ No newline at end of file