about | summary | refs | log | tree | commit | diff
path: root/consumer.py
diff options
context:
space:
mode:
author Priyansh <[email protected]> 2021-12-25 02:34:29 -0500
committer Priyansh <[email protected]> 2021-12-25 02:34:29 -0500
commit 5ef3baaf6575a4ad3b4e37897637d87af1af418a (patch)
tree d8346e19c5d83d8e28580c2c571e94e2fa9b8e1f /consumer.py
parent 06b51f0c8e081c3e4120aef51c394662ea15a2a3 (diff)
download KafkaPySpark-5ef3baaf6575a4ad3b4e37897637d87af1af418a.tar.xz
download KafkaPySpark-5ef3baaf6575a4ad3b4e37897637d87af1af418a.zip
Added database insertion script; the Kafka consumer now stops if no message is received within 3 seconds
Diffstat (limited to 'consumer.py')
-rw-r--r-- consumer.py 25
1 file changed, 13 insertions, 12 deletions
diff --git a/consumer.py b/consumer.py
index 43264ad..06d553b 100644
--- a/consumer.py
+++ b/consumer.py
@@ -1,6 +1,7 @@
-from kafka import KafkaConsumer
+from kafka import KafkaConsumer, TopicPartition
import json
import re
+from database_configuration import insert_tweet
topic_name = 'twitterdata'
@@ -29,24 +30,27 @@ def deEmojify(data):
consumer = KafkaConsumer(
topic_name,
- auto_offset_reset='latest',
+ auto_offset_reset='earliest',
enable_auto_commit=True,
- auto_commit_interval_ms = 5000,
+ auto_commit_interval_ms = 1000,
fetch_max_bytes = 128,
max_poll_records = 100,
+ group_id = 'test-consumer-group',
+ consumer_timeout_ms=3000,
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
-counter = 0
+
for msg in consumer:
+ consumer.commit()
tweet = {}
message = json.loads(json.dumps(msg.value))
if 'created_at' in message:
tweet['created_at'] = message['created_at']
if 'id' in message:
tweet['tweet_id'] = message['id']
- if 'location' in message:
+ if 'user' in message:
tweet['location'] = message['user']['location']
- if 'screen_name' in message:
+ if 'user' in message:
tweet['screen_name'] = message['user']['screen_name']
if 'quote_count' in message:
tweet['quote_count'] = message['quote_count']
@@ -61,12 +65,9 @@ for msg in consumer:
tweet['tweet'] = deEmojify(message['extended_tweet']['full_text'].replace('\n', ' '))
else:
tweet['tweet'] = deEmojify(message['text'].replace('\n', ' '))
- # if 'tweet' in tweet and tweet['tweet'] != '':
- print(counter)
- counter += 1
- if counter == 99:
- break
+ if 'tweet' in tweet and tweet['tweet'] != '':
+ insert_tweet(tweet)
# End the consumer
consumer.close()
-print('Done') \ No newline at end of file
+print('Done')