diff options
| author | Priyansh <[email protected]> | 2021-12-23 22:06:36 -0500 |
|---|---|---|
| committer | Priyansh <[email protected]> | 2021-12-23 22:06:36 -0500 |
| commit | 2f1f141d07a721eb4fabc6c3dc8e32d27c4251f6 (patch) | |
| tree | 08cfe2e83327c6155f25488ee9205485e705a697 /consumer.py | |
| parent | 6c9043ae1a5e7ea34dc761b95cdee7e5ec210b69 (diff) | |
| download | KafkaPySpark-2f1f141d07a721eb4fabc6c3dc8e32d27c4251f6.tar.xz KafkaPySpark-2f1f141d07a721eb4fabc6c3dc8e32d27c4251f6.zip | |
Basic PySpark Integration
Diffstat (limited to 'consumer.py')
| -rw-r--r-- | consumer.py | 33 |
1 file changed, 30 insertions, 3 deletions
```diff
diff --git a/consumer.py b/consumer.py
index 9b13a22..3803b4a 100644
--- a/consumer.py
+++ b/consumer.py
@@ -1,5 +1,13 @@
 from kafka import KafkaConsumer
 import json
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import *
+from pyspark.sql.types import *
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+import math
+import string
+import random
 
 topic_name = 'twitterdata'
 
@@ -14,6 +22,25 @@ consumer = KafkaConsumer(
     value_deserializer=lambda x: json.loads(x.decode('utf-8')))
 
 
-for message in consumer:
-    tweets = json.loads(json.dumps(message.value))
-    print(tweets)
\ No newline at end of file
+if __name__ == '__main__':
+    spark = SparkSession.builder.appName("TwitterStreaming").getOrCreate().readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", topic_name).load()
+    spark.createOrReplaceTempView("tweets")
+    spark.sql("select * from tweets").show()
+    spark.stop()
+    sc = SparkContext(appName="TwitterStreaming")
+    ssc = StreamingContext(sc, 5)
+    ssc.checkpoint("checkpoint")
+    lines = ssc.socketTextStream("localhost", 9999)
+    words = lines.flatMap(lambda line: line.split(" "))
+    pairs = words.map(lambda word: (word, 1))
+    wordCounts = pairs.reduceByKey(lambda x, y: x + y)
+    wordCounts.pprint()
+    ssc.start()
+    ssc.awaitTermination()
+    ssc.stop()
+    sc.stop()
+    consumer.close()
+    print("Done")
+
+
+
```
