forked from temporalio/sdk-rust
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlib.rs
More file actions
323 lines (297 loc) · 11.4 KB
/
lib.rs
File metadata and controls
323 lines (297 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
#![warn(missing_docs)] // error if there are missing docs
#![allow(clippy::upper_case_acronyms)]
//! This crate provides a basis for creating new Temporal SDKs without completely starting from
//! scratch. APIs provided by this crate are not considered stable and may break at any time.
//!
//! If you are looking for the Temporal Rust SDK, please use `temporalio-sdk`.
#[cfg(test)]
#[macro_use]
pub extern crate assert_matches;
#[macro_use]
extern crate tracing;
extern crate core;
mod abstractions;
#[cfg(feature = "antithesis_assertions")]
mod antithesis;
#[cfg(feature = "debug-plugin")]
pub mod debug_client;
#[cfg(feature = "ephemeral-server")]
pub mod ephemeral_server;
mod internal_flags;
mod pollers;
mod protosext;
pub mod replay;
pub(crate) mod retry_logic;
pub mod telemetry;
mod worker;
#[cfg(test)]
mod core_tests;
#[cfg(any(feature = "test-utilities", test))]
#[macro_use]
pub mod test_help;
pub use crate::worker::client::{
PollActivityOptions, PollOptions, PollWorkflowOptions, WorkerClient, WorkflowTaskCompletion,
};
pub use pollers::{Client, ClientOptions, ClientTlsOptions, RetryOptions, TlsOptions};
pub use temporalio_common::protos::TaskToken;
pub use url::Url;
pub use worker::{
ActivitySlotKind, CompleteActivityError, CompleteNexusError, CompleteWfError,
FixedSizeSlotSupplier, LocalActivitySlotKind, NamespaceCapabilities, NexusSlotKind, PollError,
PollerBehavior, ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder,
ResourceBasedTuner, ResourceSlotOptions, SlotInfo, SlotInfoTrait, SlotKind, SlotKindType,
SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, SlotSupplier,
SlotSupplierOptions, SlotSupplierPermit, TunerBuilder, TunerHolder, TunerHolderOptions,
TunerHolderOptionsBuilder, Worker, WorkerConfig, WorkerConfigBuilder, WorkerTuner,
WorkerValidationError, WorkerVersioningStrategy, WorkflowErrorType, WorkflowSlotKind,
};
use crate::{
replay::{HistoryForReplay, ReplayWorkerInput},
telemetry::metrics::MetricsContext,
worker::client::WorkerClientBag,
};
use anyhow::bail;
use futures_util::Stream;
use std::{sync::Arc, time::Duration};
use temporalio_client::{Connection, SharedReplaceableClient};
use temporalio_common::{
protos::coresdk::ActivityHeartbeat,
telemetry::{
TelemetryInstance, TelemetryOptions, remove_trace_subscriber_for_current_thread,
set_trace_subscriber_for_current_thread, telemetry_init,
},
};
/// Initialize a worker bound to a task queue.
///
/// You will need to have already initialized a [CoreRuntime] which will be used for this worker.
/// After the worker is initialized, you should use [CoreRuntime::tokio_handle] to run the worker's
/// async functions.
///
/// Lang implementations must pass in a [Client]. When they do so, this function will always
/// overwrite the client retry configuration, force the client to use the namespace defined in the
/// worker config, and set the client identity appropriately.
///
/// # Errors
/// Returns an error if the worker config's namespace is empty, or if the client identity is still
/// empty after applying any configured override.
pub fn init_worker(
    runtime: &CoreRuntime,
    worker_config: WorkerConfig,
    mut connection: Connection,
) -> Result<Worker, anyhow::Error> {
    let namespace = worker_config.namespace.clone();
    if namespace.is_empty() {
        bail!("Worker namespace cannot be empty");
    }
    // Always take control of retry behavior, regardless of what lang configured on the client.
    *connection.retry_options_mut() = RetryOptions::default();
    init_worker_client(
        &mut connection,
        worker_config.client_identity_override.clone(),
    );
    let client = SharedReplaceableClient::new(connection);
    let client_ident = client.inner_cow().identity().to_owned();
    if client_ident.is_empty() {
        bail!("Client identity cannot be empty. Either lang or user should be setting this value");
    }
    // Sticky queue is only created when the config allows caching workflows.
    let sticky_q = sticky_q_name_for_worker(&client_ident, worker_config.max_cached_workflows);
    let worker_instance_key = uuid::Uuid::new_v4();
    // `versioning_strategy` must still be cloned here because `worker_config` is moved into
    // `Worker::new` below; `namespace` and the `Arc` itself are on their last use, so they
    // can be moved rather than cloned.
    let client_bag = Arc::new(WorkerClientBag::new(
        client,
        namespace,
        worker_config.versioning_strategy.clone(),
        worker_instance_key,
    ));
    Worker::new(
        worker_config,
        sticky_q,
        client_bag,
        Some(&runtime.telemetry),
        runtime.heartbeat_interval,
    )
}
/// Create a worker for replaying one or more existing histories. It will auto-shutdown as soon as
/// all histories have finished being replayed.
///
/// You do not necessarily need a [CoreRuntime] for replay workers, but it's advisable to create
/// one and use it to run the replay worker's async functions the same way you would for a normal
/// worker.
pub fn init_replay_worker<I>(rwi: ReplayWorkerInput<I>) -> Result<Worker, anyhow::Error>
where
    I: Stream<Item = HistoryForReplay> + Send + 'static,
{
    info!(
        task_queue = rwi.config.task_queue.as_str(),
        "Registering replay worker"
    );
    // All real construction happens inside the input bundle; this function only logs before
    // handing off.
    rwi.into_core_worker()
}
/// Applies lang-provided overrides to a client connection before a worker uses it.
///
/// Currently this only replaces the client identity when an override was supplied in the
/// worker config.
pub(crate) fn init_worker_client(
    connection: &mut Connection,
    client_identity_override: Option<String>,
) {
    if let Some(id_override) = client_identity_override {
        // We own the override string, so move it into place instead of cloning it.
        *connection.identity_mut() = id_override;
    }
}
/// Produces a unique sticky queue name for a worker, or `None` when workflow caching is
/// disabled (i.e. the config allows zero cached workflows).
///
/// The name combines the process identity with a fresh UUID so each worker instance gets its
/// own queue.
pub(crate) fn sticky_q_name_for_worker(
    process_identity: &str,
    max_cached_workflows: usize,
) -> Option<String> {
    let caching_enabled = max_cached_workflows > 0;
    caching_enabled.then(|| format!("{process_identity}-{}", uuid::Uuid::new_v4().simple()))
}
/// Holds shared state/components needed to back instances of workers and clients. More than one
/// may be instantiated, but typically only one is needed. More than one runtime instance may be
/// useful if multiple different telemetry settings are required.
pub struct CoreRuntime {
    // Telemetry instance (trace subscriber + metrics) shared by everything created from this
    // runtime.
    telemetry: TelemetryInstance,
    // The tokio runtime, when this instance created (and thus owns) it. `None` when constructed
    // via [CoreRuntime::new_assume_tokio] on top of a pre-existing runtime.
    runtime: Option<tokio::runtime::Runtime>,
    // Handle to the tokio runtime driving this instance, whether owned or borrowed.
    runtime_handle: tokio::runtime::Handle,
    // Heartbeat interval applied to all workers created with this runtime, if any.
    heartbeat_interval: Option<Duration>,
}
/// Holds telemetry options, as well as worker heartbeat_interval. Construct with
/// [RuntimeOptions::builder]
#[derive(Default, bon::Builder)]
// The generated finisher is private and renamed so the hand-written, validating `build()`
// wrapper is the only public way to finish the builder.
#[builder(finish_fn(vis = "", name = build_internal))]
#[non_exhaustive]
pub struct RuntimeOptions {
    /// Telemetry configuration options.
    #[builder(default)]
    telemetry_options: TelemetryOptions,
    /// Optional worker heartbeat interval - This configures the heartbeat setting of all
    /// workers created using this runtime.
    ///
    /// Interval must be between 1s and 60s, inclusive.
    // Defaults to heartbeating every 60s when the caller does not set it. Range validation
    // happens in the builder's `build()` wrapper, not here.
    #[builder(required, default = Some(Duration::from_secs(60)))]
    heartbeat_interval: Option<Duration>,
}
impl<S: runtime_options_builder::State> RuntimeOptionsBuilder<S> {
    /// Finishes the builder, validating the options.
    ///
    /// # Errors
    /// Returns an error if heartbeat_interval is set but not between 1s and 60s inclusive.
    pub fn build(self) -> Result<RuntimeOptions, String> {
        let options = self.build_internal();
        let allowed = Duration::from_secs(1)..=Duration::from_secs(60);
        match options.heartbeat_interval {
            Some(interval) if !allowed.contains(&interval) => Err(format!(
                "heartbeat_interval ({interval:?}) must be between 1s and 60s",
            )),
            _ => Ok(options),
        }
    }
}
/// Wraps a [tokio::runtime::Builder] to allow layering multiple on_thread_start functions
pub struct TokioRuntimeBuilder<F> {
    /// The underlying tokio runtime builder
    pub inner: tokio::runtime::Builder,
    /// A function to be called when setting the runtime builder's on thread start. Runs after
    /// core's own telemetry setup on each new runtime thread (see [CoreRuntime::new]).
    pub lang_on_thread_start: Option<F>,
}
impl Default for TokioRuntimeBuilder<Box<dyn Fn() + Send + Sync>> {
    /// A multi-threaded tokio runtime builder with no extra thread-start hook.
    fn default() -> Self {
        Self {
            inner: tokio::runtime::Builder::new_multi_thread(),
            lang_on_thread_start: None,
        }
    }
}
impl CoreRuntime {
    /// Create a new core runtime with the provided telemetry options and tokio runtime builder.
    /// Also initialize telemetry for the thread this is being called on.
    ///
    /// Note that this function will call the [tokio::runtime::Builder::enable_all] builder option
    /// on the Tokio runtime builder, and will call [tokio::runtime::Builder::on_thread_start] to
    /// ensure telemetry subscribers are set on every tokio thread.
    ///
    /// **Important**: You need to call this *before* calling any async functions on workers or
    /// clients, otherwise the tracing subscribers will not be properly attached.
    ///
    /// # Panics
    /// If a tokio runtime has already been initialized. To re-use an existing runtime, call
    /// [CoreRuntime::new_assume_tokio].
    pub fn new<F>(
        runtime_options: RuntimeOptions,
        mut tokio_builder: TokioRuntimeBuilder<F>,
    ) -> Result<Self, anyhow::Error>
    where
        F: Fn() + Send + Sync + 'static,
    {
        let telemetry = telemetry_init(runtime_options.telemetry_options)?;
        let subscriber = telemetry.trace_subscriber();
        let runtime = tokio_builder
            .inner
            .enable_all()
            .on_thread_start(move || {
                // On every new runtime thread: attach the telemetry subscriber first, then run
                // any lang-provided hook.
                if let Some(sub) = subscriber.as_ref() {
                    set_trace_subscriber_for_current_thread(sub.clone());
                }
                if let Some(lang_on_thread_start) = tokio_builder.lang_on_thread_start.as_ref() {
                    lang_on_thread_start();
                }
            })
            .build()?;
        // Enter the freshly built runtime so `Handle::current()` inside
        // `new_assume_tokio_initialized_telem` resolves to it; the guard must stay alive
        // through that call.
        let _rg = runtime.enter();
        let mut me =
            Self::new_assume_tokio_initialized_telem(telemetry, runtime_options.heartbeat_interval);
        // Store the runtime afterwards so this instance owns (and will shut down) it.
        me.runtime = Some(runtime);
        Ok(me)
    }
    /// Initialize telemetry for the thread this is being called on, assuming a tokio runtime is
    /// already active and this call exists in its context. See [Self::new] for more.
    ///
    /// # Panics
    /// If there is no currently active Tokio runtime
    pub fn new_assume_tokio(runtime_options: RuntimeOptions) -> Result<Self, anyhow::Error> {
        let telemetry = telemetry_init(runtime_options.telemetry_options)?;
        Ok(Self::new_assume_tokio_initialized_telem(
            telemetry,
            runtime_options.heartbeat_interval,
        ))
    }
    /// Construct a runtime from an already-initialized telemetry instance, assuming a tokio runtime
    /// is already active and this call exists in its context. See [Self::new] for more.
    ///
    /// # Panics
    /// If there is no currently active Tokio runtime
    pub fn new_assume_tokio_initialized_telem(
        telemetry: TelemetryInstance,
        heartbeat_interval: Option<Duration>,
    ) -> Self {
        // Panics when called outside a runtime context -- this is the documented contract.
        let runtime_handle = tokio::runtime::Handle::current();
        // Attach the subscriber to the *calling* thread too; worker threads are handled by the
        // `on_thread_start` hook set in [Self::new].
        if let Some(sub) = telemetry.trace_subscriber() {
            set_trace_subscriber_for_current_thread(sub);
        }
        Self {
            telemetry,
            // `None` here: this constructor never owns the runtime. [Self::new] fills it in.
            runtime: None,
            runtime_handle,
            heartbeat_interval,
        }
    }
    /// Get a handle to the tokio runtime used by this Core runtime.
    pub fn tokio_handle(&self) -> tokio::runtime::Handle {
        self.runtime_handle.clone()
    }
    /// Return a reference to the owned [TelemetryInstance]
    pub fn telemetry(&self) -> &TelemetryInstance {
        &self.telemetry
    }
    /// Return a mutable reference to the owned [TelemetryInstance]
    pub fn telemetry_mut(&mut self) -> &mut TelemetryInstance {
        &mut self.telemetry
    }
}
impl Drop for CoreRuntime {
    fn drop(&mut self) {
        // Detach the trace subscriber that construction installed on this thread, so a later
        // runtime (possibly with different telemetry settings) can install its own.
        remove_trace_subscriber_for_current_thread();
    }
}