Skip to content

Commit c8303b1

Browse files
committed
Add Admin Module
1 parent 78942c4 commit c8303b1

File tree

71 files changed

+1841
-691
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+1841
-691
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package com.apache.eventmesh.admin.server;
22

3+
import org.apache.eventmesh.common.remote.Task;
4+
import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
35
import org.apache.eventmesh.common.utils.PagedList;
46

5-
import com.apache.eventmesh.admin.server.task.Task;
6-
7-
public interface Admin extends ComponentLifeCycle{
7+
public interface Admin extends ComponentLifeCycle {
88
/**
99
* support for web or ops
1010
**/
@@ -17,8 +17,5 @@ public interface Admin extends ComponentLifeCycle{
1717
/**
1818
* support for task
1919
*/
20-
void reportHeartbeat(HeartBeat heartBeat);
21-
22-
23-
20+
void reportHeartbeat(ReportHeartBeatRequest heartBeat);
2421
}

Diff for: eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminException.java

-11
This file was deleted.

Diff for: eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java

+60-11
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,50 @@
11
package com.apache.eventmesh.admin.server;
22

3-
import com.apache.eventmesh.admin.server.task.Task;
3+
import lombok.extern.slf4j.Slf4j;
4+
import org.apache.commons.lang3.StringUtils;
5+
import org.apache.eventmesh.common.Constants;
6+
import org.apache.eventmesh.common.config.CommonConfiguration;
7+
import org.apache.eventmesh.common.config.ConfigService;
8+
import org.apache.eventmesh.common.remote.Task;
9+
import org.apache.eventmesh.common.remote.exception.ErrorCode;
10+
import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
11+
import org.apache.eventmesh.common.utils.IPUtils;
412
import org.apache.eventmesh.common.utils.PagedList;
13+
import org.apache.eventmesh.registry.RegisterServerInfo;
14+
import org.apache.eventmesh.registry.RegistryFactory;
515
import org.apache.eventmesh.registry.RegistryService;
16+
import org.springframework.boot.context.event.ApplicationReadyEvent;
17+
import org.springframework.context.ApplicationListener;
18+
import org.springframework.stereotype.Service;
619

7-
public class AdminServer implements Admin {
20+
import javax.annotation.PostConstruct;
821

9-
private RegistryService registryService;
22+
@Service
23+
@Slf4j
24+
public class AdminServer implements Admin, ApplicationListener<ApplicationReadyEvent> {
1025

11-
// private EventMeshAdminServerRegisterInfo registerInfo;
26+
private final RegistryService registryService;
1227

13-
public AdminServer(RegistryService registryService) {
14-
this.registryService = registryService;
15-
// this.registerInfo = registerInfo;
28+
private final RegisterServerInfo adminServeInfo;
29+
30+
private final CommonConfiguration configuration;
31+
32+
public AdminServer(AdminServerProperties properties) {
33+
configuration =
34+
ConfigService.getInstance().buildConfigInstance(CommonConfiguration.class);
35+
if (configuration == null) {
36+
throw new AdminServerRuntimeException(ErrorCode.STARTUP_CONFIG_MISS, "common configuration file miss");
37+
}
38+
this.adminServeInfo = new RegisterServerInfo();
39+
40+
adminServeInfo.setHealth(true);
41+
adminServeInfo.setAddress(IPUtils.getLocalAddress() + ":" + properties.getPort());
42+
String name = Constants.ADMIN_SERVER_REGISTRY_NAME;
43+
if (StringUtils.isNotBlank(properties.getServiceName())) {
44+
name = properties.getServiceName();
45+
}
46+
adminServeInfo.setServiceName(name);
47+
registryService = RegistryFactory.getInstance(configuration.getEventMeshRegistryPluginType());
1648
}
1749

1850

@@ -37,18 +69,35 @@ public PagedList<Task> getTaskPaged(Task task) {
3769
}
3870

3971
@Override
40-
public void reportHeartbeat(HeartBeat heartBeat) {
72+
public void reportHeartbeat(ReportHeartBeatRequest heartBeat) {
4173

4274
}
4375

4476
@Override
77+
@PostConstruct
4578
public void start() {
46-
registryService.register(null);
79+
if (configuration.isEventMeshRegistryPluginEnabled()) {
80+
registryService.init();
81+
}
4782
}
4883

4984
@Override
5085
public void destroy() {
51-
registryService.unRegister(null);
52-
registryService.shutdown();
86+
if (configuration.isEventMeshRegistryPluginEnabled()) {
87+
registryService.unRegister(adminServeInfo);
88+
try {
89+
Thread.sleep(3000);
90+
} catch (InterruptedException ignore) {
91+
}
92+
registryService.shutdown();
93+
}
94+
}
95+
96+
@Override
97+
public void onApplicationEvent(ApplicationReadyEvent event) {
98+
if (configuration.isEventMeshRegistryPluginEnabled()) {
99+
log.info("application is started and registry plugin is enabled, it's will register admin self");
100+
registryService.register(adminServeInfo);
101+
}
53102
}
54103
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.apache.eventmesh.admin.server;
2+
3+
import lombok.Getter;
4+
import lombok.Setter;
5+
import org.springframework.boot.context.properties.ConfigurationProperties;
6+
7+
@ConfigurationProperties("event-mesh.admin-server")
8+
@Getter
9+
@Setter
10+
public class AdminServerProperties {
11+
private int port;
12+
private boolean enableSSL;
13+
private String configurationPath;
14+
private String configurationFile;
15+
private String serviceName;
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.apache.eventmesh.admin.server;
2+
3+
import lombok.Getter;
4+
5+
public class AdminServerRuntimeException extends RuntimeException {
6+
@Getter
7+
private final int code;
8+
public AdminServerRuntimeException(int code, String message) {
9+
super(message);
10+
this.code = code;
11+
}
12+
}
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.apache.eventmesh.admin.server;
22

33
public interface ComponentLifeCycle {
4-
void start();
4+
void start() throws Exception;
55
void destroy();
66
}

Diff for: eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/EventMeshAdminServerConfiguration.java

-32
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.apache.eventmesh.admin.server;
2+
3+
import com.apache.eventmesh.admin.server.constatns.AdminServerConstants;
4+
import org.apache.eventmesh.common.config.ConfigService;
5+
import org.springframework.boot.SpringApplication;
6+
import org.springframework.boot.autoconfigure.SpringBootApplication;
7+
8+
@SpringBootApplication
9+
public class ExampleAdminServer {
10+
public static void main(String[] args) throws Exception {
11+
ConfigService.getInstance().setConfigPath(AdminServerConstants.EVENTMESH_CONF_HOME).setRootConfig(AdminServerConstants.EVENTMESH_CONF_FILE);
12+
SpringApplication.run(ExampleAdminServer.class);
13+
}
14+
}

Diff for: eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/HeartBeat.java

-14
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.apache.eventmesh.admin.server.constatns;
2+
3+
public class AdminServerConstants {
4+
public static final String CONF_ENV = "configurationPath";
5+
6+
public static final String EVENTMESH_CONF_HOME = System.getProperty(CONF_ENV, System.getenv(CONF_ENV));
7+
8+
public static final String EVENTMESH_CONF_FILE = "eventmesh-admin.properties";
9+
}

Diff for: eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Job.java

-8
This file was deleted.

Diff for: eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobState.java

-10
This file was deleted.

Diff for: eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobType.java

-7
This file was deleted.

Diff for: eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Position.java

-5
This file was deleted.

Diff for: eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Task.java

-17
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package com.apache.eventmesh.admin.server.web;
2+
3+
import com.apache.eventmesh.admin.server.AdminServerRuntimeException;
4+
import com.apache.eventmesh.admin.server.web.handler.BaseRequestHandler;
5+
import com.apache.eventmesh.admin.server.web.handler.RequestHandlerFactory;
6+
import io.grpc.stub.ServerCallStreamObserver;
7+
import io.grpc.stub.StreamObserver;
8+
import lombok.extern.slf4j.Slf4j;
9+
import org.apache.commons.lang3.StringUtils;
10+
import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc;
11+
import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload;
12+
import org.apache.eventmesh.common.remote.exception.ErrorCode;
13+
import org.apache.eventmesh.common.remote.payload.PayloadUtil;
14+
import org.apache.eventmesh.common.remote.request.BaseRemoteRequest;
15+
import org.apache.eventmesh.common.remote.response.BaseRemoteResponse;
16+
import org.apache.eventmesh.common.remote.response.EmptyAckResponse;
17+
import org.apache.eventmesh.common.remote.response.FailResponse;
18+
import org.springframework.beans.factory.annotation.Autowired;
19+
import org.springframework.stereotype.Service;
20+
21+
@Service
22+
@Slf4j
23+
public class AdminGrpcServer extends AdminServiceGrpc.AdminServiceImplBase {
24+
@Autowired
25+
RequestHandlerFactory handlerFactory;
26+
27+
private Payload process(Payload value) {
28+
if (value == null || StringUtils.isBlank(value.getMetadata().getType())) {
29+
return PayloadUtil.from(FailResponse.build(ErrorCode.BAD_REQUEST, "bad request: type not " +
30+
"exists"));
31+
}
32+
try {
33+
BaseRequestHandler<BaseRemoteRequest, BaseRemoteResponse> handler =
34+
handlerFactory.getHandler(value.getMetadata().getType());
35+
if (handler == null) {
36+
return PayloadUtil.from(FailResponse.build(BaseRemoteResponse.UNKNOWN,
37+
"not match any request handler"));
38+
}
39+
BaseRemoteResponse response = handler.handlerRequest((BaseRemoteRequest) PayloadUtil.parse(value), value.getMetadata());
40+
if (response == null || response instanceof EmptyAckResponse) {
41+
return null;
42+
}
43+
return PayloadUtil.from(response);
44+
} catch (Exception e) {
45+
log.warn("process payload {} fail", value.getMetadata().getType(), e);
46+
if (e instanceof AdminServerRuntimeException) {
47+
return PayloadUtil.from(FailResponse.build(((AdminServerRuntimeException)e).getCode(),
48+
e.getMessage()));
49+
}
50+
return PayloadUtil.from(FailResponse.build(ErrorCode.INTERNAL_ERR, "admin server internal err"));
51+
}
52+
}
53+
54+
public StreamObserver<Payload> invokeBiStream(StreamObserver<Payload> responseObserver) {
55+
return new StreamObserver<Payload>() {
56+
@Override
57+
public void onNext(Payload value) {
58+
Payload payload = process(value);
59+
if (payload == null) {
60+
return;
61+
}
62+
responseObserver.onNext(payload);
63+
}
64+
65+
@Override
66+
public void onError(Throwable t) {
67+
if (responseObserver instanceof ServerCallStreamObserver) {
68+
if (!((ServerCallStreamObserver<Payload>) responseObserver).isCancelled()) {
69+
log.warn("admin gRPC server fail", t);
70+
}
71+
}
72+
}
73+
74+
@Override
75+
public void onCompleted() {
76+
responseObserver.onCompleted();
77+
}
78+
};
79+
}
80+
81+
public void invoke(Payload request, StreamObserver<Payload> responseObserver) {
82+
responseObserver.onNext(process(request));
83+
responseObserver.onCompleted();
84+
}
85+
}

0 commit comments

Comments
 (0)