diff options
| author | Priyansh <[email protected]> | 2021-12-28 22:03:04 -0500 |
|---|---|---|
| committer | Priyansh <[email protected]> | 2021-12-28 22:03:04 -0500 |
| commit | b18c9a814c11f4d66f7a460d2179087805e034a1 (patch) | |
| tree | 0525f32909fa24d83fd7fba6e3bacdc0d526b92b /consumer.py | |
| parent | 101072ea24cc0670bbd689d039d5144fa5216a4b (diff) | |
| download | KafkaPySpark-b18c9a814c11f4d66f7a460d2179087805e034a1.tar.xz KafkaPySpark-b18c9a814c11f4d66f7a460d2179087805e034a1.zip | |
Diffstat (limited to 'consumer.py')
| -rw-r--r-- | consumer.py | 77 |
1 files changed, 47 insertions, 30 deletions
diff --git a/consumer.py b/consumer.py index 5542b9e..abbcf52 100644 --- a/consumer.py +++ b/consumer.py @@ -3,16 +3,13 @@ import json import re # from database_configuration import insert_tweet from nltk.sentiment.vader import SentimentIntensityAnalyzer -import numpy as np -import pandas as pd +from time import sleep +from multiprocessing import Process, Queue import matplotlib.pyplot as plt +import matplotlib.animation as animation topic_name = 'twitterdata' -# Initialize an empty dataframe to store the tweet id and sentiment -tweets = pd.DataFrame(columns=['tweet_id', 'sentiment']) - - def sentence_score(rs): review_score = SentimentIntensityAnalyzer() return review_score.polarity_scores(rs)['compound'] @@ -40,38 +37,58 @@ def deEmojify(data): "]+", re.UNICODE) return re.sub(emoj, '', data) -consumer = KafkaConsumer( +def consumer(q): + consumer = KafkaConsumer( topic_name, - auto_offset_reset='latest', + auto_offset_reset= 'earliest', enable_auto_commit=True, auto_commit_interval_ms = 1000, fetch_max_bytes = 128, max_poll_records = 100, value_deserializer=lambda x: json.loads(x.decode('utf-8'))) + for msg in consumer: + tweet = "" + message = json.loads(json.dumps(msg.value)) + if 'text' in message and 'RT @' not in message['text']: + if ('extended_tweet' in message) and 'full_text' in message['extended_tweet']: + tweet= deEmojify(message['extended_tweet']['full_text'].replace('\n', ' ')) + else: + tweet= deEmojify(message['text'].replace('\n', ' ')) + if tweet != '': + score = sentence_score(tweet) + sentiment = 'neutral' + if score > 0: + sentiment = 'positive' + elif score < 0: + sentiment = 'negative' + q.put(({'tweet_id': message['id'], 'tweet': tweet, 'sentiment': sentiment})) + +x_labels = ["Positive", "Negative", "Neutral"] +y_data = [0, 0, 0] -for msg in consumer: - tweet = "" - message = json.loads(json.dumps(msg.value)) - if 'text' in message and 'RT @' not in message['text']: - if ('extended_tweet' in message) and 'full_text' in message['extended_tweet']: - tweet= deEmojify(message['extended_tweet']['full_text'].replace('\n', ' ')) +if __name__ == '__main__': + q = Queue() + p = Process(target=consumer, args=(q,)) + p.start() + + def update_data(): + # Get new data from the queue + new_data = q.get() + # Check the sentiment of the tweet + if new_data['sentiment'] == 'positive': + y_data[0] += 1 + elif new_data['sentiment'] == 'negative': + y_data[1] += 1 else: - tweet= deEmojify(message['text'].replace('\n', ' ')) - if tweet != '': - score = sentence_score(tweet) - sentiment = 'neutral' - if score > 0.5: - sentiment = 'positive' - elif score < -0.5: - sentiment = 'negative' - tweets = tweets.append({'tweet_id': message['id'], 'sentiment': sentiment}, ignore_index=True) - # Update the already shown plot with the new dataframe - tweets.groupby('sentiment').count()['tweet_id'].plot.bar() - plt.show() - plt.close('all') + y_data[2] += 1 + + fig = plt.figure() + plt.barh(x_labels, y_data, color=['green', 'red', 'blue']) + def animate(i): + update_data() + plt.barh(x_labels, y_data, color=['green', 'red', 'blue']) -# End the consumer -consumer.close() -print('Done') + ani = animation.FuncAnimation(fig, animate, interval=1, frames=100) + plt.show() |
