Skip to content

Commit 17f49f5

Browse files
committed
add test for restore.
1 parent 22f3f97 commit 17f49f5

2 files changed

Lines changed: 199 additions & 91 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java

Lines changed: 189 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
2121
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
22-
import org.apache.flink.cdc.common.configuration.Configuration;
22+
import org.apache.flink.api.common.typeutils.TypeSerializer;
2323
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
2424
import org.apache.flink.cdc.common.event.CreateTableEvent;
2525
import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -40,7 +40,16 @@
4040
import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
4141
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
4242
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
43+
import org.apache.flink.core.execution.JobClient;
44+
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
45+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
4346
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
47+
import org.apache.flink.streaming.api.operators.collect.AbstractCollectResultBuffer;
48+
import org.apache.flink.streaming.api.operators.collect.CheckpointedCollectResultBuffer;
49+
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
50+
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
51+
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
52+
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
4453
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
4554
import org.apache.flink.util.CloseableIterator;
4655

@@ -52,6 +61,8 @@
5261
import org.slf4j.Logger;
5362
import org.slf4j.LoggerFactory;
5463

64+
import java.nio.file.Files;
65+
import java.nio.file.Path;
5566
import java.sql.Connection;
5667
import java.sql.SQLException;
5768
import java.sql.Statement;
@@ -61,6 +72,7 @@
6172
import java.util.Iterator;
6273
import java.util.List;
6374
import java.util.Map;
75+
import java.util.UUID;
6476
import java.util.stream.Collectors;
6577

6678
import static org.assertj.core.api.Assertions.assertThat;
@@ -153,114 +165,188 @@ public void testLatestOffsetStartupMode() throws Exception {
153165
configFactory.slotName(slotName);
154166
configFactory.decodingPluginName("pgoutput");
155167

168+
// Create a temporary directory for savepoint
169+
Path savepointDir = Files.createTempDirectory("postgres-savepoint-test");
170+
final String savepointDirectory = savepointDir.toAbsolutePath().toString();
171+
String finishedSavePointPath = null;
172+
173+
// Listen to tables first time
174+
StreamExecutionEnvironment env = getStreamExecutionEnvironment(finishedSavePointPath, 4);
156175
FlinkSourceProvider sourceProvider =
157176
(FlinkSourceProvider)
158177
new PostgresDataSource(configFactory).getEventSourceProvider();
159-
CloseableIterator<Event> events =
160-
env.fromSource(
161-
sourceProvider.getSource(),
162-
WatermarkStrategy.noWatermarks(),
163-
PostgresDataSourceFactory.IDENTIFIER,
164-
new EventTypeInfo())
165-
.executeAndCollect();
166-
Thread.sleep(10_000);
167-
TableId tableId = TableId.tableId("inventory", "products");
168-
CreateTableEvent createTableEvent = getProductsCreateTableEvent(tableId);
169178

170-
List<Event> expectedBinlog = new ArrayList<>();
171-
try (Connection connection =
179+
DataStreamSource<Event> source =
180+
env.fromSource(
181+
sourceProvider.getSource(),
182+
WatermarkStrategy.noWatermarks(),
183+
PostgresDataSourceFactory.IDENTIFIER,
184+
new EventTypeInfo());
185+
186+
TypeSerializer<Event> serializer =
187+
source.getTransformation().getOutputType().createSerializer(env.getConfig());
188+
CheckpointedCollectResultBuffer<Event> resultBuffer =
189+
new CheckpointedCollectResultBuffer<>(serializer);
190+
String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
191+
CollectResultIterator<Event> iterator =
192+
addCollector(env, source, resultBuffer, serializer, accumulatorName);
193+
194+
JobClient jobClient = env.executeAsync("beforeSavepoint");
195+
iterator.setJobClient(jobClient);
196+
197+
// Insert two records while the pipeline is running
198+
try (Connection conn =
172199
getJdbcConnection(POSTGRES_CONTAINER, inventoryDatabase.getDatabaseName());
173-
Statement statement = connection.createStatement()) {
174-
RowType rowType =
175-
RowType.of(
176-
new DataType[] {
177-
DataTypes.INT().notNull(),
178-
DataTypes.VARCHAR(255).notNull(),
179-
DataTypes.VARCHAR(512),
180-
DataTypes.DOUBLE()
181-
},
182-
new String[] {"id", "name", "description", "weight"});
183-
BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
200+
Statement stmt = conn.createStatement()) {
201+
stmt.execute(
202+
"INSERT INTO inventory.products (name, description, weight) "
203+
+ "VALUES ('scooter', 'Small 2-wheel scooter', 3.14)");
204+
stmt.execute(
205+
"INSERT INTO inventory.products (name, description, weight) "
206+
+ "VALUES ('football', 'A leather football', 0.45)");
207+
}
184208

185-
// Insert new data
186-
statement.execute(
187-
String.format(
188-
"INSERT INTO inventory.products (name, description, weight) VALUES ('scooter', 'New scooter', 5.5);",
189-
inventoryDatabase.getDatabaseName()));
190-
expectedBinlog.add(
191-
DataChangeEvent.insertEvent(
192-
tableId,
193-
generator.generate(
194-
new Object[] {
195-
110, // next id after initialization
196-
BinaryStringData.fromString("scooter"),
197-
BinaryStringData.fromString("New scooter"),
198-
5.5
199-
})));
209+
// Wait for the pipeline to process the insert events
210+
Thread.sleep(5000);
211+
212+
// Trigger a savepoint and cancel the job
213+
LOG.info("Triggering savepoint");
214+
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
215+
LOG.info("Savepoint created at: {}", finishedSavePointPath);
216+
jobClient.cancel().get();
217+
iterator.close();
218+
219+
// Restore from savepoint
220+
LOG.info("Restoring from savepoint: {}", finishedSavePointPath);
221+
StreamExecutionEnvironment restoredEnv =
222+
getStreamExecutionEnvironment(finishedSavePointPath, 4);
223+
FlinkSourceProvider restoredSourceProvider =
224+
(FlinkSourceProvider)
225+
new PostgresDataSource(configFactory).getEventSourceProvider();
200226

201-
statement.execute(
202-
String.format(
203-
"INSERT INTO inventory.products (name, description, weight) VALUES ('football', 'New football', 6.6);",
204-
inventoryDatabase.getDatabaseName()));
205-
expectedBinlog.add(
206-
DataChangeEvent.insertEvent(
207-
tableId,
208-
generator.generate(
209-
new Object[] {
210-
111, // next id after initialization
211-
BinaryStringData.fromString("football"),
212-
BinaryStringData.fromString("New football"),
213-
6.6
214-
})));
227+
DataStreamSource<Event> restoredSource =
228+
restoredEnv.fromSource(
229+
restoredSourceProvider.getSource(),
230+
WatermarkStrategy.noWatermarks(),
231+
PostgresDataSourceFactory.IDENTIFIER,
232+
new EventTypeInfo());
233+
234+
TypeSerializer<Event> restoredSerializer =
235+
restoredSource
236+
.getTransformation()
237+
.getOutputType()
238+
.createSerializer(restoredEnv.getConfig());
239+
CheckpointedCollectResultBuffer<Event> restoredResultBuffer =
240+
new CheckpointedCollectResultBuffer<>(restoredSerializer);
241+
String restoredAccumulatorName = "dataStreamCollect_" + UUID.randomUUID();
242+
CollectResultIterator<Event> restoredIterator =
243+
addCollector(
244+
restoredEnv,
245+
restoredSource,
246+
restoredResultBuffer,
247+
restoredSerializer,
248+
restoredAccumulatorName);
249+
250+
JobClient restoredJobClient = restoredEnv.executeAsync("afterSavepoint");
251+
restoredIterator.setJobClient(restoredJobClient);
252+
253+
// Insert data into the table after restoration
254+
try (Connection conn =
255+
getJdbcConnection(POSTGRES_CONTAINER, inventoryDatabase.getDatabaseName());
256+
Statement stmt = conn.createStatement()) {
257+
stmt.execute(
258+
"INSERT INTO inventory.products (name, description, weight) "
259+
+ "VALUES ('new_product_1', 'New product description', 1.0)");
260+
}
215261

216-
// Update existing data
217-
statement.execute(
218-
String.format(
219-
"UPDATE inventory.products SET description = 'Updated description' WHERE id = 101;"));
220-
expectedBinlog.add(
221-
DataChangeEvent.updateEvent(
222-
tableId,
223-
generator.generate(
224-
new Object[] {
225-
101,
226-
BinaryStringData.fromString("scooter"),
227-
BinaryStringData.fromString("Small 2-wheel scooter"),
228-
3.14
229-
}),
230-
generator.generate(
231-
new Object[] {
232-
101,
233-
BinaryStringData.fromString("scooter"),
234-
BinaryStringData.fromString("Updated description"),
235-
3.14
236-
})));
262+
// Wait for the pipeline to stabilize and process events
263+
Thread.sleep(10000);
237264

238-
// Wait for the events to be processed
239-
Thread.sleep(5_000);
265+
// Fetch results and check for CreateTableEvent and data change events
266+
List<Event> restoreAfterEvents = new ArrayList<>();
267+
while (restoreAfterEvents.size() < 2 && restoredIterator.hasNext()) {
268+
restoreAfterEvents.add(restoredIterator.next());
240269
}
270+
restoredIterator.close();
271+
restoredJobClient.cancel().get();
272+
273+
// Check if CreateTableEvent for new_products is present
274+
boolean hasCreateTableEvent =
275+
restoreAfterEvents.stream().anyMatch(event -> event instanceof CreateTableEvent);
276+
assertThat(hasCreateTableEvent).isTrue();
277+
278+
// Check if data change event for new_products is present
279+
boolean hasProductDataEvent =
280+
restoreAfterEvents.stream().anyMatch(event -> event instanceof DataChangeEvent);
281+
assertThat(hasProductDataEvent).isTrue();
282+
}
241283

242-
// Collect the actual events
243-
List<Event> actualEvents =
244-
fetchResultsExcept(events, expectedBinlog.size(), createTableEvent);
245-
246-
// Filter out schema change events and keep only data change events
247-
List<Event> actualDataChangeEvents =
248-
actualEvents.stream()
249-
.filter(event -> event instanceof DataChangeEvent)
250-
.collect(Collectors.toList());
284+
// Helper method to trigger a savepoint with retry mechanism
285+
private String triggerSavepointWithRetry(JobClient jobClient, String savepointDirectory)
286+
throws Exception {
287+
int retryCount = 0;
288+
final int maxRetries = 600;
289+
while (retryCount < maxRetries) {
290+
try {
291+
return jobClient.stopWithSavepoint(true, savepointDirectory).get();
292+
} catch (Exception e) {
293+
retryCount++;
294+
LOG.error(
295+
"Retry {}/{}: Failed to trigger savepoint: {}",
296+
retryCount,
297+
maxRetries,
298+
e.getMessage());
299+
if (retryCount >= maxRetries) {
300+
throw e;
301+
}
302+
Thread.sleep(100);
303+
}
304+
}
305+
throw new Exception("Failed to trigger savepoint after " + maxRetries + " retries");
306+
}
251307

252-
// Verify that we captured the expected number of data change events
253-
assertThat(actualDataChangeEvents.size()).isGreaterThanOrEqualTo(expectedBinlog.size());
308+
// Helper method to get a configured StreamExecutionEnvironment
309+
private StreamExecutionEnvironment getStreamExecutionEnvironment(
310+
String finishedSavePointPath, int parallelism) {
311+
org.apache.flink.configuration.Configuration configuration =
312+
new org.apache.flink.configuration.Configuration();
313+
if (finishedSavePointPath != null) {
314+
configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
315+
}
316+
StreamExecutionEnvironment env =
317+
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
318+
env.setParallelism(parallelism);
319+
env.enableCheckpointing(500L);
320+
env.setRestartStrategy(RestartStrategies.noRestart());
321+
return env;
322+
}
254323

255-
// Verify slot is created
256-
assertThat(inventoryDatabase.checkSlot(slotName)).isEqualTo(slotName);
324+
// Helper method to add a collector sink and get the iterator
325+
private <T> CollectResultIterator<T> addCollector(
326+
StreamExecutionEnvironment env,
327+
DataStreamSource<T> source,
328+
AbstractCollectResultBuffer<T> buffer,
329+
TypeSerializer<T> serializer,
330+
String accumulatorName) {
331+
CollectSinkOperatorFactory<T> sinkFactory =
332+
new CollectSinkOperatorFactory<>(serializer, accumulatorName);
333+
CollectSinkOperator<T> operator = (CollectSinkOperator<T>) sinkFactory.getOperator();
334+
CollectResultIterator<T> iterator =
335+
new CollectResultIterator<>(
336+
buffer, operator.getOperatorIdFuture(), accumulatorName, 0);
337+
CollectStreamSink<T> sink = new CollectStreamSink<>(source, sinkFactory);
338+
sink.name("Data stream collect sink");
339+
env.addOperator(sink.getTransformation());
340+
env.registerCollectIterator(iterator);
341+
return iterator;
257342
}
258343

259344
@ParameterizedTest(name = "unboundedChunkFirst: {0}")
260345
@ValueSource(booleans = {true, false})
261346
public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws Exception {
262347
inventoryDatabase.createAndInitialize();
263-
Configuration sourceConfiguration = new Configuration();
348+
org.apache.flink.cdc.common.configuration.Configuration sourceConfiguration =
349+
new org.apache.flink.cdc.common.configuration.Configuration();
264350
sourceConfiguration.set(PostgresDataSourceOptions.HOSTNAME, POSTGRES_CONTAINER.getHost());
265351
sourceConfiguration.set(
266352
PostgresDataSourceOptions.PG_PORT,
@@ -282,7 +368,9 @@ public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws E
282368

283369
Factory.Context context =
284370
new FactoryHelper.DefaultContext(
285-
sourceConfiguration, new Configuration(), this.getClass().getClassLoader());
371+
sourceConfiguration,
372+
new org.apache.flink.cdc.common.configuration.Configuration(),
373+
this.getClass().getClassLoader());
286374
FlinkSourceProvider sourceProvider =
287375
(FlinkSourceProvider)
288376
new PostgresDataSourceFactory()
@@ -494,6 +582,16 @@ private static <T> List<T> fetchResultsExcept(Iterator<T> iter, int size, T side
494582
return result;
495583
}
496584

585+
// Helper method to create a temporary directory for savepoint
586+
private Path createTempSavepointDir() throws Exception {
587+
return Files.createTempDirectory("postgres-savepoint");
588+
}
589+
590+
// Helper method to execute the job and create a savepoint
591+
private String createSavepoint(JobClient jobClient, Path savepointDir) throws Exception {
592+
return jobClient.stopWithSavepoint(true, savepointDir.toAbsolutePath().toString()).get();
593+
}
594+
497595
private List<Event> getSnapshotExpected(TableId tableId) {
498596
RowType rowType =
499597
RowType.of(

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,16 @@ protected void emitElement(SourceRecord element, SourceOutput<T> output) throws
161161
debeziumDeserializationSchema.deserialize(element, outputCollector);
162162
}
163163

164+
/**
165+
* Apply the split to the record emitter.
166+
*
167+
* <p>This method is called when a new split is assigned to the record emitter. It allows the
168+
* record emitter to perform any necessary initialization or setup based on the characteristics
169+
* of the assigned split. In this implementation, we may need to handle split-specific
170+
* configurations or state initialization.
171+
*
172+
* @param split the split to apply
173+
*/
164174
public void applySplit(SourceSplitBase split) {}
165175

166176
protected void reportMetrics(SourceRecord element) {

0 commit comments

Comments
 (0)