diff options
Diffstat (limited to 'consumer.py')
| -rw-r--r-- | consumer.py | 25 |
1 files changed, 13 insertions, 12 deletions
diff --git a/consumer.py b/consumer.py index 43264ad..06d553b 100644 --- a/consumer.py +++ b/consumer.py @@ -1,6 +1,7 @@ -from kafka import KafkaConsumer +from kafka import KafkaConsumer, TopicPartition import json import re +from database_configuration import insert_tweet topic_name = 'twitterdata' @@ -29,24 +30,27 @@ def deEmojify(data): consumer = KafkaConsumer( topic_name, - auto_offset_reset='latest', + auto_offset_reset='earliest', enable_auto_commit=True, - auto_commit_interval_ms = 5000, + auto_commit_interval_ms = 1000, fetch_max_bytes = 128, max_poll_records = 100, + group_id = 'test-consumer-group', + consumer_timeout_ms=3000, value_deserializer=lambda x: json.loads(x.decode('utf-8'))) -counter = 0 + for msg in consumer: + consumer.commit() tweet = {} message = json.loads(json.dumps(msg.value)) if 'created_at' in message: tweet['created_at'] = message['created_at'] if 'id' in message: tweet['tweet_id'] = message['id'] - if 'location' in message: + if 'user' in message: tweet['location'] = message['user']['location'] - if 'screen_name' in message: + if 'user' in message: tweet['screen_name'] = message['user']['screen_name'] if 'quote_count' in message: tweet['quote_count'] = message['quote_count'] @@ -61,12 +65,9 @@ for msg in consumer: tweet['tweet'] = deEmojify(message['extended_tweet']['full_text'].replace('\n', ' ')) else: tweet['tweet'] = deEmojify(message['text'].replace('\n', ' ')) - # if 'tweet' in tweet and tweet['tweet'] != '': - print(counter) - counter += 1 - if counter == 99: - break + if 'tweet' in tweet and tweet['tweet'] != '': + insert_tweet(tweet) # End the consumer consumer.close() -print('Done')
\ No newline at end of file +print('Done') |
