Skip to content

[ISSUE #5079] Enhancement update for admin-server #5080

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 25 additions & 26 deletions eventmesh-admin-server/bin/start-admin.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,34 +56,34 @@ function extract_java_version {
#}

function get_pid {
local ppid=""
if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
# If the process does not exist, it indicates that the previous process terminated abnormally.
local ppid=""
if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
# If the process does not exist, it indicates that the previous process terminated abnormally.
if [ ! -d /proc/$ppid ]; then
# Remove the residual file.
rm ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
echo -e "ERROR\t EventMesh process had already terminated unexpectedly before, please check log output."
ppid=""
fi
else
if [[ $OS =~ Msys ]]; then
# There is a Bug on Msys that may not be able to kill the identified process
ppid=`jps -v | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v grep | awk -F ' ' {'print $1'}`
elif [[ $OS =~ Darwin ]]; then
# Known problem: grep Java may not be able to accurately identify Java processes
ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" |awk -F ' ' {'print $2'})
else
if [ $DOCKER ]; then
# No need to exclude root user in Docker containers.
ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print $2'})
else
else
if [[ $OS =~ Msys ]]; then
# There is a Bug on Msys that may not be able to kill the identified process
ppid=`jps -v | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v grep | awk -F ' ' {'print $1'}`
elif [[ $OS =~ Darwin ]]; then
# Known problem: grep Java may not be able to accurately identify Java processes
ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" |awk -F ' ' {'print $2'})
else
if [ $DOCKER ]; then
# No need to exclude root user in Docker containers.
ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print $2'})
else
# It is required to identify the process as accurately as possible on Linux.
ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" | awk -F ' ' {'print $2'})
fi
fi
fi
echo "$ppid";
fi
fi
echo "$ppid";
}

#===========================================================================================
Expand Down Expand Up @@ -136,8 +136,7 @@ export JAVA_HOME

GC_LOG_FILE="${EVENTMESH_ADMIN_LOG_HOME}/eventmesh_admin_gc_%p.log"

#JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4"
JAVA_OPT=`cat ${EVENTMESH_ADMIN_HOME}/conf/server.env | grep APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}`
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50"
JAVA_OPT="${JAVA_OPT} -verbose:gc"
if [[ "$JAVA_VERSION" == "8" ]]; then
Expand Down Expand Up @@ -172,7 +171,7 @@ JAVA_OPT="${JAVA_OPT} -DeventMeshPluginDir=${EVENTMESH_ADMIN_HOME}/plugin"
# echo "proxy is running already"
# exit 9;
# else
# echo "err pid$pid, rm pid.file"
# echo "err pid$pid, rm pid.file"
# rm pid.file
# fi
#fi
Expand All @@ -183,8 +182,8 @@ if [[ $pid == "ERROR"* ]]; then
exit 9
fi
if [ -n "$pid" ]; then
echo -e "ERROR\t The server is already running (pid=$pid), there is no need to execute start.sh again."
exit 9
echo -e "ERROR\t The server is already running (pid=$pid), there is no need to execute start.sh again."
exit 9
fi

make_logs_dir
Expand All @@ -193,9 +192,9 @@ echo "Using Java version: $JAVA_VERSION, path: $JAVA" >> ${EVENTMESH_ADMIN_LOG_H

EVENTMESH_ADMIN_MAIN=org.apache.eventmesh.admin.server.ExampleAdminServer
if [ $DOCKER ]; then
$JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
$JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
else
$JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
$JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
echo $!>${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
fi
exit 0
2 changes: 2 additions & 0 deletions eventmesh-admin-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ dependencies {
implementation "com.alibaba:druid-spring-boot-starter"
compileOnly 'com.mysql:mysql-connector-j'
compileOnly 'org.projectlombok:lombok'
testImplementation 'junit:junit:4.12'
testImplementation 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
}

Expand Down
8 changes: 4 additions & 4 deletions eventmesh-admin-server/conf/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ event-mesh:
# grpc server port
port: 8081
adminServerList:
region1:
R1:
- http://localhost:8082
region2:
- http://localhost:8083
region: region1
R2:
- http://localhost:8082
region: R1
2 changes: 1 addition & 1 deletion eventmesh-admin-server/conf/eventmesh.sql
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ CREATE TABLE IF NOT EXISTS `event_mesh_runtime_heartbeat` (
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `runtimeAddr` (`runtimeAddr`),
KEY `jobID` (`jobID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

Expand Down Expand Up @@ -137,6 +136,7 @@ CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`taskID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
`jobID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
`recordID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
`recordSig` varchar(50) COLLATE utf8_bin DEFAULT NULL,
`connectorName` varchar(200) COLLATE utf8_bin DEFAULT NULL,
Expand Down
5 changes: 3 additions & 2 deletions eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<resultMap id="BaseResultMap" type="org.apache.eventmesh.admin.server.web.db.entity.EventMeshVerify">
<id property="id" column="id" jdbcType="INTEGER"/>
<result property="taskID" column="taskID" jdbcType="VARCHAR"/>
<result property="jobID" column="jobID" jdbcType="VARCHAR"/>
<result property="recordID" column="recordID" jdbcType="VARCHAR"/>
<result property="recordSig" column="recordSig" jdbcType="VARCHAR"/>
<result property="connectorName" column="connectorName" jdbcType="VARCHAR"/>
Expand All @@ -35,8 +36,8 @@
</resultMap>

<sql id="Base_Column_List">
id,taskID,recordID,
recordSig,connectorName,connectorStage,
id,taskID,jobID,recordID,
recordSig,connectorName,connectorStage,
position,createTime
</sql>
</mapper>
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.eventmesh.admin.server.web;

import org.apache.eventmesh.admin.server.web.service.task.TaskBizService;
import org.apache.eventmesh.admin.server.web.service.verify.VerifyBizService;
import org.apache.eventmesh.common.remote.request.CreateTaskRequest;
import org.apache.eventmesh.common.remote.request.ReportVerifyRequest;
import org.apache.eventmesh.common.remote.response.CreateTaskResponse;
import org.apache.eventmesh.common.utils.JsonUtils;

Expand All @@ -29,19 +31,40 @@
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import lombok.extern.slf4j.Slf4j;

@RestController
@RequestMapping("/eventmesh/admin")
@Slf4j
public class HttpServer {

@Autowired
private TaskBizService taskService;

@Autowired
private VerifyBizService verifyService;

@RequestMapping(value = "/createTask", method = RequestMethod.POST)
public ResponseEntity<Object> createOrUpdateTask(@RequestBody CreateTaskRequest task) {
log.info("receive http proto create task:{}", task);
CreateTaskResponse createTaskResponse = taskService.createTask(task);
log.info("receive http proto create task result:{}", createTaskResponse);
return ResponseEntity.ok(JsonUtils.toJSONString(Response.success(createTaskResponse)));
}


@RequestMapping(value = "/reportVerify", method = RequestMethod.POST)
public ResponseEntity<Object> reportVerify(@RequestBody ReportVerifyRequest request) {
log.info("receive http proto report verify request:{}", request);
boolean result = verifyService.reportVerifyRecord(request);
log.info("receive http proto report verify result:{}", result);
if (result) {
return ResponseEntity.ok("report verify success.request:" + JsonUtils.toJSONString(request));
} else {
return ResponseEntity.internalServerError().body("report verify success.request:" + JsonUtils.toJSONString(request));
}
}

public boolean deleteTask(Long id) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.eventmesh.common.EventMeshThreadFactory;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand All @@ -39,22 +40,43 @@ public class DBThreadPool {
new LinkedBlockingQueue<>(1000), new EventMeshThreadFactory("admin-server-db"),
new ThreadPoolExecutor.DiscardOldestPolicy());


private final ScheduledThreadPoolExecutor checkScheduledExecutor =
new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new EventMeshThreadFactory("admin-server-check-scheduled"),
new ThreadPoolExecutor.DiscardOldestPolicy());

@PreDestroy
private void destroy() {
if (!executor.isShutdown()) {
try {
executor.shutdown();
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
log.info("wait heart beat handler thread pool shutdown timeout, it will shutdown immediately");
log.info("wait handler thread pool shutdown timeout, it will shutdown immediately");
executor.shutdownNow();
}
} catch (InterruptedException e) {
log.warn("wait heart beat handler thread pool shutdown fail");
log.warn("wait handler thread pool shutdown fail");
}
}

if (!checkScheduledExecutor.isShutdown()) {
try {
checkScheduledExecutor.shutdown();
if (!checkScheduledExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
log.info("wait scheduled thread pool shutdown timeout, it will shutdown immediately");
checkScheduledExecutor.shutdownNow();
}
} catch (InterruptedException e) {
log.warn("wait scheduled thread pool shutdown fail");
}
}
}

public ThreadPoolExecutor getExecutors() {
return executor;
}

public ScheduledThreadPoolExecutor getCheckExecutor() {
return checkScheduledExecutor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@
@TableName(value = "event_mesh_verify")
@Data
public class EventMeshVerify implements Serializable {

@TableId(type = IdType.AUTO)
private Integer id;

private String taskID;

private String jobID;

private String recordID;

private String recordSig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public FetchJobResponse handler(FetchJobRequest request, Metadata metadata) {
config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource().getConf()));
config.setSourceConnectorDesc(detail.getSourceConnectorDesc());
config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource().getConf()));
config.setSourceConnectorDesc(detail.getSinkConnectorDesc());
config.setSinkConnectorDesc(detail.getSinkConnectorDesc());
response.setConnectorConfig(config);
response.setTransportType(detail.getTransportType());
response.setState(detail.getState());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.admin.server.web.handler.impl;

import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler;
import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService;
import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
import org.apache.eventmesh.common.remote.exception.ErrorCode;
import org.apache.eventmesh.common.remote.request.ReportJobRequest;
import org.apache.eventmesh.common.remote.response.SimpleResponse;

import org.apache.commons.lang3.StringUtils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class ReportJobRequestHandler extends BaseRequestHandler<ReportJobRequest, SimpleResponse> {

@Autowired
JobInfoBizService jobInfoBizService;

@Override
public SimpleResponse handler(ReportJobRequest request, Metadata metadata) {
log.info("receive report job request:{}", request);
if (StringUtils.isBlank(request.getJobID())) {
return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, it's empty");
}
EventMeshJobInfo jobInfo = jobInfoBizService.getJobInfo(request.getJobID());
if (jobInfo == null) {
return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, not exist target job,jobID:" + request.getJobID());
}
boolean result = jobInfoBizService.updateJobState(jobInfo.getJobID(), request.getState());
if (result) {
return SimpleResponse.success();
} else {
return SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "update job failed.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
@Component
@Slf4j
public class ReportPositionHandler extends BaseRequestHandler<ReportPositionRequest, SimpleResponse> {

@Autowired
private JobInfoBizService jobInfoBizService;

Expand All @@ -48,6 +49,7 @@ public class ReportPositionHandler extends BaseRequestHandler<ReportPositionRequ

@Override
protected SimpleResponse handler(ReportPositionRequest request, Metadata metadata) {
log.info("receive report position request:{}", request);
if (StringUtils.isBlank(request.getJobID())) {
log.info("request [{}] illegal job id", request);
return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, it's empty");
Expand Down
Loading
Loading