Merged
13 changes: 6 additions & 7 deletions docs/content.zh/docs/core-concept/transform.md
@@ -338,22 +338,21 @@ transform:
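Tips: The format of table-options is `key1=value1,key2=value2`.

## Classification mapping
Multiple transform rules can be defined as a classification mapping.
When a table is hit by multiple transform rules at the same time,
only the first matched transform rule will apply.
For example, we can define a transform rule as follows: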

```yaml
transform:
- source-table: mydb.web_order
projection: id, order_id
filter: UPPER(province) = 'SHANGHAI'
description: classification mapping example
- source-table: mydb.web_order
projection: order_id as id, id as order_id
filter: UPPER(province) = 'BEIJING'
description: classification mapping example
filter: id > 1001
- source-table: mydb.\.*
projection: \*, 'fallback' AS FALLBACK
```

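Here, even though the `mydb.web_order` table can also be matched by the second rule, it will not fall through to later transform rules, because the first rule, which is ordered earlier, has already matched.

## User-defined Functions
User-defined functions (UDFs) can be used in transform rules.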

14 changes: 6 additions & 8 deletions docs/content/docs/core-concept/transform.md
@@ -341,22 +341,20 @@ transform:
Tips: The format of table-options is `key1=value1,key2=value2`.

## Classification mapping
Multiple transform rules can be defined to classify input data rows and apply different processing.
Only the first matched transform rule will apply.
If a table hits multiple transform rules, only the first matched transform rule will apply.
For example, we may define a transform rule as follows:

```yaml
transform:
- source-table: mydb.web_order
projection: id, order_id
filter: UPPER(province) = 'SHANGHAI'
description: classification mapping example
- source-table: mydb.web_order
projection: order_id as id, id as order_id
filter: UPPER(province) = 'BEIJING'
description: classification mapping example
filter: id > 1001
- source-table: mydb.\.*
projection: \*, 'fallback' AS FALLBACK
```

Here, although `mydb.web_order` also matches the second rule (`mydb.\.*`), it will not fall through to that rule, because the first rule has already handled it.
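
The first-match behavior described above can be sketched in a few lines of Java. This is only an illustration under stated assumptions: `FirstMatchSketch`, `Rule`, and `select` are hypothetical names that do not exist in Flink CDC, rules are matched per table as the text states, and the table patterns are written as plain Java regular expressions rather than in the `mydb.\.*` syntax used in the YAML.

```java
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

/** Hypothetical illustration only; none of these names belong to Flink CDC. */
final class FirstMatchSketch {

    /** Simplified rule: a Java regex over the full table name plus a label. */
    static final class Rule {
        final Pattern tablePattern;
        final String label;

        Rule(String tableRegex, String label) {
            this.tablePattern = Pattern.compile(tableRegex);
            this.label = label;
        }
    }

    /** Returns the first rule, in declaration order, whose regex matches the whole table name. */
    static Optional<Rule> select(List<Rule> rules, String tableName) {
        for (Rule rule : rules) {
            if (rule.tablePattern.matcher(tableName).matches()) {
                // Later rules are never consulted for this table.
                return Optional.of(rule);
            }
        }
        return Optional.empty();
    }
}
```

With one rule for `mydb\.web_order` followed by a catch-all rule for `mydb\..*`, `select` picks the first rule for `mydb.web_order` and the catch-all for every other table in `mydb`. Under this reading, rows of `mydb.web_order` that fail the first rule's filter are not handed to the fallback rule.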

## User-defined Functions

User-defined functions (UDFs) can be used in transform rules.
@@ -58,7 +58,6 @@

import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;

import org.apache.calcite.sql.validate.SqlValidatorException;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowableAssert;
import org.codehaus.commons.compiler.CompileException;
@@ -241,6 +240,136 @@ void testFilteringRules(ValuesDataSink.SinkApi sinkApi) throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student], after=[], op=DELETE, meta=()}"));
}

@ParameterizedTest(name = "API version: {0}")
@EnumSource(ValuesDataSink.SinkApi.class)
void testFilterUpdateOpTypeConversion(ValuesDataSink.SinkApi sinkApi) throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);

TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1");
Schema table1Schema =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT())
.physicalColumn("name", DataTypes.STRING())
.physicalColumn("age", DataTypes.INT())
.primaryKey("id")
.build();

BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(
table1Schema.getColumnDataTypes().toArray(new DataType[0]));

List<Event> events = new ArrayList<>();
events.add(new CreateTableEvent(myTable1, table1Schema));
// Case 1: before=Y, after=Y -> UPDATE
events.add(
DataChangeEvent.insertEvent(
myTable1,
generator.generate(
new Object[] {1, BinaryStringData.fromString("Alice"), 30})));
events.add(
DataChangeEvent.updateEvent(
myTable1,
generator.generate(
new Object[] {1, BinaryStringData.fromString("Alice"), 30}),
generator.generate(
new Object[] {1, BinaryStringData.fromString("Alice"), 40})));
// Case 2: before=Y, after=N -> DELETE
events.add(
DataChangeEvent.insertEvent(
myTable1,
generator.generate(
new Object[] {2, BinaryStringData.fromString("Bob"), 30})));
events.add(
DataChangeEvent.updateEvent(
myTable1,
generator.generate(
new Object[] {2, BinaryStringData.fromString("Bob"), 30}),
generator.generate(
new Object[] {2, BinaryStringData.fromString("Bob"), 20})));
// Case 3: before=N, after=Y -> INSERT
events.add(
DataChangeEvent.insertEvent(
myTable1,
generator.generate(
new Object[] {3, BinaryStringData.fromString("Carol"), 20})));
events.add(
DataChangeEvent.updateEvent(
myTable1,
generator.generate(
new Object[] {3, BinaryStringData.fromString("Carol"), 20}),
generator.generate(
new Object[] {3, BinaryStringData.fromString("Carol"), 35})));
// Case 4: before=N, after=N -> drop
events.add(
DataChangeEvent.insertEvent(
myTable1,
generator.generate(
new Object[] {4, BinaryStringData.fromString("Dave"), 10})));
events.add(
DataChangeEvent.updateEvent(
myTable1,
generator.generate(
new Object[] {4, BinaryStringData.fromString("Dave"), 10}),
generator.generate(
new Object[] {4, BinaryStringData.fromString("Dave"), 15})));

ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));

SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
sinkDef,
Collections.emptyList(),
Collections.singletonList(
new TransformDef(
"default_namespace.default_schema.\\.*",
null,
"age > 25",
null,
null,
null,
null,
null)),
Collections.emptyList(),
pipelineConfig);

PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();

String[] outputEvents = outCaptor.toString().trim().split("\n");

assertThat(outputEvents)
.containsExactlyInAnyOrder(
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
// INSERT id=1 (age=30 passes)
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 30], op=INSERT, meta=()}",
// UPDATE id=1 (30->40): before=Y, after=Y -> UPDATE
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[1, Alice, 30], after=[1, Alice, 40], op=UPDATE, meta=()}",
// INSERT id=2 (age=30 passes)
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 30], op=INSERT, meta=()}",
// UPDATE id=2 (30->20): before=Y, after=N -> DELETE
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 30], after=[], op=DELETE, meta=()}",
// UPDATE id=3 (20->35): before=N, after=Y -> INSERT
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Carol, 35], op=INSERT, meta=()}");
// INSERT id=3 (age=20 fails), INSERT id=4 (age=10 fails),
// UPDATE id=4 (10->15, both fail) are all filtered out.
}

/**
* This tests if transform rule could be used to classify source records based on filtering
* rules.
@@ -2507,7 +2636,6 @@ void testTransformErrorMessage() {
+ "to schema\n"
+ "\t(Unknown).")
.rootCause()
.isExactlyInstanceOf(SqlValidatorException.class)
.hasMessage("Column 'id1' not found in any table");

// Unexpected column in filter rule
@@ -501,13 +501,6 @@ void runTransformSpecs(String group, String name, TestSpec spec) throws Exceptio
}
}

enum SpecContext {
PROJECTION,
EXPECT,
EXPECT_ERROR,
NULL
}

private static final String[] EXPECTED_SPECS = {
"specs/arithmetic.yaml",
"specs/basic.yaml",
5 changes: 4 additions & 1 deletion flink-cdc-composer/src/test/resources/specs/basic.yaml
@@ -108,13 +108,14 @@
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`string_` STRING}, primaryKeys=id_, options=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, From A to Z is Lie], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[1, From A to Z is Lie], after=[], op=DELETE, meta=()}
- do: Filter by Expression
projection: id_, string_
filter: id_ + 1 <= 1
primary-key: id_
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`string_` STRING}, primaryKeys=id_, options=()}
DataChangeEvent{tableId=foo.bar.baz, before=[1, From A to Z is Lie], after=[-1, 天地玄黄宇宙洪荒], op=UPDATE, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[-1, 天地玄黄宇宙洪荒], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[-1, 天地玄黄宇宙洪荒], after=[], op=DELETE, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null], after=[], op=DELETE, meta=()}
@@ -126,6 +127,7 @@
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`string_` STRING,`strlen_` INT}, primaryKeys=id_, options=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, From A to Z is Lie, 18], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[1, From A to Z is Lie, 18], after=[], op=DELETE, meta=()}
- do: Filter by Calculation Column (With NULL)
ignore: FLINK-38905
projection: id_, string_, CHAR_LENGTH(string_) AS strlen_
@@ -134,6 +136,7 @@
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`string_` STRING,`strlen_` INT}, primaryKeys=id_, options=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, From A to Z is Lie, 18], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[1, From A to Z is Lie, 18], after=[], op=DELETE, meta=()}
- do: Invalid Projection Expr
projection: id_, a_column_that_is_nowhere_to_be_found
primary-key: id_
5 changes: 3 additions & 2 deletions flink-cdc-composer/src/test/resources/specs/nested.yaml
@@ -176,7 +176,7 @@
primary-key: id_
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`array_string_` ARRAY<STRING>}, primaryKeys=id_, options=()}
DataChangeEvent{tableId=foo.bar.baz, before=[1, [one, one, two, three, five]], after=[-1, [二, san, 五, qi, 十一]], op=UPDATE, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[-1, [二, san, 五, qi, 十一]], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[-1, [二, san, 五, qi, 十一]], after=[], op=DELETE, meta=()}
- do: Map Subscripting
projection: |-
@@ -248,6 +248,7 @@
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`map_int_string_` MAP<INT, STRING>}, primaryKeys=id_, options=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, {1 -> one, 2 -> two, 3 -> three}], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[1, {1 -> one, 2 -> two, 3 -> three}], after=[], op=DELETE, meta=()}
- do: Record Subscripting With Index
projection: |-
id_
@@ -282,7 +283,7 @@
primary-key: id_
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`complex_row_` ROW<`name` STRING, `length` INT>}, primaryKeys=id_, options=()}
DataChangeEvent{tableId=foo.bar.baz, before=[1, {name: STRING -> Alice, length: INT -> 5}], after=[-1, {name: STRING -> Derrida, length: INT -> 7}], op=UPDATE, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[-1, {name: STRING -> Derrida, length: INT -> 7}], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[-1, {name: STRING -> Derrida, length: INT -> 7}], after=[], op=DELETE, meta=()}
- do: Variant Object Subscripting With String Key
projection: |-
@@ -289,40 +289,71 @@ private Optional<Event> processDataChangeEvent(
getProjectionProcessor(tableId, effectiveTransformer);
TransformFilterProcessor filterProcessor =
getFilterProcessor(tableId, effectiveTransformer);
RecordData beforeRow = null;
RecordData afterRow = null;
boolean filterPassed = true;

BinaryRecordData beforeRow = null;
BinaryRecordData afterRow = null;
boolean beforeFilterPassed = false;
boolean afterFilterPassed = false;

if (event.before() != null) {
context.opType = beforeOp;
Tuple2<BinaryRecordData, Boolean> result =
transformRecord(
event.before(), info, projectionProcessor, filterProcessor, context);
beforeRow = result.f0;
filterPassed = result.f1;
beforeFilterPassed = result.f1;
}
if (event.after() != null) {
context.opType = afterOp;
Tuple2<BinaryRecordData, Boolean> result =
transformRecord(
event.after(), info, projectionProcessor, filterProcessor, context);
afterRow = result.f0;
filterPassed = result.f1;
afterFilterPassed = result.f1;
}
if (filterPassed) {
DataChangeEvent finalEvent = DataChangeEvent.projectRecords(event, beforeRow, afterRow);
if (effectiveTransformer.getPostTransformConverter().isPresent()) {
return effectiveTransformer
.getPostTransformConverter()
.get()
.convert(finalEvent)
.map(Event.class::cast);
} else {
return Optional.of(finalEvent);
}
// For UPDATE events, before and after filter results may differ, requiring op type
// conversion:
// before=Y, after=Y -> UPDATE; before=Y, after=N -> DELETE;
// before=N, after=Y -> INSERT; before=N, after=N -> drop.
DataChangeEvent finalEvent;
switch (event.op()) {
case INSERT:
case REPLACE:
if (!afterFilterPassed) {
return Optional.empty();
}
finalEvent = DataChangeEvent.projectRecords(event, beforeRow, afterRow);
break;
case DELETE:
if (!beforeFilterPassed) {
return Optional.empty();
}
finalEvent = DataChangeEvent.projectRecords(event, beforeRow, afterRow);
break;
case UPDATE:
if (beforeFilterPassed && afterFilterPassed) {
finalEvent = DataChangeEvent.projectRecords(event, beforeRow, afterRow);
} else if (beforeFilterPassed) {
finalEvent = DataChangeEvent.deleteEvent(tableId, beforeRow, event.meta());
} else if (afterFilterPassed) {
Comment on lines +334 to +338
Member Author

I prefer not to re-evaluate expressions, because the filter condition may itself depend on the opType, and re-evaluating might cause inconsistencies.

Also, op_type should represent the original type from the source, so keeping it intact should be acceptable.


finalEvent = DataChangeEvent.insertEvent(tableId, afterRow, event.meta());
} else {
return Optional.empty();
}
break;
default:
throw new UnsupportedOperationException(
"Unsupported operation type: " + event.op());
}

// Events with no matching filters satisfied won't be emitted to downstream.
return Optional.empty();
if (effectiveTransformer.getPostTransformConverter().isPresent()) {
return effectiveTransformer
.getPostTransformConverter()
.get()
.convert(finalEvent)
.map(Event.class::cast);
}
return Optional.of(finalEvent);
}

/**
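
The op type conversion in `processDataChangeEvent` amounts to a small decision table over the two filter results. The sketch below restates it without any Flink CDC dependencies so it can be read and run on its own. `FilterOutcomeSketch`, its `Op` and `Emitted` enums, and `resolve` are hypothetical names introduced for illustration; the real method works on `DataChangeEvent` and `BinaryRecordData` and also applies the projection and the post-transform converter, which are left out here.

```java
/** Hypothetical, dependency-free restatement of the decision table; not the Flink CDC API. */
final class FilterOutcomeSketch {

    /** Operation type of the incoming change event. */
    enum Op { INSERT, UPDATE, DELETE, REPLACE }

    /** What is emitted downstream; DROP means nothing is emitted. */
    enum Emitted { INSERT, UPDATE, DELETE, REPLACE, DROP }

    static Emitted resolve(Op sourceOp, boolean beforePassed, boolean afterPassed) {
        switch (sourceOp) {
            case INSERT:
                // Only the after image exists, so only its filter result matters.
                return afterPassed ? Emitted.INSERT : Emitted.DROP;
            case REPLACE:
                return afterPassed ? Emitted.REPLACE : Emitted.DROP;
            case DELETE:
                // Only the before image exists.
                return beforePassed ? Emitted.DELETE : Emitted.DROP;
            case UPDATE:
                if (beforePassed && afterPassed) {
                    return Emitted.UPDATE; // row stays inside the filtered view
                }
                if (beforePassed) {
                    return Emitted.DELETE; // row leaves the filtered view
                }
                if (afterPassed) {
                    return Emitted.INSERT; // row enters the filtered view
                }
                return Emitted.DROP; // row was never visible downstream
            default:
                throw new UnsupportedOperationException(
                        "Unsupported operation type: " + sourceOp);
        }
    }
}
```

For the filter `age > 25` used in `testFilterUpdateOpTypeConversion`, updating Bob from age 30 to 20 corresponds to `resolve(Op.UPDATE, true, false)` and yields `DELETE`, while updating Carol from 20 to 35 corresponds to `resolve(Op.UPDATE, false, true)` and yields `INSERT`, in line with the expected events in that test.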