about | summary | refs | log | tree | commit | diff
path: root/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'consumer.py')
-rw-r--r-- consumer.py 77
1 files changed, 47 insertions, 30 deletions
diff --git a/consumer.py b/consumer.py
index 5542b9e..abbcf52 100644
--- a/consumer.py
+++ b/consumer.py
@@ -3,16 +3,13 @@ import json
import re
# from database_configuration import insert_tweet
from nltk.sentiment.vader import SentimentIntensityAnalyzer
-import numpy as np
-import pandas as pd
+from time import sleep
+from multiprocessing import Process, Queue
import matplotlib.pyplot as plt
+import matplotlib.animation as animation
topic_name = 'twitterdata'
-# Initialize an empty dataframe to store the tweet id and sentiment
-tweets = pd.DataFrame(columns=['tweet_id', 'sentiment'])
-
-
def sentence_score(rs):
review_score = SentimentIntensityAnalyzer()
return review_score.polarity_scores(rs)['compound']
@@ -40,38 +37,58 @@ def deEmojify(data):
"]+", re.UNICODE)
return re.sub(emoj, '', data)
-consumer = KafkaConsumer(
+def consumer(q):
+ consumer = KafkaConsumer(
topic_name,
- auto_offset_reset='latest',
+ auto_offset_reset= 'earliest',
enable_auto_commit=True,
auto_commit_interval_ms = 1000,
fetch_max_bytes = 128,
max_poll_records = 100,
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
+ for msg in consumer:
+ tweet = ""
+ message = json.loads(json.dumps(msg.value))
+ if 'text' in message and 'RT @' not in message['text']:
+ if ('extended_tweet' in message) and 'full_text' in message['extended_tweet']:
+ tweet= deEmojify(message['extended_tweet']['full_text'].replace('\n', ' '))
+ else:
+ tweet= deEmojify(message['text'].replace('\n', ' '))
+ if tweet != '':
+ score = sentence_score(tweet)
+ sentiment = 'neutral'
+ if score > 0:
+ sentiment = 'positive'
+ elif score < 0:
+ sentiment = 'negative'
+ q.put(({'tweet_id': message['id'], 'tweet': tweet, 'sentiment': sentiment}))
+
+x_labels = ["Positive", "Negative", "Neutral"]
+y_data = [0, 0, 0]
-for msg in consumer:
- tweet = ""
- message = json.loads(json.dumps(msg.value))
- if 'text' in message and 'RT @' not in message['text']:
- if ('extended_tweet' in message) and 'full_text' in message['extended_tweet']:
- tweet= deEmojify(message['extended_tweet']['full_text'].replace('\n', ' '))
+if __name__ == '__main__':
+ q = Queue()
+ p = Process(target=consumer, args=(q,))
+ p.start()
+
+ def update_data():
+ # Get new data from the queue
+ new_data = q.get()
+ # Check the sentiment of the tweet
+ if new_data['sentiment'] == 'positive':
+ y_data[0] += 1
+ elif new_data['sentiment'] == 'negative':
+ y_data[1] += 1
else:
- tweet= deEmojify(message['text'].replace('\n', ' '))
- if tweet != '':
- score = sentence_score(tweet)
- sentiment = 'neutral'
- if score > 0.5:
- sentiment = 'positive'
- elif score < -0.5:
- sentiment = 'negative'
- tweets = tweets.append({'tweet_id': message['id'], 'sentiment': sentiment}, ignore_index=True)
- # Update the already shown plot with the new dataframe
- tweets.groupby('sentiment').count()['tweet_id'].plot.bar()
- plt.show()
- plt.close('all')
+ y_data[2] += 1
+
+ fig = plt.figure()
+ plt.barh(x_labels, y_data, color=['green', 'red', 'blue'])
+ def animate(i):
+ update_data()
+ plt.barh(x_labels, y_data, color=['green', 'red', 'blue'])
-# End the consumer
-consumer.close()
-print('Done')
+ ani = animation.FuncAnimation(fig, animate, interval=1, frames=100)
+ plt.show()