Skip to content

Commit c825f6c

Browse files
committed
remove useless fields
1 parent be4e14e commit c825f6c

File tree

7 files changed

+27
-60
lines changed

7 files changed

+27
-60
lines changed

downstreamadapter/dispatcher/basic_dispatcher.go

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,6 @@ type BasicDispatcher struct {
165165
// Shared info containing all common configuration and resources
166166
sharedInfo *SharedInfo
167167

168-
eventCollectorBatchCount int
169-
eventCollectorBatchBytes int
170-
171168
// sink is the sink for this dispatcher
172169
sink sink.Sink
173170

@@ -235,35 +232,31 @@ func NewBasicDispatcher(
235232
schemaIDToDispatchers *SchemaIDToDispatchers,
236233
skipSyncpointAtStartTs bool,
237234
skipDMLAsStartTs bool,
238-
eventCollectorBatchCount int,
239-
eventCollectorBatchBytes int,
240235
currentPDTs uint64,
241236
mode int64,
242237
sink sink.Sink,
243238
sharedInfo *SharedInfo,
244239
) *BasicDispatcher {
245240
dispatcher := &BasicDispatcher{
246-
id: id,
247-
tableSpan: tableSpan,
248-
isCompleteTable: common.IsCompleteSpan(tableSpan),
249-
startTs: startTs,
250-
skipSyncpointAtStartTs: skipSyncpointAtStartTs,
251-
skipDMLAsStartTs: skipDMLAsStartTs,
252-
sharedInfo: sharedInfo,
253-
eventCollectorBatchCount: eventCollectorBatchCount,
254-
eventCollectorBatchBytes: eventCollectorBatchBytes,
255-
sink: sink,
256-
componentStatus: newComponentStateWithMutex(heartbeatpb.ComponentState_Initializing),
257-
isRemoving: atomic.Bool{},
258-
duringHandleEvents: atomic.Bool{},
259-
blockEventStatus: BlockEventStatus{blockPendingEvent: nil},
260-
tableProgress: NewTableProgress(),
261-
schemaID: schemaID,
262-
schemaIDToDispatchers: schemaIDToDispatchers,
263-
resendTaskMap: newResendTaskMap(),
264-
creationPDTs: currentPDTs,
265-
mode: mode,
266-
BootstrapState: BootstrapFinished,
241+
id: id,
242+
tableSpan: tableSpan,
243+
isCompleteTable: common.IsCompleteSpan(tableSpan),
244+
startTs: startTs,
245+
skipSyncpointAtStartTs: skipSyncpointAtStartTs,
246+
skipDMLAsStartTs: skipDMLAsStartTs,
247+
sharedInfo: sharedInfo,
248+
sink: sink,
249+
componentStatus: newComponentStateWithMutex(heartbeatpb.ComponentState_Initializing),
250+
isRemoving: atomic.Bool{},
251+
duringHandleEvents: atomic.Bool{},
252+
blockEventStatus: BlockEventStatus{blockPendingEvent: nil},
253+
tableProgress: NewTableProgress(),
254+
schemaID: schemaID,
255+
schemaIDToDispatchers: schemaIDToDispatchers,
256+
resendTaskMap: newResendTaskMap(),
257+
creationPDTs: currentPDTs,
258+
mode: mode,
259+
BootstrapState: BootstrapFinished,
267260
}
268261
dispatcher.resolvedTs.Store(startTs)
269262

downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,6 @@ func newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActive
158158
NewSchemaIDToDispatchers(),
159159
false,
160160
false,
161-
4096,
162-
0,
163161
200,
164162
common.DefaultMode,
165163
dispatcherSink.Sink(),

downstreamadapter/dispatcher/basic_dispatcher_info.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func (d *BasicDispatcher) GetChangefeedID() common.ChangeFeedID {
140140
}
141141

142142
func (d *BasicDispatcher) GetEventCollectorBatchConfig() (batchCount int, batchBytes int) {
143-
return d.eventCollectorBatchCount, d.eventCollectorBatchBytes
143+
return d.sharedInfo.eventCollectorBatchCount, d.sharedInfo.eventCollectorBatchBytes
144144
}
145145

146146
func (d *BasicDispatcher) GetComponentStatus() heartbeatpb.ComponentState {

downstreamadapter/dispatcher/event_dispatcher.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,6 @@ func NewEventDispatcher(
7373
schemaIDToDispatchers,
7474
skipSyncpointAtStartTs,
7575
skipDMLAsStartTs,
76-
sharedInfo.eventCollectorBatchCount,
77-
sharedInfo.eventCollectorBatchBytes,
7876
currentPdTs,
7977
common.DefaultMode,
8078
sink,

downstreamadapter/dispatcher/redo_dispatcher.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,6 @@ func NewRedoDispatcher(
5656
schemaIDToDispatchers,
5757
skipSyncpointAtStartTs,
5858
skipDMLAsStartTs,
59-
sharedInfo.eventCollectorBatchCount,
60-
sharedInfo.eventCollectorBatchBytes,
6159
0,
6260
common.RedoMode,
6361
sink,

utils/dynstream/event_queue_batcher_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@ func TestBatcherSetLimit(t *testing.T) {
3232
require.Equal(t, 64, b.config.hardBytes)
3333
}
3434

35+
func TestNewAreaSettingsWithMaxPendingSizeAndBatchConfigNormalizesBatchConfig(t *testing.T) {
36+
settings := NewAreaSettingsWithMaxPendingSizeAndBatchConfig(
37+
64*1024*1024, 0, "test", 0, -1,
38+
)
39+
require.Equal(t, newBatchConfig(0, -1), settings.batchConfig)
40+
}
41+
3542
func TestBatchByBytes(t *testing.T) {
3643
handler := mockHandler{}
3744
registry := newAreaBatchConfigRegistry[int](newDefaultBatchConfig())

utils/dynstream/interfaces_test.go

Lines changed: 0 additions & 27 deletions
This file was deleted.

0 commit comments

Comments
 (0)