diff options
| author | Priyansh <[email protected]> | 2021-12-25 18:24:03 -0500 |
|---|---|---|
| committer | Priyansh <[email protected]> | 2021-12-25 18:24:03 -0500 |
| commit | 7ceb40394bd40075e7fe41b8c125a5d2dcedc781 (patch) | |
| tree | e6c791b0d6dda09898624ae0aa92f6aee821879b | |
| parent | cb1e6c587ebef823b8524d72c1a97ab5765901d8 (diff) | |
| download | KafkaPySpark-7ceb40394bd40075e7fe41b8c125a5d2dcedc781.tar.xz KafkaPySpark-7ceb40394bd40075e7fe41b8c125a5d2dcedc781.zip | |
Unfinished plot
| -rw-r--r-- | consumer.py | 56 |
1 file changed, 30 insertions, 26 deletions
diff --git a/consumer.py b/consumer.py index 8d15b07..5542b9e 100644 --- a/consumer.py +++ b/consumer.py @@ -1,10 +1,22 @@ from kafka import KafkaConsumer import json import re -from database_configuration import insert_tweet +# from database_configuration import insert_tweet +from nltk.sentiment.vader import SentimentIntensityAnalyzer +import numpy as np +import pandas as pd +import matplotlib.pyplot as plt topic_name = 'twitterdata' +# Initialize an empty dataframe to store the tweet id and sentiment +tweets = pd.DataFrame(columns=['tweet_id', 'sentiment']) + + +def sentence_score(rs): + review_score = SentimentIntensityAnalyzer() + return review_score.polarity_scores(rs)['compound'] + def deEmojify(data): emoj = re.compile("[" u"\U0001F600-\U0001F64F" # emoticons @@ -30,43 +42,35 @@ def deEmojify(data): consumer = KafkaConsumer( topic_name, - auto_offset_reset='earliest', + auto_offset_reset='latest', enable_auto_commit=True, auto_commit_interval_ms = 1000, fetch_max_bytes = 128, max_poll_records = 100, - group_id = 'test-consumer-group', - consumer_timeout_ms=3000, value_deserializer=lambda x: json.loads(x.decode('utf-8'))) for msg in consumer: - consumer.commit() - tweet = {} + tweet = "" message = json.loads(json.dumps(msg.value)) - if 'created_at' in message: - tweet['created_at'] = message['created_at'] - if 'id' in message: - tweet['tweet_id'] = message['id'] - if 'user' in message: - tweet['location'] = message['user']['location'] - if 'user' in message: - tweet['screen_name'] = message['user']['screen_name'] - if 'quote_count' in message: - tweet['quote_count'] = message['quote_count'] - if 'reply_count' in message: - tweet['reply_count'] = message['reply_count'] - if 'retweet_count' in message: - tweet['retweet_count'] = message['retweet_count'] - if 'favorite_count' in message: - tweet['favorite_count'] = message['favorite_count'] if 'text' in message and 'RT @' not in message['text']: if ('extended_tweet' in message) and 'full_text' in 
message['extended_tweet']: - tweet['tweet'] = deEmojify(message['extended_tweet']['full_text'].replace('\n', ' ')) + tweet= deEmojify(message['extended_tweet']['full_text'].replace('\n', ' ')) else: - tweet['tweet'] = deEmojify(message['text'].replace('\n', ' ')) - if 'tweet' in tweet and tweet['tweet'] != '': - insert_tweet(tweet) + tweet= deEmojify(message['text'].replace('\n', ' ')) + if tweet != '': + score = sentence_score(tweet) + sentiment = 'neutral' + if score > 0.5: + sentiment = 'positive' + elif score < -0.5: + sentiment = 'negative' + tweets = tweets.append({'tweet_id': message['id'], 'sentiment': sentiment}, ignore_index=True) + # Update the already shown plot with the new dataframe + tweets.groupby('sentiment').count()['tweet_id'].plot.bar() + plt.show() + plt.close('all') + # End the consumer consumer.close() |
