Skip to content

Commit 3e1211e

Browse files
authored
[FLINK-39696][runtime] Use source partition for distributed flush events (#4400)
1 parent cbdd4da commit 3e1211e

3 files changed

Lines changed: 56 additions & 7 deletions

File tree

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,11 +195,16 @@ public void processElement(StreamRecord<PartitioningEvent> streamRecord) throws
195195

196196
private void requestSchemaChange(
197197
TableId sourceTableId, SchemaChangeRequest schemaChangeRequest) {
198-
LOG.info("{}> Sent FlushEvent to downstream...", subTaskId);
198+
final int sourcePartition = schemaChangeRequest.getSourceSubTaskId();
199+
200+
LOG.info(
201+
"{}> Sent FlushEvent to downstream for source partition {}...",
202+
subTaskId,
203+
sourcePartition);
199204
output.collect(
200205
new StreamRecord<>(
201206
new FlushEvent(
202-
subTaskId,
207+
sourcePartition,
203208
tableIdRouter.route(sourceTableId),
204209
schemaChangeRequest.getSchemaChangeEvent().getType())));
205210

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,37 @@ void testExceptionSchemaEvolution() throws Exception {
382382
"Unexpected schema change events occurred in EXCEPTION mode. Job will fail now.");
383383
}
384384

385+
@Test
386+
void testFlushEventUsesSourcePartitionInsteadOfSchemaOperatorSubtask() throws Exception {
387+
CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_ID, INITIAL_SCHEMA);
388+
389+
Assertions.assertThat(
390+
runInHarness(
391+
() ->
392+
new SchemaOperator(
393+
ROUTING_RULES,
394+
RouteMode.ALL_MATCH,
395+
Duration.ofMinutes(3),
396+
SchemaChangeBehavior.LENIENT,
397+
"UTC"),
398+
(op) ->
399+
new DistributedEventOperatorTestHarness<>(
400+
op,
401+
20,
402+
1,
403+
Duration.ofSeconds(3),
404+
Duration.ofMinutes(3)),
405+
(operator, harness) ->
406+
operator.processElement(wrap(createTableEvent, 0, 1))))
407+
.map(StreamRecord::getValue)
408+
.first()
409+
.isEqualTo(
410+
new FlushEvent(
411+
0,
412+
Collections.singletonList(TABLE_ID),
413+
createTableEvent.getType()));
414+
}
415+
385416
protected static <
386417
OP extends AbstractStreamOperatorAdapter<E>,
387418
E extends Event,

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,15 +81,26 @@ public class DistributedEventOperatorTestHarness<
8181
private final TestingSchemaRegistryGateway schemaRegistryGateway;
8282
private final LinkedList<StreamRecord<E>> outputRecords = new LinkedList<>();
8383
private final MockedOperatorCoordinatorContext mockedContext;
84+
private final int subtaskIndex;
8485

8586
public DistributedEventOperatorTestHarness(OP operator, int numOutputs) {
86-
this(operator, numOutputs, Duration.ofSeconds(3), Duration.ofMinutes(3));
87+
this(operator, numOutputs, 0, Duration.ofSeconds(3), Duration.ofMinutes(3));
8788
}
8889

8990
public DistributedEventOperatorTestHarness(
9091
OP operator, int numOutputs, Duration applyDuration, Duration rpcTimeout) {
92+
this(operator, numOutputs, 0, applyDuration, rpcTimeout);
93+
}
94+
95+
public DistributedEventOperatorTestHarness(
96+
OP operator,
97+
int numOutputs,
98+
int subtaskIndex,
99+
Duration applyDuration,
100+
Duration rpcTimeout) {
91101
this.operator = operator;
92102
this.numOutputs = numOutputs;
103+
this.subtaskIndex = subtaskIndex;
93104
this.mockedContext =
94105
new MockedOperatorCoordinatorContext(
95106
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader());
@@ -160,7 +171,7 @@ public void close() throws Exception {
160171

161172
private void initializeOperator() throws Exception {
162173
operator.setup(
163-
new MockStreamTask(schemaRegistryGateway),
174+
new MockStreamTask(schemaRegistryGateway, subtaskIndex),
164175
new MockStreamConfig(new Configuration(), numOutputs),
165176
new EventCollectingOutput<>(outputRecords, schemaRegistryGateway));
166177
schemaRegistryGateway.sendOperatorEventToCoordinator(
@@ -227,9 +238,10 @@ public void close() {}
227238
}
228239

229240
private static class MockStreamTask extends StreamTask<Event, AbstractStreamOperator<Event>> {
230-
protected MockStreamTask(TestingSchemaRegistryGateway schemaRegistryGateway)
241+
protected MockStreamTask(
242+
TestingSchemaRegistryGateway schemaRegistryGateway, int subtaskIndex)
231243
throws Exception {
232-
super(new SchemaRegistryCoordinatingEnvironment(schemaRegistryGateway));
244+
super(new SchemaRegistryCoordinatingEnvironment(schemaRegistryGateway, subtaskIndex));
233245
}
234246

235247
@Override
@@ -240,7 +252,8 @@ private static class SchemaRegistryCoordinatingEnvironment extends DummyEnvironm
240252
private final TestingSchemaRegistryGateway schemaRegistryGateway;
241253

242254
public SchemaRegistryCoordinatingEnvironment(
243-
TestingSchemaRegistryGateway schemaRegistryGateway) {
255+
TestingSchemaRegistryGateway schemaRegistryGateway, int subtaskIndex) {
256+
super("test-task", 2, subtaskIndex, 2);
244257
this.schemaRegistryGateway = schemaRegistryGateway;
245258
}
246259

0 commit comments

Comments
 (0)