forked from temporalio/sdk-rust
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlib.rs
More file actions
226 lines (196 loc) · 9.51 KB
/
lib.rs
File metadata and controls
226 lines (196 loc) · 9.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
#[cfg(feature = "envconfig")]
pub mod envconfig;
pub mod errors;
pub mod telemetry;
pub mod worker;
use crate::{
errors::{
CompleteActivityError, CompleteNexusError, CompleteWfError, PollError,
WorkerValidationError,
},
worker::WorkerConfig,
};
use std::sync::Arc;
use temporal_sdk_core_protos::coresdk::{
ActivityHeartbeat, ActivityTaskCompletion,
activity_task::ActivityTask,
nexus::{NexusTask, NexusTaskCompletion},
workflow_activation::WorkflowActivation,
workflow_completion::WorkflowActivationCompletion,
};
use uuid::Uuid;
/// This trait is the primary way by which language specific SDKs interact with the core SDK.
/// It represents one worker, which has a (potentially shared) client for connecting to the service
/// and is bound to a specific task queue.
#[async_trait::async_trait]
pub trait Worker: Send + Sync {
/// Validate that the worker can properly connect to server, plus any other validation that
/// needs to be done asynchronously. Lang SDKs should call this function once before calling
/// any others.
async fn validate(&self) -> Result<(), WorkerValidationError>;
/// Ask the worker for some work, returning a [WorkflowActivation]. It is then the language
/// SDK's responsibility to call the appropriate workflow code with the provided inputs. Blocks
/// indefinitely until such work is available or [Worker::shutdown] is called.
///
/// It is important to understand that all activations must be responded to. There can only
/// be one outstanding activation for a particular run of a workflow at any time. If an
/// activation is not responded to, it will cause that workflow to become stuck forever.
///
/// See [WorkflowActivation] for more details on the expected behavior of lang w.r.t activation
/// & job processing.
///
/// Do not call poll concurrently. It handles polling the server concurrently internally.
async fn poll_workflow_activation(&self) -> Result<WorkflowActivation, PollError>;
/// Ask the worker for some work, returning an [ActivityTask]. It is then the language SDK's
/// responsibility to call the appropriate activity code with the provided inputs. Blocks
/// indefinitely until such work is available or [Worker::shutdown] is called.
///
/// Do not call poll concurrently. It handles polling the server concurrently internally.
async fn poll_activity_task(&self) -> Result<ActivityTask, PollError>;
/// Ask the worker for some nexus related work. It is then the language SDK's
/// responsibility to call the appropriate nexus operation handler code with the provided
/// inputs. Blocks indefinitely until such work is available or [Worker::shutdown] is called.
///
/// All tasks must be responded to for shutdown to complete.
///
/// Do not call poll concurrently. It handles polling the server concurrently internally.
async fn poll_nexus_task(&self) -> Result<NexusTask, PollError>;
/// Tell the worker that a workflow activation has completed. May (and should) be freely called
/// concurrently. The future may take some time to resolve, as fetching more events might be
/// necessary for completion to... complete - thus SDK implementers should make sure they do
/// not serialize completions.
async fn complete_workflow_activation(
&self,
completion: WorkflowActivationCompletion,
) -> Result<(), CompleteWfError>;
/// Tell the worker that an activity has finished executing. May (and should) be freely called
/// concurrently.
async fn complete_activity_task(
&self,
completion: ActivityTaskCompletion,
) -> Result<(), CompleteActivityError>;
/// Tell the worker that a nexus task has completed. May (and should) be freely called
/// concurrently.
async fn complete_nexus_task(
&self,
completion: NexusTaskCompletion,
) -> Result<(), CompleteNexusError>;
/// Notify the Temporal service that an activity is still alive. Long running activities that
/// take longer than `activity_heartbeat_timeout` to finish must call this function in order to
/// report progress, otherwise the activity will timeout and a new attempt will be scheduled.
///
/// The first heartbeat request will be sent immediately, subsequent rapid calls to this
/// function will result in heartbeat requests being aggregated and the last one received during
/// the aggregation period will be sent to the server, where that period is defined as half the
/// heartbeat timeout.
///
/// Unlike Java/Go SDKs we do not return cancellation status as part of heartbeat response and
/// instead send it as a separate activity task to the lang, decoupling heartbeat and
/// cancellation processing.
///
/// For now activity still need to send heartbeats if they want to receive cancellation
/// requests. In the future we will change this and will dispatch cancellations more
/// proactively. Note that this function does not block on the server call and returns
/// immediately. Underlying validation errors are swallowed and logged, this has been agreed to
/// be optimal behavior for the user as we don't want to break activity execution due to badly
/// configured heartbeat options.
fn record_activity_heartbeat(&self, details: ActivityHeartbeat);
/// Request that a workflow be evicted by its run id. This will generate a workflow activation
/// with the eviction job inside it to be eventually returned by
/// [Worker::poll_workflow_activation]. If the workflow had any existing outstanding activations,
/// such activations are invalidated and subsequent completions of them will do nothing and log
/// a warning.
fn request_workflow_eviction(&self, run_id: &str);
/// Return this worker's config
fn get_config(&self) -> &WorkerConfig;
/// Initiate shutdown. See [Worker::shutdown], this is just a sync version that starts the
/// process. You can then wait on `shutdown` or [Worker::finalize_shutdown].
fn initiate_shutdown(&self);
/// Initiates async shutdown procedure, eventually ceases all polling of the server and shuts
/// down this worker. [Worker::poll_workflow_activation] and [Worker::poll_activity_task] should
/// be called until both return a `ShutDown` error to ensure that all outstanding work is
/// complete. This means that the lang sdk will need to call
/// [Worker::complete_workflow_activation] and [Worker::complete_activity_task] for those
/// workflows & activities until they are done. At that point, the lang SDK can end the process,
/// or drop the [Worker] instance via [Worker::finalize_shutdown], which will close the
/// connection and free resources. If you have set [WorkerConfig::no_remote_activities], you may
/// skip calling [Worker::poll_activity_task].
///
/// Lang implementations should use [Worker::initiate_shutdown] followed by
/// [Worker::finalize_shutdown].
async fn shutdown(&self);
/// Completes shutdown and frees all resources. You should avoid simply dropping workers, as
/// this does not allow async tasks to report any panics that may have occurred cleanly.
///
/// This should be called only after [Worker::shutdown] has resolved and/or both polling
/// functions have returned `ShutDown` errors.
async fn finalize_shutdown(self);
/// Unique identifier for this worker instance.
/// This must be stable across the worker's lifetime and unique per instance.
fn worker_instance_key(&self) -> Uuid;
}
#[async_trait::async_trait]
impl<W> Worker for Arc<W>
where
W: Worker + ?Sized,
{
async fn validate(&self) -> Result<(), WorkerValidationError> {
(**self).validate().await
}
async fn poll_workflow_activation(&self) -> Result<WorkflowActivation, PollError> {
(**self).poll_workflow_activation().await
}
async fn poll_activity_task(&self) -> Result<ActivityTask, PollError> {
(**self).poll_activity_task().await
}
async fn poll_nexus_task(&self) -> Result<NexusTask, PollError> {
(**self).poll_nexus_task().await
}
async fn complete_workflow_activation(
&self,
completion: WorkflowActivationCompletion,
) -> Result<(), CompleteWfError> {
(**self).complete_workflow_activation(completion).await
}
async fn complete_activity_task(
&self,
completion: ActivityTaskCompletion,
) -> Result<(), CompleteActivityError> {
(**self).complete_activity_task(completion).await
}
async fn complete_nexus_task(
&self,
completion: NexusTaskCompletion,
) -> Result<(), CompleteNexusError> {
(**self).complete_nexus_task(completion).await
}
fn record_activity_heartbeat(&self, details: ActivityHeartbeat) {
(**self).record_activity_heartbeat(details)
}
fn request_workflow_eviction(&self, run_id: &str) {
(**self).request_workflow_eviction(run_id)
}
fn get_config(&self) -> &WorkerConfig {
(**self).get_config()
}
fn initiate_shutdown(&self) {
(**self).initiate_shutdown()
}
async fn shutdown(&self) {
(**self).shutdown().await
}
async fn finalize_shutdown(self) {
panic!("Can't finalize shutdown on Arc'd worker")
}
fn worker_instance_key(&self) -> Uuid {
(**self).worker_instance_key()
}
}
macro_rules! dbg_panic {
($($arg:tt)*) => {
use tracing::error;
error!($($arg)*);
debug_assert!(false, $($arg)*);
};
}
pub(crate) use dbg_panic;