aboutsummaryrefslogtreecommitdiff
path: root/consumer.py
blob: 9b13a2269ac6885b7cb61e13a195719de3313099 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from kafka import KafkaConsumer
import json

topic_name = 'twitterdata'

consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='latest',
    enable_auto_commit=True,
    auto_commit_interval_ms =  5000,
    fetch_max_bytes = 128,
    max_poll_records = 100,

    value_deserializer=lambda x: json.loads(x.decode('utf-8')))

for message in consumer:
    tweets = json.loads(json.dumps(message.value))
    print(tweets)