From 223c2151bd7b08c99aa5910a31e0d7d642f2edf4 Mon Sep 17 00:00:00 2001 From: jmreddy2106 Date: Thu, 23 Dec 2021 23:32:03 -0500 Subject: Tested consumer --- README.md | 8 +++++++- consumer.py | 60 ++++++++++++++++++++++++++++++++++---------------------- requirements.txt | 2 +- 3 files changed, 45 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index fb37ca0..48dc92d 100644 --- a/README.md +++ b/README.md @@ -1 +1,7 @@ -# KafkaPySpark \ No newline at end of file +# KafkaPySpark +# Zookeeper +bin/zookeeper-server-start.sh config/zookeeper.properties +# Kafka Server +bin/kafka-server-start.sh config/server.properties +# Kafka Topic creation +bin/kafka-topics.sh --create --partitions 2 --replication-factor 1 --topic twitterdata --bootstrap-server localhost:9092 \ No newline at end of file diff --git a/consumer.py b/consumer.py index 3803b4a..3670105 100644 --- a/consumer.py +++ b/consumer.py @@ -8,39 +8,53 @@ from pyspark.streaming import StreamingContext import math import string import random +from pyspark.conf import SparkConf + topic_name = 'twitterdata' + +# consumer = KafkaConsumer(topic_name) +# for msg in consumer: +# print (msg) + consumer = KafkaConsumer( topic_name, - bootstrap_servers=['localhost:9092'], auto_offset_reset='latest', enable_auto_commit=True, auto_commit_interval_ms = 5000, fetch_max_bytes = 128, max_poll_records = 100, - value_deserializer=lambda x: json.loads(x.decode('utf-8'))) -if __name__ == '__main__': - spark = SparkSession.builder.appName("TwitterStreaming").getOrCreate().readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", topic_name).load() - spark.createOrReplaceTempView("tweets") - spark.sql("select * from tweets").show() - spark.stop() - sc = SparkContext(appName="TwitterStreaming") - ssc = StreamingContext(sc, 5) - ssc.checkpoint("checkpoint") - lines = ssc.socketTextStream("localhost", 9999) - words = lines.flatMap(lambda line: line.split(" ")) - pairs = 
words.map(lambda word: (word, 1)) - wordCounts = pairs.reduceByKey(lambda x, y: x + y) - wordCounts.pprint() - ssc.start() - ssc.awaitTermination() - ssc.stop() - sc.stop() - consumer.close() - print("Done") - - +for msg in consumer: + print(json.loads(json.dumps(msg.value))) + +# spark = SparkSession.builder.master("local").appName("Tweets").getOrCreate() +# sc = spark.sparkContext +# sc.setLogLevel("ERROR") +# urlRdd = sc.parallelize([consumer]) +# urldf = spark.read.json(urlRdd) +# urldf.printSchema() +# urldf.show() + +# if __name__ == '__main__': +# spark = SparkSession.builder.appName("TwitterStreaming").getOrCreate().readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", topic_name).load() +# spark.createOrReplaceTempView("tweets") +# spark.sql("select * from tweets").show() +# spark.stop() +# sc = SparkContext(appName="TwitterStreaming") +# ssc = StreamingContext(sc, 5) +# ssc.checkpoint("checkpoint") +# lines = ssc.socketTextStream("localhost", 9999) +# words = lines.flatMap(lambda line: line.split(" ")) +# pairs = words.map(lambda word: (word, 1)) +# wordCounts = pairs.reduceByKey(lambda x, y: x + y) +# wordCounts.pprint() +# ssc.start() +# ssc.awaitTermination() +# ssc.stop() +# sc.stop() +# consumer.close() +# print("Done") \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 065bf27..6017dec 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ kafka-python -dotenv +python-dotenv tweepy==3.9.0 pyspark==2.4.6 -- cgit v1.2.3