Skip to content

Commit 5a7e575

Browse files
authored
[Feature][Server] Add metadata lineage (#572)
1 parent 219b78e commit 5a7e575

41 files changed

Lines changed: 3141 additions & 38 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

datavines-connector/datavines-connector-plugins/datavines-connector-jdbc/src/main/java/io/datavines/connector/plugin/DefaultStatementParser.java

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,76 @@
1616
*/
1717
package io.datavines.connector.plugin;
1818

19+
import com.alibaba.druid.DbType;
20+
import com.alibaba.druid.sql.SQLUtils;
21+
import com.alibaba.druid.sql.ast.SQLStatement;
22+
import com.alibaba.druid.sql.visitor.SchemaStatVisitor;
23+
import com.alibaba.druid.stat.TableStat;
1924
import io.datavines.connector.api.StatementParser;
2025
import io.datavines.connector.api.entity.StatementMetadataFragment;
2126

27+
import java.util.ArrayList;
28+
import java.util.List;
29+
import java.util.Map;
30+
2231
public class DefaultStatementParser implements StatementParser {
2332

33+
private final DbType dbType;
34+
35+
public DefaultStatementParser() {
36+
this(null);
37+
}
38+
39+
public DefaultStatementParser(DbType dbType) {
40+
this.dbType = dbType;
41+
}
42+
2443
@Override
2544
public StatementMetadataFragment parseStatement(String statement) {
26-
return null;
45+
if (statement == null || statement.trim().isEmpty()) {
46+
return null;
47+
}
48+
49+
List<SQLStatement> stmtList = SQLUtils.parseStatements(statement, dbType);
50+
if (stmtList == null || stmtList.isEmpty()) {
51+
return null;
52+
}
53+
54+
List<String> inputTables = new ArrayList<>();
55+
List<String> outputTables = new ArrayList<>();
56+
57+
for (SQLStatement sqlStatement : stmtList) {
58+
SchemaStatVisitor visitor = SQLUtils.createSchemaStatVisitor(dbType);
59+
sqlStatement.accept(visitor);
60+
61+
Map<TableStat.Name, TableStat> tables = visitor.getTables();
62+
if (tables == null || tables.isEmpty()) {
63+
continue;
64+
}
65+
66+
for (Map.Entry<TableStat.Name, TableStat> entry : tables.entrySet()) {
67+
String tableName = entry.getKey().getName();
68+
TableStat stat = entry.getValue();
69+
70+
if (stat.getInsertCount() > 0 || stat.getCreateCount() > 0
71+
|| stat.getMergeCount() > 0) {
72+
if (!outputTables.contains(tableName)) {
73+
outputTables.add(tableName);
74+
}
75+
}
76+
77+
if (stat.getSelectCount() > 0) {
78+
if (!inputTables.contains(tableName)) {
79+
inputTables.add(tableName);
80+
}
81+
}
82+
}
83+
}
84+
85+
if (inputTables.isEmpty() && outputTables.isEmpty()) {
86+
return null;
87+
}
88+
89+
return new StatementMetadataFragment(inputTables, outputTables, new ArrayList<>());
2790
}
2891
}

datavines-connector/datavines-connector-plugins/datavines-connector-trino/src/main/java/io/datavines/connector/plugin/TrinoConnector.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,4 @@ public ConnectorResponse testConnect(TestConnectionRequestParam param) {
9393
.build();
9494
}
9595
}
96-
97-
@Override
98-
public List<String> keyProperties() {
99-
return Arrays.asList("host","port","catalog","database");
100-
}
10196
}

datavines-core/src/main/java/io/datavines/core/enums/Status.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package io.datavines.core.enums;
1818

19+
import lombok.Getter;
1920
import org.springframework.context.i18n.LocaleContextHolder;
2021

2122
import java.util.Locale;
@@ -49,7 +50,7 @@ public enum Status {
4950
OLD_PASSWORD_IS_INCORRECT_ERROR(10020004, "Old Password is Incorrect", "旧密码错误"),
5051
NEW_PASSWORD_CONFIRM_IS_INCORRECT_ERROR(10020004, "New Password Confirm is Incorrect", "新密码确认错误"),
5152

52-
WORKSPACE_EXIST_ERROR(11010001, "WorkSpace {0} is Exist error", "工作空间 {0} 已存在错误"),
53+
WORKSPACE_EXIST_ERROR(11010001, "WorkSpace {0} is Exist Error", "工作空间 {0} 已存在错误"),
5354
CREATE_WORKSPACE_ERROR(11010002, "Create WorkSpace {0} Error", "创建工作空间 {0} 错误"),
5455
WORKSPACE_NOT_EXIST_ERROR(11010003, "WorkSpace {0} is Not Exist Error", "工作空间 {0} 不存在错误"),
5556
UPDATE_WORKSPACE_ERROR(11010004, "Update WorkSpace {0} Error", "更新工作空间 {0} 错误"),
@@ -69,7 +70,6 @@ public enum Status {
6970
TASK_NOT_EXIST_ERROR(13010001, "Task {0} Not Exist Error", "任务{0}不存在错误"),
7071
TASK_LOG_PATH_NOT_EXIST_ERROR(13010002, "Task {0} Log Path Not Exist Error", "任务 {0} 的日志路径不存在错误"),
7172
TASK_EXECUTE_HOST_NOT_EXIST_ERROR(13010003, "Task Execute Host {0} Not Exist Error", "任务 {0} 的执行服务地址不存在错误"),
72-
7373
TASK_EXECUTE_NOT_RUNNING(13010004, "Taskt {0} has not running", "任务 {0} 还没有开始运行,请稍后重试"),
7474

7575
JOB_PARAMETER_IS_NULL_ERROR(14010001, "Job {0} Parameter is Null Error", "作业 {0} 参数为空错误"),
@@ -83,7 +83,6 @@ public enum Status {
8383
METRIC_IS_NOT_EXIST(14010009, "The Metric {0} is not exist", "规则 {0} 不存在"),
8484
MULTI_TABLE_ACCURACY_NOT_SUPPORT_LOCAL_ENGINE(14010010, "Local Engine not support multi table accuracy in one datasource", "Local引擎不支持跨表准确性检查"),
8585
JOB_PARAMETER_CONTAIN_DUPLICATE_METRIC_ERROR(14010011, "Job {0} Parameter Contain Duplicate Metric", "作业中存在重复的检查规则"),
86-
8786
JOB_SCHEDULE_EXIST_ERROR(14020001, "Job Schedule is Exist error, id must be not null", "作业定时任务已存在,ID 不能为空"),
8887
CREATE_JOB_SCHEDULE_ERROR(14020002, "Create Job Schedule {0} Error", "创建作业定时任务 {0} 错误"),
8988
JOB_SCHEDULE_NOT_EXIST_ERROR(14020003, "Job Schedule {0} is not Exist error", "作业定时任务 {0} 不存在错误"),
@@ -93,6 +92,7 @@ public enum Status {
9392
SCHEDULE_TYPE_NOT_VALIDATE_ERROR(14020007, "Schedule type {0} is not Validate Error", "定时器类型参数 {0} 错误"),
9493
SCHEDULE_CYCLE_NOT_VALIDATE_ERROR(14020008, "Schedule Param Cycle {0} is not Validate Error", "定时器周期参数 {0} 错误"),
9594
SCHEDULE_CRON_IS_INVALID_ERROR(14020009, "Schedule cron {0} is not Validate Error", "定时器 Crontab 表达式 {0} 错误"),
95+
DATASOURCE_NOT_SUPPORT_ERROR_DATA_OUTPUT_TO_SELF_ERROR(14020010, "DataSource type {0} not Support Error Data Output To Self Error", "{0} 类型数据源不支持错误输出写入"),
9696

9797
CREATE_TENANT_ERROR(15010001, "Create Tenant {0} Error", "创建 Linux 用户 {0} 错误"),
9898
TENANT_NOT_EXIST_ERROR(15010002, "Tenant {0} Not Exist Error", "Linux 用户 {0} 不存在错误"),
@@ -109,15 +109,13 @@ public enum Status {
109109
ERROR_DATA_STORAGE_EXIST_ERROR(17010003, "Error Data Storage {0} is Exist error", "错误数据存储 {0} 已存在"),
110110
UPDATE_ERROR_DATA_STORAGE_ERROR(17010004, "Update Error Data Storage {0} Error", "更新 错误数据存储 {0} 错误"),
111111

112-
DATASOURCE_NOT_SUPPORT_ERROR_DATA_OUTPUT_TO_SELF_ERROR(14020010, "DataSource type {0} not Support Error Data Output To Self Error", "{0} 类型数据源不支持错误输出写入"),
113-
114112
SLA_ALREADY_EXIST_ERROR(18010001, "SLA {0} Already exist", "SLA {0} 已经存在"),
115113
SLA_SENDER_ALREADY_EXIST_ERROR(18020001, "SLA Sender {0} Already exist", "SLA 发送器 {0} 已经存在"),
116114
SLA_JOB_IS_NOT_EXIST_ERROR(18010003, "SLA job {0} is not exist", "SLA job {0} 存在"),
117115

118116
CATALOG_FETCH_DATASOURCE_NULL_ERROR(19010001, "获取元数据时数据源为空", "DataSource must not be null when fetch metadata"),
119-
120117
CATALOG_FETCH_METADATA_PARAMETER_ERROR(19010002, "获取元数据参数错误", "Fetch Metadata Parameter Error"),
118+
121119
CATALOG_TAG_CATEGORY_CREATE_ERROR(20010001, "Create Tag Category {0} Error", "创建标签类别 {0} 错误"),
122120
CATALOG_TAG_CATEGORY_NOT_EXIST_ERROR(20010002, "Tag Category {0} Not Exist Error", "标签类别 {0} 不存在"),
123121
CATALOG_TAG_CATEGORY_EXIST_ERROR(20010003, "Tag Category {0} is Exist error", "标签类别 {0} 已存在"),
@@ -129,21 +127,22 @@ public enum Status {
129127
CREATE_CATALOG_TASK_SCHEDULE_ERROR(20030002, "Create Catalog Task Schedule {0} Error", "创建元数据抓取定时任务 {0} 错误"),
130128
CATALOG_TASK_SCHEDULE_NOT_EXIST_ERROR(20030003, "Catalog Task Schedule {0} is not Exist error", "元数据抓取定时任务 {0} 不存在"),
131129
UPDATE_CATALOG_TASK_SCHEDULE_ERROR(20030004, "Update Catalog Task Schedule {0} Error", "更新元数据抓取定时任务 {0} 错误"),
132-
CATALOG_PROFILE_INSTANCE_FQN_ERROR(20030004, "Catalog instance fqn {0} Error", "数据实体全限定名 {0} 错误"),
130+
CATALOG_INSTANCE_FQN_ERROR(20030005, "Catalog instance fqn {0} Error", "数据实体全限定名 {0} 错误"),
131+
CATALOG_INSTANCE_IS_NULL_ERROR(20040001, "Catalog instance fqn {0} Error", "数据实体 {0} 为空错误"),
133132

134133
CREATE_ISSUE_ERROR(21010001, "Create Issue {0} Error", "创建Issue {0} 错误"),
135134
ISSUE_NOT_EXIST_ERROR(21010002, "Issue {0} Not Exist Error", "Issue {0} 不存在错误"),
136135
ISSUE_EXIST_ERROR(21010003, "Issue {0} is Exist error", "Issue {0} 已存在错误"),
137136
UPDATE_ISSUE_ERROR(21010004, "Update Issue {0} Error", "更新Issue {0} 错误"),
138137

139-
140138
CREATE_CONFIG_ERROR(22010001, "Create Config {0} Error", "创建参数 {0} 错误"),
141139
CONFIG_NOT_EXIST_ERROR(22010002, "Config {0} Not Exist Error", "参数 {0} 不存在错误"),
142140
CONFIG_EXIST_ERROR(22010003, "Config {0} is Exist error", "参数 {0} 已存在错误"),
143141
UPDATE_CONFIG_ERROR(22010004, "Update Config {0} Error", "更新参数 {0} 错误"),
144-
145142
CAN_NOT_DELETE_DEFAULT_CONFIG_ERROR(22010005, "Can Not Delete Default Config {0} Error", "不能删除默认参数 {0} 错误"),
146143
;
144+
145+
@Getter
147146
private final int code;
148147
private final String enMsg;
149148
private final String zhMsg;
@@ -154,10 +153,6 @@ public enum Status {
154153
this.zhMsg = zhMsg;
155154
}
156155

157-
public int getCode() {
158-
return this.code;
159-
}
160-
161156
public String getMsg() {
162157
if (Locale.SIMPLIFIED_CHINESE.getLanguage().equals(LocaleContextHolder.getLocale().getLanguage())) {
163158
return this.zhMsg;

datavines-engine/datavines-engine-config/src/main/java/io/datavines/engine/config/BaseJobConfigurationBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ protected SinkConfig getValidateResultDataSinkConfig(ExpectedValue expectedValue
254254
SinkConfig validateResultDataStorageConfig = new SinkConfig();
255255
validateResultDataStorageConfig.setPlugin(jobExecutionInfo.getValidateResultDataStorageType());
256256
Map<String, Object> configMap = getValidateResultSourceConfigMap(
257-
ParameterUtils.convertParameterPlaceholders(sql, inputParameter),dbTable);
257+
ParameterUtils.convertParameterPlaceholders(sql, inputParameter), dbTable);
258258
configMap.put(JOB_EXECUTION_ID, jobExecutionInfo.getId());
259259
configMap.put(INVALIDATE_ITEMS_TABLE, inputParameter.get(INVALIDATE_ITEMS_TABLE));
260260
configMap.put(METRIC_UNIQUE_KEY, inputParameter.get(METRIC_UNIQUE_KEY));
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package io.datavines.server.api.controller;
18+
19+
import io.datavines.core.aop.RefreshToken;
20+
import io.datavines.core.constant.DataVinesConstants;
21+
import io.datavines.server.api.dto.bo.catalog.lineage.LineageEntityEdgeInfo;
22+
import io.datavines.server.api.dto.bo.catalog.lineage.SqlWithDataSourceKeyProperties;
23+
import io.datavines.server.api.dto.bo.catalog.lineage.SqlWithDataSourceList;
24+
import io.datavines.server.repository.entity.catalog.CatalogTagCategory;
25+
import io.datavines.server.repository.service.CatalogEntityRelService;
26+
import io.swagger.annotations.Api;
27+
import io.swagger.annotations.ApiOperation;
28+
import org.springframework.beans.factory.annotation.Autowired;
29+
import org.springframework.http.MediaType;
30+
import org.springframework.web.bind.annotation.*;
31+
32+
import javax.validation.Valid;
33+
34+
@Api(value = "catalog", tags = "catalog", produces = MediaType.APPLICATION_JSON_VALUE)
35+
@RestController
36+
@RequestMapping(value = DataVinesConstants.BASE_API_PATH + "/catalog/lineage", produces = MediaType.APPLICATION_JSON_VALUE)
37+
@RefreshToken
38+
public class CatalogLineageController {
39+
40+
@Autowired
41+
private CatalogEntityRelService catalogEntityRelService;
42+
43+
@ApiOperation(value = "add lineage", response = Long.class)
44+
@PostMapping(value = "/add", consumes = MediaType.APPLICATION_JSON_VALUE)
45+
public Object addLineage(@Valid @RequestBody LineageEntityEdgeInfo entityEdgeInfo) {
46+
return catalogEntityRelService.addLineage(entityEdgeInfo);
47+
}
48+
49+
@ApiOperation(value = "get lineage by full qualified name", response = CatalogTagCategory.class, responseContainer = "list")
50+
@GetMapping(value = "/getByFqn/{datasourceId}/{fqn}")
51+
public Object getByFqn(@PathVariable Long datasourceId, @PathVariable String fqn) {
52+
return catalogEntityRelService.getLineageByFqn(datasourceId,fqn,1,1);
53+
}
54+
55+
@ApiOperation(value = "delete tag category", response = boolean.class)
56+
@GetMapping(value = "/getByUUID/{uuid}")
57+
public Object getByUUID(@PathVariable String uuid) {
58+
return catalogEntityRelService.getLineageByUUID(uuid,1,1);
59+
}
60+
61+
@ApiOperation(value = "delete lineage", response = boolean.class)
62+
@DeleteMapping(value = "/{fromUUID}/{toUUID}")
63+
public Object deleteLineage(@PathVariable("fromUUID") String fromUUID,
64+
@PathVariable("toUUID") String toUUID) {
65+
return catalogEntityRelService.deleteLineage(fromUUID, toUUID);
66+
}
67+
68+
@ApiOperation(value = "parse sql to get lineage", response = Long.class)
69+
@PostMapping(value = "/addByParseSql", consumes = MediaType.APPLICATION_JSON_VALUE)
70+
public Object addLineageByParseSql(@Valid @RequestBody SqlWithDataSourceList sqlWithDataSourceList) {
71+
return catalogEntityRelService.addLineageByParseSql(sqlWithDataSourceList);
72+
}
73+
74+
@ApiOperation(value = "parse sql to get lineage", response = Long.class)
75+
@PostMapping(value = "/addByParseSql2", consumes = MediaType.APPLICATION_JSON_VALUE)
76+
public Object addLineageByParseSql2(@Valid @RequestBody SqlWithDataSourceKeyProperties sqlWithDataSourceKeyProperties) {
77+
return catalogEntityRelService.addLineageByParseSql2(sqlWithDataSourceKeyProperties);
78+
}
79+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package io.datavines.server.api.dto.bo.catalog;
18+
19+
import lombok.Data;
20+
21+
import java.io.Serializable;
22+
23+
@Data
24+
public class CatalogEntityInstanceInfo implements Serializable {
25+
26+
private static final long serialVersionUID = -1L;
27+
28+
private Long id;
29+
30+
private String uuid;
31+
32+
private Long datasourceId;
33+
34+
private String type;
35+
36+
private String fullyQualifiedName;
37+
38+
private String displayName;
39+
40+
private String description;
41+
42+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package io.datavines.server.api.dto.bo.catalog.lineage;
18+
19+
import io.datavines.server.api.dto.bo.catalog.CatalogEntityInstanceInfo;
20+
import lombok.Data;
21+
22+
import java.io.Serializable;
23+
import java.util.List;
24+
25+
@Data
26+
public class CatalogEntityColumnLineageDetail implements Serializable {
27+
28+
private static final long serialVersionUID = -1L;
29+
30+
private List<CatalogEntityInstanceInfo> fromChildren;
31+
32+
private String function;
33+
34+
private CatalogEntityInstanceInfo toChild;
35+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package io.datavines.server.api.dto.bo.catalog.lineage;
18+
19+
import io.datavines.server.enums.LineageSourceType;
20+
import lombok.Data;
21+
22+
import java.io.Serializable;
23+
import java.util.List;
24+
25+
@Data
26+
public class CatalogEntityLineageDetail implements Serializable {
27+
28+
private static final long serialVersionUID = -1L;
29+
30+
private List<CatalogEntityColumnLineageDetail> childRelDetailList;
31+
32+
private String description;
33+
34+
private LineageSourceType sourceType;
35+
36+
private String sqlQuery;
37+
}

0 commit comments

Comments
 (0)