Skip to content

Commit 14a86bc

Browse files
authored
[Feature][Transform] Support merge sharding-tables to one table (#8360)
1 parent eae369b commit 14a86bc

File tree

12 files changed

+667
-4
lines changed

12 files changed

+667
-4
lines changed

docs/en/transform-v2/table-merge.md

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# TableMerge
2+
3+
> TableMerge transform plugin
4+
5+
## Description
6+
7+
TableMerge transform plugin for merging sharded tables into one table.
8+
9+
## Options
10+
11+
| name | type | required | default value | Description |
12+
|:--------:|--------|----------|---------------|---------------------------|
13+
| database | string | no | | Specify new database name |
14+
| schema | string | no | | Specify new schema name |
15+
| table | string | yes | | Specify new table name |
16+
17+
## Examples
18+
19+
### Merge sharding-tables
20+
21+
22+
```hocon
23+
env {
24+
parallelism = 1
25+
job.mode = "BATCH"
26+
}
27+
28+
source {
29+
MySQL-CDC {
30+
plugin_output = "customers_mysql_cdc"
31+
32+
username = "root"
33+
password = "123456"
34+
table-names = ["source.user_1", "source.user_2", "source.shop"]
35+
base-url = "jdbc:mysql://localhost:3306/source"
36+
}
37+
}
38+
39+
transform {
40+
TableMerge {
41+
plugin_input = "customers_mysql_cdc"
42+
plugin_output = "trans_result"
43+
44+
table_match_regex = "source.user_.*"
45+
database = "user_db"
46+
table = "user_all"
47+
}
48+
}
49+
50+
sink {
51+
Jdbc {
52+
plugin_input = "trans_result"
53+
54+
driver="com.mysql.cj.jdbc.Driver"
55+
url="jdbc:mysql://localhost:3306/sink"
56+
user="myuser"
57+
password="mypwd"
58+
59+
generate_sink_sql = true
60+
database = "${database_name}"
61+
table = "${table_name}"
62+
primary_keys = ["${primary_key}"]
63+
64+
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
65+
data_save_mode = "APPEND_DATA"
66+
}
67+
}
68+
```

docs/zh/transform-v2/table-merge.md

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# TableMerge
2+
3+
> TableMerge transform plugin
4+
5+
## Description
6+
7+
表合并插件,用于分库分表合并为一个表。
8+
9+
## Options
10+
11+
| name | type | required | default value | Description |
12+
|:--------:|--------|----------|---------------|------------------|
13+
| database | string | no | | 指定新的 database 名称 |
14+
| schema | string | no | | 指定新的 schema 名称 |
15+
| table | string | yes | | 指定新的 table 名称 |
16+
17+
## Examples
18+
19+
### 合并分库分表为一个表
20+
21+
```hocon
22+
env {
23+
parallelism = 1
24+
job.mode = "BATCH"
25+
}
26+
27+
source {
28+
MySQL-CDC {
29+
plugin_output = "customers_mysql_cdc"
30+
31+
username = "root"
32+
password = "123456"
33+
table-names = ["source.user_1", "source.user_2", "source.shop"]
34+
base-url = "jdbc:mysql://localhost:3306/source"
35+
}
36+
}
37+
38+
transform {
39+
TableMerge {
40+
plugin_input = "customers_mysql_cdc"
41+
plugin_output = "trans_result"
42+
43+
table_match_regex = "source.user_.*"
44+
database = "user_db"
45+
table = "user_all"
46+
}
47+
}
48+
49+
sink {
50+
Jdbc {
51+
plugin_input = "trans_result"
52+
53+
driver="com.mysql.cj.jdbc.Driver"
54+
url="jdbc:mysql://localhost:3306/sink"
55+
user="myuser"
56+
password="mypwd"
57+
58+
generate_sink_sql = true
59+
database = "${database_name}"
60+
table = "${table_name}"
61+
primary_keys = ["${primary_key}"]
62+
63+
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
64+
data_save_mode = "APPEND_DATA"
65+
}
66+
}
67+
```

plugin-mapping.properties

+1
Original file line numberDiff line numberDiff line change
@@ -157,3 +157,4 @@ seatunnel.transform.RowKindExtractor = seatunnel-transforms-v2
157157
seatunnel.transform.Metadata = seatunnel-transforms-v2
158158
seatunnel.transform.FieldRename = seatunnel-transforms-v2
159159
seatunnel.transform.TableRename = seatunnel-transforms-v2
160+
seatunnel.transform.TableMerge = seatunnel-transforms-v2

seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public void testAllTransformUseFactory() {
4343
FactoryUtil.discoverFactories(
4444
Thread.currentThread().getContextClassLoader(),
4545
TableTransformFactory.class);
46-
Assertions.assertEquals(15, factories.size());
46+
Assertions.assertEquals(16, factories.size());
4747
}
4848

4949
@Test
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.e2e.transform;
19+
20+
import org.apache.seatunnel.e2e.common.container.TestContainer;
21+
22+
import org.junit.jupiter.api.Assertions;
23+
import org.junit.jupiter.api.TestTemplate;
24+
import org.testcontainers.containers.Container;
25+
26+
import java.io.IOException;
27+
28+
/**
 * End-to-end test case that submits a job configuration to a test container and checks that
 * the job finishes successfully. Judging by the class and configuration file names, the job
 * exercises the TableMerge transform.
 */
public class TestTableMergeIT extends TestSuiteBase {
29+
/**
 * Executes the job described by {@code /table_merge_multi_table.conf} on the supplied
 * container and asserts that the execution result reports exit code 0.
 *
 * @param container the test container on which the job is executed
 * @throws IOException declared by this test; presumably propagated from executeJob (confirm)
 * @throws InterruptedException declared by this test; presumably propagated from executeJob
 *     (confirm)
 */
30+
@TestTemplate
31+
public void testMergeMultiTable(TestContainer container)
32+
throws IOException, InterruptedException {
33+
Container.ExecResult execResult = container.executeJob("/table_merge_multi_table.conf");
34+
// A zero exit code is the only success criterion checked here.
Assertions.assertEquals(0, execResult.getExitCode());
35+
}
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
env {
19+
parallelism = 1
20+
job.mode = "BATCH"
21+
}
22+
23+
source {
24+
FakeSource {
25+
plugin_output = "source1"
26+
27+
tables_configs = [
28+
{
29+
row.num = 3
30+
schema = {
31+
table = "test.user_1"
32+
columns = [
33+
{
34+
name = "id"
35+
type = "bigint"
36+
},
37+
{
38+
name = "name"
39+
type = "string"
40+
}
41+
]
42+
}
43+
},
44+
{
45+
row.num = 3
46+
schema = {
47+
table = "test.user_2"
48+
columns = [
49+
{
50+
name = "id"
51+
type = "bigint"
52+
},
53+
{
54+
name = "name"
55+
type = "string"
56+
}
57+
]
58+
}
59+
},
60+
{
61+
row.num = 5
62+
schema = {
63+
table = "test.xyz"
64+
columns = [
65+
{
66+
name = "id"
67+
type = "bigint"
68+
},
69+
{
70+
name = "age"
71+
type = "int"
72+
}
73+
]
74+
}
75+
}
76+
]
77+
}
78+
}
79+
transform {
80+
TableMerge {
81+
plugin_input = "source1"
82+
plugin_output = "transform1"
83+
84+
table_match_regex = "test.user_.*"
85+
database = "sink"
86+
table = "user_all"
87+
}
88+
}
89+
sink {
90+
Assert {
91+
plugin_input = "transform1"
92+
93+
rules =
94+
{
95+
tables_configs = [
96+
{
97+
table_path = "sink.user_all"
98+
row_rules = [
99+
{
100+
rule_type = MAX_ROW
101+
rule_value = 6
102+
},
103+
{
104+
rule_type = MIN_ROW
105+
rule_value = 6
106+
}
107+
],
108+
field_rules = [
109+
{
110+
field_name = id
111+
field_type = bigint
112+
field_value = [
113+
{
114+
rule_type = NOT_NULL
115+
}
116+
]
117+
},
118+
{
119+
field_name = name
120+
field_type = string
121+
field_value = [
122+
{
123+
rule_type = NOT_NULL
124+
}
125+
]
126+
}
127+
]
128+
},
129+
{
130+
table_path = "test.xyz"
131+
row_rules = [
132+
{
133+
rule_type = MAX_ROW
134+
rule_value = 5
135+
},
136+
{
137+
rule_type = MIN_ROW
138+
rule_value = 5
139+
}
140+
],
141+
field_rules = [
142+
{
143+
field_name = id
144+
field_type = bigint
145+
field_value = [
146+
{
147+
rule_type = NOT_NULL
148+
}
149+
]
150+
},
151+
{
152+
field_name = age
153+
field_type = int
154+
field_value = [
155+
{
156+
rule_type = NOT_NULL
157+
}
158+
]
159+
}
160+
]
161+
}
162+
]
163+
}
164+
}
165+
}

0 commit comments

Comments
 (0)