Skip to content

Log duplicated activity events #6813

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions service/history/execution/mutable_state_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,14 @@ func (e *mutableStateBuilder) FlushBufferedEvents() error {

newCommittedEvents = e.trimEventsAfterWorkflowClose(newCommittedEvents)
e.hBuilder.history = newCommittedEvents

// adding logs to help identify duplicate activity task events
// duplicated activity events can cause DecisionTaskFailed events with cause UNHANDLED_DECISION
// and cause workflow to be stuck in decision task failed state
// this can be removed after the root cause is identified and fixed
// TODO: remove this after the root cause is identified and fixed or add deduplication
e.logDuplicatedActivityEvents()

// make sure all new committed events have correct EventID
e.assignEventIDToBufferedEvents()
if err := e.assignTaskIDToEvents(); err != nil {
Expand Down Expand Up @@ -2253,6 +2261,65 @@ func (e *mutableStateBuilder) logDataInconsistency() {
tag.WorkflowRunID(runID),
)
}
func (e *mutableStateBuilder) logDuplicatedActivityEvents() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could we have some unit test on those cases?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'll add that to show how it works

type activityTaskUniqueEventParams struct {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't compare events directly. Using this struct to dereference attributes that can guarantee the activity uniqueness

Copy link
Member

@davidporter-id-au davidporter-id-au Apr 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this will work, equality in maps is being compared by pointer, not by value ( I think, let's confirm I'm not lying). Let's start with a unit test to validate this is working as expected. I think this is will probably not catch duplicates in its current state, unless you squash it into something that's more comparable like a string.

Ensuring that log is called is a bit annoying, so changing the check to just return a bool is one easy solution. eg:


  isDuplicated := e.checkDuplicatedActivityEvents()
  if isDuplicated{
    e.logger.Error("Duplicate activity task event found",
				tag.WorkflowDomainName(e.GetDomainEntry().GetInfo().Name),
				tag.WorkflowID(e.GetExecutionInfo().WorkflowID),
				tag.WorkflowRunID(e.GetExecutionInfo().RunID),
				tag.WorkflowScheduleID(scheduledEventID),
				tag.WorkflowEventType(event.GetEventType().String())
    )

  }
  })

func (e *mutableStateBuilder) checkDuplicatedActivityEvents() bool {
   ... // check if the fields are duplicated 
}

For the duplicated check, my vague memory was that you were looking to check if the scheduledEventID was duplicated? That's just a primitive type, I think that's possible to just do a map check on? I would start there to begin with, though if you want to check other fields, there's a few solutions, but i'm not entirely sure what it means for the attempt to be duplicated, for example.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually seems I am a bit wrong and clearly don't understand map keys well enough

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, you're correct and I'm wrong (https://go.dev/ref/spec#Comparison_operators), (demonstrated here)

Struct types are comparable if all their field types are comparable. Two struct values are equal if their corresponding non-blank field values are equal. The fields are compared in source order, and comparison stops as soon as two field values differ (or all fields have been compared).

So I take back what I said, but would agree with Tim's point about a unit test, both to check and to also catch NPE problems when dereferencing

eventType types.EventType
scheduledEventID int64
attempt int32
startedEventID int64
}

activityTaskUniqueEvents := make(map[activityTaskUniqueEventParams]struct{})

checkActivityTaskEventUniqueness := func(event *types.HistoryEvent) {
uniqueEventParams := activityTaskUniqueEventParams{
eventType: event.GetEventType(),
}

var scheduledEventID int64

switch event.GetEventType() {
case types.EventTypeActivityTaskStarted:
scheduledEventID = event.ActivityTaskStartedEventAttributes.GetScheduledEventID()
uniqueEventParams.scheduledEventID = scheduledEventID
uniqueEventParams.attempt = event.ActivityTaskStartedEventAttributes.Attempt
case types.EventTypeActivityTaskCompleted:
scheduledEventID = event.ActivityTaskCompletedEventAttributes.GetScheduledEventID()
uniqueEventParams.scheduledEventID = scheduledEventID
uniqueEventParams.startedEventID = event.ActivityTaskCompletedEventAttributes.GetStartedEventID()
case types.EventTypeActivityTaskFailed:
scheduledEventID = event.ActivityTaskFailedEventAttributes.GetScheduledEventID()
uniqueEventParams.scheduledEventID = scheduledEventID
uniqueEventParams.startedEventID = event.ActivityTaskFailedEventAttributes.GetStartedEventID()
case types.EventTypeActivityTaskCanceled:
scheduledEventID = event.ActivityTaskCanceledEventAttributes.GetScheduledEventID()
uniqueEventParams.scheduledEventID = scheduledEventID
uniqueEventParams.startedEventID = event.ActivityTaskCanceledEventAttributes.StartedEventID
case types.EventTypeActivityTaskTimedOut:
scheduledEventID = event.ActivityTaskTimedOutEventAttributes.GetScheduledEventID()
uniqueEventParams.scheduledEventID = scheduledEventID
uniqueEventParams.startedEventID = event.ActivityTaskTimedOutEventAttributes.StartedEventID
default:
return
}

if _, ok := activityTaskUniqueEvents[uniqueEventParams]; ok {
e.logger.Error("Duplicate activity task event found",
tag.WorkflowDomainName(e.GetDomainEntry().GetInfo().Name),
tag.WorkflowID(e.GetExecutionInfo().WorkflowID),
tag.WorkflowRunID(e.GetExecutionInfo().RunID),
tag.WorkflowScheduleID(scheduledEventID),
tag.WorkflowEventType(event.GetEventType().String()),
)
} else {
activityTaskUniqueEvents[uniqueEventParams] = struct{}{}
}
}

for _, event := range e.hBuilder.history {
checkActivityTaskEventUniqueness(event)
}
}

func mergeMapOfByteArray(
current map[string][]byte,
Expand Down
149 changes: 149 additions & 0 deletions service/history/execution/mutable_state_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"
"go.uber.org/mock/gomock"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
Expand Down Expand Up @@ -3600,6 +3602,153 @@ func TestCloseTransactionAsMutation(t *testing.T) {
}
}

func Test__logDuplicatedActivityEvents(t *testing.T) {
testCases := []struct {
name string
buildHistory func(msb *mutableStateBuilder) []*types.HistoryEvent
assertions func(*testing.T, *observer.ObservedLogs)
}{
{
name: "no duplicates",
buildHistory: func(msb *mutableStateBuilder) []*types.HistoryEvent {
event1 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskStarted)
event1.ActivityTaskStartedEventAttributes = &types.ActivityTaskStartedEventAttributes{
ScheduledEventID: 1,
Attempt: 1,
}
event2 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskCompleted)
event2.ActivityTaskCompletedEventAttributes = &types.ActivityTaskCompletedEventAttributes{
ScheduledEventID: 1,
}

return []*types.HistoryEvent{event1, event2}
},
assertions: func(t *testing.T, logs *observer.ObservedLogs) {
assert.Equal(t, 0, logs.FilterMessage("Duplicate activity task event found").Len())
},
},
{
name: "started event duplicated",
buildHistory: func(msb *mutableStateBuilder) []*types.HistoryEvent {
event1 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskStarted)
event1.ActivityTaskStartedEventAttributes = &types.ActivityTaskStartedEventAttributes{
ScheduledEventID: 1,
Attempt: 1,
}
event2 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskStarted)
event2.ActivityTaskStartedEventAttributes = &types.ActivityTaskStartedEventAttributes{
ScheduledEventID: 1,
Attempt: 1,
}

return []*types.HistoryEvent{event1, event2}
},
assertions: func(t *testing.T, logs *observer.ObservedLogs) {
assert.Equal(t, 1, logs.FilterMessage("Duplicate activity task event found").Len())
},
},
{
name: "completed event duplicated",
buildHistory: func(msb *mutableStateBuilder) []*types.HistoryEvent {
event1 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskCompleted)
event1.ActivityTaskCompletedEventAttributes = &types.ActivityTaskCompletedEventAttributes{
ScheduledEventID: 1,
}
event2 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskCompleted)
event2.ActivityTaskCompletedEventAttributes = &types.ActivityTaskCompletedEventAttributes{
ScheduledEventID: 1,
}

return []*types.HistoryEvent{event1, event2}
},
assertions: func(t *testing.T, logs *observer.ObservedLogs) {
assert.Equal(t, 1, logs.FilterMessage("Duplicate activity task event found").Len())
},
},
{
name: "canceled event duplicated",
buildHistory: func(msb *mutableStateBuilder) []*types.HistoryEvent {
event1 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskCanceled)
event1.ActivityTaskCanceledEventAttributes = &types.ActivityTaskCanceledEventAttributes{
ScheduledEventID: 1,
}
event2 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskCanceled)
event2.ActivityTaskCanceledEventAttributes = &types.ActivityTaskCanceledEventAttributes{
ScheduledEventID: 1,
}

return []*types.HistoryEvent{event1, event2}
},
assertions: func(t *testing.T, logs *observer.ObservedLogs) {
assert.Equal(t, 1, logs.FilterMessage("Duplicate activity task event found").Len())
},
},
{
name: "failed event duplicated",
buildHistory: func(msb *mutableStateBuilder) []*types.HistoryEvent {
event1 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskFailed)
event1.ActivityTaskFailedEventAttributes = &types.ActivityTaskFailedEventAttributes{
ScheduledEventID: 1,
}
event2 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskFailed)
event2.ActivityTaskFailedEventAttributes = &types.ActivityTaskFailedEventAttributes{
ScheduledEventID: 1,
}

return []*types.HistoryEvent{event1, event2}
},
assertions: func(t *testing.T, logs *observer.ObservedLogs) {
assert.Equal(t, 1, logs.FilterMessage("Duplicate activity task event found").Len())
},
},
{
name: "timed out event duplicated",
buildHistory: func(msb *mutableStateBuilder) []*types.HistoryEvent {
event1 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskTimedOut)
event1.ActivityTaskTimedOutEventAttributes = &types.ActivityTaskTimedOutEventAttributes{
ScheduledEventID: 1,
}
event2 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskTimedOut)
event2.ActivityTaskTimedOutEventAttributes = &types.ActivityTaskTimedOutEventAttributes{
ScheduledEventID: 1,
}

return []*types.HistoryEvent{event1, event2}
},
assertions: func(t *testing.T, logs *observer.ObservedLogs) {
assert.Equal(t, 1, logs.FilterMessage("Duplicate activity task event found").Len())
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

core, observedLogs := observer.New(zap.ErrorLevel)

mockCache := events.NewMockCache(ctrl)
shardContext := shard.NewMockContext(ctrl)
mockDomainCache := cache.NewMockDomainCache(ctrl)

msb := createMSBWithMocks(mockCache, shardContext, mockDomainCache)

msb.logger = log.NewLogger(zap.New(core))

msb.executionInfo.DomainID = "some-domain-id"
msb.executionInfo.WorkflowID = "some-workflow-id"
msb.executionInfo.RunID = "some-run-id"

msb.hBuilder.history = tc.buildHistory(msb)

msb.logDuplicatedActivityEvents()

tc.assertions(t, observedLogs)
})
}
}

func createMSBWithMocks(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) *mutableStateBuilder {
// the MSB constructor calls a bunch of endpoints on the mocks, so
// put them in here as a set of fixed expectations so the actual mocking
Expand Down