Skip to content

Commit ea9e7a1

Browse files
authored
[ISSUE apache#5067] Enhancement for eventmesh-admin-server (apache#5068)
* [ISSUE apache#5040] Support gtid mode for sync data with mysql * fix conflicts with master * fix checkstyle error * [ISSUE apache#5044] Data synchronization strong verification in mariadb gtid mode * fix checkstyle error * [ISSUE apache#5048] Add report verify request to admin for connector runtime * fix checkstyle error * [ISSUE apache#5052] Enhancement for source\sink connector * fix checkstyle error * fix checkstyle error * [ISSUE apache#5067] Enhancement for eventmesh-admin-server
1 parent 1f6623e commit ea9e7a1

File tree

22 files changed

+272
-193
lines changed

22 files changed

+272
-193
lines changed

eventmesh-admin-server/conf/application.yaml

+8-2
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,11 @@ mybatis-plus:
2828
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
2929
event-mesh:
3030
admin-server:
31-
service-name: DEFAULT_GROUP@@em_adm_server
32-
port: 8081
31+
serviceName: DEFAULT_GROUP@@em_adm_server
32+
port: 8081
33+
adminServerList:
34+
region1:
35+
- http://localhost:8081
36+
region2:
37+
- http://localhost:8082
38+
region: region1

eventmesh-admin-server/conf/eventmesh.sql

+8-6
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,15 @@ CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
4545
CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
4646
`id` int unsigned NOT NULL AUTO_INCREMENT,
4747
`jobID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
48-
`desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
48+
`jobDesc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
4949
`taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
5050
`transportType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
5151
`sourceData` int NOT NULL DEFAULT '0',
5252
`targetData` int NOT NULL DEFAULT '0',
53-
`state` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
53+
`jobState` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
5454
`jobType` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
5555
`fromRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
56+
`runningRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
5657
`createUid` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
5758
`updateUid` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
5859
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
@@ -118,10 +119,11 @@ CREATE TABLE IF NOT EXISTS `event_mesh_runtime_history` (
118119
CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
119120
`id` int unsigned NOT NULL AUTO_INCREMENT,
120121
`taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
121-
`name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
122-
`desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
123-
`state` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'taskstate',
124-
`fromRegion` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
122+
`taskName` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
123+
`taskDesc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
124+
`taskState` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'taskstate',
125+
`sourceRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
126+
`targetRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
125127
`createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
126128
`updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
127129
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,

eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml

+21-19
Original file line numberDiff line numberDiff line change
@@ -19,31 +19,33 @@
1919
-->
2020

2121
<!DOCTYPE mapper
22-
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
23-
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
22+
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
23+
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
2424
<mapper namespace="org.apache.eventmesh.admin.server.web.db.mapper.EventMeshJobInfoMapper">
2525

2626
<resultMap id="BaseResultMap" type="org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo">
27-
<id property="id" column="id" jdbcType="INTEGER"/>
28-
<result property="jobID" column="jobID" jdbcType="VARCHAR"/>
29-
<result property="desc" column="desc" jdbcType="VARCHAR"/>
30-
<result property="taskID" column="taskID" jdbcType="VARCHAR"/>
31-
<result property="transportType" column="transportType" jdbcType="VARCHAR"/>
32-
<result property="sourceData" column="sourceData" jdbcType="INTEGER"/>
33-
<result property="targetData" column="targetData" jdbcType="INTEGER"/>
34-
<result property="state" column="state" jdbcType="VARCHAR"/>
35-
<result property="jobType" column="jobType" jdbcType="VARCHAR"/>
36-
<result property="fromRegion" column="fromRegion" jdbcType="VARCHAR"/>
37-
<result property="createUid" column="createUid" jdbcType="VARCHAR"/>
38-
<result property="updateUid" column="updateUid" jdbcType="VARCHAR"/>
39-
<result property="createTime" column="createTime" jdbcType="TIMESTAMP"/>
40-
<result property="updateTime" column="updateTime" jdbcType="TIMESTAMP"/>
27+
<id property="id" column="id" jdbcType="INTEGER"/>
28+
<result property="jobID" column="jobID" jdbcType="VARCHAR"/>
29+
<result property="jobDesc" column="desc" jdbcType="VARCHAR"/>
30+
<result property="taskID" column="taskID" jdbcType="VARCHAR"/>
31+
<result property="transportType" column="transportType" jdbcType="VARCHAR"/>
32+
<result property="sourceData" column="sourceData" jdbcType="INTEGER"/>
33+
<result property="targetData" column="targetData" jdbcType="INTEGER"/>
34+
<result property="jobState" column="state" jdbcType="VARCHAR"/>
35+
<result property="jobType" column="jobType" jdbcType="VARCHAR"/>
36+
<result property="fromRegion" column="sourceRegion" jdbcType="VARCHAR"/>
37+
<result property="runningRegion" column="targetRegion" jdbcType="VARCHAR"/>
38+
<result property="createUid" column="createUid" jdbcType="VARCHAR"/>
39+
<result property="updateUid" column="updateUid" jdbcType="VARCHAR"/>
40+
<result property="createTime" column="createTime" jdbcType="TIMESTAMP"/>
41+
<result property="updateTime" column="updateTime" jdbcType="TIMESTAMP"/>
4142
</resultMap>
4243

4344
<sql id="Base_Column_List">
44-
id,jobID,desc,
45+
id,jobID,jobDesc,
4546
taskID,transportType,sourceData,
46-
targetData,state,jobType,
47-
fromRegion,createTime,updateTime
47+
targetData,jobState,jobType,
48+
fromRegion,runningRegion,createUid,
49+
updateUid,createTime,updateTime
4850
</sql>
4951
</mapper>

eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml

+7-6
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,20 @@
2626
<resultMap id="BaseResultMap" type="org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo">
2727
<id property="id" column="id" jdbcType="INTEGER"/>
2828
<result property="taskID" column="taskID" jdbcType="VARCHAR"/>
29-
<result property="name" column="name" jdbcType="VARCHAR"/>
30-
<result property="desc" column="desc" jdbcType="VARCHAR"/>
31-
<result property="state" column="state" jdbcType="VARCHAR"/>
32-
<result property="fromRegion" column="fromRegion" jdbcType="VARCHAR"/>
29+
<result property="taskName" column="taskName" jdbcType="VARCHAR"/>
30+
<result property="taskDesc" column="taskDesc" jdbcType="VARCHAR"/>
31+
<result property="taskState" column="taskState" jdbcType="VARCHAR"/>
32+
<result property="sourceRegion" column="sourceRegion" jdbcType="VARCHAR"/>
33+
<result property="targetRegion" column="targetRegion" jdbcType="VARCHAR"/>
3334
<result property="createUid" column="createUid" jdbcType="VARCHAR"/>
3435
<result property="updateUid" column="updateUid" jdbcType="VARCHAR"/>
3536
<result property="createTime" column="createTime" jdbcType="TIMESTAMP"/>
3637
<result property="updateTime" column="updateTime" jdbcType="TIMESTAMP"/>
3738
</resultMap>
3839

3940
<sql id="Base_Column_List">
40-
id,taskID,name,
41-
desc,state,fromRegion,
41+
id,taskID,taskName,
42+
taskDesc,taskState,sourceRegion,targetRegion,
4243
createUid,updateUid,createTime,
4344
updateTime
4445
</sql>

eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerProperties.java

+5
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.eventmesh.admin.server;
1919

20+
import java.util.List;
21+
import java.util.Map;
22+
2023
import org.springframework.boot.context.properties.ConfigurationProperties;
2124

2225
import lombok.Getter;
@@ -32,4 +35,6 @@ public class AdminServerProperties {
3235
private String configurationPath;
3336
private String configurationFile;
3437
private String serviceName;
38+
private Map<String, List<String>> adminServerList;
39+
private String region;
3540
}

eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,21 @@
2424
import org.springframework.http.ResponseEntity;
2525
import org.springframework.web.bind.annotation.RequestBody;
2626
import org.springframework.web.bind.annotation.RequestMapping;
27+
import org.springframework.web.bind.annotation.RequestMethod;
2728
import org.springframework.web.bind.annotation.RestController;
2829

30+
import com.alibaba.druid.support.json.JSONUtils;
31+
2932
@RestController
3033
@RequestMapping("/eventmesh/admin")
3134
public class HttpServer {
3235
@Autowired
3336
private TaskBizService taskService;
3437

35-
@RequestMapping("/createTask")
36-
public ResponseEntity<Response<String>> createOrUpdateTask(@RequestBody CreateTaskRequest task) {
38+
@RequestMapping(value = "/createTask", method = RequestMethod.POST)
39+
public ResponseEntity<Object> createOrUpdateTask(@RequestBody CreateTaskRequest task) {
3740
String uuid = taskService.createTask(task);
38-
return ResponseEntity.ok(Response.success(uuid));
41+
return ResponseEntity.ok(JSONUtils.toJSONString(Response.success(uuid)));
3942
}
4043

4144
public boolean deleteTask(Long id) {

eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class EventMeshJobInfo implements Serializable {
3737

3838
private String jobID;
3939

40-
private String desc;
40+
private String jobDesc;
4141

4242
private String taskID;
4343

@@ -47,12 +47,16 @@ public class EventMeshJobInfo implements Serializable {
4747

4848
private Integer targetData;
4949

50-
private String state;
50+
private String jobState;
5151

5252
private String jobType;
5353

54+
// job request from region
5455
private String fromRegion;
5556

57+
// job actually running region
58+
private String runningRegion;
59+
5660
private String createUid;
5761

5862
private String updateUid;

eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshTaskInfo.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,15 @@ public class EventMeshTaskInfo implements Serializable {
3737

3838
private String taskID;
3939

40-
private String name;
40+
private String taskName;
4141

42-
private String desc;
42+
private String taskDesc;
4343

44-
private String state;
44+
private String taskState;
4545

46-
private String fromRegion;
46+
private String sourceRegion;
47+
48+
private String targetRegion;
4749

4850
private String createUid;
4951

eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoExtMapper.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,31 @@
2121

2222
import org.apache.ibatis.annotations.Insert;
2323
import org.apache.ibatis.annotations.Mapper;
24-
import org.apache.ibatis.annotations.Options;
2524
import org.apache.ibatis.annotations.Param;
2625

2726
import java.util.List;
2827

28+
import org.springframework.transaction.annotation.Transactional;
29+
2930
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
3031

3132
/**
3233
* etx operator for table event_mesh_job_info
3334
*/
3435
@Mapper
3536
public interface EventMeshJobInfoExtMapper extends BaseMapper<EventMeshJobInfo> {
36-
@Insert("insert into event_mesh_job_info(`taskID`,`state`,`jobType`) values"
37-
+ "<foreach collection= 'jobs' item='job' separator=','>(#{job.taskID},#{job.state},#{job.jobType})</foreach>")
38-
@Options(useGeneratedKeys = true, keyProperty = "jobID")
37+
38+
@Insert("<script>"
39+
+ "insert into event_mesh_job_info(jobID, jobDesc, taskID, transportType, sourceData, "
40+
+ "targetData, jobState, jobType, fromRegion, runningRegion, "
41+
+ "createUid, updateUid) values"
42+
+ "<foreach collection= 'jobs' item='job' separator=','>"
43+
+ "(#{job.jobID}, #{job.jobDesc}, #{job.taskID}, #{job.transportType}, "
44+
+ "#{job.sourceData}, #{job.targetData}, #{job.jobState}, #{job.jobType}, "
45+
+ "#{job.fromRegion}, #{job.runningRegion}, #{job.createUid}, #{job.updateUid})"
46+
+ "</foreach>"
47+
+ "</script>")
48+
@Transactional(rollbackFor = Exception.class)
3949
int saveBatch(@Param("jobs") List<EventMeshJobInfo> jobInfoList);
4050
}
4151

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.admin.server.web.db.service.impl;
19+
20+
import org.apache.eventmesh.admin.server.web.db.entity.EventMeshVerify;
21+
import org.apache.eventmesh.admin.server.web.db.mapper.EventMeshVerifyMapper;
22+
import org.apache.eventmesh.admin.server.web.db.service.EventMeshVerifyService;
23+
24+
import org.springframework.stereotype.Service;
25+
26+
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
27+
28+
/**
29+
* event_mesh_verify
30+
*/
31+
@Service
32+
public class EventMeshVerifyServiceImpl extends ServiceImpl<EventMeshVerifyMapper, EventMeshVerify>
33+
implements EventMeshVerifyService {
34+
35+
}
36+
37+
38+
39+

eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/JobDetail.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class JobDetail {
3434

3535
private String jobID;
3636

37-
private String desc;
37+
private String jobDesc;
3838

3939
private String taskID;
4040

@@ -50,7 +50,11 @@ public class JobDetail {
5050

5151
private String updateUid;
5252

53-
private String region;
53+
// job request from region
54+
private String fromRegion;
55+
56+
// job actually running region
57+
private String runningRegion;
5458

5559
private DataSource sourceDataSource;
5660

eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java

+20-10
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.eventmesh.admin.server.web.service.job;
1919

20+
import org.apache.eventmesh.admin.server.AdminServerProperties;
2021
import org.apache.eventmesh.admin.server.AdminServerRuntimeException;
2122
import org.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource;
2223
import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
@@ -70,12 +71,15 @@ public class JobInfoBizService {
7071
@Autowired
7172
private PositionBizService positionBizService;
7273

74+
@Autowired
75+
private AdminServerProperties properties;
76+
7377
public boolean updateJobState(String jobID, TaskState state) {
7478
if (jobID == null || state == null) {
7579
return false;
7680
}
7781
EventMeshJobInfo jobInfo = new EventMeshJobInfo();
78-
jobInfo.setState(state.name());
82+
jobInfo.setJobState(state.name());
7983
return jobInfoService.update(jobInfo, Wrappers.<EventMeshJobInfo>update().eq("jobID", jobID).ne("state", TaskState.DELETE.name()));
8084
}
8185

@@ -86,34 +90,40 @@ public List<EventMeshJobInfo> createJobs(List<JobDetail> jobs) {
8690
return null;
8791
}
8892
List<EventMeshJobInfo> entityList = new LinkedList<>();
93+
8994
for (JobDetail job : jobs) {
95+
// if running region not equal with admin region continue
96+
if (!job.getRunningRegion().equals(properties.getRegion())) {
97+
continue;
98+
}
9099
EventMeshJobInfo entity = new EventMeshJobInfo();
91-
entity.setState(TaskState.INIT.name());
100+
entity.setJobState(TaskState.INIT.name());
92101
entity.setTaskID(job.getTaskID());
93102
entity.setJobType(job.getJobType().name());
94-
entity.setDesc(job.getDesc());
103+
entity.setJobDesc(job.getJobDesc());
95104
String jobID = UUID.randomUUID().toString();
96105
entity.setJobID(jobID);
97106
entity.setTransportType(job.getTransportType().name());
98107
entity.setCreateUid(job.getCreateUid());
99108
entity.setUpdateUid(job.getUpdateUid());
100-
entity.setFromRegion(job.getRegion());
109+
entity.setFromRegion(job.getFromRegion());
110+
entity.setRunningRegion(job.getRunningRegion());
101111
CreateOrUpdateDataSourceReq source = new CreateOrUpdateDataSourceReq();
102112
source.setType(job.getTransportType().getSrc());
103113
source.setOperator(job.getCreateUid());
104-
source.setRegion(job.getRegion());
114+
source.setRegion(job.getSourceDataSource().getRegion());
105115
source.setDesc(job.getSourceConnectorDesc());
106-
source.setConfig(job.getSourceDataSource());
116+
source.setConfig(job.getSourceDataSource().getConf());
107117
EventMeshDataSource createdSource = dataSourceBizService.createDataSource(source);
108118
entity.setSourceData(createdSource.getId());
109119

110120
CreateOrUpdateDataSourceReq sink = new CreateOrUpdateDataSourceReq();
111121
sink.setType(job.getTransportType().getDst());
112122
sink.setOperator(job.getCreateUid());
113-
sink.setRegion(job.getRegion());
123+
sink.setRegion(job.getSinkDataSource().getRegion());
114124
sink.setDesc(job.getSinkConnectorDesc());
115-
sink.setConfig(job.getSinkDataSource());
116-
EventMeshDataSource createdSink = dataSourceBizService.createDataSource(source);
125+
sink.setConfig(job.getSinkDataSource().getConf());
126+
EventMeshDataSource createdSink = dataSourceBizService.createDataSource(sink);
117127
entity.setTargetData(createdSink.getId());
118128

119129
entityList.add(entity);
@@ -167,7 +177,7 @@ public JobDetail getJobDetail(String jobID) {
167177
detail.setSinkConnectorDesc(target.getDescription());
168178
}
169179

170-
TaskState state = TaskState.fromIndex(job.getState());
180+
TaskState state = TaskState.fromIndex(job.getJobState());
171181
if (state == null) {
172182
throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal job state in db");
173183
}

0 commit comments

Comments
 (0)