Skip to content

Commit 1c607bd

Browse files
greenwoodcmaajtodd
andauthored
make aws-smithy-mocks interceptor multi-concurrent (#4469)
the current interceptor is single-threaded in that it only stores the last mocked response and returns it in the interceptor chain when appropriate. this doesn't work in multi-concurrent scenarios where you want to create and then return multiple (potentially different) responses in parallel. this change improves upon that so that the interceptor returns the mocked response pertaining to the appropriate request at the appropriate time. ## Motivation and Context Allows for the `aws-smithy-mocks` tooling to be used in multi-concurrent or multi-threaded scenarios ## Testing I have application testing that relies on this functionality, but couldn't find a way to satisfactorily test this within `aws-smithy-mocks` itself. I think the problem is that the test operation tooling used by most of the tests is serial, which makes it hard if not impossible to exercise the race condition. Maybe I'm missing something easy here. ## Checklist <!--- If a checkbox below is not applicable, then please DELETE it rather than leaving it unchecked --> - [x] For changes to the smithy-rs codegen or runtime crates, I have created a changelog entry Markdown file in the `.changelog` directory, specifying "client," "server," or both in the `applies_to` key. - [ ] For changes to the AWS SDK, generated SDK code, or SDK runtime crates, I have created a changelog entry Markdown file in the `.changelog` directory, specifying "aws-sdk-rust" in the `applies_to` key. ---- _By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice._ --------- Co-authored-by: Aaron Todd <aajtodd@users.noreply.github.com>
1 parent 88a916e commit 1c607bd

3 files changed

Lines changed: 84 additions & 48 deletions

File tree

.changelog/1767741904.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
---
2+
applies_to: ["client"]
3+
authors:
4+
- greenwoodcm
5+
references:
6+
- smithy-rs#4469
7+
breaking: false
8+
new_feature: false
9+
bug_fix: false
10+
---
11+
Add support for `aws-smithy-mocks` interceptor to handle concurrent requests.

rust-runtime/aws-smithy-mocks/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "aws-smithy-mocks"
3-
version = "0.2.2"
3+
version = "0.2.3"
44
authors = ["AWS Rust SDK Team <aws-sdk-rust@amazon.com>"]
55
description = "Testing utilities for smithy-rs generated clients"
66
edition = "2021"

rust-runtime/aws-smithy-mocks/src/interceptor.rs

Lines changed: 72 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,17 @@ use aws_smithy_http_client::test_util::infallible_client_fn;
88
use aws_smithy_runtime_api::box_error::BoxError;
99
use aws_smithy_runtime_api::client::http::SharedHttpClient;
1010
use aws_smithy_runtime_api::client::interceptors::context::{
11-
BeforeSerializationInterceptorContextMut, BeforeTransmitInterceptorContextMut, Error,
11+
BeforeSerializationInterceptorContextRef, BeforeTransmitInterceptorContextMut, Error,
1212
FinalizerInterceptorContextMut, Input, Output,
1313
};
1414
use aws_smithy_runtime_api::client::interceptors::Intercept;
1515
use aws_smithy_runtime_api::client::orchestrator::{HttpResponse, OrchestratorError};
1616
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
1717
use aws_smithy_types::body::SdkBody;
1818
use aws_smithy_types::config_bag::{ConfigBag, Storable, StoreReplace};
19-
use std::collections::VecDeque;
19+
use std::collections::{HashMap, VecDeque};
2020
use std::fmt;
21+
use std::sync::atomic::{AtomicUsize, Ordering};
2122
use std::sync::{Arc, Mutex};
2223

2324
// Store active rule in config bag
@@ -28,12 +29,24 @@ impl Storable for ActiveRule {
2829
type Storer = StoreReplace<ActiveRule>;
2930
}
3031

32+
// Store the response ID in the config bag so that we can find the proper response
33+
#[derive(Debug, Clone)]
34+
struct ResponseId(usize);
35+
36+
impl Storable for ResponseId {
37+
type Storer = StoreReplace<ResponseId>;
38+
}
39+
3140
/// Interceptor which produces mock responses based on a list of rules
3241
pub struct MockResponseInterceptor {
3342
rules: Arc<Mutex<VecDeque<Rule>>>,
3443
rule_mode: RuleMode,
3544
must_match: bool,
36-
active_response: Arc<Mutex<Option<MockResponse<Output, Error>>>>,
45+
active_responses: Arc<Mutex<HashMap<usize, MockResponse<Output, Error>>>>,
46+
/// Monotonically increasing identifier that identifies a given response
47+
/// so that we can store it on the request path and correctly load it back
48+
/// on the response path.
49+
current_response_id: Arc<AtomicUsize>,
3750
}
3851

3952
impl fmt::Debug for MockResponseInterceptor {
@@ -57,7 +70,8 @@ impl MockResponseInterceptor {
5770
rules: Default::default(),
5871
rule_mode: RuleMode::MatchAny,
5972
must_match: true,
60-
active_response: Default::default(),
73+
active_responses: Default::default(),
74+
current_response_id: Default::default(),
6175
}
6276
}
6377
/// Add a rule to the Interceptor
@@ -91,9 +105,9 @@ impl Intercept for MockResponseInterceptor {
91105
"MockResponseInterceptor"
92106
}
93107

94-
fn modify_before_serialization(
108+
fn read_before_serialization(
95109
&self,
96-
context: &mut BeforeSerializationInterceptorContextMut<'_>,
110+
context: &BeforeSerializationInterceptorContextRef<'_>,
97111
_runtime_components: &RuntimeComponents,
98112
cfg: &mut ConfigBag,
99113
) -> Result<(), BoxError> {
@@ -165,10 +179,17 @@ impl Intercept for MockResponseInterceptor {
165179
(Some(rule), Some(response)) => {
166180
// Store the rule in the config bag
167181
cfg.interceptor_state().store_put(ActiveRule(rule));
168-
// store the response on the interceptor (because going
169-
// through interceptor context requires the type to impl Clone)
170-
let mut active_resp = self.active_response.lock().unwrap();
171-
let _ = (*active_resp).replace(response);
182+
183+
// we have to store the response on the interceptor, because going
184+
// through interceptor context requires the type to impl Clone. to
185+
// find the right response for this request we generate a new monotonically
186+
// increasing identifier, store that on request context, and then map from
187+
// the response identifier to the response payload on the global interceptor
188+
// state.
189+
let response_id = self.current_response_id.fetch_add(1, Ordering::SeqCst);
190+
cfg.interceptor_state().store_put(ResponseId(response_id));
191+
let mut active_responses = self.active_responses.lock().unwrap();
192+
active_responses.insert(response_id, response);
172193
}
173194
_ => {
174195
// No matching rule or no response
@@ -189,30 +210,32 @@ impl Intercept for MockResponseInterceptor {
189210
_runtime_components: &RuntimeComponents,
190211
cfg: &mut ConfigBag,
191212
) -> Result<(), BoxError> {
192-
let mut state = self.active_response.lock().unwrap();
193-
let mut active_response = (*state).take();
194-
if active_response.is_none() {
195-
// in the case of retries we try to get the next response if it has been consumed
196-
if let Some(active_rule) = cfg.load::<ActiveRule>() {
197-
// During retries, input is not available in modify_before_transmit.
198-
// For HTTP status responses that don't use the input, we can use a dummy input.
199-
let dummy_input = Input::doesnt_matter();
200-
let next_resp = active_rule.0.next_response(&dummy_input);
201-
active_response = next_resp;
213+
if let Some(response_id) = cfg.load::<ResponseId>() {
214+
let mut state = self.active_responses.lock().unwrap();
215+
let mut active_response = state.remove(&response_id.0);
216+
if active_response.is_none() {
217+
// in the case of retries we try to get the next response if it has been consumed
218+
if let Some(active_rule) = cfg.load::<ActiveRule>() {
219+
// During retries, input is not available in modify_before_transmit.
220+
// For HTTP status responses that don't use the input, we can use a dummy input.
221+
let dummy_input = Input::doesnt_matter();
222+
let next_resp = active_rule.0.next_response(&dummy_input);
223+
active_response = next_resp;
224+
}
202225
}
203-
}
204226

205-
if let Some(resp) = active_response {
206-
match resp {
207-
// place the http response into the extensions and let the HTTP client return it
208-
MockResponse::Http(http_resp) => {
209-
context
210-
.request_mut()
211-
.add_extension(MockHttpResponse(Arc::new(http_resp)));
212-
}
213-
_ => {
214-
// put it back for modeled output/errors
215-
let _ = (*state).replace(resp);
227+
if let Some(resp) = active_response {
228+
match resp {
229+
// place the http response into the extensions and let the HTTP client return it
230+
MockResponse::Http(http_resp) => {
231+
context
232+
.request_mut()
233+
.add_extension(MockHttpResponse(Arc::new(http_resp)));
234+
}
235+
_ => {
236+
// put it back for modeled output/errors
237+
state.insert(response_id.0, resp);
238+
}
216239
}
217240
}
218241
}
@@ -224,23 +247,25 @@ impl Intercept for MockResponseInterceptor {
224247
&self,
225248
context: &mut FinalizerInterceptorContextMut<'_>,
226249
_runtime_components: &RuntimeComponents,
227-
_cfg: &mut ConfigBag,
250+
cfg: &mut ConfigBag,
228251
) -> Result<(), BoxError> {
229252
// Handle modeled responses
230-
let mut state = self.active_response.lock().unwrap();
231-
let active_response = (*state).take();
232-
if let Some(resp) = active_response {
233-
match resp {
234-
MockResponse::Output(output) => {
235-
context.inner_mut().set_output_or_error(Ok(output));
236-
}
237-
MockResponse::Error(error) => {
238-
context
239-
.inner_mut()
240-
.set_output_or_error(Err(OrchestratorError::operation(error)));
241-
}
242-
MockResponse::Http(_) => {
243-
// HTTP responses are handled by the mock HTTP client
253+
if let Some(response_id) = cfg.load::<ResponseId>() {
254+
let mut state = self.active_responses.lock().unwrap();
255+
let active_response = state.remove(&response_id.0);
256+
if let Some(resp) = active_response {
257+
match resp {
258+
MockResponse::Output(output) => {
259+
context.inner_mut().set_output_or_error(Ok(output));
260+
}
261+
MockResponse::Error(error) => {
262+
context
263+
.inner_mut()
264+
.set_output_or_error(Err(OrchestratorError::operation(error)));
265+
}
266+
MockResponse::Http(_) => {
267+
// HTTP responses are handled by the mock HTTP client
268+
}
244269
}
245270
}
246271
}

0 commit comments

Comments
 (0)