diff options
Diffstat (limited to 'consumer.py')
| -rw-r--r-- | consumer.py | 33 |
1 files changed, 30 insertions, 3 deletions
diff --git a/consumer.py b/consumer.py index 9b13a22..3803b4a 100644 --- a/consumer.py +++ b/consumer.py @@ -1,5 +1,13 @@ from kafka import KafkaConsumer import json +from pyspark.sql import SparkSession +from pyspark.sql.functions import * +from pyspark.sql.types import * +from pyspark import SparkContext +from pyspark.streaming import StreamingContext +import math +import string +import random topic_name = 'twitterdata' @@ -14,6 +22,25 @@ consumer = KafkaConsumer( value_deserializer=lambda x: json.loads(x.decode('utf-8'))) -for message in consumer: - tweets = json.loads(json.dumps(message.value)) - print(tweets)
\ No newline at end of file +if __name__ == '__main__': + spark = SparkSession.builder.appName("TwitterStreaming").getOrCreate().readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", topic_name).load() + spark.createOrReplaceTempView("tweets") + spark.sql("select * from tweets").show() + spark.stop() + sc = SparkContext(appName="TwitterStreaming") + ssc = StreamingContext(sc, 5) + ssc.checkpoint("checkpoint") + lines = ssc.socketTextStream("localhost", 9999) + words = lines.flatMap(lambda line: line.split(" ")) + pairs = words.map(lambda word: (word, 1)) + wordCounts = pairs.reduceByKey(lambda x, y: x + y) + wordCounts.pprint() + ssc.start() + ssc.awaitTermination() + ssc.stop() + sc.stop() + consumer.close() + print("Done") + + + |
