aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPriyansh <[email protected]>2021-12-25 02:34:29 -0500
committerPriyansh <[email protected]>2021-12-25 02:34:29 -0500
commit5ef3baaf6575a4ad3b4e37897637d87af1af418a (patch)
treed8346e19c5d83d8e28580c2c571e94e2fa9b8e1f
parent06b51f0c8e081c3e4120aef51c394662ea15a2a3 (diff)
downloadKafkaPySpark-5ef3baaf6575a4ad3b4e37897637d87af1af418a.tar.xz
KafkaPySpark-5ef3baaf6575a4ad3b4e37897637d87af1af418a.zip
Added Database Insertion Script + Kafka Consumer stop if no message received in 3 seconds
-rw-r--r--consumer.py25
-rw-r--r--database_configuration.py32
2 files changed, 45 insertions, 12 deletions
diff --git a/consumer.py b/consumer.py
index 43264ad..06d553b 100644
--- a/consumer.py
+++ b/consumer.py
@@ -1,6 +1,7 @@
-from kafka import KafkaConsumer
+from kafka import KafkaConsumer, TopicPartition
import json
import re
+from database_configuration import insert_tweet
topic_name = 'twitterdata'
@@ -29,24 +30,27 @@ def deEmojify(data):
consumer = KafkaConsumer(
topic_name,
- auto_offset_reset='latest',
+ auto_offset_reset='earliest',
enable_auto_commit=True,
- auto_commit_interval_ms = 5000,
+ auto_commit_interval_ms = 1000,
fetch_max_bytes = 128,
max_poll_records = 100,
+ group_id = 'test-consumer-group',
+ consumer_timeout_ms=3000,
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
-counter = 0
+
for msg in consumer:
+ consumer.commit()
tweet = {}
message = json.loads(json.dumps(msg.value))
if 'created_at' in message:
tweet['created_at'] = message['created_at']
if 'id' in message:
tweet['tweet_id'] = message['id']
- if 'location' in message:
+ if 'user' in message:
tweet['location'] = message['user']['location']
- if 'screen_name' in message:
+ if 'user' in message:
tweet['screen_name'] = message['user']['screen_name']
if 'quote_count' in message:
tweet['quote_count'] = message['quote_count']
@@ -61,12 +65,9 @@ for msg in consumer:
tweet['tweet'] = deEmojify(message['extended_tweet']['full_text'].replace('\n', ' '))
else:
tweet['tweet'] = deEmojify(message['text'].replace('\n', ' '))
- # if 'tweet' in tweet and tweet['tweet'] != '':
- print(counter)
- counter += 1
- if counter == 99:
- break
+ if 'tweet' in tweet and tweet['tweet'] != '':
+ insert_tweet(tweet)
# End the consumer
consumer.close()
-print('Done') \ No newline at end of file
+print('Done')
diff --git a/database_configuration.py b/database_configuration.py
new file mode 100644
index 0000000..3b2f036
--- /dev/null
+++ b/database_configuration.py
@@ -0,0 +1,32 @@
+from cassandra.cluster import Cluster
+
+# Connect to the cluster and keyspace
+cluster = Cluster(['127.0.0.1'], port=9042)
+session = cluster.connect()
+session.set_keyspace('twitter')
+session.execute("USE twitter")
+
+# Insert a tweet in cassandra database if it doesn't exist
+
+def insert_tweet(tweet):
+ # Check if tweet exists
+ query = "SELECT * FROM twitterdata WHERE tweet_id = %s"
+ tweet_exists = session.execute(query, (tweet['tweet_id'],))
+ if not tweet_exists:
+ # Insert tweet
+ query = "INSERT INTO twitterdata (tweet_id, created_at, location, screen_name, quote_count, reply_count, retweet_count, favorite_count, tweet) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"
+ session.execute(query, (tweet['tweet_id'], tweet['created_at'], tweet['location'], tweet['screen_name'], tweet['quote_count'], tweet['reply_count'], tweet['retweet_count'], tweet['favorite_count'], tweet['tweet']))
+ print('Tweet inserted with tweet_id: ' + str(tweet['tweet_id']))
+ else:
+ print('Tweet already exists with tweet_id: ' + str(tweet['tweet_id']))
+
+##
+# insert_user = session.prepare('INSERT INTO table_name (id,name) VALUES (?, ?)')
+# batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
+# for i,j in some_value:
+# try:
+# batch.add(insert_user,(i,j))
+# logger.info('Data Inserted into the table')
+# except Exception as e:
+# logger.error('The cassandra error: {}'.format(e))
+# session.execute(batch) \ No newline at end of file