Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions pkg/eventservice/event_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1484,7 +1484,12 @@ func TestProcessTableTriggerDispatcherSendsSignalResolvedWhenNoForwardRangeAndNo

changefeed := broker.getOrSetChangefeedStatus(info.GetChangefeedID(), info.GetSyncPointInterval())
dispatcher := newDispatcherStat(info, 1, 1, nil, changefeed)
dispatcher.lastSentResolvedTsTime.Store(time.Now().Add(-defaultSendResolvedTsInterval - time.Second))
// Keep the test focused on the signal resolved path instead of coupling it with
// the separate handshake behavior.
dispatcher.seq.Store(1)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Consider using the setHandshaked() method instead of directly storing a value in the unexported seq field. This is more idiomatic, ensures consistency with many other tests in this file (e.g., lines 172, 321, 400, 700), and provides better encapsulation of the dispatcher's handshake state.

Suggested change
dispatcher.seq.Store(1)
dispatcher.setHandshaked()

// Use the zero time to bypass the rate limit deterministically without depending
// on wall clock timing.
dispatcher.lastSentResolvedTsTime.Store(time.Time{})

dispatcherPtr := &atomic.Pointer[dispatcherStat]{}
dispatcherPtr.Store(dispatcher)
Expand All @@ -1497,12 +1502,10 @@ func TestProcessTableTriggerDispatcherSendsSignalResolvedWhenNoForwardRangeAndNo

broker.processTableTriggerDispatcher(context.Background(), dispatcher.id, dispatcher)

require.Equal(t, 2, len(broker.messageCh[dispatcher.messageWorkerIndex]))
first := <-broker.messageCh[dispatcher.messageWorkerIndex]
second := <-broker.messageCh[dispatcher.messageWorkerIndex]
require.Equal(t, event.TypeHandshakeEvent, first.msgType)
require.Equal(t, event.TypeResolvedEvent, second.msgType)
require.Equal(t, ts10, second.resolvedTsEvent.ResolvedTs)
require.Equal(t, 1, len(broker.messageCh[dispatcher.messageWorkerIndex]))
msg := <-broker.messageCh[dispatcher.messageWorkerIndex]
require.Equal(t, event.TypeResolvedEvent, msg.msgType)
require.Equal(t, ts10, msg.resolvedTsEvent.ResolvedTs)
require.Equal(t, ts10, dispatcher.sentResolvedTs.Load())
require.Equal(t, ts20, dispatcher.nextSyncPoint.Load())
}
Expand Down
Loading