Skip to content

Commit 9701f02

Browse files
authored
[ISSUE #5079] Enhancement update for admin-server (#5080)
* [ISSUE #5079] Enhancement update for admin-server * fix check style error * fix check style error
1 parent 8cb8df5 commit 9701f02

File tree

21 files changed

+423
-42
lines changed

21 files changed

+423
-42
lines changed

Diff for: eventmesh-admin-server/bin/start-admin.sh

+25-26
Original file line numberDiff line numberDiff line change
@@ -56,34 +56,34 @@ function extract_java_version {
5656
#}
5757

5858
function get_pid {
59-
local ppid=""
60-
if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
61-
ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
62-
# If the process does not exist, it indicates that the previous process terminated abnormally.
59+
local ppid=""
60+
if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
61+
ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
62+
# If the process does not exist, it indicates that the previous process terminated abnormally.
6363
if [ ! -d /proc/$ppid ]; then
6464
# Remove the residual file.
6565
rm ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
6666
echo -e "ERROR\t EventMesh process had already terminated unexpectedly before, please check log output."
6767
ppid=""
6868
fi
69-
else
70-
if [[ $OS =~ Msys ]]; then
71-
# There is a Bug on Msys that may not be able to kill the identified process
72-
ppid=`jps -v | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v grep | awk -F ' ' {'print $1'}`
73-
elif [[ $OS =~ Darwin ]]; then
74-
# Known problem: grep Java may not be able to accurately identify Java processes
75-
ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" |awk -F ' ' {'print $2'})
76-
else
77-
if [ $DOCKER ]; then
78-
# No need to exclude root user in Docker containers.
79-
ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print $2'})
80-
else
69+
else
70+
if [[ $OS =~ Msys ]]; then
71+
# There is a Bug on Msys that may not be able to kill the identified process
72+
ppid=`jps -v | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v grep | awk -F ' ' {'print $1'}`
73+
elif [[ $OS =~ Darwin ]]; then
74+
# Known problem: grep Java may not be able to accurately identify Java processes
75+
ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" |awk -F ' ' {'print $2'})
76+
else
77+
if [ $DOCKER ]; then
78+
# No need to exclude root user in Docker containers.
79+
ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print $2'})
80+
else
8181
# It is required to identify the process as accurately as possible on Linux.
8282
ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" | awk -F ' ' {'print $2'})
8383
fi
84-
fi
85-
fi
86-
echo "$ppid";
84+
fi
85+
fi
86+
echo "$ppid";
8787
}
8888

8989
#===========================================================================================
@@ -136,8 +136,7 @@ export JAVA_HOME
136136

137137
GC_LOG_FILE="${EVENTMESH_ADMIN_LOG_HOME}/eventmesh_admin_gc_%p.log"
138138

139-
#JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4"
140-
JAVA_OPT=`cat ${EVENTMESH_ADMIN_HOME}/conf/server.env | grep APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}`
139+
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g"
141140
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50"
142141
JAVA_OPT="${JAVA_OPT} -verbose:gc"
143142
if [[ "$JAVA_VERSION" == "8" ]]; then
@@ -172,7 +171,7 @@ JAVA_OPT="${JAVA_OPT} -DeventMeshPluginDir=${EVENTMESH_ADMIN_HOME}/plugin"
172171
# echo "proxy is running already"
173172
# exit 9;
174173
# else
175-
# echo "err pid$pid, rm pid.file"
174+
# echo "err pid$pid, rm pid.file"
176175
# rm pid.file
177176
# fi
178177
#fi
@@ -183,8 +182,8 @@ if [[ $pid == "ERROR"* ]]; then
183182
exit 9
184183
fi
185184
if [ -n "$pid" ]; then
186-
echo -e "ERROR\t The server is already running (pid=$pid), there is no need to execute start.sh again."
187-
exit 9
185+
echo -e "ERROR\t The server is already running (pid=$pid), there is no need to execute start.sh again."
186+
exit 9
188187
fi
189188

190189
make_logs_dir
@@ -193,9 +192,9 @@ echo "Using Java version: $JAVA_VERSION, path: $JAVA" >> ${EVENTMESH_ADMIN_LOG_H
193192

194193
EVENTMESH_ADMIN_MAIN=org.apache.eventmesh.admin.server.ExampleAdminServer
195194
if [ $DOCKER ]; then
196-
$JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
195+
$JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
197196
else
198-
$JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
197+
$JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
199198
echo $!>${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
200199
fi
201200
exit 0

Diff for: eventmesh-admin-server/build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ dependencies {
3838
implementation "com.alibaba:druid-spring-boot-starter"
3939
compileOnly 'com.mysql:mysql-connector-j'
4040
compileOnly 'org.projectlombok:lombok'
41+
testImplementation 'junit:junit:4.12'
42+
testImplementation 'org.projectlombok:lombok'
4143
annotationProcessor 'org.projectlombok:lombok'
4244
}
4345

Diff for: eventmesh-admin-server/conf/application.yaml

+4-4
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ event-mesh:
3535
# grpc server port
3636
port: 8081
3737
adminServerList:
38-
region1:
38+
R1:
3939
- http://localhost:8082
40-
region2:
41-
- http://localhost:8083
42-
region: region1
40+
R2:
41+
- http://localhost:8082
42+
region: R1

Diff for: eventmesh-admin-server/conf/eventmesh.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ CREATE TABLE IF NOT EXISTS `event_mesh_runtime_heartbeat` (
102102
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
103103
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
104104
PRIMARY KEY (`id`),
105-
UNIQUE KEY `runtimeAddr` (`runtimeAddr`),
106105
KEY `jobID` (`jobID`)
107106
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
108107

@@ -137,6 +136,7 @@ CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
137136
CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
138137
`id` int unsigned NOT NULL AUTO_INCREMENT,
139138
`taskID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
139+
`jobID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
140140
`recordID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
141141
`recordSig` varchar(50) COLLATE utf8_bin DEFAULT NULL,
142142
`connectorName` varchar(200) COLLATE utf8_bin DEFAULT NULL,

Diff for: eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml

+3-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
<resultMap id="BaseResultMap" type="org.apache.eventmesh.admin.server.web.db.entity.EventMeshVerify">
2727
<id property="id" column="id" jdbcType="INTEGER"/>
2828
<result property="taskID" column="taskID" jdbcType="VARCHAR"/>
29+
<result property="jobID" column="jobID" jdbcType="VARCHAR"/>
2930
<result property="recordID" column="recordID" jdbcType="VARCHAR"/>
3031
<result property="recordSig" column="recordSig" jdbcType="VARCHAR"/>
3132
<result property="connectorName" column="connectorName" jdbcType="VARCHAR"/>
@@ -35,8 +36,8 @@
3536
</resultMap>
3637

3738
<sql id="Base_Column_List">
38-
id,taskID,recordID,
39-
recordSig,connectorName,connectorStage,
39+
id,taskID,jobID,recordID,
40+
recordSig,connectorName,connectorStage,
4041
position,createTime
4142
</sql>
4243
</mapper>

Diff for: eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java

+23
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
package org.apache.eventmesh.admin.server.web;
1919

2020
import org.apache.eventmesh.admin.server.web.service.task.TaskBizService;
21+
import org.apache.eventmesh.admin.server.web.service.verify.VerifyBizService;
2122
import org.apache.eventmesh.common.remote.request.CreateTaskRequest;
23+
import org.apache.eventmesh.common.remote.request.ReportVerifyRequest;
2224
import org.apache.eventmesh.common.remote.response.CreateTaskResponse;
2325
import org.apache.eventmesh.common.utils.JsonUtils;
2426

@@ -29,19 +31,40 @@
2931
import org.springframework.web.bind.annotation.RequestMethod;
3032
import org.springframework.web.bind.annotation.RestController;
3133

34+
import lombok.extern.slf4j.Slf4j;
35+
3236
@RestController
3337
@RequestMapping("/eventmesh/admin")
38+
@Slf4j
3439
public class HttpServer {
3540

3641
@Autowired
3742
private TaskBizService taskService;
3843

44+
@Autowired
45+
private VerifyBizService verifyService;
46+
3947
@RequestMapping(value = "/createTask", method = RequestMethod.POST)
4048
public ResponseEntity<Object> createOrUpdateTask(@RequestBody CreateTaskRequest task) {
49+
log.info("receive http proto create task:{}", task);
4150
CreateTaskResponse createTaskResponse = taskService.createTask(task);
51+
log.info("receive http proto create task result:{}", createTaskResponse);
4252
return ResponseEntity.ok(JsonUtils.toJSONString(Response.success(createTaskResponse)));
4353
}
4454

55+
56+
@RequestMapping(value = "/reportVerify", method = RequestMethod.POST)
57+
public ResponseEntity<Object> reportVerify(@RequestBody ReportVerifyRequest request) {
58+
log.info("receive http proto report verify request:{}", request);
59+
boolean result = verifyService.reportVerifyRecord(request);
60+
log.info("receive http proto report verify result:{}", result);
61+
if (result) {
62+
return ResponseEntity.ok("report verify success.request:" + JsonUtils.toJSONString(request));
63+
} else {
64+
return ResponseEntity.internalServerError().body("report verify success.request:" + JsonUtils.toJSONString(request));
65+
}
66+
}
67+
4568
public boolean deleteTask(Long id) {
4669
return false;
4770
}

Diff for: eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java

+24-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.eventmesh.common.EventMeshThreadFactory;
2121

2222
import java.util.concurrent.LinkedBlockingQueue;
23+
import java.util.concurrent.ScheduledThreadPoolExecutor;
2324
import java.util.concurrent.ThreadPoolExecutor;
2425
import java.util.concurrent.TimeUnit;
2526

@@ -39,22 +40,43 @@ public class DBThreadPool {
3940
new LinkedBlockingQueue<>(1000), new EventMeshThreadFactory("admin-server-db"),
4041
new ThreadPoolExecutor.DiscardOldestPolicy());
4142

43+
44+
private final ScheduledThreadPoolExecutor checkScheduledExecutor =
45+
new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new EventMeshThreadFactory("admin-server-check-scheduled"),
46+
new ThreadPoolExecutor.DiscardOldestPolicy());
47+
4248
@PreDestroy
4349
private void destroy() {
4450
if (!executor.isShutdown()) {
4551
try {
4652
executor.shutdown();
4753
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
48-
log.info("wait heart beat handler thread pool shutdown timeout, it will shutdown immediately");
54+
log.info("wait handler thread pool shutdown timeout, it will shutdown immediately");
4955
executor.shutdownNow();
5056
}
5157
} catch (InterruptedException e) {
52-
log.warn("wait heart beat handler thread pool shutdown fail");
58+
log.warn("wait handler thread pool shutdown fail");
59+
}
60+
}
61+
62+
if (!checkScheduledExecutor.isShutdown()) {
63+
try {
64+
checkScheduledExecutor.shutdown();
65+
if (!checkScheduledExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
66+
log.info("wait scheduled thread pool shutdown timeout, it will shutdown immediately");
67+
checkScheduledExecutor.shutdownNow();
68+
}
69+
} catch (InterruptedException e) {
70+
log.warn("wait scheduled thread pool shutdown fail");
5371
}
5472
}
5573
}
5674

5775
public ThreadPoolExecutor getExecutors() {
5876
return executor;
5977
}
78+
79+
public ScheduledThreadPoolExecutor getCheckExecutor() {
80+
return checkScheduledExecutor;
81+
}
6082
}

Diff for: eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java

+3
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,14 @@
3232
@TableName(value = "event_mesh_verify")
3333
@Data
3434
public class EventMeshVerify implements Serializable {
35+
3536
@TableId(type = IdType.AUTO)
3637
private Integer id;
3738

3839
private String taskID;
3940

41+
private String jobID;
42+
4043
private String recordID;
4144

4245
private String recordSig;

Diff for: eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public FetchJobResponse handler(FetchJobRequest request, Metadata metadata) {
5656
config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource().getConf()));
5757
config.setSourceConnectorDesc(detail.getSourceConnectorDesc());
5858
config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource().getConf()));
59-
config.setSourceConnectorDesc(detail.getSinkConnectorDesc());
59+
config.setSinkConnectorDesc(detail.getSinkConnectorDesc());
6060
response.setConnectorConfig(config);
6161
response.setTransportType(detail.getTransportType());
6262
response.setState(detail.getState());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.admin.server.web.handler.impl;
19+
20+
import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
21+
import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler;
22+
import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService;
23+
import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
24+
import org.apache.eventmesh.common.remote.exception.ErrorCode;
25+
import org.apache.eventmesh.common.remote.request.ReportJobRequest;
26+
import org.apache.eventmesh.common.remote.response.SimpleResponse;
27+
28+
import org.apache.commons.lang3.StringUtils;
29+
30+
import org.springframework.beans.factory.annotation.Autowired;
31+
import org.springframework.stereotype.Component;
32+
33+
import lombok.extern.slf4j.Slf4j;
34+
35+
@Component
36+
@Slf4j
37+
public class ReportJobRequestHandler extends BaseRequestHandler<ReportJobRequest, SimpleResponse> {
38+
39+
@Autowired
40+
JobInfoBizService jobInfoBizService;
41+
42+
@Override
43+
public SimpleResponse handler(ReportJobRequest request, Metadata metadata) {
44+
log.info("receive report job request:{}", request);
45+
if (StringUtils.isBlank(request.getJobID())) {
46+
return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, it's empty");
47+
}
48+
EventMeshJobInfo jobInfo = jobInfoBizService.getJobInfo(request.getJobID());
49+
if (jobInfo == null) {
50+
return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, not exist target job,jobID:" + request.getJobID());
51+
}
52+
boolean result = jobInfoBizService.updateJobState(jobInfo.getJobID(), request.getState());
53+
if (result) {
54+
return SimpleResponse.success();
55+
} else {
56+
return SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "update job failed.");
57+
}
58+
}
59+
}

Diff for: eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java

+2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
@Component
3838
@Slf4j
3939
public class ReportPositionHandler extends BaseRequestHandler<ReportPositionRequest, SimpleResponse> {
40+
4041
@Autowired
4142
private JobInfoBizService jobInfoBizService;
4243

@@ -48,6 +49,7 @@ public class ReportPositionHandler extends BaseRequestHandler<ReportPositionRequ
4849

4950
@Override
5051
protected SimpleResponse handler(ReportPositionRequest request, Metadata metadata) {
52+
log.info("receive report position request:{}", request);
5153
if (StringUtils.isBlank(request.getJobID())) {
5254
log.info("request [{}] illegal job id", request);
5355
return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, it's empty");

0 commit comments

Comments
 (0)