diff options
| author | Priyansh <[email protected]> | 2021-12-25 02:34:29 -0500 |
|---|---|---|
| committer | Priyansh <[email protected]> | 2021-12-25 02:34:29 -0500 |
| commit | 5ef3baaf6575a4ad3b4e37897637d87af1af418a (patch) | |
| tree | d8346e19c5d83d8e28580c2c571e94e2fa9b8e1f | |
| parent | 06b51f0c8e081c3e4120aef51c394662ea15a2a3 (diff) | |
| download | KafkaPySpark-5ef3baaf6575a4ad3b4e37897637d87af1af418a.tar.xz KafkaPySpark-5ef3baaf6575a4ad3b4e37897637d87af1af418a.zip | |
Added database insertion script; the Kafka consumer now stops if no message is received within 3 seconds
| -rw-r--r-- | consumer.py | 25 |
| -rw-r--r-- | database_configuration.py | 32 |
2 files changed, 45 insertions, 12 deletions
diff --git a/consumer.py b/consumer.py index 43264ad..06d553b 100644 --- a/consumer.py +++ b/consumer.py @@ -1,6 +1,7 @@ -from kafka import KafkaConsumer +from kafka import KafkaConsumer, TopicPartition import json import re +from database_configuration import insert_tweet topic_name = 'twitterdata' @@ -29,24 +30,27 @@ def deEmojify(data): consumer = KafkaConsumer( topic_name, - auto_offset_reset='latest', + auto_offset_reset='earliest', enable_auto_commit=True, - auto_commit_interval_ms = 5000, + auto_commit_interval_ms = 1000, fetch_max_bytes = 128, max_poll_records = 100, + group_id = 'test-consumer-group', + consumer_timeout_ms=3000, value_deserializer=lambda x: json.loads(x.decode('utf-8'))) -counter = 0 + for msg in consumer: + consumer.commit() tweet = {} message = json.loads(json.dumps(msg.value)) if 'created_at' in message: tweet['created_at'] = message['created_at'] if 'id' in message: tweet['tweet_id'] = message['id'] - if 'location' in message: + if 'user' in message: tweet['location'] = message['user']['location'] - if 'screen_name' in message: + if 'user' in message: tweet['screen_name'] = message['user']['screen_name'] if 'quote_count' in message: tweet['quote_count'] = message['quote_count'] @@ -61,12 +65,9 @@ for msg in consumer: tweet['tweet'] = deEmojify(message['extended_tweet']['full_text'].replace('\n', ' ')) else: tweet['tweet'] = deEmojify(message['text'].replace('\n', ' ')) - # if 'tweet' in tweet and tweet['tweet'] != '': - print(counter) - counter += 1 - if counter == 99: - break + if 'tweet' in tweet and tweet['tweet'] != '': + insert_tweet(tweet) # End the consumer consumer.close() -print('Done')
\ No newline at end of file +print('Done') diff --git a/database_configuration.py b/database_configuration.py new file mode 100644 index 0000000..3b2f036 --- /dev/null +++ b/database_configuration.py @@ -0,0 +1,32 @@ +from cassandra.cluster import Cluster + +# Connect to the cluster and keyspace +cluster = Cluster(['127.0.0.1'], port=9042) +session = cluster.connect() +session.set_keyspace('twitter') +session.execute("USE twitter") + +# Insert a tweet in cassandra database if it doesn't exist + +def insert_tweet(tweet): + # Check if tweet exists + query = "SELECT * FROM twitterdata WHERE tweet_id = %s" + tweet_exists = session.execute(query, (tweet['tweet_id'],)) + if not tweet_exists: + # Insert tweet + query = "INSERT INTO twitterdata (tweet_id, created_at, location, screen_name, quote_count, reply_count, retweet_count, favorite_count, tweet) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)" + session.execute(query, (tweet['tweet_id'], tweet['created_at'], tweet['location'], tweet['screen_name'], tweet['quote_count'], tweet['reply_count'], tweet['retweet_count'], tweet['favorite_count'], tweet['tweet'])) + print('Tweet inserted with tweet_id: ' + str(tweet['tweet_id'])) + else: + print('Tweet already exists with tweet_id: ' + str(tweet['tweet_id'])) + +## +# insert_user = session.prepare('INSERT INTO table_name (id,name) VALUES (?, ?)') +# batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM) +# for i,j in some_value: +# try: +# batch.add(insert_user,(i,j)) +# logger.info('Data Inserted into the table') +# except Exception as e: +# logger.error('The cassandra error: {}'.format(e)) +# session.execute(batch)
\ No newline at end of file |
