Skip to content

Commit 8bd6f59

Browse files
authored
Added SharedReplaceableClient and made worker client replaceable between retries (#1000)
1 parent 4078190 commit 8bd6f59

16 files changed

Lines changed: 818 additions & 358 deletions

File tree

client/src/lib.rs

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ mod metrics;
1313
#[doc(hidden)]
1414
pub mod proxy;
1515
mod raw;
16+
mod replaceable;
1617
mod retry;
1718
mod worker_registry;
1819
mod workflow_handle;
@@ -23,6 +24,7 @@ pub use crate::{
2324
};
2425
pub use metrics::{LONG_REQUEST_LATENCY_HISTOGRAM_NAME, REQUEST_LATENCY_HISTOGRAM_NAME};
2526
pub use raw::{CloudService, HealthService, OperatorService, TestService, WorkflowService};
27+
pub use replaceable::SharedReplaceableClient;
2628
pub use temporal_sdk_core_protos::temporal::api::{
2729
enums::v1::ArchivalState,
2830
filter::v1::{StartTimeFilter, StatusFilter, WorkflowExecutionFilter, WorkflowTypeFilter},
@@ -906,12 +908,12 @@ impl Client {
906908
}
907909

908910
impl NamespacedClient for Client {
909-
fn namespace(&self) -> &str {
910-
&self.namespace
911+
fn namespace(&self) -> String {
912+
self.namespace.clone()
911913
}
912914

913-
fn get_identity(&self) -> &str {
914-
&self.inner.options.identity
915+
fn identity(&self) -> String {
916+
self.inner.options.identity.clone()
915917
}
916918
}
917919

@@ -1234,9 +1236,9 @@ pub trait WorkflowClientTrait: NamespacedClient {
12341236
/// A client that is bound to a namespace
12351237
pub trait NamespacedClient {
12361238
/// Returns the namespace this client is bound to
1237-
fn namespace(&self) -> &str;
1239+
fn namespace(&self) -> String;
12381240
/// Returns the client identity
1239-
fn get_identity(&self) -> &str;
1241+
fn identity(&self) -> String;
12401242
}
12411243

12421244
/// Optional fields supplied at the start of workflow execution
@@ -1398,7 +1400,7 @@ where
13981400
Ok(self
13991401
.clone()
14001402
.start_workflow_execution(StartWorkflowExecutionRequest {
1401-
namespace: self.namespace().to_owned(),
1403+
namespace: self.namespace(),
14021404
input: input.into_payloads(),
14031405
workflow_id,
14041406
workflow_type: Some(WorkflowType {
@@ -1436,7 +1438,7 @@ where
14361438
run_id: String,
14371439
) -> Result<ResetStickyTaskQueueResponse> {
14381440
let request = ResetStickyTaskQueueRequest {
1439-
namespace: self.namespace().to_owned(),
1441+
namespace: self.namespace(),
14401442
execution: Some(WorkflowExecution {
14411443
workflow_id,
14421444
run_id,
@@ -1458,8 +1460,8 @@ where
14581460
RespondActivityTaskCompletedRequest {
14591461
task_token: task_token.0,
14601462
result,
1461-
identity: self.get_identity().to_owned(),
1462-
namespace: self.namespace().to_owned(),
1463+
identity: self.identity(),
1464+
namespace: self.namespace(),
14631465
..Default::default()
14641466
},
14651467
)
@@ -1476,8 +1478,8 @@ where
14761478
RecordActivityTaskHeartbeatRequest {
14771479
task_token: task_token.0,
14781480
details,
1479-
identity: self.get_identity().to_owned(),
1480-
namespace: self.namespace().to_owned(),
1481+
identity: self.identity(),
1482+
namespace: self.namespace(),
14811483
},
14821484
)
14831485
.await?
@@ -1493,8 +1495,8 @@ where
14931495
RespondActivityTaskCanceledRequest {
14941496
task_token: task_token.0,
14951497
details,
1496-
identity: self.get_identity().to_owned(),
1497-
namespace: self.namespace().to_owned(),
1498+
identity: self.identity(),
1499+
namespace: self.namespace(),
14981500
..Default::default()
14991501
},
15001502
)
@@ -1512,14 +1514,14 @@ where
15121514
) -> Result<SignalWorkflowExecutionResponse> {
15131515
Ok(WorkflowService::signal_workflow_execution(&mut self.clone(),
15141516
SignalWorkflowExecutionRequest {
1515-
namespace: self.namespace().to_owned(),
1517+
namespace: self.namespace(),
15161518
workflow_execution: Some(WorkflowExecution {
15171519
workflow_id,
15181520
run_id,
15191521
}),
15201522
signal_name,
15211523
input: payloads,
1522-
identity: self.get_identity().to_owned(),
1524+
identity: self.identity(),
15231525
request_id: request_id.unwrap_or_else(|| Uuid::new_v4().to_string()),
15241526
..Default::default()
15251527
},
@@ -1535,7 +1537,7 @@ where
15351537
) -> Result<SignalWithStartWorkflowExecutionResponse> {
15361538
Ok(WorkflowService::signal_with_start_workflow_execution(&mut self.clone(),
15371539
SignalWithStartWorkflowExecutionRequest {
1538-
namespace: self.namespace().to_owned(),
1540+
namespace: self.namespace(),
15391541
workflow_id: options.workflow_id,
15401542
workflow_type: Some(WorkflowType {
15411543
name: options.workflow_type,
@@ -1548,7 +1550,7 @@ where
15481550
input: options.input,
15491551
signal_name: options.signal_name,
15501552
signal_input: options.signal_input,
1551-
identity: self.get_identity().to_owned(),
1553+
identity: self.identity(),
15521554
request_id: options
15531555
.request_id
15541556
.unwrap_or_else(|| Uuid::new_v4().to_string()),
@@ -1579,7 +1581,7 @@ where
15791581
) -> Result<QueryWorkflowResponse> {
15801582
Ok(self.clone().query_workflow(
15811583
QueryWorkflowRequest {
1582-
namespace: self.namespace().to_owned(),
1584+
namespace: self.namespace(),
15831585
execution: Some(WorkflowExecution {
15841586
workflow_id,
15851587
run_id,
@@ -1599,7 +1601,7 @@ where
15991601
) -> Result<DescribeWorkflowExecutionResponse> {
16001602
Ok(WorkflowService::describe_workflow_execution(&mut self.clone(),
16011603
DescribeWorkflowExecutionRequest {
1602-
namespace: self.namespace().to_owned(),
1604+
namespace: self.namespace(),
16031605
execution: Some(WorkflowExecution {
16041606
workflow_id,
16051607
run_id: run_id.unwrap_or_default(),
@@ -1618,7 +1620,7 @@ where
16181620
) -> Result<GetWorkflowExecutionHistoryResponse> {
16191621
Ok(WorkflowService::get_workflow_execution_history(&mut self.clone(),
16201622
GetWorkflowExecutionHistoryRequest {
1621-
namespace: self.namespace().to_owned(),
1623+
namespace: self.namespace(),
16221624
execution: Some(WorkflowExecution {
16231625
workflow_id,
16241626
run_id: run_id.unwrap_or_default(),
@@ -1640,12 +1642,12 @@ where
16401642
) -> Result<RequestCancelWorkflowExecutionResponse> {
16411643
Ok(self.clone().request_cancel_workflow_execution(
16421644
RequestCancelWorkflowExecutionRequest {
1643-
namespace: self.namespace().to_owned(),
1645+
namespace: self.namespace(),
16441646
workflow_execution: Some(WorkflowExecution {
16451647
workflow_id,
16461648
run_id: run_id.unwrap_or_default(),
16471649
}),
1648-
identity: self.get_identity().to_owned(),
1650+
identity: self.identity(),
16491651
request_id: request_id.unwrap_or_else(|| Uuid::new_v4().to_string()),
16501652
first_execution_run_id: "".to_string(),
16511653
reason,
@@ -1663,14 +1665,14 @@ where
16631665
) -> Result<TerminateWorkflowExecutionResponse> {
16641666
Ok(WorkflowService::terminate_workflow_execution(&mut self.clone(),
16651667
TerminateWorkflowExecutionRequest {
1666-
namespace: self.namespace().to_owned(),
1668+
namespace: self.namespace(),
16671669
workflow_execution: Some(WorkflowExecution {
16681670
workflow_id,
16691671
run_id: run_id.unwrap_or_default(),
16701672
}),
16711673
reason: "".to_string(),
16721674
details: None,
1673-
identity: self.get_identity().to_owned(),
1675+
identity: self.identity(),
16741676
first_execution_run_id: "".to_string(),
16751677
links: vec![],
16761678
},
@@ -1716,7 +1718,7 @@ where
17161718
) -> Result<ListOpenWorkflowExecutionsResponse> {
17171719
Ok(WorkflowService::list_open_workflow_executions(&mut self.clone(),
17181720
ListOpenWorkflowExecutionsRequest {
1719-
namespace: self.namespace().to_owned(),
1721+
namespace: self.namespace(),
17201722
maximum_page_size,
17211723
next_page_token,
17221724
start_time_filter,
@@ -1736,7 +1738,7 @@ where
17361738
) -> Result<ListClosedWorkflowExecutionsResponse> {
17371739
Ok(WorkflowService::list_closed_workflow_executions(&mut self.clone(),
17381740
ListClosedWorkflowExecutionsRequest {
1739-
namespace: self.namespace().to_owned(),
1741+
namespace: self.namespace(),
17401742
maximum_page_size,
17411743
next_page_token,
17421744
start_time_filter,
@@ -1755,7 +1757,7 @@ where
17551757
) -> Result<ListWorkflowExecutionsResponse> {
17561758
Ok(WorkflowService::list_workflow_executions(&mut self.clone(),
17571759
ListWorkflowExecutionsRequest {
1758-
namespace: self.namespace().to_owned(),
1760+
namespace: self.namespace(),
17591761
page_size,
17601762
next_page_token,
17611763
query,
@@ -1773,7 +1775,7 @@ where
17731775
) -> Result<ListArchivedWorkflowExecutionsResponse> {
17741776
Ok(WorkflowService::list_archived_workflow_executions(&mut self.clone(),
17751777
ListArchivedWorkflowExecutionsRequest {
1776-
namespace: self.namespace().to_owned(),
1778+
namespace: self.namespace(),
17771779
page_size,
17781780
next_page_token,
17791781
query,
@@ -1801,7 +1803,7 @@ where
18011803
) -> Result<UpdateWorkflowExecutionResponse> {
18021804
Ok(WorkflowService::update_workflow_execution(&mut self.clone(),
18031805
UpdateWorkflowExecutionRequest {
1804-
namespace: self.namespace().to_owned(),
1806+
namespace: self.namespace(),
18051807
workflow_execution: Some(WorkflowExecution {
18061808
workflow_id,
18071809
run_id,
@@ -1810,7 +1812,7 @@ where
18101812
request: Some(update::v1::Request {
18111813
meta: Some(update::v1::Meta {
18121814
update_id: "".into(),
1813-
identity: self.get_identity().to_owned(),
1815+
identity: self.identity(),
18141816
}),
18151817
input: Some(update::v1::Input {
18161818
header: None,
@@ -1853,7 +1855,7 @@ pub trait WfClientExt: WfHandleClient + Sized + Clone {
18531855
UntypedWorkflowHandle::new(
18541856
self.clone(),
18551857
WorkflowExecutionInfo {
1856-
namespace: self.namespace().to_string(),
1858+
namespace: self.namespace(),
18571859
workflow_id: workflow_id.into(),
18581860
run_id: if rid.is_empty() { None } else { Some(rid) },
18591861
},

0 commit comments

Comments
 (0)