Skip to content

Commit 841f2a8

Browse files
yuxiqianMrart
authored andcommitted
[FLINK-39230] Transform should convert partially filtered UPDATE events to INSERT / DELETE (apache#4319)
1 parent 5579844 commit 841f2a8

9 files changed

Lines changed: 398 additions & 47 deletions

File tree

docs/content.zh/docs/core-concept/transform.md

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -338,22 +338,21 @@ transform:
338338
小技巧:table-options 的格式是 `key1=value1,key2=value2`。
339339

340340
## 分类映射
341-
多个转换规则可以定义为分类映射。
341+
在一张表同时被多个转换规则命中时,
342342
只有第一个匹配的转换规则将应用。
343343
举个例子,我们可以定义一个转换规则如下:
344344

345345
```yaml
346346
transform:
347347
- source-table: mydb.web_order
348348
projection: id, order_id
349-
filter: UPPER(province) = 'SHANGHAI'
350-
description: classification mapping example
351-
- source-table: mydb.web_order
352-
projection: order_id as id, id as order_id
353-
filter: UPPER(province) = 'BEIJING'
354-
description: classification mapping example
349+
filter: id > 1001
350+
- source-table: mydb.\.*
351+
projection: \*, 'fallback' AS FALLBACK
355352
```
356353

354+
这里,即使 `mydb.web_order` 表同样可以被第二条规则匹配,但因为排序靠前的第一条规则已经匹配,因此不会落入后续的 Transform 规则中。
355+
357356
## 用户自定义函数
358357
用户自定义函数(UDF)可以在转换规则中使用。
359358

docs/content/docs/core-concept/transform.md

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -341,22 +341,20 @@ transform:
341341
Tips: The format of table-options is `key1=value1,key2=value2`.
342342

343343
## Classification mapping
344-
Multiple transform rules can be defined to classify input data rows and apply different processing.
345-
Only the first matched transform rule will apply.
344+
If a table hits ultiple transform rules, only the first matched transform rule will apply.
346345
For example, we may define a transform rule as follows:
347346

348347
```yaml
349348
transform:
350349
- source-table: mydb.web_order
351350
projection: id, order_id
352-
filter: UPPER(province) = 'SHANGHAI'
353-
description: classification mapping example
354-
- source-table: mydb.web_order
355-
projection: order_id as id, id as order_id
356-
filter: UPPER(province) = 'BEIJING'
357-
description: classification mapping example
351+
filter: id > 1001
352+
- source-table: mydb.\.*
353+
projection: \*, 'fallback' AS FALLBACK
358354
```
359355

356+
Here, though `mydb.web_order` matches the second rule (`mydb.\.*`), it will not fall through the next rule as it has been handled in the first rule.
357+
360358
## User-defined Functions
361359

362360
User-defined functions (UDFs) can be used in transform rules.

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java

Lines changed: 130 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@
5858

5959
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
6060

61-
import org.apache.calcite.sql.validate.SqlValidatorException;
6261
import org.assertj.core.api.Assertions;
6362
import org.assertj.core.api.ThrowableAssert;
6463
import org.codehaus.commons.compiler.CompileException;
@@ -241,6 +240,136 @@ void testFilteringRules(ValuesDataSink.SinkApi sinkApi) throws Exception {
241240
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student], after=[], op=DELETE, meta=()}"));
242241
}
243242

243+
@ParameterizedTest(name = "API version: {0}")
244+
@EnumSource(ValuesDataSink.SinkApi.class)
245+
void testFilterUpdateOpTypeConversion(ValuesDataSink.SinkApi sinkApi) throws Exception {
246+
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
247+
248+
Configuration sourceConfig = new Configuration();
249+
sourceConfig.set(
250+
ValuesDataSourceOptions.EVENT_SET_ID,
251+
ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
252+
253+
TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1");
254+
Schema table1Schema =
255+
Schema.newBuilder()
256+
.physicalColumn("id", DataTypes.INT())
257+
.physicalColumn("name", DataTypes.STRING())
258+
.physicalColumn("age", DataTypes.INT())
259+
.primaryKey("id")
260+
.build();
261+
262+
BinaryRecordDataGenerator generator =
263+
new BinaryRecordDataGenerator(
264+
table1Schema.getColumnDataTypes().toArray(new DataType[0]));
265+
266+
List<Event> events = new ArrayList<>();
267+
events.add(new CreateTableEvent(myTable1, table1Schema));
268+
// Case 1: before=Y, after=Y -> UPDATE
269+
events.add(
270+
DataChangeEvent.insertEvent(
271+
myTable1,
272+
generator.generate(
273+
new Object[] {1, BinaryStringData.fromString("Alice"), 30})));
274+
events.add(
275+
DataChangeEvent.updateEvent(
276+
myTable1,
277+
generator.generate(
278+
new Object[] {1, BinaryStringData.fromString("Alice"), 30}),
279+
generator.generate(
280+
new Object[] {1, BinaryStringData.fromString("Alice"), 40})));
281+
// Case 2: before=Y, after=N -> DELETE
282+
events.add(
283+
DataChangeEvent.insertEvent(
284+
myTable1,
285+
generator.generate(
286+
new Object[] {2, BinaryStringData.fromString("Bob"), 30})));
287+
events.add(
288+
DataChangeEvent.updateEvent(
289+
myTable1,
290+
generator.generate(
291+
new Object[] {2, BinaryStringData.fromString("Bob"), 30}),
292+
generator.generate(
293+
new Object[] {2, BinaryStringData.fromString("Bob"), 20})));
294+
// Case 3: before=N, after=Y -> INSERT
295+
events.add(
296+
DataChangeEvent.insertEvent(
297+
myTable1,
298+
generator.generate(
299+
new Object[] {3, BinaryStringData.fromString("Carol"), 20})));
300+
events.add(
301+
DataChangeEvent.updateEvent(
302+
myTable1,
303+
generator.generate(
304+
new Object[] {3, BinaryStringData.fromString("Carol"), 20}),
305+
generator.generate(
306+
new Object[] {3, BinaryStringData.fromString("Carol"), 35})));
307+
// Case 4: before=N, after=N -> drop
308+
events.add(
309+
DataChangeEvent.insertEvent(
310+
myTable1,
311+
generator.generate(
312+
new Object[] {4, BinaryStringData.fromString("Dave"), 10})));
313+
events.add(
314+
DataChangeEvent.updateEvent(
315+
myTable1,
316+
generator.generate(
317+
new Object[] {4, BinaryStringData.fromString("Dave"), 10}),
318+
generator.generate(
319+
new Object[] {4, BinaryStringData.fromString("Dave"), 15})));
320+
321+
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
322+
323+
SourceDef sourceDef =
324+
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
325+
326+
Configuration sinkConfig = new Configuration();
327+
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
328+
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
329+
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
330+
331+
Configuration pipelineConfig = new Configuration();
332+
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
333+
PipelineDef pipelineDef =
334+
new PipelineDef(
335+
sourceDef,
336+
sinkDef,
337+
Collections.emptyList(),
338+
Collections.singletonList(
339+
new TransformDef(
340+
"default_namespace.default_schema.\\.*",
341+
null,
342+
"age > 25",
343+
null,
344+
null,
345+
null,
346+
null,
347+
null)),
348+
Collections.emptyList(),
349+
pipelineConfig);
350+
351+
PipelineExecution execution = composer.compose(pipelineDef);
352+
execution.execute();
353+
354+
String[] outputEvents = outCaptor.toString().trim().split("\n");
355+
356+
assertThat(outputEvents)
357+
.containsExactlyInAnyOrder(
358+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
359+
// INSERT id=1 (age=30 passes)
360+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 30], op=INSERT, meta=()}",
361+
// UPDATE id=1 (30->40): before=Y, after=Y -> UPDATE
362+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[1, Alice, 30], after=[1, Alice, 40], op=UPDATE, meta=()}",
363+
// INSERT id=2 (age=30 passes)
364+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 30], op=INSERT, meta=()}",
365+
// UPDATE id=2 (30->20): before=Y, after=N -> DELETE
366+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 30], after=[], op=DELETE, meta=()}",
367+
// UPDATE id=3 (20->35): before=N, after=Y -> INSERT
368+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Carol, 35], op=INSERT, meta=()}");
369+
// INSERT id=3 (age=20 fails), INSERT id=4 (age=10 fails),
370+
// UPDATE id=4 (10->15, both fail) are all filtered out.
371+
}
372+
244373
/**
245374
* This tests if transform rule could be used to classify source records based on filtering
246375
* rules.
@@ -2507,7 +2636,6 @@ void testTransformErrorMessage() {
25072636
+ "to schema\n"
25082637
+ "\t(Unknown).")
25092638
.rootCause()
2510-
.isExactlyInstanceOf(SqlValidatorException.class)
25112639
.hasMessage("Column 'id1' not found in any table");
25122640

25132641
// Unexpected column in filter rule

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -501,13 +501,6 @@ void runTransformSpecs(String group, String name, TestSpec spec) throws Exceptio
501501
}
502502
}
503503

504-
enum SpecContext {
505-
PROJECTION,
506-
EXPECT,
507-
EXPECT_ERROR,
508-
NULL
509-
}
510-
511504
private static final String[] EXPECTED_SPECS = {
512505
"specs/arithmetic.yaml",
513506
"specs/basic.yaml",

flink-cdc-composer/src/test/resources/specs/basic.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,13 +108,14 @@
108108
expect: |-
109109
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`string_` STRING}, primaryKeys=id_, options=()}
110110
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, From A to Z is Lie], op=INSERT, meta=()}
111+
DataChangeEvent{tableId=foo.bar.baz, before=[1, From A to Z is Lie], after=[], op=DELETE, meta=()}
111112
- do: Filter by Expression
112113
projection: id_, string_
113114
filter: id_ + 1 <= 1
114115
primary-key: id_
115116
expect: |-
116117
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`string_` STRING}, primaryKeys=id_, options=()}
117-
DataChangeEvent{tableId=foo.bar.baz, before=[1, From A to Z is Lie], after=[-1, 天地玄黄宇宙洪荒], op=UPDATE, meta=()}
118+
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[-1, 天地玄黄宇宙洪荒], op=INSERT, meta=()}
118119
DataChangeEvent{tableId=foo.bar.baz, before=[-1, 天地玄黄宇宙洪荒], after=[], op=DELETE, meta=()}
119120
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null], op=INSERT, meta=()}
120121
DataChangeEvent{tableId=foo.bar.baz, before=[0, null], after=[], op=DELETE, meta=()}
@@ -126,6 +127,7 @@
126127
expect: |-
127128
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`string_` STRING,`strlen_` INT}, primaryKeys=id_, options=()}
128129
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, From A to Z is Lie, 18], op=INSERT, meta=()}
130+
DataChangeEvent{tableId=foo.bar.baz, before=[1, From A to Z is Lie, 18], after=[], op=DELETE, meta=()}
129131
- do: Filter by Calculation Column (With NULL)
130132
ignore: FLINK-38905
131133
projection: id_, string_, CHAR_LENGTH(string_) AS strlen_
@@ -134,6 +136,7 @@
134136
expect: |-
135137
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`string_` STRING,`strlen_` INT}, primaryKeys=id_, options=()}
136138
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, From A to Z is Lie, 18], op=INSERT, meta=()}
139+
DataChangeEvent{tableId=foo.bar.baz, before=[1, From A to Z is Lie, 18], after=[], op=DELETE, meta=()}
137140
- do: Invalid Projection Expr
138141
projection: id_, a_column_that_is_nowhere_to_be_found
139142
primary-key: id_

flink-cdc-composer/src/test/resources/specs/nested.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@
176176
primary-key: id_
177177
expect: |-
178178
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`array_string_` ARRAY<STRING>}, primaryKeys=id_, options=()}
179-
DataChangeEvent{tableId=foo.bar.baz, before=[1, [one, one, two, three, five]], after=[-1, [二, san, 五, qi, 十一]], op=UPDATE, meta=()}
179+
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[-1, [二, san, 五, qi, 十一]], op=INSERT, meta=()}
180180
DataChangeEvent{tableId=foo.bar.baz, before=[-1, [二, san, 五, qi, 十一]], after=[], op=DELETE, meta=()}
181181
- do: Map Subscripting
182182
projection: |-
@@ -248,6 +248,7 @@
248248
expect: |-
249249
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`map_int_string_` MAP<INT, STRING>}, primaryKeys=id_, options=()}
250250
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, {1 -> one, 2 -> two, 3 -> three}], op=INSERT, meta=()}
251+
DataChangeEvent{tableId=foo.bar.baz, before=[1, {1 -> one, 2 -> two, 3 -> three}], after=[], op=DELETE, meta=()}
251252
- do: Record Subscripting With Index
252253
projection: |-
253254
id_
@@ -282,7 +283,7 @@
282283
primary-key: id_
283284
expect: |-
284285
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`complex_row_` ROW<`name` STRING, `length` INT>}, primaryKeys=id_, options=()}
285-
DataChangeEvent{tableId=foo.bar.baz, before=[1, {name: STRING -> Alice, length: INT -> 5}], after=[-1, {name: STRING -> Derrida, length: INT -> 7}], op=UPDATE, meta=()}
286+
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[-1, {name: STRING -> Derrida, length: INT -> 7}], op=INSERT, meta=()}
286287
DataChangeEvent{tableId=foo.bar.baz, before=[-1, {name: STRING -> Derrida, length: INT -> 7}], after=[], op=DELETE, meta=()}
287288
- do: Variant Object Subscripting With String Key
288289
projection: |-

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -289,40 +289,71 @@ private Optional<Event> processDataChangeEvent(
289289
getProjectionProcessor(tableId, effectiveTransformer);
290290
TransformFilterProcessor filterProcessor =
291291
getFilterProcessor(tableId, effectiveTransformer);
292-
RecordData beforeRow = null;
293-
RecordData afterRow = null;
294-
boolean filterPassed = true;
292+
293+
BinaryRecordData beforeRow = null;
294+
BinaryRecordData afterRow = null;
295+
boolean beforeFilterPassed = false;
296+
boolean afterFilterPassed = false;
297+
295298
if (event.before() != null) {
296299
context.opType = beforeOp;
297300
Tuple2<BinaryRecordData, Boolean> result =
298301
transformRecord(
299302
event.before(), info, projectionProcessor, filterProcessor, context);
300303
beforeRow = result.f0;
301-
filterPassed = result.f1;
304+
beforeFilterPassed = result.f1;
302305
}
303306
if (event.after() != null) {
304307
context.opType = afterOp;
305308
Tuple2<BinaryRecordData, Boolean> result =
306309
transformRecord(
307310
event.after(), info, projectionProcessor, filterProcessor, context);
308311
afterRow = result.f0;
309-
filterPassed = result.f1;
312+
afterFilterPassed = result.f1;
310313
}
311-
if (filterPassed) {
312-
DataChangeEvent finalEvent = DataChangeEvent.projectRecords(event, beforeRow, afterRow);
313-
if (effectiveTransformer.getPostTransformConverter().isPresent()) {
314-
return effectiveTransformer
315-
.getPostTransformConverter()
316-
.get()
317-
.convert(finalEvent)
318-
.map(Event.class::cast);
319-
} else {
320-
return Optional.of(finalEvent);
321-
}
314+
// For UPDATE events, before and after filter results may differ, requiring op type
315+
// conversion:
316+
// before=Y, after=Y -> UPDATE; before=Y, after=N -> DELETE;
317+
// before=N, after=Y -> INSERT; before=N, after=N -> drop.
318+
DataChangeEvent finalEvent;
319+
switch (event.op()) {
320+
case INSERT:
321+
case REPLACE:
322+
if (!afterFilterPassed) {
323+
return Optional.empty();
324+
}
325+
finalEvent = DataChangeEvent.projectRecords(event, beforeRow, afterRow);
326+
break;
327+
case DELETE:
328+
if (!beforeFilterPassed) {
329+
return Optional.empty();
330+
}
331+
finalEvent = DataChangeEvent.projectRecords(event, beforeRow, afterRow);
332+
break;
333+
case UPDATE:
334+
if (beforeFilterPassed && afterFilterPassed) {
335+
finalEvent = DataChangeEvent.projectRecords(event, beforeRow, afterRow);
336+
} else if (beforeFilterPassed) {
337+
finalEvent = DataChangeEvent.deleteEvent(tableId, beforeRow, event.meta());
338+
} else if (afterFilterPassed) {
339+
finalEvent = DataChangeEvent.insertEvent(tableId, afterRow, event.meta());
340+
} else {
341+
return Optional.empty();
342+
}
343+
break;
344+
default:
345+
throw new UnsupportedOperationException(
346+
"Unsupported operation type: " + event.op());
322347
}
323348

324-
// Events with no matching filters satisfied won't be emitted to downstream.
325-
return Optional.empty();
349+
if (effectiveTransformer.getPostTransformConverter().isPresent()) {
350+
return effectiveTransformer
351+
.getPostTransformConverter()
352+
.get()
353+
.convert(finalEvent)
354+
.map(Event.class::cast);
355+
}
356+
return Optional.of(finalEvent);
326357
}
327358

328359
/**

0 commit comments

Comments
 (0)