about summary refs log tree commit diff
diff options
context:
space:
mode:
author Priyansh <[email protected]> 2021-12-23 22:06:36 -0500
committer Priyansh <[email protected]> 2021-12-23 22:06:36 -0500
commit 2f1f141d07a721eb4fabc6c3dc8e32d27c4251f6 (patch)
tree 08cfe2e83327c6155f25488ee9205485e705a697
parent 6c9043ae1a5e7ea34dc761b95cdee7e5ec210b69 (diff)
download KafkaPySpark-2f1f141d07a721eb4fabc6c3dc8e32d27c4251f6.tar.xz
KafkaPySpark-2f1f141d07a721eb4fabc6c3dc8e32d27c4251f6.zip
Basic PySpark Integration
-rw-r--r-- consumer.py 33
-rw-r--r-- producer.py 2
-rw-r--r-- requirements.txt 2
3 files changed, 32 insertions, 5 deletions
diff --git a/consumer.py b/consumer.py
index 9b13a22..3803b4a 100644
--- a/consumer.py
+++ b/consumer.py
@@ -1,5 +1,13 @@
from kafka import KafkaConsumer
import json
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import *
+from pyspark.sql.types import *
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+import math
+import string
+import random
topic_name = 'twitterdata'
@@ -14,6 +22,25 @@ consumer = KafkaConsumer(
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
-for message in consumer:
- tweets = json.loads(json.dumps(message.value))
- print(tweets)
\ No newline at end of file
+if __name__ == '__main__':
+ spark = SparkSession.builder.appName("TwitterStreaming").getOrCreate().readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", topic_name).load()
+ spark.createOrReplaceTempView("tweets")
+ spark.sql("select * from tweets").show()
+ spark.stop()
+ sc = SparkContext(appName="TwitterStreaming")
+ ssc = StreamingContext(sc, 5)
+ ssc.checkpoint("checkpoint")
+ lines = ssc.socketTextStream("localhost", 9999)
+ words = lines.flatMap(lambda line: line.split(" "))
+ pairs = words.map(lambda word: (word, 1))
+ wordCounts = pairs.reduceByKey(lambda x, y: x + y)
+ wordCounts.pprint()
+ ssc.start()
+ ssc.awaitTermination()
+ ssc.stop()
+ sc.stop()
+ consumer.close()
+ print("Done")
+
+
+
diff --git a/producer.py b/producer.py
index b965491..6723ae0 100644
--- a/producer.py
+++ b/producer.py
@@ -33,7 +33,7 @@ class TwitterStreamer():
listener = ListenerTS()
auth = self.twitterAuth.authenticateTwitterApp()
stream = Stream(auth, listener)
- stream.filter(track=["Christmas"], stall_warnings=True, languages= ["en"])
+ stream.filter(track=["Apple"], stall_warnings=True, languages= ["en"])
class ListenerTS(StreamListener):
diff --git a/requirements.txt b/requirements.txt
index ce2b409..065bf27 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
kafka-python
dotenv
tweepy==3.9.0
-pyspark
+pyspark==2.4.6