1
+ /*
2
+ * Licensed to the Apache Software Foundation (ASF) under one or more
3
+ * contributor license agreements. See the NOTICE file distributed with
4
+ * this work for additional information regarding copyright ownership.
5
+ * The ASF licenses this file to You under the Apache License, Version 2.0
6
+ * (the "License"); you may not use this file except in compliance with
7
+ * the License. You may obtain a copy of the License at
8
+ *
9
+ * http://www.apache.org/licenses/LICENSE-2.0
10
+ *
11
+ * Unless required by applicable law or agreed to in writing, software
12
+ * distributed under the License is distributed on an "AS IS" BASIS,
13
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
+ * See the License for the specific language governing permissions and
15
+ * limitations under the License.
16
+ */
17
+
1
18
package com .apache .eventmesh .admin .server .web ;
2
19
3
20
import com .apache .eventmesh .admin .server .AdminServerRuntimeException ;
4
21
import com .apache .eventmesh .admin .server .web .handler .BaseRequestHandler ;
5
22
import com .apache .eventmesh .admin .server .web .handler .RequestHandlerFactory ;
23
+
6
24
import io .grpc .stub .ServerCallStreamObserver ;
7
25
import io .grpc .stub .StreamObserver ;
26
+
8
27
import lombok .extern .slf4j .Slf4j ;
28
+
9
29
import org .apache .commons .lang3 .StringUtils ;
30
+
10
31
import org .apache .eventmesh .common .protocol .grpc .adminserver .AdminServiceGrpc ;
11
32
import org .apache .eventmesh .common .protocol .grpc .adminserver .Payload ;
12
33
import org .apache .eventmesh .common .remote .exception .ErrorCode ;
15
36
import org .apache .eventmesh .common .remote .response .BaseRemoteResponse ;
16
37
import org .apache .eventmesh .common .remote .response .EmptyAckResponse ;
17
38
import org .apache .eventmesh .common .remote .response .FailResponse ;
39
+
18
40
import org .springframework .beans .factory .annotation .Autowired ;
19
41
import org .springframework .stereotype .Service ;
20
42
21
43
@ Service
22
44
@ Slf4j
23
45
public class AdminGrpcServer extends AdminServiceGrpc .AdminServiceImplBase {
46
+
24
47
@ Autowired
25
48
RequestHandlerFactory handlerFactory ;
26
49
27
50
private Payload process (Payload value ) {
28
51
if (value == null || StringUtils .isBlank (value .getMetadata ().getType ())) {
29
52
return PayloadUtil .from (FailResponse .build (ErrorCode .BAD_REQUEST , "bad request: type not " +
30
- "exists" ));
53
+ "exists" ));
31
54
}
32
55
try {
33
56
BaseRequestHandler <BaseRemoteRequest , BaseRemoteResponse > handler =
34
- handlerFactory .getHandler (value .getMetadata ().getType ());
57
+ handlerFactory .getHandler (value .getMetadata ().getType ());
35
58
if (handler == null ) {
36
59
return PayloadUtil .from (FailResponse .build (BaseRemoteResponse .UNKNOWN ,
37
- "not match any request handler" ));
60
+ "not match any request handler" ));
38
61
}
39
62
BaseRemoteResponse response = handler .handlerRequest ((BaseRemoteRequest ) PayloadUtil .parse (value ), value .getMetadata ());
40
63
if (response == null || response instanceof EmptyAckResponse ) {
@@ -44,8 +67,8 @@ private Payload process(Payload value) {
44
67
} catch (Exception e ) {
45
68
log .warn ("process payload {} fail" , value .getMetadata ().getType (), e );
46
69
if (e instanceof AdminServerRuntimeException ) {
47
- return PayloadUtil .from (FailResponse .build (((AdminServerRuntimeException )e ).getCode (),
48
- e .getMessage ()));
70
+ return PayloadUtil .from (FailResponse .build (((AdminServerRuntimeException ) e ).getCode (),
71
+ e .getMessage ()));
49
72
}
50
73
return PayloadUtil .from (FailResponse .build (ErrorCode .INTERNAL_ERR , "admin server internal err" ));
51
74
}
0 commit comments