aboutsummaryrefslogtreecommitdiff
path: root/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'consumer.py')
-rw-r--r--consumer.py33
1 files changed, 30 insertions, 3 deletions
diff --git a/consumer.py b/consumer.py
index 9b13a22..3803b4a 100644
--- a/consumer.py
+++ b/consumer.py
@@ -1,5 +1,13 @@
from kafka import KafkaConsumer
import json
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import *
+from pyspark.sql.types import *
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+import math
+import string
+import random
topic_name = 'twitterdata'
@@ -14,6 +22,25 @@ consumer = KafkaConsumer(
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
-for message in consumer:
- tweets = json.loads(json.dumps(message.value))
- print(tweets) \ No newline at end of file
+if __name__ == '__main__':
+ spark = SparkSession.builder.appName("TwitterStreaming").getOrCreate().readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", topic_name).load()
+ spark.createOrReplaceTempView("tweets")
+ spark.sql("select * from tweets").show()
+ spark.stop()
+ sc = SparkContext(appName="TwitterStreaming")
+ ssc = StreamingContext(sc, 5)
+ ssc.checkpoint("checkpoint")
+ lines = ssc.socketTextStream("localhost", 9999)
+ words = lines.flatMap(lambda line: line.split(" "))
+ pairs = words.map(lambda word: (word, 1))
+ wordCounts = pairs.reduceByKey(lambda x, y: x + y)
+ wordCounts.pprint()
+ ssc.start()
+ ssc.awaitTermination()
+ ssc.stop()
+ sc.stop()
+ consumer.close()
+ print("Done")
+
+
+