diff options
| author | Priyansh <[email protected]> | 2021-12-24 01:36:12 -0500 |
|---|---|---|
| committer | Priyansh <[email protected]> | 2021-12-24 01:36:12 -0500 |
| commit | 4c9653f46ad9c2cc64f0b8dea42700eeb5044a88 (patch) | |
| tree | a3e06bdcd148bf8ac4f252b711f46b247fe46c62 | |
| parent | 223c2151bd7b08c99aa5910a31e0d7d642f2edf4 (diff) | |
| download | KafkaPySpark-4c9653f46ad9c2cc64f0b8dea42700eeb5044a88.tar.xz KafkaPySpark-4c9653f46ad9c2cc64f0b8dea42700eeb5044a88.zip | |
Print only tweet messages
| -rw-r--r-- | consumer.py | 10 | ||||
| -rw-r--r-- | producer.py | 2 |
2 files changed, 10 insertions, 2 deletions
diff --git a/consumer.py b/consumer.py index 3670105..4e270a5 100644 --- a/consumer.py +++ b/consumer.py @@ -28,7 +28,15 @@ consumer = KafkaConsumer( value_deserializer=lambda x: json.loads(x.decode('utf-8'))) for msg in consumer: - print(json.loads(json.dumps(msg.value))) + message = json.loads(json.dumps(msg.value)) + try: + print(message['extended_tweet']['full_text']) + except: + try: + print(message['text']) + except: + print(message) + print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") # spark = SparkSession.builder.master("local").appName("Tweets").getOrCreate() # sc = spark.sparkContext diff --git a/producer.py b/producer.py index 6723ae0..b965491 100644 --- a/producer.py +++ b/producer.py @@ -33,7 +33,7 @@ class TwitterStreamer(): listener = ListenerTS() auth = self.twitterAuth.authenticateTwitterApp() stream = Stream(auth, listener) - stream.filter(track=["Apple"], stall_warnings=True, languages= ["en"]) + stream.filter(track=["Christmas"], stall_warnings=True, languages= ["en"]) class ListenerTS(StreamListener): |
