aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author jmreddy2106 <[email protected]> 2021-12-23 23:32:03 -0500
committer jmreddy2106 <[email protected]> 2021-12-23 23:32:03 -0500
commit 223c2151bd7b08c99aa5910a31e0d7d642f2edf4 (patch)
tree 8cc3292a541d2c1e821eedcca07d15743c28b217
parent 2f1f141d07a721eb4fabc6c3dc8e32d27c4251f6 (diff)
download KafkaPySpark-223c2151bd7b08c99aa5910a31e0d7d642f2edf4.tar.xz
KafkaPySpark-223c2151bd7b08c99aa5910a31e0d7d642f2edf4.zip
Tested consumer
-rw-r--r-- README.md 8
-rw-r--r-- consumer.py 60
-rw-r--r-- requirements.txt 2
3 files changed, 45 insertions, 25 deletions
diff --git a/README.md b/README.md
index fb37ca0..48dc92d 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,7 @@
-# KafkaPySpark \ No newline at end of file
+# KafkaPySpark
+# Zookeeper
+bin/zookeeper-server-start.sh config/zookeeper.properties
+# Kafka Server
+bin/kafka-server-start.sh config/server.properties
+# Kafka Topic creation
+bin/kafka-topics.sh --create --partitions 2 --replication-factor 1 --topic twitterdata --bootstrap-server localhost:9092 \ No newline at end of file
diff --git a/consumer.py b/consumer.py
index 3803b4a..3670105 100644
--- a/consumer.py
+++ b/consumer.py
@@ -8,39 +8,53 @@ from pyspark.streaming import StreamingContext
import math
import string
import random
+from pyspark.conf import SparkConf
+
topic_name = 'twitterdata'
+
+# consumer = KafkaConsumer(topic_name)
+# for msg in consumer:
+# print (msg)
+
consumer = KafkaConsumer(
topic_name,
- bootstrap_servers=['localhost:9092'],
auto_offset_reset='latest',
enable_auto_commit=True,
auto_commit_interval_ms = 5000,
fetch_max_bytes = 128,
max_poll_records = 100,
-
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
-if __name__ == '__main__':
- spark = SparkSession.builder.appName("TwitterStreaming").getOrCreate().readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", topic_name).load()
- spark.createOrReplaceTempView("tweets")
- spark.sql("select * from tweets").show()
- spark.stop()
- sc = SparkContext(appName="TwitterStreaming")
- ssc = StreamingContext(sc, 5)
- ssc.checkpoint("checkpoint")
- lines = ssc.socketTextStream("localhost", 9999)
- words = lines.flatMap(lambda line: line.split(" "))
- pairs = words.map(lambda word: (word, 1))
- wordCounts = pairs.reduceByKey(lambda x, y: x + y)
- wordCounts.pprint()
- ssc.start()
- ssc.awaitTermination()
- ssc.stop()
- sc.stop()
- consumer.close()
- print("Done")
-
-
+for msg in consumer:
+ print(json.loads(json.dumps(msg.value)))
+
+# spark = SparkSession.builder.master("local").appName("Tweets").getOrCreate()
+# sc = spark.sparkContext
+# sc.setLogLevel("ERROR")
+# urlRdd = sc.parallelize([consumer])
+# urldf = spark.read.json(urlRdd)
+# urldf.printSchema()
+# urldf.show()
+
+# if __name__ == '__main__':
+# spark = SparkSession.builder.appName("TwitterStreaming").getOrCreate().readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", topic_name).load()
+# spark.createOrReplaceTempView("tweets")
+# spark.sql("select * from tweets").show()
+# spark.stop()
+# sc = SparkContext(appName="TwitterStreaming")
+# ssc = StreamingContext(sc, 5)
+# ssc.checkpoint("checkpoint")
+# lines = ssc.socketTextStream("localhost", 9999)
+# words = lines.flatMap(lambda line: line.split(" "))
+# pairs = words.map(lambda word: (word, 1))
+# wordCounts = pairs.reduceByKey(lambda x, y: x + y)
+# wordCounts.pprint()
+# ssc.start()
+# ssc.awaitTermination()
+# ssc.stop()
+# sc.stop()
+# consumer.close()
+# print("Done") \ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 065bf27..6017dec 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
kafka-python
-dotenv
+python-dotenv
tweepy==3.9.0
pyspark==2.4.6