diff options
| author | Priyansh <[email protected]> | 2021-12-23 18:40:34 -0500 |
|---|---|---|
| committer | Priyansh <[email protected]> | 2021-12-23 18:40:34 -0500 |
| commit | 82e7ff46c85acc1449d82e7eaa150f4064af05b7 (patch) | |
| tree | 413c9f0372a2800e927b8099f2fcda675ca45a4d /producer.py | |
| parent | 99c2910e4d05cbd977df28773c93ff3fac33a730 (diff) | |
| download | KafkaPySpark-82e7ff46c85acc1449d82e7eaa150f4064af05b7.tar.xz KafkaPySpark-82e7ff46c85acc1449d82e7eaa150f4064af05b7.zip | |
Added Kafka, Consumer and Producer Modules
Diffstat (limited to 'producer.py')
| -rw-r--r-- | producer.py | 43 |
1 files changed, 43 insertions, 0 deletions
diff --git a/producer.py b/producer.py new file mode 100644 index 0000000..e677082 --- /dev/null +++ b/producer.py @@ -0,0 +1,43 @@ +from dotenv import load_dotenv +from tweepy.streaming import StreamListener +from tweepy import OAuthHandler +from tweepy import Stream +from kafka import KafkaProducer, producer +import os + +load_dotenv() + +access_token = os.environ.get('ACCESS_TOKEN') +access_token_secret = os.environ.get('ACCESS_TOKEN_SECRET') +consumer_key = os.environ.get('CONSUMER_KEY') +consumer_secret = os.environ.get('CONSUMER_SECRET') + +producer = KafkaProducer(bootstrap_servers='localhost:9092') + +topic_name = 'twitterdata' + +class TwitterAuth(): + """Set up Twitter Authentication""" + def authenticateTwitterApp(self): + auth = OAuthHandler(consumer_key, consumer_secret) + auth.set_access_token(access_token, access_token_secret) + return auth + +class TwitterStreamer(): + """Set up Streamer""" + def stream_tweets(self): + while True: + listener = ListenerTS() + auth = self.twitterAuth.authenticateTwitterApp() + stream = Stream(auth, listener) + stream.filter(track=["Apple"], stall_warnings=True, languages= ["en"]) + +class ListenerTS(StreamListener): + + def on_data(self, raw_data): + producer.send(topic_name, str.encode(raw_data)) + return True + +if __name__ == "__main__": + TS = TwitterStreamer() + TS.stream_tweets()
\ No newline at end of file |
