Skip to content

Commit 9a3912a

Browse files
authored
[ISSUE #4731] HttpRequestProcessor enhancement (#4732)
* resolve conflicts. * resolve
1 parent d6393ab commit 9a3912a

27 files changed

+295
-151
lines changed

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java

+45-39
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
2828
import org.apache.eventmesh.common.protocol.http.header.Header;
2929
import org.apache.eventmesh.common.utils.AssertUtils;
30-
import org.apache.eventmesh.runtime.common.Pair;
3130
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
3231
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
3332
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
@@ -117,7 +116,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
117116
/**
118117
* key: request code
119118
*/
120-
protected final transient Map<String, Pair<HttpRequestProcessor, ThreadPoolExecutor>> httpRequestProcessorTable =
119+
protected final transient Map<String, HttpRequestProcessor> httpRequestProcessorTable =
121120
new ConcurrentHashMap<>(64);
122121

123122
private HttpConnectionHandler httpConnectionHandler;
@@ -198,11 +197,10 @@ public void shutdown() throws Exception {
198197
/**
199198
* Registers the processors required by the runtime module
200199
*/
201-
public void registerProcessor(final Integer requestCode, final HttpRequestProcessor processor, final ThreadPoolExecutor executor) {
200+
public void registerProcessor(final Integer requestCode, final HttpRequestProcessor processor) {
202201
AssertUtils.notNull(requestCode, "requestCode can't be null");
203202
AssertUtils.notNull(processor, "processor can't be null");
204-
AssertUtils.notNull(executor, "executor can't be null");
205-
this.httpRequestProcessorTable.put(requestCode.toString(), new Pair<>(processor, executor));
203+
this.httpRequestProcessorTable.putIfAbsent(requestCode.toString(), processor);
206204
}
207205

208206
/**
@@ -400,46 +398,54 @@ private void injectHttpRequestHeader(final ChannelHandlerContext ctx, final Http
400398

401399
private void processHttpCommandRequest(final ChannelHandlerContext ctx, final AsyncContext<HttpCommand> asyncContext) {
402400
final HttpCommand request = asyncContext.getRequest();
403-
final Pair<HttpRequestProcessor, ThreadPoolExecutor> choosed = httpRequestProcessorTable.get(request.getRequestCode());
404-
try {
405-
choosed.getObject2().submit(() -> {
406-
try {
407-
final HttpRequestProcessor processor = choosed.getObject1();
408-
if (processor.rejectRequest()) {
409-
final HttpCommand responseCommand =
410-
request.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR);
411-
asyncContext.onComplete(responseCommand);
412-
413-
if (asyncContext.isComplete()) {
414-
sendResponse(ctx, responseCommand.httpResponse());
415-
log.debug("{}", asyncContext.getResponse());
416-
final Map<String, Object> traceMap = asyncContext.getRequest().getHeader().toMap();
417-
TraceUtils.finishSpanWithException(TraceUtils.prepareServerSpan(traceMap,
418-
EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN,
419-
false),
420-
traceMap,
421-
EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR.getErrMsg(), null);
422-
}
423-
424-
return;
401+
final HttpRequestProcessor choosed = httpRequestProcessorTable.get(request.getRequestCode());
402+
Runnable runnable = () -> {
403+
try {
404+
final HttpRequestProcessor processor = choosed;
405+
if (processor.rejectRequest()) {
406+
final HttpCommand responseCommand =
407+
request.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR);
408+
asyncContext.onComplete(responseCommand);
409+
410+
if (asyncContext.isComplete()) {
411+
sendResponse(ctx, responseCommand.httpResponse());
412+
log.debug("{}", asyncContext.getResponse());
413+
final Map<String, Object> traceMap = asyncContext.getRequest().getHeader().toMap();
414+
TraceUtils.finishSpanWithException(TraceUtils.prepareServerSpan(traceMap,
415+
416+
EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN,
417+
false),
418+
traceMap,
419+
EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR.getErrMsg(), null);
425420
}
426421

427-
processor.processRequest(ctx, asyncContext);
428-
if (!asyncContext.isComplete()) {
429-
return;
430-
}
422+
return;
423+
}
424+
425+
processor.processRequest(ctx, asyncContext);
426+
if (!asyncContext.isComplete()) {
427+
return;
428+
}
431429

432-
metrics.getSummaryMetrics()
433-
.recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime());
430+
log.debug("{}", asyncContext.getResponse());
431+
metrics.getSummaryMetrics()
432+
.recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime());
433+
sendResponse(ctx, asyncContext.getResponse().httpResponse());
434434

435-
log.debug("{}", asyncContext.getResponse());
435+
} catch (Exception e) {
436+
log.error("process error", e);
437+
}
438+
};
436439

437-
sendResponse(ctx, asyncContext.getResponse().httpResponse());
440+
try {
441+
if (Objects.nonNull(choosed.executor())) {
442+
choosed.executor().execute(() -> {
443+
runnable.run();
444+
});
445+
} else {
446+
runnable.run();
447+
}
438448

439-
} catch (Exception e) {
440-
log.error("process error", e);
441-
}
442-
});
443449
} catch (RejectedExecutionException re) {
444450
asyncContext.onComplete(request.createHttpCommandResponse(EventMeshRetCode.OVERLOAD));
445451
metrics.getSummaryMetrics().recordHTTPDiscard();

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java

+22-35
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@
6262

6363
import java.util.List;
6464
import java.util.Optional;
65-
import java.util.concurrent.ThreadPoolExecutor;
6665

6766
import org.assertj.core.util.Lists;
6867

@@ -71,6 +70,7 @@
7170

7271
import lombok.extern.slf4j.Slf4j;
7372

73+
7474
/**
7575
* Add multiple managers to the underlying server
7676
*/
@@ -84,22 +84,16 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
8484

8585
private final Acl acl;
8686
private final EventBus eventBus = new EventBus();
87-
87+
private final transient HTTPClientPool httpClientPool = new HTTPClientPool(10);
8888
private ConsumerManager consumerManager;
8989
private ProducerManager producerManager;
9090
private SubscriptionManager subscriptionManager;
91-
9291
private FilterEngine filterEngine;
93-
9492
private TransformerEngine transformerEngine;
95-
9693
private HttpRetryer httpRetryer;
97-
9894
private transient RateLimiter msgRateLimiter;
9995
private transient RateLimiter batchRateLimiter;
10096

101-
private final transient HTTPClientPool httpClientPool = new HTTPClientPool(10);
102-
10397
public EventMeshHTTPServer(final EventMeshServer eventMeshServer, final EventMeshHTTPConfiguration eventMeshHttpConfiguration) {
10498

10599
super(eventMeshHttpConfiguration.getHttpServerPort(),
@@ -239,68 +233,59 @@ private void unRegister() {
239233
}
240234

241235
private void registerHTTPRequestProcessor() throws Exception {
242-
HTTPThreadPoolGroup httpThreadPoolGroup = super.getHttpThreadPoolGroup();
243-
244-
ThreadPoolExecutor batchMsgExecutor = httpThreadPoolGroup.getBatchMsgExecutor();
245236
final BatchSendMessageProcessor batchSendMessageProcessor = new BatchSendMessageProcessor(this);
246-
registerProcessor(RequestCode.MSG_BATCH_SEND.getRequestCode(), batchSendMessageProcessor, batchMsgExecutor);
237+
registerProcessor(RequestCode.MSG_BATCH_SEND.getRequestCode(), batchSendMessageProcessor);
247238

248239
final BatchSendMessageV2Processor batchSendMessageV2Processor = new BatchSendMessageV2Processor(this);
249-
registerProcessor(RequestCode.MSG_BATCH_SEND_V2.getRequestCode(), batchSendMessageV2Processor,
250-
batchMsgExecutor);
240+
registerProcessor(RequestCode.MSG_BATCH_SEND_V2.getRequestCode(), batchSendMessageV2Processor);
251241

252-
ThreadPoolExecutor sendMsgExecutor = httpThreadPoolGroup.getSendMsgExecutor();
253242
final SendSyncMessageProcessor sendSyncMessageProcessor = new SendSyncMessageProcessor(this);
254-
registerProcessor(RequestCode.MSG_SEND_SYNC.getRequestCode(), sendSyncMessageProcessor, sendMsgExecutor);
243+
registerProcessor(RequestCode.MSG_SEND_SYNC.getRequestCode(), sendSyncMessageProcessor);
255244

256245
final SendAsyncMessageProcessor sendAsyncMessageProcessor = new SendAsyncMessageProcessor(this);
257-
registerProcessor(RequestCode.MSG_SEND_ASYNC.getRequestCode(), sendAsyncMessageProcessor, sendMsgExecutor);
246+
registerProcessor(RequestCode.MSG_SEND_ASYNC.getRequestCode(), sendAsyncMessageProcessor);
258247

259248
final SendAsyncEventProcessor sendAsyncEventProcessor = new SendAsyncEventProcessor(this);
260-
this.getHandlerService().register(sendAsyncEventProcessor, sendMsgExecutor);
249+
this.getHandlerService().register(sendAsyncEventProcessor);
261250

262-
ThreadPoolExecutor remoteMsgExecutor = httpThreadPoolGroup.getRemoteMsgExecutor();
263251
final SendAsyncRemoteEventProcessor sendAsyncRemoteEventProcessor = new SendAsyncRemoteEventProcessor(this);
264-
this.getHandlerService().register(sendAsyncRemoteEventProcessor, remoteMsgExecutor);
252+
this.getHandlerService().register(sendAsyncRemoteEventProcessor);
265253

266-
ThreadPoolExecutor runtimeAdminExecutor = httpThreadPoolGroup.getRuntimeAdminExecutor();
267254
final AdminMetricsProcessor adminMetricsProcessor = new AdminMetricsProcessor(this);
268-
registerProcessor(RequestCode.ADMIN_METRICS.getRequestCode(), adminMetricsProcessor, runtimeAdminExecutor);
255+
registerProcessor(RequestCode.ADMIN_METRICS.getRequestCode(), adminMetricsProcessor);
269256

270-
ThreadPoolExecutor clientManageExecutor = httpThreadPoolGroup.getClientManageExecutor();
271257
final HeartBeatProcessor heartProcessor = new HeartBeatProcessor(this);
272-
registerProcessor(RequestCode.HEARTBEAT.getRequestCode(), heartProcessor, clientManageExecutor);
258+
registerProcessor(RequestCode.HEARTBEAT.getRequestCode(), heartProcessor);
273259

274260
final SubscribeProcessor subscribeProcessor = new SubscribeProcessor(this);
275-
registerProcessor(RequestCode.SUBSCRIBE.getRequestCode(), subscribeProcessor, clientManageExecutor);
261+
registerProcessor(RequestCode.SUBSCRIBE.getRequestCode(), subscribeProcessor);
276262

277263
final LocalSubscribeEventProcessor localSubscribeEventProcessor = new LocalSubscribeEventProcessor(this);
278-
this.getHandlerService().register(localSubscribeEventProcessor, clientManageExecutor);
264+
this.getHandlerService().register(localSubscribeEventProcessor);
279265

280266
final RemoteSubscribeEventProcessor remoteSubscribeEventProcessor = new RemoteSubscribeEventProcessor(this);
281-
this.getHandlerService().register(remoteSubscribeEventProcessor, clientManageExecutor);
267+
this.getHandlerService().register(remoteSubscribeEventProcessor);
282268

283269
final UnSubscribeProcessor unSubscribeProcessor = new UnSubscribeProcessor(this);
284-
registerProcessor(RequestCode.UNSUBSCRIBE.getRequestCode(), unSubscribeProcessor, clientManageExecutor);
270+
registerProcessor(RequestCode.UNSUBSCRIBE.getRequestCode(), unSubscribeProcessor);
285271

286272
final LocalUnSubscribeEventProcessor localUnSubscribeEventProcessor = new LocalUnSubscribeEventProcessor(this);
287-
this.getHandlerService().register(localUnSubscribeEventProcessor, clientManageExecutor);
273+
this.getHandlerService().register(localUnSubscribeEventProcessor);
288274

289275
final RemoteUnSubscribeEventProcessor remoteUnSubscribeEventProcessor = new RemoteUnSubscribeEventProcessor(this);
290-
this.getHandlerService().register(remoteUnSubscribeEventProcessor, clientManageExecutor);
276+
this.getHandlerService().register(remoteUnSubscribeEventProcessor);
291277

292-
ThreadPoolExecutor replyMsgExecutor = httpThreadPoolGroup.getReplyMsgExecutor();
293278
final ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
294-
registerProcessor(RequestCode.REPLY_MESSAGE.getRequestCode(), replyMessageProcessor, replyMsgExecutor);
279+
registerProcessor(RequestCode.REPLY_MESSAGE.getRequestCode(), replyMessageProcessor);
295280

296281
final CreateTopicProcessor createTopicProcessor = new CreateTopicProcessor(this);
297-
this.getHandlerService().register(createTopicProcessor, clientManageExecutor);
282+
this.getHandlerService().register(createTopicProcessor);
298283

299284
final DeleteTopicProcessor deleteTopicProcessor = new DeleteTopicProcessor(this);
300-
this.getHandlerService().register(deleteTopicProcessor, clientManageExecutor);
285+
this.getHandlerService().register(deleteTopicProcessor);
301286

302287
final QuerySubscriptionProcessor querySubscriptionProcessor = new QuerySubscriptionProcessor(this);
303-
this.getHandlerService().register(querySubscriptionProcessor, clientManageExecutor);
288+
this.getHandlerService().register(querySubscriptionProcessor);
304289

305290
registerWebhook();
306291
}
@@ -370,4 +355,6 @@ public MetaStorage getMetaStorage() {
370355
public HTTPClientPool getHttpClientPool() {
371356
return httpClientPool;
372357
}
358+
359+
373360
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.runtime.core.protocol.http.processor;
19+
20+
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
21+
22+
public abstract class AbstractHttpRequestProcessor implements HttpRequestProcessor {
23+
24+
}

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminMetricsProcessor.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,25 @@
2020
import org.apache.eventmesh.common.protocol.http.HttpCommand;
2121
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
2222
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
23-
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
23+
24+
import java.util.concurrent.Executor;
2425

2526
import io.netty.channel.ChannelHandlerContext;
2627

2728
import lombok.RequiredArgsConstructor;
2829

30+
2931
@RequiredArgsConstructor
30-
public class AdminMetricsProcessor implements HttpRequestProcessor {
32+
public class AdminMetricsProcessor extends AbstractHttpRequestProcessor {
3133

3234
private final EventMeshHTTPServer eventMeshHTTPServer;
3335

3436
@Override
3537
public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext) throws Exception {
3638
}
39+
40+
@Override
41+
public Executor executor() {
42+
return eventMeshHTTPServer.getHttpThreadPoolGroup().getRuntimeAdminExecutor();
43+
}
3744
}

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminShutdownProcessor.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@
2424
import org.apache.eventmesh.runtime.boot.EventMeshServer;
2525
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
2626
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
27-
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
2827
import org.apache.eventmesh.runtime.util.RemotingHelper;
2928

29+
import java.util.concurrent.Executor;
30+
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
3233

@@ -35,7 +36,7 @@
3536
import lombok.RequiredArgsConstructor;
3637

3738
@RequiredArgsConstructor
38-
public class AdminShutdownProcessor implements HttpRequestProcessor {
39+
public class AdminShutdownProcessor extends AbstractHttpRequestProcessor {
3940

4041
public final Logger cmdLogger = LoggerFactory.getLogger(EventMeshConstants.CMD);
4142

@@ -54,4 +55,11 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
5455
HttpCommand responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(EventMeshRetCode.SUCCESS);
5556
asyncContext.onComplete(responseEventMeshCommand);
5657
}
58+
59+
@Override
60+
public Executor executor() {
61+
return (Runnable runnable) -> {
62+
runnable.run();
63+
};
64+
}
5765
}

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
4141
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
4242
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
43-
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
4443
import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
4544
import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
4645
import org.apache.eventmesh.runtime.util.RemotingHelper;
@@ -53,6 +52,7 @@
5352
import java.util.List;
5453
import java.util.Map;
5554
import java.util.concurrent.ConcurrentHashMap;
55+
import java.util.concurrent.Executor;
5656
import java.util.concurrent.TimeUnit;
5757

5858
import org.slf4j.Logger;
@@ -65,7 +65,7 @@
6565

6666
import com.google.common.base.Stopwatch;
6767

68-
public class BatchSendMessageProcessor implements HttpRequestProcessor {
68+
public class BatchSendMessageProcessor extends AbstractHttpRequestProcessor {
6969

7070
private static final Logger CMD_LOGGER = LoggerFactory.getLogger(EventMeshConstants.CMD);
7171

@@ -288,4 +288,9 @@ public void onException(OnExceptionContext context) {
288288
SendMessageBatchResponseBody.class);
289289
return;
290290
}
291+
292+
@Override
293+
public Executor executor() {
294+
return eventMeshHTTPServer.getHttpThreadPoolGroup().getBatchMsgExecutor();
295+
}
291296
}

0 commit comments

Comments
 (0)