Skip to content

Commit 21a922f

Browse files
committed
trigger
1 parent 525a446 commit 21a922f

File tree

87 files changed

+7281
-1
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

87 files changed

+7281
-1
lines changed

dolphinscheduler-api/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@
7777
<artifactId>dolphinscheduler-task-all</artifactId>
7878
</dependency>
7979

80+
<dependency>
81+
<groupId>org.apache.dolphinscheduler</groupId>
82+
<artifactId>dolphinscheduler-trigger-all</artifactId>
83+
</dependency>
84+
8085
<dependency>
8186
<groupId>org.apache.dolphinscheduler</groupId>
8287
<artifactId>dolphinscheduler-ui</artifactId>

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java

+2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
2929
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
3030
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
31+
import org.apache.dolphinscheduler.plugin.trigger.api.TriggerPluginManager;
3132
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
3233
import org.apache.dolphinscheduler.service.ServiceConfiguration;
3334
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
@@ -70,6 +71,7 @@ public void run(ApplicationReadyEvent readyEvent) {
7071
log.info("Received spring application context ready event will load taskPlugin and write to DB");
7172
// install task plugin
7273
TaskPluginManager.loadPlugin();
74+
TriggerPluginManager.loadPlugin();
7375
DataSourceProcessorProvider.initialize();
7476
for (Map.Entry<String, TaskChannelFactory> entry : TaskPluginManager.getTaskChannelFactoryMap().entrySet()) {
7577
String taskPluginName = entry.getKey();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.api.controller;
19+
20+
import io.swagger.v3.oas.annotations.Operation;
21+
import io.swagger.v3.oas.annotations.Parameter;
22+
import io.swagger.v3.oas.annotations.Parameters;
23+
import io.swagger.v3.oas.annotations.media.Schema;
24+
import io.swagger.v3.oas.annotations.tags.Tag;
25+
import lombok.extern.slf4j.Slf4j;
26+
import org.apache.dolphinscheduler.api.audit.OperatorLog;
27+
import org.apache.dolphinscheduler.api.audit.enums.AuditType;
28+
import org.apache.dolphinscheduler.api.exceptions.ApiException;
29+
import org.apache.dolphinscheduler.api.service.ExecutorService;
30+
import org.apache.dolphinscheduler.api.service.TriggerDefinitionService;
31+
import org.apache.dolphinscheduler.api.utils.Result;
32+
import org.apache.dolphinscheduler.common.constants.Constants;
33+
import org.apache.dolphinscheduler.common.enums.*;
34+
import org.apache.dolphinscheduler.dao.entity.User;
35+
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
36+
import org.springframework.beans.factory.annotation.Autowired;
37+
import org.springframework.http.HttpStatus;
38+
import org.springframework.web.bind.annotation.*;
39+
40+
import java.util.*;
41+
42+
import static org.apache.dolphinscheduler.api.enums.Status.*;
43+
44+
/**
45+
* trigger definition controller
46+
*/
47+
@Tag(name = "TRIGGER_DEFINITION_TAG")
48+
@RestController
49+
@RequestMapping("projects/{projectCode}/trigger-definition")
50+
@Slf4j
51+
public class TriggerController extends BaseController {
52+
53+
@Autowired
54+
private ExecutorService execService;
55+
56+
@Autowired
57+
private TriggerDefinitionService triggerDefinitionService;
58+
59+
/**
60+
* create trigger definition
61+
*
62+
* @param loginUser login user
63+
* @param projectCode project code
64+
* @param triggerDefinitionJson trigger definition json
65+
* @return create result code
66+
*/
67+
@Operation(summary = "save", description = "CREATE_TRIGGER_DEFINITION_NOTES")
68+
@Parameters({
69+
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true, schema = @Schema(implementation = long.class)),
70+
@Parameter(name = "triggerDefinitionJson", description = "TRIGGER_DEFINITION_JSON", required = true, schema = @Schema(implementation = String.class))
71+
})
72+
@PostMapping()
73+
@ResponseStatus(HttpStatus.CREATED)
74+
// change error code
75+
@ApiException(CREATE_TASK_DEFINITION_ERROR)
76+
// change audit type
77+
@OperatorLog(auditType = AuditType.TASK_CREATE)
78+
public Result createTriggerDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
79+
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
80+
@RequestParam(value = "triggerDefinitionJson", required = true) String triggerDefinitionJson) {
81+
Map<String, Object> result =
82+
triggerDefinitionService.createTriggerDefinition(loginUser, projectCode, triggerDefinitionJson);
83+
return returnDataList(result);
84+
}
85+
86+
/**
87+
* query task definition list paging
88+
*
89+
* @param loginUser login user
90+
* @param projectCode project code
91+
* @param searchTriggerName searchTaskName
92+
* @param triggerType taskType
93+
* @param pageNo page number
94+
* @param pageSize page size
95+
* @return task definition page
96+
*/
97+
@Operation(summary = "queryTaskDefinitionListPaging", description = "QUERY_TASK_DEFINITION_LIST_PAGING_NOTES")
98+
@Parameters({
99+
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = false, schema = @Schema(implementation = long.class)),
100+
@Parameter(name = "searchWorkflowName", description = "SEARCH_WORKFLOW_NAME", required = false, schema = @Schema(implementation = String.class)),
101+
@Parameter(name = "searchTaskName", description = "SEARCH_TASK_NAME", required = false, schema = @Schema(implementation = String.class)),
102+
@Parameter(name = "taskType", description = "TASK_TYPE", required = false, schema = @Schema(implementation = String.class, example = "SHELL")),
103+
@Parameter(name = "taskExecuteType", description = "TASK_EXECUTE_TYPE", required = false, schema = @Schema(implementation = TaskExecuteType.class, example = "STREAM")),
104+
@Parameter(name = "pageNo", description = "PAGE_NO", required = true, schema = @Schema(implementation = int.class, example = "1")),
105+
@Parameter(name = "pageSize", description = "PAGE_SIZE", required = true, schema = @Schema(implementation = int.class, example = "10"))
106+
})
107+
@GetMapping()
108+
@ResponseStatus(HttpStatus.OK)
109+
@ApiException(QUERY_TASK_DEFINITION_LIST_PAGING_ERROR)
110+
public Result queryTaskDefinitionListPaging(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
111+
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
112+
@RequestParam(value = "searchTriggerName", required = false) String searchTriggerName,
113+
@RequestParam(value = "triggerType", required = false) String triggerType,
114+
@RequestParam("pageNo") Integer pageNo,
115+
@RequestParam("pageSize") Integer pageSize) {
116+
checkPageParams(pageNo, pageSize);
117+
searchTriggerName = ParameterUtils.handleEscapes(searchTriggerName);
118+
return triggerDefinitionService.queryTriggerDefinitionListPaging(loginUser, projectCode,
119+
searchTriggerName, triggerType, pageNo, pageSize);
120+
}
121+
}
122+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package org.apache.dolphinscheduler.api.service;
2+
3+
import org.apache.dolphinscheduler.api.utils.Result;
4+
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
5+
import org.apache.dolphinscheduler.dao.entity.User;
6+
7+
import java.util.Map;
8+
9+
/**
10+
* trigger definition service
11+
*/
12+
public interface TriggerDefinitionService {
13+
14+
Map<String, Object> createTriggerDefinition(User loginUser,
15+
long projectCode,
16+
String triggerDefinitionJson);
17+
18+
Result queryTriggerDefinitionListPaging(User loginUser,
19+
long projectCode,
20+
String searchTriggerName,
21+
String triggerType,
22+
Integer pageNo,
23+
Integer pageSize);
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.api.service.impl;
19+
20+
import com.baomidou.mybatisplus.core.metadata.IPage;
21+
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
22+
import com.google.common.collect.Lists;
23+
import lombok.extern.slf4j.Slf4j;
24+
import org.apache.commons.collections4.CollectionUtils;
25+
import org.apache.commons.collections4.MapUtils;
26+
import org.apache.commons.lang3.StringUtils;
27+
import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest;
28+
import org.apache.dolphinscheduler.api.dto.task.TaskFilterRequest;
29+
import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest;
30+
import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationUpdateUpstreamRequest;
31+
import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest;
32+
import org.apache.dolphinscheduler.api.enums.Status;
33+
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
34+
import org.apache.dolphinscheduler.api.permission.PermissionCheck;
35+
import org.apache.dolphinscheduler.api.service.*;
36+
import org.apache.dolphinscheduler.api.utils.PageInfo;
37+
import org.apache.dolphinscheduler.api.utils.Result;
38+
import org.apache.dolphinscheduler.api.vo.TaskDefinitionVO;
39+
import org.apache.dolphinscheduler.common.constants.Constants;
40+
import org.apache.dolphinscheduler.common.enums.*;
41+
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
42+
import org.apache.dolphinscheduler.common.utils.JSONUtils;
43+
import org.apache.dolphinscheduler.dao.entity.*;
44+
import org.apache.dolphinscheduler.dao.mapper.*;
45+
import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao;
46+
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
47+
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
48+
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
49+
import org.apache.dolphinscheduler.service.process.ProcessService;
50+
import org.springframework.beans.factory.annotation.Autowired;
51+
import org.springframework.stereotype.Service;
52+
import org.springframework.transaction.annotation.Transactional;
53+
54+
import java.util.*;
55+
import java.util.function.Function;
56+
import java.util.stream.Collectors;
57+
58+
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.*;
59+
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION;
60+
61+
/**
62+
* tenant service impl
63+
*/
64+
@Service
65+
@Slf4j
66+
public class TriggerDefinitionServiceImpl extends BaseServiceImpl implements TriggerDefinitionService {
67+
68+
@Autowired
69+
private ProjectMapper projectMapper;
70+
71+
@Autowired
72+
private ProjectService projectService;
73+
74+
@Autowired
75+
private TaskDefinitionMapper taskDefinitionMapper;
76+
77+
@Autowired
78+
private TriggerDefinitionMapper triggerDefinitionMapper;
79+
80+
@Autowired
81+
private TaskDefinitionDao taskDefinitionDao;
82+
83+
@Autowired
84+
private TaskDefinitionLogMapper taskDefinitionLogMapper;
85+
86+
@Autowired
87+
private ProcessTaskRelationMapper processTaskRelationMapper;
88+
89+
@Autowired
90+
private ProcessTaskRelationLogDao processTaskRelationLogDao;
91+
92+
@Autowired
93+
private ProcessTaskRelationService processTaskRelationService;
94+
95+
@Autowired
96+
private ProcessDefinitionMapper processDefinitionMapper;
97+
98+
@Autowired
99+
private ProcessService processService;
100+
101+
@Autowired
102+
private ProcessDefinitionService processDefinitionService;
103+
104+
@Autowired
105+
private ProcessDefinitionLogMapper processDefinitionLogMapper;
106+
107+
108+
@Transactional
109+
@Override
110+
public Map<String, Object> createTriggerDefinition(User loginUser,
111+
long projectCode,
112+
String triggerDefinitionJson) {
113+
Project project = projectMapper.queryByCode(projectCode);
114+
// check if user have write perm for project
115+
Map<String, Object> result = new HashMap<>();
116+
return result;
117+
}
118+
119+
@Override
120+
public Result queryTriggerDefinitionListPaging(User loginUser,
121+
long projectCode,
122+
String searchTriggerName,
123+
String triggerType,
124+
Integer pageNo,
125+
Integer pageSize) {
126+
Result result = new Result();
127+
Project project = projectMapper.queryByCode(projectCode);
128+
// check user access for project
129+
Map<String, Object> checkResult =
130+
projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION);
131+
Status resultStatus = (Status) checkResult.get(Constants.STATUS);
132+
if (resultStatus != Status.SUCCESS) {
133+
putMsg(result, resultStatus);
134+
return result;
135+
}
136+
triggerType = triggerType == null ? StringUtils.EMPTY : triggerType;
137+
Page<TriggerMainInfo> page = new Page<>(pageNo, pageSize);
138+
// first, query trigger code by page size
139+
IPage<TriggerMainInfo> taskMainInfoIPage = triggerDefinitionMapper.queryDefinitionListPaging(page, projectCode,
140+
searchTriggerName, triggerType);
141+
PageInfo<TaskMainInfo> pageInfo = new PageInfo<>(pageNo, pageSize);
142+
pageInfo.setTotal((int) taskMainInfoIPage.getTotal());
143+
// pageInfo.setTotalList(taskMainInfoIPage.getRecords());
144+
result.setData(pageInfo);
145+
putMsg(result, Status.SUCCESS);
146+
return result;
147+
}
148+
}
149+

0 commit comments

Comments
 (0)