Commit 52a747a

Fix EventStream initial message handling to prevent hangs
Fixes #4435

Previously, codegen would always call `try_recv_initial()` for event streams, which would block waiting for an initial-request or initial-response message. This caused hangs when operations didn't have modeled initial messages.

**Changes:**

Runtime (aws-smithy-http & aws-smithy-legacy-http):
- `recv()` now filters out initial-request/initial-response messages and stores them in a separate buffer
- `try_recv_initial()` checks the buffer first before blocking
- Initial messages are discarded if never explicitly read
- Added `try_recv_initial_with_preprocessor()` to legacy version for SigV4 support

Codegen:
- Server: Only calls `try_recv_initial()` when `httpBindingResolver.handlesEventStreamInitialRequest()` returns true
- Client: Only calls `try_recv_initial_response()` when there's a parser for non-event-stream output members
- Uses `let mut receiver = receiver` pattern for conditional mutability

Tests:
- Added `test_no_hang_without_initial_message()` to verify operations without modeled initial messages don't hang
- Added `test_waits_for_initial_message_when_modeled()` to verify operations with modeled initial messages still block correctly
- Added `wait_for_response_ready()` to ManualEventStreamClient for testing connection readiness

Documentation:
- Updated AGENTS.md with EventStream initial message handling behavior
- Added conditional mutability pattern to AGENTS.md
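The runtime behavior described in the commit message can be pictured with a small synchronous model. This is only a sketch under stated assumptions: `SketchReceiver`, `Frame`, and the pre-filled queue are hypothetical stand-ins, not the actual `aws-smithy-http` types, and the real receiver is async and reads frames from the wire. It illustrates `recv()` setting an initial message aside and `try_recv_initial()` consulting that buffer first.

```rust
use std::collections::VecDeque;

/// Hypothetical frame type: either an initial-request/initial-response
/// message or a regular event. Not the real runtime type.
#[derive(Debug, Clone, PartialEq)]
enum Frame {
    Initial(String),
    Event(String),
}

/// Hypothetical receiver modelling the buffering described in the commit.
struct SketchReceiver {
    incoming: VecDeque<Frame>,
    buffered_initial: Option<String>,
}

impl SketchReceiver {
    fn new(frames: Vec<Frame>) -> Self {
        Self {
            incoming: frames.into(),
            buffered_initial: None,
        }
    }

    /// Returns the next regular event. Any initial message seen on the way
    /// is moved into the side buffer instead of being returned.
    fn recv(&mut self) -> Option<String> {
        while let Some(frame) = self.incoming.pop_front() {
            match frame {
                Frame::Initial(payload) => self.buffered_initial = Some(payload),
                Frame::Event(payload) => return Some(payload),
            }
        }
        None
    }

    /// Checks the side buffer first, then peeks at the head of the queue.
    /// A regular event at the head is left in place for `recv()`.
    fn try_recv_initial(&mut self) -> Option<String> {
        if let Some(payload) = self.buffered_initial.take() {
            return Some(payload);
        }
        if matches!(self.incoming.front(), Some(Frame::Initial(_))) {
            if let Some(Frame::Initial(payload)) = self.incoming.pop_front() {
                return Some(payload);
            }
        }
        None
    }
}
```

In this model, a caller that never asks for the initial message never sees it, which corresponds to the "discarded if never explicitly read" item in the commit message.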
1 parent 3786d3c commit 52a747a

9 files changed

Lines changed: 372 additions & 153 deletions

AGENTS.md

Lines changed: 33 additions & 4 deletions
@@ -50,14 +50,18 @@ operation MyOperation {
 
 ### httpQueryParams Bug Investigation
 
-When investigating the `@httpQueryParams` bug (where query parameters weren't appearing in requests), the issue was in `RequestBindingGenerator.kt` line 173. The bug occurred when:
+When investigating the `@httpQueryParams` bug (where query parameters weren't appearing in requests), the issue was in
+`RequestBindingGenerator.kt` line 173. The bug occurred when:
 
 1. An operation had ONLY `@httpQueryParams` (no regular `@httpQuery` parameters)
-2. The condition `if (dynamicParams.isEmpty() && literalParams.isEmpty() && mapParams.isEmpty())` would skip generating the `uri_query` function
+2. The condition `if (dynamicParams.isEmpty() && literalParams.isEmpty() && mapParams.isEmpty())` would skip generating
+the `uri_query` function
 
-The fix was to ensure `mapParams.isEmpty()` was included in the condition check. The current implementation correctly generates query parameters for `@httpQueryParams` even when no other query parameters exist.
+The fix was to ensure `mapParams.isEmpty()` was included in the condition check. The current implementation correctly
+generates query parameters for `@httpQueryParams` even when no other query parameters exist.
 
-**Testing httpQueryParams**: Create operations with only `@httpQueryParams` to ensure they generate proper query strings in requests.
+**Testing httpQueryParams**: Create operations with only `@httpQueryParams` to ensure they generate proper query strings
+in requests.
 
 ## rustTemplate Formatting
 
@@ -83,6 +87,28 @@ rustTemplate(
 ❌ Wrong: `"let result: Result<String, Error> = Ok(value);"`
 ✅ Correct: Use `*preludeScope` in templates
 
+## Conditional Mutability Pattern
+
+When a variable needs to be mutable only in certain codegen branches, use the rebinding pattern:
+
+```kotlin
+rustTemplate(
+    """
+    let receiver = create_receiver();
+    #{maybeModifyReceiver:W}
+    use_receiver(receiver);
+    """,
+    "maybeModifyReceiver" to writable {
+        if (needsMutability) {
+            rust("let mut receiver = receiver;")
+            rust("receiver.modify();")
+        }
+    }
+)
+```
+
+This avoids unused `mut` warnings when the modification code isn't generated.
+
 ## RuntimeType and Dependencies
 
 `RuntimeType` objects contain:
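To make the conditional mutability pattern added to AGENTS.md concrete, the following sketch shows the two Rust shapes the template could expand to, depending on `needsMutability`. The `Receiver` type and the bodies of `create_receiver`, `use_receiver`, and `modify` are hypothetical placeholders invented for illustration; only the names come from the AGENTS.md snippet.

```rust
/// Hypothetical stand-in for whatever the generated code operates on.
#[derive(Debug, Default)]
struct Receiver {
    modified: bool,
}

impl Receiver {
    fn modify(&mut self) {
        self.modified = true;
    }
}

fn create_receiver() -> Receiver {
    Receiver::default()
}

/// Consumes the receiver and reports whether it was modified.
fn use_receiver(receiver: Receiver) -> bool {
    receiver.modified
}

/// Expansion when the writable emits nothing: `receiver` is never
/// declared `mut`, so the compiler has no unused-`mut` warning to raise.
fn generated_without_modification() -> bool {
    let receiver = create_receiver();
    use_receiver(receiver)
}

/// Expansion when the writable emits the rebinding: the binding is
/// shadowed by a mutable one only in the branch that mutates it.
fn generated_with_modification() -> bool {
    let receiver = create_receiver();
    let mut receiver = receiver;
    receiver.modify();
    use_receiver(receiver)
}
```

Declaring `let mut receiver = create_receiver();` up front would compile in both cases, but would trigger the `unused_mut` lint whenever the modification branch is not generated.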
@@ -176,6 +202,7 @@ gh issue comment <number> --repo smithy-lang/smithy-rs --body 'markdown content
 ```
 
 **Comment Guidelines:**
+
 - Always ask for confirmation before posting comments
 - Always start comments with `*Comment from Claude*` in italics
 
@@ -215,12 +242,14 @@ gh workflow run "Invoke Canary as Maintainer" --repo smithy-lang/smithy-rs \
 Client changes often show the pattern for server-side implementation
 
 **Configuration Debugging:**
+
 - Server codegen settings go under `"codegen"` not `"codegenConfig"` in smithy-build.json
 - When settings aren't working, check the generated smithy-build.json structure first
 - Settings placement matters - wrong nesting means settings are ignored silently
 - Always verify actual generated configuration matches expectations
 
 **Testing Configuration Settings:**
+
 - Create separate services with different settings to test configuration behavior
 - Use integration tests that verify actual generated code behavior, not just compilation
 - Test both enabled and disabled states to ensure the setting actually controls behavior

codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentBuilderGenerator.kt

Lines changed: 36 additions & 36 deletions
@@ -194,55 +194,55 @@ class FluentBuilderGenerator(
 
         return writable {
             val eventStreamMemberName = symbolProvider.toMemberName(eventStreamMember)
+            val structuredDataParser = codegenContext.protocolImpl?.structuredDataParser()
+            val parser = structuredDataParser?.operationParser(operation)
+
             rustTemplate(
                 """
-                let mut output =
+                let output =
                     #{Operation}::orchestrate(
                         &runtime_plugins,
                         input,
                     )
                     .await?;
 
-                // Converts any error encountered beyond this point into an `SdkError` response error
-                // with an `HttpResponse`. However, since we have already exited the `orchestrate`
-                // function, the original `HttpResponse` is no longer available and cannot be restored.
-                // This means that header information from the original response has been lost.
-                //
-                // Note that the response body would have been consumed by the deserializer
-                // regardless, even if the initial message was hypothetically processed during
-                // the orchestrator's deserialization phase but later resulted in an error.
-                fn response_error(
-                    err: impl #{Into}<#{BoxError}>
-                ) -> #{SdkError}<#{OperationError}, #{HttpResponse}> {
-                    #{SdkError}::response_error(err, #{HttpResponse}::new(
-                        #{StatusCode}::try_from(200).expect("valid successful code"),
-                        #{SdkBody}::empty()))
-                }
-
-                let message = output.$eventStreamMemberName.try_recv_initial_response().await.map_err(response_error)?;
-
-                match message {
-                    #{Some}(_message) => {
-                        #{maybeRecreateOutputWithNonEventStreamMembers:W}
-                        #{Ok}(output)
-                    }
-                    #{None} => #{Ok}(output),
-                }
+                #{maybeReadInitialResponse:W}
+                #{Ok}(output)
                 """,
                 *scope,
-                "maybeRecreateOutputWithNonEventStreamMembers" to
+                "maybeReadInitialResponse" to
                     writable {
-                        val structuredDataParser = codegenContext.protocolImpl?.structuredDataParser()
-                        structuredDataParser?.operationParser(operation)?.also { parser ->
+                        if (parser != null) {
                             rustTemplate(
                                 """
-                                let mut builder = output.into_builder();
-                                builder = #{parser}(
-                                    _message.payload(),
-                                    builder
-                                )
-                                .map_err(response_error)?;
-                                let output = builder.build().map_err(response_error)?;
+                                // Converts any error encountered beyond this point into an `SdkError` response error
+                                // with an `HttpResponse`. However, since we have already exited the `orchestrate`
+                                // function, the original `HttpResponse` is no longer available and cannot be restored.
+                                // This means that header information from the original response has been lost.
+                                //
+                                // Note that the response body would have been consumed by the deserializer
+                                // regardless, even if the initial message was hypothetically processed during
+                                // the orchestrator's deserialization phase but later resulted in an error.
+                                fn response_error(
+                                    err: impl #{Into}<#{BoxError}>
+                                ) -> #{SdkError}<#{OperationError}, #{HttpResponse}> {
+                                    #{SdkError}::response_error(err, #{HttpResponse}::new(
+                                        #{StatusCode}::try_from(200).expect("valid successful code"),
+                                        #{SdkBody}::empty()))
+                                }
+                                let mut output = output;
+
+                                let message = output.$eventStreamMemberName.try_recv_initial_response().await.map_err(response_error)?;
+
+                                if let #{Some}(_message) = message {
+                                    let mut builder = output.into_builder();
+                                    builder = #{parser}(
+                                        _message.payload(),
+                                        builder
+                                    )
+                                    .map_err(response_error)?;
+                                    output = builder.build().map_err(response_error)?;
+                                }
                                 """,
                                 "parser" to parser,
                                 *scope,
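The effect of the `FluentBuilderGenerator.kt` change on the generated client code can be approximated as follows. This is a simplified, synchronous sketch: `Output`, `parse_initial_response`, and the `String` error type are hypothetical placeholders for the generated output struct, the protocol's operation parser, and `SdkError`. It shows that the initial-response handling only exists when a parser is available, and that otherwise the output is returned untouched without waiting for any message.

```rust
/// Hypothetical stand-in for a generated operation output.
#[derive(Debug, Clone, PartialEq, Default)]
struct Output {
    request_id: Option<String>,
}

/// Hypothetical parser that fills non-event-stream members from the
/// payload of an initial-response message.
fn parse_initial_response(payload: &str, mut output: Output) -> Result<Output, String> {
    if payload.is_empty() {
        return Err("empty initial-response payload".to_string());
    }
    output.request_id = Some(payload.to_string());
    Ok(output)
}

/// Shape emitted when a parser exists: rebind `output` as mutable, then
/// overwrite it only if an initial message actually arrived.
fn finish_with_parser(output: Output, initial: Option<&str>) -> Result<Output, String> {
    let mut output = output;
    if let Some(payload) = initial {
        output = parse_initial_response(payload, output)?;
    }
    Ok(output)
}

/// Shape emitted when no parser exists: nothing is read, nothing can block.
fn finish_without_parser(output: Output) -> Result<Output, String> {
    Ok(output)
}
```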

codegen-server-test/integration-tests/Cargo.lock

Lines changed: 12 additions & 12 deletions
Some generated files are not rendered by default.

codegen-server-test/integration-tests/eventstreams/src/lib.rs

Lines changed: 15 additions & 1 deletion
@@ -18,8 +18,9 @@ use hyper::{
 };
 use hyper_util::{client::legacy::Client, rt::TokioExecutor};
 use std::net::SocketAddr;
+use std::sync::Arc;
 use std::time::Duration;
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, Notify};
 use tokio::time::timeout;
 use tokio_stream::wrappers::ReceiverStream;
 
@@ -33,6 +34,7 @@ pub enum RecvError {
 pub struct ManualEventStreamClient {
     message_sender: mpsc::Sender<Message>,
     response_receiver: mpsc::Receiver<Result<Message, RecvError>>,
+    response_ready: Arc<Notify>,
     _handle: tokio::task::JoinHandle<()>,
 }
 
@@ -83,11 +85,17 @@ impl ManualEventStreamClient {
         let body = StreamBody::new(stream);
 
         let request = req.body(body).expect("failed to construct request");
+        let response_ready = Arc::new(Notify::new());
+        let response_ready_clone = response_ready.clone();
         let handle = tokio::spawn(async move {
             let response = timeout(Duration::from_secs(1), client.request(request))
                 .await
                 .expect("timeout making initial request")
                 .expect("failed to make initial contact with server");
+
+            // Notify that the response is ready
+            response_ready_clone.notify_one();
+
             let mut body = response.into_body();
             let mut decoder = MessageFrameDecoder::new();
 
@@ -122,10 +130,16 @@ impl ManualEventStreamClient {
         Ok(Self {
             message_sender,
             response_receiver,
+            response_ready,
             _handle: handle,
         })
     }
 
+    /// Waits for the response stream to be ready (server has accepted the connection).
+    pub async fn wait_for_response_ready(&self) {
+        self.response_ready.notified().await;
+    }
+
     /// Sends a message.
     pub async fn send(&mut self, message: Message) -> Result<(), String> {
         self.message_sender
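The readiness signal behind `wait_for_response_ready()` can be illustrated without an async runtime. The sketch below deliberately swaps `tokio::sync::Notify` for a standard-library `Mutex<bool>` plus `Condvar`; `ReadySignal` and `spawn_notifier` are hypothetical names. One difference worth noting: this flag stays set once raised, whereas a `Notify` permit is consumed by the waiter that receives it.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical std-only analogue of the `Arc<Notify>` used in the test client.
#[derive(Default)]
struct ReadySignal {
    ready: Mutex<bool>,
    cond: Condvar,
}

impl ReadySignal {
    /// Marks the response as ready and wakes one waiter.
    fn notify(&self) {
        *self.ready.lock().unwrap() = true;
        self.cond.notify_one();
    }

    /// Waits until the flag is set or the timeout elapses.
    /// Returns `true` if the signal was raised in time.
    fn wait_timeout(&self, timeout: Duration) -> bool {
        let guard = self.ready.lock().unwrap();
        let (guard, _timed_out) = self
            .cond
            .wait_timeout_while(guard, timeout, |ready| !*ready)
            .unwrap();
        *guard
    }
}

/// Plays the role of the spawned task that signals once the response arrives.
fn spawn_notifier(signal: Arc<ReadySignal>) -> JoinHandle<()> {
    thread::spawn(move || signal.notify())
}
```

A timed wait that returns `false` before the notifier runs, followed by one that returns `true` afterwards, mirrors how the new integration tests distinguish "response not ready yet" from "response ready".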

codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs

Lines changed: 48 additions & 0 deletions
@@ -476,6 +476,54 @@ async fn test_streaming_operation_with_initial_data() {
     assert_eq!(harness.server.initial_data(), Some("test-data".to_string()));
 }
 
+/// Test that operations without modeled initial messages don't hang.
+/// This verifies the fix for issue #4435 where recv() would block waiting
+/// for an initial message that was never modeled.
+#[tokio::test]
+async fn test_no_hang_without_initial_message() {
+    let mut harness = TestHarness::new("StreamingOperation").await;
+
+    // Wait for the response to be ready without sending any messages
+    // This should not hang because StreamingOperation has no modeled initial-request
+    harness.client.wait_for_response_ready().await;
+
+    // Now send an event and verify we can receive it
+    harness.send_event("A").await;
+    let resp = harness.expect_message().await;
+    assert_eq!(get_event_type(&resp), "A");
+}
+
+/// Test that operations WITH modeled initial messages DO wait for them.
+/// This verifies that try_recv_initial() blocks and the response doesn't become ready
+/// until the initial-request is sent.
+#[tokio::test]
+async fn test_waits_for_initial_message_when_modeled() {
+    let mut harness = TestHarness::new("StreamingOperationWithInitialData").await;
+
+    // Try to wait for response with a timeout - it should timeout because we haven't sent initial data
+    let result = tokio::time::timeout(
+        tokio::time::Duration::from_millis(100),
+        harness.client.wait_for_response_ready(),
+    )
+    .await;
+    assert!(
+        result.is_err(),
+        "Response should not be ready without initial data"
+    );
+
+    // Now send the initial data
+    harness.send_initial_data("test-data").await;
+
+    // Now the response should become ready
+    harness.client.wait_for_response_ready().await;
+
+    // Send an event and verify it works
+    harness.send_event("A").await;
+    let resp = harness.expect_message().await;
+    assert_eq!(get_event_type(&resp), "A");
+    assert_eq!(harness.server.initial_data(), Some("test-data".to_string()));
+}
+
 /// StreamingOperationWithInitialData has a mandatory initial data field.
 /// If we don't send this field, we'll never hit the handler.
 #[tokio::test]
