Skip to content

Commit a1072c8

Browse files
authored
feat: update stream_agent_raw function
Merged PR #863 with updates to stream_agent_raw function
2 parents a753b27 + e207418 commit a1072c8

File tree

4 files changed

+262
-28
lines changed

4 files changed

+262
-28
lines changed

app/services/tg/bot/kind/ai_relayer/router.py

Lines changed: 201 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import inspect
2+
import json
23
import logging
4+
from datetime import datetime
5+
from typing import List
36

47
import telegramify_markdown
58
from aiogram import Router
@@ -9,6 +12,8 @@
912

1013
from intentkit.core.client import execute_agent
1114
from intentkit.models.chat import AuthorType, ChatMessageCreate
15+
from intentkit.models.redis import get_redis
16+
from intentkit.models.user import User
1217
from intentkit.utils.slack_alert import send_slack_message
1318

1419
from app.services.tg.bot import pool
@@ -20,6 +25,133 @@
2025

2126
logger = logging.getLogger(__name__)
2227

28+
# Cache configuration
29+
CACHE_TTL = 24 * 60 * 60 # 1 day in seconds
30+
MAX_CACHED_MESSAGES = 10
31+
MAX_CACHE_SIZE_BYTES = 2048 # 2KB
32+
33+
34+
async def cache_message(chat_id: int, agent_id: str, message: Message) -> None:
35+
"""Cache a message in Redis for context retrieval.
36+
37+
Args:
38+
chat_id: The chat ID where the message was sent
39+
agent_id: The agent ID to identify the agent
40+
message: The message to cache
41+
"""
42+
try:
43+
redis = get_redis()
44+
cache_key = f"intentkit:tg_context:{agent_id}:{chat_id}"
45+
46+
# Prepare message data for caching
47+
username = (
48+
message.from_user.username
49+
if message.from_user and message.from_user.username
50+
else "Unknown"
51+
)
52+
53+
message_data = {
54+
"text": message.text,
55+
"username": username,
56+
"timestamp": datetime.now().isoformat(),
57+
"message_id": message.message_id,
58+
}
59+
60+
# Add to Redis list (right push to maintain chronological order - oldest first)
61+
await redis.rpush(cache_key, json.dumps(message_data))
62+
63+
# Trim list to keep only the latest MAX_CACHED_MESSAGES (remove from left)
64+
await redis.ltrim(cache_key, -MAX_CACHED_MESSAGES, -1)
65+
66+
# Set expiration
67+
await redis.expire(cache_key, CACHE_TTL)
68+
69+
except Exception as e:
70+
logger.warning(f"Failed to cache message: {e}")
71+
72+
73+
async def get_cached_context(chat_id: int, agent_id: str) -> List[dict]:
74+
"""Retrieve cached messages for context.
75+
76+
Args:
77+
chat_id: The chat ID to get context for
78+
agent_id: The agent ID to identify the agent
79+
80+
Returns:
81+
List of cached message data, limited by size and count
82+
"""
83+
try:
84+
redis = get_redis()
85+
cache_key = f"intentkit:tg_context:{agent_id}:{chat_id}"
86+
87+
# Get all cached messages (oldest first due to rpush)
88+
cached_messages = await redis.lrange(cache_key, 0, -1)
89+
90+
if not cached_messages:
91+
return []
92+
93+
# Parse messages and apply size limit
94+
context_messages = []
95+
total_size = 0
96+
97+
for msg_json in cached_messages:
98+
try:
99+
msg_data = json.loads(msg_json)
100+
msg_size = len(json.dumps(msg_data))
101+
102+
if total_size + msg_size > MAX_CACHE_SIZE_BYTES:
103+
break
104+
105+
context_messages.append(msg_data)
106+
total_size += msg_size
107+
108+
except json.JSONDecodeError:
109+
continue
110+
111+
return context_messages
112+
113+
except Exception as e:
114+
logger.warning(f"Failed to get cached context: {e}")
115+
return []
116+
117+
118+
async def clear_cached_context(chat_id: int, agent_id: str) -> None:
119+
"""Clear cached messages after agent processes them.
120+
121+
Args:
122+
chat_id: The chat ID to clear context for
123+
agent_id: The agent ID to identify the agent
124+
"""
125+
try:
126+
redis = get_redis()
127+
cache_key = f"intentkit:tg_context:{agent_id}:{chat_id}"
128+
await redis.delete(cache_key)
129+
130+
except Exception as e:
131+
logger.warning(f"Failed to clear cached context: {e}")
132+
133+
134+
async def get_user_id(from_user) -> str:
135+
"""
136+
Extract user_id from telegram message from_user.
137+
138+
Args:
139+
from_user: Telegram user object from message.from_user
140+
141+
Returns:
142+
str: User ID, either from database lookup or fallback to username/user_id
143+
"""
144+
if from_user and from_user.username:
145+
user = await User.get_by_tg(from_user.username)
146+
if user:
147+
return user.id
148+
else:
149+
return from_user.username
150+
elif from_user:
151+
return str(from_user.id)
152+
else:
153+
raise ValueError("No valid user information available")
154+
23155

24156
def cur_func_name():
25157
return inspect.stack()[1][3]
@@ -67,25 +199,43 @@ async def gp_command_start(message: Message):
67199
)
68200
async def gp_process_message(message: Message) -> None:
69201
bot = await message.bot.get_me()
70-
if (
202+
203+
# Check if this message should trigger a bot reply
204+
should_reply = (
71205
message.reply_to_message
72206
and message.reply_to_message.from_user.id == message.bot.id
73-
) or bot.username in message.text:
207+
) or bot.username in message.text
208+
209+
if should_reply:
74210
cached_bot_item = pool.bot_by_token(message.bot.token)
75211
if cached_bot_item is None:
76212
logger.warning(f"bot with token {message.bot.token} not found in cache.")
77213
return
78214

79-
if message.from_user:
215+
try:
216+
user_id = await get_user_id(message.from_user)
217+
is_owner = (
218+
cached_bot_item._owner == message.from_user.username
219+
if message.from_user
220+
else False
221+
)
80222
logger.info(f"message from: {message.from_user}")
81-
is_owner = cached_bot_item._owner == message.from_user.username
82-
else:
223+
except ValueError:
83224
is_owner = False
225+
logger.error(
226+
f"telegram message from user without username: {message.from_user}"
227+
)
228+
return
84229

85230
# Add processing reaction
86231
await message.react([ReactionTypeEmoji(emoji="🤔")])
87232

88233
try:
234+
# Get cached context messages
235+
context_messages = await get_cached_context(
236+
message.chat.id, cached_bot_item.agent_id
237+
)
238+
89239
# remove bot name tag from text
90240
message_text = remove_bot_name(bot.username, message.text)
91241
if len(message_text) > 65535:
@@ -95,24 +245,40 @@ async def gp_process_message(message: Message) -> None:
95245
f"length: {len(message_text)}\n"
96246
f"chat_id:{message.chat.id}\n"
97247
f"agent:{cached_bot_item.agent_id}\n"
98-
f"user:{message.from_user.id}\n"
248+
f"user:{user_id}\n"
99249
f"content:{message_text[:100]}..."
100250
)
101251
)
102252

253+
# Wrap message with group context and username
254+
username = (
255+
message.from_user.username
256+
if message.from_user and message.from_user.username
257+
else "Unknown"
258+
)
259+
260+
# Build message with context
261+
if context_messages:
262+
context_text = "Recent conversation context:\n"
263+
# Messages are already in chronological order (oldest first)
264+
for ctx_msg in context_messages:
265+
context_text += f"@{ctx_msg['username']}: {ctx_msg['text']}\n"
266+
context_text += f"\nCurrent message from @{username}: {message_text}"
267+
wrapped_message = f"[Group Message with Context]: {context_text}"
268+
else:
269+
wrapped_message = f"[Group Message from @{username}]: {message_text}"
270+
103271
input = ChatMessageCreate(
104272
id=str(XID()),
105273
agent_id=cached_bot_item.agent_id,
106274
chat_id=pool.agent_chat_id(
107275
cached_bot_item.is_public_memory, message.chat.id
108276
),
109-
user_id=cached_bot_item.agent_owner
110-
if is_owner
111-
else str(message.from_user.id),
112-
author_id=str(message.from_user.id),
277+
user_id=cached_bot_item.agent_owner if is_owner else user_id,
278+
author_id=user_id,
113279
author_type=AuthorType.TELEGRAM,
114280
thread_type=AuthorType.TELEGRAM,
115-
message=message_text,
281+
message=wrapped_message,
116282
)
117283
response = await execute_agent(input)
118284
await message.answer(
@@ -122,6 +288,10 @@ async def gp_process_message(message: Message) -> None:
122288
parse_mode="MarkdownV2",
123289
reply_to_message_id=message.message_id,
124290
)
291+
292+
# Clear cached context after successful agent response
293+
await clear_cached_context(message.chat.id, cached_bot_item.agent_id)
294+
125295
except Exception as e:
126296
logger.warning(
127297
f"error processing in function:{cur_func_name()}, token:{message.bot.token}, err={str(e)}"
@@ -135,6 +305,11 @@ async def gp_process_message(message: Message) -> None:
135305
await message.react([])
136306
except Exception as e:
137307
logger.warning(f"Failed to remove reaction: {str(e)}")
308+
else:
309+
# This message doesn't trigger a reply, cache it for context
310+
cached_bot_item = pool.bot_by_token(message.bot.token)
311+
if cached_bot_item:
312+
await cache_message(message.chat.id, cached_bot_item.agent_id, message)
138313

139314

140315
## direct commands and messages
@@ -164,11 +339,20 @@ async def process_message(message: Message) -> None:
164339
logger.warning(f"bot with token {message.bot.token} not found in cache.")
165340
return
166341

167-
if message.from_user:
342+
try:
343+
user_id = await get_user_id(message.from_user)
344+
is_owner = (
345+
cached_bot_item._owner == message.from_user.username
346+
if message.from_user
347+
else False
348+
)
168349
logger.info(f"message from: {message.from_user}")
169-
is_owner = cached_bot_item._owner == message.from_user.username
170-
else:
350+
except ValueError:
171351
is_owner = False
352+
logger.error(
353+
f"telegram message from user without username: {message.from_user}"
354+
)
355+
return
172356

173357
# Add processing reaction
174358
await message.react([ReactionTypeEmoji(emoji="🤔")])
@@ -181,7 +365,7 @@ async def process_message(message: Message) -> None:
181365
f"length: {len(message.text)}\n"
182366
f"chat_id:{message.chat.id}\n"
183367
f"agent:{cached_bot_item.agent_id}\n"
184-
f"user:{message.from_user.id}\n"
368+
f"user:{user_id}\n"
185369
f"content:{message.text[:100]}..."
186370
)
187371
)
@@ -190,10 +374,8 @@ async def process_message(message: Message) -> None:
190374
id=str(XID()),
191375
agent_id=cached_bot_item.agent_id,
192376
chat_id=pool.agent_chat_id(False, message.chat.id),
193-
user_id=cached_bot_item.agent_owner
194-
if is_owner
195-
else str(message.from_user.id),
196-
author_id=str(message.from_user.id),
377+
user_id=cached_bot_item.agent_owner if is_owner else user_id,
378+
author_id=user_id,
197379
author_type=AuthorType.TELEGRAM,
198380
thread_type=AuthorType.TELEGRAM,
199381
message=message.text,

intentkit/core/engine.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,12 @@ async def stream_agent_raw(
293293
# save input message first
294294
user_message = await message.save()
295295

296+
# temporary debug logging for telegram messages
297+
if user_message.author_type == AuthorType.TELEGRAM:
298+
logger.info(
299+
f"[TELEGRAM DEBUG] Agent: {user_message.agent_id} | Chat: {user_message.chat_id} | Message: {user_message.message}"
300+
)
301+
296302
if re.search(
297303
r"(@clear|/clear)(?!\w)",
298304
user_message.message.strip(),

intentkit/core/prompt.py

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from intentkit.models.agent_data import AgentData
1313
from intentkit.models.chat import AuthorType
1414
from intentkit.models.skill import Skill
15+
from intentkit.models.user import User
1516

1617
# ============================================================================
1718
# CONSTANTS AND CONFIGURATION
@@ -120,18 +121,37 @@ def _build_wallet_section(agent: Agent, agent_data: AgentData) -> str:
120121
return "\n".join(wallet_parts) + ("\n" if wallet_parts else "")
121122

122123

123-
def _build_user_info_section(context: AgentContext) -> str:
124+
async def _build_user_info_section(context: AgentContext) -> str:
124125
"""Build user information section when user_id is a valid EVM wallet address."""
125126
if not context.user_id:
126127
return ""
127128

128-
# Check if user_id is a valid EVM wallet address
129-
try:
130-
if is_address(context.user_id):
131-
return f"## User Info\n\nThe person you are talking to has wallet address: {context.user_id}\n\n"
132-
except Exception:
133-
# If validation fails, don't include the section
134-
pass
129+
user = await User.get(context.user_id)
130+
131+
prompt_array = []
132+
133+
evm_wallet_address = ""
134+
if user and user.evm_wallet_address:
135+
evm_wallet_address = user.evm_wallet_address
136+
elif is_address(context.user_id):
137+
evm_wallet_address = context.user_id
138+
139+
if evm_wallet_address:
140+
prompt_array.append(
141+
f"The user you are talking to has EVM wallet address: {evm_wallet_address}\n"
142+
)
143+
144+
if user:
145+
if user.email:
146+
prompt_array.append(f"User Email: {user.email}\n")
147+
if user.x_username:
148+
prompt_array.append(f"User X Username: {user.x_username}\n")
149+
if user.telegram_username:
150+
prompt_array.append(f"User Telegram Username: {user.telegram_username}\n")
151+
152+
if prompt_array:
153+
prompt_array.append("\n")
154+
return "## User Info\n\n" + "".join(prompt_array)
135155

136156
return ""
137157

@@ -397,7 +417,7 @@ async def formatted_prompt(
397417
)
398418

399419
# Add user info if user_id is a valid EVM wallet address
400-
user_info = _build_user_info_section(context)
420+
user_info = await _build_user_info_section(context)
401421
if user_info:
402422
final_system_prompt = f"{final_system_prompt}{user_info}"
403423

0 commit comments

Comments
 (0)