diff options
| author | Priyansh <[email protected]> | 2021-12-25 02:34:29 -0500 |
|---|---|---|
| committer | Priyansh <[email protected]> | 2021-12-25 02:34:29 -0500 |
| commit | 5ef3baaf6575a4ad3b4e37897637d87af1af418a (patch) | |
| tree | d8346e19c5d83d8e28580c2c571e94e2fa9b8e1f /consumer.py | |
| parent | 06b51f0c8e081c3e4120aef51c394662ea15a2a3 (diff) | |
| download | KafkaPySpark-5ef3baaf6575a4ad3b4e37897637d87af1af418a.tar.xz KafkaPySpark-5ef3baaf6575a4ad3b4e37897637d87af1af418a.zip | |
Added database insertion script; the Kafka consumer now stops if no message is received within 3 seconds
Diffstat (limited to 'consumer.py')
| -rw-r--r-- | consumer.py | 25 |
1 file changed, 13 insertions, 12 deletions
```diff
diff --git a/consumer.py b/consumer.py
index 43264ad..06d553b 100644
--- a/consumer.py
+++ b/consumer.py
@@ -1,6 +1,7 @@
-from kafka import KafkaConsumer
+from kafka import KafkaConsumer, TopicPartition
 import json
 import re
+from database_configuration import insert_tweet
 
 topic_name = 'twitterdata'
 
@@ -29,24 +30,27 @@ def deEmojify(data):
 
 consumer = KafkaConsumer(
     topic_name,
-    auto_offset_reset='latest',
+    auto_offset_reset='earliest',
     enable_auto_commit=True,
-    auto_commit_interval_ms = 5000,
+    auto_commit_interval_ms = 1000,
     fetch_max_bytes = 128,
     max_poll_records = 100,
+    group_id = 'test-consumer-group',
+    consumer_timeout_ms=3000,
     value_deserializer=lambda x: json.loads(x.decode('utf-8')))
 
-counter = 0
+
 for msg in consumer:
+    consumer.commit()
     tweet = {}
     message = json.loads(json.dumps(msg.value))
     if 'created_at' in message:
         tweet['created_at'] = message['created_at']
     if 'id' in message:
         tweet['tweet_id'] = message['id']
-    if 'location' in message:
+    if 'user' in message:
         tweet['location'] = message['user']['location']
-    if 'screen_name' in message:
+    if 'user' in message:
         tweet['screen_name'] = message['user']['screen_name']
     if 'quote_count' in message:
         tweet['quote_count'] = message['quote_count']
@@ -61,12 +65,9 @@ for msg in consumer:
         tweet['tweet'] = deEmojify(message['extended_tweet']['full_text'].replace('\n', ' '))
     else:
         tweet['tweet'] = deEmojify(message['text'].replace('\n', ' '))
-    # if 'tweet' in tweet and tweet['tweet'] != '':
-    print(counter)
-    counter += 1
-    if counter == 99:
-        break
+    if 'tweet' in tweet and tweet['tweet'] != '':
+        insert_tweet(tweet)
 
 # End the consumer
 consumer.close()
-print('Done')
\ No newline at end of file
+print('Done')
```
