aboutsummaryrefslogtreecommitdiff
path: root/producer.py
diff options
context:
space:
mode:
authorPriyansh <[email protected]>2021-12-23 18:40:34 -0500
committerPriyansh <[email protected]>2021-12-23 18:40:34 -0500
commit82e7ff46c85acc1449d82e7eaa150f4064af05b7 (patch)
tree413c9f0372a2800e927b8099f2fcda675ca45a4d /producer.py
parent99c2910e4d05cbd977df28773c93ff3fac33a730 (diff)
downloadKafkaPySpark-82e7ff46c85acc1449d82e7eaa150f4064af05b7.tar.xz
KafkaPySpark-82e7ff46c85acc1449d82e7eaa150f4064af05b7.zip
Added Kafka, Consumer and Producer Modules
Diffstat (limited to 'producer.py')
-rw-r--r--producer.py43
1 files changed, 43 insertions, 0 deletions
diff --git a/producer.py b/producer.py
new file mode 100644
index 0000000..e677082
--- /dev/null
+++ b/producer.py
@@ -0,0 +1,43 @@
+from dotenv import load_dotenv
+from tweepy.streaming import StreamListener
+from tweepy import OAuthHandler
+from tweepy import Stream
+from kafka import KafkaProducer, producer
+import os
+
+load_dotenv()
+
+access_token = os.environ.get('ACCESS_TOKEN')
+access_token_secret = os.environ.get('ACCESS_TOKEN_SECRET')
+consumer_key = os.environ.get('CONSUMER_KEY')
+consumer_secret = os.environ.get('CONSUMER_SECRET')
+
+producer = KafkaProducer(bootstrap_servers='localhost:9092')
+
+topic_name = 'twitterdata'
+
+class TwitterAuth():
+ """Set up Twitter Authentication"""
+ def authenticateTwitterApp(self):
+ auth = OAuthHandler(consumer_key, consumer_secret)
+ auth.set_access_token(access_token, access_token_secret)
+ return auth
+
+class TwitterStreamer():
+ """Set up Streamer"""
+ def stream_tweets(self):
+ while True:
+ listener = ListenerTS()
+ auth = self.twitterAuth.authenticateTwitterApp()
+ stream = Stream(auth, listener)
+ stream.filter(track=["Apple"], stall_warnings=True, languages= ["en"])
+
+class ListenerTS(StreamListener):
+
+ def on_data(self, raw_data):
+ producer.send(topic_name, str.encode(raw_data))
+ return True
+
+if __name__ == "__main__":
+ TS = TwitterStreamer()
+ TS.stream_tweets() \ No newline at end of file