aboutsummaryrefslogtreecommitdiff
path: root/consumer.py
blob: 8d15b07a6240f593060d592e67c83d4f3815c080 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from kafka import KafkaConsumer
import json
import re
from database_configuration import insert_tweet

# Name of the Kafka topic passed to KafkaConsumer below.
topic_name = "twitterdata"

# Characters removed by deEmojify, compiled once at import time instead of on
# every call. The ranges are the ones originally listed (one range that appeared
# twice is now listed once), so the set of removed characters is unchanged.
#
# NOTE(review): this set is far wider than emoji. U+24C2..U+1F251 overlaps
# U+10000..U+10FFFF, so together they match every code point from U+24C2
# upward. Every other entry except U+200D, U+231A, U+23CF and U+23E9 is
# therefore redundant, and the class also strips CJK ideographs, kana, Hangul,
# fullwidth forms, etc. Confirm whether text in those scripts should really
# be discarded before narrowing it.
_EMOJI_PATTERN = re.compile(
    u"["
    u"\U0001F600-\U0001F64F"  # emoticons
    u"\U0001F300-\U0001F5FF"  # symbols & pictographs
    u"\U0001F680-\U0001F6FF"  # transport & map symbols
    u"\U0001F1E0-\U0001F1FF"  # regional indicator symbols (flags)
    u"\U00002500-\U00002BEF"  # box drawing .. misc symbols and arrows (not Chinese)
    u"\U00002702-\U000027B0"  # dingbats
    u"\U000024C2-\U0001F251"  # enclosed alphanumerics .. enclosed ideographic supplement
    u"\U0001f926-\U0001f937"
    u"\U00010000-\U0010ffff"  # all supplementary planes
    u"\u2640-\u2642"
    u"\u2600-\u2B55"
    u"\u200d"  # zero width joiner
    u"\u23cf"
    u"\u23e9"
    u"\u231a"
    u"\ufe0f"  # variation selector-16
    u"\u3030"
    u"]+",
    re.UNICODE,
)


def deEmojify(data):
    """Return *data* with every run of characters in ``_EMOJI_PATTERN`` deleted.

    Matches are replaced by the empty string, so whitespace around a removed
    character is kept (``"a \U0001F600 b"`` becomes ``"a  b"``).

    :param data: text to clean; must be a ``str``.
    :returns: the cleaned ``str`` (possibly empty).
    """
    return _EMOJI_PATTERN.sub('', data)

# Settings for the consumer of ``topic_name``. Record values arrive as UTF-8
# encoded JSON and are decoded to Python objects by ``value_deserializer``.
_consumer_settings = {
    # Offset policy when the group has no committed offset for a partition.
    'auto_offset_reset': 'earliest',
    'enable_auto_commit': True,
    'auto_commit_interval_ms': 1000,
    # NOTE(review): 128 bytes is likely smaller than a single tweet's JSON.
    # Per the kafka-python docs the broker still returns at least one record,
    # so this presumably limits each fetch to about one message and makes
    # max_poll_records moot. Confirm this is intentional.
    'fetch_max_bytes': 128,
    'max_poll_records': 100,
    'group_id': 'test-consumer-group',
    # Iterating the consumer stops after this many ms without a new message.
    'consumer_timeout_ms': 3000,
    'value_deserializer': lambda raw: json.loads(raw.decode('utf-8')),
}
consumer = KafkaConsumer(topic_name, **_consumer_settings)


def _extract_tweet(message):
    """Build the record handed to ``insert_tweet`` from one decoded message.

    Keys are copied only when present in *message*, in this order:
    ``created_at``, ``tweet_id`` (from ``id``), ``location`` and
    ``screen_name`` (from ``user``), ``quote_count``, ``reply_count``,
    ``retweet_count``, ``favorite_count``, and finally ``tweet``.

    ``tweet`` is set only when *message* has a ``text`` key whose value does
    not contain ``'RT @'``. Its value is ``extended_tweet['full_text']`` when
    available, otherwise ``text``, with newlines replaced by spaces and
    passed through ``deEmojify``.

    :param message: a decoded JSON object (assumed to be a dict).
    :returns: a new dict; may lack ``tweet``, and ``tweet`` may be ``''``.
    """
    tweet = {}
    if 'created_at' in message:
        tweet['created_at'] = message['created_at']
    if 'id' in message:
        tweet['tweet_id'] = message['id']
    if 'user' in message:
        user = message['user']
        tweet['location'] = user['location']
        tweet['screen_name'] = user['screen_name']
    for count_key in ('quote_count', 'reply_count', 'retweet_count', 'favorite_count'):
        if count_key in message:
            tweet[count_key] = message[count_key]
    # Text containing 'RT @' is left out (presumably to skip retweets).
    if 'text' in message and 'RT @' not in message['text']:
        if 'extended_tweet' in message and 'full_text' in message['extended_tweet']:
            text = message['extended_tweet']['full_text']
        else:
            text = message['text']
        tweet['tweet'] = deEmojify(text.replace('\n', ' '))
    return tweet


try:
    for msg in consumer:
        # msg.value is already decoded by the consumer's value_deserializer.
        tweet = _extract_tweet(msg.value)
        # Store only records that ended up with non-empty text.
        if tweet.get('tweet'):
            insert_tweet(tweet)
        # Commit only after the message has been handled. Committing first
        # marked it consumed even when insert_tweet then failed, losing it.
        # Trade-off: a message that always fails will be redelivered.
        consumer.commit()
finally:
    # Everything handled was already committed above. Skipping close()'s own
    # commit keeps a message whose processing raised from being recorded as
    # consumed.
    consumer.close(autocommit=False)
print('Done')