Skip to content

Commit 1292c22

Browse files
committed
feat(workflow): Add support for failing workflow execution on certain error types
1 parent 9b3ec22 commit 1292c22

4 files changed

Lines changed: 111 additions & 8 deletions

File tree

packages/core-bridge/src/helpers/try_from_js.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
use std::{collections::HashMap, net::SocketAddr, time::Duration};
1+
use std::{
2+
collections::{HashMap, HashSet},
3+
net::SocketAddr,
4+
time::Duration,
5+
};
26

37
use neon::{
48
handle::Handle,
@@ -163,6 +167,24 @@ impl<T: TryFromJs> TryFromJs for Vec<T> {
163167
}
164168
}
165169

170+
#[allow(clippy::implicit_hasher)]
171+
impl<T: TryFromJs + std::hash::Hash + Eq> TryFromJs for HashSet<T> {
172+
fn try_from_js<'cx, 'b>(
173+
cx: &mut impl Context<'cx>,
174+
js_value: Handle<'b, JsValue>,
175+
) -> BridgeResult<Self> {
176+
let array = js_value.downcast::<JsArray, _>(cx)?;
177+
let len = array.len(cx);
178+
let mut result = Self::with_capacity(len as usize);
179+
180+
for i in 0..len {
181+
let value = array.get_value(cx, i)?;
182+
result.insert(T::try_from_js(cx, value)?);
183+
}
184+
Ok(result)
185+
}
186+
}
187+
166188
#[allow(clippy::implicit_hasher)]
167189
impl<T: TryFromJs> TryFromJs for HashMap<String, T> {
168190
fn try_from_js<'cx, 'b>(

packages/core-bridge/src/worker.rs

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -404,16 +404,23 @@ impl MutableFinalize for HistoryForReplayTunnelHandle {}
404404
////////////////////////////////////////////////////////////////////////////////////////////////////
405405

406406
mod config {
407-
use std::{sync::Arc, time::Duration};
407+
use std::{
408+
collections::{HashMap, HashSet},
409+
sync::Arc,
410+
time::Duration,
411+
};
408412

409413
use temporal_sdk_core::{
410414
ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceSlotOptions,
411415
SlotSupplierOptions as CoreSlotSupplierOptions, TunerHolder, TunerHolderOptionsBuilder,
412-
api::worker::{
413-
ActivitySlotKind, LocalActivitySlotKind, PollerBehavior as CorePollerBehavior,
414-
SlotKind, WorkerConfig, WorkerConfigBuilder, WorkerConfigBuilderError,
415-
WorkerDeploymentOptions as CoreWorkerDeploymentOptions,
416-
WorkerDeploymentVersion as CoreWorkerDeploymentVersion, WorkflowSlotKind,
416+
api::{
417+
errors::WorkflowErrorType as CoreWorkflowErrorType,
418+
worker::{
419+
ActivitySlotKind, LocalActivitySlotKind, PollerBehavior as CorePollerBehavior,
420+
SlotKind, WorkerConfig, WorkerConfigBuilder, WorkerConfigBuilderError,
421+
WorkerDeploymentOptions as CoreWorkerDeploymentOptions,
422+
WorkerDeploymentVersion as CoreWorkerDeploymentVersion, WorkflowSlotKind,
423+
},
417424
},
418425
protos::temporal::api::enums::v1::VersioningBehavior as CoreVersioningBehavior,
419426
};
@@ -447,6 +454,8 @@ mod config {
447454
max_activities_per_second: Option<f64>,
448455
max_task_queue_activities_per_second: Option<f64>,
449456
shutdown_grace_time: Option<Duration>,
457+
workflow_failure_errors: HashSet<WorkflowErrorType>,
458+
workflow_types_to_failure_errors: HashMap<String, HashSet<WorkflowErrorType>>,
450459
}
451460

452461
#[derive(TryFromJs)]
@@ -513,6 +522,10 @@ mod config {
513522
.max_task_queue_activities_per_second(self.max_task_queue_activities_per_second)
514523
.max_worker_activities_per_second(self.max_activities_per_second)
515524
.graceful_shutdown_period(self.shutdown_grace_time)
525+
.workflow_failure_errors(into_core_workflow_error_set(self.workflow_failure_errors))
526+
.workflow_types_to_failure_errors(into_core_workflow_error_map_of_sets(
527+
self.workflow_types_to_failure_errors,
528+
))
516529
.build()
517530
}
518531
}
@@ -584,6 +597,33 @@ mod config {
584597
}
585598
}
586599

600+
#[derive(TryFromJs, Hash, Eq, PartialEq)]
601+
pub enum WorkflowErrorType {
602+
Nondeterminism,
603+
}
604+
605+
impl From<WorkflowErrorType> for CoreWorkflowErrorType {
606+
fn from(val: WorkflowErrorType) -> Self {
607+
match val {
608+
WorkflowErrorType::Nondeterminism => Self::Nondeterminism,
609+
}
610+
}
611+
}
612+
613+
fn into_core_workflow_error_set(
614+
val: HashSet<WorkflowErrorType>,
615+
) -> HashSet<CoreWorkflowErrorType> {
616+
val.into_iter().map(Into::into).collect()
617+
}
618+
619+
fn into_core_workflow_error_map_of_sets(
620+
val: HashMap<String, HashSet<WorkflowErrorType>>,
621+
) -> HashMap<String, HashSet<CoreWorkflowErrorType>> {
622+
val.into_iter()
623+
.map(|(k, v)| (k, into_core_workflow_error_set(v)))
624+
.collect()
625+
}
626+
587627
#[derive(TryFromJs)]
588628
#[allow(clippy::struct_field_names)]
589629
pub(super) struct WorkerTuner {

packages/core-bridge/ts/native.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ export interface WorkerOptions {
200200
maxTaskQueueActivitiesPerSecond: Option<number>;
201201
maxActivitiesPerSecond: Option<number>;
202202
shutdownGraceTime: number;
203+
workflowFailureErrors: WorkflowErrorType[];
204+
workflowTypesToFailureErrors: Record<string, WorkflowErrorType[]>;
203205
}
204206

205207
export type PollerBehavior =
@@ -227,6 +229,8 @@ export type WorkerDeploymentVersion = {
227229

228230
export type VersioningBehavior = { type: 'pinned' } | { type: 'auto-upgrade' };
229231

232+
export type WorkflowErrorType = { type: 'nondeterminism' };
233+
230234
////////////////////////////////////////////////////////////////////////////////////////////////////
231235
// Worker Tuner
232236
////////////////////////////////////////////////////////////////////////////////////////////////////

packages/worker/src/worker-options.ts

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,23 @@ export interface WorkerOptions {
474474
*/
475475
sinks?: InjectedSinks<any>;
476476

477+
/**
478+
* The types of exceptions that, if a Workflow-thrown error extends, will cause the Workflow
479+
* Execution or the Update to fail instead of suspending the Workflow via task failure.
480+
*
481+
* This property expects a record of Workflow-type names to the list of error types that will
482+
* cause that type of Workflow to fail. Uses the `'*'` key to specify a list of error types that
483+
* applies to all Workflow types.
484+
*
485+
* If either list of error types includes `NondeterminismError`, then non-determinism errors
486+
* will cause the Workflow Excution to fail. If the list of error types includes `Error`, it
487+
* effectively will fail a workflow/update in all user exception cases, including non-determinism
488+
* errors.
489+
*
490+
* @experimental
491+
*/
492+
workflowTypesToFailureErrors?: Record<'*' | string, (string | 'NondeterminismError' | 'Error')[]>;
493+
477494
/**
478495
* @deprecated SDK tracing is no longer supported. This option is ignored.
479496
*/
@@ -972,6 +989,24 @@ export function compileWorkerOptions(
972989
}
973990

974991
export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): native.WorkerOptions {
992+
const workflowFailureErrors: native.WorkflowErrorType[] = [];
993+
const workflowTypesToFailureErrors: Record<string, native.WorkflowErrorType[]> = {};
994+
995+
for (const [k, v] of Object.entries(opts.workflowTypesToFailureErrors ?? {})) {
996+
const errorTypes: native.WorkflowErrorType[] = [];
997+
998+
// Core only cares about Non-Determinism Error; other error types are handled by lang side
999+
if (v.includes('NondeterminismError') || v.includes('Error')) {
1000+
errorTypes.push({ type: 'nondeterminism' });
1001+
}
1002+
1003+
if (k === '*') {
1004+
workflowFailureErrors.push(...errorTypes);
1005+
} else {
1006+
workflowTypesToFailureErrors[k] = errorTypes;
1007+
}
1008+
}
1009+
9751010
return {
9761011
identity: opts.identity,
9771012
buildId: opts.buildId, // eslint-disable-line deprecation/deprecation
@@ -983,14 +1018,16 @@ export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): n
9831018
nonStickyToStickyPollRatio: opts.nonStickyToStickyPollRatio,
9841019
workflowTaskPollerBehavior: toNativeTaskPollerBehavior(opts.workflowTaskPollerBehavior),
9851020
activityTaskPollerBehavior: toNativeTaskPollerBehavior(opts.activityTaskPollerBehavior),
986-
enableNonLocalActivities: opts.enableNonLocalActivities,
1021+
enableNonLocalActivities: opts.enableNonLocalActivities && opts.activities.size > 0,
9871022
stickyQueueScheduleToStartTimeout: msToNumber(opts.stickyQueueScheduleToStartTimeout),
9881023
maxCachedWorkflows: opts.maxCachedWorkflows,
9891024
maxHeartbeatThrottleInterval: msToNumber(opts.maxHeartbeatThrottleInterval),
9901025
defaultHeartbeatThrottleInterval: msToNumber(opts.defaultHeartbeatThrottleInterval),
9911026
maxTaskQueueActivitiesPerSecond: opts.maxTaskQueueActivitiesPerSecond ?? null,
9921027
maxActivitiesPerSecond: opts.maxActivitiesPerSecond ?? null,
9931028
shutdownGraceTime: msToNumber(opts.shutdownGraceTime),
1029+
workflowFailureErrors,
1030+
workflowTypesToFailureErrors,
9941031
};
9951032
}
9961033

0 commit comments

Comments
 (0)