Skip to content

Commit 66b5714

Browse files
authored
Merge pull request #420 from dogzzdogzz/feat/per-thread-serialization
fix(slack): bot-to-bot communication — per-thread serialization + send-once reply
2 parents 6fd0d0c + 17c3e4c commit 66b5714

6 files changed

Lines changed: 201 additions & 96 deletions

File tree

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,5 @@
22
config.toml
33
*.swp
44
.DS_Store
5+
.env
6+
.kiro/

charts/openab/templates/configmap.yaml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ data:
4242
{{- if $cfg.discord.trustedBotIds }}
4343
trusted_bot_ids = {{ $cfg.discord.trustedBotIds | toJson }}
4444
{{- end }}
45+
{{- /* allowUserMessages: controls whether the bot requires @mention in threads (Discord) */ -}}
46+
{{- if $cfg.discord.allowUserMessages }}
47+
{{- if not (has $cfg.discord.allowUserMessages (list "involved" "mentions")) }}
48+
{{- fail (printf "agents.%s.discord.allowUserMessages must be one of: involved, mentions — got: %s" $name $cfg.discord.allowUserMessages) }}
49+
{{- end }}
50+
allow_user_messages = {{ $cfg.discord.allowUserMessages | toJson }} {{- /* involved (default): respond in bot's threads without @mention | mentions: always require @mention */ -}}
51+
{{- end }}
4552
{{- end }}
4653
4754
{{- if and ($cfg.slack).enabled }}
@@ -73,7 +80,7 @@ data:
7380
{{- if not (has ($cfg.slack).allowUserMessages (list "involved" "mentions")) }}
7481
{{- fail (printf "agents.%s.slack.allowUserMessages must be one of: involved, mentions — got: %s" $name ($cfg.slack).allowUserMessages) }}
7582
{{- end }}
76-
allow_user_messages = {{ ($cfg.slack).allowUserMessages | toJson }}
83+
allow_user_messages = {{ ($cfg.slack).allowUserMessages | toJson }} {{- /* involved (default): respond in bot's threads without @mention | mentions: always require @mention */ -}}
7784
{{- end }}
7885
{{- end }}
7986

src/adapter.rs

Lines changed: 65 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use anyhow::Result;
22
use async_trait::async_trait;
33
use serde::Serialize;
44
use std::sync::Arc;
5-
use tokio::sync::watch;
65
use tracing::error;
76

87
use crate::acp::{classify_notification, AcpEvent, ContentBlock, SessionPool};
@@ -41,6 +40,10 @@ pub struct SenderContext {
4140
pub display_name: String,
4241
pub channel: String,
4342
pub channel_id: String,
43+
/// Thread identifier, if the message is inside a thread.
44+
/// Slack: thread_ts. Discord: None (threads are separate channels).
45+
#[serde(skip_serializing_if = "Option::is_none")]
46+
pub thread_id: Option<String>,
4447
pub is_bot: bool,
4548
}
4649

@@ -57,9 +60,6 @@ pub trait ChatAdapter: Send + Sync + 'static {
5760
/// Send a new message, returns a reference to the sent message.
5861
async fn send_message(&self, channel: &ChannelRef, content: &str) -> Result<MessageRef>;
5962

60-
/// Edit an existing message in-place.
61-
async fn edit_message(&self, msg: &MessageRef, content: &str) -> Result<()>;
62-
6363
/// Create a thread from a trigger message, returns the thread channel ref.
6464
async fn create_thread(
6565
&self,
@@ -73,6 +73,17 @@ pub trait ChatAdapter: Send + Sync + 'static {
7373

7474
/// Remove a reaction/emoji from a message.
7575
async fn remove_reaction(&self, msg: &MessageRef, emoji: &str) -> Result<()>;
76+
77+
/// Edit an existing message in-place (for streaming updates).
78+
/// Default: unsupported (send-once only).
79+
async fn edit_message(&self, _msg: &MessageRef, _content: &str) -> Result<()> {
80+
Err(anyhow::anyhow!("edit_message not supported"))
81+
}
82+
83+
/// Whether this adapter should use streaming edit (true) or send-once (false).
84+
fn use_streaming(&self) -> bool {
85+
false
86+
}
7687
}
7788

7889
// --- AdapterRouter ---
@@ -130,8 +141,6 @@ impl AdapterRouter {
130141
}
131142
}
132143

133-
let thinking_msg = adapter.send_message(thread_channel, "...").await?;
134-
135144
let thread_key = format!(
136145
"{}:{}",
137146
adapter.platform(),
@@ -144,7 +153,7 @@ impl AdapterRouter {
144153
if let Err(e) = self.pool.get_or_create(&thread_key).await {
145154
let msg = format_user_error(&e.to_string());
146155
let _ = adapter
147-
.edit_message(&thinking_msg, &format!("⚠️ {msg}"))
156+
.send_message(thread_channel, &format!("⚠️ {msg}"))
148157
.await;
149158
error!("pool error: {e}");
150159
return Err(e);
@@ -165,7 +174,6 @@ impl AdapterRouter {
165174
&thread_key,
166175
content_blocks,
167176
thread_channel,
168-
&thinking_msg,
169177
reactions.clone(),
170178
)
171179
.await;
@@ -190,7 +198,7 @@ impl AdapterRouter {
190198

191199
if let Err(ref e) = result {
192200
let _ = adapter
193-
.edit_message(&thinking_msg, &format!("⚠️ {e}"))
201+
.send_message(thread_channel, &format!("⚠️ {e}"))
194202
.await;
195203
}
196204

@@ -203,13 +211,12 @@ impl AdapterRouter {
203211
thread_key: &str,
204212
content_blocks: Vec<ContentBlock>,
205213
thread_channel: &ChannelRef,
206-
thinking_msg: &MessageRef,
207214
reactions: Arc<StatusReactionController>,
208215
) -> Result<()> {
209216
let adapter = adapter.clone();
210217
let thread_channel = thread_channel.clone();
211-
let msg_ref = thinking_msg.clone();
212218
let message_limit = adapter.message_limit();
219+
let streaming = adapter.use_streaming();
213220

214221
self.pool
215222
.with_connection(thread_key, |conn| {
@@ -221,57 +228,51 @@ impl AdapterRouter {
221228
let (mut rx, _) = conn.session_prompt(content_blocks).await?;
222229
reactions.set_thinking().await;
223230

224-
let initial = if reset {
225-
"⚠️ _Session expired, starting fresh..._\n\n...".to_string()
226-
} else {
227-
"...".to_string()
228-
};
229-
let (buf_tx, buf_rx) = watch::channel(initial);
230-
231231
let mut text_buf = String::new();
232232
let mut tool_lines: Vec<ToolEntry> = Vec::new();
233233

234234
if reset {
235235
text_buf.push_str("⚠️ _Session expired, starting fresh..._\n\n");
236236
}
237237

238-
// Spawn edit-streaming task — only edits the single message, never sends new ones.
239-
// Long content is truncated during streaming; final multi-message split happens after.
240-
let streaming_limit = message_limit.saturating_sub(100);
241-
let edit_handle = {
242-
let adapter = adapter.clone();
243-
let msg_ref = msg_ref.clone();
244-
let mut buf_rx = buf_rx.clone();
238+
// Streaming edit: send placeholder, spawn edit loop
239+
let (buf_tx, placeholder_msg) = if streaming {
240+
let initial = if reset {
241+
"⚠️ _Session expired, starting fresh..._\n\n…".to_string()
242+
} else {
243+
"…".to_string()
244+
};
245+
let msg = adapter.send_message(&thread_channel, &initial).await?;
246+
let (tx, rx) = tokio::sync::watch::channel(initial);
247+
let edit_adapter = adapter.clone();
248+
let edit_msg = msg.clone();
249+
let limit = message_limit;
250+
let mut buf_rx = rx;
245251
tokio::spawn(async move {
246-
let mut last_content = String::new();
252+
let mut last = String::new();
247253
loop {
248254
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
249255
if buf_rx.has_changed().unwrap_or(false) {
250256
let content = buf_rx.borrow_and_update().clone();
251-
if content != last_content {
252-
let display = if content.chars().count() > streaming_limit {
253-
// Tail-priority: keep the last N chars so user
254-
// sees the most recent agent output
255-
let total = content.chars().count();
256-
let skip = total - streaming_limit;
257-
let truncated: String = content.chars().skip(skip).collect();
258-
format!("…(truncated)\n{truncated}")
257+
if content != last {
258+
let display = if content.chars().count() > limit - 100 {
259+
format!("…{}", format::truncate_chars_tail(&content, limit - 100))
259260
} else {
260261
content.clone()
261262
};
262-
let _ = adapter.edit_message(&msg_ref, &display).await;
263-
last_content = content;
263+
let _ = edit_adapter.edit_message(&edit_msg, &display).await;
264+
last = content;
264265
}
265266
}
266-
if buf_rx.has_changed().is_err() {
267-
break;
268-
}
267+
if buf_rx.has_changed().is_err() { break; }
269268
}
270-
})
269+
});
270+
(Some(tx), Some(msg))
271+
} else {
272+
(None, None)
271273
};
272274

273275
// Process ACP notifications
274-
let mut got_first_text = false;
275276
let mut response_error: Option<String> = None;
276277
while let Some(notification) = rx.recv().await {
277278
if notification.id.is_some() {
@@ -284,12 +285,10 @@ impl AdapterRouter {
284285
if let Some(event) = classify_notification(&notification) {
285286
match event {
286287
AcpEvent::Text(t) => {
287-
if !got_first_text {
288-
got_first_text = true;
289-
}
290288
text_buf.push_str(&t);
291-
let _ =
292-
buf_tx.send(compose_display(&tool_lines, &text_buf, true));
289+
if let Some(tx) = &buf_tx {
290+
let _ = tx.send(compose_display(&tool_lines, &text_buf, true));
291+
}
293292
}
294293
AcpEvent::Thinking => {
295294
reactions.set_thinking().await;
@@ -307,8 +306,9 @@ impl AdapterRouter {
307306
state: ToolState::Running,
308307
});
309308
}
310-
let _ =
311-
buf_tx.send(compose_display(&tool_lines, &text_buf, true));
309+
if let Some(tx) = &buf_tx {
310+
let _ = tx.send(compose_display(&tool_lines, &text_buf, true));
311+
}
312312
}
313313
AcpEvent::ToolDone { id, title, status } => {
314314
reactions.set_thinking().await;
@@ -329,19 +329,20 @@ impl AdapterRouter {
329329
state: new_state,
330330
});
331331
}
332-
let _ =
333-
buf_tx.send(compose_display(&tool_lines, &text_buf, true));
332+
if let Some(tx) = &buf_tx {
333+
let _ = tx.send(compose_display(&tool_lines, &text_buf, true));
334+
}
334335
}
335336
_ => {}
336337
}
337338
}
338339
}
339340

340341
conn.prompt_done().await;
342+
// Stop the edit loop
341343
drop(buf_tx);
342-
let _ = edit_handle.await;
343344

344-
// Final edit with complete content
345+
// Build final content
345346
let final_content = compose_display(&tool_lines, &text_buf, false);
346347
let final_content = if final_content.is_empty() {
347348
if let Some(err) = response_error {
@@ -356,14 +357,18 @@ impl AdapterRouter {
356357
};
357358

358359
let chunks = format::split_message(&final_content, message_limit);
359-
let mut current_msg = msg_ref;
360-
for (i, chunk) in chunks.iter().enumerate() {
361-
if i == 0 {
362-
let _ = adapter.edit_message(&current_msg, chunk).await;
363-
} else if let Ok(new_msg) =
364-
adapter.send_message(&thread_channel, chunk).await
365-
{
366-
current_msg = new_msg;
360+
if let Some(msg) = placeholder_msg {
361+
// Streaming: edit first chunk into placeholder, send rest as new messages
362+
if let Some(first) = chunks.first() {
363+
let _ = adapter.edit_message(&msg, first).await;
364+
}
365+
for chunk in chunks.iter().skip(1) {
366+
let _ = adapter.send_message(&thread_channel, chunk).await;
367+
}
368+
} else {
369+
// Send-once: all chunks as new messages
370+
for chunk in &chunks {
371+
let _ = adapter.send_message(&thread_channel, chunk).await;
367372
}
368373
}
369374

src/discord.rs

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::format;
55
use crate::media;
66
use async_trait::async_trait;
77
use std::sync::LazyLock;
8-
use serenity::builder::{CreateThread, EditMessage};
8+
use serenity::builder::CreateThread;
99
use serenity::http::Http;
1010
use serenity::model::channel::{AutoArchiveDuration, Message, ReactionType};
1111
use serenity::model::gateway::Ready;
@@ -54,19 +54,6 @@ impl ChatAdapter for DiscordAdapter {
5454
})
5555
}
5656

57-
async fn edit_message(&self, msg: &MessageRef, content: &str) -> anyhow::Result<()> {
58-
let ch_id: u64 = msg.channel.channel_id.parse()?;
59-
let msg_id: u64 = msg.message_id.parse()?;
60-
ChannelId::new(ch_id)
61-
.edit_message(
62-
&self.http,
63-
MessageId::new(msg_id),
64-
EditMessage::new().content(content),
65-
)
66-
.await?;
67-
Ok(())
68-
}
69-
7057
async fn create_thread(
7158
&self,
7259
channel: &ChannelRef,
@@ -216,11 +203,7 @@ impl EventHandler for Handler {
216203
self.allowed_channels.is_empty() || self.allowed_channels.contains(&channel_id);
217204

218205
let is_mentioned = msg.mentions_user_id(bot_id)
219-
|| msg.content.contains(&format!("<@{}>", bot_id))
220-
|| msg
221-
.mention_roles
222-
.iter()
223-
.any(|r| msg.content.contains(&format!("<@&{}>", r)));
206+
|| msg.content.contains(&format!("<@{}>", bot_id));
224207

225208
// Bot message gating (from upstream #321)
226209
if msg.author.bot {
@@ -361,6 +344,7 @@ impl EventHandler for Handler {
361344
display_name: display_name.to_string(),
362345
channel: "discord".into(),
363346
channel_id: msg.channel_id.to_string(),
347+
thread_id: None,
364348
is_bot: msg.author.bot,
365349
};
366350

src/format.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,13 @@ pub fn shorten_thread_name(prompt: &str) -> String {
6060
}
6161
}
6262

63+
64+
/// Truncate a string to at most `limit` Unicode characters, keeping the tail
65+
/// (most recent output) for better streaming UX.
66+
pub fn truncate_chars_tail(s: &str, limit: usize) -> String {
67+
let total = s.chars().count();
68+
if total <= limit {
69+
return s.to_string();
70+
}
71+
s.chars().skip(total - limit).collect()
72+
}

0 commit comments

Comments
 (0)