-
Notifications
You must be signed in to change notification settings - Fork 159
Expand file tree
/
Copy pathdiscord.rs
More file actions
2232 lines (2052 loc) · 85.7 KB
/
Copy pathdiscord.rs
File metadata and controls
2232 lines (2052 loc) · 85.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
use crate::acp::protocol::ConfigOption;
use crate::acp::ContentBlock;
use crate::adapter::{AdapterRouter, ChannelRef, ChatAdapter, MessageRef, SenderContext};
use crate::bot_turns::{BotTurnTracker, TurnAction, TurnSeverity};
use crate::config::{AllowBots, AllowUsers, SttConfig};
use crate::format;
use crate::media;
use async_trait::async_trait;
use serenity::builder::{
CreateActionRow, CreateButton, CreateCommand, CreateInteractionResponse,
CreateInteractionResponseMessage, CreateSelectMenu, CreateSelectMenuKind,
CreateSelectMenuOption, CreateThread, EditMessage,
};
use serenity::http::Http;
use serenity::model::application::ButtonStyle;
use serenity::model::application::{Command, ComponentInteractionDataKind, Interaction};
use serenity::model::channel::{AutoArchiveDuration, Message, MessageType, ReactionType};
use serenity::model::gateway::Ready;
use serenity::model::id::{ChannelId, MessageId, UserId};
use serenity::prelude::*;
use std::collections::{HashMap, HashSet};
use std::sync::LazyLock;
use std::sync::{Arc, OnceLock};
use tracing::{debug, error, info, warn};
/// Hard cap on consecutive bot messages in a channel or thread.
/// Prevents runaway loops between multiple bots in "all" mode.
const MAX_CONSECUTIVE_BOT_TURNS: u32 = 1000;
/// Maximum entries in the participation cache before eviction.
const PARTICIPATION_CACHE_MAX: usize = 1000;
/// Discord StringSelectMenu hard limit on options.
const SELECT_MENU_PAGE_SIZE: usize = 25;
// --- DiscordAdapter: implements ChatAdapter for Discord via serenity ---
pub struct DiscordAdapter {
http: Arc<Http>,
}
impl DiscordAdapter {
pub fn new(http: Arc<Http>) -> Self {
Self { http }
}
/// Resolve the effective Discord channel ID from a ChannelRef.
/// Discord threads are channels, so prefer thread_id when set.
fn resolve_channel(channel: &ChannelRef) -> &str {
channel.thread_id.as_deref().unwrap_or(&channel.channel_id)
}
}
#[async_trait]
impl ChatAdapter for DiscordAdapter {
fn platform(&self) -> &'static str {
"discord"
}
fn message_limit(&self) -> usize {
2000
}
async fn send_message(
&self,
channel: &ChannelRef,
content: &str,
) -> anyhow::Result<MessageRef> {
let ch_id: u64 = Self::resolve_channel(channel).parse()?;
let msg = ChannelId::new(ch_id).say(&self.http, content).await?;
Ok(MessageRef {
channel: channel.clone(),
message_id: msg.id.to_string(),
})
}
async fn send_message_with_reply(
&self,
channel: &ChannelRef,
content: &str,
reply_to_message_id: &str,
) -> anyhow::Result<MessageRef> {
let ch_id: u64 = Self::resolve_channel(channel).parse()?;
let msg_id: u64 = reply_to_message_id.parse().unwrap_or(0);
if msg_id == 0 {
// Invalid message ID, fall back to plain send
return self.send_message(channel, content).await;
}
let builder = serenity::builder::CreateMessage::new()
.content(content)
.reference_message((ChannelId::new(ch_id), MessageId::new(msg_id)));
match ChannelId::new(ch_id)
.send_message(&self.http, builder)
.await
{
Ok(msg) => Ok(MessageRef {
channel: channel.clone(),
message_id: msg.id.to_string(),
}),
Err(e) => {
// Fallback to plain send if reply fails (e.g. unknown message, cross-channel)
tracing::warn!(error = ?e, reply_to = reply_to_message_id, "reply_to failed, falling back to plain send");
self.send_message(channel, content).await
}
}
}
async fn delete_message(&self, msg: &MessageRef) -> anyhow::Result<()> {
let ch_id: u64 = Self::resolve_channel(&msg.channel).parse()?;
let msg_id: u64 = msg.message_id.parse()?;
self.http
.delete_message(ChannelId::new(ch_id), MessageId::new(msg_id), None)
.await?;
Ok(())
}
async fn edit_message(&self, msg: &MessageRef, content: &str) -> anyhow::Result<()> {
let ch_id: u64 = Self::resolve_channel(&msg.channel).parse()?;
let msg_id: u64 = msg.message_id.parse()?;
ChannelId::new(ch_id)
.edit_message(
&self.http,
MessageId::new(msg_id),
EditMessage::new().content(content),
)
.await?;
Ok(())
}
fn use_streaming(&self, other_bot_present: bool) -> bool {
!other_bot_present
}
async fn create_thread(
&self,
channel: &ChannelRef,
trigger_msg: &MessageRef,
title: &str,
) -> anyhow::Result<ChannelRef> {
let ch_id: u64 = channel.channel_id.parse()?;
let msg_id: u64 = trigger_msg.message_id.parse()?;
let thread = ChannelId::new(ch_id)
.create_thread_from_message(
&self.http,
MessageId::new(msg_id),
CreateThread::new(title).auto_archive_duration(AutoArchiveDuration::OneDay),
)
.await?;
Ok(ChannelRef {
platform: "discord".into(),
channel_id: thread.id.to_string(),
thread_id: None,
parent_id: Some(channel.channel_id.clone()),
origin_event_id: None,
})
}
async fn add_reaction(&self, msg: &MessageRef, emoji: &str) -> anyhow::Result<()> {
let ch_id: u64 = Self::resolve_channel(&msg.channel).parse()?;
let msg_id: u64 = msg.message_id.parse()?;
self.http
.create_reaction(
ChannelId::new(ch_id),
MessageId::new(msg_id),
&ReactionType::Unicode(emoji.to_string()),
)
.await?;
Ok(())
}
async fn remove_reaction(&self, msg: &MessageRef, emoji: &str) -> anyhow::Result<()> {
let ch_id: u64 = Self::resolve_channel(&msg.channel).parse()?;
let msg_id: u64 = msg.message_id.parse()?;
self.http
.delete_reaction_me(
ChannelId::new(ch_id),
MessageId::new(msg_id),
&ReactionType::Unicode(emoji.to_string()),
)
.await?;
Ok(())
}
}
// --- Handler: serenity EventHandler that delegates to AdapterRouter ---
pub struct Handler {
pub router: Arc<AdapterRouter>,
pub allow_all_channels: bool,
pub allow_all_users: bool,
pub allowed_channels: HashSet<u64>,
pub allowed_users: HashSet<u64>,
pub stt_config: SttConfig,
pub adapter: OnceLock<Arc<dyn ChatAdapter>>,
pub allow_bot_messages: AllowBots,
pub trusted_bot_ids: HashSet<u64>,
pub allow_user_messages: AllowUsers,
/// Role IDs that trigger the bot (same as direct @mention).
pub allowed_role_ids: HashSet<u64>,
/// Positive-only cache: thread channel_id → cached_at for threads where bot has participated.
pub participated_threads: tokio::sync::Mutex<HashMap<String, tokio::time::Instant>>,
/// Positive-only cache: thread channel_id → cached_at for threads where other bots have posted.
/// Like participation, a thread becoming multi-bot is irreversible (bot messages don't disappear).
pub multibot_threads: tokio::sync::Mutex<HashMap<String, tokio::time::Instant>>,
/// TTL for participation cache entries (from pool.session_ttl_hours).
pub session_ttl: std::time::Duration,
/// Configurable soft limit on bot turns per thread (reset by human message).
pub max_bot_turns: u32,
/// Per-thread bot turn tracker. Both counters reset on human msg.
pub bot_turns: tokio::sync::Mutex<BotTurnTracker>,
/// Allow the bot to respond to Discord DMs.
pub allow_dm: bool,
/// Per-thread dispatcher (Message mode uses cap=1 for FIFO; Thread/Lane use configured cap).
pub dispatcher: Arc<crate::dispatch::Dispatcher>,
}
impl Handler {
/// Check if the bot has participated in a Discord thread, and whether
/// other bots have also posted in it.
/// Returns `(involved, other_bot_present)`.
/// Fail-closed: returns `(false, false)` on API error.
/// Caches positive results only (both participation and multi-bot status are irreversible).
async fn bot_participated_in_thread(
&self,
http: &Http,
channel_id: ChannelId,
bot_id: UserId,
) -> (bool, bool) {
let key = channel_id.to_string();
// Check positive caches
let cached_involved = {
let cache = self.participated_threads.lock().await;
cache
.get(&key)
.is_some_and(|ts| ts.elapsed() < self.session_ttl)
};
let cached_multibot = {
let cache = self.multibot_threads.lock().await;
cache
.get(&key)
.is_some_and(|ts| ts.elapsed() < self.session_ttl)
};
// Both cached → skip fetch entirely
// With early detection from msg.author, multibot_threads is populated
// eagerly — no need to fetch just to check for other bots.
if cached_involved {
return (true, cached_multibot);
}
// Fetch recent messages
let messages = match channel_id
.messages(http, serenity::builder::GetMessages::new().limit(200))
.await
{
Ok(msgs) => msgs,
Err(e) => {
tracing::warn!(
channel_id = %channel_id,
error = %e,
"failed to fetch thread messages for participation check, rejecting (fail-closed)"
);
return (false, false);
}
};
let involved = cached_involved || messages.iter().any(|m| m.author.id == bot_id);
let other_bot_present = cached_multibot
|| messages
.iter()
.any(|m| m.author.bot && m.author.id != bot_id);
if involved && !cached_involved {
let mut cache = self.participated_threads.lock().await;
cache.insert(key.clone(), tokio::time::Instant::now());
// Evict if over capacity
if cache.len() > PARTICIPATION_CACHE_MAX {
cache.retain(|_, ts| ts.elapsed() < self.session_ttl);
if cache.len() > PARTICIPATION_CACHE_MAX {
let mut entries: Vec<_> = cache.iter().map(|(k, v)| (k.clone(), *v)).collect();
entries.sort_by_key(|(_, ts)| *ts);
let evict_count = entries.len() / 2;
for (k, _) in entries.into_iter().take(evict_count) {
cache.remove(&k);
}
}
}
}
if other_bot_present && !cached_multibot {
let mut cache = self.multibot_threads.lock().await;
cache.insert(key, tokio::time::Instant::now());
if cache.len() > PARTICIPATION_CACHE_MAX {
cache.retain(|_, ts| ts.elapsed() < self.session_ttl);
if cache.len() > PARTICIPATION_CACHE_MAX {
let mut entries: Vec<_> = cache.iter().map(|(k, v)| (k.clone(), *v)).collect();
entries.sort_by_key(|(_, ts)| *ts);
let evict_count = entries.len() / 2;
for (k, _) in entries.into_iter().take(evict_count) {
cache.remove(&k);
}
}
}
}
(involved, other_bot_present)
}
}
#[serenity::async_trait]
impl EventHandler for Handler {
async fn message(&self, ctx: Context, msg: Message) {
let bot_id = ctx.cache.current_user().id;
// Early multibot detection: cache that another bot is present.
// Runs before self-check and bot gating so we always detect other bots. (#481)
if msg.author.bot && msg.author.id != bot_id {
let key = msg.channel_id.to_string();
let mut cache = self.multibot_threads.lock().await;
cache.entry(key).or_insert_with(tokio::time::Instant::now);
}
// Bot turn counting: runs before self-check so ALL bot messages
// (including own) count toward the per-thread limit. This means
// soft_limit=20 = 20 total bot messages in the thread (~10 per bot
// in a two-bot ping-pong). (#483)
{
let thread_key = msg.channel_id.to_string();
let mut tracker = self.bot_turns.lock().await;
if msg.author.bot {
match tracker.classify_bot_message(&thread_key) {
TurnAction::Continue => {}
TurnAction::SilentStop => return,
TurnAction::WarnAndStop {
severity,
turns,
user_message,
} => {
match severity {
TurnSeverity::Hard => tracing::warn!(
channel_id = %msg.channel_id,
turns,
"hard bot turn limit reached",
),
TurnSeverity::Soft => tracing::info!(
channel_id = %msg.channel_id,
turns,
max = self.max_bot_turns,
"soft bot turn limit reached",
),
}
// Only post the warning if this bot is allowed in the channel/thread.
// Bot turn counting intentionally runs before channel gating so ALL
// bot messages are counted, but the *warning message* must respect
// channel permissions — otherwise bots that never participated in a
// thread will spam it with warnings.
//
// Must match the full thread allowlist semantics: a thread is allowed
// if its own channel_id OR its parent_id is in allowed_channels.
let ch = msg.channel_id.get();
let in_allowed_channel = self.allowed_channels.contains(&ch);
let mut allowed_here = self.allow_all_channels || in_allowed_channel;
if !allowed_here {
// Reuse detect_thread() for thread allowlist semantics.
// Only called on the WarnAndStop path (once per soft/hard
// limit hit), not on every bot message.
if let Ok(serenity::model::channel::Channel::Guild(gc)) =
msg.channel_id.to_channel(&ctx.http).await
{
let (in_thread, _) = detect_thread(
gc.thread_metadata.is_some(),
gc.parent_id.map(|id| id.get()),
gc.owner_id.map(|id| id.get()),
bot_id.get(),
&self.allowed_channels,
self.allow_all_channels,
in_allowed_channel,
);
if in_thread {
allowed_here = true;
}
}
}
if msg.author.id != bot_id && allowed_here {
// Only warn if this bot actually participated in the
// thread — prevents uninvolved bots from spamming
// warnings in shared channels. (#727)
// Second value is `is_multibot`; not needed here.
let (participated, _) = self
.bot_participated_in_thread(&ctx.http, msg.channel_id, bot_id)
.await;
if participated {
let _ = msg.channel_id.say(&ctx.http, &user_message).await;
}
}
return;
}
}
} else if matches!(msg.kind, MessageType::Regular | MessageType::InlineReply)
&& !msg.content.is_empty()
{
tracker.on_human_message(&thread_key);
}
}
// Ignore own messages (after counting toward bot turns above)
if msg.author.id == bot_id {
return;
}
let adapter = self
.adapter
.get_or_init(|| Arc::new(DiscordAdapter::new(ctx.http.clone())))
.clone();
let channel_id = msg.channel_id.get();
let in_allowed_channel =
self.allow_all_channels || self.allowed_channels.contains(&channel_id);
let is_mentioned = msg.mentions_user_id(bot_id)
|| msg.content.contains(&format!("<@{}>", bot_id))
|| (!self.allowed_role_ids.is_empty()
&& msg
.mention_roles
.iter()
.any(|r| self.allowed_role_ids.contains(&r.get())));
// Bot message gating (from upstream #321)
if msg.author.bot {
match self.allow_bot_messages {
AllowBots::Off => return,
AllowBots::Mentions => {
if !is_mentioned {
return;
}
}
AllowBots::All => {
let cap = MAX_CONSECUTIVE_BOT_TURNS as usize;
let limit = std::cmp::min(MAX_CONSECUTIVE_BOT_TURNS, 100) as u8;
let history = ctx
.cache
.channel_messages(msg.channel_id)
.map(|msgs| {
let mut recent: Vec<_> = msgs
.iter()
.filter(|(mid, _)| **mid < msg.id)
.map(|(_, m)| m.clone())
.collect();
recent.sort_unstable_by_key(|m| std::cmp::Reverse(m.id));
recent.truncate(cap);
recent
})
.filter(|msgs| !msgs.is_empty());
let recent = if let Some(cached) = history {
cached
} else {
match msg
.channel_id
.messages(
&ctx.http,
serenity::builder::GetMessages::new()
.before(msg.id)
.limit(limit),
)
.await
{
Ok(msgs) => msgs,
Err(e) => {
tracing::warn!(channel_id = %msg.channel_id, error = %e, "failed to fetch history for bot turn cap, rejecting (fail-closed)");
return;
}
}
};
let consecutive_bot = recent
.iter()
.take_while(|m| m.author.bot && m.author.id != bot_id)
.count();
if consecutive_bot >= cap {
tracing::warn!(channel_id = %msg.channel_id, cap, "bot turn cap reached, ignoring");
return;
}
}
}
if !self.trusted_bot_ids.is_empty()
&& !self.trusted_bot_ids.contains(&msg.author.id.get())
{
tracing::debug!(bot_id = %msg.author.id, "bot not in trusted_bot_ids, ignoring");
return;
}
}
// Thread detection: single to_channel() call for both allowed and
// non-allowed channels. Uses thread_metadata (not parent_id) to
// identify threads — see detect_thread() doc comments for rationale.
let (in_thread, bot_owns_thread, thread_parent_id, is_dm) = match msg
.channel_id
.to_channel(&ctx.http)
.await
{
Ok(serenity::model::channel::Channel::Guild(gc)) => {
let parent = gc.parent_id.map(|id| id.get().to_string());
let result = detect_thread(
gc.thread_metadata.is_some(),
gc.parent_id.map(|id| id.get()),
gc.owner_id.map(|id| id.get()),
bot_id.get(),
&self.allowed_channels,
self.allow_all_channels,
in_allowed_channel,
);
tracing::debug!(
channel_id = %msg.channel_id,
parent_id = ?gc.parent_id,
owner_id = ?gc.owner_id,
has_thread_metadata = gc.thread_metadata.is_some(),
in_thread = result.0,
bot_owns = ?result.1,
"thread check"
);
(
result.0,
result.1.unwrap_or(false),
if result.0 { parent } else { None },
false,
)
}
Ok(serenity::model::channel::Channel::Private(_)) => {
tracing::debug!(channel_id = %msg.channel_id, "DM channel");
(false, false, None, true)
}
Ok(other) => {
tracing::debug!(channel_id = %msg.channel_id, kind = ?other, "not a guild thread");
(false, false, None, false)
}
Err(e) => {
tracing::debug!(channel_id = %msg.channel_id, error = %e, "to_channel failed");
(false, false, None, false)
}
};
// DM gating: allow_dm must be true, otherwise reject
if is_dm && !self.allow_dm {
tracing::debug!(channel_id = %msg.channel_id, "DM rejected (allow_dm=false)");
return;
}
if !is_dm && !in_allowed_channel && !in_thread {
return;
}
// User message gating (mirrors Slack's AllowUsers logic).
// Mentions: always require @mention, even in bot's own threads.
// Involved (default): skip @mention if the bot owns the thread
// (Option A) OR has previously posted in it (Option B).
// MultibotMentions: same as Involved, but if other bots are also
// in the thread, require @mention to avoid all bots responding.
// DMs are treated as implicit @mention (mirrors Slack behavior).
if !is_mentioned && !is_dm {
match self.allow_user_messages {
AllowUsers::Mentions => return,
AllowUsers::Involved => {
if !in_thread {
return;
}
let (involved, _) = if bot_owns_thread {
(true, false) // other_bot_present not needed for Involved mode
} else {
self.bot_participated_in_thread(&ctx.http, msg.channel_id, bot_id)
.await
};
if !involved {
tracing::debug!(channel_id = %msg.channel_id, "bot not involved in thread, ignoring");
return;
}
}
AllowUsers::MultibotMentions => {
if !in_thread {
return;
}
let (involved, other_bot) = if bot_owns_thread {
// Still need to check for other bots
let (_, other) = self
.bot_participated_in_thread(&ctx.http, msg.channel_id, bot_id)
.await;
(true, other)
} else {
self.bot_participated_in_thread(&ctx.http, msg.channel_id, bot_id)
.await
};
if !involved {
tracing::debug!(channel_id = %msg.channel_id, "bot not involved in thread, ignoring");
return;
}
if other_bot {
tracing::debug!(channel_id = %msg.channel_id, "multi-bot thread, requiring @mention");
return;
}
}
}
}
if is_denied_user(
msg.author.bot,
self.allow_all_users,
&self.allowed_users,
msg.author.id.get(),
) {
tracing::info!(user_id = %msg.author.id, "denied user, ignoring");
let msg_ref = discord_msg_ref(&msg);
let _ = adapter.add_reaction(&msg_ref, "🚫").await;
return;
}
let prompt = resolve_mentions(&msg.content, bot_id, &self.allowed_role_ids);
// No text and no attachments → skip
if prompt.is_empty() && msg.attachments.is_empty() {
return;
}
let display_name = msg
.member
.as_ref()
.and_then(|m| m.nick.as_ref())
.unwrap_or(&msg.author.name);
let sender = build_sender_context(
&msg.author.id.to_string(),
&msg.author.name,
display_name,
&msg.channel_id.to_string(),
thread_parent_id.as_deref(),
msg.author.bot,
&msg.timestamp.to_rfc3339().unwrap_or_default(),
&msg.id.to_string(),
&bot_id.to_string(),
);
// Build extra content blocks from attachments (audio -> STT, text -> inline,
// image -> encode, video -> URL for agent-side inspection).
let mut extra_blocks = Vec::new();
let mut echo_entries: Vec<crate::stt::EchoEntry> = Vec::new();
let mut text_file_bytes: u64 = 0;
let mut text_file_count: u32 = 0;
let mut failed_image_files: Vec<String> = Vec::new();
const TEXT_TOTAL_CAP: u64 = 1024 * 1024; // 1 MB total for all text file attachments
const TEXT_FILE_COUNT_CAP: u32 = 5;
for attachment in &msg.attachments {
let mime = attachment.content_type.as_deref().unwrap_or("");
if media::is_audio_mime(mime) {
if self.stt_config.enabled {
let mime_clean = mime.split(';').next().unwrap_or(mime).trim();
match media::download_and_transcribe(
&attachment.url,
&attachment.filename,
mime_clean,
u64::from(attachment.size),
&self.stt_config,
None,
)
.await
{
Some(transcript) => {
debug!(filename = %attachment.filename, chars = transcript.len(), "voice transcript injected");
extra_blocks.insert(
0,
ContentBlock::Text {
text: format!("[Voice message transcript]: {transcript}"),
},
);
echo_entries.push(crate::stt::EchoEntry::Success(transcript));
}
None => {
warn!(filename = %attachment.filename, "STT failed for voice attachment");
echo_entries.push(crate::stt::EchoEntry::Failed);
}
}
} else {
tracing::warn!(filename = %attachment.filename, "skipping audio attachment (STT disabled)");
let msg_ref = discord_msg_ref(&msg);
let _ = adapter.add_reaction(&msg_ref, "🎤").await;
}
} else if media::is_text_file(&attachment.filename, attachment.content_type.as_deref())
{
if text_file_count >= TEXT_FILE_COUNT_CAP {
tracing::warn!(filename = %attachment.filename, count = text_file_count, "text file count cap reached, skipping");
continue;
}
// Pre-check with Discord-reported size (fast path, avoids unnecessary download).
// Running total uses actual downloaded bytes for accurate accounting.
if text_file_bytes + u64::from(attachment.size) > TEXT_TOTAL_CAP {
tracing::warn!(filename = %attachment.filename, total = text_file_bytes, "text attachments total exceeds 1MB cap, skipping remaining");
continue;
}
if let Some((block, actual_bytes)) = media::download_and_read_text_file(
&attachment.url,
&attachment.filename,
u64::from(attachment.size),
None,
)
.await
{
text_file_bytes += actual_bytes;
text_file_count += 1;
debug!(filename = %attachment.filename, "adding text file attachment");
extra_blocks.push(block);
}
} else {
match media::download_and_encode_image(
&attachment.url,
attachment.content_type.as_deref(),
&attachment.filename,
u64::from(attachment.size),
None,
)
.await
{
Ok(block) => {
debug!(url = %attachment.url, filename = %attachment.filename, "adding image attachment");
extra_blocks.push(block);
}
Err(media::MediaFetchError::NotAnImage) => {
if media::is_video_file(
&attachment.filename,
attachment.content_type.as_deref(),
) {
debug!(url = %attachment.url, filename = %attachment.filename, "adding video attachment link");
extra_blocks.push(video_attachment_block(
&attachment.filename,
attachment.content_type.as_deref(),
u64::from(attachment.size),
&attachment.url,
));
}
}
Err(e) => {
if let Some(entry) =
media::failed_attachment_entry(&attachment.filename, &e)
{
tracing::warn!(
url = %attachment.url,
filename = %attachment.filename,
error = %e,
"image attachment failed"
);
failed_image_files.push(entry);
} else {
// Network/HTTP 5xx failures: transient; warn in logs only.
tracing::warn!(
url = %attachment.url,
filename = %attachment.filename,
error = %e,
"image download failed"
);
}
}
}
}
}
tracing::debug!(
num_extra_blocks = extra_blocks.len(),
num_attachments = msg.attachments.len(),
in_thread,
"processing"
);
let thread_channel = if in_thread || is_dm {
// DMs use the DM channel directly (no threads in DMs).
ChannelRef {
platform: "discord".into(),
channel_id: msg.channel_id.get().to_string(),
thread_id: None,
parent_id: thread_parent_id.clone(),
origin_event_id: None,
}
} else {
match get_or_create_thread(&ctx, &adapter, &msg, &prompt).await {
Ok(ch) => ch,
Err(e) => {
// Thread creation failed — entire message is dropped. No reply,
// no warning, no agent note. The failed_image_files warning and
// agent note are NOT sent here: we have no thread to target and
// the agent won't run anyway.
error!("failed to create thread: {e}");
return;
}
}
};
// Agent note and user warning are sent together or not at all.
// Push the agent note here (after thread_channel is resolved) so a
// thread-creation failure above silences both, keeping them in sync.
if !failed_image_files.is_empty() {
extra_blocks.push(ContentBlock::Text {
text: media::format_failed_attachment_note(&failed_image_files),
});
}
// Send user-visible warning into the correct thread now that we know where it is.
// For top-level channel messages, thread_channel is the newly-created thread —
// not msg.channel_id. Sending before get_or_create_thread would route to the parent.
if !failed_image_files.is_empty() {
let file_list = failed_image_files
.iter()
.map(|n| sanitize_discord_filename(n))
.collect::<Vec<_>>()
.join("`, `");
let warn_msg = format!(
":warning: I couldn't process the file(s) you shared (`{file_list}`). \
Supported formats are PNG / JPEG / GIF / WebP up to 10 MB."
);
if let Err(e) = adapter.send_message(&thread_channel, &warn_msg).await {
warn!(
channel_id = %msg.channel_id,
error = %e,
"failed to send image validation warning to user"
);
}
}
let trigger_msg = discord_msg_ref(&msg);
// Per-thread streaming: check if another bot is present in this thread
let other_bot_present_flag = {
let cache = self.multibot_threads.lock().await;
cache.contains_key(&msg.channel_id.to_string())
};
// Backfill thread_id: when OAB just created a new thread, the sender
// was built before the thread existed. Patch it so the agent sees
// thread_id on the very first turn.
let mut sender = sender;
if sender.thread_id.is_none() && thread_channel.parent_id.is_some() {
sender.thread_id = Some(thread_channel.channel_id.clone());
}
let dispatcher = self.dispatcher.clone();
let stt_cfg = self.stt_config.clone();
tokio::spawn(async move {
// Best-effort echo before the agent reply so the user can verify STT.
crate::stt::post_echo(
&adapter,
&thread_channel,
&trigger_msg,
&echo_entries,
&stt_cfg,
)
.await;
let sender_id = sender.sender_id.clone();
let sender_name = sender.sender_name.clone();
let sender_json = serde_json::to_string(&sender).unwrap();
let thread_key = dispatcher.key("discord", &thread_channel.channel_id, &sender_id);
let estimated_tokens = crate::dispatch::estimate_tokens(&prompt, &extra_blocks);
let buf_msg = crate::dispatch::BufferedMessage {
sender_json,
sender_name,
prompt,
extra_blocks,
trigger_msg,
arrived_at: std::time::Instant::now(),
estimated_tokens,
other_bot_present: other_bot_present_flag,
};
if let Err(e) = dispatcher
.submit(thread_key, thread_channel, adapter, buf_msg)
.await
{
error!("dispatcher submit error: {e}");
}
});
}
async fn ready(&self, ctx: Context, ready: Ready) {
info!(user = %ready.user.name, "discord bot connected");
// Build the shared command list once.
let commands = vec![
CreateCommand::new("models").description("Select the AI model for this session"),
CreateCommand::new("agents").description("Select the agent mode for this session"),
CreateCommand::new("cancel").description("Cancel the current operation"),
CreateCommand::new("cancel-all")
.description("Cancel current operation and drop all buffered messages"),
CreateCommand::new("reset").description("Reset the conversation session"),
];
// Register global commands (works in DMs + all guilds after propagation).
if let Err(e) = Command::set_global_commands(&ctx.http, commands.clone()).await {
tracing::warn!(error = %e, "failed to register global slash commands");
} else {
info!("registered global slash commands");
}
// Also register per-guild for instant availability (global can take up to 1h).
for guild in &ready.guilds {
let guild_id = guild.id;
if let Err(e) = guild_id.set_commands(&ctx.http, commands.clone()).await {
tracing::warn!(%guild_id, error = %e, "failed to register guild slash commands");
} else {
info!(%guild_id, "registered guild slash commands");
}
}
}
async fn interaction_create(&self, ctx: Context, interaction: Interaction) {
match interaction {
Interaction::Command(cmd) if cmd.data.name == "models" => {
self.handle_config_command(&ctx, &cmd, "model", "model")
.await;
}
Interaction::Command(cmd) if cmd.data.name == "agents" => {
self.handle_config_command(&ctx, &cmd, "agent", "agent")
.await;
}
Interaction::Command(cmd) if cmd.data.name == "cancel" => {
self.handle_cancel_command(&ctx, &cmd).await;
}
Interaction::Command(cmd) if cmd.data.name == "cancel-all" => {
self.handle_cancel_all_command(&ctx, &cmd).await;
}
Interaction::Command(cmd) if cmd.data.name == "reset" => {
self.handle_reset_command(&ctx, &cmd).await;
}
Interaction::Component(comp) if comp.data.custom_id.starts_with("acp_config_") => {
self.handle_config_select(&ctx, &comp).await;
}
Interaction::Component(comp) if comp.data.custom_id.starts_with("acp_pg:") => {
self.handle_pagination(&ctx, &comp).await;
}
_ => {}
}
}
}
// --- Slash command & interaction handlers ---
impl Handler {
/// Build a Discord select menu from ACP configOptions with the given category.
/// Paginates options in pages of 25 (Discord limit). The current selection is
/// always placed first so it appears on page 0.
fn build_config_select(
options: &[ConfigOption],
category: &str,
page: usize,
) -> Option<CreateSelectMenu> {
let opt = options
.iter()
.find(|o| o.category.as_deref() == Some(category))?;
// Put current selection first so it always lands on page 0,
// then fill remaining slots in original order.
let sorted: Vec<_> = opt
.options
.iter()
.filter(|o| o.value == opt.current_value)
.chain(opt.options.iter().filter(|o| o.value != opt.current_value))
.collect();
let menu_options: Vec<CreateSelectMenuOption> = sorted
.iter()
.skip(page * SELECT_MENU_PAGE_SIZE)
.take(SELECT_MENU_PAGE_SIZE)
.map(|o| {
let mut item = CreateSelectMenuOption::new(&o.name, &o.value);
if let Some(desc) = &o.description {
item = item.description(desc);
}
if o.value == opt.current_value {
item = item.default_selection(true);
}
item
})
.collect();
if menu_options.is_empty() {
return None;
}
let current_name = opt
.options
.iter()
.find(|o| o.value == opt.current_value)
.map(|o| o.name.as_str())
.unwrap_or(&opt.current_value);
let total_pages = sorted.len().div_ceil(SELECT_MENU_PAGE_SIZE);
let placeholder = if total_pages > 1 {
format!(
"Current: {} (page {}/{})",
current_name,
page + 1,
total_pages
)