Skip to content

Commit b0ef4a8

Browse files
authored
Merge pull request #123 from blorm-network/fetch_tweet_details
Twitter Stream Depth
2 parents 05b8c8f + 08e576a commit b0ef4a8

File tree

4 files changed

+132
-12
lines changed

4 files changed

+132
-12
lines changed

agents/example.json

+6-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,12 @@
2424
"name": "twitter",
2525
"timeline_read_count": 10,
2626
"own_tweet_replies_count":2,
27-
"tweet_interval": 5400
27+
"tweet_interval": 5400,
28+
"respond_to_mentions": {
29+
"enabled": false,
30+
"accounts_mentioned": ["blormmy"],
31+
"accounts_to_listen_to": ["0xzerebro"]
32+
}
2833
},
2934
{
3035
"name": "farcaster",

src/actions/twitter_actions.py

+36-4
Original file line numberDiff line numberDiff line change
@@ -98,17 +98,49 @@ def like_tweet(agent, **kwargs):
9898
@register_action("respond-to-mentions")
9999
def respond_to_mentions(agent,**kwargs): #REQUIRES TWITTER PREMIUM PLAN
100100

101-
filter_str = f"@{agent.username} -is:retweet"
101+
accounts_to_listen_to = kwargs.get('accounts_to_listen_to', [])
102+
accounts_mentioned = kwargs.get('accounts_mentioned', [])
103+
104+
# Get user IDs of accounts to listen to
105+
accounts_to_listen_to_ids = []
106+
user_ids = agent.connection_manager.perform_action(
107+
connection_name="twitter",
108+
action_name="get-user-details",
109+
params = [[], accounts_to_listen_to]
110+
)
111+
if user_ids:
112+
accounts_to_listen_to_ids = [user.get('id') for user in user_ids]
113+
114+
# Create filter string to get tweets from accounts_to_listen_to_ids mentioning any of accounts_mentioned
115+
if accounts_mentioned:
116+
mention_str = " OR ".join([f"@{account}" for account in accounts_mentioned])
117+
118+
if accounts_to_listen_to_ids:
119+
filter_str = " OR ".join([f"from:{user_id}" for user_id in accounts_to_listen_to_ids])
120+
filter_str = f"({filter_str}) ({mention_str}) -is:retweet"
121+
else:
122+
filter_str = f"({mention_str}) -is:retweet"
123+
else:
124+
filter_str = ""
125+
126+
102127
stream_function = agent.connection_manager.perform_action(
103128
connection_name="twitter",
104129
action_name="stream-tweets",
105130
params=[filter_str]
106131
)
132+
107133
def process_tweets():
108134
for tweet_data in stream_function:
109-
tweet_id = tweet_data["id"]
110-
tweet_text = tweet_data["text"]
111-
agent.logger.info(f"Received a mention: {tweet_text}")
135+
try:
136+
tweet_id = tweet_data.get("id", "")
137+
tweet_text = tweet_data.get("text", "")
138+
author_id = tweet_data.get("author_id", "")
139+
author_username = tweet_data.get("author_username", "")
140+
agent.logger.info(f"Received a mention from @{author_username}: {tweet_text}")
141+
except Exception as e:
142+
agent.logger.error(f"Error processing tweet in stream: {str(e)}")
143+
continue
112144

113145
processing_thread = threading.Thread(target=process_tweets)
114146
processing_thread.daemon = True

src/agent.py

+18-4
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,21 @@ def __init__(
4040
self.use_time_based_weights = agent_dict["use_time_based_weights"]
4141
self.time_based_multipliers = agent_dict["time_based_multipliers"]
4242

43-
has_twitter_tasks = any("tweet" in task["name"] for task in agent_dict.get("tasks", []))
43+
self.has_twitter_tasks = any("tweet" in task["name"] for task in agent_dict.get("tasks", []))
4444

4545
twitter_config = next((config for config in agent_dict["config"] if config["name"] == "twitter"), None)
4646

47-
if has_twitter_tasks and twitter_config:
47+
if self.has_twitter_tasks and twitter_config:
4848
self.tweet_interval = twitter_config.get("tweet_interval", 900)
4949
self.own_tweet_replies_count = twitter_config.get("own_tweet_replies_count", 2)
50+
respond_to_mentions_config = twitter_config.get("respond_to_mentions", None)
51+
52+
if (respond_to_mentions_config):
53+
self.respond_to_mentions = respond_to_mentions_config.get("enabled", False)
54+
self.accounts_mentioned = respond_to_mentions_config.get("accounts_mentioned", [])
55+
self.accounts_to_listen_to = respond_to_mentions_config.get("accounts_to_listen_to", [])
56+
else:
57+
self.respond_to_mentions = False
5058

5159
# Extract Echochambers config
5260
echochambers_config = next((config for config in agent_dict["config"] if config["name"] == "echochambers"), None)
@@ -79,7 +87,7 @@ def _setup_llm_provider(self):
7987
self.model_provider = llm_providers[0]
8088

8189
# Load Twitter username for self-reply detection if Twitter tasks exist
82-
if any("tweet" in task["name"] for task in self.tasks):
90+
if self.has_twitter_tasks:
8391
load_dotenv()
8492
self.username = os.getenv('TWITTER_USERNAME', '').lower()
8593
if not self.username:
@@ -172,14 +180,20 @@ def loop(self):
172180
logger.info(f"{i}...")
173181
time.sleep(1)
174182

183+
if self.has_twitter_tasks:
184+
if self.respond_to_mentions:
185+
logger.info("\n👀 Listening for mentions...")
186+
if (len(self.accounts_mentioned) == 0):
187+
self.accounts_mentioned = [self.username] # Default to listening to own mentions if no accounts mentioned
188+
execute_action(self, "respond-to-mentions", accounts_mentioned=self.accounts_mentioned, accounts_to_listen_to=self.accounts_to_listen_to)
175189
try:
176190
while True:
177191
success = False
178192
try:
179193
# REPLENISH INPUTS
180194
# TODO: Add more inputs to complexify agent behavior
181195
if "timeline_tweets" not in self.state or self.state["timeline_tweets"] is None or len(self.state["timeline_tweets"]) == 0:
182-
if any("tweet" in task["name"] for task in self.tasks):
196+
if (self.has_twitter_tasks):
183197
logger.info("\n👀 READING TIMELINE")
184198
self.state["timeline_tweets"] = self.connection_manager.perform_action(
185199
connection_name="twitter",

src/connections/twitter_connection.py

+72-3
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,21 @@ def register_actions(self) -> None:
9393
],
9494
description="Fetch tweet replies"
9595
),
96+
"get-user-details": Action(
97+
name="get-user-details",
98+
parameters=[
99+
ActionParameter("user_ids", True, list, "IDs of the user to get details for"),
100+
ActionParameter("usernames", True, list, "Usernames of the user to get details for")
101+
],
102+
description="Get details for user(s) by ID or username"
103+
),
104+
"get-tweet-details": Action(
105+
name="get-tweet-details",
106+
parameters=[
107+
ActionParameter("tweet_id", True, str, "ID of the tweet to get details for")
108+
],
109+
description="Get details for a specific tweet"
110+
),
96111
"stream-tweets": Action(
97112
name="stream-tweets",
98113
parameters=[
@@ -517,6 +532,47 @@ def get_tweet_replies(self, tweet_id: str, count: int = 10, **kwargs) -> List[di
517532

518533
logger.info(f"Retrieved {len(replies)} replies")
519534
return replies
535+
536+
def get_user_details(self, user_ids: list, usernames: list, **kwargs) -> dict:
537+
"""Get details for user(s) by ID or username"""
538+
logger.debug(f"Getting details for user_ids: {user_ids} or usernames: {usernames}")
539+
540+
params = {"user.fields": "username"}
541+
542+
if user_ids:
543+
params["ids"] = ",".join(user_ids)
544+
endpoint = "users"
545+
else:
546+
params["usernames"] = ",".join(usernames)
547+
endpoint = f"users/by/"
548+
549+
response = self._make_request('get', endpoint, params=params)
550+
551+
users = response.get("data", [])
552+
553+
logger.info("Retrieved user details")
554+
return users
555+
556+
def get_tweet_details(self, tweet_id: str, **kwargs) -> dict:
557+
"""Get details for a specific tweet"""
558+
logger.debug(f"Getting details for tweet {tweet_id}")
559+
560+
params = {
561+
"tweet.fields": "author_id,created_at,text,attachments,referenced_tweets",
562+
"expansions": "author_id",
563+
"user.fields": "username",
564+
"ids": tweet_id
565+
}
566+
567+
response = self._make_request('get', 'tweets', params=params)
568+
tweet = response.get("data", [])[0]
569+
570+
if "includes" in response and "users" in response["includes"]:
571+
author = response["includes"]["users"][0]
572+
tweet["author_username"] = author["username"]
573+
574+
logger.info("Retrieved tweet details")
575+
return tweet
520576

521577
def _bearer_oauth(self,r):
522578
bearer_token = self._get_credentials().get("TWITTER_BEARER_TOKEN")
@@ -553,17 +609,30 @@ def stream_tweets(self, filter_string:str,**kwargs) ->Iterator[Dict[str, Any]]:
553609
rules = self._get_rules()
554610
self._delete_rules(rules)
555611
self._build_rule(filter_string)
556-
logger.info("Starting Twitter stream")
557612
try:
613+
params = {
614+
"tweet.fields": "author_id,created_at,text,attachments,referenced_tweets",
615+
"user.fields": "username",
616+
"expansions": "author_id"
617+
}
558618
response = self._make_request('get', 'tweets/search/stream',
559-
use_bearer=True, stream=True)
619+
use_bearer=True, stream=True, params=params)
560620

561621
if response.status_code != 200:
562622
raise TwitterAPIError(f"Stream connection failed with status {response.status_code}: {response.text}")
563623

564624
for line in response.iter_lines():
625+
565626
if line:
566-
tweet_data = json.loads(line)['data']
627+
tweet_json = json.loads(line)
628+
629+
tweet_data = tweet_json.get("data", {})
630+
includes = tweet_json.get("includes", {})
631+
632+
if includes and "users" in includes:
633+
author = includes["users"][0]
634+
tweet_data["author_username"] = author["username"]
635+
567636
yield tweet_data
568637

569638
except Exception as e:

0 commit comments

Comments
 (0)