Skip to content

Commit 4a57308

Browse files
voonhousleonardBang
authored andcommitted
[FLINK-36313][pipeline-connector/hudi] Introduce Pipeline Sink Connector for Apache Hudi
1 parent f520424 commit 4a57308

35 files changed

Lines changed: 7161 additions & 108 deletions

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/pom.xml

Lines changed: 397 additions & 0 deletions
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.hudi.sink;
19+
20+
import org.apache.flink.cdc.common.configuration.ConfigOption;
21+
import org.apache.flink.cdc.common.configuration.ConfigOptions;
22+
import org.apache.flink.configuration.description.Description;
23+
24+
import org.apache.hudi.common.config.HoodieCommonConfig;
25+
import org.apache.hudi.configuration.FlinkOptions;
26+
27+
/**
28+
* A utility class that holds all the configuration options for the Hudi sink. It wraps Hudi's
29+
* {@link FlinkOptions} to provide a consistent interface within the CDC framework, using helper
30+
* methods to reduce boilerplate.
31+
*/
32+
public class HudiConfig {
33+
34+
// ----- Helper Methods for Option Creation -----
35+
36+
private static ConfigOption<String> stringOption(String key, Description description) {
37+
return ConfigOptions.key(key)
38+
.stringType()
39+
.noDefaultValue()
40+
.withDescription(description.toString());
41+
}
42+
43+
private static ConfigOption<String> stringOption(
44+
String key, String defaultValue, Description description) {
45+
return ConfigOptions.key(key)
46+
.stringType()
47+
.defaultValue(defaultValue)
48+
.withDescription(description.toString());
49+
}
50+
51+
private static ConfigOption<Integer> intOption(String key, Description description) {
52+
return ConfigOptions.key(key)
53+
.intType()
54+
.noDefaultValue()
55+
.withDescription(description.toString());
56+
}
57+
58+
private static ConfigOption<Boolean> booleanOption(
59+
String key, boolean defaultValue, Description description) {
60+
return ConfigOptions.key(key)
61+
.booleanType()
62+
.defaultValue(defaultValue)
63+
.withDescription(description.toString());
64+
}
65+
66+
// ----- Public Configuration Options -----
67+
68+
// Core Hudi Options
69+
public static final ConfigOption<String> PATH =
70+
stringOption(FlinkOptions.PATH.key(), FlinkOptions.PATH.description());
71+
72+
// public static final ConfigOption<String> TABLE_TYPE =
73+
// stringOption(
74+
// FlinkOptions.TABLE_TYPE.key(),
75+
// FlinkOptions.TABLE_TYPE.defaultValue(),
76+
// FlinkOptions.TABLE_TYPE.description());
77+
public static final ConfigOption<String> TABLE_TYPE =
78+
stringOption(
79+
"hoodie.table.type",
80+
FlinkOptions.TABLE_TYPE.defaultValue(),
81+
FlinkOptions.TABLE_TYPE.description());
82+
83+
// Required Fields for CDC
84+
public static final ConfigOption<String> RECORD_KEY_FIELD =
85+
stringOption(
86+
FlinkOptions.RECORD_KEY_FIELD.key(),
87+
FlinkOptions.RECORD_KEY_FIELD.description());
88+
89+
public static final ConfigOption<String> ORDERING_FIELDS =
90+
stringOption(
91+
FlinkOptions.ORDERING_FIELDS.key(), FlinkOptions.ORDERING_FIELDS.description());
92+
93+
public static final ConfigOption<String> PARTITION_PATH_FIELD =
94+
stringOption(
95+
FlinkOptions.PARTITION_PATH_FIELD.key(),
96+
"",
97+
FlinkOptions.PARTITION_PATH_FIELD.description());
98+
99+
// Bucket Index Options
100+
public static final ConfigOption<String> INDEX_TYPE =
101+
stringOption(
102+
FlinkOptions.INDEX_TYPE.key(), "BUCKET", FlinkOptions.INDEX_TYPE.description());
103+
104+
public static final ConfigOption<String> INDEX_BUCKET_TARGET =
105+
stringOption(
106+
FlinkOptions.INDEX_KEY_FIELD.key(), FlinkOptions.INDEX_KEY_FIELD.description());
107+
108+
public static final ConfigOption<Integer> BUCKET_INDEX_NUM_BUCKETS =
109+
intOption(
110+
FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(),
111+
FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.description());
112+
113+
// Hive Sync Options
114+
public static final ConfigOption<Boolean> HIVE_SYNC_ENABLED =
115+
booleanOption(
116+
FlinkOptions.HIVE_SYNC_ENABLED.key(),
117+
false,
118+
FlinkOptions.HIVE_SYNC_ENABLED.description());
119+
120+
public static final ConfigOption<String> HIVE_SYNC_METASTORE_URIS =
121+
stringOption(
122+
FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(),
123+
FlinkOptions.HIVE_SYNC_METASTORE_URIS.description());
124+
125+
public static final ConfigOption<String> HIVE_SYNC_DB =
126+
stringOption(FlinkOptions.HIVE_SYNC_DB.key(), FlinkOptions.HIVE_SYNC_DB.description());
127+
128+
public static final ConfigOption<String> HIVE_SYNC_TABLE =
129+
stringOption(
130+
FlinkOptions.HIVE_SYNC_TABLE.key(), FlinkOptions.HIVE_SYNC_TABLE.description());
131+
132+
public static final ConfigOption<String> SCHEMA_OPERATOR_UID =
133+
ConfigOptions.key("schema.operator.uid")
134+
.stringType()
135+
.defaultValue("schema-operator-uid")
136+
.withDescription(
137+
"A unique ID for the schema operator, used by the BucketAssignerOperator to create a SchemaEvolutionClient.");
138+
139+
public static final ConfigOption<String> TABLE_SCHEMA =
140+
ConfigOptions.key("table.schema")
141+
.stringType()
142+
.noDefaultValue()
143+
.withDescription("The table schema in JSON format for the Hudi table.");
144+
145+
public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS =
146+
intOption(
147+
FlinkOptions.BUCKET_ASSIGN_TASKS.key(),
148+
FlinkOptions.BUCKET_ASSIGN_TASKS.description());
149+
150+
public static final ConfigOption<Integer> WRITE_TASKS =
151+
intOption(FlinkOptions.WRITE_TASKS.key(), FlinkOptions.WRITE_TASKS.description());
152+
153+
public static final ConfigOption<Boolean> SCHEMA_ON_READ_ENABLE =
154+
booleanOption(
155+
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(),
156+
false,
157+
Description.builder().build());
158+
159+
public static final ConfigOption<Integer> COMPACTION_DELTA_COMMITS =
160+
ConfigOptions.key("compaction.delta_commits")
161+
.intType()
162+
.defaultValue(5)
163+
.withDescription(
164+
"Max delta commits needed to trigger compaction, default 5 commits");
165+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.hudi.sink;
19+
20+
import org.apache.flink.cdc.common.configuration.Configuration;
21+
import org.apache.flink.cdc.common.sink.DataSink;
22+
import org.apache.flink.cdc.common.sink.EventSinkProvider;
23+
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
24+
import org.apache.flink.cdc.common.sink.MetadataApplier;
25+
import org.apache.flink.cdc.connectors.hudi.sink.v2.HudiSink;
26+
27+
import org.apache.hudi.common.config.HoodieCommonConfig;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import java.io.Serializable;
32+
import java.time.ZoneId;
33+
34+
/**
35+
* A {@link DataSink} for Apache Hudi that provides the main entry point for the Flink CDC
36+
* framework.
37+
*/
38+
public class HudiDataSink implements DataSink, Serializable {
39+
40+
private static final Logger LOG = LoggerFactory.getLogger(HudiDataSink.class);
41+
42+
private final Configuration config;
43+
44+
private final String schemaOperatorUid;
45+
46+
public HudiDataSink(Configuration config, String schemaOperatorUid) {
47+
LOG.info("Creating HudiDataSink with universal configuration {}", config);
48+
this.config = config;
49+
this.schemaOperatorUid = schemaOperatorUid;
50+
}
51+
52+
/** Provides the core sink implementation that handles the data flow of events. */
53+
@Override
54+
public EventSinkProvider getEventSinkProvider() {
55+
LOG.info("Creating HudiDataSinkProvider with universal configuration {}", config);
56+
// For CDC pipelines, we don't have a pre-configured schema since tables are created
57+
// dynamically
58+
// Instead, we use a multi-table sink that handles schema discovery and table creation
59+
60+
// Convert CDC configuration to Flink configuration for HoodieSink
61+
org.apache.flink.configuration.Configuration flinkConfig = toFlinkConfig(config);
62+
63+
// Create the HudiSink with multi-table support via wrapper pattern
64+
// Use empty RowType since tables are created dynamically
65+
HudiSink hudiSink = new HudiSink(flinkConfig, schemaOperatorUid, ZoneId.systemDefault());
66+
67+
return FlinkSinkProvider.of(hudiSink);
68+
}
69+
70+
/**
71+
* Provides the metadata applier. In our design, this has a passive role (e.g., logging), as
72+
* transactional metadata operations are handled by the HudiCommitter.
73+
*/
74+
@Override
75+
public MetadataApplier getMetadataApplier() {
76+
return new HudiMetadataApplier(config);
77+
}
78+
79+
/**
80+
* Converts a {@link org.apache.flink.cdc.common.configuration.Configuration} to a {@link
81+
* org.apache.flink.configuration.Configuration}.
82+
*
83+
* @param cdcConfig The input CDC configuration.
84+
* @return A new Flink configuration containing the same key-value pairs.
85+
*/
86+
private static org.apache.flink.configuration.Configuration toFlinkConfig(
87+
Configuration cdcConfig) {
88+
final org.apache.flink.configuration.Configuration flinkConfig =
89+
new org.apache.flink.configuration.Configuration();
90+
if (cdcConfig != null) {
91+
cdcConfig.toMap().forEach(flinkConfig::setString);
92+
}
93+
// always enable schema evolution
94+
flinkConfig.setString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), "true");
95+
return flinkConfig;
96+
}
97+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.hudi.sink;
19+
20+
import org.apache.flink.cdc.common.configuration.ConfigOption;
21+
import org.apache.flink.cdc.common.configuration.Configuration;
22+
import org.apache.flink.cdc.common.factories.DataSinkFactory;
23+
import org.apache.flink.cdc.common.factories.FactoryHelper;
24+
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
25+
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import java.util.HashSet;
30+
import java.util.Set;
31+
32+
import static org.apache.flink.cdc.connectors.hudi.sink.HudiDataSinkOptions.PREFIX_CATALOG_PROPERTIES;
33+
import static org.apache.flink.cdc.connectors.hudi.sink.HudiDataSinkOptions.PREFIX_TABLE_PROPERTIES;
34+
35+
/**
36+
* Factory for creating {@link HudiDataSink}. This class defines the configuration options and
37+
* instantiates the sink by delegating option definitions to {@link HudiConfig}.
38+
*/
39+
public class HudiDataSinkFactory implements DataSinkFactory {
40+
41+
private static final Logger LOG = LoggerFactory.getLogger(HudiDataSinkFactory.class);
42+
43+
public static final String IDENTIFIER = "hudi";
44+
45+
@Override
46+
public String identifier() {
47+
return IDENTIFIER;
48+
}
49+
50+
@Override
51+
public HudiDataSink createDataSink(Context context) {
52+
LOG.info("Creating HudiDataSink for {}", context);
53+
54+
FactoryHelper.createFactoryHelper(this, context)
55+
.validateExcept(PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES);
56+
57+
FactoryHelper.DefaultContext factoryContext = (FactoryHelper.DefaultContext) context;
58+
Configuration config = factoryContext.getFactoryConfiguration();
59+
60+
// Validate that only BUCKET index type is used
61+
String indexType = config.get(HudiConfig.INDEX_TYPE);
62+
if (indexType != null && !indexType.equalsIgnoreCase("BUCKET")) {
63+
throw new IllegalArgumentException(
64+
String.format(
65+
"Unsupported index type '%s'. Currently only 'BUCKET' index type is supported. "
66+
+ "Other index types (e.g., FLINK_STATE, BLOOM, SIMPLE) are not yet implemented "
67+
+ "for multi-table CDC pipelines.",
68+
indexType));
69+
}
70+
71+
String schemaOperatorUid =
72+
context.getPipelineConfiguration()
73+
.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID);
74+
75+
return new HudiDataSink(config, schemaOperatorUid);
76+
}
77+
78+
@Override
79+
public Set<ConfigOption<?>> requiredOptions() {
80+
Set<ConfigOption<?>> options = new HashSet<>();
81+
options.add(HudiConfig.PATH);
82+
return options;
83+
}
84+
85+
@Override
86+
public Set<ConfigOption<?>> optionalOptions() {
87+
Set<ConfigOption<?>> options = new HashSet<>();
88+
options.add(HudiConfig.RECORD_KEY_FIELD);
89+
options.add(HudiConfig.TABLE_TYPE);
90+
options.add(HudiConfig.PARTITION_PATH_FIELD);
91+
options.add(HudiConfig.INDEX_TYPE);
92+
options.add(HudiConfig.INDEX_BUCKET_TARGET);
93+
options.add(HudiConfig.HIVE_SYNC_ENABLED);
94+
options.add(HudiConfig.HIVE_SYNC_METASTORE_URIS);
95+
options.add(HudiConfig.HIVE_SYNC_DB);
96+
options.add(HudiConfig.HIVE_SYNC_TABLE);
97+
98+
options.add(HudiConfig.WRITE_TASKS);
99+
options.add(HudiConfig.BUCKET_ASSIGN_TASKS);
100+
options.add(HudiConfig.SCHEMA_ON_READ_ENABLE);
101+
102+
// Compaction settings
103+
options.add(HudiConfig.COMPACTION_DELTA_COMMITS);
104+
return options;
105+
}
106+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.hudi.sink;
19+
20+
/** HudiDataSink Options reference {@link HudiConfig}. */
21+
public class HudiDataSinkOptions {
22+
// prefix for passing properties for table creation.
23+
public static final String PREFIX_TABLE_PROPERTIES = "table.properties.";
24+
25+
// prefix for passing properties for catalog creation.
26+
public static final String PREFIX_CATALOG_PROPERTIES = "catalog.properties.";
27+
}

0 commit comments

Comments
 (0)