Skip to content

Commit 68bdffb

Browse files
committed
Address channel review feedback
1 parent 4b77952 commit 68bdffb

7 files changed

Lines changed: 250 additions & 26 deletions

File tree

crates/calciforge/src/channels/matrix.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -325,10 +325,7 @@ async fn send_matrix_message(
325325
}
326326
if status.as_u16() == 429 && attempt < MAX_RETRIES {
327327
let body_text = resp.text().await.unwrap_or_default();
328-
let retry_ms = serde_json::from_str::<serde_json::Value>(&body_text)
329-
.ok()
330-
.and_then(|value| value["retry_after_ms"].as_u64())
331-
.unwrap_or(1000);
328+
let retry_ms = matrix_retry_after_ms(&body_text);
332329
tracing::debug!(
333330
attempt,
334331
retry_ms,
@@ -343,6 +340,17 @@ async fn send_matrix_message(
343340
}
344341
}
345342

343+
fn matrix_retry_after_ms(body_text: &str) -> u64 {
344+
const DEFAULT_RETRY_AFTER_MS: u64 = 1_000;
345+
const MAX_RETRY_AFTER_MS: u64 = 30_000;
346+
347+
serde_json::from_str::<serde_json::Value>(body_text)
348+
.ok()
349+
.and_then(|value| value["retry_after_ms"].as_u64())
350+
.unwrap_or(DEFAULT_RETRY_AFTER_MS)
351+
.min(MAX_RETRY_AFTER_MS)
352+
}
353+
346354
async fn send_matrix_outbound_message(
347355
homeserver: &str,
348356
http: &reqwest::Client,
@@ -1002,6 +1010,16 @@ mod tests {
10021010
assert!(lookup.contains("event2049"));
10031011
}
10041012

1013+
#[test]
1014+
fn test_matrix_retry_after_ms_clamps_untrusted_server_delay() {
1015+
assert_eq!(matrix_retry_after_ms(r#"{"retry_after_ms":250}"#), 250);
1016+
assert_eq!(
1017+
matrix_retry_after_ms(r#"{"retry_after_ms":999999999999}"#),
1018+
30_000
1019+
);
1020+
assert_eq!(matrix_retry_after_ms(r#"{}"#), 1_000);
1021+
}
1022+
10051023
#[tokio::test]
10061024
async fn matrix_loop_dispatches_allowed_message_and_sends_agent_reply() {
10071025
use crate::config::{

crates/calciforge/src/channels/sms.rs

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,13 @@ impl<C: Channel + ?Sized + 'static> SmsChannel<C> {
9090
);
9191
}
9292
Err(e) => {
93+
telemetry::reply_failed(
94+
"sms",
95+
recipient,
96+
"reply",
97+
start.elapsed().as_millis() as u64,
98+
&e,
99+
);
93100
warn!(recipient = %recipient, error = %e, "Text/iMessage: failed to send reply");
94101
}
95102
}
@@ -122,7 +129,7 @@ impl<C: Channel + ?Sized + 'static> SmsChannel<C> {
122129

123130
telemetry::authorized_message("sms", &identity.id, &from, text.len(), delivery_lag_ms);
124131

125-
let chat_key = format!("sms-{}", identity.id);
132+
let chat_key = conversation_chat_key(&identity.id, &reply_target);
126133

127134
if self.scan_enabled() {
128135
let verdict = self
@@ -385,6 +392,10 @@ impl<C: Channel + ?Sized + 'static> SmsChannel<C> {
385392
}
386393
}
387394

395+
fn conversation_chat_key(identity_id: &str, reply_target: &str) -> String {
396+
format!("sms-{identity_id}-{reply_target}")
397+
}
398+
388399
#[derive(Clone)]
389400
struct WebhookState {
390401
bridge: Arc<SmsChannel<ZclLinqChannel>>,
@@ -733,4 +744,96 @@ mod tests {
733744
assert_eq!(sent.len(), 1);
734745
assert_eq!(sent[0].recipient, "chat_123");
735746
}
747+
748+
#[tokio::test]
749+
async fn test_conversation_ids_do_not_share_context_between_agents() {
750+
let mut config = (*make_test_config()).clone();
751+
config.agents = vec![
752+
AgentConfig {
753+
id: "librarian".to_string(),
754+
kind: "artifact-cli".to_string(),
755+
command: Some("/bin/sh".to_string()),
756+
args: Some(vec!["-c".to_string(), "cat".to_string()]),
757+
..Default::default()
758+
},
759+
AgentConfig {
760+
id: "critic".to_string(),
761+
kind: "artifact-cli".to_string(),
762+
command: Some("/bin/sh".to_string()),
763+
args: Some(vec!["-c".to_string(), "cat".to_string()]),
764+
..Default::default()
765+
},
766+
];
767+
config.routing[0].allowed_agents = vec!["librarian".to_string(), "critic".to_string()];
768+
let transport = Arc::new(MockChannel::new());
769+
let bridge = dummy_bridge_with(Arc::new(config), transport.clone());
770+
771+
bridge
772+
.bridge
773+
.clone()
774+
.handle_message(ChannelMessage {
775+
id: "1".into(),
776+
sender: "+15555550100".into(),
777+
reply_target: "chat_123".into(),
778+
content: "alpha private context".into(),
779+
channel: "linq".into(),
780+
timestamp: 0,
781+
thread_ts: None,
782+
interruption_scope_id: None,
783+
attachments: vec![],
784+
})
785+
.await;
786+
transport.wait_for_sent_len(1).await;
787+
let first = transport.drain();
788+
assert_eq!(first[0].recipient, "chat_123");
789+
assert!(first[0].content.contains("alpha private context"));
790+
791+
bridge
792+
.bridge
793+
.clone()
794+
.handle_message(ChannelMessage {
795+
id: "2".into(),
796+
sender: "+15555550100".into(),
797+
reply_target: "chat_456".into(),
798+
content: "!switch critic".into(),
799+
channel: "linq".into(),
800+
timestamp: 0,
801+
thread_ts: None,
802+
interruption_scope_id: None,
803+
attachments: vec![],
804+
})
805+
.await;
806+
transport.wait_for_sent_len(1).await;
807+
let switch_reply = transport.drain();
808+
assert_eq!(switch_reply[0].recipient, "chat_456");
809+
810+
bridge
811+
.bridge
812+
.handle_message(ChannelMessage {
813+
id: "3".into(),
814+
sender: "+15555550100".into(),
815+
reply_target: "chat_456".into(),
816+
content: "beta fresh prompt".into(),
817+
channel: "linq".into(),
818+
timestamp: 0,
819+
thread_ts: None,
820+
interruption_scope_id: None,
821+
attachments: vec![],
822+
})
823+
.await;
824+
transport.wait_for_sent_len(1).await;
825+
let second = transport.drain();
826+
assert_eq!(second[0].recipient, "chat_456");
827+
assert!(second[0].content.contains("beta fresh prompt"));
828+
assert!(
829+
!second[0].content.contains("alpha private context"),
830+
"chat_456 must not receive chat_123 context: {}",
831+
second[0].content
832+
);
833+
assert!(
834+
!second[0].content.contains("[Recent context:"),
835+
"new conversation/agent pair should start without another chat's preamble: {}",
836+
second[0].content
837+
);
838+
}
736839
}

crates/calciforge/src/channels/whatsapp.rs

Lines changed: 105 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,13 @@ impl<C: Channel + ?Sized + 'static> WhatsAppChannel<C> {
9191
);
9292
}
9393
Err(e) => {
94+
telemetry::reply_failed(
95+
"whatsapp",
96+
recipient,
97+
"reply",
98+
start.elapsed().as_millis() as u64,
99+
&e,
100+
);
94101
warn!(recipient = %recipient, error = %e, "WhatsApp: failed to send reply");
95102
}
96103
}
@@ -123,7 +130,7 @@ impl<C: Channel + ?Sized + 'static> WhatsAppChannel<C> {
123130

124131
telemetry::authorized_message("whatsapp", &identity.id, &from, text.len(), delivery_lag_ms);
125132

126-
let chat_key = format!("whatsapp-{}", identity.id);
133+
let chat_key = conversation_chat_key(&identity.id, &reply_target);
127134

128135
if self.scan_enabled() {
129136
let verdict = self
@@ -386,6 +393,10 @@ impl<C: Channel + ?Sized + 'static> WhatsAppChannel<C> {
386393
}
387394
}
388395

396+
fn conversation_chat_key(identity_id: &str, reply_target: &str) -> String {
397+
format!("whatsapp-{identity_id}-{reply_target}")
398+
}
399+
389400
const MIGRATION_TOML: &str = r#"
390401
[[channels]]
391402
kind = "whatsapp"
@@ -803,6 +814,99 @@ mod tests {
803814
assert_eq!(sent[0].recipient, "12345@g.us");
804815
}
805816

817+
#[tokio::test]
818+
async fn test_group_targets_do_not_share_context_between_agents() {
819+
let mut config = (*make_test_config(|_| {})).clone();
820+
config.agents = vec![
821+
AgentConfig {
822+
id: "librarian".to_string(),
823+
kind: "artifact-cli".to_string(),
824+
command: Some("/bin/sh".to_string()),
825+
args: Some(vec!["-c".to_string(), "cat".to_string()]),
826+
..Default::default()
827+
},
828+
AgentConfig {
829+
id: "critic".to_string(),
830+
kind: "artifact-cli".to_string(),
831+
command: Some("/bin/sh".to_string()),
832+
args: Some(vec!["-c".to_string(), "cat".to_string()]),
833+
..Default::default()
834+
},
835+
];
836+
config.routing[0].allowed_agents = vec!["librarian".to_string(), "critic".to_string()];
837+
let config = Arc::new(config);
838+
let transport = Arc::new(MockChannel::new());
839+
let bridge = dummy_bridge_with(config, transport.clone());
840+
841+
bridge
842+
.bridge
843+
.clone()
844+
.handle_message(ChannelMessage {
845+
id: "1".into(),
846+
sender: "+15555550100".into(),
847+
reply_target: "group-a@g.us".into(),
848+
content: "alpha private context".into(),
849+
channel: "whatsapp".into(),
850+
timestamp: 0,
851+
thread_ts: None,
852+
interruption_scope_id: None,
853+
attachments: vec![],
854+
})
855+
.await;
856+
transport.wait_for_sent_len(1).await;
857+
let first = transport.drain();
858+
assert_eq!(first[0].recipient, "group-a@g.us");
859+
assert!(first[0].content.contains("alpha private context"));
860+
861+
bridge
862+
.bridge
863+
.clone()
864+
.handle_message(ChannelMessage {
865+
id: "2".into(),
866+
sender: "+15555550100".into(),
867+
reply_target: "group-b@g.us".into(),
868+
content: "!switch critic".into(),
869+
channel: "whatsapp".into(),
870+
timestamp: 0,
871+
thread_ts: None,
872+
interruption_scope_id: None,
873+
attachments: vec![],
874+
})
875+
.await;
876+
transport.wait_for_sent_len(1).await;
877+
let switch_reply = transport.drain();
878+
assert_eq!(switch_reply[0].recipient, "group-b@g.us");
879+
880+
bridge
881+
.bridge
882+
.handle_message(ChannelMessage {
883+
id: "3".into(),
884+
sender: "+15555550100".into(),
885+
reply_target: "group-b@g.us".into(),
886+
content: "beta fresh prompt".into(),
887+
channel: "whatsapp".into(),
888+
timestamp: 0,
889+
thread_ts: None,
890+
interruption_scope_id: None,
891+
attachments: vec![],
892+
})
893+
.await;
894+
transport.wait_for_sent_len(1).await;
895+
let second = transport.drain();
896+
assert_eq!(second[0].recipient, "group-b@g.us");
897+
assert!(second[0].content.contains("beta fresh prompt"));
898+
assert!(
899+
!second[0].content.contains("alpha private context"),
900+
"group B must not receive group A context: {}",
901+
second[0].content
902+
);
903+
assert!(
904+
!second[0].content.contains("[Recent context:"),
905+
"new group/agent pair should start without another group's preamble: {}",
906+
second[0].content
907+
);
908+
}
909+
806910
#[tokio::test]
807911
async fn test_handle_message_renders_artifact_fallback() {
808912
let mut config = (*make_test_config(|_| {})).clone();

crates/calciforge/src/config.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,9 @@ pub struct RoutingRule {
313313
/// For Matrix: set `homeserver`, `access_token_file`, `room_id`, and optionally `allowed_users`.
314314
/// For WhatsApp: set `whatsapp_session_path` and `allowed_numbers`.
315315
/// For Signal: set `signal_cli_url`, `signal_account`, and `allowed_numbers`.
316-
/// For text/iMessage: set `sms_linq_api_token_file`, `sms_from_phone`, and `allowed_numbers`.
316+
/// For text/iMessage: set `sms_linq_api_token_file`, `sms_from_phone`, and `allowed_numbers`;
317+
/// configure inbound webhooks with `sms_webhook_listen` and `sms_webhook_path`; and prefer
318+
/// `sms_linq_signing_secret_file` or `sms_linq_signing_secret` for webhook signature checks.
317319
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
318320
pub struct ChannelConfig {
319321
pub kind: String,

docs/_config.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,2 @@
11
title: Calciforge
22
description: Keep your castle secure and moving.
3-
exclude:
4-
- rfcs/
5-
- roadmap/

0 commit comments

Comments
 (0)