aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPriyansh <[email protected]>2021-12-24 01:36:12 -0500
committerPriyansh <[email protected]>2021-12-24 01:36:12 -0500
commit4c9653f46ad9c2cc64f0b8dea42700eeb5044a88 (patch)
treea3e06bdcd148bf8ac4f252b711f46b247fe46c62
parent223c2151bd7b08c99aa5910a31e0d7d642f2edf4 (diff)
downloadKafkaPySpark-4c9653f46ad9c2cc64f0b8dea42700eeb5044a88.tar.xz
KafkaPySpark-4c9653f46ad9c2cc64f0b8dea42700eeb5044a88.zip
Print only tweet messages
-rw-r--r--consumer.py10
-rw-r--r--producer.py2
2 files changed, 10 insertions, 2 deletions
diff --git a/consumer.py b/consumer.py
index 3670105..4e270a5 100644
--- a/consumer.py
+++ b/consumer.py
@@ -28,7 +28,15 @@ consumer = KafkaConsumer(
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
for msg in consumer:
- print(json.loads(json.dumps(msg.value)))
+ message = json.loads(json.dumps(msg.value))
+ try:
+ print(message['extended_tweet']['full_text'])
+ except:
+ try:
+ print(message['text'])
+ except:
+ print(message)
+ print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
# spark = SparkSession.builder.master("local").appName("Tweets").getOrCreate()
# sc = spark.sparkContext
diff --git a/producer.py b/producer.py
index 6723ae0..b965491 100644
--- a/producer.py
+++ b/producer.py
@@ -33,7 +33,7 @@ class TwitterStreamer():
listener = ListenerTS()
auth = self.twitterAuth.authenticateTwitterApp()
stream = Stream(auth, listener)
- stream.filter(track=["Apple"], stall_warnings=True, languages= ["en"])
+ stream.filter(track=["Christmas"], stall_warnings=True, languages= ["en"])
class ListenerTS(StreamListener):