Skip to content

Commit 6716a49

Browse files
committed
add more tests
1 parent afc9b67 commit 6716a49

9 files changed

Lines changed: 740 additions & 76 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/pom.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,20 @@ limitations under the License.
231231
<version>${httpclient.version}</version>
232232
</dependency>
233233

234+
<!-- test -->
235+
<dependency>
236+
<groupId>org.apache.flink</groupId>
237+
<artifactId>flink-cdc-composer</artifactId>
238+
<version>${project.version}</version>
239+
<scope>test</scope>
240+
</dependency>
241+
242+
<dependency>
243+
<groupId>org.apache.flink</groupId>
244+
<artifactId>flink-metrics-dropwizard</artifactId>
245+
<version>${flink.version}</version>
246+
<scope>test</scope>
247+
</dependency>
234248
</dependencies>
235249

236250
<build>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSink.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.flink.cdc.common.sink.MetadataApplier;
2525
import org.apache.flink.cdc.connectors.hudi.sink.v2.HudiSink;
2626

27+
import org.apache.hudi.common.config.HoodieCommonConfig;
2728
import org.slf4j.Logger;
2829
import org.slf4j.LoggerFactory;
2930

@@ -59,11 +60,6 @@ public EventSinkProvider getEventSinkProvider() {
5960
// Convert CDC configuration to Flink configuration for HoodieSink
6061
org.apache.flink.configuration.Configuration flinkConfig = toFlinkConfig(config);
6162

62-
// Extract configuration options
63-
java.util.Map<String, String> configMap = config.toMap();
64-
boolean overwrite = "insert_overwrite".equals(configMap.get("write.operation"));
65-
boolean isBounded = "BATCH".equals(configMap.get("execution.checkpointing.mode"));
66-
6763
// Create the HudiSink with multi-table support via wrapper pattern
6864
// Use empty RowType since tables are created dynamically
6965
HudiSink hudiSink = new HudiSink(flinkConfig, schemaOperatorUid, ZoneId.systemDefault());
@@ -94,6 +90,8 @@ private static org.apache.flink.configuration.Configuration toFlinkConfig(
9490
if (cdcConfig != null) {
9591
cdcConfig.toMap().forEach(flinkConfig::setString);
9692
}
93+
// always enable schema evolution
94+
flinkConfig.setString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), "true");
9795
return flinkConfig;
9896
}
9997
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSinkFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,13 @@ public HudiDataSink createDataSink(Context context) {
7979
public Set<ConfigOption<?>> requiredOptions() {
8080
Set<ConfigOption<?>> options = new HashSet<>();
8181
options.add(HudiConfig.PATH);
82-
options.add(HudiConfig.RECORD_KEY_FIELD);
8382
return options;
8483
}
8584

8685
@Override
8786
public Set<ConfigOption<?>> optionalOptions() {
8887
Set<ConfigOption<?>> options = new HashSet<>();
88+
options.add(HudiConfig.RECORD_KEY_FIELD);
8989
options.add(HudiConfig.TABLE_TYPE);
9090
options.add(HudiConfig.PARTITION_PATH_FIELD);
9191
options.add(HudiConfig.INDEX_TYPE);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/event/HudiRecordEventSerializer.java

Lines changed: 23 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,11 @@
1717

1818
package org.apache.flink.cdc.connectors.hudi.sink.event;
1919

20-
import org.apache.flink.cdc.common.event.CreateTableEvent;
2120
import org.apache.flink.cdc.common.event.DataChangeEvent;
2221
import org.apache.flink.cdc.common.event.Event;
23-
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
2422
import org.apache.flink.cdc.common.event.TableId;
2523
import org.apache.flink.cdc.common.schema.Schema;
26-
import org.apache.flink.cdc.common.utils.SchemaUtils;
24+
import org.apache.flink.cdc.common.utils.Preconditions;
2725
import org.apache.flink.cdc.connectors.hudi.sink.util.RowDataUtils;
2826

2927
import org.apache.hudi.client.model.HoodieFlinkInternalRow;
@@ -78,54 +76,28 @@ public HudiRecordEventSerializer(ZoneId zoneId) {
7876
*/
7977
@Override
8078
public HoodieFlinkInternalRow serialize(Event event, String fileId, String instantTime) {
81-
if (event instanceof CreateTableEvent) {
82-
CreateTableEvent createTableEvent = (CreateTableEvent) event;
83-
schemaMaps.put(createTableEvent.tableId(), createTableEvent.getSchema());
84-
// Clear keyGenCache for this table since schema changed
85-
keyGenCache.remove(createTableEvent.tableId());
86-
// Schema events don't produce records
87-
return null;
88-
89-
} else if (event instanceof SchemaChangeEvent) {
90-
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
91-
Schema existingSchema = schemaMaps.get(schemaChangeEvent.tableId());
92-
if (existingSchema != null
93-
&& !SchemaUtils.isSchemaChangeEventRedundant(
94-
existingSchema, schemaChangeEvent)) {
95-
Schema newSchema =
96-
SchemaUtils.applySchemaChangeEvent(existingSchema, schemaChangeEvent);
97-
schemaMaps.put(schemaChangeEvent.tableId(), newSchema);
98-
// Clear keyGenCache for this table since schema changed
99-
keyGenCache.remove(schemaChangeEvent.tableId());
100-
}
101-
// Schema events don't produce records
102-
return null;
103-
104-
} else if (event instanceof DataChangeEvent) {
105-
DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
106-
Schema schema = schemaMaps.get(dataChangeEvent.tableId());
107-
108-
if (schema == null) {
109-
throw new IllegalStateException(
110-
"No schema available for table "
111-
+ dataChangeEvent.tableId()
112-
+ ". CreateTableEvent should arrive before DataChangeEvent.");
113-
}
114-
115-
// Get or create RowDataKeyGen for this table
116-
RowDataKeyGen keyGen =
117-
keyGenCache.computeIfAbsent(
118-
dataChangeEvent.tableId(), tid -> RowDataUtils.createKeyGen(schema));
119-
120-
// Convert DataChangeEvent to HoodieFlinkInternalRow using RowDataKeyGen
121-
return RowDataUtils.convertDataChangeEventToHoodieFlinkInternalRow(
122-
dataChangeEvent, schema, zoneId, keyGen, fileId, instantTime);
123-
124-
} else {
125-
throw new IllegalArgumentException(
126-
"Unsupported event type for Hudi serialization: "
127-
+ event.getClass().getSimpleName());
79+
Preconditions.checkArgument(
80+
event instanceof DataChangeEvent,
81+
"Unsupported event type for Hudi serialization: "
82+
+ event.getClass().getSimpleName());
83+
DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
84+
Schema schema = schemaMaps.get(dataChangeEvent.tableId());
85+
86+
if (schema == null) {
87+
throw new IllegalStateException(
88+
"No schema available for table "
89+
+ dataChangeEvent.tableId()
90+
+ ". CreateTableEvent should arrive before DataChangeEvent.");
12891
}
92+
93+
// Get or create RowDataKeyGen for this table
94+
RowDataKeyGen keyGen =
95+
keyGenCache.computeIfAbsent(
96+
dataChangeEvent.tableId(), tid -> RowDataUtils.createKeyGen(schema));
97+
98+
// Convert DataChangeEvent to HoodieFlinkInternalRow using RowDataKeyGen
99+
return RowDataUtils.convertDataChangeEventToHoodieFlinkInternalRow(
100+
dataChangeEvent, schema, zoneId, keyGen, fileId, instantTime);
129101
}
130102

131103
/**
@@ -139,7 +111,7 @@ public HoodieFlinkInternalRow serialize(Event event, String fileId, String insta
139111
*/
140112
@Override
141113
public HoodieFlinkInternalRow serialize(Event event) {
142-
return serialize(event, "temp", "temp");
114+
return serialize(event, "", "");
143115
}
144116

145117
/**

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/operator/MultiTableWriteOperator.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -79,20 +79,6 @@ public void open() throws Exception {
7979

8080
// Set the SchemaEvolutionClient on the MultiTableEventStreamWriteFunction
8181
multiTableWriteFunction.setSchemaEvolutionClient(schemaEvolutionClient);
82-
83-
// Register this sink subtask with the SchemaOperator
84-
int subtaskIndex = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
85-
try {
86-
schemaEvolutionClient.registerSubtask(subtaskIndex);
87-
LOG.info(
88-
"Registered sink subtask {} with SchemaOperator {}",
89-
subtaskIndex,
90-
schemaOperatorUid);
91-
} catch (Exception e) {
92-
LOG.warn(
93-
"Failed to register subtask with SchemaOperator, but continuing: {}",
94-
e.getMessage());
95-
}
9682
}
9783

9884
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.hudi.sink;
19+
20+
import org.apache.flink.cdc.common.configuration.ConfigOption;
21+
import org.apache.flink.cdc.common.configuration.Configuration;
22+
import org.apache.flink.cdc.common.factories.DataSinkFactory;
23+
import org.apache.flink.cdc.common.factories.FactoryHelper;
24+
import org.apache.flink.cdc.common.sink.DataSink;
25+
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
26+
import org.apache.flink.table.api.ValidationException;
27+
28+
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
29+
30+
import org.assertj.core.api.Assertions;
31+
import org.junit.jupiter.api.Test;
32+
import org.junit.jupiter.api.io.TempDir;
33+
34+
import java.util.HashMap;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.stream.Collectors;
38+
39+
/** Tests for {@link HudiDataSinkFactory}. */
40+
class HudiDataSinkFactoryTest {
41+
42+
@TempDir public static java.nio.file.Path temporaryFolder;
43+
44+
@Test
45+
void testCreateDataSink() {
46+
DataSinkFactory sinkFactory =
47+
FactoryDiscoveryUtils.getFactoryByIdentifier("hudi", DataSinkFactory.class);
48+
Assertions.assertThat(sinkFactory).isInstanceOf(HudiDataSinkFactory.class);
49+
50+
Configuration conf =
51+
Configuration.fromMap(
52+
ImmutableMap.<String, String>builder()
53+
.put(HudiConfig.PATH.key(), temporaryFolder.toString())
54+
.build());
55+
DataSink dataSink =
56+
sinkFactory.createDataSink(
57+
new FactoryHelper.DefaultContext(
58+
conf, conf, Thread.currentThread().getContextClassLoader()));
59+
Assertions.assertThat(dataSink).isInstanceOf(HudiDataSink.class);
60+
}
61+
62+
@Test
63+
void testLackRequireOption() {
64+
DataSinkFactory sinkFactory =
65+
FactoryDiscoveryUtils.getFactoryByIdentifier("hudi", DataSinkFactory.class);
66+
Assertions.assertThat(sinkFactory).isInstanceOf(HudiDataSinkFactory.class);
67+
68+
Map<String, String> options = new HashMap<>();
69+
options.put(HudiConfig.PATH.key(), temporaryFolder.toString());
70+
71+
List<String> requireKeys =
72+
sinkFactory.requiredOptions().stream()
73+
.map(ConfigOption::key)
74+
.collect(Collectors.toList());
75+
for (String requireKey : requireKeys) {
76+
Map<String, String> remainingOptions = new HashMap<>(options);
77+
remainingOptions.remove(requireKey);
78+
Configuration conf = Configuration.fromMap(remainingOptions);
79+
80+
Assertions.assertThatThrownBy(
81+
() ->
82+
sinkFactory.createDataSink(
83+
new FactoryHelper.DefaultContext(
84+
conf,
85+
conf,
86+
Thread.currentThread()
87+
.getContextClassLoader())))
88+
.isInstanceOf(ValidationException.class)
89+
.hasMessageContaining(
90+
String.format(
91+
"One or more required options are missing.\n\n"
92+
+ "Missing required options are:\n\n"
93+
+ "%s",
94+
requireKey));
95+
}
96+
}
97+
98+
@Test
99+
void testUnsupportedOption() {
100+
DataSinkFactory sinkFactory =
101+
FactoryDiscoveryUtils.getFactoryByIdentifier("hudi", DataSinkFactory.class);
102+
Assertions.assertThat(sinkFactory).isInstanceOf(HudiDataSinkFactory.class);
103+
104+
Configuration conf =
105+
Configuration.fromMap(
106+
ImmutableMap.<String, String>builder()
107+
.put(HudiConfig.PATH.key(), temporaryFolder.toString())
108+
.put(HudiConfig.RECORD_KEY_FIELD.key(), "id")
109+
.put("unsupported_key", "unsupported_value")
110+
.build());
111+
112+
Assertions.assertThatThrownBy(
113+
() ->
114+
sinkFactory.createDataSink(
115+
new FactoryHelper.DefaultContext(
116+
conf,
117+
conf,
118+
Thread.currentThread().getContextClassLoader())))
119+
.isInstanceOf(ValidationException.class)
120+
.hasMessageContaining(
121+
"Unsupported options found for 'hudi'.\n\n"
122+
+ "Unsupported options:\n\n"
123+
+ "unsupported_key");
124+
}
125+
126+
@Test
127+
void testPrefixRequireOption() {
128+
DataSinkFactory sinkFactory =
129+
FactoryDiscoveryUtils.getFactoryByIdentifier("hudi", DataSinkFactory.class);
130+
Assertions.assertThat(sinkFactory).isInstanceOf(HudiDataSinkFactory.class);
131+
Configuration conf =
132+
Configuration.fromMap(
133+
ImmutableMap.<String, String>builder()
134+
.put(HudiConfig.PATH.key(), temporaryFolder.toString())
135+
.put(HudiConfig.RECORD_KEY_FIELD.key(), "id")
136+
.build());
137+
138+
DataSink dataSink =
139+
sinkFactory.createDataSink(
140+
new FactoryHelper.DefaultContext(
141+
conf, conf, Thread.currentThread().getContextClassLoader()));
142+
Assertions.assertThat(dataSink).isInstanceOf(HudiDataSink.class);
143+
}
144+
}

0 commit comments

Comments
 (0)