Skip to content

[Feature][Transform] Add table filter transform #9189

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions docs/en/transform-v2/table-filter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# TableMerge

> TableFilter transform plugin

## Description

TableFilter transform plugin for filter tables.

## Options

| name | type | required | default value | Description |
|:----------------:|--------|----------|---------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| database_pattern | string | no | | Specify database filter pattern, the default value is null, which means no filtering. If you want to filter the database name, please set it to a regular expression. |
| schema_pattern | string | no | | Specify schema filter pattern, the default value is null, which means no filtering. If you want to filter the schema name, please set it to a regular expression. |
| table_pattern | string | no | | Specify table filter pattern, the default value is null, which means no filtering. If you want to filter the table name, please set it to a regular expression. |
| pattern_mode | string | no | INCLUDE | Specify pattern mode, the default value is INCLUDE, which means include the matched table. If you want to exclude the matched table, please set it to EXCLUDE. |

## Examples

### Include filter tables

Include filter tables with the name matching the regular expression `user_\d+` in the database `test`.

```hocon
transform {
TableFilter {
plugin_input = "source1"
plugin_output = "transform_a_1"

database_pattern = "test"
table_pattern = "user_\\d+"
}
}
```

### Exclude filter tables

Exclude filter tables with the name matching the regular expression `user_\d+` in the database `test`.

```hocon
transform {
TableFilter {
plugin_input = "source1"
plugin_output = "transform_a_1"

database_pattern = "test"
table_pattern = "user_\\d+"
pattern_mode = "EXCLUDE"
}
}
```
51 changes: 51 additions & 0 deletions docs/zh/transform-v2/table-filter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# TableMerge

> TableFilter transform plugin

## Description

表过滤 transform,用于正向或者反向过滤部分表

## Options

| name | type | required | default value | Description |
|:----------------:|--------|----------|---------------|--------------------------------------------------------|
| database_pattern | string | no | | 指定数据库过滤模式,默认值为 null,表示不过滤。如果要过滤数据库名称,请将其设置为正则表达式。 |
| schema_pattern | string | no | | 指定 schema 过滤模式,默认值为 null,表示不过滤。如果要过滤架构名称,请将其设置为正则表达式。 |
| table_pattern | string | no | | 指定表过滤模式,默认值为 null,表示不过滤。如果要过滤表名称,请将其设置为正则表达式。 |
| pattern_mode | string | no | INCLUDE | 指定过滤模式,默认值为 INCLUDE,表示包含匹配的表。如果要排除匹配的表,请将其设置为 EXCLUDE。 |

## Examples

### 包含表过滤

在数据库 "test" 中包含名称与正则表达式 "user_\d+" 匹配的过滤表。

```hocon
transform {
TableFilter {
plugin_input = "source1"
plugin_output = "transform_a_1"

database_pattern = "test"
table_pattern = "user_\\d+"
}
}
```

### 排除表过滤

排除数据库 "test" 中名称与正则表达式 "user_\d+" 匹配的过滤表。

```hocon
transform {
TableFilter {
plugin_input = "source1"
plugin_output = "transform_a_1"

database_pattern = "test"
table_pattern = "user_\\d+"
pattern_mode = "EXCLUDE"
}
}
```
1 change: 1 addition & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,4 @@ seatunnel.transform.Metadata = seatunnel-transforms-v2
seatunnel.transform.FieldRename = seatunnel-transforms-v2
seatunnel.transform.TableRename = seatunnel-transforms-v2
seatunnel.transform.TableMerge = seatunnel-transforms-v2
seatunnel.transform.TableFilter = seatunnel-transforms-v2
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void testAllTransformUseFactory() {
FactoryUtil.discoverFactories(
Thread.currentThread().getContextClassLoader(),
TableTransformFactory.class);
Assertions.assertEquals(16, factories.size());
Assertions.assertEquals(17, factories.size());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.e2e.transform;

import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;

import java.io.IOException;

public class TestTableFilterIT extends TestSuiteBase {
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
disabledReason = "Only support for seatunnel")
@TestTemplate
public void testFilterMultiTable(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeJob("/table_filter_multi_table.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
plugin_output = "source1"

tables_configs = [
{
row.num = 3
schema = {
table = "test.user_1"
columns = [
{
name = "id"
type = "bigint"
},
{
name = "name"
type = "string"
}
]
}
},
{
row.num = 3
schema = {
table = "test.user_2"
columns = [
{
name = "id"
type = "bigint"
},
{
name = "name"
type = "string"
}
]
}
},
{
row.num = 5
schema = {
table = "test.xyz"
columns = [
{
name = "id"
type = "bigint"
},
{
name = "age"
type = "int"
}
]
}
}
]
}
}
transform {
TableFilter {
plugin_input = "source1"
plugin_output = "transform_a_1"

database_pattern = "test"
table_pattern = "user_\\d+"
}
TableRename {
plugin_input = "transform_a_1"
plugin_output = "transform_a_2"

prefix = "table_a_"
}



TableFilter {
plugin_input = "source1"
plugin_output = "transform_b_1"

database_pattern = "test"
table_pattern = "xyz"
}
TableRename {
plugin_input = "transform_b_1"
plugin_output = "transform_b_2"

prefix = "table_b_"
}
}
sink {
Assert {
plugin_input = "transform_a_2"

rules =
{
tables_configs = [
{
table_path = "test.table_a_user_1"
row_rules = [
{
rule_type = MAX_ROW
rule_value = 3
},
{
rule_type = MIN_ROW
rule_value = 3
}
]
},
{
table_path = "test.table_a_user_2"
row_rules = [
{
rule_type = MAX_ROW
rule_value = 3
},
{
rule_type = MIN_ROW
rule_value = 3
}
]
},
{
table_path = "test.table_a_xyz"
row_rules = [
{
rule_type = MAX_ROW
rule_value = 0
},
{
rule_type = MIN_ROW
rule_value = 0
}
]
}
]
}
}

Assert {
plugin_input = "transform_b_2"

rules =
{
tables_configs = [
{
table_path = "test.table_b_user_1"
row_rules = [
{
rule_type = MAX_ROW
rule_value = 0
},
{
rule_type = MIN_ROW
rule_value = 0
}
]
},
{
table_path = "test.table_b_user_2"
row_rules = [
{
rule_type = MAX_ROW
rule_value = 0
},
{
rule_type = MIN_ROW
rule_value = 0
}
]
},
{
table_path = "test.table_b_xyz"
row_rules = [
{
rule_type = MAX_ROW
rule_value = 5
},
{
rule_type = MIN_ROW
rule_value = 5
}
]
}
]
}
}
}
Loading