Skip to content

Commit a155e42

Browse files
authored
Log duplicated activity events (#6813)
* Log duplicated activity events
1 parent 8376a05 commit a155e42

File tree

2 files changed

+216
-0
lines changed

2 files changed

+216
-0
lines changed

Diff for: service/history/execution/mutable_state_builder.go

+67
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,14 @@ func (e *mutableStateBuilder) FlushBufferedEvents() error {
529529

530530
newCommittedEvents = e.trimEventsAfterWorkflowClose(newCommittedEvents)
531531
e.hBuilder.history = newCommittedEvents
532+
533+
// adding logs to help identify duplicate activity task events
534+
// duplicated activity events can cause DecisionTaskFailed events with cause UNHANDLED_DECISION
535+
// and cause workflow to be stuck in decision task failed state
536+
// this can be removed after the root cause is identified and fixed
537+
// TODO: remove this after the root cause is identified and fixed or add deduplication
538+
e.logDuplicatedActivityEvents()
539+
532540
// make sure all new committed events have correct EventID
533541
e.assignEventIDToBufferedEvents()
534542
if err := e.assignTaskIDToEvents(); err != nil {
@@ -2253,6 +2261,65 @@ func (e *mutableStateBuilder) logDataInconsistency() {
22532261
tag.WorkflowRunID(runID),
22542262
)
22552263
}
2264+
func (e *mutableStateBuilder) logDuplicatedActivityEvents() {
2265+
type activityTaskUniqueEventParams struct {
2266+
eventType types.EventType
2267+
scheduledEventID int64
2268+
attempt int32
2269+
startedEventID int64
2270+
}
2271+
2272+
activityTaskUniqueEvents := make(map[activityTaskUniqueEventParams]struct{})
2273+
2274+
checkActivityTaskEventUniqueness := func(event *types.HistoryEvent) {
2275+
uniqueEventParams := activityTaskUniqueEventParams{
2276+
eventType: event.GetEventType(),
2277+
}
2278+
2279+
var scheduledEventID int64
2280+
2281+
switch event.GetEventType() {
2282+
case types.EventTypeActivityTaskStarted:
2283+
scheduledEventID = event.ActivityTaskStartedEventAttributes.GetScheduledEventID()
2284+
uniqueEventParams.scheduledEventID = scheduledEventID
2285+
uniqueEventParams.attempt = event.ActivityTaskStartedEventAttributes.Attempt
2286+
case types.EventTypeActivityTaskCompleted:
2287+
scheduledEventID = event.ActivityTaskCompletedEventAttributes.GetScheduledEventID()
2288+
uniqueEventParams.scheduledEventID = scheduledEventID
2289+
uniqueEventParams.startedEventID = event.ActivityTaskCompletedEventAttributes.GetStartedEventID()
2290+
case types.EventTypeActivityTaskFailed:
2291+
scheduledEventID = event.ActivityTaskFailedEventAttributes.GetScheduledEventID()
2292+
uniqueEventParams.scheduledEventID = scheduledEventID
2293+
uniqueEventParams.startedEventID = event.ActivityTaskFailedEventAttributes.GetStartedEventID()
2294+
case types.EventTypeActivityTaskCanceled:
2295+
scheduledEventID = event.ActivityTaskCanceledEventAttributes.GetScheduledEventID()
2296+
uniqueEventParams.scheduledEventID = scheduledEventID
2297+
uniqueEventParams.startedEventID = event.ActivityTaskCanceledEventAttributes.StartedEventID
2298+
case types.EventTypeActivityTaskTimedOut:
2299+
scheduledEventID = event.ActivityTaskTimedOutEventAttributes.GetScheduledEventID()
2300+
uniqueEventParams.scheduledEventID = scheduledEventID
2301+
uniqueEventParams.startedEventID = event.ActivityTaskTimedOutEventAttributes.StartedEventID
2302+
default:
2303+
return
2304+
}
2305+
2306+
if _, ok := activityTaskUniqueEvents[uniqueEventParams]; ok {
2307+
e.logger.Error("Duplicate activity task event found",
2308+
tag.WorkflowDomainName(e.GetDomainEntry().GetInfo().Name),
2309+
tag.WorkflowID(e.GetExecutionInfo().WorkflowID),
2310+
tag.WorkflowRunID(e.GetExecutionInfo().RunID),
2311+
tag.WorkflowScheduleID(scheduledEventID),
2312+
tag.WorkflowEventType(event.GetEventType().String()),
2313+
)
2314+
} else {
2315+
activityTaskUniqueEvents[uniqueEventParams] = struct{}{}
2316+
}
2317+
}
2318+
2319+
for _, event := range e.hBuilder.history {
2320+
checkActivityTaskEventUniqueness(event)
2321+
}
2322+
}
22562323

22572324
func mergeMapOfByteArray(
22582325
current map[string][]byte,

Diff for: service/history/execution/mutable_state_builder_test.go

+149
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ import (
3333
"github.com/stretchr/testify/suite"
3434
"github.com/uber-go/tally"
3535
"go.uber.org/mock/gomock"
36+
"go.uber.org/zap"
37+
"go.uber.org/zap/zaptest/observer"
3638

3739
"github.com/uber/cadence/common"
3840
"github.com/uber/cadence/common/backoff"
@@ -3600,6 +3602,153 @@ func TestCloseTransactionAsMutation(t *testing.T) {
36003602
}
36013603
}
36023604

3605+
func Test__logDuplicatedActivityEvents(t *testing.T) {
3606+
testCases := []struct {
3607+
name string
3608+
buildHistory func(msb *mutableStateBuilder) []*types.HistoryEvent
3609+
assertions func(*testing.T, *observer.ObservedLogs)
3610+
}{
3611+
{
3612+
name: "no duplicates",
3613+
buildHistory: func(msb *mutableStateBuilder) []*types.HistoryEvent {
3614+
event1 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskStarted)
3615+
event1.ActivityTaskStartedEventAttributes = &types.ActivityTaskStartedEventAttributes{
3616+
ScheduledEventID: 1,
3617+
Attempt: 1,
3618+
}
3619+
event2 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskCompleted)
3620+
event2.ActivityTaskCompletedEventAttributes = &types.ActivityTaskCompletedEventAttributes{
3621+
ScheduledEventID: 1,
3622+
}
3623+
3624+
return []*types.HistoryEvent{event1, event2}
3625+
},
3626+
assertions: func(t *testing.T, logs *observer.ObservedLogs) {
3627+
assert.Equal(t, 0, logs.FilterMessage("Duplicate activity task event found").Len())
3628+
},
3629+
},
3630+
{
3631+
name: "started event duplicated",
3632+
buildHistory: func(msb *mutableStateBuilder) []*types.HistoryEvent {
3633+
event1 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskStarted)
3634+
event1.ActivityTaskStartedEventAttributes = &types.ActivityTaskStartedEventAttributes{
3635+
ScheduledEventID: 1,
3636+
Attempt: 1,
3637+
}
3638+
event2 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskStarted)
3639+
event2.ActivityTaskStartedEventAttributes = &types.ActivityTaskStartedEventAttributes{
3640+
ScheduledEventID: 1,
3641+
Attempt: 1,
3642+
}
3643+
3644+
return []*types.HistoryEvent{event1, event2}
3645+
},
3646+
assertions: func(t *testing.T, logs *observer.ObservedLogs) {
3647+
assert.Equal(t, 1, logs.FilterMessage("Duplicate activity task event found").Len())
3648+
},
3649+
},
3650+
{
3651+
name: "completed event duplicated",
3652+
buildHistory: func(msb *mutableStateBuilder) []*types.HistoryEvent {
3653+
event1 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskCompleted)
3654+
event1.ActivityTaskCompletedEventAttributes = &types.ActivityTaskCompletedEventAttributes{
3655+
ScheduledEventID: 1,
3656+
}
3657+
event2 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskCompleted)
3658+
event2.ActivityTaskCompletedEventAttributes = &types.ActivityTaskCompletedEventAttributes{
3659+
ScheduledEventID: 1,
3660+
}
3661+
3662+
return []*types.HistoryEvent{event1, event2}
3663+
},
3664+
assertions: func(t *testing.T, logs *observer.ObservedLogs) {
3665+
assert.Equal(t, 1, logs.FilterMessage("Duplicate activity task event found").Len())
3666+
},
3667+
},
3668+
{
3669+
name: "canceled event duplicated",
3670+
buildHistory: func(msb *mutableStateBuilder) []*types.HistoryEvent {
3671+
event1 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskCanceled)
3672+
event1.ActivityTaskCanceledEventAttributes = &types.ActivityTaskCanceledEventAttributes{
3673+
ScheduledEventID: 1,
3674+
}
3675+
event2 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskCanceled)
3676+
event2.ActivityTaskCanceledEventAttributes = &types.ActivityTaskCanceledEventAttributes{
3677+
ScheduledEventID: 1,
3678+
}
3679+
3680+
return []*types.HistoryEvent{event1, event2}
3681+
},
3682+
assertions: func(t *testing.T, logs *observer.ObservedLogs) {
3683+
assert.Equal(t, 1, logs.FilterMessage("Duplicate activity task event found").Len())
3684+
},
3685+
},
3686+
{
3687+
name: "failed event duplicated",
3688+
buildHistory: func(msb *mutableStateBuilder) []*types.HistoryEvent {
3689+
event1 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskFailed)
3690+
event1.ActivityTaskFailedEventAttributes = &types.ActivityTaskFailedEventAttributes{
3691+
ScheduledEventID: 1,
3692+
}
3693+
event2 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskFailed)
3694+
event2.ActivityTaskFailedEventAttributes = &types.ActivityTaskFailedEventAttributes{
3695+
ScheduledEventID: 1,
3696+
}
3697+
3698+
return []*types.HistoryEvent{event1, event2}
3699+
},
3700+
assertions: func(t *testing.T, logs *observer.ObservedLogs) {
3701+
assert.Equal(t, 1, logs.FilterMessage("Duplicate activity task event found").Len())
3702+
},
3703+
},
3704+
{
3705+
name: "timed out event duplicated",
3706+
buildHistory: func(msb *mutableStateBuilder) []*types.HistoryEvent {
3707+
event1 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskTimedOut)
3708+
event1.ActivityTaskTimedOutEventAttributes = &types.ActivityTaskTimedOutEventAttributes{
3709+
ScheduledEventID: 1,
3710+
}
3711+
event2 := msb.CreateNewHistoryEvent(types.EventTypeActivityTaskTimedOut)
3712+
event2.ActivityTaskTimedOutEventAttributes = &types.ActivityTaskTimedOutEventAttributes{
3713+
ScheduledEventID: 1,
3714+
}
3715+
3716+
return []*types.HistoryEvent{event1, event2}
3717+
},
3718+
assertions: func(t *testing.T, logs *observer.ObservedLogs) {
3719+
assert.Equal(t, 1, logs.FilterMessage("Duplicate activity task event found").Len())
3720+
},
3721+
},
3722+
}
3723+
3724+
for _, tc := range testCases {
3725+
t.Run(tc.name, func(t *testing.T) {
3726+
ctrl := gomock.NewController(t)
3727+
defer ctrl.Finish()
3728+
3729+
core, observedLogs := observer.New(zap.ErrorLevel)
3730+
3731+
mockCache := events.NewMockCache(ctrl)
3732+
shardContext := shard.NewMockContext(ctrl)
3733+
mockDomainCache := cache.NewMockDomainCache(ctrl)
3734+
3735+
msb := createMSBWithMocks(mockCache, shardContext, mockDomainCache)
3736+
3737+
msb.logger = log.NewLogger(zap.New(core))
3738+
3739+
msb.executionInfo.DomainID = "some-domain-id"
3740+
msb.executionInfo.WorkflowID = "some-workflow-id"
3741+
msb.executionInfo.RunID = "some-run-id"
3742+
3743+
msb.hBuilder.history = tc.buildHistory(msb)
3744+
3745+
msb.logDuplicatedActivityEvents()
3746+
3747+
tc.assertions(t, observedLogs)
3748+
})
3749+
}
3750+
}
3751+
36033752
func createMSBWithMocks(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) *mutableStateBuilder {
36043753
// the MSB constructor calls a bunch of endpoints on the mocks, so
36053754
// put them in here as a set of fixed expectations so the actual mocking

0 commit comments

Comments
 (0)