Skip to content

Commit 0d3b36b

Browse files
committed
Implement standalone callbacks
Enable long-polling for Describe
1 parent a4d105e commit 0d3b36b

44 files changed

Lines changed: 5212 additions & 129 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

chasm/lib/activity/activity.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,9 +335,9 @@ func (a *Activity) addCompletionCallbacks(
335335
return serviceerror.NewInvalidArgumentf("unsupported callback variant: %T", variant)
336336
}
337337

338-
// requestID (unique per API call) + idx (position within the request) ensures unique,idempotent callback IDs.
338+
// requestID (unique per API call) + idx (position within the request) ensures unique, idempotent callback IDs.
339339
id := fmt.Sprintf("%s-%d", requestID, idx)
340-
callbackObj := callback.NewCallback(requestID, registrationTime, &callbackspb.CallbackState{}, chasmCB)
340+
callbackObj := callback.NewEmbeddedCallback(ctx, requestID, registrationTime, chasmCB)
341341
a.Callbacks[id] = chasm.NewComponentField(ctx, callbackObj)
342342
}
343343
return nil

chasm/lib/callback/component.go

Lines changed: 145 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,38 +4,72 @@ import (
44
"fmt"
55
"time"
66

7+
callbackpb "go.temporal.io/api/callback/v1"
78
commonpb "go.temporal.io/api/common/v1"
9+
failurepb "go.temporal.io/api/failure/v1"
810
"go.temporal.io/api/serviceerror"
911
"go.temporal.io/server/chasm"
1012
callbackspb "go.temporal.io/server/chasm/lib/callback/gen/callbackpb/v1"
1113
"go.temporal.io/server/common/backoff"
1214
"go.temporal.io/server/common/nexus/nexusrpc"
1315
queueserrors "go.temporal.io/server/service/history/queues/errors"
16+
"google.golang.org/protobuf/proto"
17+
"google.golang.org/protobuf/types/known/durationpb"
1418
"google.golang.org/protobuf/types/known/timestamppb"
1519
)
1620

1721
type CompletionSource interface {
1822
GetNexusCompletion(ctx chasm.Context, requestID string) (nexusrpc.CompleteOperationOptions, error)
1923
}
2024

21-
var _ chasm.Component = (*Callback)(nil)
22-
var _ chasm.StateMachine[callbackspb.CallbackStatus] = (*Callback)(nil)
25+
// CompletionSourceFn allows a function value to be used as a CompletionSource instance.
26+
type CompletionSourceFn func(chasm.Context, string) (nexusrpc.CompleteOperationOptions, error)
27+
28+
func (csFunc CompletionSourceFn) GetNexusCompletion(ctx chasm.Context, requestID string) (nexusrpc.CompleteOperationOptions, error) {
29+
return csFunc(ctx, requestID)
30+
}
31+
32+
var (
33+
_ chasm.Component = (*Callback)(nil)
34+
_ chasm.StateMachine[callbackspb.CallbackStatus] = (*Callback)(nil)
35+
36+
// Capabilities only supported/used for standalone callbacks.
37+
_ chasm.RootComponent = (*Callback)(nil)
38+
_ chasm.VisibilityMemoProvider = (*Callback)(nil)
39+
_ chasm.VisibilitySearchAttributesProvider = (*Callback)(nil)
40+
)
41+
42+
var executionStatusSearchAttribute = chasm.NewSearchAttributeKeyword(
43+
"ExecutionStatus",
44+
chasm.SearchAttributeFieldLowCardinalityKeyword01,
45+
)
2346

2447
// Callback represents a callback component in CHASM.
2548
type Callback struct {
2649
chasm.UnimplementedComponent
2750

2851
// Persisted internal state
2952
*callbackspb.CallbackState
53+
// Failure from an external termination (timeout or terminate), stored separately because
54+
// of its potential size, and to not overload CallbackState::LastAttemptFailure.
55+
TerminalFailure chasm.Field[*failurepb.Failure]
56+
57+
// For most callbacks, the completion result is obtained from the parent component.
58+
// e.g. the Workflow result to be delivered. However, for "standalone" callbacks, there
59+
// is no parent and the user-supplied SuppliedCompletion will be used instead.
60+
ParentCompletionSource chasm.ParentPtr[CompletionSource]
61+
SuppliedCompletion chasm.Field[*callbackpb.CallbackExecutionCompletion]
3062

31-
// Interface to retrieve Nexus operation completion data
32-
CompletionSource chasm.ParentPtr[CompletionSource]
63+
// Visibility sub-component for search attributes and memo indexing.
64+
Visibility chasm.Field[*chasm.Visibility]
3365
}
3466

35-
func NewCallback(
67+
// NewEmbeddedCallback returns a Callback component, which will deliver the completion from
68+
// its parent CHASM component. The parent must implement CompletionSource.
69+
func NewEmbeddedCallback(
70+
ctx chasm.MutableContext,
3671
requestID string,
3772
registrationTime *timestamppb.Timestamp,
38-
state *callbackspb.CallbackState,
3973
cb *callbackspb.Callback,
4074
) *Callback {
4175
return &Callback{
@@ -45,14 +79,49 @@ func NewCallback(
4579
Callback: cb,
4680
Status: callbackspb.CALLBACK_STATUS_STANDBY,
4781
},
82+
TerminalFailure: chasm.NewDataField[*failurepb.Failure](ctx, nil),
4883
}
4984
}
5085

86+
type newStandaloneCallbackOpts struct {
87+
RequestID string
88+
RegistrationTime *timestamppb.Timestamp
89+
Callback *callbackspb.Callback
90+
91+
CallbackID string
92+
CompletionScheduleToCloseTimeout *durationpb.Duration
93+
Completion *callbackpb.CallbackExecutionCompletion
94+
SearchAttributes map[string]*commonpb.Payload
95+
}
96+
97+
// newStandaloneCallback returns a new Callback component which will deliver the supplied
98+
// completion result.
99+
func newStandaloneCallback(
100+
ctx chasm.MutableContext,
101+
opts newStandaloneCallbackOpts,
102+
) *Callback {
103+
cb := NewEmbeddedCallback(ctx, opts.RequestID, opts.RegistrationTime, opts.Callback)
104+
105+
// Add standalone-specific fields.
106+
cb.CallbackId = opts.CallbackID
107+
cb.CompletionScheduleToCloseTimeout = opts.CompletionScheduleToCloseTimeout
108+
cb.SuppliedCompletion = chasm.NewDataField(ctx, opts.Completion)
109+
110+
visibility := chasm.NewVisibilityWithData(ctx, opts.SearchAttributes, nil)
111+
cb.Visibility = chasm.NewComponentField(ctx, visibility)
112+
113+
return cb
114+
}
115+
51116
func (c *Callback) LifecycleState(_ chasm.Context) chasm.LifecycleState {
52117
switch c.Status {
53118
case callbackspb.CALLBACK_STATUS_SUCCEEDED:
54119
return chasm.LifecycleStateCompleted
55-
case callbackspb.CALLBACK_STATUS_FAILED:
120+
case callbackspb.CALLBACK_STATUS_FAILED,
121+
callbackspb.CALLBACK_STATUS_TERMINATED:
122+
// TODO: Use chasm.LifecycleStateTerminated when it's available (currently commented out
123+
// in chasm/component.go:70). For now, LifecycleStateFailed is functionally correct
124+
// as IsClosed() returns true for all states >= LifecycleStateCompleted.
56125
return chasm.LifecycleStateFailed
57126
default:
58127
return chasm.LifecycleStateRunning
@@ -67,6 +136,62 @@ func (c *Callback) SetStateMachineState(status callbackspb.CallbackStatus) {
67136
c.Status = status
68137
}
69138

139+
func (c *Callback) ContextMetadata(_ chasm.Context) map[string]string {
140+
return map[string]string{
141+
"RequestID": c.RequestId,
142+
// Only set for standalone callbacks.
143+
"CallbackID": c.CallbackId,
144+
}
145+
}
146+
147+
// SearchAttributes implements chasm.VisibilitySearchAttributesProvider.
148+
func (c *Callback) SearchAttributes(ctx chasm.Context) []chasm.SearchAttributeKeyValue {
149+
apiStatus := callbackStatusToAPIExecutionStatus(c.Status)
150+
return []chasm.SearchAttributeKeyValue{
151+
executionStatusSearchAttribute.Value(apiStatus.String()),
152+
}
153+
}
154+
155+
// Memo implements chasm.VisibilityMemoProvider. Returns the CallbackExecutionListInfo
156+
// as the memo for visibility queries.
157+
func (c *Callback) Memo(ctx chasm.Context) proto.Message {
158+
return &callbackpb.CallbackExecutionListInfo{
159+
CallbackId: c.CallbackId,
160+
Status: callbackStatusToAPIExecutionStatus(c.Status),
161+
CreateTime: c.RegistrationTime,
162+
CloseTime: c.CloseTime,
163+
}
164+
}
165+
166+
// Terminate forcefully terminates the callback execution.
167+
//
168+
// If already terminated with the same request ID, this is a no-op.
169+
// If already terminated with a different request ID, returns FailedPrecondition.
170+
func (c *Callback) Terminate(
171+
ctx chasm.MutableContext,
172+
req chasm.TerminateComponentRequest,
173+
) (chasm.TerminateComponentResponse, error) {
174+
if c.LifecycleState(ctx).IsClosed() {
175+
if c.TerminateRequestId == "" {
176+
// Completed organically (succeeded/failed/timed out), not via Terminate.
177+
err := serviceerror.NewFailedPreconditionf("callback execution already in terminal state %v", c.Status)
178+
return chasm.TerminateComponentResponse{}, err
179+
}
180+
if c.TerminateRequestId != req.RequestID {
181+
err := serviceerror.NewFailedPreconditionf("already terminated with request ID %s", c.TerminateRequestId)
182+
return chasm.TerminateComponentResponse{}, err
183+
}
184+
return chasm.TerminateComponentResponse{}, nil
185+
}
186+
if err := TransitionTerminated.Apply(c, ctx, EventTerminated{Reason: req.Reason}); err != nil {
187+
return chasm.TerminateComponentResponse{}, fmt.Errorf("failed to terminate callback: %w", err)
188+
}
189+
190+
c.TerminateRequestId = req.RequestID
191+
// c.TerminalFailure is set in the transition handler.
192+
return chasm.TerminateComponentResponse{}, nil
193+
}
194+
70195
func (c *Callback) recordAttempt(ts time.Time) {
71196
c.Attempt++
72197
c.LastAttemptCompleteTime = timestamppb.New(ts)
@@ -77,9 +202,9 @@ func (c *Callback) loadInvocationArgs(
77202
ctx chasm.Context,
78203
_ chasm.NoValue,
79204
) (invocable, error) {
80-
target := c.CompletionSource.Get(ctx)
81-
82-
completion, err := target.GetNexusCompletion(ctx, c.RequestId)
205+
// Get the completion result to be delivered.
206+
completionSource := c.CompletionSource(ctx)
207+
completion, err := completionSource.GetNexusCompletion(ctx, c.RequestId)
83208
if err != nil {
84209
return nil, err
85210
}
@@ -117,6 +242,16 @@ func (c *Callback) saveResult(
117242
ctx chasm.MutableContext,
118243
input saveResultInput,
119244
) (chasm.NoValue, error) {
245+
// If the callback was terminated while the invocation was in-flight,
246+
// the result is no longer relevant. We'll just drop it silently.
247+
//
248+
// This shouldn't happen outside of tests, since the Nexus machinery
249+
// would prevent an invalid transition anyway. (e.g. terminating
250+
// an already terminated Callback.)
251+
if c.LifecycleState(ctx).IsClosed() {
252+
return nil, nil
253+
}
254+
120255
switch r := input.result.(type) {
121256
case invocationResultOK:
122257
err := TransitionSucceeded.Apply(c, ctx, EventSucceeded{Time: ctx.Now(c)})

0 commit comments

Comments
 (0)