forked from pingcap/ticdc
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmock_sink_helper_test.go
More file actions
109 lines (95 loc) · 3.15 KB
/
mock_sink_helper_test.go
File metadata and controls
109 lines (95 loc) · 3.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package dispatcher
import (
"sync"
"sync/atomic"
"testing"
"github.com/golang/mock/gomock"
"github.com/pingcap/ticdc/downstreamadapter/sink"
"github.com/pingcap/ticdc/downstreamadapter/sink/mock"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
)
// dispatcherTestSink wraps gomock sink and keeps the few stateful helpers that
// old tests used (captured DML events and normal/abnormal switch).
type dispatcherTestSink struct {
sink *mock.MockSink
mu sync.Mutex
dmls []*commonEvent.DMLEvent
isNormal atomic.Bool
flushMu sync.Mutex
flushBeforeBlockHook func(commonEvent.BlockEvent) error
}
func newDispatcherTestSink(t *testing.T, sinkType common.SinkType) *dispatcherTestSink {
t.Helper()
ctrl := gomock.NewController(t)
testSink := &dispatcherTestSink{
sink: mock.NewMockSink(ctrl),
dmls: make([]*commonEvent.DMLEvent, 0),
}
testSink.isNormal.Store(true)
testSink.sink.EXPECT().SinkType().Return(sinkType).AnyTimes()
testSink.sink.EXPECT().IsNormal().DoAndReturn(func() bool {
return testSink.isNormal.Load()
}).AnyTimes()
testSink.sink.EXPECT().AddDMLEvent(gomock.Any()).Do(func(event *commonEvent.DMLEvent) {
testSink.mu.Lock()
defer testSink.mu.Unlock()
testSink.dmls = append(testSink.dmls, event)
}).AnyTimes()
testSink.sink.EXPECT().WriteBlockEvent(gomock.Any()).DoAndReturn(func(event commonEvent.BlockEvent) error {
event.PostFlush()
return nil
}).AnyTimes()
testSink.sink.EXPECT().FlushDMLBeforeBlock(gomock.Any()).DoAndReturn(func(event commonEvent.BlockEvent) error {
testSink.flushMu.Lock()
hook := testSink.flushBeforeBlockHook
testSink.flushMu.Unlock()
if hook != nil {
return hook(event)
}
return nil
}).AnyTimes()
testSink.sink.EXPECT().AddCheckpointTs(gomock.Any()).AnyTimes()
testSink.sink.EXPECT().SetTableSchemaStore(gomock.Any()).AnyTimes()
testSink.sink.EXPECT().Close(gomock.Any()).AnyTimes()
testSink.sink.EXPECT().Run(gomock.Any()).Return(nil).AnyTimes()
return testSink
}
func (s *dispatcherTestSink) Sink() sink.Sink {
return s.sink
}
func (s *dispatcherTestSink) SetIsNormal(isNormal bool) {
s.isNormal.Store(isNormal)
}
func (s *dispatcherTestSink) SetFlushBeforeBlockHook(hook func(commonEvent.BlockEvent) error) {
s.flushMu.Lock()
defer s.flushMu.Unlock()
s.flushBeforeBlockHook = hook
}
func (s *dispatcherTestSink) GetDMLs() []*commonEvent.DMLEvent {
s.mu.Lock()
defer s.mu.Unlock()
dmls := make([]*commonEvent.DMLEvent, len(s.dmls))
copy(dmls, s.dmls)
return dmls
}
func (s *dispatcherTestSink) FlushDMLs() {
s.mu.Lock()
defer s.mu.Unlock()
for _, dml := range s.dmls {
dml.PostFlush()
}
s.dmls = s.dmls[:0]
}