aboutsummaryrefslogtreecommitdiff
path: root/consumer.py
diff options
context:
space:
mode:
authorPriyansh <[email protected]>2021-12-24 23:22:49 -0500
committerPriyansh <[email protected]>2021-12-24 23:22:49 -0500
commit06b51f0c8e081c3e4120aef51c394662ea15a2a3 (patch)
tree5e39193f993681e2bd095f11712abdad318c571c /consumer.py
parent4c9653f46ad9c2cc64f0b8dea42700eeb5044a88 (diff)
downloadKafkaPySpark-06b51f0c8e081c3e4120aef51c394662ea15a2a3.tar.xz
KafkaPySpark-06b51f0c8e081c3e4120aef51c394662ea15a2a3.zip
collaborative check
Diffstat (limited to 'consumer.py')
-rw-r--r--consumer.py104
1 files changed, 54 insertions, 50 deletions
diff --git a/consumer.py b/consumer.py
index 4e270a5..43264ad 100644
--- a/consumer.py
+++ b/consumer.py
@@ -1,22 +1,31 @@
from kafka import KafkaConsumer
import json
-from pyspark.sql import SparkSession
-from pyspark.sql.functions import *
-from pyspark.sql.types import *
-from pyspark import SparkContext
-from pyspark.streaming import StreamingContext
-import math
-import string
-import random
-from pyspark.conf import SparkConf
-
+import re
topic_name = 'twitterdata'
-
-# consumer = KafkaConsumer(topic_name)
-# for msg in consumer:
-# print (msg)
+def deEmojify(data):
+ emoj = re.compile("["
+ u"\U0001F600-\U0001F64F" # emoticons
+ u"\U0001F300-\U0001F5FF" # symbols & pictographs
+ u"\U0001F680-\U0001F6FF" # transport & map symbols
+ u"\U0001F1E0-\U0001F1FF" # flags (iOS)
+ u"\U00002500-\U00002BEF" # chinese char
+ u"\U00002702-\U000027B0"
+ u"\U00002702-\U000027B0"
+ u"\U000024C2-\U0001F251"
+ u"\U0001f926-\U0001f937"
+ u"\U00010000-\U0010ffff"
+ u"\u2640-\u2642"
+ u"\u2600-\u2B55"
+ u"\u200d"
+ u"\u23cf"
+ u"\u23e9"
+ u"\u231a"
+ u"\ufe0f" # dingbats
+ u"\u3030"
+ "]+", re.UNICODE)
+ return re.sub(emoj, '', data)
consumer = KafkaConsumer(
topic_name,
@@ -27,42 +36,37 @@ consumer = KafkaConsumer(
max_poll_records = 100,
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
+counter = 0
for msg in consumer:
+ tweet = {}
message = json.loads(json.dumps(msg.value))
- try:
- print(message['extended_tweet']['full_text'])
- except:
- try:
- print(message['text'])
- except:
- print(message)
- print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
-
-# spark = SparkSession.builder.master("local").appName("Tweets").getOrCreate()
-# sc = spark.sparkContext
-# sc.setLogLevel("ERROR")
-# urlRdd = sc.parallelize([consumer])
-# urldf = spark.read.json(urlRdd)
-# urldf.printSchema()
-# urldf.show()
-
+ if 'created_at' in message:
+ tweet['created_at'] = message['created_at']
+ if 'id' in message:
+ tweet['tweet_id'] = message['id']
+ if 'location' in message:
+ tweet['location'] = message['user']['location']
+ if 'screen_name' in message:
+ tweet['screen_name'] = message['user']['screen_name']
+ if 'quote_count' in message:
+ tweet['quote_count'] = message['quote_count']
+ if 'reply_count' in message:
+ tweet['reply_count'] = message['reply_count']
+ if 'retweet_count' in message:
+ tweet['retweet_count'] = message['retweet_count']
+ if 'favorite_count' in message:
+ tweet['favorite_count'] = message['favorite_count']
+ if 'text' in message and 'RT @' not in message['text']:
+ if ('extended_tweet' in message) and 'full_text' in message['extended_tweet']:
+ tweet['tweet'] = deEmojify(message['extended_tweet']['full_text'].replace('\n', ' '))
+ else:
+ tweet['tweet'] = deEmojify(message['text'].replace('\n', ' '))
+ # if 'tweet' in tweet and tweet['tweet'] != '':
+ print(counter)
+ counter += 1
+ if counter == 99:
+ break
-# if __name__ == '__main__':
-# spark = SparkSession.builder.appName("TwitterStreaming").getOrCreate().readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", topic_name).load()
-# spark.createOrReplaceTempView("tweets")
-# spark.sql("select * from tweets").show()
-# spark.stop()
-# sc = SparkContext(appName="TwitterStreaming")
-# ssc = StreamingContext(sc, 5)
-# ssc.checkpoint("checkpoint")
-# lines = ssc.socketTextStream("localhost", 9999)
-# words = lines.flatMap(lambda line: line.split(" "))
-# pairs = words.map(lambda word: (word, 1))
-# wordCounts = pairs.reduceByKey(lambda x, y: x + y)
-# wordCounts.pprint()
-# ssc.start()
-# ssc.awaitTermination()
-# ssc.stop()
-# sc.stop()
-# consumer.close()
-# print("Done") \ No newline at end of file
+# End the consumer
+consumer.close()
+print('Done') \ No newline at end of file