Skip to content

Commit 8fe617e

Browse files
committed
trigger
1 parent fb9172c commit 8fe617e

File tree

76 files changed

+6148
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

76 files changed

+6148
-0
lines changed

dolphinscheduler-api/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@
7777
<artifactId>dolphinscheduler-task-all</artifactId>
7878
</dependency>
7979

80+
<dependency>
81+
<groupId>org.apache.dolphinscheduler</groupId>
82+
<artifactId>dolphinscheduler-trigger-all</artifactId>
83+
</dependency>
84+
8085
<dependency>
8186
<groupId>org.apache.dolphinscheduler</groupId>
8287
<artifactId>dolphinscheduler-ui</artifactId>

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java

+2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
2929
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
3030
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
31+
import org.apache.dolphinscheduler.plugin.trigger.api.TriggerPluginManager;
3132
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
3233
import org.apache.dolphinscheduler.service.ServiceConfiguration;
3334
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
@@ -70,6 +71,7 @@ public void run(ApplicationReadyEvent readyEvent) {
7071
log.info("Received spring application context ready event will load taskPlugin and write to DB");
7172
// install task plugin
7273
TaskPluginManager.loadPlugin();
74+
TriggerPluginManager.loadPlugin();
7375
DataSourceProcessorProvider.initialize();
7476
for (Map.Entry<String, TaskChannelFactory> entry : TaskPluginManager.getTaskChannelFactoryMap().entrySet()) {
7577
String taskPluginName = entry.getKey();

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TriggerController.java

+476
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package org.apache.dolphinscheduler.api.service;
2+
3+
import org.apache.dolphinscheduler.dao.entity.User;
4+
5+
import java.util.Map;
6+
7+
/**
8+
* trigger definition service
9+
*/
10+
public interface TriggerDefinitionService {
11+
12+
Map<String, Object> createTriggerDefinition(User loginUser,
13+
long projectCode,
14+
String triggerDefinitionJson);
15+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.api.service.impl;
19+
20+
import com.baomidou.mybatisplus.core.metadata.IPage;
21+
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
22+
import com.google.common.collect.Lists;
23+
import lombok.extern.slf4j.Slf4j;
24+
import org.apache.commons.collections4.CollectionUtils;
25+
import org.apache.commons.collections4.MapUtils;
26+
import org.apache.commons.lang3.StringUtils;
27+
import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest;
28+
import org.apache.dolphinscheduler.api.dto.task.TaskFilterRequest;
29+
import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest;
30+
import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationUpdateUpstreamRequest;
31+
import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest;
32+
import org.apache.dolphinscheduler.api.enums.Status;
33+
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
34+
import org.apache.dolphinscheduler.api.permission.PermissionCheck;
35+
import org.apache.dolphinscheduler.api.service.*;
36+
import org.apache.dolphinscheduler.api.utils.PageInfo;
37+
import org.apache.dolphinscheduler.api.utils.Result;
38+
import org.apache.dolphinscheduler.api.vo.TaskDefinitionVO;
39+
import org.apache.dolphinscheduler.common.constants.Constants;
40+
import org.apache.dolphinscheduler.common.enums.*;
41+
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
42+
import org.apache.dolphinscheduler.common.utils.JSONUtils;
43+
import org.apache.dolphinscheduler.dao.entity.*;
44+
import org.apache.dolphinscheduler.dao.mapper.*;
45+
import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao;
46+
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
47+
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
48+
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
49+
import org.apache.dolphinscheduler.service.process.ProcessService;
50+
import org.springframework.beans.factory.annotation.Autowired;
51+
import org.springframework.stereotype.Service;
52+
import org.springframework.transaction.annotation.Transactional;
53+
54+
import java.util.*;
55+
import java.util.function.Function;
56+
import java.util.stream.Collectors;
57+
58+
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.*;
59+
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION;
60+
61+
/**
62+
* tenant service impl
63+
*/
64+
@Service
65+
@Slf4j
66+
public class TriggerDefinitionServiceImpl extends BaseServiceImpl implements TriggerDefinitionService {
67+
68+
@Autowired
69+
private ProjectMapper projectMapper;
70+
71+
@Autowired
72+
private ProjectService projectService;
73+
74+
@Autowired
75+
private TaskDefinitionMapper taskDefinitionMapper;
76+
77+
@Autowired
78+
private TaskDefinitionDao taskDefinitionDao;
79+
80+
@Autowired
81+
private TaskDefinitionLogMapper taskDefinitionLogMapper;
82+
83+
@Autowired
84+
private ProcessTaskRelationMapper processTaskRelationMapper;
85+
86+
@Autowired
87+
private ProcessTaskRelationLogDao processTaskRelationLogDao;
88+
89+
@Autowired
90+
private ProcessTaskRelationService processTaskRelationService;
91+
92+
@Autowired
93+
private ProcessDefinitionMapper processDefinitionMapper;
94+
95+
@Autowired
96+
private ProcessService processService;
97+
98+
@Autowired
99+
private ProcessDefinitionService processDefinitionService;
100+
101+
@Autowired
102+
private ProcessDefinitionLogMapper processDefinitionLogMapper;
103+
104+
105+
@Transactional
106+
@Override
107+
public Map<String, Object> createTriggerDefinition(User loginUser,
108+
long projectCode,
109+
String triggerDefinitionJson) {
110+
Project project = projectMapper.queryByCode(projectCode);
111+
// check if user have write perm for project
112+
Map<String, Object> result = new HashMap<>();
113+
boolean hasProjectAndWritePerm = projectService.hasProjectAndWritePerm(loginUser, project, result);
114+
if (!hasProjectAndWritePerm) {
115+
return result;
116+
}
117+
118+
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(triggerDefinitionJson, TaskDefinitionLog.class);
119+
if (CollectionUtils.isEmpty(taskDefinitionLogs)) {
120+
log.warn("Parameter taskDefinitionJson is invalid.");
121+
putMsg(result, Status.DATA_IS_NOT_VALID, triggerDefinitionJson);
122+
return result;
123+
}
124+
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
125+
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
126+
.taskType(taskDefinitionLog.getTaskType())
127+
.taskParams(taskDefinitionLog.getTaskParams())
128+
.dependence(taskDefinitionLog.getDependence())
129+
.build())) {
130+
log.warn("Task definition {} parameters are invalid.", taskDefinitionLog.getName());
131+
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
132+
return result;
133+
}
134+
}
135+
int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs, Boolean.TRUE);
136+
if (saveTaskResult == Constants.DEFINITION_FAILURE) {
137+
log.error("Create task definition error, projectCode:{}.", projectCode);
138+
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
139+
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
140+
}
141+
Map<String, Object> resData = new HashMap<>();
142+
resData.put("total", taskDefinitionLogs.size());
143+
resData.put("code", StringUtils
144+
.join(taskDefinitionLogs.stream().map(TaskDefinition::getCode).collect(Collectors.toList()), ","));
145+
putMsg(result, Status.SUCCESS);
146+
result.put(Constants.DATA_LIST, resData);
147+
return result;
148+
}
149+
}
150+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.dao.entity;
19+
20+
import java.util.Date;
21+
import java.util.Objects;
22+
23+
import lombok.Data;
24+
25+
import com.baomidou.mybatisplus.annotation.IdType;
26+
import com.baomidou.mybatisplus.annotation.TableId;
27+
import com.baomidou.mybatisplus.annotation.TableName;
28+
29+
@Data
30+
@TableName("t_ds_trigger_definition")
31+
public class TriggerDefinition {
32+
/**
33+
* id
34+
*/
35+
@TableId(value = "id", type = IdType.AUTO)
36+
private Integer id;
37+
38+
/**
39+
* code
40+
*/
41+
private long code;
42+
43+
/**
44+
* name
45+
*/
46+
private String name;
47+
48+
/**
49+
* version
50+
*/
51+
private int version;
52+
53+
/**
54+
* description
55+
*/
56+
private String description;
57+
58+
/**
59+
* project code
60+
*/
61+
private long projectCode;
62+
63+
/**
64+
* trigger user id
65+
*/
66+
private int userId;
67+
68+
/**
69+
* trigger type
70+
*/
71+
private String triggerType;
72+
73+
/**
74+
* create time
75+
*/
76+
private Date createTime;
77+
78+
/**
79+
* update time
80+
*/
81+
private Date updateTime;
82+
83+
public TriggerDefinition() {
84+
}
85+
86+
public TriggerDefinition(long code, int version) {
87+
this.code = code;
88+
this.version = version;
89+
}
90+
91+
public boolean equals(Object o) {
92+
if (o == null) {
93+
return false;
94+
}
95+
TriggerDefinition that = (TriggerDefinition) o;
96+
return Objects.equals(name, that.name)
97+
&& Objects.equals(description, that.description);
98+
}
99+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.dao.entity;
19+
20+
import java.io.Serializable;
21+
import java.util.Date;
22+
import java.util.Map;
23+
import java.util.Objects;
24+
import java.util.concurrent.TimeUnit;
25+
26+
import com.baomidou.mybatisplus.annotation.*;
27+
import com.fasterxml.jackson.core.type.TypeReference;
28+
import lombok.Data;
29+
30+
import org.apache.dolphinscheduler.common.constants.Constants;
31+
import org.apache.dolphinscheduler.common.enums.Flag;
32+
import org.apache.dolphinscheduler.common.enums.Priority;
33+
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
34+
import org.apache.dolphinscheduler.common.utils.DateUtils;
35+
import org.apache.dolphinscheduler.common.utils.JSONUtils;
36+
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
37+
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
38+
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
39+
40+
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.*;
41+
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
42+
43+
@Data
44+
@TableName("t_ds_trigger_instance")
45+
public class TriggerInstance implements Serializable {
46+
47+
/**
48+
* id
49+
*/
50+
@TableId(value = "id", type = IdType.AUTO)
51+
private Integer id;
52+
53+
/**
54+
* task name
55+
*/
56+
private String name;
57+
58+
/**
59+
* task type
60+
*/
61+
private String triggerType;
62+
63+
private int processInstanceId;
64+
65+
private String processInstanceName;
66+
67+
private Long projectCode;
68+
69+
private long triggerCode;
70+
71+
private int triggerDefinitionVersion;
72+
73+
@TableField(exist = false)
74+
private String processDefinitionName;
75+
76+
/**
77+
* state
78+
*/
79+
private TaskExecutionStatus state;
80+
81+
/**
82+
* task first submit time.
83+
*/
84+
private Date firstSubmitTime;
85+
86+
/**
87+
* task submit time
88+
*/
89+
private Date submitTime;
90+
91+
/**
92+
* task start time
93+
*/
94+
private Date startTime;
95+
96+
/**
97+
* task end time
98+
*/
99+
private Date endTime;
100+
}
101+

0 commit comments

Comments
 (0)