Skip to content

Commit d714a56

Browse files
committed
Bind OpenClaw replies to callback tokens
1 parent 986bba5 commit d714a56

1 file changed

Lines changed: 153 additions & 14 deletions

File tree

crates/calciforge/src/adapters/openclaw_channel.rs

Lines changed: 153 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,31 +39,45 @@ type ReplyResult = Result<OutboundMessage, String>;
3939
struct PendingReply {
4040
request_id: String,
4141
session_key: String,
42+
auth_token: Option<String>,
4243
tx: Arc<Mutex<Option<oneshot::Sender<ReplyResult>>>>,
4344
}
4445

4546
/// Correlates OpenClaw callbacks to pending dispatch requests.
4647
#[derive(Clone, Default)]
47-
pub struct ReplyRouter {
48+
struct ReplyRouter {
4849
pending: Arc<Mutex<HashMap<String, PendingReply>>>,
4950
}
5051

5152
impl ReplyRouter {
52-
pub fn new() -> Self {
53+
fn new() -> Self {
5354
Self {
5455
pending: Arc::new(Mutex::new(HashMap::new())),
5556
}
5657
}
5758

58-
pub async fn insert(
59+
#[cfg(test)]
60+
async fn insert(
5961
&self,
6062
request_id: String,
6163
session_key: String,
6264
tx: oneshot::Sender<ReplyResult>,
65+
) {
66+
self.insert_with_auth(request_id, session_key, None, tx)
67+
.await;
68+
}
69+
70+
async fn insert_with_auth(
71+
&self,
72+
request_id: String,
73+
session_key: String,
74+
auth_token: Option<String>,
75+
tx: oneshot::Sender<ReplyResult>,
6376
) {
6477
let entry = PendingReply {
6578
request_id: request_id.clone(),
6679
session_key: session_key.clone(),
80+
auth_token,
6781
tx: Arc::new(Mutex::new(Some(tx))),
6882
};
6983
let mut pending = self.pending.lock().await;
@@ -82,25 +96,50 @@ impl ReplyRouter {
8296
}
8397
}
8498

85-
pub async fn take(&self, correlation_key: &str) -> Option<oneshot::Sender<ReplyResult>> {
99+
#[cfg(test)]
100+
async fn take(&self, correlation_key: &str) -> Option<oneshot::Sender<ReplyResult>> {
101+
self.take_authorized(correlation_key, None)
102+
.await
103+
.ok()
104+
.flatten()
105+
}
106+
107+
async fn take_authorized(
108+
&self,
109+
correlation_key: &str,
110+
presented_token: Option<&str>,
111+
) -> Result<Option<oneshot::Sender<ReplyResult>>, ReplyAuthError> {
86112
let entry = {
87113
let mut pending = self.pending.lock().await;
88-
let entry = pending.remove(correlation_key)?;
114+
let Some(entry) = pending.get(correlation_key).cloned() else {
115+
return Ok(None);
116+
};
117+
if entry.auth_token.as_deref() != presented_token {
118+
return Err(ReplyAuthError::TokenMismatch);
119+
}
120+
let entry = pending
121+
.remove(correlation_key)
122+
.expect("pending reply disappeared after authorization check");
89123
pending.retain(|_, candidate| candidate.request_id != entry.request_id);
90124
entry
91125
};
92126

93-
entry.tx.lock().await.take()
127+
Ok(entry.tx.lock().await.take())
94128
}
95129

96-
pub async fn remove(&self, request_id: &str) {
130+
async fn remove(&self, request_id: &str) {
97131
self.pending
98132
.lock()
99133
.await
100134
.retain(|_, entry| entry.request_id != request_id);
101135
}
102136
}
103137

138+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
139+
enum ReplyAuthError {
140+
TokenMismatch,
141+
}
142+
104143
#[derive(Clone)]
105144
struct ReplyServerState {
106145
router: ReplyRouter,
@@ -201,6 +240,7 @@ struct SharedReplyServer {
201240
#[derive(Clone)]
202241
struct ReplyServerHandle {
203242
shared: SharedReplyServer,
243+
auth_token: Option<String>,
204244
config_error: Option<String>,
205245
}
206246

@@ -230,6 +270,7 @@ impl ReplyServerHandle {
230270
});
231271
return Self {
232272
shared,
273+
auth_token: None,
233274
config_error: Some(
234275
"openclaw-channel requires reply_auth_token or reply_auth_token_file; unauthenticated callback listeners are not allowed"
235276
.to_string(),
@@ -242,15 +283,16 @@ impl ReplyServerHandle {
242283
.auth_tokens
243284
.lock()
244285
.expect("openclaw-channel reply auth token set poisoned");
245-
tokens.insert(auth_token);
286+
tokens.insert(auth_token.clone());
246287
return Self {
247288
shared: existing.clone(),
289+
auth_token: Some(auth_token),
248290
config_error: None,
249291
};
250292
}
251293

252294
let mut auth_tokens = HashSet::new();
253-
auth_tokens.insert(auth_token);
295+
auth_tokens.insert(auth_token.clone());
254296

255297
let shared = SharedReplyServer {
256298
port,
@@ -266,6 +308,7 @@ impl ReplyServerHandle {
266308

267309
Self {
268310
shared,
311+
auth_token: Some(auth_token),
269312
config_error: None,
270313
}
271314
}
@@ -276,12 +319,14 @@ async fn handle_reply(
276319
headers: HeaderMap,
277320
Json(payload): Json<ReplyPayload>,
278321
) -> (StatusCode, Json<AckResponse>) {
279-
{
322+
let presented_token = {
280323
let auth_tokens = state
281324
.auth_tokens
282325
.lock()
283326
.expect("openclaw-channel reply auth token set poisoned");
284-
if !auth_tokens.is_empty() {
327+
if auth_tokens.is_empty() {
328+
None
329+
} else {
285330
let auth = headers
286331
.get("authorization")
287332
.and_then(|v| v.to_str().ok())
@@ -290,8 +335,9 @@ async fn handle_reply(
290335
if !auth_tokens.contains(token) {
291336
return (StatusCode::UNAUTHORIZED, Json(AckResponse { ok: false }));
292337
}
338+
Some(token.to_string())
293339
}
294-
}
340+
};
295341

296342
let correlation_key = payload
297343
.request_id
@@ -300,7 +346,24 @@ async fn handle_reply(
300346
.unwrap_or(&payload.session_key)
301347
.to_string();
302348

303-
if let Some(tx) = state.router.take(&correlation_key).await {
349+
let pending_tx = match state
350+
.router
351+
.take_authorized(&correlation_key, presented_token.as_deref())
352+
.await
353+
{
354+
Ok(tx) => tx,
355+
Err(ReplyAuthError::TokenMismatch) => {
356+
warn!(
357+
session_key = %payload.session_key,
358+
correlation_key = %correlation_key,
359+
request_id = ?payload.request_id,
360+
"openclaw-channel reply token did not match pending request"
361+
);
362+
return (StatusCode::UNAUTHORIZED, Json(AckResponse { ok: false }));
363+
}
364+
};
365+
366+
if let Some(tx) = pending_tx {
304367
if let Some(error) = payload.callback_error_message() {
305368
warn!(
306369
session_key = %payload.session_key,
@@ -570,7 +633,12 @@ impl AgentAdapter for OpenClawChannelAdapter {
570633
self.reply_server
571634
.shared
572635
.router
573-
.insert(request_id.clone(), session_key.clone(), tx)
636+
.insert_with_auth(
637+
request_id.clone(),
638+
session_key.clone(),
639+
self.reply_server.auth_token.clone(),
640+
tx,
641+
)
574642
.await;
575643

576644
let body = InboundPayload {
@@ -1078,6 +1146,77 @@ mod tests {
10781146
);
10791147
}
10801148

1149+
#[tokio::test]
1150+
async fn test_reply_token_must_match_pending_legacy_callback() {
1151+
let router = ReplyRouter::new();
1152+
let request_id = "req-victim".to_string();
1153+
let session_key = "calciforge:victim:alice".to_string();
1154+
let (tx, mut rx) = oneshot::channel::<ReplyResult>();
1155+
router
1156+
.insert_with_auth(
1157+
request_id.clone(),
1158+
session_key.clone(),
1159+
Some("reply-secret-a".to_string()),
1160+
tx,
1161+
)
1162+
.await;
1163+
1164+
let state = ReplyServerState {
1165+
router,
1166+
auth_tokens: Arc::new(StdMutex::new(HashSet::from([
1167+
"reply-secret-a".to_string(),
1168+
"reply-secret-b".to_string(),
1169+
]))),
1170+
};
1171+
1172+
let payload = ReplyPayload {
1173+
session_key: session_key.clone(),
1174+
request_id: None,
1175+
message: Some("spoofed legacy reply".into()),
1176+
error: None,
1177+
error_kind: None,
1178+
no_visible_reply_reason: None,
1179+
attachments: Vec::new(),
1180+
channel: None,
1181+
to: None,
1182+
};
1183+
let mut attacker_headers = HeaderMap::new();
1184+
attacker_headers.insert("authorization", "Bearer reply-secret-b".parse().unwrap());
1185+
1186+
let (status, Json(ack)) =
1187+
handle_reply(State(state.clone()), attacker_headers, Json(payload)).await;
1188+
1189+
assert_eq!(status, StatusCode::UNAUTHORIZED);
1190+
assert!(!ack.ok);
1191+
assert!(
1192+
rx.try_recv().is_err(),
1193+
"wrong-token callback must not consume the pending request"
1194+
);
1195+
1196+
let payload = ReplyPayload {
1197+
session_key,
1198+
request_id: Some(request_id),
1199+
message: Some("victim reply".into()),
1200+
error: None,
1201+
error_kind: None,
1202+
no_visible_reply_reason: None,
1203+
attachments: Vec::new(),
1204+
channel: None,
1205+
to: None,
1206+
};
1207+
let mut victim_headers = HeaderMap::new();
1208+
victim_headers.insert("authorization", "Bearer reply-secret-a".parse().unwrap());
1209+
1210+
let (status, Json(ack)) = handle_reply(State(state), victim_headers, Json(payload)).await;
1211+
1212+
assert_eq!(status, StatusCode::OK);
1213+
assert!(ack.ok);
1214+
assert_eq!(
1215+
rx.await.unwrap().unwrap().render_text_fallback(),
1216+
"victim reply"
1217+
);
1218+
}
1219+
10811220
#[tokio::test]
10821221
async fn test_dispatch_ignores_ambient_proxy_for_agent_control_plane() {
10831222
let _env_lock = ENV_LOCK.lock().await;

0 commit comments

Comments
 (0)