about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Priyansh <[email protected]> 2021-12-28 22:03:04 -0500
committer Priyansh <[email protected]> 2021-12-28 22:03:04 -0500
commit b18c9a814c11f4d66f7a460d2179087805e034a1 (patch)
tree 0525f32909fa24d83fd7fba6e3bacdc0d526b92b
parent 101072ea24cc0670bbd689d039d5144fa5216a4b (diff)
download KafkaPySpark-b18c9a814c11f4d66f7a460d2179087805e034a1.tar.xz
download KafkaPySpark-b18c9a814c11f4d66f7a460d2179087805e034a1.zip
Animated Graph using matplotlib (HEAD, main)
-rw-r--r-- consumer.py 77
-rw-r--r-- producer.py 2
2 files changed, 48 insertions, 31 deletions
diff --git a/consumer.py b/consumer.py
index 5542b9e..abbcf52 100644
--- a/consumer.py
+++ b/consumer.py
@@ -3,16 +3,13 @@ import json
import re
# from database_configuration import insert_tweet
from nltk.sentiment.vader import SentimentIntensityAnalyzer
-import numpy as np
-import pandas as pd
+from time import sleep
+from multiprocessing import Process, Queue
import matplotlib.pyplot as plt
+import matplotlib.animation as animation
topic_name = 'twitterdata'
-# Initialize an empty dataframe to store the tweet id and sentiment
-tweets = pd.DataFrame(columns=['tweet_id', 'sentiment'])
-
-
def sentence_score(rs):
review_score = SentimentIntensityAnalyzer()
return review_score.polarity_scores(rs)['compound']
@@ -40,38 +37,58 @@ def deEmojify(data):
"]+", re.UNICODE)
return re.sub(emoj, '', data)
-consumer = KafkaConsumer(
+def consumer(q):
+ consumer = KafkaConsumer(
topic_name,
- auto_offset_reset='latest',
+ auto_offset_reset= 'earliest',
enable_auto_commit=True,
auto_commit_interval_ms = 1000,
fetch_max_bytes = 128,
max_poll_records = 100,
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
+ for msg in consumer:
+ tweet = ""
+ message = json.loads(json.dumps(msg.value))
+ if 'text' in message and 'RT @' not in message['text']:
+ if ('extended_tweet' in message) and 'full_text' in message['extended_tweet']:
+ tweet= deEmojify(message['extended_tweet']['full_text'].replace('\n', ' '))
+ else:
+ tweet= deEmojify(message['text'].replace('\n', ' '))
+ if tweet != '':
+ score = sentence_score(tweet)
+ sentiment = 'neutral'
+ if score > 0:
+ sentiment = 'positive'
+ elif score < 0:
+ sentiment = 'negative'
+ q.put(({'tweet_id': message['id'], 'tweet': tweet, 'sentiment': sentiment}))
+
+x_labels = ["Positive", "Negative", "Neutral"]
+y_data = [0, 0, 0]
-for msg in consumer:
- tweet = ""
- message = json.loads(json.dumps(msg.value))
- if 'text' in message and 'RT @' not in message['text']:
- if ('extended_tweet' in message) and 'full_text' in message['extended_tweet']:
- tweet= deEmojify(message['extended_tweet']['full_text'].replace('\n', ' '))
+if __name__ == '__main__':
+ q = Queue()
+ p = Process(target=consumer, args=(q,))
+ p.start()
+
+ def update_data():
+ # Get new data from the queue
+ new_data = q.get()
+ # Check the sentiment of the tweet
+ if new_data['sentiment'] == 'positive':
+ y_data[0] += 1
+ elif new_data['sentiment'] == 'negative':
+ y_data[1] += 1
else:
- tweet= deEmojify(message['text'].replace('\n', ' '))
- if tweet != '':
- score = sentence_score(tweet)
- sentiment = 'neutral'
- if score > 0.5:
- sentiment = 'positive'
- elif score < -0.5:
- sentiment = 'negative'
- tweets = tweets.append({'tweet_id': message['id'], 'sentiment': sentiment}, ignore_index=True)
- # Update the already shown plot with the new dataframe
- tweets.groupby('sentiment').count()['tweet_id'].plot.bar()
- plt.show()
- plt.close('all')
+ y_data[2] += 1
+
+ fig = plt.figure()
+ plt.barh(x_labels, y_data, color=['green', 'red', 'blue'])
+ def animate(i):
+ update_data()
+ plt.barh(x_labels, y_data, color=['green', 'red', 'blue'])
-# End the consumer
-consumer.close()
-print('Done')
+ ani = animation.FuncAnimation(fig, animate, interval=1, frames=100)
+ plt.show()
diff --git a/producer.py b/producer.py
index b965491..252a26b 100644
--- a/producer.py
+++ b/producer.py
@@ -33,7 +33,7 @@ class TwitterStreamer():
listener = ListenerTS()
auth = self.twitterAuth.authenticateTwitterApp()
stream = Stream(auth, listener)
- stream.filter(track=["Christmas"], stall_warnings=True, languages= ["en"])
+ stream.filter(track=["Covid"], stall_warnings=True, languages= ["en"])
class ListenerTS(StreamListener):