Skip to content

Commit 2b1234c

Browse files
authored
[Feature][Paimon] Support specify paimon table write properties, partition keys and primary keys (#6535)
1 parent 809870a commit 2b1234c

File tree

24 files changed

+1159
-100
lines changed

24 files changed

+1159
-100
lines changed

Diff for: docs/en/concept/schema-feature.md

+1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ columns = [
6464
| type | Yes | - | The data type of the column |
6565
| nullable | No | true | If the column can be nullable |
6666
| columnLength | No | 0 | The length of the column which will be useful when you need to define the length |
67+
| columnScale | No | - | The scale of the column which will be useful when you need to define the scale |
6768
| defaultValue | No | null | The default value of the column |
6869
| comment | No | null | The comment of the column |
6970

Diff for: docs/en/connector-v2/sink/Paimon.md

+45-8
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,17 @@ Sink connector for Apache Paimon. It can support cdc mode 、auto create table.
1212

1313
## Options
1414

15-
| name | type | required | default value | Description |
16-
|------------------|--------|----------|------------------------------|---------------------------------|
17-
| warehouse | String | Yes | - | Paimon warehouse path |
18-
| database | String | Yes | - | The database you want to access |
19-
| table | String | Yes | - | The table you want to access |
20-
| hdfs_site_path | String | No | - | |
21-
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | The schema save mode |
22-
| data_save_mode | Enum | no | APPEND_DATA | The data save mode |
15+
| name | type | required | default value | Description |
16+
|-----------------------------|--------|----------|------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
17+
| warehouse | String | Yes | - | Paimon warehouse path |
18+
| database | String | Yes | - | The database you want to access |
19+
| table | String | Yes | - | The table you want to access |
20+
| hdfs_site_path | String | No | - | |
21+
| schema_save_mode | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | The schema save mode |
22+
| data_save_mode | Enum | No | APPEND_DATA | The data save mode |
23+
| paimon.table.primary-keys | String | No | - | Default comma-separated list of columns (primary key) that identify a row in tables. (Notice: The partition field needs to be included in the primary key fields) |
24+
| paimon.table.partition-keys | String | No | - | Default comma-separated list of partition fields to use when creating tables. |
25+
| paimon.table.write-props | Map | No | - | Properties passed through to paimon table initialization, [reference](https://paimon.apache.org/docs/0.6/maintenance/configurations/#coreoptions). |
2326

2427
## Examples
2528

@@ -54,6 +57,40 @@ sink {
5457
}
5558
```
5659

60+
### Single table with write props of Paimon
61+
62+
```hocon
63+
env {
64+
parallelism = 1
65+
job.mode = "STREAMING"
66+
checkpoint.interval = 5000
67+
}
68+
69+
source {
70+
Mysql-CDC {
71+
base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
72+
username = "root"
73+
password = "******"
74+
table-names = ["seatunnel.role"]
75+
}
76+
}
77+
78+
sink {
79+
Paimon {
80+
catalog_name="seatunnel_test"
81+
warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
82+
database="seatunnel"
83+
table="role"
84+
paimon.table.write-props = {
85+
bucket = 2
86+
file.format = "parquet"
87+
}
88+
paimon.table.partition-keys = "dt"
89+
paimon.table.primary-keys = "pk_id,dt"
90+
}
91+
}
92+
```
93+
5794
### Multiple table
5895

5996
```hocon

Diff for: docs/zh/concept/schema-feature.md

+1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ columns = [
6464
| type | Yes | - | 列的数据类型 |
6565
| nullable | No | true | 列是否可空 |
6666
| columnLength | No | 0 | 列的长度,当您需要定义长度时将很有用 |
67+
| columnScale | No | - | 列的精度,当您需要定义精度时将很有用 |
6768
| defaultValue | No | null | 列的默认值 |
6869
| comment | No | null | 列的注释 |
6970

Diff for: docs/zh/connector-v2/sink/Paimon.md

+45-8
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,17 @@ Apache Paimon数据连接器。支持cdc写以及自动建表。
1212

1313
## 连接器选项
1414

15-
| 名称 | 类型 | 是否必须 | 默认值 | 描述 |
16-
|------------------|--------|------|------------------------------|--------------------|
17-
| warehouse | String | Yes | - | Paimon warehouse路径 |
18-
| database | String | Yes | - | 数据库名称 |
19-
| table | String | Yes | - | 表名 |
20-
| hdfs_site_path | String | No | - | |
21-
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | schema保存模式 |
22-
| data_save_mode | Enum | no | APPEND_DATA | 数据保存模式 |
15+
| 名称 | 类型 | 是否必须 | 默认值 | 描述 |
16+
|-----------------------------|-----|------|------------------------------|---------------------------------------------------------------------------------------------------|
17+
| warehouse | 字符串 || - | Paimon warehouse路径 |
18+
| database | 字符串 || - | 数据库名称 |
19+
| table | 字符串 || - | 表名 |
20+
| hdfs_site_path | 字符串 || - | |
21+
| schema_save_mode | 枚举 || CREATE_SCHEMA_WHEN_NOT_EXIST | Schema保存模式 |
22+
| data_save_mode | 枚举 || APPEND_DATA | 数据保存模式 |
23+
| paimon.table.primary-keys | 字符串 || - | 主键字段列表,联合主键使用逗号分隔(注意:分区字段需要包含在主键字段中) |
24+
| paimon.table.partition-keys | 字符串 || - | 分区字段列表,多字段使用逗号分隔 |
25+
| paimon.table.write-props | Map || - | Paimon表初始化指定的属性, [参考](https://paimon.apache.org/docs/0.6/maintenance/configurations/#coreoptions) |
2326

2427
## 示例
2528

@@ -54,6 +57,40 @@ sink {
5457
}
5558
```
5659

60+
### 指定Paimon的写属性的单表
61+
62+
```hocon
63+
env {
64+
parallelism = 1
65+
job.mode = "STREAMING"
66+
checkpoint.interval = 5000
67+
}
68+
69+
source {
70+
Mysql-CDC {
71+
base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
72+
username = "root"
73+
password = "******"
74+
table-names = ["seatunnel.role"]
75+
}
76+
}
77+
78+
sink {
79+
Paimon {
80+
catalog_name="seatunnel_test"
81+
warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
82+
database="seatunnel"
83+
table="role"
84+
paimon.table.write-props = {
85+
bucket = 2
86+
file.format = "parquet"
87+
}
88+
paimon.table.partition-keys = "dt"
89+
paimon.table.primary-keys = "pk_id,dt"
90+
}
91+
}
92+
```
93+
5794
### 多表
5895

5996
```hocon

Diff for: seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,11 @@ public List<Column> parse(ReadonlyConfig schemaConfig) {
133133
Integer columnLength =
134134
columnConfig.get(
135135
TableSchemaOptions.ColumnOptions.COLUMN_LENGTH);
136+
137+
Integer columnScale =
138+
columnConfig.get(
139+
TableSchemaOptions.ColumnOptions.COLUMN_SCALE);
140+
136141
Boolean nullable =
137142
columnConfig.get(TableSchemaOptions.ColumnOptions.NULLABLE);
138143
Object defaultValue =
@@ -143,7 +148,8 @@ public List<Column> parse(ReadonlyConfig schemaConfig) {
143148
return PhysicalColumn.of(
144149
name,
145150
seaTunnelDataType,
146-
columnLength,
151+
Long.valueOf(columnLength),
152+
columnScale,
147153
nullable,
148154
defaultValue,
149155
comment);

Diff for: seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java

+6
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@ public static class ColumnOptions {
8686
.noDefaultValue()
8787
.withDescription("SeaTunnel Schema Column Type");
8888

89+
public static final Option<Integer> COLUMN_SCALE =
90+
Options.key("columnScale")
91+
.intType()
92+
.noDefaultValue()
93+
.withDescription("SeaTunnel Schema Column scale");
94+
8995
public static final Option<Integer> COLUMN_LENGTH =
9096
Options.key("columnLength")
9197
.intType()

Diff for: seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java

+11-23
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,21 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
1919

20-
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2120
import org.apache.seatunnel.api.table.catalog.Catalog;
2221
import org.apache.seatunnel.api.table.catalog.CatalogTable;
23-
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
22+
import org.apache.seatunnel.api.table.catalog.Column;
2423
import org.apache.seatunnel.api.table.catalog.TablePath;
2524
import org.apache.seatunnel.api.table.catalog.TableSchema;
2625
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
2726
import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
2827
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
2928
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
3029
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
31-
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
3230
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
3331
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil;
3432

3533
import org.apache.paimon.catalog.Identifier;
34+
import org.apache.paimon.schema.Schema;
3635
import org.apache.paimon.table.FileStoreTable;
3736
import org.apache.paimon.table.Table;
3837
import org.apache.paimon.types.DataField;
@@ -48,14 +47,14 @@ public class PaimonCatalog implements Catalog, PaimonTable {
4847
private static final String DEFAULT_DATABASE = "default";
4948

5049
private String catalogName;
51-
private ReadonlyConfig readonlyConfig;
50+
private PaimonSinkConfig paimonSinkConfig;
5251
private PaimonCatalogLoader paimonCatalogLoader;
5352
private org.apache.paimon.catalog.Catalog catalog;
5453

55-
public PaimonCatalog(String catalogName, ReadonlyConfig readonlyConfig) {
56-
this.readonlyConfig = readonlyConfig;
54+
public PaimonCatalog(String catalogName, PaimonSinkConfig paimonSinkConfig) {
55+
this.paimonSinkConfig = paimonSinkConfig;
5756
this.catalogName = catalogName;
58-
this.paimonCatalogLoader = new PaimonCatalogLoader(new PaimonSinkConfig(readonlyConfig));
57+
this.paimonCatalogLoader = new PaimonCatalogLoader(paimonSinkConfig);
5958
}
6059

6160
@Override
@@ -135,10 +134,9 @@ public Table getPaimonTable(TablePath tablePath)
135134
public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
136135
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
137136
try {
138-
catalog.createTable(
139-
toIdentifier(tablePath),
140-
SchemaUtil.toPaimonSchema(table.getTableSchema()),
141-
ignoreIfExists);
137+
Schema paimonSchema =
138+
SchemaUtil.toPaimonSchema(table.getTableSchema(), this.paimonSinkConfig);
139+
catalog.createTable(toIdentifier(tablePath), paimonSchema, ignoreIfExists);
142140
} catch (org.apache.paimon.catalog.Catalog.TableAlreadyExistException e) {
143141
throw new TableAlreadyExistException(this.catalogName, tablePath);
144142
} catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException e) {
@@ -183,18 +181,8 @@ private CatalogTable toCatalogTable(
183181
TableSchema.Builder builder = TableSchema.builder();
184182
dataFields.forEach(
185183
dataField -> {
186-
String name = dataField.name();
187-
SeaTunnelDataType<?> seaTunnelType =
188-
SchemaUtil.toSeaTunnelType(dataField.type());
189-
PhysicalColumn physicalColumn =
190-
PhysicalColumn.of(
191-
name,
192-
seaTunnelType,
193-
(Long) null,
194-
true,
195-
null,
196-
dataField.description());
197-
builder.column(physicalColumn);
184+
Column column = SchemaUtil.toSeaTunnelType(dataField.type());
185+
builder.column(column);
198186
});
199187

200188
List<String> partitionKeys = schema.partitionKeys();

Diff for: seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
public class PaimonCatalogFactory implements CatalogFactory {
3131
@Override
3232
public Catalog createCatalog(String catalogName, ReadonlyConfig readonlyConfig) {
33-
return new PaimonCatalog(catalogName, readonlyConfig);
33+
return new PaimonCatalog(catalogName, new PaimonSinkConfig(readonlyConfig));
3434
}
3535

3636
@Override
@@ -48,7 +48,10 @@ public OptionRule optionRule() {
4848
.optional(
4949
PaimonSinkConfig.HDFS_SITE_PATH,
5050
PaimonSinkConfig.SCHEMA_SAVE_MODE,
51-
PaimonSinkConfig.DATA_SAVE_MODE)
51+
PaimonSinkConfig.DATA_SAVE_MODE,
52+
PaimonSinkConfig.PRIMARY_KEYS,
53+
PaimonSinkConfig.PARTITION_KEYS,
54+
PaimonSinkConfig.WRITE_PROPS)
5255
.build();
5356
}
5457
}

Diff for: seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java

+47-2
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,26 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.paimon.config;
1919

20+
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
21+
import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList;
22+
2023
import org.apache.seatunnel.api.configuration.Option;
2124
import org.apache.seatunnel.api.configuration.Options;
2225
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2326
import org.apache.seatunnel.api.sink.DataSaveMode;
2427
import org.apache.seatunnel.api.sink.SchemaSaveMode;
2528

26-
import lombok.Getter;
29+
import lombok.Data;
30+
31+
import java.util.Arrays;
32+
import java.util.HashMap;
33+
import java.util.List;
34+
import java.util.Map;
2735

36+
import static java.util.stream.Collectors.toList;
2837
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
2938

30-
@Getter
39+
@Data
3140
public class PaimonSinkConfig extends PaimonConfig {
3241
public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
3342
Options.key("schema_save_mode")
@@ -41,13 +50,38 @@ public class PaimonSinkConfig extends PaimonConfig {
4150
.defaultValue(DataSaveMode.APPEND_DATA)
4251
.withDescription("data_save_mode");
4352

53+
public static final Option<String> PRIMARY_KEYS =
54+
Options.key("paimon.table.primary-keys")
55+
.stringType()
56+
.noDefaultValue()
57+
.withDescription(
58+
"Default comma-separated list of columns that identify a row in tables (primary key)");
59+
60+
public static final Option<String> PARTITION_KEYS =
61+
Options.key("paimon.table.partition-keys")
62+
.stringType()
63+
.noDefaultValue()
64+
.withDescription(
65+
"Default comma-separated list of partition fields to use when creating tables.");
66+
67+
public static final Option<Map<String, String>> WRITE_PROPS =
68+
Options.key("paimon.table.write-props")
69+
.mapType()
70+
.defaultValue(new HashMap<>())
71+
.withDescription(
72+
"Properties passed through to paimon table initialization, such as 'file.format', 'bucket'(org.apache.paimon.CoreOptions)");
73+
4474
private String catalogName;
4575
private String warehouse;
4676
private String namespace;
4777
private String table;
4878
private String hdfsSitePath;
4979
private SchemaSaveMode schemaSaveMode;
5080
private DataSaveMode dataSaveMode;
81+
private Integer bucket;
82+
private List<String> primaryKeys;
83+
private List<String> partitionKeys;
84+
private Map<String, String> writeProps;
5185

5286
public PaimonSinkConfig(ReadonlyConfig readonlyConfig) {
5387
this.catalogName = checkArgumentNotNull(readonlyConfig.get(CATALOG_NAME));
@@ -57,10 +91,21 @@ public PaimonSinkConfig(ReadonlyConfig readonlyConfig) {
5791
this.hdfsSitePath = readonlyConfig.get(HDFS_SITE_PATH);
5892
this.schemaSaveMode = readonlyConfig.get(SCHEMA_SAVE_MODE);
5993
this.dataSaveMode = readonlyConfig.get(DATA_SAVE_MODE);
94+
this.primaryKeys = stringToList(readonlyConfig.get(PRIMARY_KEYS), ",");
95+
this.partitionKeys = stringToList(readonlyConfig.get(PARTITION_KEYS), ",");
96+
this.writeProps = readonlyConfig.get(WRITE_PROPS);
6097
}
6198

6299
protected <T> T checkArgumentNotNull(T argument) {
63100
checkNotNull(argument);
64101
return argument;
65102
}
103+
104+
@VisibleForTesting
105+
public static List<String> stringToList(String value, String regex) {
106+
if (value == null || value.isEmpty()) {
107+
return ImmutableList.of();
108+
}
109+
return Arrays.stream(value.split(regex)).map(String::trim).collect(toList());
110+
}
66111
}

0 commit comments

Comments
 (0)