Skip to content

Commit 2c91099

Browse files
committed
Added SharedReplaceableClient and made worker client replaceable between retries
1 parent 4614dcb commit 2c91099

12 files changed

Lines changed: 306 additions & 220 deletions

File tree

client/src/lib.rs

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ mod metrics;
1313
#[doc(hidden)]
1414
pub mod proxy;
1515
mod raw;
16+
mod replaceable;
1617
mod retry;
1718
mod worker_registry;
1819
mod workflow_handle;
@@ -23,6 +24,7 @@ pub use crate::{
2324
};
2425
pub use metrics::{LONG_REQUEST_LATENCY_HISTOGRAM_NAME, REQUEST_LATENCY_HISTOGRAM_NAME};
2526
pub use raw::{CloudService, HealthService, OperatorService, TestService, WorkflowService};
27+
pub use replaceable::SharedReplaceableClient;
2628
pub use temporal_sdk_core_protos::temporal::api::{
2729
enums::v1::ArchivalState,
2830
filter::v1::{StartTimeFilter, StatusFilter, WorkflowExecutionFilter, WorkflowTypeFilter},
@@ -774,12 +776,12 @@ impl Client {
774776
}
775777

776778
impl NamespacedClient for Client {
777-
fn namespace(&self) -> &str {
778-
&self.namespace
779+
fn namespace(&self) -> String {
780+
self.namespace.clone()
779781
}
780782

781-
fn get_identity(&self) -> &str {
782-
&self.inner.options.identity
783+
fn identity(&self) -> String {
784+
self.inner.options.identity.clone()
783785
}
784786
}
785787

@@ -1102,9 +1104,9 @@ pub trait WorkflowClientTrait: NamespacedClient {
11021104
/// A client that is bound to a namespace
11031105
pub trait NamespacedClient {
11041106
/// Returns the namespace this client is bound to
1105-
fn namespace(&self) -> &str;
1107+
fn namespace(&self) -> String;
11061108
/// Returns the client identity
1107-
fn get_identity(&self) -> &str;
1109+
fn identity(&self) -> String;
11081110
}
11091111

11101112
/// Optional fields supplied at the start of workflow execution
@@ -1266,7 +1268,7 @@ where
12661268
Ok(self
12671269
.clone()
12681270
.start_workflow_execution(StartWorkflowExecutionRequest {
1269-
namespace: self.namespace().to_owned(),
1271+
namespace: self.namespace(),
12701272
input: input.into_payloads(),
12711273
workflow_id,
12721274
workflow_type: Some(WorkflowType {
@@ -1304,7 +1306,7 @@ where
13041306
run_id: String,
13051307
) -> Result<ResetStickyTaskQueueResponse> {
13061308
let request = ResetStickyTaskQueueRequest {
1307-
namespace: self.namespace().to_owned(),
1309+
namespace: self.namespace(),
13081310
execution: Some(WorkflowExecution {
13091311
workflow_id,
13101312
run_id,
@@ -1326,8 +1328,8 @@ where
13261328
RespondActivityTaskCompletedRequest {
13271329
task_token: task_token.0,
13281330
result,
1329-
identity: self.get_identity().to_owned(),
1330-
namespace: self.namespace().to_owned(),
1331+
identity: self.identity(),
1332+
namespace: self.namespace(),
13311333
..Default::default()
13321334
},
13331335
)
@@ -1344,8 +1346,8 @@ where
13441346
RecordActivityTaskHeartbeatRequest {
13451347
task_token: task_token.0,
13461348
details,
1347-
identity: self.get_identity().to_owned(),
1348-
namespace: self.namespace().to_owned(),
1349+
identity: self.identity(),
1350+
namespace: self.namespace(),
13491351
},
13501352
)
13511353
.await?
@@ -1361,8 +1363,8 @@ where
13611363
RespondActivityTaskCanceledRequest {
13621364
task_token: task_token.0,
13631365
details,
1364-
identity: self.get_identity().to_owned(),
1365-
namespace: self.namespace().to_owned(),
1366+
identity: self.identity(),
1367+
namespace: self.namespace(),
13661368
..Default::default()
13671369
},
13681370
)
@@ -1380,14 +1382,14 @@ where
13801382
) -> Result<SignalWorkflowExecutionResponse> {
13811383
Ok(WorkflowService::signal_workflow_execution(&mut self.clone(),
13821384
SignalWorkflowExecutionRequest {
1383-
namespace: self.namespace().to_owned(),
1385+
namespace: self.namespace(),
13841386
workflow_execution: Some(WorkflowExecution {
13851387
workflow_id,
13861388
run_id,
13871389
}),
13881390
signal_name,
13891391
input: payloads,
1390-
identity: self.get_identity().to_owned(),
1392+
identity: self.identity(),
13911393
request_id: request_id.unwrap_or_else(|| Uuid::new_v4().to_string()),
13921394
..Default::default()
13931395
},
@@ -1403,7 +1405,7 @@ where
14031405
) -> Result<SignalWithStartWorkflowExecutionResponse> {
14041406
Ok(WorkflowService::signal_with_start_workflow_execution(&mut self.clone(),
14051407
SignalWithStartWorkflowExecutionRequest {
1406-
namespace: self.namespace().to_owned(),
1408+
namespace: self.namespace(),
14071409
workflow_id: options.workflow_id,
14081410
workflow_type: Some(WorkflowType {
14091411
name: options.workflow_type,
@@ -1416,7 +1418,7 @@ where
14161418
input: options.input,
14171419
signal_name: options.signal_name,
14181420
signal_input: options.signal_input,
1419-
identity: self.get_identity().to_owned(),
1421+
identity: self.identity(),
14201422
request_id: options
14211423
.request_id
14221424
.unwrap_or_else(|| Uuid::new_v4().to_string()),
@@ -1447,7 +1449,7 @@ where
14471449
) -> Result<QueryWorkflowResponse> {
14481450
Ok(self.clone().query_workflow(
14491451
QueryWorkflowRequest {
1450-
namespace: self.namespace().to_owned(),
1452+
namespace: self.namespace(),
14511453
execution: Some(WorkflowExecution {
14521454
workflow_id,
14531455
run_id,
@@ -1467,7 +1469,7 @@ where
14671469
) -> Result<DescribeWorkflowExecutionResponse> {
14681470
Ok(WorkflowService::describe_workflow_execution(&mut self.clone(),
14691471
DescribeWorkflowExecutionRequest {
1470-
namespace: self.namespace().to_owned(),
1472+
namespace: self.namespace(),
14711473
execution: Some(WorkflowExecution {
14721474
workflow_id,
14731475
run_id: run_id.unwrap_or_default(),
@@ -1486,7 +1488,7 @@ where
14861488
) -> Result<GetWorkflowExecutionHistoryResponse> {
14871489
Ok(WorkflowService::get_workflow_execution_history(&mut self.clone(),
14881490
GetWorkflowExecutionHistoryRequest {
1489-
namespace: self.namespace().to_owned(),
1491+
namespace: self.namespace(),
14901492
execution: Some(WorkflowExecution {
14911493
workflow_id,
14921494
run_id: run_id.unwrap_or_default(),
@@ -1508,12 +1510,12 @@ where
15081510
) -> Result<RequestCancelWorkflowExecutionResponse> {
15091511
Ok(self.clone().request_cancel_workflow_execution(
15101512
RequestCancelWorkflowExecutionRequest {
1511-
namespace: self.namespace().to_owned(),
1513+
namespace: self.namespace(),
15121514
workflow_execution: Some(WorkflowExecution {
15131515
workflow_id,
15141516
run_id: run_id.unwrap_or_default(),
15151517
}),
1516-
identity: self.get_identity().to_owned(),
1518+
identity: self.identity(),
15171519
request_id: request_id.unwrap_or_else(|| Uuid::new_v4().to_string()),
15181520
first_execution_run_id: "".to_string(),
15191521
reason,
@@ -1531,14 +1533,14 @@ where
15311533
) -> Result<TerminateWorkflowExecutionResponse> {
15321534
Ok(WorkflowService::terminate_workflow_execution(&mut self.clone(),
15331535
TerminateWorkflowExecutionRequest {
1534-
namespace: self.namespace().to_owned(),
1536+
namespace: self.namespace(),
15351537
workflow_execution: Some(WorkflowExecution {
15361538
workflow_id,
15371539
run_id: run_id.unwrap_or_default(),
15381540
}),
15391541
reason: "".to_string(),
15401542
details: None,
1541-
identity: self.get_identity().to_owned(),
1543+
identity: self.identity(),
15421544
first_execution_run_id: "".to_string(),
15431545
links: vec![],
15441546
},
@@ -1584,7 +1586,7 @@ where
15841586
) -> Result<ListOpenWorkflowExecutionsResponse> {
15851587
Ok(WorkflowService::list_open_workflow_executions(&mut self.clone(),
15861588
ListOpenWorkflowExecutionsRequest {
1587-
namespace: self.namespace().to_owned(),
1589+
namespace: self.namespace(),
15881590
maximum_page_size,
15891591
next_page_token,
15901592
start_time_filter,
@@ -1604,7 +1606,7 @@ where
16041606
) -> Result<ListClosedWorkflowExecutionsResponse> {
16051607
Ok(WorkflowService::list_closed_workflow_executions(&mut self.clone(),
16061608
ListClosedWorkflowExecutionsRequest {
1607-
namespace: self.namespace().to_owned(),
1609+
namespace: self.namespace(),
16081610
maximum_page_size,
16091611
next_page_token,
16101612
start_time_filter,
@@ -1623,7 +1625,7 @@ where
16231625
) -> Result<ListWorkflowExecutionsResponse> {
16241626
Ok(WorkflowService::list_workflow_executions(&mut self.clone(),
16251627
ListWorkflowExecutionsRequest {
1626-
namespace: self.namespace().to_owned(),
1628+
namespace: self.namespace(),
16271629
page_size,
16281630
next_page_token,
16291631
query,
@@ -1641,7 +1643,7 @@ where
16411643
) -> Result<ListArchivedWorkflowExecutionsResponse> {
16421644
Ok(WorkflowService::list_archived_workflow_executions(&mut self.clone(),
16431645
ListArchivedWorkflowExecutionsRequest {
1644-
namespace: self.namespace().to_owned(),
1646+
namespace: self.namespace(),
16451647
page_size,
16461648
next_page_token,
16471649
query,
@@ -1669,7 +1671,7 @@ where
16691671
) -> Result<UpdateWorkflowExecutionResponse> {
16701672
Ok(WorkflowService::update_workflow_execution(&mut self.clone(),
16711673
UpdateWorkflowExecutionRequest {
1672-
namespace: self.namespace().to_owned(),
1674+
namespace: self.namespace(),
16731675
workflow_execution: Some(WorkflowExecution {
16741676
workflow_id,
16751677
run_id,
@@ -1678,7 +1680,7 @@ where
16781680
request: Some(update::v1::Request {
16791681
meta: Some(update::v1::Meta {
16801682
update_id: "".into(),
1681-
identity: self.get_identity().to_owned(),
1683+
identity: self.identity(),
16821684
}),
16831685
input: Some(update::v1::Input {
16841686
header: None,
@@ -1721,7 +1723,7 @@ pub trait WfClientExt: WfHandleClient + Sized + Clone {
17211723
UntypedWorkflowHandle::new(
17221724
self.clone(),
17231725
WorkflowExecutionInfo {
1724-
namespace: self.namespace().to_string(),
1726+
namespace: self.namespace(),
17251727
workflow_id: workflow_id.into(),
17261728
run_id: if rid.is_empty() { None } else { Some(rid) },
17271729
},

0 commit comments

Comments
 (0)