Skip to content

Commit 7df356c

Browse files
serrrfiratclaude
andauthored
fix: persist WASM channel workspace writes across callbacks (#264)
* fix: persist WASM channel workspace writes across callbacks WASM channel callbacks (polling, webhooks, on_start) call workspace_write() to persist state, but the host code never committed these writes — take_pending_writes() was never called. Additionally, no WorkspaceReader was injected into channel capabilities, so workspace_read() always returned None. This caused Telegram's polling offset to reset to 0 on every tick, making getUpdates re-deliver already-processed messages and producing 2-4 duplicate LLM responses per user message. Add ChannelWorkspaceStore (Arc-wrapped HashMap with std::sync::RwLock) that persists across callback invocations within a channel's lifetime. Inject it as the WorkspaceReader and commit pending writes after every callback execution (on_start, on_poll, on_http_request, execute_poll). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * style: fix formatting Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 3829d81 commit 7df356c

2 files changed

Lines changed: 167 additions & 10 deletions

File tree

src/channels/wasm/host.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,51 @@ impl ChannelHostState {
300300
}
301301
}
302302

303+
/// In-memory workspace store for WASM channels.
304+
///
305+
/// Persists workspace writes across callback invocations within a single
306+
/// channel lifetime. This allows WASM channels to maintain state (e.g.,
307+
/// Telegram polling offsets) between poll ticks without requiring a
308+
/// full database-backed workspace.
309+
///
310+
/// Uses `std::sync::RwLock` (not tokio) because WASM execution runs
311+
/// inside `spawn_blocking`.
312+
pub struct ChannelWorkspaceStore {
313+
data: std::sync::RwLock<std::collections::HashMap<String, String>>,
314+
}
315+
316+
impl ChannelWorkspaceStore {
317+
/// Create a new empty workspace store.
318+
pub fn new() -> Self {
319+
Self {
320+
data: std::sync::RwLock::new(std::collections::HashMap::new()),
321+
}
322+
}
323+
324+
/// Commit pending writes from a callback execution into the store.
325+
pub fn commit_writes(&self, writes: &[PendingWorkspaceWrite]) {
326+
if writes.is_empty() {
327+
return;
328+
}
329+
if let Ok(mut data) = self.data.write() {
330+
for write in writes {
331+
tracing::debug!(
332+
path = %write.path,
333+
content_len = write.content.len(),
334+
"Committing workspace write to channel store"
335+
);
336+
data.insert(write.path.clone(), write.content.clone());
337+
}
338+
}
339+
}
340+
}
341+
342+
impl crate::tools::wasm::WorkspaceReader for ChannelWorkspaceStore {
343+
fn read(&self, path: &str) -> Option<String> {
344+
self.data.read().ok()?.get(path).cloned()
345+
}
346+
}
347+
303348
/// Rate limiter for channel message emission.
304349
///
305350
/// Tracks emission rates across multiple executions.
@@ -497,4 +542,56 @@ mod tests {
497542

498543
assert_eq!(state.channel_name(), "telegram");
499544
}
545+
546+
#[test]
547+
fn test_channel_workspace_store_commit_and_read() {
548+
use crate::channels::wasm::host::{ChannelWorkspaceStore, PendingWorkspaceWrite};
549+
use crate::tools::wasm::WorkspaceReader;
550+
551+
let store = ChannelWorkspaceStore::new();
552+
553+
// Initially empty
554+
assert!(store.read("channels/telegram/offset").is_none());
555+
556+
// Commit some writes
557+
let writes = vec![
558+
PendingWorkspaceWrite {
559+
path: "channels/telegram/offset".to_string(),
560+
content: "103".to_string(),
561+
},
562+
PendingWorkspaceWrite {
563+
path: "channels/telegram/state.json".to_string(),
564+
content: r#"{"ok":true}"#.to_string(),
565+
},
566+
];
567+
store.commit_writes(&writes);
568+
569+
// Should be readable
570+
assert_eq!(
571+
store.read("channels/telegram/offset"),
572+
Some("103".to_string())
573+
);
574+
assert_eq!(
575+
store.read("channels/telegram/state.json"),
576+
Some(r#"{"ok":true}"#.to_string())
577+
);
578+
579+
// Overwrite a value
580+
let writes2 = vec![PendingWorkspaceWrite {
581+
path: "channels/telegram/offset".to_string(),
582+
content: "200".to_string(),
583+
}];
584+
store.commit_writes(&writes2);
585+
assert_eq!(
586+
store.read("channels/telegram/offset"),
587+
Some("200".to_string())
588+
);
589+
590+
// Empty writes are a no-op
591+
store.commit_writes(&[]);
592+
assert_eq!(
593+
store.read("channels/telegram/offset"),
594+
Some("200".to_string())
595+
);
596+
}
500597
}

src/channels/wasm/wrapper.rs

Lines changed: 70 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ use wasmtime_wasi::{ResourceTable, WasiCtx, WasiCtxBuilder, WasiView};
4242

4343
use crate::channels::wasm::capabilities::ChannelCapabilities;
4444
use crate::channels::wasm::error::WasmChannelError;
45-
use crate::channels::wasm::host::{ChannelEmitRateLimiter, ChannelHostState, EmittedMessage};
45+
use crate::channels::wasm::host::{
46+
ChannelEmitRateLimiter, ChannelHostState, ChannelWorkspaceStore, EmittedMessage,
47+
};
4648
use crate::channels::wasm::router::RegisteredEndpoint;
4749
use crate::channels::wasm::runtime::{PreparedChannelModule, WasmChannelRuntime};
4850
use crate::channels::wasm::schema::ChannelConfig;
@@ -547,6 +549,10 @@ pub struct WasmChannel {
547549

548550
/// Pairing store for DM pairing (guest access control).
549551
pairing_store: Arc<PairingStore>,
552+
553+
/// In-memory workspace store persisting writes across callback invocations.
554+
/// Ensures WASM channels can maintain state (e.g., polling offsets) between ticks.
555+
workspace_store: Arc<ChannelWorkspaceStore>,
550556
}
551557

552558
impl WasmChannel {
@@ -577,6 +583,7 @@ impl WasmChannel {
577583
credentials: Arc::new(RwLock::new(HashMap::new())),
578584
typing_task: RwLock::new(None),
579585
pairing_store,
586+
workspace_store: Arc::new(ChannelWorkspaceStore::new()),
580587
}
581588
}
582589

@@ -634,6 +641,26 @@ impl WasmChannel {
634641
self.endpoints.read().await.clone()
635642
}
636643

644+
/// Inject the workspace store as the reader into a capabilities clone.
645+
///
646+
/// Ensures `workspace_read` capability is present with the store as its reader,
647+
/// so WASM callbacks can read previously written workspace state.
648+
fn inject_workspace_reader(
649+
capabilities: &ChannelCapabilities,
650+
store: &Arc<ChannelWorkspaceStore>,
651+
) -> ChannelCapabilities {
652+
let mut caps = capabilities.clone();
653+
let ws_cap = caps
654+
.tool_capabilities
655+
.workspace_read
656+
.get_or_insert_with(|| crate::tools::wasm::WorkspaceCapability {
657+
allowed_prefixes: Vec::new(),
658+
reader: None,
659+
});
660+
ws_cap.reader = Some(Arc::clone(store) as Arc<dyn crate::tools::wasm::WorkspaceReader>);
661+
caps
662+
}
663+
637664
/// Add channel host functions to the linker using generated bindings.
638665
///
639666
/// Uses the wasmtime::component::bindgen! generated `add_to_linker` function
@@ -765,12 +792,13 @@ impl WasmChannel {
765792

766793
let runtime = Arc::clone(&self.runtime);
767794
let prepared = Arc::clone(&self.prepared);
768-
let capabilities = self.capabilities.clone();
795+
let capabilities = Self::inject_workspace_reader(&self.capabilities, &self.workspace_store);
769796
let config_json = self.config_json.read().await.clone();
770797
let timeout = self.runtime.config().callback_timeout;
771798
let channel_name = self.name.clone();
772799
let credentials = self.get_credentials().await;
773800
let pairing_store = self.pairing_store.clone();
801+
let workspace_store = self.workspace_store.clone();
774802

775803
// Execute in blocking task with timeout
776804
let result = tokio::time::timeout(timeout, async move {
@@ -801,8 +829,13 @@ impl WasmChannel {
801829
}
802830
};
803831

804-
let host_state =
832+
let mut host_state =
805833
Self::extract_host_state(&mut store, &prepared.name, &capabilities);
834+
835+
// Commit pending workspace writes to the persistent store
836+
let pending_writes = host_state.take_pending_writes();
837+
workspace_store.commit_writes(&pending_writes);
838+
806839
Ok((config, host_state))
807840
})
808841
.await
@@ -897,10 +930,11 @@ impl WasmChannel {
897930

898931
let runtime = Arc::clone(&self.runtime);
899932
let prepared = Arc::clone(&self.prepared);
900-
let capabilities = self.capabilities.clone();
933+
let capabilities = Self::inject_workspace_reader(&self.capabilities, &self.workspace_store);
901934
let timeout = self.runtime.config().callback_timeout;
902935
let credentials = self.get_credentials().await;
903936
let pairing_store = self.pairing_store.clone();
937+
let workspace_store = self.workspace_store.clone();
904938

905939
// Prepare request data
906940
let method = method.to_string();
@@ -940,8 +974,13 @@ impl WasmChannel {
940974
.map_err(|e| Self::map_wasm_error(e, &prepared.name, prepared.limits.fuel))?;
941975

942976
let response = convert_http_response(wit_response);
943-
let host_state =
977+
let mut host_state =
944978
Self::extract_host_state(&mut store, &prepared.name, &capabilities);
979+
980+
// Commit pending workspace writes to the persistent store
981+
let pending_writes = host_state.take_pending_writes();
982+
workspace_store.commit_writes(&pending_writes);
983+
945984
Ok((response, host_state))
946985
})
947986
.await
@@ -989,11 +1028,12 @@ impl WasmChannel {
9891028

9901029
let runtime = Arc::clone(&self.runtime);
9911030
let prepared = Arc::clone(&self.prepared);
992-
let capabilities = self.capabilities.clone();
1031+
let capabilities = Self::inject_workspace_reader(&self.capabilities, &self.workspace_store);
9931032
let timeout = self.runtime.config().callback_timeout;
9941033
let channel_name = self.name.clone();
9951034
let credentials = self.get_credentials().await;
9961035
let pairing_store = self.pairing_store.clone();
1036+
let workspace_store = self.workspace_store.clone();
9971037

9981038
// Execute in blocking task with timeout
9991039
let result = tokio::time::timeout(timeout, async move {
@@ -1013,8 +1053,13 @@ impl WasmChannel {
10131053
.call_on_poll(&mut store)
10141054
.map_err(|e| Self::map_wasm_error(e, &prepared.name, prepared.limits.fuel))?;
10151055

1016-
let host_state =
1056+
let mut host_state =
10171057
Self::extract_host_state(&mut store, &prepared.name, &capabilities);
1058+
1059+
// Commit pending workspace writes to the persistent store
1060+
let pending_writes = host_state.take_pending_writes();
1061+
workspace_store.commit_writes(&pending_writes);
1062+
10181063
Ok(((), host_state))
10191064
})
10201065
.await
@@ -1501,6 +1546,7 @@ impl WasmChannel {
15011546
let credentials = self.credentials.clone();
15021547
let pairing_store = self.pairing_store.clone();
15031548
let callback_timeout = self.runtime.config().callback_timeout;
1549+
let workspace_store = self.workspace_store.clone();
15041550

15051551
tokio::spawn(async move {
15061552
let mut interval_timer = tokio::time::interval(interval);
@@ -1523,6 +1569,7 @@ impl WasmChannel {
15231569
&credentials,
15241570
pairing_store.clone(),
15251571
callback_timeout,
1572+
&workspace_store,
15261573
).await;
15271574

15281575
match result {
@@ -1565,7 +1612,10 @@ impl WasmChannel {
15651612

15661613
/// Execute a single poll callback with a fresh WASM instance.
15671614
///
1568-
/// Returns any emitted messages from the callback.
1615+
/// Returns any emitted messages from the callback. Pending workspace writes
1616+
/// are committed to the shared `ChannelWorkspaceStore` so state persists
1617+
/// across poll ticks (e.g., Telegram polling offset).
1618+
#[allow(clippy::too_many_arguments)]
15691619
async fn execute_poll(
15701620
channel_name: &str,
15711621
runtime: &Arc<WasmChannelRuntime>,
@@ -1574,6 +1624,7 @@ impl WasmChannel {
15741624
credentials: &RwLock<HashMap<String, String>>,
15751625
pairing_store: Arc<PairingStore>,
15761626
timeout: Duration,
1627+
workspace_store: &Arc<ChannelWorkspaceStore>,
15771628
) -> Result<Vec<EmittedMessage>, WasmChannelError> {
15781629
// Skip if no WASM bytes (testing mode)
15791630
if prepared.component_bytes.is_empty() {
@@ -1586,9 +1637,10 @@ impl WasmChannel {
15861637

15871638
let runtime = Arc::clone(runtime);
15881639
let prepared = Arc::clone(prepared);
1589-
let capabilities = capabilities.clone();
1640+
let capabilities = Self::inject_workspace_reader(capabilities, workspace_store);
15901641
let credentials_snapshot = credentials.read().await.clone();
15911642
let channel_name_owned = channel_name.to_string();
1643+
let workspace_store = Arc::clone(workspace_store);
15921644

15931645
// Execute in blocking task with timeout
15941646
let result = tokio::time::timeout(timeout, async move {
@@ -1608,8 +1660,13 @@ impl WasmChannel {
16081660
.call_on_poll(&mut store)
16091661
.map_err(|e| Self::map_wasm_error(e, &prepared.name, prepared.limits.fuel))?;
16101662

1611-
let host_state =
1663+
let mut host_state =
16121664
Self::extract_host_state(&mut store, &prepared.name, &capabilities);
1665+
1666+
// Commit pending workspace writes to the persistent store
1667+
let pending_writes = host_state.take_pending_writes();
1668+
workspace_store.commit_writes(&pending_writes);
1669+
16131670
Ok(host_state)
16141671
})
16151672
.await
@@ -2230,6 +2287,8 @@ mod tests {
22302287
let credentials = Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new()));
22312288
let timeout = std::time::Duration::from_secs(5);
22322289

2290+
let workspace_store = Arc::new(crate::channels::wasm::host::ChannelWorkspaceStore::new());
2291+
22332292
let result = WasmChannel::execute_poll(
22342293
"poll-test",
22352294
&runtime,
@@ -2238,6 +2297,7 @@ mod tests {
22382297
&credentials,
22392298
Arc::new(PairingStore::new()),
22402299
timeout,
2300+
&workspace_store,
22412301
)
22422302
.await;
22432303

0 commit comments

Comments
 (0)