Skip to content

[ISSUES #4933]Add Admin Module #4934

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.apache.eventmesh.admin.server;

import org.apache.eventmesh.common.remote.Task;
import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
import org.apache.eventmesh.common.utils.PagedList;

import com.apache.eventmesh.admin.server.task.Task;

public interface Admin extends ComponentLifeCycle{
public interface Admin extends ComponentLifeCycle {
/**
* support for web or ops
**/
Expand All @@ -17,8 +17,5 @@ public interface Admin extends ComponentLifeCycle{
/**
* support for task
*/
void reportHeartbeat(HeartBeat heartBeat);



void reportHeartbeat(ReportHeartBeatRequest heartBeat);
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,18 +1,50 @@
package com.apache.eventmesh.admin.server;

import com.apache.eventmesh.admin.server.task.Task;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.common.config.ConfigService;
import org.apache.eventmesh.common.remote.Task;
import org.apache.eventmesh.common.remote.exception.ErrorCode;
import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.PagedList;
import org.apache.eventmesh.registry.RegisterServerInfo;
import org.apache.eventmesh.registry.RegistryFactory;
import org.apache.eventmesh.registry.RegistryService;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Service;

public class AdminServer implements Admin {
import javax.annotation.PostConstruct;

private RegistryService registryService;
@Service
@Slf4j
public class AdminServer implements Admin, ApplicationListener<ApplicationReadyEvent> {

// private EventMeshAdminServerRegisterInfo registerInfo;
private final RegistryService registryService;

public AdminServer(RegistryService registryService) {
this.registryService = registryService;
// this.registerInfo = registerInfo;
private final RegisterServerInfo adminServeInfo;

private final CommonConfiguration configuration;

public AdminServer(AdminServerProperties properties) {
configuration =
ConfigService.getInstance().buildConfigInstance(CommonConfiguration.class);
if (configuration == null) {
throw new AdminServerRuntimeException(ErrorCode.STARTUP_CONFIG_MISS, "common configuration file miss");
}
this.adminServeInfo = new RegisterServerInfo();

adminServeInfo.setHealth(true);
adminServeInfo.setAddress(IPUtils.getLocalAddress() + ":" + properties.getPort());
String name = Constants.ADMIN_SERVER_REGISTRY_NAME;
if (StringUtils.isNotBlank(properties.getServiceName())) {
name = properties.getServiceName();
}
adminServeInfo.setServiceName(name);
registryService = RegistryFactory.getInstance(configuration.getEventMeshRegistryPluginType());
}


Expand All @@ -37,18 +69,35 @@ public PagedList<Task> getTaskPaged(Task task) {
}

@Override
public void reportHeartbeat(HeartBeat heartBeat) {
public void reportHeartbeat(ReportHeartBeatRequest heartBeat) {

}

@Override
@PostConstruct
public void start() {
registryService.register(null);
if (configuration.isEventMeshRegistryPluginEnabled()) {
registryService.init();
}
}

@Override
public void destroy() {
registryService.unRegister(null);
registryService.shutdown();
if (configuration.isEventMeshRegistryPluginEnabled()) {
registryService.unRegister(adminServeInfo);
try {
Thread.sleep(3000);
} catch (InterruptedException ignore) {
}
registryService.shutdown();
}
}

@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
if (configuration.isEventMeshRegistryPluginEnabled()) {
log.info("application is started and registry plugin is enabled, it's will register admin self");
registryService.register(adminServeInfo);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.apache.eventmesh.admin.server;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties("event-mesh.admin-server")
@Getter
@Setter
public class AdminServerProperties {
private int port;
private boolean enableSSL;
private String configurationPath;
private String configurationFile;
private String serviceName;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.apache.eventmesh.admin.server;

import lombok.Getter;

public class AdminServerRuntimeException extends RuntimeException {
@Getter
private final int code;
public AdminServerRuntimeException(int code, String message) {
super(message);
this.code = code;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.apache.eventmesh.admin.server;

public interface ComponentLifeCycle {
void start();
void start() throws Exception;
void destroy();
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.apache.eventmesh.admin.server;

import com.apache.eventmesh.admin.server.constatns.AdminServerConstants;
import org.apache.eventmesh.common.config.ConfigService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ExampleAdminServer {
public static void main(String[] args) throws Exception {
ConfigService.getInstance().setConfigPath(AdminServerConstants.EVENTMESH_CONF_HOME).setRootConfig(AdminServerConstants.EVENTMESH_CONF_FILE);
SpringApplication.run(ExampleAdminServer.class);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.apache.eventmesh.admin.server.constatns;

public class AdminServerConstants {
public static final String CONF_ENV = "configurationPath";

public static final String EVENTMESH_CONF_HOME = System.getProperty(CONF_ENV, System.getenv(CONF_ENV));

public static final String EVENTMESH_CONF_FILE = "eventmesh-admin.properties";
}

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package com.apache.eventmesh.admin.server.web;

import com.apache.eventmesh.admin.server.AdminServerRuntimeException;
import com.apache.eventmesh.admin.server.web.handler.BaseRequestHandler;
import com.apache.eventmesh.admin.server.web.handler.RequestHandlerFactory;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc;
import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload;
import org.apache.eventmesh.common.remote.exception.ErrorCode;
import org.apache.eventmesh.common.remote.payload.PayloadUtil;
import org.apache.eventmesh.common.remote.request.BaseRemoteRequest;
import org.apache.eventmesh.common.remote.response.BaseRemoteResponse;
import org.apache.eventmesh.common.remote.response.EmptyAckResponse;
import org.apache.eventmesh.common.remote.response.FailResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class AdminGrpcServer extends AdminServiceGrpc.AdminServiceImplBase {
@Autowired
RequestHandlerFactory handlerFactory;

private Payload process(Payload value) {
if (value == null || StringUtils.isBlank(value.getMetadata().getType())) {
return PayloadUtil.from(FailResponse.build(ErrorCode.BAD_REQUEST, "bad request: type not " +
"exists"));
}
try {
BaseRequestHandler<BaseRemoteRequest, BaseRemoteResponse> handler =
handlerFactory.getHandler(value.getMetadata().getType());
if (handler == null) {
return PayloadUtil.from(FailResponse.build(BaseRemoteResponse.UNKNOWN,
"not match any request handler"));
}
BaseRemoteResponse response = handler.handlerRequest((BaseRemoteRequest) PayloadUtil.parse(value), value.getMetadata());
if (response == null || response instanceof EmptyAckResponse) {
return null;
}
return PayloadUtil.from(response);
} catch (Exception e) {
log.warn("process payload {} fail", value.getMetadata().getType(), e);
if (e instanceof AdminServerRuntimeException) {
return PayloadUtil.from(FailResponse.build(((AdminServerRuntimeException)e).getCode(),
e.getMessage()));
}
return PayloadUtil.from(FailResponse.build(ErrorCode.INTERNAL_ERR, "admin server internal err"));
}
}

public StreamObserver<Payload> invokeBiStream(StreamObserver<Payload> responseObserver) {
return new StreamObserver<Payload>() {
@Override
public void onNext(Payload value) {
Payload payload = process(value);
if (payload == null) {
return;
}
responseObserver.onNext(payload);
}

@Override
public void onError(Throwable t) {
if (responseObserver instanceof ServerCallStreamObserver) {
if (!((ServerCallStreamObserver<Payload>) responseObserver).isCancelled()) {
log.warn("admin gRPC server fail", t);
}
}
}

@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}

public void invoke(Payload request, StreamObserver<Payload> responseObserver) {
responseObserver.onNext(process(request));
responseObserver.onCompleted();
}
}
Loading
Loading