Skip to content

Commit 125c625

Browse files
rcohysaito1001
andauthored
Sign event stream initial messages with signer (#4431)
## Motivation and Context #4429 ## Description Event stream operations with initial-request messages send that message unsigned, whose root cause is described in the issue above. To resolve it, instead of pre-serializing the initial message to bytes, this PR 1. creates a wrapper marshaller that can handle both regular events and initial messages 2. creates a stream out of the initial message 3. chains that stream with the event stream 4. lets the entire combined stream flow through `MessageStreamAdapter` for signing ## Testing Added client codegen test for signing initial message and event. ## Checklist <!--- If a checkbox below is not applicable, then please DELETE it rather than leaving it unchecked --> - [x] For changes to the smithy-rs codegen or runtime crates, I have created a changelog entry Markdown file in the `.changelog` directory, specifying "client," "server," or both in the `applies_to` key. - [x] For changes to the AWS SDK, generated SDK code, or SDK runtime crates, I have created a changelog entry Markdown file in the `.changelog` directory, specifying "aws-sdk-rust" in the `applies_to` key. ---- _By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice._ --------- Co-authored-by: ysaito1001 <awsaito@amazon.com>
1 parent f4024b9 commit 125c625

7 files changed

Lines changed: 222 additions & 13 deletions

File tree

.changelog/1764888150.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
---
2+
applies_to:
3+
- aws-sdk-rust
4+
- client
5+
authors:
6+
- rcoh
7+
- ysaito1001
8+
references:
9+
- smithy-rs#4429
10+
breaking: false
11+
new_feature: false
12+
bug_fix: true
13+
---
14+
Fix bug where initial-request messages in event stream operations are not signed.

codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/protocols/HttpBoundProtocolGenerator.kt

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,29 +64,43 @@ private fun eventStreamWithInitialRequest(
6464
return null
6565
}
6666

67+
val smithyHttp = RuntimeType.smithyHttp(codegenContext.runtimeConfig)
68+
val eventOrInitial = smithyHttp.resolve("event_stream::EventOrInitial")
69+
val eventOrInitialMarshaller = smithyHttp.resolve("event_stream::EventOrInitialMarshaller")
70+
6771
return writable {
6872
rustTemplate(
6973
"""
7074
{
7175
use #{futures_util}::StreamExt;
7276
let body = #{parser}(&input)?;
7377
let initial_message = #{initial_message}(body);
74-
let mut buffer = #{Vec}::new();
75-
#{write_message_to}(&initial_message, &mut buffer)?;
76-
let initial_message_stream = futures_util::stream::iter(vec![Ok(#{http_body_1x}::Frame::data(buffer.into()))]);
77-
let adapter = #{message_stream_adaptor:W};
78-
initial_message_stream.chain(adapter)
78+
79+
// Wrap the marshaller to handle both initial and regular messages
80+
let wrapped_marshaller = #{EventOrInitialMarshaller}::new(marshaller);
81+
82+
// Create stream with initial message
83+
let initial_stream = #{futures_util}::stream::once(async move {
84+
#{Ok}(#{EventOrInitial}::InitialMessage(initial_message))
85+
});
86+
87+
// Extract inner stream and map events
88+
let event_stream = ${params.outerName}.${params.memberName}.into_inner()
89+
.map(|result| result.map(#{EventOrInitial}::Event));
90+
91+
// Chain streams and convert to EventStreamSender
92+
let combined = initial_stream.chain(event_stream);
93+
#{EventStreamSender}::from(combined)
94+
.into_body_stream(wrapped_marshaller, error_marshaller, signer)
7995
}
8096
""",
8197
*preludeScope,
8298
"futures_util" to CargoDependency.FuturesUtil.toType(),
8399
"initial_message" to params.eventStreamMarshallerGenerator.renderInitialRequestGenerator(params.payloadContentType),
84-
"message_stream_adaptor" to messageStreamAdaptor(params.outerName, params.memberName),
85100
"parser" to parser,
86-
"write_message_to" to
87-
RuntimeType.smithyEventStream(codegenContext.runtimeConfig)
88-
.resolve("frame::write_message_to"),
89-
"http_body_1x" to CargoDependency.HttpBody1x.toType(),
101+
"EventOrInitial" to eventOrInitial,
102+
"EventOrInitialMarshaller" to eventOrInitialMarshaller,
103+
"EventStreamSender" to smithyHttp.resolve("event_stream::EventStreamSender"),
90104
)
91105
}
92106
}

codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/protocols/eventstream/ClientEventStreamMarshallerGeneratorTest.kt

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,134 @@ class ClientEventStreamMarshallerGeneratorTest {
181181
}
182182
}
183183
}
184+
185+
@ParameterizedTest
186+
@ArgumentsSource(RpcEventStreamTestCasesProvider::class)
187+
fun signedInitialMessageTest(rpcEventStreamTestCase: RpcEventStreamTestCase) {
188+
val testCase = rpcEventStreamTestCase.inner
189+
// Filter out tests that do not have initial message
190+
if (rpcEventStreamTestCase.nonEventStreamMember != NonEventStreamMemberInOutput.NONE) {
191+
clientIntegrationTest(
192+
testCase.model,
193+
IntegrationTestParams(service = "test#TestService"),
194+
) { codegenContext, rustCrate ->
195+
rustCrate.testModule {
196+
tokioTest("initial_message_and_event_are_signed") {
197+
rustTemplate(
198+
"""
199+
use crate::types::*;
200+
use aws_smithy_eventstream::frame::{DeferredSignerSender, SignMessage, SignMessageError};
201+
use aws_smithy_http::event_stream::EventStreamSender;
202+
use aws_smithy_runtime_api::box_error::BoxError;
203+
use aws_smithy_runtime_api::client::interceptors::Intercept;
204+
use aws_smithy_runtime_api::client::interceptors::context::BeforeTransmitInterceptorContextMut;
205+
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
206+
use aws_smithy_types::config_bag::ConfigBag;
207+
use aws_smithy_types::event_stream::Message;
208+
209+
const FAKE_SIGNATURE: &str = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890";
210+
211+
##[derive(Debug)]
212+
struct TestSigner;
213+
214+
impl SignMessage for TestSigner {
215+
fn sign(&mut self, message: Message) -> Result<Message, SignMessageError> {
216+
let mut new_payload = message.payload().to_vec();
217+
new_payload.extend_from_slice(FAKE_SIGNATURE.as_bytes());
218+
219+
let mut signed_msg = Message::new(bytes::Bytes::from(new_payload));
220+
for header in message.headers() {
221+
signed_msg = signed_msg.add_header(header.clone());
222+
}
223+
Ok(signed_msg)
224+
}
225+
226+
fn sign_empty(&mut self) -> Option<Result<Message, SignMessageError>> {
227+
None
228+
}
229+
}
230+
231+
##[derive(Debug)]
232+
struct TestSignerInterceptor;
233+
234+
impl Intercept for TestSignerInterceptor {
235+
fn name(&self) -> &'static str {
236+
"TestSignerInterceptor"
237+
}
238+
239+
fn modify_before_signing(
240+
&self,
241+
_context: &mut BeforeTransmitInterceptorContextMut<'_>,
242+
_runtime_components: &RuntimeComponents,
243+
cfg: &mut ConfigBag,
244+
) -> Result<(), BoxError> {
245+
if let Some(signer_sender) = cfg.load::<DeferredSignerSender>() {
246+
signer_sender
247+
.send(Box::new(TestSigner))
248+
.expect("failed to send test signer");
249+
}
250+
Ok(())
251+
}
252+
}
253+
254+
let (http_client, rx) = #{capture_request}(None);
255+
let conf = crate::Config::builder()
256+
.endpoint_url("http://localhost:1234")
257+
.http_client(http_client.clone())
258+
.interceptor(TestSignerInterceptor)
259+
.behavior_version_latest()
260+
.build();
261+
let client = crate::Client::from_conf(conf);
262+
263+
let event = TestStream::MessageWithString(
264+
MessageWithString::builder().data("hello, world!").build(),
265+
);
266+
let stream = ::futures_util::stream::iter(vec![Ok(event)]);
267+
let _ = client
268+
.test_stream_op()
269+
.test_string("this is test")
270+
.value(EventStreamSender::from(stream))
271+
.send()
272+
.await
273+
.unwrap();
274+
275+
let mut request = rx.expect_request();
276+
277+
let mut body = ::aws_smithy_types::body::SdkBody::taken();
278+
std::mem::swap(&mut body, request.body_mut());
279+
280+
let unmarshaller = crate::event_stream_serde::TestStreamUnmarshaller::new();
281+
let mut event_receiver = crate::event_receiver::EventReceiver::new(
282+
::aws_smithy_http::event_stream::Receiver::new(unmarshaller, body),
283+
);
284+
285+
// Check initial message has signature
286+
let initial_msg = event_receiver
287+
.try_recv_initial_request()
288+
.await
289+
.unwrap()
290+
.expect("should receive initial-request");
291+
assert!(initial_msg.payload().ends_with(FAKE_SIGNATURE.as_bytes()));
292+
293+
// Check event payload has signature
294+
if let Some(event) = event_receiver.recv().await.unwrap() {
295+
match event {
296+
TestStream::MessageWithString(message_with_string) => {
297+
assert!(message_with_string.data().unwrap().ends_with(FAKE_SIGNATURE));
298+
}
299+
otherwise => panic!("matched on unexpected variant {otherwise:?}"),
300+
}
301+
} else {
302+
panic!("should receive at least one frame");
303+
}
304+
""",
305+
"capture_request" to RuntimeType.captureRequest(codegenContext.runtimeConfig),
306+
)
307+
}
308+
}
309+
}
310+
}
311+
}
184312
}
185313

186314
class TestCasesProvider : ArgumentsProvider {

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,4 @@ allowLocalDeps=false
1717
# Avoid registering dependencies/plugins/tasks that are only used for testing purposes
1818
isTestingEnabled=true
1919
# codegen publication version
20-
codegenVersion=0.1.12
20+
codegenVersion=0.1.13

rust-runtime/aws-smithy-http/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "aws-smithy-http"
3-
version = "0.63.5"
3+
version = "0.63.6"
44
authors = [
55
"AWS Rust SDK Team <aws-sdk-rust@amazon.com>",
66
"Russell Cohen <rcoh@amazon.com>",

rust-runtime/aws-smithy-http/src/event_stream.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ mod sender;
1414
pub type BoxError = Box<dyn StdError + Send + Sync + 'static>;
1515

1616
#[doc(inline)]
17-
pub use sender::{EventStreamSender, MessageStreamAdapter, MessageStreamError};
17+
pub use sender::{
18+
EventOrInitial, EventOrInitialMarshaller, EventStreamSender, MessageStreamAdapter,
19+
MessageStreamError,
20+
};
1821

1922
#[doc(inline)]
2023
pub use receiver::{InitialMessageType, Receiver, ReceiverError};

rust-runtime/aws-smithy-http/src/event_stream/sender.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use aws_smithy_eventstream::frame::{write_message_to, MarshallMessage, SignMessa
77
use aws_smithy_eventstream::message_size_hint::MessageSizeHint;
88
use aws_smithy_runtime_api::client::result::SdkError;
99
use aws_smithy_types::error::ErrorMetadata;
10+
use aws_smithy_types::event_stream::Message;
1011
use bytes::Bytes;
1112
use futures_core::Stream;
1213
use std::error::Error as StdError;
@@ -17,6 +18,17 @@ use std::pin::Pin;
1718
use std::task::{Context, Poll};
1819
use tracing::trace;
1920

21+
/// Wrapper for event stream items that may include an initial-request message.
22+
/// This is used internally to allow initial messages to flow through the signing pipeline.
23+
#[doc(hidden)]
24+
#[derive(Debug)]
25+
pub enum EventOrInitial<T> {
26+
/// A regular event that needs marshalling and signing
27+
Event(T),
28+
/// An initial-request message that's already marshalled, just needs signing
29+
InitialMessage(Message),
30+
}
31+
2032
/// Input type for Event Streams.
2133
pub struct EventStreamSender<T, E> {
2234
input_stream: Pin<Box<dyn Stream<Item = Result<T, E>> + Send + Sync>>,
@@ -47,6 +59,12 @@ impl<T: Send + Sync, E: StdError + Send + Sync + 'static> EventStreamSender<T, E
4759
) -> MessageStreamAdapter<T, E> {
4860
MessageStreamAdapter::new(marshaller, error_marshaller, signer, self.input_stream)
4961
}
62+
63+
/// Extract the inner stream. This is used internally for composing streams.
64+
#[doc(hidden)]
65+
pub fn into_inner(self) -> Pin<Box<dyn Stream<Item = Result<T, E>> + Send + Sync>> {
66+
self.input_stream
67+
}
5068
}
5169

5270
impl<T, E, S> From<S> for EventStreamSender<T, E>
@@ -202,6 +220,38 @@ impl<T: Send + Sync, E: StdError + Send + Sync + 'static> Stream for MessageStre
202220
}
203221
}
204222

223+
/// Marshaller wrapper that handles both regular events and initial messages.
224+
/// This is used internally to support initial-request messages in event streams.
225+
#[doc(hidden)]
226+
#[derive(Debug)]
227+
pub struct EventOrInitialMarshaller<M> {
228+
inner: M,
229+
}
230+
231+
impl<M> EventOrInitialMarshaller<M> {
232+
#[doc(hidden)]
233+
pub fn new(inner: M) -> Self {
234+
Self { inner }
235+
}
236+
}
237+
238+
impl<M, T> MarshallMessage for EventOrInitialMarshaller<M>
239+
where
240+
M: MarshallMessage<Input = T>,
241+
{
242+
type Input = EventOrInitial<T>;
243+
244+
fn marshall(
245+
&self,
246+
input: Self::Input,
247+
) -> Result<Message, aws_smithy_eventstream::error::Error> {
248+
match input {
249+
EventOrInitial::Event(event) => self.inner.marshall(event),
250+
EventOrInitial::InitialMessage(message) => Ok(message),
251+
}
252+
}
253+
}
254+
205255
#[cfg(test)]
206256
mod tests {
207257
use super::MarshallMessage;

0 commit comments

Comments
 (0)