diff options
| author | Priyansh <[email protected]> | 2021-12-24 23:22:49 -0500 |
|---|---|---|
| committer | Priyansh <[email protected]> | 2021-12-24 23:22:49 -0500 |
| commit | 06b51f0c8e081c3e4120aef51c394662ea15a2a3 (patch) | |
| tree | 5e39193f993681e2bd095f11712abdad318c571c /consumer.py | |
| parent | 4c9653f46ad9c2cc64f0b8dea42700eeb5044a88 (diff) | |
| download | KafkaPySpark-06b51f0c8e081c3e4120aef51c394662ea15a2a3.tar.xz KafkaPySpark-06b51f0c8e081c3e4120aef51c394662ea15a2a3.zip | |
collaborative check
Diffstat (limited to 'consumer.py')
| -rw-r--r-- | consumer.py | 104 |
1 files changed, 54 insertions, 50 deletions
diff --git a/consumer.py b/consumer.py index 4e270a5..43264ad 100644 --- a/consumer.py +++ b/consumer.py @@ -1,22 +1,31 @@ from kafka import KafkaConsumer import json -from pyspark.sql import SparkSession -from pyspark.sql.functions import * -from pyspark.sql.types import * -from pyspark import SparkContext -from pyspark.streaming import StreamingContext -import math -import string -import random -from pyspark.conf import SparkConf - +import re topic_name = 'twitterdata' - -# consumer = KafkaConsumer(topic_name) -# for msg in consumer: -# print (msg) +def deEmojify(data): + emoj = re.compile("[" + u"\U0001F600-\U0001F64F" # emoticons + u"\U0001F300-\U0001F5FF" # symbols & pictographs + u"\U0001F680-\U0001F6FF" # transport & map symbols + u"\U0001F1E0-\U0001F1FF" # flags (iOS) + u"\U00002500-\U00002BEF" # chinese char + u"\U00002702-\U000027B0" + u"\U00002702-\U000027B0" + u"\U000024C2-\U0001F251" + u"\U0001f926-\U0001f937" + u"\U00010000-\U0010ffff" + u"\u2640-\u2642" + u"\u2600-\u2B55" + u"\u200d" + u"\u23cf" + u"\u23e9" + u"\u231a" + u"\ufe0f" # dingbats + u"\u3030" + "]+", re.UNICODE) + return re.sub(emoj, '', data) consumer = KafkaConsumer( topic_name, @@ -27,42 +36,37 @@ consumer = KafkaConsumer( max_poll_records = 100, value_deserializer=lambda x: json.loads(x.decode('utf-8'))) +counter = 0 for msg in consumer: + tweet = {} message = json.loads(json.dumps(msg.value)) - try: - print(message['extended_tweet']['full_text']) - except: - try: - print(message['text']) - except: - print(message) - print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") - -# spark = SparkSession.builder.master("local").appName("Tweets").getOrCreate() -# sc = spark.sparkContext -# sc.setLogLevel("ERROR") -# urlRdd = sc.parallelize([consumer]) -# urldf = spark.read.json(urlRdd) -# urldf.printSchema() -# urldf.show() - + if 'created_at' in message: + tweet['created_at'] = message['created_at'] + if 'id' in message: + tweet['tweet_id'] = message['id'] + if 'location' in message: + tweet['location'] = message['user']['location'] + if 'screen_name' in message: + tweet['screen_name'] = message['user']['screen_name'] + if 'quote_count' in message: + tweet['quote_count'] = message['quote_count'] + if 'reply_count' in message: + tweet['reply_count'] = message['reply_count'] + if 'retweet_count' in message: + tweet['retweet_count'] = message['retweet_count'] + if 'favorite_count' in message: + tweet['favorite_count'] = message['favorite_count'] + if 'text' in message and 'RT @' not in message['text']: + if ('extended_tweet' in message) and 'full_text' in message['extended_tweet']: + tweet['tweet'] = deEmojify(message['extended_tweet']['full_text'].replace('\n', ' ')) + else: + tweet['tweet'] = deEmojify(message['text'].replace('\n', ' ')) + # if 'tweet' in tweet and tweet['tweet'] != '': + print(counter) + counter += 1 + if counter == 99: + break -# if __name__ == '__main__': -# spark = SparkSession.builder.appName("TwitterStreaming").getOrCreate().readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", topic_name).load() -# spark.createOrReplaceTempView("tweets") -# spark.sql("select * from tweets").show() -# spark.stop() -# sc = SparkContext(appName="TwitterStreaming") -# ssc = StreamingContext(sc, 5) -# ssc.checkpoint("checkpoint") -# lines = ssc.socketTextStream("localhost", 9999) -# words = lines.flatMap(lambda line: line.split(" ")) -# pairs = words.map(lambda word: (word, 1)) -# wordCounts = pairs.reduceByKey(lambda x, y: x + y) -# wordCounts.pprint() -# ssc.start() -# ssc.awaitTermination() -# ssc.stop() -# sc.stop() -# consumer.close() -# print("Done")
\ No newline at end of file +# End the consumer +consumer.close() +print('Done')
\ No newline at end of file |
