Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 34 additions & 32 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod metrics;
#[doc(hidden)]
pub mod proxy;
mod raw;
mod replaceable;
mod retry;
mod worker_registry;
mod workflow_handle;
Expand All @@ -23,6 +24,7 @@ pub use crate::{
};
pub use metrics::{LONG_REQUEST_LATENCY_HISTOGRAM_NAME, REQUEST_LATENCY_HISTOGRAM_NAME};
pub use raw::{CloudService, HealthService, OperatorService, TestService, WorkflowService};
pub use replaceable::SharedReplaceableClient;
pub use temporal_sdk_core_protos::temporal::api::{
enums::v1::ArchivalState,
filter::v1::{StartTimeFilter, StatusFilter, WorkflowExecutionFilter, WorkflowTypeFilter},
Expand Down Expand Up @@ -906,12 +908,12 @@ impl Client {
}

impl NamespacedClient for Client {
fn namespace(&self) -> &str {
&self.namespace
fn namespace(&self) -> String {
self.namespace.clone()
}

fn get_identity(&self) -> &str {
&self.inner.options.identity
fn identity(&self) -> String {
self.inner.options.identity.clone()
}
}

Expand Down Expand Up @@ -1234,9 +1236,9 @@ pub trait WorkflowClientTrait: NamespacedClient {
/// A client that is bound to a namespace
pub trait NamespacedClient {
/// Returns the namespace this client is bound to
fn namespace(&self) -> &str;
fn namespace(&self) -> String;
/// Returns the client identity
fn get_identity(&self) -> &str;
fn identity(&self) -> String;
}

/// Optional fields supplied at the start of workflow execution
Expand Down Expand Up @@ -1398,7 +1400,7 @@ where
Ok(self
.clone()
.start_workflow_execution(StartWorkflowExecutionRequest {
namespace: self.namespace().to_owned(),
namespace: self.namespace(),
input: input.into_payloads(),
workflow_id,
workflow_type: Some(WorkflowType {
Expand Down Expand Up @@ -1436,7 +1438,7 @@ where
run_id: String,
) -> Result<ResetStickyTaskQueueResponse> {
let request = ResetStickyTaskQueueRequest {
namespace: self.namespace().to_owned(),
namespace: self.namespace(),
execution: Some(WorkflowExecution {
workflow_id,
run_id,
Expand All @@ -1458,8 +1460,8 @@ where
RespondActivityTaskCompletedRequest {
task_token: task_token.0,
result,
identity: self.get_identity().to_owned(),
namespace: self.namespace().to_owned(),
identity: self.identity(),
namespace: self.namespace(),
..Default::default()
},
)
Expand All @@ -1476,8 +1478,8 @@ where
RecordActivityTaskHeartbeatRequest {
task_token: task_token.0,
details,
identity: self.get_identity().to_owned(),
namespace: self.namespace().to_owned(),
identity: self.identity(),
namespace: self.namespace(),
},
)
.await?
Expand All @@ -1493,8 +1495,8 @@ where
RespondActivityTaskCanceledRequest {
task_token: task_token.0,
details,
identity: self.get_identity().to_owned(),
namespace: self.namespace().to_owned(),
identity: self.identity(),
namespace: self.namespace(),
..Default::default()
},
)
Expand All @@ -1512,14 +1514,14 @@ where
) -> Result<SignalWorkflowExecutionResponse> {
Ok(WorkflowService::signal_workflow_execution(&mut self.clone(),
SignalWorkflowExecutionRequest {
namespace: self.namespace().to_owned(),
namespace: self.namespace(),
workflow_execution: Some(WorkflowExecution {
workflow_id,
run_id,
}),
signal_name,
input: payloads,
identity: self.get_identity().to_owned(),
identity: self.identity(),
request_id: request_id.unwrap_or_else(|| Uuid::new_v4().to_string()),
..Default::default()
},
Expand All @@ -1535,7 +1537,7 @@ where
) -> Result<SignalWithStartWorkflowExecutionResponse> {
Ok(WorkflowService::signal_with_start_workflow_execution(&mut self.clone(),
SignalWithStartWorkflowExecutionRequest {
namespace: self.namespace().to_owned(),
namespace: self.namespace(),
workflow_id: options.workflow_id,
workflow_type: Some(WorkflowType {
name: options.workflow_type,
Expand All @@ -1548,7 +1550,7 @@ where
input: options.input,
signal_name: options.signal_name,
signal_input: options.signal_input,
identity: self.get_identity().to_owned(),
identity: self.identity(),
request_id: options
.request_id
.unwrap_or_else(|| Uuid::new_v4().to_string()),
Expand Down Expand Up @@ -1579,7 +1581,7 @@ where
) -> Result<QueryWorkflowResponse> {
Ok(self.clone().query_workflow(
QueryWorkflowRequest {
namespace: self.namespace().to_owned(),
namespace: self.namespace(),
execution: Some(WorkflowExecution {
workflow_id,
run_id,
Expand All @@ -1599,7 +1601,7 @@ where
) -> Result<DescribeWorkflowExecutionResponse> {
Ok(WorkflowService::describe_workflow_execution(&mut self.clone(),
DescribeWorkflowExecutionRequest {
namespace: self.namespace().to_owned(),
namespace: self.namespace(),
execution: Some(WorkflowExecution {
workflow_id,
run_id: run_id.unwrap_or_default(),
Expand All @@ -1618,7 +1620,7 @@ where
) -> Result<GetWorkflowExecutionHistoryResponse> {
Ok(WorkflowService::get_workflow_execution_history(&mut self.clone(),
GetWorkflowExecutionHistoryRequest {
namespace: self.namespace().to_owned(),
namespace: self.namespace(),
execution: Some(WorkflowExecution {
workflow_id,
run_id: run_id.unwrap_or_default(),
Expand All @@ -1640,12 +1642,12 @@ where
) -> Result<RequestCancelWorkflowExecutionResponse> {
Ok(self.clone().request_cancel_workflow_execution(
RequestCancelWorkflowExecutionRequest {
namespace: self.namespace().to_owned(),
namespace: self.namespace(),
workflow_execution: Some(WorkflowExecution {
workflow_id,
run_id: run_id.unwrap_or_default(),
}),
identity: self.get_identity().to_owned(),
identity: self.identity(),
request_id: request_id.unwrap_or_else(|| Uuid::new_v4().to_string()),
first_execution_run_id: "".to_string(),
reason,
Expand All @@ -1663,14 +1665,14 @@ where
) -> Result<TerminateWorkflowExecutionResponse> {
Ok(WorkflowService::terminate_workflow_execution(&mut self.clone(),
TerminateWorkflowExecutionRequest {
namespace: self.namespace().to_owned(),
namespace: self.namespace(),
workflow_execution: Some(WorkflowExecution {
workflow_id,
run_id: run_id.unwrap_or_default(),
}),
reason: "".to_string(),
details: None,
identity: self.get_identity().to_owned(),
identity: self.identity(),
first_execution_run_id: "".to_string(),
links: vec![],
},
Expand Down Expand Up @@ -1716,7 +1718,7 @@ where
) -> Result<ListOpenWorkflowExecutionsResponse> {
Ok(WorkflowService::list_open_workflow_executions(&mut self.clone(),
ListOpenWorkflowExecutionsRequest {
namespace: self.namespace().to_owned(),
namespace: self.namespace(),
maximum_page_size,
next_page_token,
start_time_filter,
Expand All @@ -1736,7 +1738,7 @@ where
) -> Result<ListClosedWorkflowExecutionsResponse> {
Ok(WorkflowService::list_closed_workflow_executions(&mut self.clone(),
ListClosedWorkflowExecutionsRequest {
namespace: self.namespace().to_owned(),
namespace: self.namespace(),
maximum_page_size,
next_page_token,
start_time_filter,
Expand All @@ -1755,7 +1757,7 @@ where
) -> Result<ListWorkflowExecutionsResponse> {
Ok(WorkflowService::list_workflow_executions(&mut self.clone(),
ListWorkflowExecutionsRequest {
namespace: self.namespace().to_owned(),
namespace: self.namespace(),
page_size,
next_page_token,
query,
Expand All @@ -1773,7 +1775,7 @@ where
) -> Result<ListArchivedWorkflowExecutionsResponse> {
Ok(WorkflowService::list_archived_workflow_executions(&mut self.clone(),
ListArchivedWorkflowExecutionsRequest {
namespace: self.namespace().to_owned(),
namespace: self.namespace(),
page_size,
next_page_token,
query,
Expand Down Expand Up @@ -1801,7 +1803,7 @@ where
) -> Result<UpdateWorkflowExecutionResponse> {
Ok(WorkflowService::update_workflow_execution(&mut self.clone(),
UpdateWorkflowExecutionRequest {
namespace: self.namespace().to_owned(),
namespace: self.namespace(),
workflow_execution: Some(WorkflowExecution {
workflow_id,
run_id,
Expand All @@ -1810,7 +1812,7 @@ where
request: Some(update::v1::Request {
meta: Some(update::v1::Meta {
update_id: "".into(),
identity: self.get_identity().to_owned(),
identity: self.identity(),
}),
input: Some(update::v1::Input {
header: None,
Expand Down Expand Up @@ -1853,7 +1855,7 @@ pub trait WfClientExt: WfHandleClient + Sized + Clone {
UntypedWorkflowHandle::new(
self.clone(),
WorkflowExecutionInfo {
namespace: self.namespace().to_string(),
namespace: self.namespace(),
workflow_id: workflow_id.into(),
run_id: if rid.is_empty() { None } else { Some(rid) },
},
Expand Down
Loading
Loading