11import os
22import logging
3- from typing import Dict , Any , List , Tuple
3+ from typing import Dict , Any , List , Tuple , Iterator
44from requests_oauthlib import OAuth1Session
55from dotenv import set_key , load_dotenv
66from src .connections .base_connection import BaseConnection , Action , ActionParameter
77from src .helpers import print_h_bar
8+ import json ,requests
89
910logger = logging .getLogger ("connections.twitter_connection" )
1011
@@ -91,6 +92,13 @@ def register_actions(self) -> None:
9192 ActionParameter ("tweet_id" , True , str , "ID of the tweet to query for replies" )
9293 ],
9394 description = "Fetch tweet replies"
95+ ),
96+ "stream-tweets" : Action (
97+ name = "stream-tweets" ,
98+ parameters = [
99+ ActionParameter ("filter_string" , True , str , "Filter string for rules of the stream , e.g @username" )
100+ ],
101+ description = "Stream tweets based on filter rule"
94102 )
95103 }
96104
@@ -107,6 +115,8 @@ def _get_credentials(self) -> Dict[str, str]:
107115 'TWITTER_USER_ID' : 'user ID'
108116 }
109117
118+ optional_vars = {'TWITTER_BEARER_TOKEN' } # Bearer Token is used for streaming, Twitter premium plan is required
119+
110120 credentials = {}
111121 missing = []
112122
@@ -119,11 +129,14 @@ def _get_credentials(self) -> Dict[str, str]:
119129 if missing :
120130 error_msg = f"Missing Twitter credentials: { ', ' .join (missing )} "
121131 raise TwitterConfigurationError (error_msg )
132+
133+ for env_var in optional_vars :
134+ credentials [env_var ] = os .getenv (env_var )
122135
123136 logger .debug ("All required credentials found" )
124137 return credentials
125138
126- def _make_request (self , method : str , endpoint : str , ** kwargs ) -> dict :
139+ def _make_request (self , method : str , endpoint : str ,use_bearer : bool = False , stream : bool = False , ** kwargs ) -> dict :
127140 """
128141 Make a request to the Twitter API with error handling
129142
@@ -133,16 +146,25 @@ def _make_request(self, method: str, endpoint: str, **kwargs) -> dict:
133146 **kwargs: Additional request parameters
134147
135148 Returns:
136- Dict containing the API response
149+ Dict containing the API response (or raw response if stream=True)
137150 """
138151 logger .debug (f"Making { method .upper ()} request to { endpoint } " )
139152 try :
140- oauth = self ._get_oauth ()
141153 full_url = f"https://api.twitter.com/2/{ endpoint .lstrip ('/' )} "
142154
143- response = getattr (oauth , method .lower ())(full_url , ** kwargs )
155+ if use_bearer :
156+ response = requests .request (
157+ method = method .lower (),
158+ url = full_url ,
159+ auth = self ._bearer_oauth ,
160+ stream = stream ,
161+ ** kwargs
162+ )
163+ else :
164+ oauth = self ._get_oauth ()
165+ response = getattr (oauth , method .lower ())(full_url , ** kwargs )
144166
145- if response .status_code not in [200 , 201 ]:
167+ if not stream and response .status_code not in [200 , 201 ]:
146168 logger .error (
147169 f"Request failed: { response .status_code } - { response .text } "
148170 )
@@ -151,6 +173,10 @@ def _make_request(self, method: str, endpoint: str, **kwargs) -> dict:
151173 )
152174
153175 logger .debug (f"Request successful: { response .status_code } " )
176+
177+ if stream :
178+ return response
179+
154180 return response .json ()
155181
156182 except Exception as e :
@@ -307,6 +333,10 @@ def configure(self) -> None:
307333 oauth_tokens .get ('oauth_token_secret' )
308334 }
309335
336+ bearer_token = input ("Input Bearer Token for Twitter Streams (optional, hit Enter to skip): " ).strip ()
337+ if bearer_token :
338+ env_vars ['TWITTER_BEARER_TOKEN' ] = bearer_token
339+
310340 for key , value in env_vars .items ():
311341 set_key ('.env' , key , value )
312342 logger .debug (f"Saved { key } to .env" )
@@ -486,4 +516,58 @@ def get_tweet_replies(self, tweet_id: str, count: int = 10, **kwargs) -> List[di
486516 replies = response .get ("data" , [])
487517
488518 logger .info (f"Retrieved { len (replies )} replies" )
489- return replies
519+ return replies
520+
521+ def _bearer_oauth (self ,r ):
522+ bearer_token = self ._get_credentials ().get ("TWITTER_BEARER_TOKEN" )
523+ if not bearer_token :
524+ raise TwitterConfigurationError ("Bearer token is required for streaming API access" )
525+ r .headers ["Authorization" ] = f"Bearer { bearer_token } "
526+ r .headers ["User-Agent" ] = "v2FilteredStreamPython"
527+ return r
528+
529+
530+ def _get_rules (self ):
531+ """Get stream rules"""
532+ logger .debug ("Getting stream rules" )
533+ return self ._make_request ('get' , 'tweets/search/stream/rules' , use_bearer = True )
534+
535+ def _delete_rules (self ,rules ) -> None :
536+ """Delete stream rules"""
537+ if rules is None or "data" not in rules :
538+ return None
539+
540+ ids = list (map (lambda rule : rule ["id" ], rules ["data" ]))
541+ payload = {"delete" : {"ids" : ids }}
542+ return self ._make_request ('post' , 'tweets/search/stream/rules' , use_bearer = True , json = payload )
543+
544+
545+ def _build_rule (self , filter_string , ** kwargs ) -> None :
546+ """Build a rule for the stream"""
547+ rule = [{"value" :filter_string }]
548+ payload = {"add" : rule }
549+ return self ._make_request ('post' , 'tweets/search/stream/rules' , use_bearer = True , json = payload )
550+
551+ def stream_tweets (self , filter_string :str ,** kwargs ) -> Iterator [Dict [str , Any ]]:
552+ """Stream tweets. Requires Twitter Premium Plan and Bearer Token"""
553+ rules = self ._get_rules ()
554+ self ._delete_rules (rules )
555+ self ._build_rule (filter_string )
556+ logger .info ("Starting Twitter stream" )
557+ try :
558+ response = self ._make_request ('get' , 'tweets/search/stream' ,
559+ use_bearer = True , stream = True )
560+
561+ if response .status_code != 200 :
562+ raise TwitterAPIError (f"Stream connection failed with status { response .status_code } : { response .text } " )
563+
564+ for line in response .iter_lines ():
565+ if line :
566+ tweet_data = json .loads (line )['data' ]
567+ yield tweet_data
568+
569+ except Exception as e :
570+ logger .error (f"Error streaming tweets: { str (e )} " )
571+ raise TwitterAPIError (f"Error streaming tweets: { str (e )} " )
572+
573+
0 commit comments