Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void submitSplit(MySqlSplit mySqlSplit) {
executorService.execute(
() -> {
try {
currentTaskRunning = true;
startCurrentTask();
final SnapshotSplitChangeEventSourceContextImpl sourceContext =
new SnapshotSplitChangeEventSourceContextImpl();

Expand Down Expand Up @@ -432,6 +432,11 @@ public void close() {
}
}

private void startCurrentTask() {
currentTaskRunning = true;
changeEventSourceContext.startChangeEventSource();
}

private void stopCurrentTask() {
currentTaskRunning = false;
changeEventSourceContext.stopChangeEventSource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public class StoppableChangeEventSourceContext

private volatile boolean isRunning = true;

public void startChangeEventSource() {
isRunning = true;
}

public void stopChangeEventSource() {
isRunning = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
import io.debezium.relational.TableId;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.sql.SQLException;
Expand All @@ -64,25 +64,25 @@ class SnapshotSplitReaderTest extends MySqlSourceTestBase {
private static BinaryLogClient binaryLogClient;
private static MySqlConnection mySqlConnection;

@BeforeAll
public static void init() {
@BeforeEach
public void beforeEach() {
customerDatabase.createAndInitialize();
customer3_0Database.createAndInitialize();
MySqlSourceConfig sourceConfig =
getConfig(customerDatabase, new String[] {"customers"}, 10);
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
}

@AfterAll
public static void afterClass() throws Exception {
@AfterEach
public void afterEach() throws Exception {
if (mySqlConnection != null) {
mySqlConnection.close();
}

if (binaryLogClient != null) {
binaryLogClient.disconnect();
}
customerDatabase.dropDatabase();
}

@Test
Expand Down Expand Up @@ -113,6 +113,7 @@ void testReadSingleSnapshotSplit() throws Exception {

@Test
void testReadSingleSnapshotSplitWithDotName() throws Exception {
customer3_0Database.createAndInitialize();
MySqlSourceConfig sourceConfig =
getConfig(customer3_0Database, new String[] {"customers3.0"}, 4);
BinaryLogClient binaryLogClient =
Expand Down Expand Up @@ -146,6 +147,9 @@ void testReadSingleSnapshotSplitWithDotName() throws Exception {
};
List<String> actual =
readTableSnapshotSplits(mySqlSplits, statefulTaskContext, 1, dataType);
mySqlConnection.close();
binaryLogClient.disconnect();

assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}

Expand Down Expand Up @@ -565,6 +569,65 @@ void testSnapshotScanSkipBackfillWithPreHighWatermark() throws Exception {
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}

@Test
void testMultipleSplitsWithBackfill() throws Exception {
String tableName = "customers_even_dist";
String tableId = customerDatabase.getDatabaseName() + "." + tableName;
MySqlSourceConfig sourceConfig = getConfig(customerDatabase, new String[] {tableName}, 4);
StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);

// Hook to make highWatermark > lowWatermark, enforcing backfill phase
SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks();
snapshotHooks.setPreHighWatermarkAction(
(mySqlConnection, split) -> {
if (split.splitId().equals(tableId + ":0")) {
mySqlConnection.execute(
"UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 103");
mySqlConnection.commit();
} else if (split.splitId().equals(tableId + ":1")) {
mySqlConnection.execute(
"UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 106");
mySqlConnection.commit();
} else if (split.splitId().equals(tableId + ":2")) {
mySqlConnection.execute(
"UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 109");
mySqlConnection.commit();
}
});

final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));
List<MySqlSplit> mySqlSplits = getMySqlSplits(sourceConfig);

String[] expected =
new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Beijing, 123567891234]",
"+I[104, user_4, Shanghai, 123567891234]",
"+I[105, user_5, Shanghai, 123567891234]",
"+I[106, user_6, Beijing, 123567891234]",
"+I[107, user_7, Shanghai, 123567891234]",
"+I[108, user_8, Shanghai, 123567891234]",
"+I[109, user_9, Beijing, 123567891234]",
"+I[110, user_10, Shanghai, 123567891234]"
};

List<String> actual =
readTableSnapshotSplits(
mySqlSplits,
statefulTaskContext,
mySqlSplits.size(),
dataType,
snapshotHooks);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}

private List<String> readTableSnapshotSplits(
List<MySqlSplit> mySqlSplits,
StatefulTaskContext statefulTaskContext,
Expand Down
Loading