Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public void submitSplit(MySqlSplit mySqlSplit) {
() -> {
try {
currentTaskRunning = true;
changeEventSourceContext.startChangeEventSource();
Comment thread
ruanhang1993 marked this conversation as resolved.
Outdated
final SnapshotSplitChangeEventSourceContextImpl sourceContext =
new SnapshotSplitChangeEventSourceContextImpl();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public class StoppableChangeEventSourceContext

private volatile boolean isRunning = true;

public void startChangeEventSource() {
isRunning = true;
}

public void stopChangeEventSource() {
isRunning = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,65 @@ void testSnapshotScanSkipBackfillWithPreHighWatermark() throws Exception {
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}

@Test
void testMultipleSplitsWithBackfill() throws Exception {
String tableName = "customers_even_dist";
String tableId = customerDatabase.getDatabaseName() + "." + tableName;
MySqlSourceConfig sourceConfig = getConfig(customerDatabase, new String[] {tableName}, 4);
StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);

// Hook to make highWatermark > lowWatermark, enforcing backfill phase
SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks();
snapshotHooks.setPreHighWatermarkAction(
(mySqlConnection, split) -> {
if (split.splitId().equals(tableId + ":0")) {
mySqlConnection.execute(
"UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 103");
mySqlConnection.commit();
} else if (split.splitId().equals(tableId + ":1")) {
mySqlConnection.execute(
"UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 106");
mySqlConnection.commit();
} else if (split.splitId().equals(tableId + ":2")) {
mySqlConnection.execute(
"UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 109");
mySqlConnection.commit();
}
});

final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));
List<MySqlSplit> mySqlSplits = getMySqlSplits(sourceConfig);

String[] expected =
new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Beijing, 123567891234]",
"+I[104, user_4, Shanghai, 123567891234]",
"+I[105, user_5, Shanghai, 123567891234]",
"+I[106, user_6, Beijing, 123567891234]",
"+I[107, user_7, Shanghai, 123567891234]",
"+I[108, user_8, Shanghai, 123567891234]",
"+I[109, user_9, Beijing, 123567891234]",
"+I[110, user_10, Shanghai, 123567891234]"
};

List<String> actual =
readTableSnapshotSplits(
mySqlSplits,
statefulTaskContext,
mySqlSplits.size(),
dataType,
snapshotHooks);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}

private List<String> readTableSnapshotSplits(
List<MySqlSplit> mySqlSplits,
StatefulTaskContext statefulTaskContext,
Expand Down
Loading