aboutsummaryrefslogtreecommitdiff
path: root/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'consumer.py')
-rw-r--r--consumer.py25
1 files changed, 13 insertions, 12 deletions
diff --git a/consumer.py b/consumer.py
index 43264ad..06d553b 100644
--- a/consumer.py
+++ b/consumer.py
@@ -1,6 +1,7 @@
-from kafka import KafkaConsumer
+from kafka import KafkaConsumer, TopicPartition
import json
import re
+from database_configuration import insert_tweet
topic_name = 'twitterdata'
@@ -29,24 +30,27 @@ def deEmojify(data):
consumer = KafkaConsumer(
topic_name,
- auto_offset_reset='latest',
+ auto_offset_reset='earliest',
enable_auto_commit=True,
- auto_commit_interval_ms = 5000,
+ auto_commit_interval_ms = 1000,
fetch_max_bytes = 128,
max_poll_records = 100,
+ group_id = 'test-consumer-group',
+ consumer_timeout_ms=3000,
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
-counter = 0
+
for msg in consumer:
+ consumer.commit()
tweet = {}
message = json.loads(json.dumps(msg.value))
if 'created_at' in message:
tweet['created_at'] = message['created_at']
if 'id' in message:
tweet['tweet_id'] = message['id']
- if 'location' in message:
+ if 'user' in message:
tweet['location'] = message['user']['location']
- if 'screen_name' in message:
+ if 'user' in message:
tweet['screen_name'] = message['user']['screen_name']
if 'quote_count' in message:
tweet['quote_count'] = message['quote_count']
@@ -61,12 +65,9 @@ for msg in consumer:
tweet['tweet'] = deEmojify(message['extended_tweet']['full_text'].replace('\n', ' '))
else:
tweet['tweet'] = deEmojify(message['text'].replace('\n', ' '))
- # if 'tweet' in tweet and tweet['tweet'] != '':
- print(counter)
- counter += 1
- if counter == 99:
- break
+ if 'tweet' in tweet and tweet['tweet'] != '':
+ insert_tweet(tweet)
# End the consumer
consumer.close()
-print('Done') \ No newline at end of file
+print('Done')