aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPriyansh <[email protected]>2021-12-25 18:24:03 -0500
committerPriyansh <[email protected]>2021-12-25 18:24:03 -0500
commit7ceb40394bd40075e7fe41b8c125a5d2dcedc781 (patch)
treee6c791b0d6dda09898624ae0aa92f6aee821879b
parentcb1e6c587ebef823b8524d72c1a97ab5765901d8 (diff)
downloadKafkaPySpark-7ceb40394bd40075e7fe41b8c125a5d2dcedc781.tar.xz
KafkaPySpark-7ceb40394bd40075e7fe41b8c125a5d2dcedc781.zip
Unfinished plot
-rw-r--r--consumer.py56
1 files changed, 30 insertions, 26 deletions
diff --git a/consumer.py b/consumer.py
index 8d15b07..5542b9e 100644
--- a/consumer.py
+++ b/consumer.py
@@ -1,10 +1,22 @@
from kafka import KafkaConsumer
import json
import re
-from database_configuration import insert_tweet
+# from database_configuration import insert_tweet
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+import numpy as np
+import pandas as pd
+import matplotlib.pyplot as plt
topic_name = 'twitterdata'
+# Initialize an empty dataframe to store the tweet id and sentiment
+tweets = pd.DataFrame(columns=['tweet_id', 'sentiment'])
+
+
+def sentence_score(rs):
+ review_score = SentimentIntensityAnalyzer()
+ return review_score.polarity_scores(rs)['compound']
+
def deEmojify(data):
emoj = re.compile("["
u"\U0001F600-\U0001F64F" # emoticons
@@ -30,43 +42,35 @@ def deEmojify(data):
consumer = KafkaConsumer(
topic_name,
- auto_offset_reset='earliest',
+ auto_offset_reset='latest',
enable_auto_commit=True,
auto_commit_interval_ms = 1000,
fetch_max_bytes = 128,
max_poll_records = 100,
- group_id = 'test-consumer-group',
- consumer_timeout_ms=3000,
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
for msg in consumer:
- consumer.commit()
- tweet = {}
+ tweet = ""
message = json.loads(json.dumps(msg.value))
- if 'created_at' in message:
- tweet['created_at'] = message['created_at']
- if 'id' in message:
- tweet['tweet_id'] = message['id']
- if 'user' in message:
- tweet['location'] = message['user']['location']
- if 'user' in message:
- tweet['screen_name'] = message['user']['screen_name']
- if 'quote_count' in message:
- tweet['quote_count'] = message['quote_count']
- if 'reply_count' in message:
- tweet['reply_count'] = message['reply_count']
- if 'retweet_count' in message:
- tweet['retweet_count'] = message['retweet_count']
- if 'favorite_count' in message:
- tweet['favorite_count'] = message['favorite_count']
if 'text' in message and 'RT @' not in message['text']:
if ('extended_tweet' in message) and 'full_text' in message['extended_tweet']:
- tweet['tweet'] = deEmojify(message['extended_tweet']['full_text'].replace('\n', ' '))
+ tweet= deEmojify(message['extended_tweet']['full_text'].replace('\n', ' '))
else:
- tweet['tweet'] = deEmojify(message['text'].replace('\n', ' '))
- if 'tweet' in tweet and tweet['tweet'] != '':
- insert_tweet(tweet)
+ tweet= deEmojify(message['text'].replace('\n', ' '))
+ if tweet != '':
+ score = sentence_score(tweet)
+ sentiment = 'neutral'
+ if score > 0.5:
+ sentiment = 'positive'
+ elif score < -0.5:
+ sentiment = 'negative'
+ tweets = tweets.append({'tweet_id': message['id'], 'sentiment': sentiment}, ignore_index=True)
+ # Update the already shown plot with the new dataframe
+ tweets.groupby('sentiment').count()['tweet_id'].plot.bar()
+ plt.show()
+ plt.close('all')
+
# End the consumer
consumer.close()