Skip to content

Commit 77ffa56

Browse files
authored
[Improve][Transform] Sql transform support inner strucy query (#6484)
1 parent d580860 commit 77ffa56

File tree

7 files changed

+317
-5
lines changed

7 files changed

+317
-5
lines changed

Diff for: docs/en/transform-v2/sql.md

+60
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ The source table name, the query SQL table name must match this field.
2424

2525
The query SQL, it's a simple SQL supported base function and criteria filter operation. But the complex SQL unsupported yet, include: multi source table/rows JOIN and AGGREGATE operation and the like.
2626

27+
the query expression can be `select [table_name.]column_a` to query the column that named `column_a`. and the table name is optional.
28+
or `select c_row.c_inner_row.column_b` to query the inline struct column that named `column_b` within `c_row` column and `c_inner_row` column. **In this query expression, can't have table name.**
29+
2730
## Example
2831

2932
The data read from source is a table like this:
@@ -56,6 +59,61 @@ Then the data in result table `fake1` will update to
5659
| 3 | Kin Dom_ | 25 |
5760
| 4 | Joy Dom_ | 23 |
5861

62+
### Struct query
63+
64+
if your upstream data schema is like this:
65+
66+
```hacon
67+
source {
68+
FakeSource {
69+
result_table_name = "fake"
70+
row.num = 100
71+
string.template = ["innerQuery"]
72+
schema = {
73+
fields {
74+
name = "string"
75+
c_date = "date"
76+
c_row = {
77+
c_inner_row = {
78+
c_inner_int = "int"
79+
c_inner_string = "string"
80+
c_inner_timestamp = "timestamp"
81+
c_map_1 = "map<string, string>"
82+
c_map_2 = "map<string, map<string,string>>"
83+
}
84+
c_string = "string"
85+
}
86+
}
87+
}
88+
}
89+
}
90+
```
91+
92+
Those query all are valid:
93+
94+
```sql
95+
select
96+
name,
97+
c_date,
98+
c_row,
99+
c_row.c_inner_row,
100+
c_row.c_string,
101+
c_row.c_inner_row.c_inner_int,
102+
c_row.c_inner_row.c_inner_string,
103+
c_row.c_inner_row.c_inner_timestamp,
104+
c_row.c_inner_row.c_map_1,
105+
c_row.c_inner_row.c_map_1.some_key
106+
```
107+
108+
But this query are not valid:
109+
110+
```sql
111+
select
112+
c_row.c_inner_row.c_map_2.some_key.inner_map_key
113+
```
114+
115+
The map must be the latest struct, can't query the nesting map.
116+
59117
## Job Config Example
60118

61119
```
@@ -94,6 +152,8 @@ sink {
94152

95153
## Changelog
96154

155+
- Support struct query
156+
97157
### new version
98158

99159
- Add SQL Transform Connector

Diff for: docs/zh/transform-v2/sql.md

+58
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ SQL 转换使用内存中的 SQL 引擎,我们可以通过 SQL 函数和 SQL
2424

2525
查询 SQL,它是一个简单的 SQL,支持基本的函数和条件过滤操作。但是,复杂的 SQL 尚不支持,包括:多源表/行连接和聚合操作等。
2626

27+
查询表达式可以是`select [table_name.]column_a`,这时会去查询列为`column_a`的列,`table_name`为可选项
28+
也可以是`select c_row.c_inner_row.column_b`,这时会去查询列`c_row`下的`c_inner_row``column_b`**嵌套结构查询中,不能存在`table_name`**
29+
2730
## 示例
2831

2932
源端数据读取的表格如下:
@@ -56,6 +59,61 @@ transform {
5659
| 3 | Kin Dom_ | 25 |
5760
| 4 | Joy Dom_ | 23 |
5861

62+
### 嵌套结构查询
63+
64+
例如你的上游数据结构是这样:
65+
66+
```hacon
67+
source {
68+
FakeSource {
69+
result_table_name = "fake"
70+
row.num = 100
71+
string.template = ["innerQuery"]
72+
schema = {
73+
fields {
74+
name = "string"
75+
c_date = "date"
76+
c_row = {
77+
c_inner_row = {
78+
c_inner_int = "int"
79+
c_inner_string = "string"
80+
c_inner_timestamp = "timestamp"
81+
c_map_1 = "map<string, string>"
82+
c_map_2 = "map<string, map<string,string>>"
83+
}
84+
c_string = "string"
85+
}
86+
}
87+
}
88+
}
89+
}
90+
```
91+
92+
那么下列所有的查询表达式都是有效的
93+
94+
```sql
95+
select
96+
name,
97+
c_date,
98+
c_row,
99+
c_row.c_inner_row,
100+
c_row.c_string,
101+
c_row.c_inner_row.c_inner_int,
102+
c_row.c_inner_row.c_inner_string,
103+
c_row.c_inner_row.c_inner_timestamp,
104+
c_row.c_inner_row.c_map_1,
105+
c_row.c_inner_row.c_map_1.some_key
106+
```
107+
108+
但是这个查询语句是无效的
109+
110+
```sql
111+
select
112+
c_row.c_inner_row.c_map_2.some_key.inner_map_key
113+
```
114+
115+
当查询map结构时,map结构应该为最后一个数据结构,不能查询嵌套map
116+
59117
## 作业配置示例
60118

61119
```

Diff for: seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,20 @@ public SeaTunnelDataType<?> getFieldType(int index) {
7474
}
7575

7676
public int indexOf(String fieldName) {
77+
return indexOf(fieldName, true);
78+
}
79+
80+
public int indexOf(String fieldName, boolean throwExceptionWhenNotFound) {
7781
for (int i = 0; i < fieldNames.length; i++) {
7882
if (fieldNames[i].equals(fieldName)) {
7983
return i;
8084
}
8185
}
82-
throw new IllegalArgumentException(String.format("can't find field [%s]", fieldName));
86+
if (throwExceptionWhenNotFound) {
87+
throw new IllegalArgumentException(String.format("can't find field [%s]", fieldName));
88+
} else {
89+
return -1;
90+
}
8391
}
8492

8593
@Override

Diff for: seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java

+13
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.seatunnel.e2e.transform;
1919

20+
import org.apache.seatunnel.e2e.common.container.EngineType;
2021
import org.apache.seatunnel.e2e.common.container.TestContainer;
22+
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
2123

2224
import org.junit.jupiter.api.Assertions;
2325
import org.junit.jupiter.api.TestTemplate;
@@ -58,4 +60,15 @@ public void testSQLTransform(TestContainer container) throws IOException, Interr
5860
Container.ExecResult caseWhenSql = container.executeJob("/sql_transform/case_when.conf");
5961
Assertions.assertEquals(0, caseWhenSql.getExitCode());
6062
}
63+
64+
@TestTemplate
65+
@DisabledOnContainer(
66+
value = {},
67+
type = {EngineType.SPARK},
68+
disabledReason = "Spark translation has some issue on map convert")
69+
public void testInnerQuery(TestContainer container) throws IOException, InterruptedException {
70+
Container.ExecResult innerQuerySql =
71+
container.executeJob("/sql_transform/inner_query.conf");
72+
Assertions.assertEquals(0, innerQuerySql.getExitCode());
73+
}
6174
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
######
18+
###### This config file is a demonstration of streaming processing in seatunnel config
19+
######
20+
21+
env {
22+
job.mode = "BATCH"
23+
}
24+
25+
source {
26+
FakeSource {
27+
result_table_name = "fake"
28+
row.num = 100
29+
string.template = ["innerQuery"]
30+
schema = {
31+
fields {
32+
name = "string"
33+
c_date = "date"
34+
c_row = {
35+
c_inner_row = {
36+
c_inner_int = "int"
37+
c_inner_string = "string"
38+
c_inner_timestamp = "timestamp"
39+
c_map = "map<string, string>"
40+
}
41+
c_string = "string"
42+
}
43+
}
44+
}
45+
}
46+
}
47+
48+
transform {
49+
Sql {
50+
source_table_name = "fake"
51+
result_table_name = "tmp1"
52+
query = """select c_date,
53+
c_row.c_string c_string,
54+
c_row.c_inner_row.c_inner_string c_inner_string,
55+
c_row.c_inner_row.c_inner_timestamp c_inner_timestamp,
56+
c_row.c_inner_row.c_map.innerQuery map_val,
57+
c_row.c_inner_row.c_map.notExistKey map_not_exist_val
58+
from fake"""
59+
}
60+
}
61+
62+
sink {
63+
Console {
64+
source_table_name = "tmp1"
65+
}
66+
Assert {
67+
source_table_name = "tmp1"
68+
rules = {
69+
field_rules = [{
70+
field_name = "c_date"
71+
field_type = "date"
72+
field_value = [
73+
{rule_type = NOT_NULL}
74+
]
75+
},
76+
{
77+
field_name = "c_string"
78+
field_type = "string"
79+
field_value = [
80+
{equals_to = "innerQuery"}
81+
]
82+
},
83+
{
84+
field_name = "c_inner_string"
85+
field_type = "string"
86+
field_value = [
87+
{equals_to = "innerQuery"}
88+
]
89+
},
90+
{
91+
field_name = "c_inner_timestamp"
92+
field_type = "timestamp"
93+
field_value = [
94+
{rule_type = NOT_NULL}
95+
]
96+
},
97+
{
98+
field_name = "map_val"
99+
field_type = "string"
100+
field_value = [
101+
{rule_type = NOT_NULL}
102+
]
103+
},
104+
{
105+
field_name = "map_not_exist_val"
106+
field_type = "null"
107+
field_value = [
108+
{rule_type = NULL}
109+
]
110+
}
111+
]
112+
}
113+
}
114+
}

Diff for: seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java

+30-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
package org.apache.seatunnel.transform.sql.zeta;
1919

2020
import org.apache.seatunnel.api.table.type.DecimalType;
21+
import org.apache.seatunnel.api.table.type.MapType;
2122
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
23+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2224
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2325
import org.apache.seatunnel.api.table.type.SqlType;
2426
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
@@ -56,6 +58,7 @@
5658
import java.math.RoundingMode;
5759
import java.util.ArrayList;
5860
import java.util.List;
61+
import java.util.Map;
5962

6063
public class ZetaSQLFunction {
6164
// ============================internal functions=====================
@@ -199,8 +202,33 @@ public Object computeForValue(Expression expression, Object[] inputFields) {
199202
return ((StringValue) expression).getValue();
200203
}
201204
if (expression instanceof Column) {
202-
int idx = inputRowType.indexOf(((Column) expression).getColumnName());
203-
return inputFields[idx];
205+
Column columnExp = (Column) expression;
206+
String columnName = columnExp.getColumnName();
207+
int index = inputRowType.indexOf(columnName, false);
208+
if (index != -1) {
209+
return inputFields[index];
210+
} else {
211+
String fullyQualifiedName = columnExp.getFullyQualifiedName();
212+
String[] columnNames = fullyQualifiedName.split("\\.");
213+
int deep = columnNames.length;
214+
SeaTunnelDataType parDataType = inputRowType;
215+
SeaTunnelRow parRowValues = new SeaTunnelRow(inputFields);
216+
Object res = parRowValues;
217+
for (int i = 0; i < deep; i++) {
218+
if (parDataType instanceof MapType) {
219+
return ((Map) res).get(columnNames[i]);
220+
}
221+
parRowValues = (SeaTunnelRow) res;
222+
int idx = ((SeaTunnelRowType) parDataType).indexOf(columnNames[i], false);
223+
if (idx == -1) {
224+
throw new IllegalArgumentException(
225+
String.format("can't find field [%s]", fullyQualifiedName));
226+
}
227+
parDataType = ((SeaTunnelRowType) parDataType).getFieldType(idx);
228+
res = parRowValues.getFields()[idx];
229+
}
230+
return res;
231+
}
204232
}
205233
if (expression instanceof Function) {
206234
Function function = (Function) expression;

0 commit comments

Comments
 (0)