Skip to content

Commit 2c19379

Browse files
authored
Remove pending activity cancellations when activity completion occurs (#726)
1 parent 69da258 commit 2c19379

File tree

2 files changed

+46
-5
lines changed

2 files changed

+46
-5
lines changed

internal/internal_decision_state_machine.go

+45-4
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ type (
8686
cancelActivityStateMachine struct {
8787
*commandStateMachineBase
8888
attributes *commandpb.RequestCancelActivityTaskCommandAttributes
89+
90+
// The commandsHelper.nextCommandEventIDResetCounter when this command
91+
// incremented commandsHelper.commandsCancelledDuringWFCancellation.
92+
cancelledOnEventIDResetCounter uint64
8993
}
9094

9195
timerCommandStateMachine struct {
@@ -96,6 +100,10 @@ type (
96100
cancelTimerCommandStateMachine struct {
97101
*commandStateMachineBase
98102
attributes *commandpb.CancelTimerCommandAttributes
103+
104+
// The commandsHelper.nextCommandEventIDResetCounter when this command
105+
// incremented commandsHelper.commandsCancelledDuringWFCancellation.
106+
cancelledOnEventIDResetCounter uint64
99107
}
100108

101109
childWorkflowCommandStateMachine struct {
@@ -137,6 +145,12 @@ type (
137145
versionMarkerLookup map[int64]string
138146
commandsCancelledDuringWFCancellation int64
139147
workflowExecutionIsCancelling bool
148+
149+
// Incremented everytime nextCommandEventID and
150+
// commandsCancelledDuringWFCancellation is reset (i.e. on new workflow
151+
// task). Won't ever happen, but technically the way this value is compared
152+
// is safe for overflow wrap around.
153+
nextCommandEventIDResetCounter uint64
140154
}
141155

142156
// panic when command state machine is in illegal state
@@ -528,6 +542,10 @@ func (d *activityCommandStateMachine) cancel() {
528542
}
529543
cancelCmd := d.helper.newCancelActivityStateMachine(attribs)
530544
d.helper.addCommand(cancelCmd)
545+
// We must mark the event ID reset counter for when we performed this
546+
// increment so a potential decrement can only decrement if it wasn't
547+
// reset
548+
cancelCmd.cancelledOnEventIDResetCounter = d.helper.nextCommandEventIDResetCounter
531549
}
532550

533551
d.commandStateMachineBase.cancel()
@@ -541,6 +559,10 @@ func (d *timerCommandStateMachine) cancel() {
541559
}
542560
cancelCmd := d.helper.newCancelTimerCommandStateMachine(attribs)
543561
d.helper.addCommand(cancelCmd)
562+
// We must mark the event ID reset counter for when we performed this
563+
// increment so a potential decrement can only decrement if it wasn't
564+
// reset
565+
cancelCmd.cancelledOnEventIDResetCounter = d.helper.nextCommandEventIDResetCounter
544566
}
545567

546568
d.commandStateMachineBase.cancel()
@@ -824,6 +846,9 @@ func (h *commandsHelper) setCurrentWorkflowTaskStartedEventID(workflowTaskStarte
824846
// execution as those canceled command events will show up *after* the workflow task completed event.
825847
h.nextCommandEventID = workflowTaskStartedEventID + 2 + h.commandsCancelledDuringWFCancellation
826848
h.commandsCancelledDuringWFCancellation = 0
849+
// We must change the counter here so that others who mutate
850+
// commandsCancelledDuringWFCancellation know it has since been reset
851+
h.nextCommandEventIDResetCounter++
827852
}
828853

829854
func (h *commandsHelper) getNextID() int64 {
@@ -877,14 +902,26 @@ func (h *commandsHelper) addCommand(command commandStateMachine) {
877902
// might be in the same workflow task. In practice this only seems to happen during unhandled command events.
878903
func (h *commandsHelper) removeCancelOfResolvedCommand(commandID commandID) {
879904
// Ensure this isn't misused for non-cancel commands
880-
if commandID.commandType != commandTypeCancelTimer {
881-
panic("removeCancelOfResolvedCommand should only be called for cancel timer")
905+
if commandID.commandType != commandTypeCancelTimer && commandID.commandType != commandTypeRequestCancelActivityTask {
906+
panic("removeCancelOfResolvedCommand should only be called for cancel timer / activity")
882907
}
883908
orderedCmdEl, ok := h.commands[commandID]
884909
if ok {
885910
delete(h.commands, commandID)
886-
h.orderedCommands.Remove(orderedCmdEl)
887-
h.commandsCancelledDuringWFCancellation--
911+
command := h.orderedCommands.Remove(orderedCmdEl)
912+
// Sometimes commandsCancelledDuringWFCancellation was incremented before
913+
// it was reset and sometimes not. We use the reset counter to see if we're
914+
// still on the same iteration where we may have incremented it before.
915+
switch command := command.(type) {
916+
case *cancelActivityStateMachine:
917+
if command.cancelledOnEventIDResetCounter == h.nextCommandEventIDResetCounter {
918+
h.commandsCancelledDuringWFCancellation--
919+
}
920+
case *cancelTimerCommandStateMachine:
921+
if command.cancelledOnEventIDResetCounter == h.nextCommandEventIDResetCounter {
922+
h.commandsCancelledDuringWFCancellation--
923+
}
924+
}
888925
}
889926
}
890927

@@ -916,6 +953,10 @@ func (h *commandsHelper) requestCancelActivityTask(activityID string) commandSta
916953

917954
func (h *commandsHelper) handleActivityTaskClosed(activityID string, scheduledEventID int64) commandStateMachine {
918955
command := h.getCommand(makeCommandID(commandTypeActivity, activityID))
956+
// If, for whatever reason, we were going to send an activity cancel request, don't do that anymore
957+
// since we already know the activity is resolved.
958+
possibleCancelID := makeCommandID(commandTypeRequestCancelActivityTask, activityID)
959+
h.removeCancelOfResolvedCommand(possibleCancelID)
919960
command.handleCompletionEvent()
920961
delete(h.scheduledEventIDToActivityID, scheduledEventID)
921962
return command

internal/version.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ package internal
3030
const (
3131
// SDKVersion is a semver (https://semver.org/) that represents the version of this Temporal GoSDK.
3232
// Server validates if SDKVersion fits its supported range and rejects request if it doesn't.
33-
SDKVersion = "1.13.0"
33+
SDKVersion = "1.13.1"
3434

3535
// SupportedServerVersions is a semver rages (https://github.com/blang/semver#ranges) of server versions that
3636
// are supported by this Temporal SDK.

0 commit comments

Comments
 (0)