Skip to content

Commit 57eb8fa

Browse files
authored
Merge branch 'apache:master' into master-36794
2 parents 2225400 + f520424 commit 57eb8fa

13 files changed

Lines changed: 470 additions & 43 deletions

File tree

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.flink.cdc.common.annotation.PublicEvolving;
2121
import org.apache.flink.cdc.common.configuration.ConfigOption;
2222
import org.apache.flink.cdc.common.configuration.Configuration;
23+
import org.apache.flink.cdc.common.configuration.FallbackKey;
2324
import org.apache.flink.cdc.common.utils.Preconditions;
2425
import org.apache.flink.configuration.ReadableConfig;
2526
import org.apache.flink.table.api.ValidationException;
@@ -32,6 +33,7 @@
3233
import java.util.Set;
3334
import java.util.stream.Collectors;
3435
import java.util.stream.Stream;
36+
import java.util.stream.StreamSupport;
3537

3638
/** A helper for working with {@link Factory}. */
3739
@PublicEvolving
@@ -71,7 +73,7 @@ public static void validateFactoryOptions(
7173
final List<String> missingRequiredOptions =
7274
requiredOptions.stream()
7375
.filter(option -> configuration.get(option) == null)
74-
.map(ConfigOption::key)
76+
.flatMap(FactoryHelper::allKeys)
7577
.sorted()
7678
.collect(Collectors.toList());
7779

@@ -114,8 +116,8 @@ public static void validateUnconsumedKeys(
114116
public void validate() {
115117
Set<String> allOptionKeys =
116118
Stream.concat(
117-
factory.requiredOptions().stream().map(ConfigOption::key),
118-
factory.optionalOptions().stream().map(ConfigOption::key))
119+
factory.requiredOptions().stream().flatMap(FactoryHelper::allKeys),
120+
factory.optionalOptions().stream().flatMap(FactoryHelper::allKeys))
119121
.collect(Collectors.toSet());
120122

121123
validateFactoryOptions(factory, context.getFactoryConfiguration());
@@ -140,8 +142,8 @@ public void validateExcept(String... prefixesToSkip) {
140142

141143
Set<String> allOptionKeys =
142144
Stream.concat(
143-
factory.requiredOptions().stream().map(ConfigOption::key),
144-
factory.optionalOptions().stream().map(ConfigOption::key))
145+
factory.requiredOptions().stream().flatMap(FactoryHelper::allKeys),
146+
factory.optionalOptions().stream().flatMap(FactoryHelper::allKeys))
145147
.collect(Collectors.toSet());
146148

147149
Set<String> filteredOptionKeys =
@@ -153,6 +155,13 @@ public void validateExcept(String... prefixesToSkip) {
153155
validateUnconsumedKeys(factory.identifier(), filteredOptionKeys, allOptionKeys);
154156
}
155157

158+
private static Stream<String> allKeys(ConfigOption<?> option) {
159+
return Stream.concat(
160+
Stream.of(option.key()),
161+
StreamSupport.stream(option.fallbackKeys().spliterator(), false)
162+
.map(FallbackKey::getKey));
163+
}
164+
156165
public ReadableConfig getFormatConfig(String formatPrefix) {
157166
final String prefix = formatPrefix + ".";
158167
Map<String, String> formatConfigMap = new HashMap<>();

flink-cdc-common/src/test/java/org/apache/flink/cdc/common/factories/FactoryHelperTests.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,21 @@ public String identifier() {
4646
@Override
4747
public Set<ConfigOption<?>> requiredOptions() {
4848
return Sets.newHashSet(
49-
ConfigOptions.key("id").intType().noDefaultValue(),
49+
ConfigOptions.key("id")
50+
.intType()
51+
.noDefaultValue()
52+
.withFallbackKeys("id_fallback"),
5053
ConfigOptions.key("name").stringType().noDefaultValue(),
5154
ConfigOptions.key("age").doubleType().noDefaultValue());
5255
}
5356

5457
@Override
5558
public Set<ConfigOption<?>> optionalOptions() {
5659
return Sets.newHashSet(
57-
ConfigOptions.key("hobby").stringType().noDefaultValue(),
60+
ConfigOptions.key("hobby")
61+
.stringType()
62+
.noDefaultValue()
63+
.withFallbackKeys("hobby_fallback"),
5864
ConfigOptions.key("location").stringType().defaultValue("Everywhere"),
5965
ConfigOptions.key("misc")
6066
.mapType()
@@ -79,6 +85,19 @@ void testCorrectConfigValidation() {
7985
Configuration.fromMap(configurations), null, null));
8086

8187
factoryHelper.validate();
88+
89+
// Validation for fallback keys.
90+
configurations.clear();
91+
configurations.put("id_fallback", "2");
92+
configurations.put("name", "Bob");
93+
configurations.put("age", "18");
94+
configurations.put("hobby_fallback", "Swimming");
95+
factoryHelper =
96+
FactoryHelper.createFactoryHelper(
97+
getDummyFactory(),
98+
new FactoryHelper.DefaultContext(
99+
Configuration.fromMap(configurations), null, null));
100+
factoryHelper.validate();
82101
}
83102

84103
@Test

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.flink.cdc.composer.definition;
1919

20-
import org.apache.flink.cdc.common.utils.StringUtils;
21-
2220
import java.util.Objects;
2321

2422
/**
@@ -78,18 +76,10 @@ public String getProjection() {
7876
return projection;
7977
}
8078

81-
public boolean isValidProjection() {
82-
return !StringUtils.isNullOrWhitespaceOnly(projection);
83-
}
84-
8579
public String getFilter() {
8680
return filter;
8781
}
8882

89-
public boolean isValidFilter() {
90-
return !StringUtils.isNullOrWhitespaceOnly(filter);
91-
}
92-
9383
public String getDescription() {
9484
return description;
9585
}

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -105,17 +105,15 @@ public DataStream<Event> translatePostTransform(
105105
PostTransformOperatorBuilder postTransformFunctionBuilder =
106106
PostTransformOperator.newBuilder();
107107
for (TransformDef transform : transforms) {
108-
if (transform.isValidProjection() || transform.isValidFilter()) {
109-
postTransformFunctionBuilder.addTransform(
110-
transform.getSourceTable(),
111-
transform.getProjection(),
112-
transform.getFilter(),
113-
transform.getPrimaryKeys(),
114-
transform.getPartitionKeys(),
115-
transform.getTableOptions(),
116-
transform.getPostTransformConverter(),
117-
supportedMetadataColumns);
118-
}
108+
postTransformFunctionBuilder.addTransform(
109+
transform.getSourceTable(),
110+
transform.getProjection(),
111+
transform.getFilter(),
112+
transform.getPrimaryKeys(),
113+
transform.getPartitionKeys(),
114+
transform.getTableOptions(),
115+
transform.getPostTransformConverter(),
116+
supportedMetadataColumns);
119117
}
120118
postTransformFunctionBuilder.addTimezone(timezone);
121119
postTransformFunctionBuilder.addUdfFunctions(

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -577,11 +577,11 @@ void testMetadataInfoWithoutChangingSchema(ValuesDataSink.SinkApi sinkApi) throw
577577
"A Transform Block without projection or filter",
578578
null)),
579579
Arrays.asList(
580-
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}",
580+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING NOT NULL,`age` INT}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}",
581581
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
582582
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}",
583583
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
584-
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}",
584+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`age` TINYINT,`description` STRING}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}",
585585
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}",
586586
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student], op=INSERT, meta=()}",
587587
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student], after=[], op=DELETE, meta=()}"));
@@ -2032,7 +2032,7 @@ void testExplicitPrimaryKeyWithNullable() throws Exception {
20322032
assertThat(outputEvents)
20332033
.containsExactly(
20342034
// Initial stage
2035-
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=name, partitionKeys=id;name, options=()}",
2035+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING NOT NULL,`age` INT}, primaryKeys=name, partitionKeys=id;name, options=()}",
20362036
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 21], op=INSERT, meta=()}",
20372037
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22], op=INSERT, meta=()}",
20382038
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23], op=INSERT, meta=()}",
@@ -2048,7 +2048,7 @@ void testExplicitPrimaryKeyWithNullable() throws Exception {
20482048
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",
20492049

20502050
// Alter column type stage
2051-
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
2051+
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING NOT NULL, age=INT}}",
20522052
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}",
20532053
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}",
20542054
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}",
@@ -2998,6 +2998,32 @@ void testDateAndTimeCastingFunctions() throws Exception {
29982998
"DataChangeEvent{tableId=default_namespace.default_schema.my_table, before=[], after=[2, null, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}");
29992999
}
30003000

3001+
@ParameterizedTest
3002+
@EnumSource
3003+
void testPostTransformConvertersWoProjection(ValuesDataSink.SinkApi sinkApi) throws Exception {
3004+
runGenericTransformTest(
3005+
sinkApi,
3006+
Collections.singletonList(
3007+
new TransformDef(
3008+
"default_namespace.default_schema.\\.*",
3009+
null,
3010+
null,
3011+
null,
3012+
null,
3013+
null,
3014+
null,
3015+
"SOFT_DELETE")),
3016+
Arrays.asList(
3017+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
3018+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
3019+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}",
3020+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
3021+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}",
3022+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}",
3023+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student], op=INSERT, meta=()}",
3024+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student], op=INSERT, meta=()}"));
3025+
}
3026+
30013027
private List<Event> generateFloorCeilAndRoundEvents(TableId tableId) {
30023028
List<Event> events = new ArrayList<>();
30033029
Schema schema =

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ limitations under the License.
2727
<name>flink-cdc-pipeline-connector-doris</name>
2828

2929
<properties>
30-
<doris.connector.version>25.0.0</doris.connector.version>
30+
<doris.connector.version>25.1.0</doris.connector.version>
3131
<mysql.connector.version>8.0.26</mysql.connector.version>
3232
</properties>
3333

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ limitations under the License.
2929
<artifactId>flink-cdc-pipeline-connector-paimon</artifactId>
3030

3131
<properties>
32-
<paimon.version>1.2.0</paimon.version>
32+
<paimon.version>1.3.1</paimon.version>
3333
<hadoop.version>2.8.5</hadoop.version>
3434
<hive.version>2.3.9</hive.version>
3535
<mockito.version>3.4.6</mockito.version>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.paimon.memory.MemoryPoolFactory;
3434
import org.apache.paimon.memory.MemorySegmentPool;
3535
import org.apache.paimon.operation.FileStoreWrite;
36+
import org.apache.paimon.operation.WriteRestore;
3637
import org.apache.paimon.table.FileStoreTable;
3738
import org.apache.paimon.table.sink.CommitMessage;
3839
import org.apache.paimon.table.sink.SinkRecord;
@@ -118,7 +119,7 @@ private TableWriteImpl<?> newTableWrite(FileStoreTable table) {
118119
"memoryPool and memoryPoolFactory cannot be set at the same time.");
119120

120121
TableWriteImpl<?> tableWrite =
121-
table.newWrite(commitUser, (part, bucket) -> true, null)
122+
table.newWrite(commitUser)
122123
.withIOManager(paimonIOManager)
123124
.withIgnorePreviousFiles(ignorePreviousFiles);
124125

@@ -129,19 +130,25 @@ private TableWriteImpl<?> newTableWrite(FileStoreTable table) {
129130
if (memoryPoolFactory != null) {
130131
return tableWrite.withMemoryPoolFactory(memoryPoolFactory);
131132
} else {
132-
return tableWrite.withMemoryPool(
133-
memoryPool != null
134-
? memoryPool
135-
: new HeapMemorySegmentPool(
136-
table.coreOptions().writeBufferSize(),
137-
table.coreOptions().pageSize()));
133+
return (TableWriteImpl<?>)
134+
tableWrite.withMemoryPool(
135+
memoryPool != null
136+
? memoryPool
137+
: new HeapMemorySegmentPool(
138+
table.coreOptions().writeBufferSize(),
139+
table.coreOptions().pageSize()));
138140
}
139141
}
140142

141143
public void withCompactExecutor(ExecutorService compactExecutor) {
142144
write.withCompactExecutor(compactExecutor);
143145
}
144146

147+
@Override
148+
public void setWriteRestore(WriteRestore writeRestore) {
149+
this.write.withWriteRestore(writeRestore);
150+
}
151+
145152
@Override
146153
public SinkRecord write(InternalRow internalRow) throws Exception {
147154
return write.writeAndReturn(internalRow);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ public void processElement(StreamRecord<Event> streamRecord) throws Exception {
184184
bucket = 0;
185185
break;
186186
}
187-
case CROSS_PARTITION:
187+
case KEY_DYNAMIC:
188188
default:
189189
{
190190
throw new RuntimeException("Unsupported bucket mode: " + tuple4.f0);

0 commit comments

Comments
 (0)