-
Notifications
You must be signed in to change notification settings - Fork 175
feat(nexus): Initial Nexus support #1708
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
01502c7
0490e7b
be83b0d
2150a48
3dcc150
5e4a5c3
d29dee1
29e6c12
244cbbb
0e6dad1
64cee58
ab2645a
a627bfd
69d01c0
ebf1311
9ed2ddf
79c9303
7a836da
64aee0a
386fac6
86d6575
a442b2c
35ef1b9
d90ef24
40d6b52
0728ae2
9073514
5b9e50e
ae5b2fe
4822548
90e20a0
9a18b02
3bfcc85
d6af5a7
bf4f774
c5d1f51
6dae17d
9885140
c637d02
49848e8
30b35c6
e0212df
72e805e
0f96a5c
a3a17e6
615ed56
3972dba
048cf90
8d18e0d
95c1838
65a7913
a6f3585
d27fbbb
d17bbe6
90d03c0
0bba211
999f6d6
deb7d43
88b9d77
622dd4c
207f1a8
1177be3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| import { temporal } from '@temporalio/proto'; | ||
|
|
||
| // A key used internally to pass "hidden options to the WorkflowClient.start() call. | ||
| export const InternalWorkflowStartOptionsKey = Symbol.for('__temporal_client_internal_workflow_start_options'); | ||
| export interface InternalWorkflowStartOptions { | ||
| requestId?: string; | ||
| /** | ||
| * Callbacks to be called by the server when this workflow reaches a terminal state. | ||
| * If the workflow continues-as-new, these callbacks will be carried over to the new execution. | ||
| * Callback addresses must be whitelisted in the server's dynamic configuration. | ||
| */ | ||
| completionCallbacks?: temporal.api.common.v1.ICallback[]; | ||
| /** Links to be associated with the workflow. */ | ||
| links?: temporal.api.common.v1.ILink[]; | ||
| /** | ||
| * Backlink copied by the client from the StartWorkflowExecutionResponse. Only populated in servers newer than 1.27. | ||
| */ | ||
| backLink?: temporal.api.common.v1.ILink; | ||
|
|
||
| /** | ||
| * Conflict options for when USE_EXISTING is specified. | ||
| * | ||
| * Used by the nexus WorkflowRunOperations to attach to a callback to a running workflow. | ||
| */ | ||
| onConflictOptions?: temporal.api.workflow.v1.IOnConflictOptions; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,6 @@ | ||
| import * as nexus from 'nexus-rpc'; | ||
| import Long from 'long'; | ||
| import type { temporal } from '@temporalio/proto'; | ||
| import { | ||
| ActivityFailure, | ||
| ApplicationFailure, | ||
|
|
@@ -8,16 +11,42 @@ import { | |
| encodeRetryState, | ||
| encodeTimeoutType, | ||
| FAILURE_SOURCE, | ||
| NexusOperationFailure, | ||
| ProtoFailure, | ||
| ServerFailure, | ||
| TemporalFailure, | ||
| TerminatedFailure, | ||
| TimeoutFailure, | ||
| } from '../failure'; | ||
| import { makeProtoEnumConverters } from '../internal-workflow'; | ||
| import { isError } from '../type-helpers'; | ||
| import { msOptionalToTs } from '../time'; | ||
| import { arrayFromPayloads, fromPayloadsAtIndex, PayloadConverter, toPayloads } from './payload-converter'; | ||
|
|
||
| // Can't import enums into the workflow sandbox, use this helper type and enum converter instead. | ||
| const NexusHandlerErrorRetryBehavior = { | ||
| RETRYABLE: 'RETRYABLE', | ||
| NON_RETRYABLE: 'NON_RETRYABLE', | ||
| } as const; | ||
|
|
||
| type NexusHandlerErrorRetryBehavior = | ||
| (typeof NexusHandlerErrorRetryBehavior)[keyof typeof NexusHandlerErrorRetryBehavior]; | ||
|
|
||
| const [encodeNexusHandlerErrorRetryBehavior, decodeNexusHandlerErrorRetryBehavior] = makeProtoEnumConverters< | ||
| temporal.api.enums.v1.NexusHandlerErrorRetryBehavior, | ||
| typeof temporal.api.enums.v1.NexusHandlerErrorRetryBehavior, | ||
| keyof typeof temporal.api.enums.v1.NexusHandlerErrorRetryBehavior, | ||
| typeof NexusHandlerErrorRetryBehavior, | ||
| 'NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_' | ||
| >( | ||
| { | ||
| UNSPECIFIED: 0, | ||
| [NexusHandlerErrorRetryBehavior.RETRYABLE]: 1, | ||
| [NexusHandlerErrorRetryBehavior.NON_RETRYABLE]: 2, | ||
| } as const, | ||
| 'NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_' | ||
| ); | ||
|
|
||
| function combineRegExp(...regexps: RegExp[]): RegExp { | ||
| return new RegExp(regexps.map((x) => `(?:${x.source})`).join('|')); | ||
| } | ||
|
|
@@ -28,6 +57,8 @@ function combineRegExp(...regexps: RegExp[]): RegExp { | |
| const CUTOFF_STACK_PATTERNS = combineRegExp( | ||
| /** Activity execution */ | ||
| /\s+at Activity\.execute \(.*[\\/]worker[\\/](?:src|lib)[\\/]activity\.[jt]s:\d+:\d+\)/, | ||
| /** Nexus execution */ | ||
| /\s+at( async)? NexusHandler\.invokeUserCode \(.*[\\/]worker[\\/](?:src|lib)[\\/]nexus\/index\.[jt]s:\d+:\d+\)/, | ||
| /** Workflow activation */ | ||
| /\s+at Activator\.\S+NextHandler \(.*[\\/]workflow[\\/](?:src|lib)[\\/]internals\.[jt]s:\d+:\d+\)/, | ||
| /** Workflow run anything in context */ | ||
|
|
@@ -120,7 +151,7 @@ export class DefaultFailureConverter implements FailureConverter { | |
| * | ||
| * Does not set common properties, that is done in {@link failureToError}. | ||
| */ | ||
| failureToErrorInner(failure: ProtoFailure, payloadConverter: PayloadConverter): TemporalFailure { | ||
| failureToErrorInner(failure: ProtoFailure, payloadConverter: PayloadConverter): Error { | ||
| if (failure.applicationFailureInfo) { | ||
| return new ApplicationFailure( | ||
| failure.message ?? undefined, | ||
|
|
@@ -192,6 +223,38 @@ export class DefaultFailureConverter implements FailureConverter { | |
| this.optionalFailureToOptionalError(failure.cause, payloadConverter) | ||
| ); | ||
| } | ||
| if (failure.nexusHandlerFailureInfo) { | ||
| if (failure.cause == null) { | ||
| throw new TypeError('Missing failure cause on nexusHandlerFailureInfo'); | ||
| } | ||
| let retryable: boolean | undefined = undefined; | ||
| const retryBehavior = decodeNexusHandlerErrorRetryBehavior(failure.nexusHandlerFailureInfo.retryBehavior); | ||
| switch (retryBehavior) { | ||
| case 'RETRYABLE': | ||
| retryable = true; | ||
| break; | ||
| case 'NON_RETRYABLE': | ||
| retryable = false; | ||
| break; | ||
| } | ||
|
|
||
| return new nexus.HandlerError({ | ||
| type: (failure.nexusHandlerFailureInfo.type as nexus.HandlerErrorType) ?? 'INTERNAL', | ||
| cause: this.failureToError(failure.cause, payloadConverter), | ||
| retryable, | ||
| }); | ||
| } | ||
| if (failure.nexusOperationExecutionFailureInfo) { | ||
| return new NexusOperationFailure( | ||
| failure.nexusOperationExecutionFailureInfo.scheduledEventId?.toNumber(), | ||
| // We assume these will always be set or gracefully set to empty strings. | ||
| failure.nexusOperationExecutionFailureInfo.endpoint ?? '', | ||
| failure.nexusOperationExecutionFailureInfo.service ?? '', | ||
| failure.nexusOperationExecutionFailureInfo.operation ?? '', | ||
| failure.nexusOperationExecutionFailureInfo.operationToken ?? undefined, | ||
| this.optionalFailureToOptionalError(failure.cause, payloadConverter) | ||
| ); | ||
| } | ||
| return new TemporalFailure( | ||
| failure.message ?? undefined, | ||
| this.optionalFailureToOptionalError(failure.cause, payloadConverter) | ||
|
|
@@ -216,7 +279,9 @@ export class DefaultFailureConverter implements FailureConverter { | |
| } | ||
| const err = this.failureToErrorInner(failure, payloadConverter); | ||
| err.stack = failure.stackTrace ?? ''; | ||
| err.failure = failure; | ||
| if (err instanceof TemporalFailure) { | ||
| err.failure = failure; | ||
| } | ||
|
Comment on lines
-223
to
+291
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Has the blind
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That was simply not possible before. I mean, until now, I'm considering storing the failure on HandlerError anyway, possibly using a private symbol property. But we are revisiting nexus error serialization at the moment, so I'm deferring that decision until we've set our mind. |
||
| return err; | ||
| } | ||
|
|
||
|
|
@@ -232,8 +297,8 @@ export class DefaultFailureConverter implements FailureConverter { | |
| } | ||
|
|
||
| errorToFailureInner(err: unknown, payloadConverter: PayloadConverter): ProtoFailure { | ||
| if (err instanceof TemporalFailure) { | ||
| if (err.failure) return err.failure; | ||
| if (err instanceof TemporalFailure || err instanceof nexus.HandlerError) { | ||
| if (err instanceof TemporalFailure && err.failure) return err.failure; | ||
| const base = { | ||
| message: err.message, | ||
| stackTrace: cutoffStackTrace(err.stack), | ||
|
|
@@ -310,6 +375,34 @@ export class DefaultFailureConverter implements FailureConverter { | |
| terminatedFailureInfo: {}, | ||
| }; | ||
| } | ||
| if (err instanceof nexus.HandlerError) { | ||
| let retryBehavior: temporal.api.enums.v1.NexusHandlerErrorRetryBehavior | undefined = undefined; | ||
| if (err.retryable === true) { | ||
| retryBehavior = encodeNexusHandlerErrorRetryBehavior('RETRYABLE'); | ||
| } else if (err.retryable === false) { | ||
| retryBehavior = encodeNexusHandlerErrorRetryBehavior('NON_RETRYABLE'); | ||
| } | ||
|
|
||
| return { | ||
| ...base, | ||
| nexusHandlerFailureInfo: { | ||
| type: err.type, | ||
| retryBehavior, | ||
| }, | ||
| }; | ||
| } | ||
| if (err instanceof NexusOperationFailure) { | ||
| return { | ||
| ...base, | ||
| nexusOperationExecutionFailureInfo: { | ||
| scheduledEventId: err.scheduledEventId ? Long.fromNumber(err.scheduledEventId) : undefined, | ||
| endpoint: err.endpoint, | ||
| service: err.service, | ||
| operation: err.operation, | ||
| operationToken: err.operationToken, | ||
| }, | ||
| }; | ||
| } | ||
| // Just a TemporalFailure | ||
| return base; | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.