aboutsummaryrefslogtreecommitdiff
path: root/producer.py
diff options
context:
space:
mode:
authorPriyansh <[email protected]>2021-12-23 19:05:23 -0500
committerPriyansh <[email protected]>2021-12-23 19:05:23 -0500
commitcbbc6b1cc7c7c1c24c3e702ab3b88e7c2dbb8673 (patch)
treee3a082caa280096a1ff0ab15b57f3f6dba29f66d /producer.py
parent227657238c66143100f053cab78b8ba53310492b (diff)
downloadKafkaPySpark-cbbc6b1cc7c7c1c24c3e702ab3b88e7c2dbb8673.tar.xz
KafkaPySpark-cbbc6b1cc7c7c1c24c3e702ab3b88e7c2dbb8673.zip
Updated Producer with requirements
Diffstat (limited to 'producer.py')
-rw-r--r--producer.py7
1 files changed, 5 insertions, 2 deletions
diff --git a/producer.py b/producer.py
index e677082..6723ae0 100644
--- a/producer.py
+++ b/producer.py
@@ -2,7 +2,7 @@ from dotenv import load_dotenv
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
-from kafka import KafkaProducer, producer
+from kafka import KafkaProducer
import os
load_dotenv()
@@ -16,7 +16,7 @@ producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic_name = 'twitterdata'
-class TwitterAuth():
+class twitterAuth():
"""Set up Twitter Authentication"""
def authenticateTwitterApp(self):
auth = OAuthHandler(consumer_key, consumer_secret)
@@ -25,6 +25,9 @@ class TwitterAuth():
class TwitterStreamer():
"""Set up Streamer"""
+ def __init__(self):
+ self.twitterAuth = twitterAuth()
+
def stream_tweets(self):
while True:
listener = ListenerTS()