Skip to content

Commit 5d9b73a

Browse files
committed
[Feature][Transform] Add table filter transform
1 parent 9da2d17 commit 5d9b73a

File tree

21 files changed

+708
-988
lines changed

21 files changed

+708
-988
lines changed

docs/en/transform-v2/table-filter.md

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# TableFilter
2+
3+
> TableFilter transform plugin
4+
5+
## Description
6+
7+
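The TableFilter transform plugin filters tables by database, schema, and table name. Tables that match the configured regular expressions are either included or excluded, depending on `pattern_mode`.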
8+
9+
## Options
10+
11+
| name | type | required | default value | Description |
12+
|:----------------:|--------|----------|---------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
13+
| database_pattern | string | no | | Specify database filter pattern, the default value is null, which means no filtering. If you want to filter the database name, please set it to a regular expression. |
14+
| schema_pattern | string | no | | Specify schema filter pattern, the default value is null, which means no filtering. If you want to filter the schema name, please set it to a regular expression. |
15+
| table_pattern | string | no | | Specify table filter pattern, the default value is null, which means no filtering. If you want to filter the table name, please set it to a regular expression. |
16+
| pattern_mode | string | no | INCLUDE | Specify pattern mode, the default value is INCLUDE, which means include the matched table. If you want to exclude the matched table, please set it to EXCLUDE. |
17+
18+
## Examples
19+
20+
### Include filter tables
21+
22+
Include only the tables in the database `test` whose name matches the regular expression `user_\d+`.
23+
24+
```hocon
25+
transform {
26+
TableFilter {
27+
plugin_input = "source1"
28+
plugin_output = "transform_a_1"
29+
30+
database_pattern = "test"
31+
table_pattern = "user_\\d+"
32+
}
33+
}
34+
```
35+
36+
### Exclude filter tables
37+
38+
Exclude the tables in the database `test` whose name matches the regular expression `user_\d+`.
39+
40+
```hocon
41+
transform {
42+
TableFilter {
43+
plugin_input = "source1"
44+
plugin_output = "transform_a_1"
45+
46+
database_pattern = "test"
47+
table_pattern = "user_\\d+"
48+
pattern_mode = "EXCLUDE"
49+
}
50+
}
51+
```

docs/zh/transform-v2/table-filter.md

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# TableFilter
2+
3+
> TableFilter transform plugin
4+
5+
## Description
6+
7+
表过滤 transform,用于正向或者反向过滤部分表
8+
9+
## Options
10+
11+
| name | type | required | default value | Description |
12+
|:----------------:|--------|----------|---------------|--------------------------------------------------------|
13+
| database_pattern | string | no | | 指定数据库过滤模式,默认值为 null,表示不过滤。如果要过滤数据库名称,请将其设置为正则表达式。 |
14+
| schema_pattern | string | no | | 指定 schema 过滤模式,默认值为 null,表示不过滤。如果要过滤架构名称,请将其设置为正则表达式。 |
15+
| table_pattern | string | no | | 指定表过滤模式,默认值为 null,表示不过滤。如果要过滤表名称,请将其设置为正则表达式。 |
16+
| pattern_mode | string | no | INCLUDE | 指定过滤模式,默认值为 INCLUDE,表示包含匹配的表。如果要排除匹配的表,请将其设置为 EXCLUDE。 |
17+
18+
## Examples
19+
20+
### 包含表过滤
21+
22+
在数据库 "test" 中包含名称与正则表达式 "user_\d+" 匹配的过滤表。
23+
24+
```hocon
25+
transform {
26+
TableFilter {
27+
plugin_input = "source1"
28+
plugin_output = "transform_a_1"
29+
30+
database_pattern = "test"
31+
table_pattern = "user_\\d+"
32+
}
33+
}
34+
```
35+
36+
### 排除表过滤
37+
38+
排除数据库 "test" 中名称与正则表达式 "user_\d+" 匹配的过滤表。
39+
40+
```hocon
41+
transform {
42+
TableFilter {
43+
plugin_input = "source1"
44+
plugin_output = "transform_a_1"
45+
46+
database_pattern = "test"
47+
table_pattern = "user_\\d+"
48+
pattern_mode = "EXCLUDE"
49+
}
50+
}
51+
```

plugin-mapping.properties

+1
Original file line numberDiff line numberDiff line change
@@ -161,3 +161,4 @@ seatunnel.transform.Metadata = seatunnel-transforms-v2
161161
seatunnel.transform.FieldRename = seatunnel-transforms-v2
162162
seatunnel.transform.TableRename = seatunnel-transforms-v2
163163
seatunnel.transform.TableMerge = seatunnel-transforms-v2
164+
seatunnel.transform.TableFilter = seatunnel-transforms-v2

seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public void testAllTransformUseFactory() {
4343
FactoryUtil.discoverFactories(
4444
Thread.currentThread().getContextClassLoader(),
4545
TableTransformFactory.class);
46-
Assertions.assertEquals(16, factories.size());
46+
Assertions.assertEquals(17, factories.size());
4747
}
4848

4949
@Test
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.e2e.transform;
19+
20+
import org.apache.seatunnel.e2e.common.container.EngineType;
21+
import org.apache.seatunnel.e2e.common.container.TestContainer;
22+
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
23+
24+
import org.junit.jupiter.api.Assertions;
25+
import org.junit.jupiter.api.TestTemplate;
26+
import org.testcontainers.containers.Container;
27+
28+
import java.io.IOException;
29+
30+
/**
 * End-to-end test for the TableFilter transform.
 *
 * <p>Submits the {@code /table_filter_multi_table.conf} job to the test container and checks
 * that the job process exits successfully.
 */
public class TestTableFilterIT extends TestSuiteBase {
31+
// Disabled for the SPARK and FLINK engine types (see disabledReason below).
@DisabledOnContainer(
32+
value = {},
33+
type = {EngineType.SPARK, EngineType.FLINK},
34+
disabledReason = "Only support for seatunnel")
35+
@TestTemplate
36+
/**
 * Runs the multi-table filter job and asserts that it exits with code 0.
 *
 * @param container the test container the job is executed in
 * @throws IOException declared by the signature; presumably raised by job execution
 * @throws InterruptedException declared by the signature; presumably raised by job execution
 */
public void testFilterMultiTable(TestContainer container)
37+
throws IOException, InterruptedException {
38+
Container.ExecResult execResult = container.executeJob("/table_filter_multi_table.conf");
39+
// The only check made here is the exit code: 0 is expected.
Assertions.assertEquals(0, execResult.getExitCode());
40+
}
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
env {
19+
parallelism = 1
20+
job.mode = "BATCH"
21+
}
22+
23+
source {
24+
FakeSource {
25+
plugin_output = "source1"
26+
27+
tables_configs = [
28+
{
29+
row.num = 3
30+
schema = {
31+
table = "test.user_1"
32+
columns = [
33+
{
34+
name = "id"
35+
type = "bigint"
36+
},
37+
{
38+
name = "name"
39+
type = "string"
40+
}
41+
]
42+
}
43+
},
44+
{
45+
row.num = 3
46+
schema = {
47+
table = "test.user_2"
48+
columns = [
49+
{
50+
name = "id"
51+
type = "bigint"
52+
},
53+
{
54+
name = "name"
55+
type = "string"
56+
}
57+
]
58+
}
59+
},
60+
{
61+
row.num = 5
62+
schema = {
63+
table = "test.xyz"
64+
columns = [
65+
{
66+
name = "id"
67+
type = "bigint"
68+
},
69+
{
70+
name = "age"
71+
type = "int"
72+
}
73+
]
74+
}
75+
}
76+
]
77+
}
78+
}
79+
transform {
80+
TableFilter {
81+
plugin_input = "source1"
82+
plugin_output = "transform_a_1"
83+
84+
database_pattern = "test"
85+
table_pattern = "user_\\d+"
86+
}
87+
TableRename {
88+
plugin_input = "transform_a_1"
89+
plugin_output = "transform_a_2"
90+
91+
prefix = "table_a_"
92+
}
93+
94+
95+
96+
TableFilter {
97+
plugin_input = "source1"
98+
plugin_output = "transform_b_1"
99+
100+
database_pattern = "test"
101+
table_pattern = "xyz"
102+
}
103+
TableRename {
104+
plugin_input = "transform_b_1"
105+
plugin_output = "transform_b_2"
106+
107+
prefix = "table_b_"
108+
}
109+
}
110+
sink {
111+
Assert {
112+
plugin_input = "transform_a_2"
113+
114+
rules =
115+
{
116+
tables_configs = [
117+
{
118+
table_path = "test.table_a_user_1"
119+
row_rules = [
120+
{
121+
rule_type = MAX_ROW
122+
rule_value = 3
123+
},
124+
{
125+
rule_type = MIN_ROW
126+
rule_value = 3
127+
}
128+
]
129+
},
130+
{
131+
table_path = "test.table_a_user_2"
132+
row_rules = [
133+
{
134+
rule_type = MAX_ROW
135+
rule_value = 3
136+
},
137+
{
138+
rule_type = MIN_ROW
139+
rule_value = 3
140+
}
141+
]
142+
},
143+
{
144+
table_path = "test.table_a_xyz"
145+
row_rules = [
146+
{
147+
rule_type = MAX_ROW
148+
rule_value = 0
149+
},
150+
{
151+
rule_type = MIN_ROW
152+
rule_value = 0
153+
}
154+
]
155+
}
156+
]
157+
}
158+
}
159+
160+
Assert {
161+
plugin_input = "transform_b_2"
162+
163+
rules =
164+
{
165+
tables_configs = [
166+
{
167+
table_path = "test.table_b_user_1"
168+
row_rules = [
169+
{
170+
rule_type = MAX_ROW
171+
rule_value = 0
172+
},
173+
{
174+
rule_type = MIN_ROW
175+
rule_value = 0
176+
}
177+
]
178+
},
179+
{
180+
table_path = "test.table_b_user_2"
181+
row_rules = [
182+
{
183+
rule_type = MAX_ROW
184+
rule_value = 0
185+
},
186+
{
187+
rule_type = MIN_ROW
188+
rule_value = 0
189+
}
190+
]
191+
},
192+
{
193+
table_path = "test.table_b_xyz"
194+
row_rules = [
195+
{
196+
rule_type = MAX_ROW
197+
rule_value = 5
198+
},
199+
{
200+
rule_type = MIN_ROW
201+
rule_value = 5
202+
}
203+
]
204+
}
205+
]
206+
}
207+
}
208+
}

0 commit comments

Comments
 (0)