Skip to content

Commit 856e276

Browse files
rkannan82cursoragentclaudeCopilot
authored
Dispatch activity cancellation to worker using Nexus (#9233)
## What Dispatches worker commands (starting with activity cancellation) to workers via their Nexus control queue. When the outbound queue processes a `WorkerCommandsTask`, the dispatcher sends an `ExecuteCommands` Nexus operation to the worker's control queue via `DispatchNexusTask`. - Retries are capped at 3 attempts since these commands are best-effort (the activity will eventually time out anyway). Uses the retryable matching client so transient RPC errors (ShardOwnershipLost, Unavailable) are retried at the RPC layer without consuming queue-level attempts. - Stores the clock used to generate the task token in the ActivityInfo. This is needed to reconstruct the same task token when dispatching to the worker. Otherwise, it will not match the token expected by the sdk. Suggested review order: `worker_commands_task_dispatcher.go` → `nexus_dispatch_response.go` → `recordactivitytaskstarted/api.go`. ## Why To support activity cancellation without activity heartbeat. This is the dispatch leg of the flow: 1. [#9231] Store `worker_control_task_queue` in `ActivityInfo` at activity start. 2. [#9232] On `RequestCancelActivityTask`, batch commands by control queue into `WorkerCommandsTask` outbound tasks. 3. **[This PR]** Dispatch each task as a Nexus `ExecuteCommands` operation to the worker, with a 3-attempt retry cap. 4. [SDK] Worker receives the cancel command and cancels the running activity. Gated by dynamic config `EnableCancelActivityWorkerCommand` (default: off). ## How did you test it? - **Unit tests** cover all dispatch outcomes (success, RPC error, timeout, worker error, feature-flag-off, max-attempts-exceeded) and response-to-error conversion paths. - **Functional test** verifies end-to-end: cancel request → Nexus dispatch → correct payload arrives on the control queue, and asserts that the cancel command's task token matches the one from the original activity poll response. --------- Co-authored-by: Cursor <cursoragent@cursor.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: rkannan82 <5853897+rkannan82@users.noreply.github.com>
1 parent d12d4a4 commit 856e276

19 files changed

Lines changed: 1365 additions & 97 deletions

api/persistence/v1/executions.pb.go

Lines changed: 79 additions & 56 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/metrics/metric_defs.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,11 @@ var (
748748
"nexus_completion_request_preprocess_errors",
749749
WithDescription("The number of Nexus completion requests for which pre-processing failed."),
750750
)
751+
WorkerCommandsSent = NewCounterDef(
752+
"worker_commands_sent",
753+
WithDescription("The number of worker command dispatches, tagged by outcome (e.g. success, no_poller, rpc_error)."),
754+
)
755+
751756
HostRPSLimit = NewGaugeDef("host_rps_limit")
752757
NamespaceHostRPSLimit = NewGaugeDef("namespace_host_rps_limit")
753758
HandoverWaitLatency = NewTimerDef("handover_wait_latency")

proto/internal/temporal/server/api/persistence/v1/executions.proto

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -678,6 +678,21 @@ message ActivityInfo {
678678
// A dedicated per-worker Nexus task queue on which the server sends control
679679
// tasks (e.g. activity cancellation) to this specific worker instance.
680680
string worker_control_task_queue = 51;
681+
682+
// The shard clock at the time this activity was started (RecordActivityTaskStarted).
683+
// Matching uses this clock to build the task token sent to the worker. Stored here so
684+
// that history can later reconstruct the same task token (e.g. for cancel worker commands).
685+
//
686+
// IMPORTANT: The clock approach requires history to reconstruct the token using
687+
// the same fields and logic as matching — if NewActivityTaskToken changes, both
688+
// call sites must stay in sync or the tokens will silently diverge. An alternative
689+
// is to store the full serialized task token (~150-300 bytes), which avoids
690+
// reconstruction entirely and is immune to token format changes. We chose the
691+
// clock approach to keep the per-activity memory footprint minimal (~24 bytes).
692+
//
693+
// Replication: This field is part of ActivityInfo and is automatically replicated
694+
// via state-based replication. No special handling is needed.
695+
temporal.server.api.clock.v1.VectorClock started_clock = 52;
681696
}
682697

683698
// timer_map column

service/history/api/recordactivitytaskstarted/api.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,16 @@ func recordActivityTaskStarted(
152152
if ai.RequestId == requestID {
153153
response.StartedTime = ai.StartedTime
154154
response.Attempt = ai.Attempt
155+
if ai.StartedClock != nil {
156+
response.Clock = ai.StartedClock
157+
} else {
158+
// Activity started before the StartedClock field was added.
159+
// Create a fresh clock for the shard staleness check.
160+
response.Clock, err = shardContext.NewVectorClock()
161+
if err != nil {
162+
return nil, rejectCodeUndefined, err
163+
}
164+
}
155165
return response, rejectCodeAccepted, nil
156166
}
157167

@@ -238,6 +248,17 @@ func recordActivityTaskStarted(
238248
}
239249
}
240250

251+
// Create the shard clock before recording the start event. Matching uses the returned clock
252+
// to build the task token sent to the worker. We store it in ActivityInfo so that history
253+
// can later reconstruct the same task token that matching created (e.g. for cancel worker
254+
// commands). On retries of this RPC, we return the stored clock from the early return
255+
// path above.
256+
clock, err := shardContext.NewVectorClock()
257+
if err != nil {
258+
return nil, rejectCodeUndefined, err
259+
}
260+
ai.StartedClock = clock
261+
241262
versioningStamp := worker_versioning.StampFromCapabilities(request.PollRequest.WorkerVersionCapabilities, request.PollRequest.DeploymentOptions) //nolint:staticcheck // SA1019: WorkerVersionCapabilities is deprecated but still used for old versioning [cleanup-old-wv]
242263
if _, err := mutableState.AddActivityTaskStartedEvent(
243264
ai, scheduledEventID, requestID, request.PollRequest.GetIdentity(),
@@ -261,6 +282,8 @@ func recordActivityTaskStarted(
261282
),
262283
).Record(scheduleToStartLatency)
263284

285+
response.Clock = clock
286+
264287
response.StartedTime = ai.StartedTime
265288
response.Attempt = ai.Attempt
266289
response.HeartbeatDetails = ai.LastHeartbeatDetails

service/history/api/recordactivitytaskstarted/api_test.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,23 @@ import (
44
"context"
55
"testing"
66

7+
"github.com/google/uuid"
78
"github.com/stretchr/testify/assert"
89
"github.com/stretchr/testify/require"
10+
commonpb "go.temporal.io/api/common/v1"
911
enumspb "go.temporal.io/api/enums/v1"
12+
historypb "go.temporal.io/api/history/v1"
13+
clockspb "go.temporal.io/server/api/clock/v1"
1014
deploymentspb "go.temporal.io/server/api/deployment/v1"
15+
"go.temporal.io/server/api/historyservice/v1"
1116
"go.temporal.io/server/api/matchingservice/v1"
1217
"go.temporal.io/server/api/matchingservicemock/v1"
1318
persistencespb "go.temporal.io/server/api/persistence/v1"
19+
"go.temporal.io/server/common/cluster"
20+
"go.temporal.io/server/common/metrics"
21+
"go.temporal.io/server/common/namespace"
1422
"go.temporal.io/server/common/worker_versioning"
23+
historyi "go.temporal.io/server/service/history/interfaces"
1524
"go.uber.org/mock/gomock"
1625
"google.golang.org/protobuf/types/known/timestamppb"
1726
)
@@ -265,3 +274,106 @@ func TestGetDeploymentVersionForWorkflowID_UnversionedTaskQueue(t *testing.T) {
265274
assert.Nil(t, targetVersion, "Unversioned task queue should return nil version")
266275
assert.Equal(t, int64(0), targetRevNum, "Unversioned task queue should return 0 revision number")
267276
}
277+
278+
const testClusterName = "active"
279+
280+
// setupMutableStateWithStartedActivity creates mock shard and mutable state with an activity
281+
// that is already started (StartedEventId != EmptyEventID). The activity's StartedClock is set
282+
// to the provided value. Returns the mocks and a request whose RequestId matches the activity's,
283+
// so the call will hit the duplicate-request early-return path in recordActivityTaskStarted.
284+
func setupMutableStateWithStartedActivity(t *testing.T, startedClock *clockspb.VectorClock) (
285+
*historyi.MockShardContext,
286+
*historyi.MockMutableState,
287+
*historyservice.RecordActivityTaskStartedRequest,
288+
) {
289+
t.Helper()
290+
ctrl := gomock.NewController(t)
291+
292+
mockShard := historyi.NewMockShardContext(ctrl)
293+
mockMS := historyi.NewMockMutableState(ctrl)
294+
295+
nsID := uuid.New().String()
296+
scheduledEventID := int64(5)
297+
requestID := "test-request-id"
298+
299+
// Namespace registry
300+
nsEntry := namespace.NewLocalNamespaceForTest(
301+
&persistencespb.NamespaceInfo{Id: nsID, Name: "test-namespace"},
302+
&persistencespb.NamespaceConfig{},
303+
testClusterName,
304+
)
305+
mockNSReg := namespace.NewMockRegistry(ctrl)
306+
mockNSReg.EXPECT().GetNamespaceByID(namespace.ID(nsID)).Return(nsEntry, nil)
307+
308+
// Cluster metadata
309+
mockClusterMeta := cluster.NewMockMetadata(ctrl)
310+
mockClusterMeta.EXPECT().GetCurrentClusterName().Return(testClusterName)
311+
312+
mockShard.EXPECT().GetNamespaceRegistry().Return(mockNSReg)
313+
mockShard.EXPECT().GetClusterMetadata().Return(mockClusterMeta)
314+
mockShard.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes()
315+
316+
ai := &persistencespb.ActivityInfo{
317+
ScheduledEventId: scheduledEventID,
318+
StartedEventId: 7, // already started
319+
RequestId: requestID,
320+
StartedTime: timestamppb.Now(),
321+
Attempt: 1,
322+
StartedClock: startedClock,
323+
}
324+
325+
mockMS.EXPECT().GetActivityInfo(scheduledEventID).Return(ai, true)
326+
mockMS.EXPECT().GetActivityScheduledEvent(gomock.Any(), scheduledEventID).Return(
327+
&historypb.HistoryEvent{EventId: scheduledEventID}, nil,
328+
)
329+
mockMS.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{})
330+
331+
request := &historyservice.RecordActivityTaskStartedRequest{
332+
NamespaceId: nsID,
333+
WorkflowExecution: &commonpb.WorkflowExecution{
334+
WorkflowId: "test-wf-id",
335+
RunId: "test-run-id",
336+
},
337+
ScheduledEventId: scheduledEventID,
338+
RequestId: requestID,
339+
}
340+
341+
return mockShard, mockMS, request
342+
}
343+
344+
// TestRecordActivityTaskStarted_DuplicateRequest_NilStartedClock verifies that
345+
// when a duplicate RecordActivityTaskStarted request arrives for an activity
346+
// started before the StartedClock field was added (StartedClock is nil), the response
347+
// still contains a non-nil Clock for the shard staleness check.
348+
func TestRecordActivityTaskStarted_DuplicateRequest_NilStartedClock(t *testing.T) {
349+
mockShard, mockMS, request := setupMutableStateWithStartedActivity(t, nil /* no StartedClock */)
350+
351+
// Should call NewVectorClock as fallback for nil StartedClock
352+
fallbackClock := &clockspb.VectorClock{ClusterId: 1, ShardId: 1, Clock: 42}
353+
mockShard.EXPECT().NewVectorClock().Return(fallbackClock, nil)
354+
355+
resp, code, err := recordActivityTaskStarted(
356+
context.Background(), mockShard, mockMS, request, nil, nil,
357+
)
358+
require.NoError(t, err)
359+
require.Equal(t, rejectCodeAccepted, code)
360+
require.NotNil(t, resp.Clock, "Clock must be non-nil even for pre-deploy activities")
361+
require.Equal(t, fallbackClock, resp.Clock)
362+
}
363+
364+
// TestRecordActivityTaskStarted_DuplicateRequest_WithStartedClock verifies that
365+
// when StartedClock is stored, the stored clock is returned without creating a new one.
366+
func TestRecordActivityTaskStarted_DuplicateRequest_WithStartedClock(t *testing.T) {
367+
storedClock := &clockspb.VectorClock{ClusterId: 1, ShardId: 1, Clock: 100}
368+
mockShard, mockMS, request := setupMutableStateWithStartedActivity(t, storedClock)
369+
370+
// Should NOT call NewVectorClock since StartedClock is available
371+
mockShard.EXPECT().NewVectorClock().Times(0)
372+
373+
resp, code, err := recordActivityTaskStarted(
374+
context.Background(), mockShard, mockMS, request, nil, nil,
375+
)
376+
require.NoError(t, err)
377+
require.Equal(t, rejectCodeAccepted, code)
378+
require.Equal(t, storedClock, resp.Clock, "Should return the stored StartedClock")
379+
}

service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go

Lines changed: 40 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -682,7 +682,6 @@ func (handler *workflowTaskCompletedHandler) handleCommandRequestCancelActivity(
682682
if ai != nil {
683683
// If ai is nil, the activity has already been canceled/completed/timedout. The cancel request
684684
// will be recorded in the history, but no further action will be taken.
685-
686685
if ai.StartedEventId == common.EmptyEventID {
687686
// We haven't started the activity yet, we can cancel the activity right away and
688687
// schedule a workflow task to ensure the workflow makes progress.
@@ -697,37 +696,48 @@ func (handler *workflowTaskCompletedHandler) handleCommandRequestCancelActivity(
697696
return nil, err
698697
}
699698
handler.activityNotStartedCancelled = true
700-
} else if ai.StartedEventId != common.EmptyEventID && ai.WorkerControlTaskQueue != "" {
701-
// Activity has started and worker supports Nexus control tasks - collect for batched dispatch.
702-
taskToken, err := handler.tokenSerializer.Serialize(tasktoken.NewActivityTaskToken(
703-
handler.mutableState.GetNamespaceEntry().ID().String(),
704-
handler.mutableState.GetWorkflowKey().WorkflowID,
705-
handler.mutableState.GetWorkflowKey().RunID,
706-
ai.ScheduledEventId,
707-
ai.ActivityId,
708-
ai.ActivityType.GetName(),
709-
ai.Attempt,
710-
nil, // Clock not needed for cancel
711-
ai.Version,
712-
ai.StartVersion,
713-
nil,
714-
))
715-
if err != nil {
716-
return nil, err
717-
}
718-
if handler.pendingWorkerCommandsByControlQueue == nil {
719-
handler.pendingWorkerCommandsByControlQueue = make(map[string][]*workerpb.WorkerCommand)
720-
}
721-
handler.pendingWorkerCommandsByControlQueue[ai.WorkerControlTaskQueue] = append(
722-
handler.pendingWorkerCommandsByControlQueue[ai.WorkerControlTaskQueue],
723-
&workerpb.WorkerCommand{
724-
Type: &workerpb.WorkerCommand_CancelActivity{
725-
CancelActivity: &workerpb.CancelActivityCommand{
726-
TaskToken: taskToken,
699+
} else if ai.WorkerControlTaskQueue != "" {
700+
if ai.StartedClock == nil {
701+
// StartedClock may be nil for activities started before this feature was deployed.
702+
// Skip cancel command; the activity will time out normally.
703+
handler.logger.Info("Skipping worker cancel command: activity missing StartedClock (pre-deploy)",
704+
tag.WorkflowNamespaceID(handler.mutableState.GetWorkflowKey().NamespaceID),
705+
tag.WorkflowID(handler.mutableState.GetWorkflowKey().WorkflowID),
706+
tag.WorkflowRunID(handler.mutableState.GetWorkflowKey().RunID),
707+
tag.WorkflowScheduledEventID(ai.ScheduledEventId),
708+
)
709+
} else {
710+
// Activity has started and worker supports Nexus control tasks - collect for batched dispatch.
711+
taskToken, err := handler.tokenSerializer.Serialize(tasktoken.NewActivityTaskToken(
712+
handler.mutableState.GetNamespaceEntry().ID().String(),
713+
handler.mutableState.GetWorkflowKey().WorkflowID,
714+
handler.mutableState.GetWorkflowKey().RunID,
715+
ai.ScheduledEventId,
716+
ai.ActivityId,
717+
ai.ActivityType.GetName(),
718+
ai.Attempt,
719+
ai.StartedClock,
720+
ai.Version,
721+
ai.StartVersion,
722+
nil,
723+
))
724+
if err != nil {
725+
return nil, err
726+
}
727+
if handler.pendingWorkerCommandsByControlQueue == nil {
728+
handler.pendingWorkerCommandsByControlQueue = make(map[string][]*workerpb.WorkerCommand)
729+
}
730+
handler.pendingWorkerCommandsByControlQueue[ai.WorkerControlTaskQueue] = append(
731+
handler.pendingWorkerCommandsByControlQueue[ai.WorkerControlTaskQueue],
732+
&workerpb.WorkerCommand{
733+
Type: &workerpb.WorkerCommand_CancelActivity{
734+
CancelActivity: &workerpb.CancelActivityCommand{
735+
TaskToken: taskToken,
736+
},
727737
},
728738
},
729-
},
730-
)
739+
)
740+
}
731741
}
732742
}
733743
return actCancelReqEvent, nil

0 commit comments

Comments
 (0)