Skip to content

Commit 1870653

Browse files
authored
[Improve][CDC] Filter heartbeat event (#8569)
1 parent ffbfdab commit 1870653

File tree

2 files changed

+99
-2
lines changed

2 files changed

+99
-2
lines changed

Diff for: seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/DebeziumJsonDeserializeSchema.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import java.util.List;
3535
import java.util.Map;
3636

37+
import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.isHeartbeatRecord;
38+
3739
@Slf4j
3840
public class DebeziumJsonDeserializeSchema
3941
extends AbstractDebeziumDeserializationSchema<SeaTunnelRow> {
@@ -60,9 +62,13 @@ public DebeziumJsonDeserializeSchema(
6062
@Override
6163
public void deserialize(SourceRecord record, Collector<SeaTunnelRow> out) throws Exception {
6264
super.deserialize(record, out);
65+
if (!isHeartbeatRecord(record)) {
66+
SeaTunnelRow row = deserializationSchema.deserialize(record);
67+
out.collect(row);
68+
return;
69+
}
6370

64-
SeaTunnelRow row = deserializationSchema.deserialize(record);
65-
out.collect(row);
71+
log.debug("Unsupported record {}, just skip.", record);
6672
}
6773

6874
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.cdc.debezium.row;
19+
20+
import org.apache.seatunnel.api.source.Collector;
21+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
22+
23+
import org.apache.kafka.connect.data.SchemaBuilder;
24+
import org.apache.kafka.connect.data.Struct;
25+
import org.apache.kafka.connect.source.SourceRecord;
26+
27+
import org.junit.jupiter.api.Test;
28+
29+
import java.util.Collections;
30+
import java.util.Map;
31+
32+
import static org.mockito.ArgumentMatchers.any;
33+
import static org.mockito.Mockito.mock;
34+
import static org.mockito.Mockito.times;
35+
import static org.mockito.Mockito.verify;
36+
37+
public class DebeziumJsonDeserializeSchemaTest {
38+
@Test
39+
void deserializeNonHeartbeatRecord() throws Exception {
40+
Map<String, String> debeziumConfig = Collections.EMPTY_MAP;
41+
DebeziumJsonDeserializeSchema schema = new DebeziumJsonDeserializeSchema(debeziumConfig);
42+
43+
// Create a schema for the record
44+
SchemaBuilder schemaBuilder =
45+
SchemaBuilder.struct()
46+
.name("test")
47+
.field("field", SchemaBuilder.string().optional().build());
48+
Struct struct = new Struct(schemaBuilder.build()).put("field", "value");
49+
SourceRecord record =
50+
new SourceRecord(
51+
null,
52+
null,
53+
"test",
54+
schemaBuilder.build(),
55+
struct,
56+
schemaBuilder.build(),
57+
struct);
58+
59+
Collector<SeaTunnelRow> collector = mock(Collector.class);
60+
schema.deserialize(record, collector);
61+
62+
verify(collector, times(1)).collect(any(SeaTunnelRow.class));
63+
}
64+
65+
@Test
66+
void skipHeartbeatRecord() throws Exception {
67+
Map<String, String> debeziumConfig = Collections.EMPTY_MAP;
68+
DebeziumJsonDeserializeSchema schema = new DebeziumJsonDeserializeSchema(debeziumConfig);
69+
70+
// Create a schema for the record
71+
SchemaBuilder schemaBuilder =
72+
SchemaBuilder.struct()
73+
.name("io.debezium.connector.common.Heartbeat")
74+
.field("field", SchemaBuilder.string().optional().build());
75+
Struct struct = new Struct(schemaBuilder.build()).put("field", "value");
76+
SourceRecord record =
77+
new SourceRecord(
78+
null,
79+
null,
80+
"test",
81+
schemaBuilder.build(),
82+
struct,
83+
schemaBuilder.build(),
84+
struct);
85+
86+
Collector<SeaTunnelRow> collector = mock(Collector.class);
87+
schema.deserialize(record, collector);
88+
89+
verify(collector, times(0)).collect(any(SeaTunnelRow.class));
90+
}
91+
}

0 commit comments

Comments
 (0)