Skip to content

Commit 4205e68

Browse files
committed
Bind converters to workflow and activity serialization context
Wire serialization context through the core workflow, worker, and primary client paths so custom payload and failure converters can see which workflow or activity owns each payload. Add focused tests for the main success and failure boundaries. See tests in test-serialization-context.ts and the workflows it uses in test/src/workflows/serialization-context.ts to see the current boundaries where serialization context is implemented.
1 parent d981c4c commit 4205e68

11 files changed

Lines changed: 1225 additions & 83 deletions

File tree

packages/client/src/workflow-client.ts

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ import {
2323
encodeWorkflowIdConflictPolicy,
2424
WorkflowIdConflictPolicy,
2525
compilePriority,
26+
LoadedDataConverter,
2627
} from '@temporalio/common';
28+
import { withSerializationContext } from '@temporalio/common/lib/converter/serialization-context';
2729
import { encodeUserMetadata } from '@temporalio/common/lib/internal-non-workflow/codec-helpers';
2830
import { encodeUnifiedSearchAttributes } from '@temporalio/common/lib/converter/payload-search-attributes';
2931
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
@@ -523,6 +525,14 @@ export class WorkflowClient extends BaseClient {
523525
return this.connection.workflowService;
524526
}
525527

528+
protected dataConverterWithWorkflowContext(workflowId: string): LoadedDataConverter {
529+
return withSerializationContext(this.dataConverter, {
530+
type: 'workflow',
531+
namespace: this.options.namespace,
532+
workflowId,
533+
});
534+
}
535+
526536
protected async _start<T extends Workflow>(
527537
workflowTypeOrFunc: string | T,
528538
options: WorkflowStartOptions<T>,
@@ -789,6 +799,7 @@ export class WorkflowClient extends BaseClient {
789799
runId?: string,
790800
opts?: WorkflowResultOptions
791801
): Promise<WorkflowResultType<T>> {
802+
const dataConverter = this.dataConverterWithWorkflowContext(workflowId);
792803
const followRuns = opts?.followRuns ?? true;
793804
const execution: temporal.api.common.v1.IWorkflowExecution = { workflowId, runId };
794805
const req: GetWorkflowExecutionHistoryRequest = {
@@ -828,7 +839,7 @@ export class WorkflowClient extends BaseClient {
828839
// Note that we can only return one value from our workflow function in JS.
829840
// Ignore any other payloads in result
830841
const [result] = await decodeArrayFromPayloads(
831-
this.dataConverter,
842+
dataConverter,
832843
ev.workflowExecutionCompletedEventAttributes.result?.payloads
833844
);
834845
return result as any;
@@ -841,16 +852,13 @@ export class WorkflowClient extends BaseClient {
841852
const { failure, retryState } = ev.workflowExecutionFailedEventAttributes;
842853
throw new WorkflowFailedError(
843854
'Workflow execution failed',
844-
await decodeOptionalFailureToOptionalError(this.dataConverter, failure),
855+
await decodeOptionalFailureToOptionalError(dataConverter, failure),
845856
decodeRetryState(retryState)
846857
);
847858
} else if (ev.workflowExecutionCanceledEventAttributes) {
848859
const failure = new CancelledFailure(
849860
'Workflow canceled',
850-
await decodeArrayFromPayloads(
851-
this.dataConverter,
852-
ev.workflowExecutionCanceledEventAttributes.details?.payloads
853-
)
861+
await decodeArrayFromPayloads(dataConverter, ev.workflowExecutionCanceledEventAttributes.details?.payloads)
854862
);
855863
failure.stack = '';
856864
throw new WorkflowFailedError('Workflow execution cancelled', failure, RetryState.NON_RETRYABLE_FAILURE);
@@ -937,13 +945,14 @@ export class WorkflowClient extends BaseClient {
937945
* Used as the final function of the query interceptor chain
938946
*/
939947
protected async _queryWorkflowHandler(input: WorkflowQueryInput): Promise<unknown> {
948+
const dataConverter = this.dataConverterWithWorkflowContext(input.workflowExecution.workflowId!);
940949
const req: temporal.api.workflowservice.v1.IQueryWorkflowRequest = {
941950
queryRejectCondition: input.queryRejectCondition,
942951
namespace: this.options.namespace,
943952
execution: input.workflowExecution,
944953
query: {
945954
queryType: input.queryType,
946-
queryArgs: { payloads: await encodeToPayloads(this.dataConverter, ...input.args) },
955+
queryArgs: { payloads: await encodeToPayloads(dataConverter, ...input.args) },
947956
header: { fields: input.headers },
948957
},
949958
};
@@ -969,13 +978,14 @@ export class WorkflowClient extends BaseClient {
969978
throw new TypeError('Invalid response from server');
970979
}
971980
// We ignore anything but the first result
972-
return await decodeFromPayloadsAtIndex(this.dataConverter, 0, response.queryResult?.payloads);
981+
return await decodeFromPayloadsAtIndex(dataConverter, 0, response.queryResult?.payloads);
973982
}
974983

975984
protected async _createUpdateWorkflowRequest(
976985
lifecycleStage: temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage,
977986
input: WorkflowStartUpdateInput
978987
): Promise<temporal.api.workflowservice.v1.IUpdateWorkflowExecutionRequest> {
988+
const dataConverter = this.dataConverterWithWorkflowContext(input.workflowExecution.workflowId!);
979989
const updateId = input.options?.updateId ?? uuid4();
980990
return {
981991
namespace: this.options.namespace,
@@ -992,7 +1002,7 @@ export class WorkflowClient extends BaseClient {
9921002
input: {
9931003
header: { fields: input.headers },
9941004
name: input.updateName,
995-
args: { payloads: await encodeToPayloads(this.dataConverter, ...input.args) },
1005+
args: { payloads: await encodeToPayloads(dataConverter, ...input.args) },
9961006
},
9971007
},
9981008
};
@@ -1136,6 +1146,7 @@ export class WorkflowClient extends BaseClient {
11361146
workflowRunId?: string,
11371147
outcome?: temporal.api.update.v1.IOutcome
11381148
): WorkflowUpdateHandle<Ret> {
1149+
const dataConverter = this.dataConverterWithWorkflowContext(workflowId);
11391150
return {
11401151
updateId,
11411152
workflowId,
@@ -1146,10 +1157,10 @@ export class WorkflowClient extends BaseClient {
11461157
if (completedOutcome.failure) {
11471158
throw new WorkflowUpdateFailedError(
11481159
'Workflow Update failed',
1149-
await decodeOptionalFailureToOptionalError(this.dataConverter, completedOutcome.failure)
1160+
await decodeOptionalFailureToOptionalError(dataConverter, completedOutcome.failure)
11501161
);
11511162
} else {
1152-
return await decodeFromPayloadsAtIndex<Ret>(this.dataConverter, 0, completedOutcome.success?.payloads);
1163+
return await decodeFromPayloadsAtIndex<Ret>(dataConverter, 0, completedOutcome.success?.payloads);
11531164
}
11541165
},
11551166
};
@@ -1190,6 +1201,7 @@ export class WorkflowClient extends BaseClient {
11901201
* Used as the final function of the signal interceptor chain
11911202
*/
11921203
protected async _signalWorkflowHandler(input: WorkflowSignalInput): Promise<void> {
1204+
const dataConverter = this.dataConverterWithWorkflowContext(input.workflowExecution.workflowId!);
11931205
const req: temporal.api.workflowservice.v1.ISignalWorkflowExecutionRequest = {
11941206
identity: this.options.identity,
11951207
namespace: this.options.namespace,
@@ -1198,7 +1210,7 @@ export class WorkflowClient extends BaseClient {
11981210
// control is unused,
11991211
signalName: input.signalName,
12001212
header: { fields: input.headers },
1201-
input: { payloads: await encodeToPayloads(this.dataConverter, ...input.args) },
1213+
input: { payloads: await encodeToPayloads(dataConverter, ...input.args) },
12021214
};
12031215
try {
12041216
await this.workflowService.signalWorkflowExecution(req);
@@ -1215,6 +1227,7 @@ export class WorkflowClient extends BaseClient {
12151227
protected async _signalWithStartWorkflowHandler(input: WorkflowSignalWithStartInput): Promise<string> {
12161228
const { identity } = this.options;
12171229
const { options, workflowType, signalName, signalArgs, headers } = input;
1230+
const dataConverter = this.dataConverterWithWorkflowContext(options.workflowId);
12181231
const req: temporal.api.workflowservice.v1.ISignalWithStartWorkflowExecutionRequest = {
12191232
namespace: this.options.namespace,
12201233
identity,
@@ -1223,9 +1236,9 @@ export class WorkflowClient extends BaseClient {
12231236
workflowIdReusePolicy: encodeWorkflowIdReusePolicy(options.workflowIdReusePolicy),
12241237
workflowIdConflictPolicy: encodeWorkflowIdConflictPolicy(options.workflowIdConflictPolicy),
12251238
workflowType: { name: workflowType },
1226-
input: { payloads: await encodeToPayloads(this.dataConverter, ...options.args) },
1239+
input: { payloads: await encodeToPayloads(dataConverter, ...options.args) },
12271240
signalName,
1228-
signalInput: { payloads: await encodeToPayloads(this.dataConverter, ...signalArgs) },
1241+
signalInput: { payloads: await encodeToPayloads(dataConverter, ...signalArgs) },
12291242
taskQueue: {
12301243
kind: temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL,
12311244
name: options.taskQueue,
@@ -1235,7 +1248,7 @@ export class WorkflowClient extends BaseClient {
12351248
workflowTaskTimeout: options.workflowTaskTimeout,
12361249
workflowStartDelay: options.startDelay,
12371250
retryPolicy: options.retry ? compileRetryPolicy(options.retry) : undefined,
1238-
memo: options.memo ? { fields: await encodeMapToPayloads(this.dataConverter, options.memo) } : undefined,
1251+
memo: options.memo ? { fields: await encodeMapToPayloads(dataConverter, options.memo) } : undefined,
12391252
searchAttributes:
12401253
options.searchAttributes || options.typedSearchAttributes // eslint-disable-line @typescript-eslint/no-deprecated
12411254
? {
@@ -1244,7 +1257,7 @@ export class WorkflowClient extends BaseClient {
12441257
: undefined,
12451258
cronSchedule: options.cronSchedule,
12461259
header: { fields: headers },
1247-
userMetadata: await encodeUserMetadata(this.dataConverter, options.staticSummary, options.staticDetails),
1260+
userMetadata: await encodeUserMetadata(dataConverter, options.staticSummary, options.staticDetails),
12481261
priority: options.priority ? compilePriority(options.priority) : undefined,
12491262
versioningOverride: options.versioningOverride ?? undefined,
12501263
};
@@ -1295,6 +1308,7 @@ export class WorkflowClient extends BaseClient {
12951308
protected async createStartWorkflowRequest(input: WorkflowStartInput): Promise<StartWorkflowExecutionRequest> {
12961309
const { options: opts, workflowType, headers } = input;
12971310
const { identity, namespace } = this.options;
1311+
const dataConverter = this.dataConverterWithWorkflowContext(opts.workflowId);
12981312
const internalOptions = (opts as InternalWorkflowStartOptions)[InternalWorkflowStartOptionsSymbol];
12991313
const supportsEagerStart = (this.connection as InternalConnectionLike)?.[InternalConnectionLikeSymbol]
13001314
?.supportsEagerStart;
@@ -1314,7 +1328,7 @@ export class WorkflowClient extends BaseClient {
13141328
workflowIdReusePolicy: encodeWorkflowIdReusePolicy(opts.workflowIdReusePolicy),
13151329
workflowIdConflictPolicy: encodeWorkflowIdConflictPolicy(opts.workflowIdConflictPolicy),
13161330
workflowType: { name: workflowType },
1317-
input: { payloads: await encodeToPayloads(this.dataConverter, ...opts.args) },
1331+
input: { payloads: await encodeToPayloads(dataConverter, ...opts.args) },
13181332
taskQueue: {
13191333
kind: temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL,
13201334
name: opts.taskQueue,
@@ -1324,7 +1338,7 @@ export class WorkflowClient extends BaseClient {
13241338
workflowTaskTimeout: opts.workflowTaskTimeout,
13251339
workflowStartDelay: opts.startDelay,
13261340
retryPolicy: opts.retry ? compileRetryPolicy(opts.retry) : undefined,
1327-
memo: opts.memo ? { fields: await encodeMapToPayloads(this.dataConverter, opts.memo) } : undefined,
1341+
memo: opts.memo ? { fields: await encodeMapToPayloads(dataConverter, opts.memo) } : undefined,
13281342
searchAttributes:
13291343
opts.searchAttributes || opts.typedSearchAttributes // eslint-disable-line @typescript-eslint/no-deprecated
13301344
? {
@@ -1333,7 +1347,7 @@ export class WorkflowClient extends BaseClient {
13331347
: undefined,
13341348
cronSchedule: opts.cronSchedule,
13351349
header: { fields: headers },
1336-
userMetadata: await encodeUserMetadata(this.dataConverter, opts.staticSummary, opts.staticDetails),
1350+
userMetadata: await encodeUserMetadata(dataConverter, opts.staticSummary, opts.staticDetails),
13371351
priority: opts.priority ? compilePriority(opts.priority) : undefined,
13381352
versioningOverride: opts.versioningOverride ?? undefined,
13391353
requestEagerExecution: opts.requestEagerStart,
@@ -1349,12 +1363,13 @@ export class WorkflowClient extends BaseClient {
13491363
protected async _terminateWorkflowHandler(
13501364
input: WorkflowTerminateInput
13511365
): Promise<TerminateWorkflowExecutionResponse> {
1366+
const dataConverter = this.dataConverterWithWorkflowContext(input.workflowExecution.workflowId!);
13521367
const req: temporal.api.workflowservice.v1.ITerminateWorkflowExecutionRequest = {
13531368
namespace: this.options.namespace,
13541369
identity: this.options.identity,
13551370
...input,
13561371
details: {
1357-
payloads: input.details ? await encodeToPayloads(this.dataConverter, ...input.details) : undefined,
1372+
payloads: input.details ? await encodeToPayloads(dataConverter, ...input.details) : undefined,
13581373
},
13591374
firstExecutionRunId: input.firstExecutionRunId,
13601375
};

packages/common/src/converter/serialization-context.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,10 @@ export function withFailureConverterContext(
8282
/**
8383
* Return a loaded data converter with its payload and failure converters bound to `context`.
8484
*
85-
* Payload codecs are intentionally left unchanged in this PR and will be handled separately.
86-
*
8785
* Internal helper for non-workflow code paths. Workflow-isolate code should bind the individual
8886
* payload or failure converter directly to avoid pulling unnecessary code into the workflow bundle.
87+
*
88+
* NOTE: this does *not* bind `context` to payload codecs
8989
*/
9090
export function withSerializationContext(
9191
converter: LoadedDataConverter,
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import { activityInfo, heartbeat } from '@temporalio/activity';
2+
import { ContextTrace, withLabel } from '../payload-converters/serialization-context-converter';
3+
4+
export async function echoTrace<T>(inputTrace: ContextTrace<string>, value: T): Promise<ContextTrace<T>> {
5+
return withLabel(inputTrace, value);
6+
}
7+
8+
export async function heartbeatTrace<T>(inputTrace: ContextTrace<string>, value: T): Promise<ContextTrace<T>> {
9+
if (activityInfo().attempt === 1) {
10+
heartbeat(withLabel(inputTrace, value));
11+
throw new Error('retry me');
12+
}
13+
return activityInfo().heartbeatDetails as ContextTrace<T>;
14+
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
import {
2+
Payload,
3+
PayloadConverter,
4+
FailureConverter,
5+
ProtoFailure,
6+
defaultPayloadConverter,
7+
defaultFailureConverter,
8+
SerializationContext,
9+
} from '@temporalio/common';
10+
11+
export interface ContextTrace<T> {
12+
label: T;
13+
trace: string[];
14+
}
15+
16+
export function makeContextTrace<T>(label: T): ContextTrace<T> {
17+
return {
18+
label,
19+
trace: [],
20+
};
21+
}
22+
23+
export function withLabel<T>(existing: ContextTrace<unknown>, label: T): ContextTrace<T> {
24+
return { label, trace: existing.trace };
25+
}
26+
27+
function isContextTrace(maybeTrace: unknown): maybeTrace is ContextTrace<unknown> {
28+
return (
29+
typeof maybeTrace === 'object' &&
30+
maybeTrace !== null &&
31+
'label' in maybeTrace &&
32+
'trace' in maybeTrace &&
33+
Array.isArray(maybeTrace.trace)
34+
);
35+
}
36+
37+
function ctxToTraceStr(context: SerializationContext): string {
38+
const parts = [context.type, context.namespace];
39+
40+
if (context.workflowId) parts.push(context.workflowId);
41+
42+
if (context.type === 'activity') {
43+
if (context.activityId) parts.push(context.activityId);
44+
parts.push(String(context.isLocal));
45+
}
46+
47+
return parts.join('.');
48+
}
49+
50+
export class FreePayloadConverter implements PayloadConverter {
51+
withContext(context: SerializationContext): PayloadConverter {
52+
return new BoundPayloadConverter(context);
53+
}
54+
55+
toPayload<T>(value: T): Payload {
56+
if (isContextTrace(value)) {
57+
value.trace.push(`payload.encode.free|${value.label}`);
58+
}
59+
return defaultPayloadConverter.toPayload(value);
60+
}
61+
62+
fromPayload<T>(payload: Payload): T {
63+
const value = defaultPayloadConverter.fromPayload(payload);
64+
if (isContextTrace(value)) {
65+
value.trace.push(`payload.decode.free|${value.label}`);
66+
}
67+
return value as T;
68+
}
69+
}
70+
71+
class BoundPayloadConverter implements PayloadConverter {
72+
constructor(private readonly context: SerializationContext) {}
73+
74+
toPayload<T>(value: T): Payload {
75+
if (isContextTrace(value)) {
76+
value.trace.push(`payload.encode.bound|${value.label}|${ctxToTraceStr(this.context)}`);
77+
}
78+
return defaultPayloadConverter.toPayload(value);
79+
}
80+
81+
fromPayload<T>(payload: Payload): T {
82+
const value = defaultPayloadConverter.fromPayload(payload);
83+
if (isContextTrace(value)) {
84+
value.trace.push(`payload.decode.bound|${value.label}|${ctxToTraceStr(this.context)}`);
85+
}
86+
return value as T;
87+
}
88+
}
89+
90+
export class FreeFailureConverter implements FailureConverter {
91+
errorToFailure(err: unknown, payloadConverter: PayloadConverter): ProtoFailure {
92+
const failure = defaultFailureConverter.errorToFailure(err, payloadConverter);
93+
const existing = failure.message ?? '';
94+
failure.message = `failure.encode.free|${existing}`;
95+
return failure;
96+
}
97+
failureToError(err: ProtoFailure, payloadConverter: PayloadConverter): Error {
98+
const error = defaultFailureConverter.failureToError(err, payloadConverter);
99+
error.message = `failure.decode.free|${error.message}`;
100+
return error;
101+
}
102+
withContext?(context: SerializationContext): FailureConverter {
103+
return new BoundFailureConverter(context);
104+
}
105+
}
106+
107+
class BoundFailureConverter implements FailureConverter {
108+
constructor(private readonly context: SerializationContext) {}
109+
errorToFailure(err: unknown, payloadConverter: PayloadConverter): ProtoFailure {
110+
const failure = defaultFailureConverter.errorToFailure(err, payloadConverter);
111+
const existing = failure.message ?? '';
112+
failure.message = `failure.encode.bound|${ctxToTraceStr(this.context)}|${existing}`;
113+
return failure;
114+
}
115+
failureToError(err: ProtoFailure, payloadConverter: PayloadConverter): Error {
116+
const error = defaultFailureConverter.failureToError(err, payloadConverter);
117+
error.message = `failure.decode.bound|${ctxToTraceStr(this.context)}|${error.message}`;
118+
return error;
119+
}
120+
}
121+
122+
export const payloadConverter = new FreePayloadConverter();
123+
export const failureConverter = new FreeFailureConverter();

packages/test/src/test-otel.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ if (RUN_INTEGRATION_TESTS) {
359359
tracer,
360360
spanName: `test-thrown-${String(thrown)}`,
361361
fn: () => {
362-
throw thrown; // eslint-disable-line no-throw-literal
362+
throw thrown;
363363
},
364364
});
365365
t.fail('expected instrumentSync to throw');

0 commit comments

Comments
 (0)