Skip to content

Commit ed65c43

Browse files
authored
test: modify debezium integration test (#3537)
ref #442
1 parent 7222a7c commit ed65c43

File tree

2 files changed

+14
-12
lines changed

2 files changed

+14
-12
lines changed

tests/integration_tests/debezium01/src/test_cases.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import (
3636
"go.uber.org/zap"
3737
)
3838

39-
const timeout = time.Second * 20
39+
const timeout = time.Second * 30
4040

4141
var (
4242
nFailed = 0
@@ -164,14 +164,16 @@ func runTestCase(testCasePath, dbConnMySQL, dbConnTiDB string) {
164164
}
165165
}
166166

167-
func fetchNextCDCRecord(reader *kafka.Reader, kind Kind, timeout time.Duration) (map[string]any, map[string]any, error) {
167+
func fetchNextCDCRecord(reader *kafka.Reader, kind Kind, timeout time.Duration, needWait bool, query string) (map[string]any, map[string]any, error) {
168168
ctx, cancel := context.WithTimeout(context.Background(), timeout)
169169
defer cancel()
170170
for {
171171
m, err := reader.FetchMessage(ctx)
172172
if err != nil {
173173
if errors.Is(err, context.DeadlineExceeded) {
174-
logger.Warn("fetch record timed out", zap.Error(err))
174+
if needWait {
175+
logger.Warn("fetch record timed out", zap.String("query", query), zap.Any("kind", kind), zap.Error(err))
176+
}
175177
return nil, nil, nil
176178
}
177179
return nil, nil, fmt.Errorf("Failed to read CDC record of %s: %w", kind, err)
@@ -299,14 +301,14 @@ func runSingleQuery(query string, waitCDCRows bool) bool {
299301
wg := &sync.WaitGroup{}
300302
wg.Add(2)
301303
go func() {
302-
if _, _, err := fetchNextCDCRecord(readerDebezium, KindMySQL, timeout); err != nil {
303-
logger.Error("fetch record failed", zap.Error(err))
304+
if _, _, err := fetchNextCDCRecord(readerDebezium, KindMySQL, timeout, false, query); err != nil {
305+
logger.Error("fetch record failed", zap.String("query", query), zap.Error(err))
304306
}
305307
wg.Done()
306308
}()
307309
go func() {
308-
if _, _, err := fetchNextCDCRecord(readerTiCDC, KindTiDB, timeout); err != nil {
309-
logger.Error("fetch record failed", zap.Error(err))
310+
if _, _, err := fetchNextCDCRecord(readerTiCDC, KindTiDB, timeout, false, query); err != nil {
311+
logger.Error("fetch record failed", zap.String("query", query), zap.Error(err))
310312
}
311313
wg.Done()
312314
}()
@@ -333,17 +335,17 @@ func runSingleQuery(query string, waitCDCRows bool) bool {
333335
wg.Add(2)
334336
go func() {
335337
var err error
336-
keyMapsDebezium, objsDebezium, err = fetchNextCDCRecord(readerDebezium, KindMySQL, timeout)
338+
keyMapsDebezium, objsDebezium, err = fetchNextCDCRecord(readerDebezium, KindMySQL, timeout, true, query)
337339
if err != nil {
338-
logger.Error("fetch record failed", zap.Error(err))
340+
logger.Error("fetch record failed", zap.String("query", query), zap.Error(err))
339341
}
340342
wg.Done()
341343
}()
342344
go func() {
343345
var err error
344-
keyMapsTiCDC, objsTiCDC, err = fetchNextCDCRecord(readerTiCDC, KindTiDB, timeout)
346+
keyMapsTiCDC, objsTiCDC, err = fetchNextCDCRecord(readerTiCDC, KindTiDB, timeout, true, query)
345347
if err != nil {
346-
logger.Error("fetch record failed", zap.Error(err))
348+
logger.Error("fetch record failed", zap.String("query", query), zap.Error(err))
347349
}
348350
wg.Done()
349351
}()

tests/integration_tests/run_heavy_it_in_ci.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ kafka_groups=(
9696
# G14
9797
'debezium02'
9898
# G15
99-
# 'debezium03'
99+
'debezium03'
100100
)
101101

102102
# 12 CPU cores will be allocated to run each pulsar heavy group in CI pipelines.

0 commit comments

Comments
 (0)