Skip to content

Commit fe33422

Browse files
authored
[Improve][CDC] Optimize split state memory allocation in increment phase (#6554)
1 parent e60beb2 commit fe33422

File tree

18 files changed

+921
-13
lines changed

18 files changed

+921
-13
lines changed

Diff for: pom.xml

+14
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@
117117
<jcommander.version>1.81</jcommander.version>
118118
<junit4.version>4.13.2</junit4.version>
119119
<junit5.version>5.9.0</junit5.version>
120+
<mockito.version>4.11.0</mockito.version>
120121
<config.version>1.3.3</config.version>
121122
<maven-shade-plugin.version>3.3.0</maven-shade-plugin.version>
122123
<maven-helper-plugin.version>3.2.0</maven-helper-plugin.version>
@@ -357,6 +358,13 @@
357358
<version>${junit4.version}</version>
358359
</dependency>
359360

361+
<dependency>
362+
<groupId>org.mockito</groupId>
363+
<artifactId>mockito-junit-jupiter</artifactId>
364+
<version>${mockito.version}</version>
365+
<scope>test</scope>
366+
</dependency>
367+
360368
<dependency>
361369
<groupId>com.fasterxml.jackson.core</groupId>
362370
<artifactId>jackson-annotations</artifactId>
@@ -521,6 +529,12 @@
521529
<scope>test</scope>
522530
</dependency>
523531

532+
<dependency>
533+
<groupId>org.mockito</groupId>
534+
<artifactId>mockito-junit-jupiter</artifactId>
535+
<scope>test</scope>
536+
</dependency>
537+
524538
</dependencies>
525539

526540
<build>

Diff for: seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/relational/JdbcSourceEventDispatcher.java

+18
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
import org.apache.kafka.connect.source.SourceRecord;
2626

2727
import io.debezium.config.CommonConnectorConfig;
28+
import io.debezium.config.Configuration;
2829
import io.debezium.connector.base.ChangeEventQueue;
30+
import io.debezium.heartbeat.Heartbeat;
2931
import io.debezium.pipeline.DataChangeEvent;
3032
import io.debezium.pipeline.EventDispatcher;
3133
import io.debezium.pipeline.source.spi.EventMetadataProvider;
@@ -37,6 +39,8 @@
3739
import io.debezium.schema.TopicSelector;
3840
import io.debezium.util.SchemaNameAdjuster;
3941

42+
import java.time.Duration;
43+
import java.time.temporal.ChronoUnit;
4044
import java.util.Map;
4145

4246
/**
@@ -71,6 +75,10 @@ public JdbcSourceEventDispatcher(
7175
filter,
7276
changeEventCreator,
7377
metadataProvider,
78+
Heartbeat.create(
79+
getHeartbeatInterval(connectorConfig),
80+
topicSelector.getHeartbeatTopic(),
81+
connectorConfig.getLogicalName()),
7482
schemaNameAdjuster);
7583
this.queue = queue;
7684
this.topic = topicSelector.getPrimaryTopic();
@@ -92,4 +100,14 @@ public void dispatchWatermarkEvent(
92100
sourcePartition, topic, sourceSplit.splitId(), watermarkKind, watermark);
93101
queue.enqueue(new DataChangeEvent(sourceRecord));
94102
}
103+
104+
private static Duration getHeartbeatInterval(CommonConnectorConfig connectorConfig) {
105+
Configuration configuration = connectorConfig.getConfig();
106+
Duration heartbeatInterval =
107+
configuration.getDuration(Heartbeat.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS);
108+
if (heartbeatInterval.isZero()) {
109+
return Duration.ofMillis(5000);
110+
}
111+
return heartbeatInterval;
112+
}
95113
}

Diff for: seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java

+22
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.seatunnel.connectors.cdc.base.source.enumerator;
1919

20+
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
21+
2022
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
2123
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
2224
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.HybridPendingSplitsState;
@@ -31,9 +33,11 @@
3133
import io.debezium.relational.TableId;
3234

3335
import java.util.ArrayList;
36+
import java.util.Arrays;
3437
import java.util.Collection;
3538
import java.util.List;
3639
import java.util.Optional;
40+
import java.util.function.Predicate;
3741

3842
/** Assigner for Hybrid split which contains snapshot splits and incremental splits. */
3943
public class HybridSplitAssigner<C extends SourceConfig> implements SplitAssigner {
@@ -146,4 +150,22 @@ public void notifyCheckpointComplete(long checkpointId) {
146150
snapshotSplitAssigner.notifyCheckpointComplete(checkpointId);
147151
incrementalSplitAssigner.notifyCheckpointComplete(checkpointId);
148152
}
153+
154+
@VisibleForTesting
155+
IncrementalSplitAssigner<C> getIncrementalSplitAssigner() {
156+
return incrementalSplitAssigner;
157+
}
158+
159+
@VisibleForTesting
160+
SnapshotSplitAssigner<C> getSnapshotSplitAssigner() {
161+
return snapshotSplitAssigner;
162+
}
163+
164+
public boolean completedSnapshotPhase(List<TableId> tableIds) {
165+
return Arrays.asList(
166+
snapshotSplitAssigner.completedSnapshotPhase(tableIds),
167+
incrementalSplitAssigner.completedSnapshotPhase(tableIds))
168+
.stream()
169+
.allMatch(Predicate.isEqual(true));
170+
}
149171
}

Diff for: seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java

+12
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.seatunnel.api.source.SourceEvent;
2121
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
2222
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.PendingSplitsState;
23+
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotPhaseEvent;
2324
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsAckEvent;
2425
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent;
2526
import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
@@ -120,6 +121,17 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
120121
.map(SnapshotSplitWatermark::getSplitId)
121122
.collect(Collectors.toList()));
122123
context.sendEventToSourceReader(subtaskId, ackEvent);
124+
} else if (sourceEvent instanceof CompletedSnapshotPhaseEvent) {
125+
LOG.debug(
126+
"The enumerator receives completed snapshot phase event {} from subtask {}.",
127+
sourceEvent,
128+
subtaskId);
129+
CompletedSnapshotPhaseEvent event = (CompletedSnapshotPhaseEvent) sourceEvent;
130+
if (splitAssigner instanceof HybridSplitAssigner) {
131+
((HybridSplitAssigner) splitAssigner).completedSnapshotPhase(event.getTableIds());
132+
LOG.info(
133+
"Clean the SnapshotSplitAssigner#assignedSplits/splitCompletedOffsets to empty.");
134+
}
123135
}
124136
}
125137

Diff for: seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java

+23
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.seatunnel.connectors.cdc.base.source.enumerator;
1919

20+
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
21+
2022
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2123
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
2224
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.IncrementalPhaseState;
@@ -45,6 +47,8 @@
4547
import java.util.Set;
4648
import java.util.stream.Collectors;
4749

50+
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
51+
4852
/** Assigner for incremental split. */
4953
public class IncrementalSplitAssigner<C extends SourceConfig> implements SplitAssigner {
5054

@@ -255,4 +259,23 @@ private IncrementalSplit createIncrementalSplit(
255259
completedSnapshotSplitInfos,
256260
checkpointDataType);
257261
}
262+
263+
@VisibleForTesting
264+
void setSplitAssigned(boolean assigned) {
265+
this.splitAssigned = assigned;
266+
}
267+
268+
public boolean completedSnapshotPhase(List<TableId> tableIds) {
269+
checkArgument(splitAssigned && noMoreSplits());
270+
271+
for (String splitKey : new ArrayList<>(context.getAssignedSnapshotSplit().keySet())) {
272+
SnapshotSplit assignedSplit = context.getAssignedSnapshotSplit().get(splitKey);
273+
if (tableIds.contains(assignedSplit.getTableId())) {
274+
context.getAssignedSnapshotSplit().remove(splitKey);
275+
context.getSplitCompletedOffsets().remove(assignedSplit.splitId());
276+
}
277+
}
278+
return context.getAssignedSnapshotSplit().isEmpty()
279+
&& context.getSplitCompletedOffsets().isEmpty();
280+
}
258281
}

Diff for: seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java

+28
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.seatunnel.connectors.cdc.base.source.enumerator;
1919

20+
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
21+
2022
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
2123
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
2224
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
@@ -45,6 +47,8 @@
4547
import java.util.concurrent.ConcurrentLinkedQueue;
4648
import java.util.stream.Collectors;
4749

50+
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
51+
4852
/** Assigner for snapshot split. */
4953
public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssigner {
5054
private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitAssigner.class);
@@ -278,4 +282,28 @@ public boolean isCompleted() {
278282
private boolean allSplitsCompleted() {
279283
return noMoreSplits() && assignedSplits.size() == splitCompletedOffsets.size();
280284
}
285+
286+
@VisibleForTesting
287+
Map<String, SnapshotSplit> getAssignedSplits() {
288+
return assignedSplits;
289+
}
290+
291+
@VisibleForTesting
292+
Map<String, SnapshotSplitWatermark> getSplitCompletedOffsets() {
293+
return splitCompletedOffsets;
294+
}
295+
296+
public boolean completedSnapshotPhase(List<TableId> tableIds) {
297+
checkArgument(isCompleted() && allSplitsCompleted());
298+
299+
for (String splitKey : new ArrayList<>(assignedSplits.keySet())) {
300+
SnapshotSplit assignedSplit = assignedSplits.get(splitKey);
301+
if (tableIds.contains(assignedSplit.getTableId())) {
302+
assignedSplits.remove(splitKey);
303+
splitCompletedOffsets.remove(assignedSplit.splitId());
304+
}
305+
}
306+
307+
return assignedSplits.isEmpty() && splitCompletedOffsets.isEmpty();
308+
}
281309
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.cdc.base.source.event;
19+
20+
import org.apache.seatunnel.api.source.SourceEvent;
21+
22+
import io.debezium.relational.TableId;
23+
import lombok.AllArgsConstructor;
24+
import lombok.Data;
25+
26+
import java.util.List;
27+
28+
@Data
29+
@AllArgsConstructor
30+
public class CompletedSnapshotPhaseEvent implements SourceEvent {
31+
private static final long serialVersionUID = 1L;
32+
33+
private List<TableId> tableIds;
34+
}

Diff for: seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2323
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
2424
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
25+
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotPhaseEvent;
2526
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent;
2627
import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
2728
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
@@ -207,7 +208,19 @@ protected SourceSplitStateBase initializedState(SourceSplitBase split) {
207208
debeziumDeserializationSchema.restoreCheckpointProducedType(
208209
incrementalSplit.getCheckpointDataType());
209210
}
210-
return new IncrementalSplitState(split.asIncrementalSplit());
211+
IncrementalSplitState splitState = new IncrementalSplitState(incrementalSplit);
212+
if (splitState.autoEnterPureIncrementPhaseIfAllowed()) {
213+
log.info(
214+
"The incremental split[{}] startup position {} is equal the maxSnapshotSplitsHighWatermark {}, auto enter pure increment phase.",
215+
incrementalSplit.splitId(),
216+
splitState.getStartupOffset(),
217+
splitState.getMaxSnapshotSplitsHighWatermark());
218+
log.info("Clean the IncrementalSplit#completedSnapshotSplitInfos to empty.");
219+
CompletedSnapshotPhaseEvent event =
220+
new CompletedSnapshotPhaseEvent(splitState.getTableIds());
221+
context.sendSourceEventToEnumerator(event);
222+
}
223+
return splitState;
211224
}
212225
}
213226

Diff for: seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java

+28
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
import org.apache.seatunnel.api.source.Collector;
2323
import org.apache.seatunnel.api.source.SourceReader;
2424
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
25+
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotPhaseEvent;
2526
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
2627
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
2728
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
29+
import org.apache.seatunnel.connectors.cdc.base.source.split.state.IncrementalSplitState;
2830
import org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
2931
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
3032
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
@@ -65,6 +67,7 @@ public class IncrementalSourceRecordEmitter<T>
6567

6668
protected final OffsetFactory offsetFactory;
6769

70+
protected final SourceReader.Context context;
6871
protected final Counter recordFetchDelay;
6972
protected final Counter recordEmitDelay;
7073
protected final EventListener eventListener;
@@ -76,6 +79,7 @@ public IncrementalSourceRecordEmitter(
7679
this.debeziumDeserializationSchema = debeziumDeserializationSchema;
7780
this.outputCollector = new OutputCollector<>();
7881
this.offsetFactory = offsetFactory;
82+
this.context = context;
7983
this.recordFetchDelay = context.getMetricsContext().counter(CDC_RECORD_FETCH_DELAY);
8084
this.recordEmitDelay = context.getMetricsContext().counter(CDC_RECORD_EMIT_DELAY);
8185
this.eventListener = context.getEventListener();
@@ -90,6 +94,7 @@ public void emitRecord(
9094
SourceRecord next = elementIterator.next();
9195
reportMetrics(next);
9296
processElement(next, collector, splitState);
97+
markEnterPureIncrementPhase(next, splitState);
9398
}
9499
}
95100

@@ -138,6 +143,29 @@ protected void processElement(
138143
}
139144
}
140145

146+
private void markEnterPureIncrementPhase(
147+
SourceRecord element, SourceSplitStateBase splitState) {
148+
if (splitState.isIncrementalSplitState()) {
149+
IncrementalSplitState incrementalSplitState = splitState.asIncrementalSplitState();
150+
if (incrementalSplitState.isEnterPureIncrementPhase()) {
151+
return;
152+
}
153+
Offset position = getOffsetPosition(element);
154+
if (incrementalSplitState.markEnterPureIncrementPhaseIfNeed(position)) {
155+
log.info(
156+
"The current record position {} is after the maxSnapshotSplitsHighWatermark {}, "
157+
+ "mark enter pure increment phase.",
158+
position,
159+
incrementalSplitState.getMaxSnapshotSplitsHighWatermark());
160+
log.info("Clean the IncrementalSplit#completedSnapshotSplitInfos to empty.");
161+
162+
CompletedSnapshotPhaseEvent completedSnapshotPhaseEvent =
163+
new CompletedSnapshotPhaseEvent(incrementalSplitState.getTableIds());
164+
context.sendSourceEventToEnumerator(completedSnapshotPhaseEvent);
165+
}
166+
}
167+
}
168+
141169
private Offset getWatermark(SourceRecord watermarkEvent) {
142170
return getOffsetPosition(watermarkEvent.sourceOffset());
143171
}

0 commit comments

Comments
 (0)