Skip to content

Commit 5b090a6

Browse files
committed
[Iceberg] Support partition transforms in Iceberg sink
1 parent c1a7d0b commit 5b090a6

4 files changed

Lines changed: 181 additions & 9 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.flink.cdc.common.configuration.ConfigOption;
2121
import org.apache.flink.cdc.common.configuration.Configuration;
22+
import org.apache.flink.cdc.common.event.TableId;
2223
import org.apache.flink.cdc.common.factories.DataSinkFactory;
2324
import org.apache.flink.cdc.common.factories.FactoryHelper;
2425
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
@@ -31,8 +32,10 @@
3132
import org.apache.iceberg.CatalogProperties;
3233

3334
import java.time.ZoneId;
35+
import java.util.Arrays;
3436
import java.util.HashMap;
3537
import java.util.HashSet;
38+
import java.util.List;
3639
import java.util.Map;
3740
import java.util.Objects;
3841
import java.util.Set;
@@ -86,10 +89,28 @@ public DataSink createDataSink(Context context) {
8689
CompactionOptions compactionOptions =
8790
getCompactionStrategy(context.getFactoryConfiguration());
8891

92+
Map<TableId, List<String>> partitionMaps = new HashMap<>();
93+
String partitionKey =
94+
context.getFactoryConfiguration().get(IcebergDataSinkOptions.PARTITION_KEY);
95+
if (partitionKey != null && !partitionKey.isEmpty()) {
96+
for (String tables : partitionKey.trim().split(";")) {
97+
String[] splits = tables.trim().split(":");
98+
if (splits.length == 2) {
99+
TableId tableId = TableId.parse(splits[0]);
100+
List<String> partitions = Arrays.asList(splits[1].split(","));
101+
partitionMaps.put(tableId, partitions);
102+
} else {
103+
throw new IllegalArgumentException(
104+
IcebergDataSinkOptions.PARTITION_KEY.key()
105+
+ " is malformed, please refer to the documents");
106+
}
107+
}
108+
}
109+
89110
return new IcebergDataSink(
90111
catalogOptions,
91112
tableOptions,
92-
new HashMap<>(),
113+
partitionMaps,
93114
zoneId,
94115
schemaOperatorUid,
95116
compactionOptions);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ public class IcebergDataSinkOptions {
5151
.withDescription(
5252
"Partition keys for each partitioned table, allow setting multiple primary keys for multiTables. "
5353
+ "Tables are separated by ';', and partition keys are separated by ','. "
54-
+ "For example, we can set partition.key of two tables by 'testdb.table1:id1,id2;testdb.table2:name'.");
54+
+ "For example, we can set partition.key of two tables by 'testdb.table1:id1,id2;testdb.table2:name'."
55+
+ "For partition transforms, we can set partition.key by 'testdb.table1:truncate[10](id);testdb.table2:day(create_time)'.");
5556

5657
@Experimental
5758
public static final ConfigOption<Boolean> SINK_COMPACTION_ENABLED =

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java

Lines changed: 68 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,26 @@
5858
import java.util.List;
5959
import java.util.Map;
6060
import java.util.Set;
61+
import java.util.regex.Matcher;
62+
import java.util.regex.Pattern;
6163

6264
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
6365

6466
/** A {@link MetadataApplier} for Apache Iceberg. */
6567
public class IcebergMetadataApplier implements MetadataApplier {
68+
private static final Pattern PARTITION_YEAR_PATTERN = Pattern.compile("^year\\((.*)\\)$");
69+
70+
private static final Pattern PARTITION_MONTH_PATTERN = Pattern.compile("^month\\((.*)\\)$");
71+
72+
private static final Pattern PARTITION_DAY_PATTERN = Pattern.compile("^day\\((.*)\\)$");
73+
74+
private static final Pattern PARTITION_HOUR_PATTERN = Pattern.compile("^hour\\((.*)\\)$");
75+
76+
private static final Pattern PARTITION_BUCKET_PATTERN =
77+
Pattern.compile("^bucket\\[(\\d+)]\\((.*)\\)$");
78+
79+
private static final Pattern PARTITION_TRUNCATE_PATTERN =
80+
Pattern.compile("^truncate\\[(\\d+)]\\((.*)\\)$");
6681

6782
private static final Logger LOG = LoggerFactory.getLogger(IcebergMetadataApplier.class);
6883

@@ -161,13 +176,7 @@ private void applyCreateTable(CreateTableEvent event) {
161176
if (partitionMaps.containsKey(event.tableId())) {
162177
partitionColumns = partitionMaps.get(event.tableId());
163178
}
164-
PartitionSpec.Builder builder = PartitionSpec.builderFor(icebergSchema);
165-
for (String name : partitionColumns) {
166-
// TODO Add more partition transforms, see
167-
// https://iceberg.apache.org/spec/#partition-transforms.
168-
builder.identity(name);
169-
}
170-
PartitionSpec partitionSpec = builder.build();
179+
PartitionSpec partitionSpec = generatePartitionSpec(icebergSchema, partitionColumns);
171180
if (!catalog.tableExists(tableIdentifier)) {
172181
catalog.createTable(tableIdentifier, icebergSchema, partitionSpec, tableOptions);
173182
LOG.info(
@@ -281,6 +290,58 @@ private void applyAlterColumnType(AlterColumnTypeEvent event) {
281290
}
282291
}
283292

293+
private PartitionSpec generatePartitionSpec(Schema schema, List<String> partitionColumns) {
294+
PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
295+
for (String name : partitionColumns) {
296+
Matcher matcherYear = PARTITION_YEAR_PATTERN.matcher(name);
297+
if (matcherYear.matches()) {
298+
String matchedName = matcherYear.group(1);
299+
builder.year(matchedName);
300+
continue;
301+
}
302+
303+
Matcher matcherMonth = PARTITION_MONTH_PATTERN.matcher(name);
304+
if (matcherMonth.matches()) {
305+
String matchedName = matcherMonth.group(1);
306+
builder.month(matchedName);
307+
continue;
308+
}
309+
310+
Matcher matcherDay = PARTITION_DAY_PATTERN.matcher(name);
311+
if (matcherDay.matches()) {
312+
String matchedName = matcherDay.group(1);
313+
builder.day(matchedName);
314+
continue;
315+
}
316+
317+
Matcher matcherHour = PARTITION_HOUR_PATTERN.matcher(name);
318+
if (matcherHour.matches()) {
319+
String matchedName = matcherHour.group(1);
320+
builder.hour(matchedName);
321+
continue;
322+
}
323+
324+
Matcher matcherBucket = PARTITION_BUCKET_PATTERN.matcher(name);
325+
if (matcherBucket.matches()) {
326+
String matchedName = matcherBucket.group(2);
327+
int numBuckets = Integer.parseInt(matcherBucket.group(1));
328+
builder.bucket(matchedName, numBuckets);
329+
continue;
330+
}
331+
332+
Matcher matcherTruncate = PARTITION_TRUNCATE_PATTERN.matcher(name);
333+
if (matcherTruncate.matches()) {
334+
String matchedName = matcherTruncate.group(2);
335+
int width = Integer.parseInt(matcherTruncate.group(1));
336+
builder.truncate(matchedName, width);
337+
continue;
338+
}
339+
340+
builder.identity(name);
341+
}
342+
return builder.build();
343+
}
344+
284345
@Override
285346
public MetadataApplier setAcceptedSchemaEvolutionTypes(
286347
Set<SchemaChangeEventType> schemaEvolutionTypes) {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,35 @@
1818
package org.apache.flink.cdc.connectors.iceberg.sink;
1919

2020
import org.apache.flink.cdc.common.configuration.Configuration;
21+
import org.apache.flink.cdc.common.event.CreateTableEvent;
22+
import org.apache.flink.cdc.common.event.TableId;
2123
import org.apache.flink.cdc.common.factories.DataSinkFactory;
2224
import org.apache.flink.cdc.common.factories.FactoryHelper;
25+
import org.apache.flink.cdc.common.schema.Schema;
2326
import org.apache.flink.cdc.common.sink.DataSink;
27+
import org.apache.flink.cdc.common.types.DataTypes;
2428
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
2529
import org.apache.flink.table.api.ValidationException;
2630

2731
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
2832

33+
import org.apache.iceberg.CatalogUtil;
34+
import org.apache.iceberg.PartitionSpec;
35+
import org.apache.iceberg.Table;
36+
import org.apache.iceberg.catalog.Catalog;
37+
import org.apache.iceberg.catalog.TableIdentifier;
2938
import org.assertj.core.api.Assertions;
3039
import org.junit.jupiter.api.Test;
40+
import org.junit.jupiter.api.io.TempDir;
41+
42+
import java.io.File;
43+
import java.util.HashMap;
44+
import java.util.Map;
45+
import java.util.UUID;
3146

3247
/** Tests for {@link IcebergDataSinkFactory}. */
3348
public class IcebergDataSinkFactoryTest {
49+
@TempDir public static java.nio.file.Path temporaryFolder;
3450

3551
@Test
3652
void testCreateDataSink() {
@@ -92,4 +108,77 @@ void testPrefixRequireOption() {
92108
conf, conf, Thread.currentThread().getContextClassLoader()));
93109
Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
94110
}
111+
112+
@Test
113+
public void testPartitionOption() {
114+
Map<String, String> testcases = new HashMap<>();
115+
testcases.put("test.iceberg_partition_table:year(create_time)", "create_time_year");
116+
testcases.put("test.iceberg_partition_table:month(create_time)", "create_time_month");
117+
testcases.put("test.iceberg_partition_table:day(create_time)", "create_time_day");
118+
testcases.put("test.iceberg_partition_table:hour(create_time)", "create_time_hour");
119+
testcases.put("test.iceberg_partition_table:bucket[8](create_time)", "create_time_bucket");
120+
testcases.put("test.iceberg_partition_table:create_time", "create_time");
121+
testcases.put("test.iceberg_partition_table:truncate[8](id)", "id_trunc");
122+
123+
for (Map.Entry<String, String> entry : testcases.entrySet()) {
124+
runTestPartitionOption(entry.getKey(), entry.getValue());
125+
}
126+
}
127+
128+
public void runTestPartitionOption(String partitionKey, String transformColumnName) {
129+
Map<String, String> catalogOptions = new HashMap<>();
130+
String warehouse =
131+
new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
132+
catalogOptions.put("type", "hadoop");
133+
catalogOptions.put("warehouse", warehouse);
134+
catalogOptions.put("cache-enabled", "false");
135+
Catalog catalog =
136+
CatalogUtil.buildIcebergCatalog(
137+
"cdc-iceberg-catalog",
138+
catalogOptions,
139+
new org.apache.hadoop.conf.Configuration());
140+
141+
Map<String, String> catalogConf = new HashMap<>();
142+
for (Map.Entry<String, String> entry : catalogOptions.entrySet()) {
143+
catalogConf.put(
144+
IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES + entry.getKey(),
145+
entry.getValue());
146+
}
147+
IcebergDataSinkFactory sinkFactory = new IcebergDataSinkFactory();
148+
Configuration conf = Configuration.fromMap(catalogConf);
149+
conf.set(IcebergDataSinkOptions.PARTITION_KEY, partitionKey);
150+
DataSink dataSink =
151+
sinkFactory.createDataSink(
152+
new FactoryHelper.DefaultContext(
153+
conf, conf, Thread.currentThread().getContextClassLoader()));
154+
155+
TableId tableId = TableId.parse("test.iceberg_partition_table");
156+
CreateTableEvent createTableEvent =
157+
new CreateTableEvent(
158+
tableId,
159+
Schema.newBuilder()
160+
.physicalColumn(
161+
"id",
162+
DataTypes.BIGINT().notNull(),
163+
"column for id",
164+
"AUTO_DECREMENT()")
165+
.physicalColumn(
166+
"create_time",
167+
DataTypes.TIMESTAMP().notNull(),
168+
"column for name",
169+
null)
170+
.primaryKey("id")
171+
.build());
172+
173+
dataSink.getMetadataApplier().applySchemaChange(createTableEvent);
174+
175+
Table table =
176+
catalog.loadTable(
177+
TableIdentifier.of(tableId.getSchemaName(), tableId.getTableName()));
178+
179+
Assertions.assertThat(table.specs().size()).isEqualTo(1);
180+
PartitionSpec spec = table.specs().get(0);
181+
Assertions.assertThat(spec.isPartitioned()).isTrue();
182+
Assertions.assertThat(spec.rawPartitionType().field(transformColumnName)).isNotNull();
183+
}
95184
}

0 commit comments

Comments
 (0)