Skip to content

Commit 31fcf69

Browse files
committed
PR suggestions, formatting
1 parent d4d24c2 commit 31fcf69

4 files changed

Lines changed: 62 additions & 28 deletions

File tree

client/src/raw.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,23 +137,23 @@ where
137137
type SvcType = T;
138138

139139
fn workflow_client_mut(&mut self) -> &mut WorkflowServiceClient<Self::SvcType> {
140-
self.refresh_inner().workflow_client_mut()
140+
self.inner_mut_refreshed().workflow_client_mut()
141141
}
142142

143143
fn operator_client_mut(&mut self) -> &mut OperatorServiceClient<Self::SvcType> {
144-
self.refresh_inner().operator_client_mut()
144+
self.inner_mut_refreshed().operator_client_mut()
145145
}
146146

147147
fn cloud_client_mut(&mut self) -> &mut CloudServiceClient<Self::SvcType> {
148-
self.refresh_inner().cloud_client_mut()
148+
self.inner_mut_refreshed().cloud_client_mut()
149149
}
150150

151151
fn test_client_mut(&mut self) -> &mut TestServiceClient<Self::SvcType> {
152-
self.refresh_inner().test_client_mut()
152+
self.inner_mut_refreshed().test_client_mut()
153153
}
154154

155155
fn health_client_mut(&mut self) -> &mut HealthClient<Self::SvcType> {
156-
self.refresh_inner().health_client_mut()
156+
self.inner_mut_refreshed().health_client_mut()
157157
}
158158

159159
fn get_workers_info(&self) -> Option<Arc<SlotManager>> {

client/src/replaceable.rs

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
use crate::NamespacedClient;
2-
use std::borrow::Cow;
3-
use std::sync::atomic::{AtomicU32, Ordering};
4-
use std::sync::{Arc, RwLock};
2+
use std::{
3+
borrow::Cow,
4+
sync::{
5+
Arc, RwLock,
6+
atomic::{AtomicU32, Ordering},
7+
},
8+
};
59

610
/// A client wrapper that allows replacing the underlying client at a later point in time.
711
/// Clones of this struct have a shared reference to the underlying client, and each clone also
@@ -83,24 +87,26 @@ where
8387
self.inner_cow().into_owned()
8488
}
8589

86-
/// Returns a reference to this instance's cached clone of the underlying client if it's up to
87-
/// date, or a fresh clone of the shared client otherwise. Because it's an immutable method,
88-
/// it will not update this instance's cached clone. For this reason, prefer to use
89-
/// [`refresh_inner()`](Self::refresh_inner) when possible.
90+
/// Returns an immutable reference to this instance's cached clone of the underlying client if
91+
/// it's up to date, or a fresh clone of the shared client otherwise. Because it's an immutable
92+
/// method, it will not update this instance's cached clone. For this reason, prefer to use
93+
/// [`inner_mut_refreshed()`](Self::inner_mut_refreshed) when possible.
9094
pub fn inner_cow(&self) -> Cow<'_, C> {
9195
self.shared_data
9296
.fetch_newer_than(self.cloned_generation)
9397
.map(|(c, _)| Cow::Owned(c))
9498
.unwrap_or_else(|| Cow::Borrowed(&self.cloned_client))
9599
}
96100

97-
/// Refreshes this instance's cached clone of the underlying client. Returns a mutable reference
98-
/// to it. Called automatically by other mutable methods, in particular by all RPC calls.
101+
/// Returns a mutable reference to this instance's cached clone of the underlying client. If the
102+
/// cached clone is not up to date, it's refreshed before the reference is returned. This method
103+
/// is called automatically by most other mutable methods, in particular by all service calls,
104+
/// so most of the time it doesn't need to be called directly.
99105
///
100106
/// While this method allows mutable access to the underlying client, any configuration changes
101107
/// will not be shared with other instances, and will be lost if the client gets replaced from
102-
/// anywhere. To make configuration changes, use [`replace_client()`](Self::refresh_client) instead.
103-
pub fn refresh_inner(&mut self) -> &mut C {
108+
/// anywhere. To make configuration changes, use [`replace_client()`](Self::replace_client) instead.
109+
pub fn inner_mut_refreshed(&mut self) -> &mut C {
104110
if let Some((client, generation)) =
105111
self.shared_data.fetch_newer_than(self.cloned_generation)
106112
{
@@ -145,7 +151,8 @@ where
145151

146152
#[cfg(test)]
147153
mod tests {
148-
use crate::{NamespacedClient, SharedReplaceableClient};
154+
use super::*;
155+
use crate::NamespacedClient;
149156
use std::borrow::Cow;
150157

151158
#[derive(Debug, Clone)]
@@ -185,7 +192,7 @@ mod tests {
185192
};
186193
assert_eq!(inner.identity, "2");
187194

188-
assert_eq!(client.refresh_inner().identity, "2");
195+
assert_eq!(client.inner_mut_refreshed().identity, "2");
189196
let Cow::Borrowed(inner) = client.inner_cow() else {
190197
panic!("expected borrowed inner");
191198
};
@@ -214,4 +221,33 @@ mod tests {
214221
assert_eq!(original1.identity(), "2");
215222
assert_eq!(clone1.identity(), "2");
216223
}
224+
225+
#[test]
226+
fn client_replaced_from_multiple_threads() {
227+
let mut client = SharedReplaceableClient::new(StubClient::new("original"));
228+
std::thread::scope(|scope| {
229+
for thread_no in 0..100 {
230+
let mut client = client.clone();
231+
scope.spawn(move || {
232+
for i in 0..1000 {
233+
let old_generation = client.cloned_generation;
234+
client.inner_mut_refreshed();
235+
let current_generation = client.cloned_generation;
236+
assert!(current_generation >= old_generation);
237+
let replace_identity = format!("{thread_no}-{i}");
238+
client.replace_client(StubClient::new(&replace_identity));
239+
client.inner_mut_refreshed();
240+
assert!(client.cloned_generation > current_generation);
241+
let refreshed_identity = client.identity();
242+
if refreshed_identity.split('-').next().unwrap() == thread_no.to_string() {
243+
assert_eq!(replace_identity, refreshed_identity);
244+
}
245+
}
246+
});
247+
}
248+
});
249+
client.inner_mut_refreshed();
250+
assert_eq!(client.cloned_generation, 100_000);
251+
assert!(client.identity().ends_with("-999"));
252+
}
217253
}

tests/integ_tests/polling_tests.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
use crate::common::{
2-
INTEG_CLIENT_NAME, INTEG_CLIENT_VERSION, get_integ_server_options, integ_dev_server_config,
3-
};
41
use crate::{
5-
common::{CoreWfStarter, init_core_and_create_wf, init_integ_telem, integ_worker_config},
2+
common::{
3+
CoreWfStarter, INTEG_CLIENT_NAME, INTEG_CLIENT_VERSION, get_integ_server_options,
4+
init_core_and_create_wf, init_integ_telem, integ_dev_server_config, integ_worker_config,
5+
},
66
integ_tests::activity_functions::echo,
77
};
88
use assert_matches::assert_matches;
@@ -17,13 +17,12 @@ use std::{
1717
};
1818
use temporal_client::{WfClientExt, WorkflowClientTrait, WorkflowOptions};
1919
use temporal_sdk::{ActivityOptions, WfContext};
20-
use temporal_sdk_core::telemetry::CoreLogStreamConsumer;
21-
use temporal_sdk_core::test_help::NAMESPACE;
2220
use temporal_sdk_core::{
2321
ClientOptionsBuilder, CoreRuntime,
2422
ephemeral_server::{TemporalDevServerConfigBuilder, default_cached_download},
2523
init_worker,
26-
test_help::{WorkerTestHelpers, drain_pollers_and_shutdown},
24+
telemetry::CoreLogStreamConsumer,
25+
test_help::{NAMESPACE, WorkerTestHelpers, drain_pollers_and_shutdown},
2726
};
2827
use temporal_sdk_core_api::{
2928
Worker,

tests/integ_tests/workflow_tests/activities.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
use crate::common::INTEG_CLIENT_IDENTITY;
21
use crate::{
32
common::{
4-
ActivationAssertionsInterceptor, CoreWfStarter, build_fake_sdk, init_core_and_create_wf,
5-
mock_sdk, mock_sdk_cfg,
3+
ActivationAssertionsInterceptor, CoreWfStarter, INTEG_CLIENT_IDENTITY, build_fake_sdk,
4+
init_core_and_create_wf, mock_sdk, mock_sdk_cfg,
65
},
76
integ_tests::activity_functions::echo,
87
};

0 commit comments

Comments
 (0)