Skip to content

Commit 4962ff9

Browse files
yuandrewclaude
andauthored
Add Serialization context for codecs and converters (temporalio#2225)
* Add SerializationContext for DataConverter, PayloadCodec, and FailureConverter Introduces an opt-in mechanism that provides metadata (namespace, workflow ID, activity type, etc.) to converter implementations during serialization. This enables use cases like per-workflow encryption using workflow ID as associated data for a key. Adds WorkflowSerializationContext (for workflow-level payloads) and ActivitySerializationContext (for activity-level payloads), with opt-in interfaces for DataConverter, PayloadCodec, and FailureConverter. Context is provided at every serialization site: workflow execution, activity, local activity, child workflow, signal, query, update, continue-as-new, side effect, client operations, schedule client, and async activity completion. Matches the feature already implemented in Python, .NET, and Java SDKs. Closes temporalio#1352 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Fix incorrect double-wrapping for query and update handlers, add tests * Rename WithDataConverterSerializationContext, CompleteActivityByIDWithOptions and RecordActivityHeartbeatByIDWithOptions, remove cmd.Or usage * Child workflow fix * remove serialization context from getDataConverterFromWorkflowContext, require callers to explicitly wrap context themselves. Use wfInfo.Namespace for signal external workflow, wrap context for testWorkflowEnvironmentImpl.executeWorkflow * fix double wrapping, nil panic for bindings, other misc fixes * More double wrapping fixing, also fix testsuite * Missed some GenerateSequence spots. Default back to wc.namespace for activity methods * Namespace fallback * testsuite fixes --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 27a2cb8 commit 4962ff9

27 files changed

Lines changed: 2518 additions & 185 deletions

client/client.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,21 @@ type (
225225
// StartWorkflowOptions configuration parameters for starting a workflow execution.
226226
StartWorkflowOptions = internal.StartWorkflowOptions
227227

228+
// CompleteActivityByIDOptions provides options for CompleteActivityByIDWithOptions.
229+
CompleteActivityByIDOptions = internal.CompleteActivityByIDOptions
230+
231+
// RecordActivityHeartbeatByIDOptions provides options for RecordActivityHeartbeatByIDWithOptions.
232+
RecordActivityHeartbeatByIDOptions = internal.RecordActivityHeartbeatByIDOptions
233+
234+
// CompleteActivityOptions provides options for CompleteActivityWithOptions.
235+
CompleteActivityOptions = internal.CompleteActivityOptions
236+
237+
// CompleteActivityByActivityIDOptions provides options for CompleteActivityByActivityIDWithOptions.
238+
CompleteActivityByActivityIDOptions = internal.CompleteActivityByActivityIDOptions
239+
240+
// RecordActivityHeartbeatOptions provides options for RecordActivityHeartbeatWithOptions.
241+
RecordActivityHeartbeatOptions = internal.RecordActivityHeartbeatOptions
242+
228243
// WithStartWorkflowOperation defines how to start a workflow when using UpdateWithStartWorkflow.
229244
// See [client.Client.NewWithStartWorkflowOperation] and [client.Client.UpdateWithStartWorkflow].
230245
WithStartWorkflowOperation = internal.WithStartWorkflowOperation
@@ -1108,8 +1123,18 @@ type (
11081123
// To fail the activity with an error.
11091124
// CompleteActivity(token, nil, temporal.NewApplicationError("reason", details)
11101125
// The activity can fail with below errors ApplicationError, TimeoutError, CanceledError.
1126+
//
1127+
// If using a context-aware converter (DataConverterWithSerializationContext or
1128+
// FailureConverterWithSerializationContext), consider using
1129+
// CompleteActivityWithOptions to provide full activity metadata
1130+
// (ActivityType, WorkflowType, TaskQueue) to your codec.
11111131
CompleteActivity(ctx context.Context, taskToken []byte, result interface{}, err error) error
11121132

1133+
// CompleteActivityWithOptions reports activity completed with full context options.
1134+
// Similar to CompleteActivity but accepts a struct with optional ActivitySerializationContext
1135+
// fields (ActivityType, WorkflowType, TaskQueue, etc.) for custom codec support.
1136+
CompleteActivityWithOptions(ctx context.Context, opts CompleteActivityOptions) error
1137+
11131138
// CompleteActivityByID reports activity completed.
11141139
// Similar to CompleteActivity, but may save the user from keeping taskToken info.
11151140
// This method works only for workflow activities. workflowID and runID must be set to the workflow ID and workflow run ID
@@ -1126,8 +1151,18 @@ type (
11261151
// - ApplicationError
11271152
// - TimeoutError
11281153
// - CanceledError
1154+
//
1155+
// If using a context-aware converter (DataConverterWithSerializationContext or
1156+
// FailureConverterWithSerializationContext), consider using
1157+
// CompleteActivityByIDWithOptions to provide full activity metadata
1158+
// (ActivityType, WorkflowType, TaskQueue) to your codec.
11291159
CompleteActivityByID(ctx context.Context, namespace, workflowID, runID, activityID string, result interface{}, err error) error
11301160

1161+
// CompleteActivityByIDWithOptions reports activity completed with full context options.
1162+
// Similar to CompleteActivityByID but accepts a struct with optional ActivitySerializationContext
1163+
// fields (ActivityType, WorkflowType, TaskQueue) for custom codec support.
1164+
CompleteActivityByIDWithOptions(ctx context.Context, opts CompleteActivityByIDOptions) error
1165+
11311166
// CompleteActivityByActivityID reports activity completed.
11321167
// Similar to CompleteActivity, but may save the user from keeping taskToken info.
11331168
// This method works only for standalone activities. To complete a workflow activity, use CompleteActivityByID.
@@ -1142,8 +1177,18 @@ type (
11421177
// - ApplicationError
11431178
// - TimeoutError
11441179
// - CanceledError
1180+
//
1181+
// If using a context-aware converter (DataConverterWithSerializationContext or
1182+
// FailureConverterWithSerializationContext), consider using
1183+
// CompleteActivityByActivityIDWithOptions to provide full activity metadata
1184+
// (ActivityType, WorkflowType, TaskQueue) to your codec.
11451185
CompleteActivityByActivityID(ctx context.Context, namespace, activityID, activityRunID string, result interface{}, err error) error
11461186

1187+
// CompleteActivityByActivityIDWithOptions reports standalone activity completed with full context options.
1188+
// Similar to CompleteActivityByActivityID but accepts a struct with optional
1189+
// ActivitySerializationContext fields for custom codec support.
1190+
CompleteActivityByActivityIDWithOptions(ctx context.Context, opts CompleteActivityByActivityIDOptions) error
1191+
11471192
// RecordActivityHeartbeat records heartbeat for an activity.
11481193
// taskToken - is the value of the binary "TaskToken" field of the "ActivityInfo" struct retrieved inside the activity.
11491194
// details - is the progress you want to record along with heart beat for this activity. If the activity is canceled,
@@ -1153,8 +1198,18 @@ type (
11531198
// - serviceerror.NotFound
11541199
// - serviceerror.Internal
11551200
// - serviceerror.Unavailable
1201+
//
1202+
// If using a context-aware converter (DataConverterWithSerializationContext or
1203+
// FailureConverterWithSerializationContext), consider using
1204+
// RecordActivityHeartbeatWithOptions to provide full activity metadata
1205+
// (ActivityType, WorkflowType, TaskQueue) to your codec.
11561206
RecordActivityHeartbeat(ctx context.Context, taskToken []byte, details ...interface{}) error
11571207

1208+
// RecordActivityHeartbeatWithOptions records heartbeat with full context options.
1209+
// Similar to RecordActivityHeartbeat but accepts a struct with optional
1210+
// ActivitySerializationContext fields for custom codec support.
1211+
RecordActivityHeartbeatWithOptions(ctx context.Context, opts RecordActivityHeartbeatOptions) error
1212+
11581213
// RecordActivityHeartbeatByID records heartbeat for an activity.
11591214
// details - is the progress you want to record along with heart beat for this activity. If the activity is canceled,
11601215
// the error returned will be a CanceledError. If the activity is paused by the server, the error returned will be a
@@ -1163,8 +1218,18 @@ type (
11631218
// - serviceerror.NotFound
11641219
// - serviceerror.Internal
11651220
// - serviceerror.Unavailable
1221+
//
1222+
// If using a context-aware converter (DataConverterWithSerializationContext or
1223+
// FailureConverterWithSerializationContext), consider using
1224+
// RecordActivityHeartbeatByIDWithOptions to provide full activity metadata
1225+
// (ActivityType, WorkflowType, TaskQueue) to your codec.
11661226
RecordActivityHeartbeatByID(ctx context.Context, namespace, workflowID, runID, activityID string, details ...interface{}) error
11671227

1228+
// RecordActivityHeartbeatByIDWithOptions records heartbeat with full context options.
1229+
// Similar to RecordActivityHeartbeatByID but accepts a struct with optional
1230+
// ActivitySerializationContext fields for custom codec support.
1231+
RecordActivityHeartbeatByIDWithOptions(ctx context.Context, opts RecordActivityHeartbeatByIDOptions) error
1232+
11681233
// ListClosedWorkflow gets closed workflow executions based on request filters.
11691234
// Retrieved workflow executions are sorted by close time in descending order.
11701235
//

converter/codec.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,27 @@ func (e *CodecDataConverter) ToStrings(payloads *commonpb.Payloads) []string {
231231
return strs
232232
}
233233

234+
func (e *CodecDataConverter) WithSerializationContext(ctx SerializationContext) DataConverter {
235+
parent := e.parent
236+
if p, ok := parent.(DataConverterWithSerializationContext); ok {
237+
parent = p.WithSerializationContext(ctx)
238+
}
239+
codecs := make([]PayloadCodec, len(e.codecs))
240+
changed := parent != e.parent
241+
for i, c := range e.codecs {
242+
if cc, ok := c.(PayloadCodecWithSerializationContext); ok {
243+
codecs[i] = cc.WithSerializationContext(ctx)
244+
changed = changed || codecs[i] != c
245+
} else {
246+
codecs[i] = c
247+
}
248+
}
249+
if !changed {
250+
return e
251+
}
252+
return &CodecDataConverter{parent, codecs}
253+
}
254+
234255
const remotePayloadCodecEncodePath = "/encode"
235256
const remotePayloadCodecDecodePath = "/decode"
236257

converter/serialization_context.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package converter
2+
3+
// SerializationContext provides metadata about where serialization is occurring.
4+
// Implementations include [WorkflowSerializationContext] for workflow-level
5+
// payloads, and [ActivitySerializationContext] for activity-level payloads.
6+
type SerializationContext interface {
7+
isSerializationContext()
8+
}
9+
10+
// WorkflowSerializationContext is the serialization context for workflow-level payloads.
11+
// This includes: workflow input/result, child workflow input/result, signal input,
12+
// query input/result, update input/result, memo, continue-as-new args, and
13+
// external signal payloads.
14+
//
15+
// For child workflows, WorkflowID is the child's ID, not the parent's.
16+
// For external signals, WorkflowID is the target workflow's ID.
17+
type WorkflowSerializationContext struct {
18+
Namespace string
19+
WorkflowID string
20+
}
21+
22+
func (WorkflowSerializationContext) isSerializationContext() {}
23+
24+
// ActivitySerializationContext is the serialization context for activity-level payloads.
25+
// This includes: activity input/result, heartbeat details, and activity failure details.
26+
type ActivitySerializationContext struct {
27+
Namespace string
28+
WorkflowID string
29+
WorkflowType string
30+
ActivityType string
31+
TaskQueue string
32+
IsLocal bool
33+
}
34+
35+
func (ActivitySerializationContext) isSerializationContext() {}
36+
37+
// DataConverterWithSerializationContext is an optional interface that [DataConverter]
38+
// implementations can implement to receive serialization context.
39+
//
40+
// When implemented, the SDK calls WithSerializationContext before serializing/deserializing
41+
// payloads. The returned DataConverter should use the context to vary its behavior
42+
// (e.g. using workflow ID as associated data for encryption).
43+
//
44+
// Implementations must work correctly without context — the SDK and user code may use
45+
// the DataConverter directly without calling WithSerializationContext first.
46+
//
47+
// This method should be cheap and fast. The SDK does not cache returned instances
48+
// and may call this method frequently. Avoid recreating expensive objects on every call.
49+
type DataConverterWithSerializationContext interface {
50+
WithSerializationContext(SerializationContext) DataConverter
51+
}
52+
53+
// PayloadCodecWithSerializationContext is an optional interface that [PayloadCodec]
54+
// implementations can implement to receive serialization context.
55+
//
56+
// When implemented, the SDK calls WithSerializationContext before encoding/decoding payloads.
57+
// The returned PayloadCodec should use the context to vary its behavior
58+
// (e.g. using workflow ID as associated data for encoding).
59+
//
60+
// Implementations must work correctly without context — the SDK and user code may use
61+
// the PayloadCodec directly without calling WithSerializationContext first.
62+
//
63+
// This method should be cheap and fast. The SDK does not cache returned instances
64+
// and may call this method frequently. Avoid recreating expensive objects on every call.
65+
type PayloadCodecWithSerializationContext interface {
66+
WithSerializationContext(SerializationContext) PayloadCodec
67+
}
68+
69+
// FailureConverterWithSerializationContext is an optional interface that [FailureConverter]
70+
// implementations can implement to receive serialization context.
71+
//
72+
// When implemented, the SDK calls WithSerializationContext before converting errors to/from
73+
// failures. The returned FailureConverter should use the context to vary its behavior
74+
// (e.g. encrypting failure details using a workflow-ID-derived key).
75+
//
76+
// Implementations must work correctly without context — the SDK and user code may use
77+
// the FailureConverter directly without calling WithSerializationContext first.
78+
//
79+
// This method should be cheap and fast. The SDK does not cache returned instances
80+
// and may call this method frequently. Avoid recreating expensive objects on every call.
81+
type FailureConverterWithSerializationContext interface {
82+
WithSerializationContext(SerializationContext) FailureConverter
83+
}
84+
85+
// WithDataConverterSerializationContext returns a DataConverter that is aware of the given
86+
// serialization context. If the DataConverter implements
87+
// [DataConverterWithSerializationContext], it delegates to that implementation;
88+
// otherwise it returns the original DataConverter unchanged.
89+
func WithDataConverterSerializationContext(dc DataConverter, ctx SerializationContext) DataConverter {
90+
if sc, ok := dc.(DataConverterWithSerializationContext); ok {
91+
result := sc.WithSerializationContext(ctx)
92+
if result == nil {
93+
panic("DataConverterWithSerializationContext.WithSerializationContext must not return nil")
94+
}
95+
return result
96+
}
97+
return dc
98+
}
99+
100+
// WithFailureConverterSerializationContext returns a FailureConverter that is aware of the given
101+
// serialization context. If the FailureConverter implements
102+
// [FailureConverterWithSerializationContext], it delegates to that implementation;
103+
// otherwise it returns the original FailureConverter unchanged.
104+
func WithFailureConverterSerializationContext(fc FailureConverter, ctx SerializationContext) FailureConverter {
105+
if sc, ok := fc.(FailureConverterWithSerializationContext); ok {
106+
result := sc.WithSerializationContext(ctx)
107+
if result == nil {
108+
panic("FailureConverterWithSerializationContext.WithSerializationContext must not return nil")
109+
}
110+
return result
111+
}
112+
return fc
113+
}

0 commit comments

Comments
 (0)