Skip to content
This repository was archived by the owner on Jun 19, 2026. It is now read-only.

Commit dad8f42

Browse files
feat: Add browser tests for streaming RPC methods
Add comprehensive browser tests for all 4 streaming patterns: - sum: client-to-server streaming (Push<T> → scalar) - range: server-to-client streaming (scalar → Pull<T>) - pipe: bidirectional streaming (Push<T> ↔ Pull<T>) - stats: aggregating stream to compound result Key fixes to make streaming work: - Spawn streaming handlers as async tasks to avoid deadlock - Flush outgoing Data/Close before Response for server-to-client - Wrap streaming results in CallResult (like unary methods) - Add Clone + 'static bounds for service types in dispatchers
1 parent db8ed9f commit dad8f42

13 files changed

Lines changed: 419 additions & 91 deletions

File tree

rust/codegen-test-consumer/src/bin/ws_server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use std::env;
1010
use tokio::net::TcpListener;
1111
use tokio_tungstenite::accept_async;
1212

13+
#[derive(Clone)]
1314
struct Calculator;
1415

1516
#[allow(clippy::manual_async_fn)]

rust/codegen-test-consumer/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ mod tests {
1717
}
1818

1919
// Test that we can implement the handler trait
20+
#[derive(Clone)]
2021
struct TestCalculator;
2122

2223
#[allow(clippy::manual_async_fn)]

rust/roam-codegen/src/targets/rust.rs

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -188,10 +188,13 @@ impl<'a> RustGenerator<'a> {
188188
}
189189

190190
// impl ServiceDispatcher for Dispatcher
191+
// S: Clone is required so dispatch_streaming can clone the service into 'static futures
191192
{
192193
let impl_block = self.scope.new_impl(&format!("{service_name}Dispatcher<S>"));
193194
impl_block.generic("S");
194195
impl_block.bound("S", &handler_trait);
196+
impl_block.bound("S", "Clone");
197+
impl_block.bound("S", "'static");
195198
impl_block.impl_trait("::roam_stream::ServiceDispatcher");
196199

197200
generate_is_streaming(impl_block, self.service);
@@ -371,7 +374,8 @@ fn generate_dispatch_streaming(
371374
service: &ServiceDetail,
372375
options: &RustCodegenOptions,
373376
) {
374-
let ret_type = "::std::pin::Pin<::std::boxed::Box<dyn ::std::future::Future<Output = ::std::result::Result<::std::vec::Vec<u8>, ::std::string::String>> + Send + '_>>";
377+
// Return type uses 'static so the future can be spawned by the connection
378+
let ret_type = "::std::pin::Pin<::std::boxed::Box<dyn ::std::future::Future<Output = ::std::result::Result<::std::vec::Vec<u8>, ::std::string::String>> + Send + 'static>>";
375379

376380
let func = impl_block.new_fn("dispatch_streaming");
377381
func.arg_ref_self();
@@ -497,6 +501,9 @@ fn generate_streaming_setup_and_dispatch(
497501
let arg_names: Vec<String> = method.args.iter().map(|a| a.name.to_snake_case()).collect();
498502
let call_args = arg_names.join(", ");
499503

504+
// Clone service before async block so the future is 'static
505+
block.line("let service = self.service.clone();");
506+
500507
let mut async_block = Block::new("::std::boxed::Box::pin(async move");
501508
async_block.after(")");
502509

@@ -511,27 +518,31 @@ fn generate_streaming_setup_and_dispatch(
511518
.line("tracing::debug!(payload_len = payload.len(), \"streaming request received\");");
512519
}
513520

521+
// Call handler and encode response per spec r[unary.response.encoding]
522+
// Response is always CallResult<T, Never> = Result<T, RoamError<Never>>
523+
let return_ty = rust_type_server_return(&method.return_type);
524+
514525
// For unit return types, don't bind to a variable (clippy: let_unit_value)
515526
if method.return_type == TypeDetail::Unit {
516-
async_block.line(format!("self.service.{method_name}({call_args}).await"));
527+
async_block.line(format!("service.{method_name}({call_args}).await"));
517528
async_block.line(" .map_err(|e| format!(\"method error: {e}\"))?;");
518-
519-
if options.tracing {
520-
async_block.line("let response = facet_postcard::to_vec(&())");
521-
} else {
522-
async_block.line("facet_postcard::to_vec(&())");
523-
}
529+
async_block.line(format!(
530+
"let call_result: CallResult<{return_ty}, Never> = Ok(());"
531+
));
524532
} else {
525533
async_block.line(format!(
526-
"let result = self.service.{method_name}({call_args}).await"
534+
"let result = service.{method_name}({call_args}).await"
527535
));
528536
async_block.line(" .map_err(|e| format!(\"method error: {e}\"))?;");
537+
async_block.line(format!(
538+
"let call_result: CallResult<{return_ty}, Never> = Ok(result);"
539+
));
540+
}
529541

530-
if options.tracing {
531-
async_block.line("let response = facet_postcard::to_vec(&result)");
532-
} else {
533-
async_block.line("facet_postcard::to_vec(&result)");
534-
}
542+
if options.tracing {
543+
async_block.line("let response = facet_postcard::to_vec(&call_result)");
544+
} else {
545+
async_block.line("facet_postcard::to_vec(&call_result)");
535546
}
536547

537548
if options.tracing {

rust/roam-codegen/src/targets/typescript.rs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -526,12 +526,14 @@ fn generate_handler_interface(service: &ServiceDetail) -> String {
526526
let can_encode_return = is_fully_supported(&method.return_type);
527527

528528
if can_decode_args && can_encode_return {
529-
// Decode all arguments
529+
// Decode all arguments (using server context for proper Push/Pull inversion)
530530
out.push_str(" const buf = payload;\n");
531531
out.push_str(" let offset = 0;\n");
532532
for arg in &method.args {
533533
let arg_name = arg.name.to_lower_camel_case();
534-
let decode_stmt = generate_decode_stmt(&arg.type_info, &arg_name, "offset");
534+
// Use server-side decode for proper Push/Pull inversion
535+
// r[impl streaming.caller-pov] - Schema is caller's perspective, server inverts.
536+
let decode_stmt = generate_decode_stmt_server(&arg.type_info, &arg_name, "offset");
535537
out.push_str(&format!(" {decode_stmt}\n"));
536538
}
537539
out.push_str(
@@ -849,6 +851,35 @@ fn generate_decode_stmt_client(ty: &TypeDetail, var_name: &str, offset_var: &str
849851
}
850852
}
851853

854+
/// Generate TypeScript code that decodes a value from a buffer for SERVER context.
855+
/// Schema types are from caller's perspective - server needs INVERTED types:
856+
/// - Schema Push (caller sends) → server receives → Pull for server
857+
/// - Schema Pull (caller receives) → server sends → Push for server
858+
///
859+
/// r[impl streaming.caller-pov] - Schema is from caller's perspective, server inverts.
860+
fn generate_decode_stmt_server(ty: &TypeDetail, var_name: &str, offset_var: &str) -> String {
861+
match ty {
862+
TypeDetail::Push(inner) => {
863+
// Schema Push (caller sends) → server receives → Pull for server
864+
// r[impl streaming.type] - Stream types decode as stream_id on wire.
865+
let inner_type = ts_type_server_arg(inner);
866+
format!(
867+
"const _{var_name}_r = decodeU64(buf, {offset_var}); const {var_name} = {{ streamId: _{var_name}_r.value }} as Pull<{inner_type}>; {offset_var} = _{var_name}_r.next; /* TODO: create real Pull handle */"
868+
)
869+
}
870+
TypeDetail::Pull(inner) => {
871+
// Schema Pull (caller receives) → server sends → Push for server
872+
// r[impl streaming.type] - Stream types decode as stream_id on wire.
873+
let inner_type = ts_type_server_arg(inner);
874+
format!(
875+
"const _{var_name}_r = decodeU64(buf, {offset_var}); const {var_name} = {{ streamId: _{var_name}_r.value }} as Push<{inner_type}>; {offset_var} = _{var_name}_r.next; /* TODO: create real Push handle */"
876+
)
877+
}
878+
// For non-streaming types, use the regular decode
879+
_ => generate_decode_stmt(ty, var_name, offset_var),
880+
}
881+
}
882+
852883
/// Generate TypeScript code that decodes a value from a buffer.
853884
/// Returns the decoded value in a variable and updates offset.
854885
/// `var_name` is the variable to assign the result to.

rust/roam-session/src/lib.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -692,7 +692,8 @@ pub trait ServiceDispatcher: Send + Sync {
692692
/// - Call the handler method with those handles
693693
/// - Serialize the response
694694
///
695-
/// Returns a boxed future since each streaming method may have different async block types.
695+
/// Returns a boxed future with `'static` lifetime so it can be spawned.
696+
/// Implementations should clone their service into the future to achieve this.
696697
///
697698
/// Takes ownership of the payload to avoid copies - the caller already owns it from
698699
/// the decoded message frame.
@@ -703,7 +704,9 @@ pub trait ServiceDispatcher: Send + Sync {
703704
method_id: u64,
704705
payload: Vec<u8>,
705706
registry: &mut StreamRegistry,
706-
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send + '_>>;
707+
) -> std::pin::Pin<
708+
Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send + 'static>,
709+
>;
707710
}
708711

709712
/// A dispatcher that routes to one of two dispatchers based on method ID.
@@ -763,8 +766,9 @@ where
763766
method_id: u64,
764767
payload: Vec<u8>,
765768
registry: &mut StreamRegistry,
766-
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send + '_>>
767-
{
769+
) -> std::pin::Pin<
770+
Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send + 'static>,
771+
> {
768772
if self.first_methods.contains(&method_id) {
769773
self.first.dispatch_streaming(method_id, payload, registry)
770774
} else {

rust/roam-stream/src/connection.rs

Lines changed: 75 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ impl From<std::io::Error> for ConnectionError {
5454
}
5555
}
5656

57+
/// Result from a completed streaming handler: (request_id, serialized_result).
58+
type StreamingResult = (u64, Result<Vec<u8>, String>);
59+
5760
/// A live connection with completed Hello exchange.
5861
///
5962
/// Generic over the transport type `T` which must implement [`MessageTransport`].
@@ -70,6 +73,10 @@ pub struct Connection<T> {
7073
in_flight_requests: HashSet<u64>,
7174
#[allow(dead_code)]
7275
our_hello: Hello,
76+
/// Channel for receiving completed streaming handler results.
77+
/// Spawned tasks send (request_id, result) when they complete.
78+
streaming_results_tx: tokio::sync::mpsc::Sender<StreamingResult>,
79+
streaming_results_rx: tokio::sync::mpsc::Receiver<StreamingResult>,
7380
}
7481

7582
impl<T> Connection<T> {
@@ -211,6 +218,27 @@ where
211218
tokio::select! {
212219
biased;
213220

221+
// Handle completed streaming handlers
222+
Some((request_id, result)) = self.streaming_results_rx.recv() => {
223+
let response_payload = result.map_err(ConnectionError::Dispatch)?;
224+
225+
// Flush any outgoing stream data (Data/Close) BEFORE Response.
226+
// This ensures the client receives all streamed data before the
227+
// Response that signals call completion.
228+
// r[impl streaming.flush-before-response] - Stream data sent before Response.
229+
self.flush_outgoing().await?;
230+
231+
// r[impl streaming.call-complete] - Call completes when Response sent.
232+
// r[impl streaming.lifecycle.response-closes-pulls] - Pull streams close with Response.
233+
let resp = Message::Response {
234+
request_id,
235+
metadata: Vec::new(),
236+
payload: response_payload,
237+
};
238+
self.io.send(&resp).await?;
239+
self.in_flight_requests.remove(&request_id);
240+
}
241+
214242
// Prioritize incoming messages over outgoing flush
215243
result = self.io.recv_timeout(Duration::from_secs(30)) => {
216244
let msg = match result {
@@ -295,35 +323,50 @@ where
295323
}
296324

297325
// Dispatch to service - use streaming dispatch if method has Push/Pull args
298-
let response_payload = if dispatcher.is_streaming(method_id) {
299-
dispatcher
300-
.dispatch_streaming(method_id, payload, &mut self.stream_registry)
301-
.await
302-
.map_err(ConnectionError::Dispatch)?
326+
if dispatcher.is_streaming(method_id) {
327+
// For streaming methods, we need to continue processing messages
328+
// (Data, Close) while the handler runs. The handler reads from
329+
// Pull<T> which is backed by an mpsc channel that we route to.
330+
//
331+
// dispatch_streaming registers streams synchronously, then returns
332+
// a future. We spawn that future as a task so the message loop
333+
// can continue processing Data messages.
334+
let handler_fut = dispatcher.dispatch_streaming(
335+
method_id,
336+
payload,
337+
&mut self.stream_registry,
338+
);
339+
340+
// Spawn the handler as a task that sends its result to our channel
341+
let results_tx = self.streaming_results_tx.clone();
342+
tokio::spawn(async move {
343+
let result = handler_fut.await;
344+
// Send result to the connection's run loop
345+
// Ignore send error if connection closed
346+
let _ = results_tx.send((request_id, result)).await;
347+
});
303348
} else {
304-
dispatcher
349+
let response_payload = dispatcher
305350
.dispatch_unary(method_id, &payload)
306351
.await
307-
.map_err(ConnectionError::Dispatch)?
308-
};
309-
310-
// r[impl core.call] - Callee sends Response for caller's Request.
311-
// r[impl core.call.request-id] - Response has same request_id.
312-
// r[impl unary.complete] - Send Response with matching request_id.
313-
// r[impl unary.lifecycle.single-response] - Exactly one Response per Request.
314-
// r[impl unary.request-id.in-flight] - Request no longer in-flight after Response.
315-
// r[impl streaming.call-complete] - Call completes when Response sent.
316-
// r[impl streaming.lifecycle.response-closes-pulls] - Pull streams close with Response.
317-
let resp = Message::Response {
318-
request_id,
319-
metadata: Vec::new(),
320-
payload: response_payload,
321-
};
322-
self.io.send(&resp).await?;
323-
self.in_flight_requests.remove(&request_id);
324-
325-
// Flush any outgoing stream data that handlers may have queued
326-
self.flush_outgoing().await?;
352+
.map_err(ConnectionError::Dispatch)?;
353+
354+
// r[impl core.call] - Callee sends Response for caller's Request.
355+
// r[impl core.call.request-id] - Response has same request_id.
356+
// r[impl unary.complete] - Send Response with matching request_id.
357+
// r[impl unary.lifecycle.single-response] - Exactly one Response per Request.
358+
// r[impl unary.request-id.in-flight] - Request no longer in-flight after Response.
359+
let resp = Message::Response {
360+
request_id,
361+
metadata: Vec::new(),
362+
payload: response_payload,
363+
};
364+
self.io.send(&resp).await?;
365+
self.in_flight_requests.remove(&request_id);
366+
367+
// Flush any outgoing stream data that handlers may have queued
368+
self.flush_outgoing().await?;
369+
}
327370
}
328371
Message::Response { request_id, .. } => {
329372
// Server doesn't expect Response messages (it sends them, not receives them).
@@ -472,6 +515,7 @@ where
472515
initial_credit: our_credit.min(peer_credit),
473516
};
474517

518+
let (streaming_results_tx, streaming_results_rx) = tokio::sync::mpsc::channel(64);
475519
Ok(Connection {
476520
io,
477521
role: Role::Acceptor,
@@ -481,6 +525,8 @@ where
481525
stream_registry: StreamRegistry::new_with_credit(negotiated.initial_credit),
482526
in_flight_requests: HashSet::new(),
483527
our_hello,
528+
streaming_results_tx,
529+
streaming_results_rx,
484530
})
485531
}
486532

@@ -551,6 +597,7 @@ where
551597
initial_credit: our_credit.min(peer_credit),
552598
};
553599

600+
let (streaming_results_tx, streaming_results_rx) = tokio::sync::mpsc::channel(64);
554601
Ok(Connection {
555602
io,
556603
role: Role::Initiator,
@@ -560,5 +607,7 @@ where
560607
stream_registry: StreamRegistry::new_with_credit(negotiated.initial_credit),
561608
in_flight_requests: HashSet::new(),
562609
our_hello,
610+
streaming_results_tx,
611+
streaming_results_rx,
563612
})
564613
}

rust/subject-rust/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub use spec_proto::{Canvas, Color, Message, Person, Point, Rectangle, Shape};
1212
include!(concat!(env!("OUT_DIR"), "/generated.rs"));
1313

1414
// Service implementation using generated EchoHandler trait
15+
#[derive(Clone)]
1516
struct EchoService;
1617

1718
#[allow(clippy::manual_async_fn)]

spec/spec-proto/src/lib.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,30 +79,31 @@ pub enum Message {
7979
/// Streaming service for cross-language conformance testing.
8080
///
8181
/// Tests Push/Pull semantics, stream lifecycle, and bidirectional streaming.
82+
/// r[impl streaming.caller-pov] - Types are from caller's perspective.
8283
#[service]
8384
pub trait Streaming {
8485
/// Client pushes numbers, server returns their sum.
8586
///
8687
/// Tests: client-to-server streaming (`Push<T>` → scalar return).
8788
/// r[impl streaming.client-to-server] - Client sends stream, server returns scalar.
88-
async fn sum(&self, numbers: Pull<i32>) -> i64;
89+
async fn sum(&self, numbers: Push<i32>) -> i64;
8990

90-
/// Client sends a count, server returns that many numbers.
91+
/// Client sends a count, server streams that many numbers back.
9192
///
92-
/// Tests: server-to-client streaming (scalar → `Pull<T>`).
93+
/// Tests: server-to-client streaming (scalar → `Pull<T>` as output parameter).
9394
/// r[impl streaming.server-to-client] - Client sends scalar, server returns stream.
94-
async fn range(&self, count: u32) -> Push<u32>;
95+
async fn range(&self, count: u32, output: Pull<u32>);
9596

9697
/// Client pushes strings, server echoes each back.
9798
///
9899
/// Tests: bidirectional streaming (`Push<T>` ↔ `Pull<T>`).
99100
/// r[impl streaming.bidirectional] - Both sides stream simultaneously.
100-
async fn pipe(&self, input: Pull<String>) -> Push<String>;
101+
async fn pipe(&self, input: Push<String>, output: Pull<String>);
101102

102103
/// Client pushes numbers, server returns (sum, count, average).
103104
///
104105
/// Tests: aggregating a stream into a compound result.
105-
async fn stats(&self, numbers: Pull<i32>) -> (i64, u64, f64);
106+
async fn stats(&self, numbers: Push<i32>) -> (i64, u64, f64);
106107
}
107108

108109
/// Complex types service for testing struct/enum encoding.

0 commit comments

Comments
 (0)