Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@
package org.apache.flink.cdc.composer.flink;

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.DateData;
import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.GenericArrayData;
import org.apache.flink.cdc.common.data.GenericMapData;
import org.apache.flink.cdc.common.data.GenericRecordData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.TimeData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
Expand Down Expand Up @@ -72,6 +77,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -1733,6 +1739,234 @@ void testRouteModeAllMatch(ValuesDataSink.SinkApi sinkApi) throws Exception {
.isGreaterThan(0);
}

@ParameterizedTest
@EnumSource
void testGenericRecordDataEndToEnd(ValuesDataSink.SinkApi sinkApi) throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);

TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1");
Schema table1Schema =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT().notNull())
.physicalColumn("name", DataTypes.STRING())
.physicalColumn("age", DataTypes.INT())
.physicalColumn("col_bool", DataTypes.BOOLEAN())
.physicalColumn("col_tinyint", DataTypes.TINYINT())
.physicalColumn("col_smallint", DataTypes.SMALLINT())
.physicalColumn("col_bigint", DataTypes.BIGINT())
.physicalColumn("col_float", DataTypes.FLOAT())
.physicalColumn("col_double", DataTypes.DOUBLE())
.physicalColumn("col_decimal", DataTypes.DECIMAL(10, 2))
.physicalColumn("col_date", DataTypes.DATE())
.physicalColumn("col_time", DataTypes.TIME())
.physicalColumn("col_timestamp", DataTypes.TIMESTAMP(3))
.physicalColumn("col_timestamp_ltz", DataTypes.TIMESTAMP_LTZ(3))
.physicalColumn("col_timestamp_tz", DataTypes.TIMESTAMP_TZ(3))
.physicalColumn("col_array", DataTypes.ARRAY(DataTypes.STRING()))
.physicalColumn(
"col_map", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
.physicalColumn(
"col_row",
DataTypes.ROW(
DataTypes.FIELD("f0", DataTypes.INT()),
DataTypes.FIELD("f1", DataTypes.STRING())))
.primaryKey("id")
.build();

GenericArrayData testArray =
new GenericArrayData(
new Object[] {
BinaryStringData.fromString("x"), BinaryStringData.fromString("y")
});
GenericMapData testMap = new GenericMapData(Map.of(BinaryStringData.fromString("k1"), 100));
GenericRecordData testRow = GenericRecordData.of(77, BinaryStringData.fromString("inner"));
DecimalData testDecimal = DecimalData.fromBigDecimal(new BigDecimal("123.45"), 10, 2);
TimestampData testTs = TimestampData.fromMillis(1609459200000L);
LocalZonedTimestampData testTsLtz = LocalZonedTimestampData.fromEpochMillis(1609459200000L);
ZonedTimestampData testTsTz = ZonedTimestampData.of(1609459200000L, 0, "UTC");

List<Event> events = new ArrayList<>();
events.add(new CreateTableEvent(myTable1, table1Schema));
events.add(
DataChangeEvent.insertEvent(
myTable1,
GenericRecordData.of(
1,
BinaryStringData.fromString("Alice"),
18,
true,
(byte) 1,
(short) 100,
1000L,
1.0f,
1.0,
testDecimal,
DateData.fromEpochDay(18628),
TimeData.fromMillisOfDay(43200000),
testTs,
testTsLtz,
testTsTz,
testArray,
testMap,
testRow)));
events.add(
DataChangeEvent.insertEvent(
myTable1,
GenericRecordData.of(
2,
BinaryStringData.fromString("Bob"),
20,
true,
(byte) 42,
(short) 200,
9876543210L,
3.14f,
2.718,
testDecimal,
DateData.fromEpochDay(18628),
TimeData.fromMillisOfDay(43200000),
testTs,
testTsLtz,
testTsTz,
testArray,
testMap,
testRow)));
events.add(
DataChangeEvent.updateEvent(
myTable1,
GenericRecordData.of(
2,
BinaryStringData.fromString("Bob"),
20,
true,
(byte) 42,
(short) 200,
9876543210L,
3.14f,
2.718,
testDecimal,
DateData.fromEpochDay(18628),
TimeData.fromMillisOfDay(43200000),
testTs,
testTsLtz,
testTsTz,
testArray,
testMap,
testRow),
GenericRecordData.of(
2,
BinaryStringData.fromString("Bob"),
30,
false,
(byte) 43,
(short) 201,
9876543211L,
3.15f,
2.719,
testDecimal,
DateData.fromEpochDay(18629),
TimeData.fromMillisOfDay(43201000),
testTs,
testTsLtz,
testTsTz,
testArray,
testMap,
testRow)));
events.add(
DataChangeEvent.insertEvent(
myTable1,
GenericRecordData.of(
3, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null)));
events.add(
DataChangeEvent.deleteEvent(
myTable1,
GenericRecordData.of(
1,
BinaryStringData.fromString("Alice"),
18,
true,
(byte) 1,
(short) 100,
1000L,
1.0f,
1.0,
testDecimal,
DateData.fromEpochDay(18628),
TimeData.fromMillisOfDay(43200000),
testTs,
testTsLtz,
testTsTz,
testArray,
testMap,
testRow)));

ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));

SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

TransformDef transformDef =
new TransformDef(
"default_namespace.default_schema.mytable1",
"*, 'test_tag' as tag",
"id <> 1",
null,
null,
null,
"",
null);

Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
sinkDef,
Collections.emptyList(),
new ArrayList<>(Collections.singletonList(transformDef)),
Collections.emptyList(),
pipelineConfig);

PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();

List<String> results = ValuesDatabase.getResults(myTable1);
assertThat(results).hasSize(2);
assertThat(results)
.anySatisfy(
r ->
assertThat(r)
.startsWith(
"default_namespace.default_schema.mytable1:id=2;name=Bob;age=30;col_bool=false;col_tinyint=43;col_smallint=201;col_bigint=9876543211;col_float=3.15;col_double=2.719;col_decimal=123.45;col_date=2021-01-02;col_time=12:00:01;col_timestamp=2021-01-01T00:00;col_timestamp_ltz=2021-01-01T00:00;col_timestamp_tz=2021-01-01T00:00:00Z;")
.endsWith("tag=test_tag"))
.anySatisfy(
r ->
assertThat(r)
.isEqualTo(
"default_namespace.default_schema.mytable1:id=3;name=;age=;col_bool=;col_tinyint=;col_smallint=;col_bigint=;col_float=;col_double=;col_decimal=;col_date=;col_time=;col_timestamp=;col_timestamp_ltz=;col_timestamp_tz=;col_array=;col_map=;col_row=;tag=test_tag"));

String[] outputEvents = outCaptor.toString().trim().split("\n");
assertThat(Arrays.asList(outputEvents))
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`col_bool` BOOLEAN,`col_tinyint` TINYINT,`col_smallint` SMALLINT,`col_bigint` BIGINT,`col_float` FLOAT,`col_double` DOUBLE,`col_decimal` DECIMAL(10, 2),`col_date` DATE,`col_time` TIME(0),`col_timestamp` TIMESTAMP(3),`col_timestamp_ltz` TIMESTAMP_LTZ(3),`col_timestamp_tz` TIMESTAMP(3) WITH TIME ZONE,`col_array` ARRAY<STRING>,`col_map` MAP<STRING, INT>,`col_row` ROW<`f0` INT, `f1` STRING>,`tag` STRING}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, true, 42, 200, 9876543210, 3.14, 2.718, 123.45, 2021-01-01, 12:00, 2021-01-01T00:00, 2021-01-01T00:00, 2021-01-01T00:00:00Z, [x, y], {k1 -> 100}, {f0: INT -> 77, f1: STRING -> inner}, test_tag], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, true, 42, 200, 9876543210, 3.14, 2.718, 123.45, 2021-01-01, 12:00, 2021-01-01T00:00, 2021-01-01T00:00, 2021-01-01T00:00:00Z, [x, y], {k1 -> 100}, {f0: INT -> 77, f1: STRING -> inner}, test_tag], after=[2, Bob, 30, false, 43, 201, 9876543211, 3.15, 2.719, 123.45, 2021-01-02, 12:00:01, 2021-01-01T00:00, 2021-01-01T00:00, 2021-01-01T00:00:00Z, [x, y], {k1 -> 100}, {f0: INT -> 77, f1: STRING -> inner}, test_tag], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, test_tag], op=INSERT, meta=()}");
}

BinaryRecordData generate(Schema schema, Object... fields) {
return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
.generate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,11 @@ private void updatePrimaryKeyIndexes() {
private String buildPrimaryKeyStr(RecordData recordData) {
StringBuilder stringBuilder = new StringBuilder();
for (Integer primaryKeyIndex : primaryKeyIndexes) {
stringBuilder.append(recordData.getString(primaryKeyIndex).toString()).append(",");
RecordData.FieldGetter fieldGetter =
RecordData.createFieldGetter(
columns.get(primaryKeyIndex).getType(), primaryKeyIndex);
Object value = fieldGetter.getFieldOrNull(recordData);
stringBuilder.append(value != null ? value.toString() : "null").append(",");
}
stringBuilder.deleteCharAt(stringBuilder.length() - 1);
return stringBuilder.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
Expand Down Expand Up @@ -391,8 +392,8 @@ private DataChangeEvent processDataChangeEvent(DataChangeEvent dataChangeEvent)
+ "This is likely a bug, please consider filing an issue.",
tableId);

BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
RecordData before = dataChangeEvent.before();
RecordData after = dataChangeEvent.after();
if (before != null) {
BinaryRecordData projectedBefore = processor.processFillDataField(before);
dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public CreateTableEvent preTransformCreateTableEvent(CreateTableEvent createTabl
return new CreateTableEvent(createTableEvent.tableId(), schema);
}

public BinaryRecordData processFillDataField(BinaryRecordData data) {
public BinaryRecordData processFillDataField(RecordData data) {
List<Object> valueList = new ArrayList<>();
List<Column> columns = tableChangeInfo.getPreTransformedSchema().getColumns();
Map<String, RecordData.FieldGetter> sourceFieldGettersMap =
Expand Down
Loading
Loading