aboutsummaryrefslogtreecommitdiff
path: root/consumer.py
diff options
context:
space:
mode:
author Priyansh <[email protected]> 2021-12-23 18:40:34 -0500
committer Priyansh <[email protected]> 2021-12-23 18:40:34 -0500
commit 82e7ff46c85acc1449d82e7eaa150f4064af05b7 (patch)
tree 413c9f0372a2800e927b8099f2fcda675ca45a4d /consumer.py
parent 99c2910e4d05cbd977df28773c93ff3fac33a730 (diff)
download KafkaPySpark-82e7ff46c85acc1449d82e7eaa150f4064af05b7.tar.xz
KafkaPySpark-82e7ff46c85acc1449d82e7eaa150f4064af05b7.zip
Added Kafka, Consumer and Producer Modules
Diffstat (limited to 'consumer.py')
-rw-r--r-- consumer.py 19
1 files changed, 19 insertions, 0 deletions
diff --git a/consumer.py b/consumer.py
new file mode 100644
index 0000000..9b13a22
--- /dev/null
+++ b/consumer.py
@@ -0,0 +1,19 @@
+from kafka import KafkaConsumer
+import json
+# Subscribe to one topic on the local broker and print every JSON-decoded record.
+topic_name = 'twitterdata'
+
+consumer = KafkaConsumer(
+    topic_name,
+    bootstrap_servers=['localhost:9092'],
+    auto_offset_reset='latest',
+    # NOTE(review): no group_id is set; kafka-python documents offset commits as disabled then -- confirm.
+    enable_auto_commit=True,
+    auto_commit_interval_ms=5000,
+    fetch_max_bytes=128,  # NOTE(review): 128 bytes per fetch; documented default is 52428800 -- confirm intent
+    max_poll_records=100,
+    value_deserializer=lambda x: json.loads(x.decode('utf-8')))
+
+for message in consumer:
+    tweets = message.value  # already parsed by value_deserializer; no JSON round trip needed
+    print(tweets)
\ No newline at end of file