Skip to content

Commit 691aab0

Browse files
authored
[ISSUE #5071] Enhancement for admin server and canal source/sink connector (#5072)
* [ISSUE #5069] Enhancement for http source/sink connector * update http source connector & config * fix checkstyle error * [ISSUE #5071] Enhancement for admin server and canal source/sink connector
1 parent a6018dd commit 691aab0

File tree

22 files changed

+195
-111
lines changed

22 files changed

+195
-111
lines changed

Diff for: eventmesh-admin-server/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ dependencies {
2020
implementation project(":eventmesh-common")
2121
implementation project(":eventmesh-registry:eventmesh-registry-api")
2222
implementation project(":eventmesh-registry:eventmesh-registry-nacos")
23-
implementation project(':eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api')
23+
implementation project(":eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api")
2424
implementation "com.alibaba.nacos:nacos-client"
2525
implementation("org.springframework.boot:spring-boot-starter-web") {
2626
exclude group: "org.springframework.boot", module: "spring-boot-starter-tomcat"

Diff for: eventmesh-admin-server/conf/application.yaml

+6-2
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,17 @@ mybatis-plus:
2626
configuration:
2727
map-underscore-to-camel-case: false
2828
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
29+
# http server port
30+
server:
31+
port: 8082
2932
event-mesh:
3033
admin-server:
3134
serviceName: DEFAULT_GROUP@@em_adm_server
35+
# grpc server port
3236
port: 8081
3337
adminServerList:
3438
region1:
35-
- http://localhost:8081
36-
region2:
3739
- http://localhost:8082
40+
region2:
41+
- http://localhost:8083
3842
region: region1

Diff for: eventmesh-admin-server/conf/eventmesh.sql

+4-3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
3333
`dataType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
3434
`description` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
3535
`configuration` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
36+
`configurationClass` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
3637
`region` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
3738
`createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
3839
`updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
@@ -134,13 +135,13 @@ CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
134135

135136
-- export table eventmesh.event_mesh_verify structure
136137
CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
137-
`id` int NOT NULL,
138+
`id` int unsigned NOT NULL AUTO_INCREMENT,
138139
`taskID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
139140
`recordID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
140141
`recordSig` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
141-
`connectorName` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
142+
`connectorName` varchar(200) COLLATE utf8mb4_general_ci DEFAULT NULL,
142143
`connectorStage` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
143-
`position` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
144+
`position` text COLLATE utf8mb4_general_ci DEFAULT NULL,
144145
`createTime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
145146
PRIMARY KEY (`id`)
146147
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

Diff for: eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml

+3-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
<result property="dataType" column="dataType" jdbcType="VARCHAR"/>
2929
<result property="description" column="description" jdbcType="VARCHAR"/>
3030
<result property="configuration" column="configuration" jdbcType="VARCHAR"/>
31+
<result property="configurationClass" column="configurationClass" jdbcType="VARCHAR"/>
3132
<result property="region" column="region" jdbcType="VARCHAR"/>
3233
<result property="createUid" column="createUid" jdbcType="VARCHAR"/>
3334
<result property="updateUid" column="updateUid" jdbcType="VARCHAR"/>
@@ -37,7 +38,7 @@
3738

3839
<sql id="Base_Column_List">
3940
id,dataType,description,
40-
configuration,region,createUid,updateUid,
41-
createTime,updateTime
41+
configuration,configurationClass,region,
42+
createUid,updateUid,createTime,updateTime
4243
</sql>
4344
</mapper>

Diff for: eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java

+2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public class EventMeshDataSource implements Serializable {
4141

4242
private String configuration;
4343

44+
private String configurationClass;
45+
4446
private String region;
4547

4648
private String createUid;

Diff for: eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ public FetchJobResponse handler(FetchJobRequest request, Metadata metadata) {
5353
}
5454
response.setId(detail.getJobID());
5555
JobConnectorConfig config = new JobConnectorConfig();
56-
config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource()));
56+
config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource().getConf()));
5757
config.setSourceConnectorDesc(detail.getSourceConnectorDesc());
58-
config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource()));
58+
config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource().getConf()));
5959
config.setSourceConnectorDesc(detail.getSinkConnectorDesc());
6060
response.setConnectorConfig(config);
6161
response.setTransportType(detail.getTransportType());

Diff for: eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/datasource/DataSourceBizService.java

+2
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,14 @@
2929

3030
@Service
3131
public class DataSourceBizService {
32+
3233
@Autowired
3334
private EventMeshDataSourceService dataSourceService;
3435

3536
public EventMeshDataSource createDataSource(CreateOrUpdateDataSourceReq dataSource) {
3637
EventMeshDataSource entity = new EventMeshDataSource();
3738
entity.setConfiguration(JsonUtils.toJSONString(dataSource.getConfig()));
39+
entity.setConfigurationClass(dataSource.getConfigClass());
3840
entity.setDataType(dataSource.getType().name());
3941
entity.setCreateUid(dataSource.getOperator());
4042
entity.setUpdateUid(dataSource.getOperator());

Diff for: eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.eventmesh.admin.server.web.pojo.JobDetail;
2828
import org.apache.eventmesh.admin.server.web.service.datasource.DataSourceBizService;
2929
import org.apache.eventmesh.admin.server.web.service.position.PositionBizService;
30+
import org.apache.eventmesh.common.config.connector.Config;
3031
import org.apache.eventmesh.common.remote.TaskState;
3132
import org.apache.eventmesh.common.remote.TransportType;
3233
import org.apache.eventmesh.common.remote.datasource.DataSource;
@@ -114,6 +115,7 @@ public List<EventMeshJobInfo> createJobs(List<JobDetail> jobs) {
114115
source.setRegion(job.getSourceDataSource().getRegion());
115116
source.setDesc(job.getSourceConnectorDesc());
116117
source.setConfig(job.getSourceDataSource().getConf());
118+
source.setConfigClass(job.getSourceDataSource().getConfClazz().getName());
117119
EventMeshDataSource createdSource = dataSourceBizService.createDataSource(source);
118120
entity.setSourceData(createdSource.getId());
119121

@@ -123,6 +125,7 @@ public List<EventMeshJobInfo> createJobs(List<JobDetail> jobs) {
123125
sink.setRegion(job.getSinkDataSource().getRegion());
124126
sink.setDesc(job.getSinkConnectorDesc());
125127
sink.setConfig(job.getSinkDataSource().getConf());
128+
sink.setConfigClass(job.getSinkDataSource().getConfClazz().getName());
126129
EventMeshDataSource createdSink = dataSourceBizService.createDataSource(sink);
127130
entity.setTargetData(createdSink.getId());
128131

@@ -141,18 +144,22 @@ public JobDetail getJobDetail(String jobID) {
141144
if (jobID == null) {
142145
return null;
143146
}
144-
EventMeshJobInfo job = jobInfoService.getById(jobID);
147+
EventMeshJobInfo job = jobInfoService.getOne(Wrappers.<EventMeshJobInfo>query().eq("jobID", jobID));
145148
if (job == null) {
146149
return null;
147150
}
148151
JobDetail detail = new JobDetail();
152+
detail.setTaskID(job.getTaskID());
149153
detail.setJobID(job.getJobID());
150154
EventMeshDataSource source = dataSourceService.getById(job.getSourceData());
151155
EventMeshDataSource target = dataSourceService.getById(job.getTargetData());
152156
if (source != null) {
153157
if (!StringUtils.isBlank(source.getConfiguration())) {
154158
try {
155-
detail.setSourceDataSource(JsonUtils.parseObject(source.getConfiguration(), DataSource.class));
159+
DataSource sourceDataSource = new DataSource();
160+
Class<?> configClass = Class.forName(source.getConfigurationClass());
161+
sourceDataSource.setConf((Config) JsonUtils.parseObject(source.getConfiguration(), configClass));
162+
detail.setSourceDataSource(sourceDataSource);
156163
} catch (Exception e) {
157164
log.warn("parse source config id [{}] fail", job.getSourceData(), e);
158165
throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal source data source config");
@@ -168,7 +175,10 @@ public JobDetail getJobDetail(String jobID) {
168175
if (target != null) {
169176
if (!StringUtils.isBlank(target.getConfiguration())) {
170177
try {
171-
detail.setSinkDataSource(JsonUtils.parseObject(target.getConfiguration(), DataSource.class));
178+
DataSource sinkDataSource = new DataSource();
179+
Class<?> configClass = Class.forName(target.getConfigurationClass());
180+
sinkDataSource.setConf((Config) JsonUtils.parseObject(target.getConfiguration(), configClass));
181+
detail.setSinkDataSource(sinkDataSource);
172182
} catch (Exception e) {
173183
log.warn("parse sink config id [{}] fail", job.getSourceData(), e);
174184
throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal target data sink config");

Diff for: eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java

+31-4
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,17 @@
2222
import org.apache.eventmesh.admin.server.web.db.service.EventMeshTaskInfoService;
2323
import org.apache.eventmesh.admin.server.web.pojo.JobDetail;
2424
import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService;
25+
import org.apache.eventmesh.common.config.connector.Config;
2526
import org.apache.eventmesh.common.remote.TaskState;
27+
import org.apache.eventmesh.common.remote.datasource.DataSource;
28+
import org.apache.eventmesh.common.remote.datasource.DataSourceType;
2629
import org.apache.eventmesh.common.remote.request.CreateTaskRequest;
30+
import org.apache.eventmesh.common.utils.JsonUtils;
2731

2832
import org.apache.commons.lang3.StringUtils;
2933

3034
import java.util.List;
35+
import java.util.Map;
3136
import java.util.Random;
3237
import java.util.UUID;
3338
import java.util.stream.Collectors;
@@ -40,6 +45,7 @@
4045

4146
@Service
4247
public class TaskBizService {
48+
4349
@Autowired
4450
private EventMeshTaskInfoService taskInfoService;
4551

@@ -76,7 +82,12 @@ public String createTask(CreateTaskRequest req) {
7682

7783
String finalTaskID = taskID;
7884
List<JobDetail> jobs = req.getJobs().stream().map(x -> {
79-
JobDetail job = parse(x);
85+
JobDetail job = null;
86+
try {
87+
job = parse(x);
88+
} catch (ClassNotFoundException e) {
89+
throw new RuntimeException(e);
90+
}
8091
job.setTaskID(finalTaskID);
8192
job.setCreateUid(req.getUid());
8293
job.setUpdateUid(req.getUid());
@@ -95,14 +106,30 @@ public String createTask(CreateTaskRequest req) {
95106
return finalTaskID;
96107
}
97108

98-
private JobDetail parse(CreateTaskRequest.JobDetail src) {
109+
private JobDetail parse(CreateTaskRequest.JobDetail src) throws ClassNotFoundException {
99110
JobDetail dst = new JobDetail();
100111
dst.setJobDesc(src.getJobDesc());
101112
dst.setTransportType(src.getTransportType());
102113
dst.setSourceConnectorDesc(src.getSourceConnectorDesc());
103-
dst.setSourceDataSource(src.getSourceDataSource());
114+
Map<String, Object> sourceDataMap = src.getSourceDataSource();
115+
DataSource sourceDataSource = new DataSource();
116+
sourceDataSource.setType(DataSourceType.fromString(sourceDataMap.get("type").toString()));
117+
sourceDataSource.setDesc((String) sourceDataMap.get("desc"));
118+
sourceDataSource.setConfClazz((Class<? extends Config>) Class.forName(sourceDataMap.get("confClazz").toString()));
119+
sourceDataSource.setConf(JsonUtils.parseObject(JsonUtils.toJSONString(sourceDataMap.get("conf")), sourceDataSource.getConfClazz()));
120+
sourceDataSource.setRegion((String) sourceDataMap.get("region"));
121+
dst.setSourceDataSource(sourceDataSource);
122+
104123
dst.setSinkConnectorDesc(src.getSinkConnectorDesc());
105-
dst.setSinkDataSource(src.getSinkDataSource());
124+
Map<String, Object> sinkDataMap = src.getSinkDataSource();
125+
DataSource sinkDataSource = new DataSource();
126+
sinkDataSource.setType(DataSourceType.fromString(sinkDataMap.get("type").toString()));
127+
sinkDataSource.setDesc((String) sinkDataMap.get("desc"));
128+
sinkDataSource.setConfClazz((Class<? extends Config>) Class.forName(sinkDataMap.get("confClazz").toString()));
129+
sinkDataSource.setConf(JsonUtils.parseObject(JsonUtils.toJSONString(sinkDataMap.get("conf")), sinkDataSource.getConfClazz()));
130+
sinkDataSource.setRegion((String) sinkDataMap.get("region"));
131+
dst.setSinkDataSource(sinkDataSource);
132+
106133
// full/increase/check
107134
dst.setJobType(src.getJobType());
108135
dst.setFromRegion(src.getFromRegion());

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java

+9
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,13 @@ public static DataSourceType getDataSourceType(Integer index) {
6161
}
6262
return TYPES[index];
6363
}
64+
65+
public static DataSourceType fromString(String type) {
66+
for (DataSourceType dataSourceType : DataSourceType.values()) {
67+
if (dataSourceType.name().equalsIgnoreCase(type)) {
68+
return dataSourceType;
69+
}
70+
}
71+
throw new IllegalArgumentException("No enum constant for type: " + type);
72+
}
6473
}

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public class CreateOrUpdateDataSourceReq extends BaseRemoteRequest {
3434
private DataSourceType type;
3535
private String desc;
3636
private Config config;
37+
private String configClass;
3738
private String region;
3839
private String operator;
3940
}

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
package org.apache.eventmesh.common.remote.request;
1919

2020
import org.apache.eventmesh.common.remote.TransportType;
21-
import org.apache.eventmesh.common.remote.datasource.DataSource;
2221
import org.apache.eventmesh.common.remote.job.JobType;
2322

2423
import java.util.List;
24+
import java.util.Map;
2525

2626
import lombok.Data;
2727

@@ -61,11 +61,11 @@ public static class JobDetail {
6161
// full/increase/check
6262
private JobType jobType;
6363

64-
private DataSource sourceDataSource;
64+
private Map<String, Object> sourceDataSource;
6565

6666
private String sourceConnectorDesc;
6767

68-
private DataSource sinkDataSource;
68+
private Map<String, Object> sinkDataSource;
6969

7070
private String sinkConnectorDesc;
7171

Diff for: eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
org.apache.eventmesh.common.remote.request.FetchJobRequest
1717
org.apache.eventmesh.common.remote.response.FetchJobResponse
1818
org.apache.eventmesh.common.remote.request.ReportPositionRequest
19+
org.apache.eventmesh.common.remote.request.ReportVerifyRequest
1920
org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest
2021
org.apache.eventmesh.common.remote.request.FetchPositionRequest
2122
org.apache.eventmesh.common.remote.response.FetchPositionResponse

Diff for: eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,16 @@
2222
import org.apache.eventmesh.connector.canal.model.EventColumn;
2323
import org.apache.eventmesh.connector.canal.model.EventType;
2424

25+
import java.io.Serializable;
2526
import java.util.ArrayList;
2627
import java.util.List;
2728

2829
import lombok.Data;
2930

3031
@Data
31-
public class CanalConnectRecord {
32+
public class CanalConnectRecord implements Serializable {
33+
34+
private static final long serialVersionUID = 1L;
3235

3336
private String schemaName;
3437

0 commit comments

Comments
 (0)