Skip to content

Commit fb242f6

Browse files
committed
feat(workflow): Add support for failing workflow execution on certain error types
1 parent f465681 commit fb242f6

4 files changed

Lines changed: 107 additions & 15 deletions

File tree

packages/core-bridge/src/helpers/try_from_js.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
use std::{collections::HashMap, net::SocketAddr, time::Duration};
1+
use std::{
2+
collections::{HashMap, HashSet},
3+
net::SocketAddr,
4+
time::Duration,
5+
};
26

37
use neon::{
48
handle::Handle,
@@ -163,6 +167,24 @@ impl<T: TryFromJs> TryFromJs for Vec<T> {
163167
}
164168
}
165169

170+
#[allow(clippy::implicit_hasher)]
171+
impl<T: TryFromJs + std::hash::Hash + Eq> TryFromJs for HashSet<T> {
172+
fn try_from_js<'cx, 'b>(
173+
cx: &mut impl Context<'cx>,
174+
js_value: Handle<'b, JsValue>,
175+
) -> BridgeResult<Self> {
176+
let array = js_value.downcast::<JsArray, _>(cx)?;
177+
let len = array.len(cx);
178+
let mut result = Self::with_capacity(len as usize);
179+
180+
for i in 0..len {
181+
let value = array.get_value(cx, i)?;
182+
result.insert(T::try_from_js(cx, value)?);
183+
}
184+
Ok(result)
185+
}
186+
}
187+
166188
#[allow(clippy::implicit_hasher)]
167189
impl<T: TryFromJs> TryFromJs for HashMap<String, T> {
168190
fn try_from_js<'cx, 'b>(

packages/core-bridge/src/worker.rs

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -469,29 +469,24 @@ impl MutableFinalize for HistoryForReplayTunnelHandle {}
469469
////////////////////////////////////////////////////////////////////////////////////////////////////
470470

471471
mod config {
472-
use std::collections::HashSet;
472+
use super::custom_slot_supplier::CustomSlotSupplierOptions;
473+
use crate::helpers::TryIntoJs;
474+
use bridge_macros::TryFromJs;
475+
use neon::context::Context;
476+
use neon::object::Object;
477+
use neon::prelude::JsResult;
478+
use neon::types::JsObject;
479+
use std::collections::{HashMap, HashSet};
473480
use std::{sync::Arc, time::Duration};
474481
use temporalio_common::protos::temporal::api::enums::v1::VersioningBehavior as CoreVersioningBehavior;
475482
use temporalio_common::protos::temporal::api::worker::v1::PluginInfo;
483+
use temporalio_common::worker::WorkerVersioningStrategy;
476484
use temporalio_common::worker::{
477485
ActivitySlotKind, LocalActivitySlotKind, NexusSlotKind,
478486
PollerBehavior as CorePollerBehavior, SlotKind, WorkerConfig, WorkerConfigBuilder,
479487
WorkerConfigBuilderError, WorkerDeploymentOptions as CoreWorkerDeploymentOptions,
480488
WorkerDeploymentVersion as CoreWorkerDeploymentVersion, WorkflowSlotKind,
481489
};
482-
use temporalio_sdk_core::{
483-
ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceSlotOptions,
484-
SlotSupplierOptions as CoreSlotSupplierOptions, TunerHolder, TunerHolderOptionsBuilder,
485-
};
486-
487-
use super::custom_slot_supplier::CustomSlotSupplierOptions;
488-
use crate::helpers::TryIntoJs;
489-
use bridge_macros::TryFromJs;
490-
use neon::context::Context;
491-
use neon::object::Object;
492-
use neon::prelude::JsResult;
493-
use neon::types::JsObject;
494-
use temporalio_common::worker::WorkerVersioningStrategy;
495490

496491
#[derive(TryFromJs)]
497492
pub struct BridgeWorkerOptions {
@@ -515,6 +510,8 @@ mod config {
515510
max_task_queue_activities_per_second: Option<f64>,
516511
shutdown_grace_time: Option<Duration>,
517512
plugins: Vec<String>,
513+
workflow_failure_errors: HashSet<WorkflowErrorType>,
514+
workflow_types_to_failure_errors: HashMap<String, HashSet<WorkflowErrorType>>,
518515
}
519516

520517
#[derive(TryFromJs)]
@@ -611,6 +608,10 @@ mod config {
611608
})
612609
.collect::<HashSet<_>>(),
613610
)
611+
.workflow_failure_errors(into_core_workflow_error_set(self.workflow_failure_errors))
612+
.workflow_types_to_failure_errors(into_core_workflow_error_map_of_sets(
613+
self.workflow_types_to_failure_errors,
614+
))
614615
.build()
615616
}
616617
}
@@ -682,6 +683,33 @@ mod config {
682683
}
683684
}
684685

686+
#[derive(TryFromJs, Hash, Eq, PartialEq)]
687+
pub enum WorkflowErrorType {
688+
Nondeterminism,
689+
}
690+
691+
impl From<WorkflowErrorType> for CoreWorkflowErrorType {
692+
fn from(val: WorkflowErrorType) -> Self {
693+
match val {
694+
WorkflowErrorType::Nondeterminism => Self::Nondeterminism,
695+
}
696+
}
697+
}
698+
699+
fn into_core_workflow_error_set(
700+
val: HashSet<WorkflowErrorType>,
701+
) -> HashSet<CoreWorkflowErrorType> {
702+
val.into_iter().map(Into::into).collect()
703+
}
704+
705+
fn into_core_workflow_error_map_of_sets(
706+
val: HashMap<String, HashSet<WorkflowErrorType>>,
707+
) -> HashMap<String, HashSet<CoreWorkflowErrorType>> {
708+
val.into_iter()
709+
.map(|(k, v)| (k, into_core_workflow_error_set(v)))
710+
.collect()
711+
}
712+
685713
#[derive(TryFromJs)]
686714
#[allow(clippy::struct_field_names)]
687715
pub(super) struct WorkerTuner {

packages/core-bridge/ts/native.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,8 @@ export interface WorkerOptions {
228228
maxActivitiesPerSecond: Option<number>;
229229
shutdownGraceTime: number;
230230
plugins: string[];
231+
workflowFailureErrors: WorkflowErrorType[];
232+
workflowTypesToFailureErrors: Record<string, WorkflowErrorType[]>;
231233
}
232234

233235
export type PollerBehavior =
@@ -255,6 +257,8 @@ export type WorkerDeploymentVersion = {
255257

256258
export type VersioningBehavior = { type: 'pinned' } | { type: 'auto-upgrade' };
257259

260+
export type WorkflowErrorType = { type: 'nondeterminism' };
261+
258262
////////////////////////////////////////////////////////////////////////////////////////////////////
259263
// Worker Tuner
260264
////////////////////////////////////////////////////////////////////////////////////////////////////

packages/worker/src/worker-options.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,23 @@ export interface WorkerOptions {
532532
*/
533533
sinks?: InjectedSinks<any>;
534534

535+
/**
536+
* The types of exceptions that, if a Workflow-thrown error extends, will cause the Workflow
537+
* Execution or the Update to fail instead of suspending the Workflow via task failure.
538+
*
539+
* This property expects a record of Workflow-type names to the list of error types that will
540+
* cause that type of Workflow to fail. Uses the `'*'` key to specify a list of error types that
541+
* applies to all Workflow types.
542+
*
543+
* If either list of error types includes `NondeterminismError`, then non-determinism errors
544+
* will cause the Workflow Excution to fail. If the list of error types includes `Error`, it
545+
* effectively will fail a workflow/update in all user exception cases, including non-determinism
546+
* errors.
547+
*
548+
* @experimental
549+
*/
550+
workflowTypesToFailureErrors?: Record<'*' | string, (string | 'NondeterminismError' | 'Error')[]>;
551+
535552
/**
536553
* @deprecated SDK tracing is no longer supported. This option is ignored.
537554
*/
@@ -1080,6 +1097,25 @@ function nexusServiceRegistryFromOptions(opts: WorkerOptions): nexus.ServiceRegi
10801097
export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): native.WorkerOptions {
10811098
const enableWorkflows = opts.workflowBundle !== undefined || opts.workflowsPath !== undefined;
10821099
const enableLocalActivities = enableWorkflows && opts.activities.size > 0;
1100+
1101+
const workflowFailureErrors: native.WorkflowErrorType[] = [];
1102+
const workflowTypesToFailureErrors: Record<string, native.WorkflowErrorType[]> = {};
1103+
1104+
for (const [k, v] of Object.entries(opts.workflowTypesToFailureErrors ?? {})) {
1105+
const errorTypes: native.WorkflowErrorType[] = [];
1106+
1107+
// Core only cares about Non-Determinism Error; other error types are handled by lang side
1108+
if (v.includes('NondeterminismError') || v.includes('Error')) {
1109+
errorTypes.push({ type: 'nondeterminism' });
1110+
}
1111+
1112+
if (k === '*') {
1113+
workflowFailureErrors.push(...errorTypes);
1114+
} else {
1115+
workflowTypesToFailureErrors[k] = errorTypes;
1116+
}
1117+
}
1118+
10831119
return {
10841120
identity: opts.identity,
10851121
buildId: opts.buildId, // eslint-disable-line deprecation/deprecation
@@ -1106,6 +1142,8 @@ export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): n
11061142
maxActivitiesPerSecond: opts.maxActivitiesPerSecond ?? null,
11071143
shutdownGraceTime: msToNumber(opts.shutdownGraceTime),
11081144
plugins: opts.plugins?.map((p) => p.name) ?? [],
1145+
workflowFailureErrors,
1146+
workflowTypesToFailureErrors,
11091147
};
11101148
}
11111149

0 commit comments

Comments
 (0)