Skip to content

Commit 839e7ab

Browse files
yzeng1618zengyi
andauthored
[Feature][Iceberg] Support schema.partition_keys and ${partition_keys} placeholder (#10389)
Co-authored-by: zengyi <zengyi@chinatelecom.cn>
1 parent 106c369 commit 839e7ab

File tree

19 files changed

+295
-16
lines changed

19 files changed

+295
-16
lines changed

docs/en/connectors/sink/Iceberg.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ libfb303-xxx.jar
7474
| iceberg.table.auto-create-props | map | no | - | Configuration specified by Iceberg during automatic table creation. |
7575
| iceberg.table.schema-evolution-enabled | boolean | no | false | Setting to true enables Iceberg tables to support schema evolution during the synchronization process |
7676
| iceberg.table.primary-keys | string | no | - | Default comma-separated list of columns that identify a row in tables (primary key) |
77-
| iceberg.table.partition-keys | string | no | - | Default comma-separated list of partition fields to use when creating tables |
77+
| iceberg.table.partition-keys | string | no | - | Default comma-separated list of partition fields to use when creating tables. Supports placeholder `${partition_keys}` for multi-table jobs |
7878
| iceberg.table.upsert-mode-enabled | boolean | no | false | Set to `true` to enable upsert mode, default is `false` |
7979
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | the schema save mode, please refer to `schema_save_mode` below |
8080
| data_save_mode | Enum | no | APPEND_DATA | the data save mode, please refer to `data_save_mode` below |

docs/en/introduction/concepts/config.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ sink {
333333
- If the substitution variable contains double or single quotes (e.g., `"resName"` or `"nameVal"`), you need to include them with the value.
334334
- The value cannot contain spaces (`' '`). For example, `-i jobName='this is a job name'` will be replaced with `job.name = "this"`. You can use environment variables to pass values with spaces.
335335
- For dynamic parameters, you can use the following format: `-i date=$(date +"%Y%m%d")`.
336-
- Cannot use specified system reserved characters; they will not be replaced by `-i`, such as: `${database_name}`, `${schema_name}`, `${table_name}`, `${schema_full_name}`, `${table_full_name}`, `${primary_key}`, `${unique_key}`, `${field_names}`. For details, please refer to [Sink Parameter Placeholders](sink-options-placeholders.md).
336+
- Cannot use specified system reserved characters; they will not be replaced by `-i`, such as: `${database_name}`, `${schema_name}`, `${table_name}`, `${schema_full_name}`, `${table_full_name}`, `${primary_key}`, `${unique_key}`, `${field_names}`, `${partition_keys}`. For details, please refer to [Sink Parameter Placeholders](sink-options-placeholders.md).
337337

338338
## What's More
339339

docs/en/introduction/concepts/schema-feature.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ schema = {
1414
table = "database.schema.table"
1515
schema_first = false
1616
comment = "comment"
17+
partition_keys = ["dt"]
1718
columns = [
1819
...
1920
]
@@ -41,6 +42,11 @@ If the schema_first is true, the schema will be used first, this means if we set
4142

4243
The comment of the CatalogTable which the schema belongs to.
4344

45+
### partition_keys
46+
47+
The partition keys of the CatalogTable which the schema belongs to.
48+
This metadata can be used by sink options placeholders such as `${partition_keys}` (for example, to create partitioned Iceberg tables in multi-table sync jobs).
49+
4450
### Columns
4551

4652
Columns is a list of configs used to define the column in schema, each column can contains name, type, nullable, defaultValue, comment field.

docs/en/introduction/concepts/sink-options-placeholders.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ The placeholders are mainly controlled by the following expressions:
3939
- Used to get the table field keys in the upstream catalog table
4040
- `${comment}`
4141
- Used to get the table comment in the upstream catalog table
42+
- `${partition_keys}`
43+
- Used to get the table partition keys in the upstream catalog table
4244

4345
## Configuration
4446

docs/zh/connectors/sink/Iceberg.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ libfb303-xxx.jar
7474
| iceberg.table.auto-create-props | map | no | - | Iceberg 自动建表时指定的配置 |
7575
| iceberg.table.schema-evolution-enabled | boolean | no | false | 设置为 true 时,Iceberg 表可以在同步过程中支持 schema 变更 |
7676
| iceberg.table.primary-keys | string | no | - | 用于标识表中一行数据的主键列列表,默认情况下以逗号分隔 |
77-
| iceberg.table.partition-keys | string | no | - | 创建表时使用的分区字段列表,默认情况下以逗号分隔 |
77+
| iceberg.table.partition-keys | string | no | - | 创建表时使用的分区字段列表,默认情况下以逗号分隔。多表场景可使用占位符 `${partition_keys}` |
7878
| iceberg.table.upsert-mode-enabled | boolean | no | false | 设置为 `true` 以启用 upsert 模式,默认值为 `false` |
7979
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | schema 变更方式, 请参考下面的 `schema_save_mode` |
8080
| data_save_mode | Enum | no | APPEND_DATA | 数据写入方式, 请参考下面的 `data_save_mode` |

docs/zh/introduction/concepts/config.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ sink {
321321
- 如果替换变量包含`"``'`(如`"resName"``"nameVal"`),需要添加`"`
322322
- 值不能包含空格`' '`。例如, `-i jobName='this is a job name'`将被替换为`job.name = "this"`。 你可以使用环境变量传递带有空格的值。
323323
- 如果要使用动态参数,可以使用以下格式: `-i date=$(date +"%Y%m%d")`
324-
- 不能使用指定系统保留字符,它将不会被`-i`替换,如:`${database_name}``${schema_name}``${table_name}``${schema_full_name}``${table_full_name}``${primary_key}``${unique_key}``${field_names}`。具体可参考[Sink参数占位符](sink-options-placeholders.md)
324+
- 不能使用指定系统保留字符,它将不会被`-i`替换,如:`${database_name}``${schema_name}``${table_name}``${schema_full_name}``${table_full_name}``${primary_key}``${unique_key}``${field_names}``${partition_keys}`。具体可参考[Sink参数占位符](sink-options-placeholders.md)
325325
## 此外
326326

327327
如果你想了解更多关于格式配置的详细信息,请查看 [HOCON](https://github.com/lightbend/config/blob/main/HOCON.md)

docs/zh/introduction/concepts/schema-feature.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ schema = {
1414
table = "database.schema.table"
1515
schema_first = false
1616
comment = "comment"
17+
partition_keys = ["dt"]
1718
columns = [
1819
...
1920
]
@@ -41,6 +42,11 @@ schema所属的表标识符的表全名,包含数据库、schema、表名。
4142

4243
schema所属的 CatalogTable 的注释。
4344

45+
### partition_keys
46+
47+
schema 所属的 CatalogTable 的分区字段列表。
48+
该元数据可以配合 sink 端占位符 `${partition_keys}` 使用(例如多表同步写入 Iceberg 时按表创建分区表)。
49+
4450
### Columns
4551

4652
Columns 是用于定义模式中的列的配置列表,每列可以包含名称(name)、类型(type)、是否可空(nullable)、默认值(defaultValue)、注释(comment)字段。

docs/zh/introduction/concepts/sink-options-placeholders.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ SeaTunnel 提供了 Sink 参数占位符自动替换功能,可让您通过占
3939
- 用于获取上游表中的所有字段名称列表
4040
- `${comment}`
4141
- 用于获取上游表中的表注释
42+
- `${partition_keys}`
43+
- 用于获取上游表中的分区字段列表
4244

4345
## 配置
4446

seatunnel-api/src/main/java/org/apache/seatunnel/api/options/table/TableIdentifierOptions.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.apache.seatunnel.api.configuration.Option;
2121
import org.apache.seatunnel.api.configuration.Options;
2222

23+
import java.util.List;
24+
2325
public interface TableIdentifierOptions {
2426

2527
Option<Boolean> SCHEMA_FIRST =
@@ -57,4 +59,11 @@ public interface TableIdentifierOptions {
5759
.stringType()
5860
.noDefaultValue()
5961
.withDescription("SeaTunnel Schema Table Name");
62+
63+
Option<List<String>> PARTITION_KEYS =
64+
Options.key("partition_keys")
65+
.listType()
66+
.noDefaultValue()
67+
.withDescription(
68+
"SeaTunnel Schema Partition Keys, used to specify partition keys for table creation");
6069
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholder.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ public enum TablePlaceholder {
3737
// Placeholder ${unique_key} or${unique_key:default_value}
3838
REPLACE_UNIQUE_KEY("unique_key"),
3939
// Placeholder ${field_names} or${field_names:default_value}
40-
REPLACE_FIELD_NAMES_KEY("field_names");
40+
REPLACE_FIELD_NAMES_KEY("field_names"),
41+
// Placeholder ${partition_keys} or${partition_keys:default_value}
42+
REPLACE_PARTITION_KEYS_KEY("partition_keys");
4143

4244
private static Set<String> PLACEHOLDER_KEYS = new HashSet<>();
4345

0 commit comments

Comments
 (0)