diff options
| author | jmreddy2106 <[email protected]> | 2021-12-23 23:32:03 -0500 |
|---|---|---|
| committer | jmreddy2106 <[email protected]> | 2021-12-23 23:32:03 -0500 |
| commit | 223c2151bd7b08c99aa5910a31e0d7d642f2edf4 (patch) | |
| tree | 8cc3292a541d2c1e821eedcca07d15743c28b217 | |
| parent | 2f1f141d07a721eb4fabc6c3dc8e32d27c4251f6 (diff) | |
| download | KafkaPySpark-223c2151bd7b08c99aa5910a31e0d7d642f2edf4.tar.xz KafkaPySpark-223c2151bd7b08c99aa5910a31e0d7d642f2edf4.zip | |
Tested consumer
| -rw-r--r-- | README.md | 8 | ||||
| -rw-r--r-- | consumer.py | 60 | ||||
| -rw-r--r-- | requirements.txt | 2 |
3 files changed, 45 insertions, 25 deletions
@@ -1 +1,7 @@ -# KafkaPySpark
\ No newline at end of file +# KafkaPySpark +# Zookeeper +bin/zookeeper-server-start.sh config/zookeeper.properties +# Kafka Server +bin/kafka-server-start.sh config/server.properties +# Kafka Topic creation +bin/kafka-topics.sh --create --partitions 2 --replication-factor 1 --topic twitterdata --bootstrap-server localhost:9092
\ No newline at end of file diff --git a/consumer.py b/consumer.py index 3803b4a..3670105 100644 --- a/consumer.py +++ b/consumer.py @@ -8,39 +8,53 @@ from pyspark.streaming import StreamingContext import math import string import random +from pyspark.conf import SparkConf + topic_name = 'twitterdata' + +# consumer = KafkaConsumer(topic_name) +# for msg in consumer: +# print (msg) + consumer = KafkaConsumer( topic_name, - bootstrap_servers=['localhost:9092'], auto_offset_reset='latest', enable_auto_commit=True, auto_commit_interval_ms = 5000, fetch_max_bytes = 128, max_poll_records = 100, - value_deserializer=lambda x: json.loads(x.decode('utf-8'))) -if __name__ == '__main__': - spark = SparkSession.builder.appName("TwitterStreaming").getOrCreate().readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", topic_name).load() - spark.createOrReplaceTempView("tweets") - spark.sql("select * from tweets").show() - spark.stop() - sc = SparkContext(appName="TwitterStreaming") - ssc = StreamingContext(sc, 5) - ssc.checkpoint("checkpoint") - lines = ssc.socketTextStream("localhost", 9999) - words = lines.flatMap(lambda line: line.split(" ")) - pairs = words.map(lambda word: (word, 1)) - wordCounts = pairs.reduceByKey(lambda x, y: x + y) - wordCounts.pprint() - ssc.start() - ssc.awaitTermination() - ssc.stop() - sc.stop() - consumer.close() - print("Done") - - +for msg in consumer: + print(json.loads(json.dumps(msg.value))) + +# spark = SparkSession.builder.master("local").appName("Tweets").getOrCreate() +# sc = spark.sparkContext +# sc.setLogLevel("ERROR") +# urlRdd = sc.parallelize([consumer]) +# urldf = spark.read.json(urlRdd) +# urldf.printSchema() +# urldf.show() + +# if __name__ == '__main__': +# spark = SparkSession.builder.appName("TwitterStreaming").getOrCreate().readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", topic_name).load() +# spark.createOrReplaceTempView("tweets") +# 
spark.sql("select * from tweets").show() +# spark.stop() +# sc = SparkContext(appName="TwitterStreaming") +# ssc = StreamingContext(sc, 5) +# ssc.checkpoint("checkpoint") +# lines = ssc.socketTextStream("localhost", 9999) +# words = lines.flatMap(lambda line: line.split(" ")) +# pairs = words.map(lambda word: (word, 1)) +# wordCounts = pairs.reduceByKey(lambda x, y: x + y) +# wordCounts.pprint() +# ssc.start() +# ssc.awaitTermination() +# ssc.stop() +# sc.stop() +# consumer.close() +# print("Done")
\ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 065bf27..6017dec 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ kafka-python -dotenv +python-dotenv tweepy==3.9.0 pyspark==2.4.6 |
