diff options
| -rw-r--r-- | consumer.py | 33 | ||||
| -rw-r--r-- | producer.py | 2 | ||||
| -rw-r--r-- | requirements.txt | 2 |
3 files changed, 32 insertions, 5 deletions
diff --git a/consumer.py b/consumer.py index 9b13a22..3803b4a 100644 --- a/consumer.py +++ b/consumer.py @@ -1,5 +1,13 @@ from kafka import KafkaConsumer import json +from pyspark.sql import SparkSession +from pyspark.sql.functions import * +from pyspark.sql.types import * +from pyspark import SparkContext +from pyspark.streaming import StreamingContext +import math +import string +import random topic_name = 'twitterdata' @@ -14,6 +22,25 @@ consumer = KafkaConsumer( value_deserializer=lambda x: json.loads(x.decode('utf-8'))) -for message in consumer: - tweets = json.loads(json.dumps(message.value)) - print(tweets)
\ No newline at end of file +if __name__ == '__main__': + spark = SparkSession.builder.appName("TwitterStreaming").getOrCreate().readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", topic_name).load() + spark.createOrReplaceTempView("tweets") + spark.sql("select * from tweets").show() + spark.stop() + sc = SparkContext(appName="TwitterStreaming") + ssc = StreamingContext(sc, 5) + ssc.checkpoint("checkpoint") + lines = ssc.socketTextStream("localhost", 9999) + words = lines.flatMap(lambda line: line.split(" ")) + pairs = words.map(lambda word: (word, 1)) + wordCounts = pairs.reduceByKey(lambda x, y: x + y) + wordCounts.pprint() + ssc.start() + ssc.awaitTermination() + ssc.stop() + sc.stop() + consumer.close() + print("Done") + + + diff --git a/producer.py b/producer.py index b965491..6723ae0 100644 --- a/producer.py +++ b/producer.py @@ -33,7 +33,7 @@ class TwitterStreamer(): listener = ListenerTS() auth = self.twitterAuth.authenticateTwitterApp() stream = Stream(auth, listener) - stream.filter(track=["Christmas"], stall_warnings=True, languages= ["en"]) + stream.filter(track=["Apple"], stall_warnings=True, languages= ["en"]) class ListenerTS(StreamListener): diff --git a/requirements.txt b/requirements.txt index ce2b409..065bf27 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ kafka-python dotenv tweepy==3.9.0 -pyspark +pyspark==2.4.6 |
