diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AbstractHttpHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AbstractHttpHandler.java index cdfe4e163a..516960e7b6 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AbstractHttpHandler.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AbstractHttpHandler.java @@ -95,7 +95,9 @@ protected void writeInternalServerError(ChannelHandlerContext ctx, String messag * Use {@link HttpResponseUtils#buildHttpResponse} to build {@link HttpResponse} param. */ protected void write(ChannelHandlerContext ctx, HttpResponse response) { - ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + ctx.channel().eventLoop().execute(() -> { + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + }); } @Override diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java index 3fcca51832..6c0cb4742c 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java @@ -245,16 +245,19 @@ public void sendError(final ChannelHandlerContext ctx, final HttpResponseStatus HttpHeaderNames.CONTENT_TYPE, String.format("text/plain; charset=%s", EventMeshConstants.DEFAULT_CHARSET)); responseHeaders.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); responseHeaders.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); - - ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + ctx.channel().eventLoop().execute(() -> { + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + }); } public void sendResponse(final ChannelHandlerContext ctx, final DefaultFullHttpResponse response) { - ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> { - if (!f.isSuccess()) { - log.warn("send response to [{}] fail, will close this channel", RemotingHelper.parseChannelRemoteAddr(f.channel())); - f.channel().close(); - } + ctx.channel().eventLoop().execute(() -> { + ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> { + if (!f.isSuccess()) { + log.warn("send response to [{}] fail, will close this channel", RemotingHelper.parseChannelRemoteAddr(f.channel())); + f.channel().close(); + } + }); }); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java index e02637ec39..e310042622 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; +import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollEventLoopGroup; @@ -76,7 +77,7 @@ private void buildIOGroup(final String threadPrefix) { } private void buildWorkerGroup(final String threadPrefix) { - workerGroup = new NioEventLoopGroup(MAX_THREADS, new EventMeshThreadFactory(threadPrefix + "-worker")); + workerGroup = new DefaultEventLoopGroup(MAX_THREADS, new EventMeshThreadFactory(threadPrefix + "-worker")); } protected void initProducerManager() throws Exception { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java index c9a464bf8a..eb9793be6b 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java @@ -311,7 +311,7 @@ private void writeToClient(Command cmd, Package pkg, ChannelHandlerContext ctx, Package res = new Package(); res.setHeader(new Header(getReplyCommand(cmd), OPStatus.FAIL.getCode(), e.toString(), pkg.getHeader().getSeq())); - ctx.writeAndFlush(res); + ctx.channel().eventLoop().execute(() -> ctx.writeAndFlush(res)); } catch (Exception ex) { log.warn("writeToClient failed", ex); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java index 5e98fc690b..e7f2748cca 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java @@ -170,11 +170,16 @@ private void parseHttpRequest(ChannelHandlerContext ctx, HttpRequest httpRequest httpHandlerOpt.get().handle(httpRequest, ctx); } catch (Exception e) { log.error("admin server channelRead error", e); - ctx.writeAndFlush(HttpResponseUtils.buildHttpResponse(Objects.requireNonNull(e.getMessage()), ctx, - HttpHeaderValues.APPLICATION_JSON, HttpResponseStatus.INTERNAL_SERVER_ERROR)).addListener(ChannelFutureListener.CLOSE); + ctx.channel().eventLoop().execute(() -> { + ctx.writeAndFlush(HttpResponseUtils.buildHttpResponse(Objects.requireNonNull(e.getMessage()), ctx, + HttpHeaderValues.APPLICATION_JSON, HttpResponseStatus.INTERNAL_SERVER_ERROR)).addListener(ChannelFutureListener.CLOSE); + }); } } else { - ctx.writeAndFlush(HttpResponseUtils.createNotFound()).addListener(ChannelFutureListener.CLOSE); + ctx.channel().eventLoop().execute(() -> { + ctx.writeAndFlush(HttpResponseUtils.createNotFound()).addListener(ChannelFutureListener.CLOSE); + } + ); } } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java index 6ecf745fa5..c2e20ba6a2 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java @@ -163,14 +163,16 @@ private void sendResponse(ChannelHandlerContext ctx, HttpRequest request, HttpRe */ private void sendPersistentResponse(ChannelHandlerContext ctx, HttpRequest httpRequest, HttpResponse response, boolean isClose) { ReferenceCountUtil.release(httpRequest); - ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> { - if (!f.isSuccess()) { - HTTP_LOGGER.warn("send response to [{}] fail, will close this channel", - RemotingHelper.parseChannelRemoteAddr(f.channel())); - if (isClose) { - f.channel().close(); + ctx.channel().eventLoop().execute(() -> { + ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> { + if (!f.isSuccess()) { + HTTP_LOGGER.warn("send response to [{}] fail, will close this channel", + RemotingHelper.parseChannelRemoteAddr(f.channel())); + if (isClose) { + f.channel().close(); + } } - } + }); }); } @@ -179,12 +181,14 @@ private void sendPersistentResponse(ChannelHandlerContext ctx, HttpRequest httpR */ private void sendShortResponse(ChannelHandlerContext ctx, HttpRequest httpRequest, HttpResponse response) { ReferenceCountUtil.release(httpRequest); - ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> { - if (!f.isSuccess()) { - HTTP_LOGGER.warn("send response to [{}] with short-lived connection fail, will close this channel", - RemotingHelper.parseChannelRemoteAddr(f.channel())); - } - }).addListener(ChannelFutureListener.CLOSE); + ctx.channel().eventLoop().execute(() -> { + ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> { + if (!f.isSuccess()) { + HTTP_LOGGER.warn("send response to [{}] with short-lived connection fail, will close this channel", + RemotingHelper.parseChannelRemoteAddr(f.channel())); + } + }).addListener(ChannelFutureListener.CLOSE); + }); } private HttpEventWrapper parseHttpRequest(HttpRequest httpRequest) throws IOException { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java index 6ef8e2bc11..c4acc8d81e 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java @@ -95,13 +95,15 @@ public static void goodBye2Client(ChannelHandlerContext ctx, String errMsg, Clie Package pkg = new Package(new Header(SERVER_GOODBYE_REQUEST, OPStatus.FAIL.getCode(), errMsg, null)); eventMeshTcpMetricsManager.eventMesh2clientMsgNumIncrement(IPUtils.parseChannelRemoteAddr(ctx.channel())); log.info("goodBye2Client client[{}]", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - ctx.writeAndFlush(pkg).addListener((ChannelFutureListener) future -> { - Utils.logSucceedMessageFlow(pkg, null, startTime, startTime); - try { - mapping.closeSession(ctx); - } catch (Exception e) { - log.warn("close session failed!", e); - } + ctx.channel().eventLoop().execute(() -> { + ctx.writeAndFlush(pkg).addListener((ChannelFutureListener) future -> { + Utils.logSucceedMessageFlow(pkg, null, startTime, startTime); + try { + mapping.closeSession(ctx); + } catch (Exception e) { + log.warn("close session failed!", e); + } + }); }); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/HelloProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/HelloProcessor.java index a216561c54..2827f9f324 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/HelloProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/HelloProcessor.java @@ -94,16 +94,18 @@ public void process(final Package pkg, final ChannelHandlerContext ctx, long sta MESSAGE_LOGGER.error("HelloTask failed|address={}", ctx.channel().remoteAddress(), e); res.setHeader(new Header(HELLO_RESPONSE, OPStatus.FAIL.getCode(), Arrays.toString(e.getStackTrace()), pkg .getHeader().getSeq())); - ctx.writeAndFlush(res).addListener( - (ChannelFutureListener) future -> { - if (!future.isSuccess()) { - Utils.logFailedMessageFlow(future, res, user, startTime, taskExecuteTime); - } else { - Utils.logSucceedMessageFlow(res, user, startTime, taskExecuteTime); - } - log.warn("HelloTask failed,close session,addr:{}", ctx.channel().remoteAddress()); - eventMeshTCPServer.getClientSessionGroupMapping().closeSession(ctx); - }); + ctx.channel().eventLoop().execute(() -> { + ctx.writeAndFlush(res).addListener( + (ChannelFutureListener) future -> { + if (!future.isSuccess()) { + Utils.logFailedMessageFlow(future, res, user, startTime, taskExecuteTime); + } else { + Utils.logSucceedMessageFlow(res, user, startTime, taskExecuteTime); + } + log.warn("HelloTask failed,close session,addr:{}", ctx.channel().remoteAddress()); + eventMeshTCPServer.getClientSessionGroupMapping().closeSession(ctx); + }); + }); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java index ccbb98255b..fdb7595c62 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java @@ -143,8 +143,10 @@ public void process(final Package pkg, final ChannelHandlerContext ctx, long sta .tryAcquire(TRY_PERMIT_TIME_OUT, TimeUnit.MILLISECONDS)) { msg.setHeader(new Header(replyCmd, OPStatus.FAIL.getCode(), "Tps overload, global flow control", pkg.getHeader().getSeq())); - ctx.writeAndFlush(msg).addListener( - (ChannelFutureListener) future -> Utils.logSucceedMessageFlow(msg, session.getClient(), startTime, taskExecuteTime)); + ctx.channel().eventLoop().execute(() -> { + ctx.writeAndFlush(msg).addListener( + (ChannelFutureListener) future -> Utils.logSucceedMessageFlow(msg, session.getClient(), startTime, taskExecuteTime)); + }); TraceUtils.finishSpanWithException(ctx, event, "Tps overload, global flow control", null); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java index 22ebc57687..75a6e62d9d 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java @@ -120,40 +120,42 @@ public void push(final DownStreamMsgContext downStreamMsgContext) { EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_CLIENT_SPAN, false); try { - session.getContext().writeAndFlush(pkg).addListener( - (ChannelFutureListener) future -> { - if (!future.isSuccess()) { - log.error("downstreamMsg fail,seq:{}, retryTimes:{}, event:{}", downStreamMsgContext.seq, - downStreamMsgContext.retryTimes, downStreamMsgContext.event); - deliverFailMsgsCount.incrementAndGet(); - - // how long to isolate client when push fail - long isolateTime = System.currentTimeMillis() - + session.getEventMeshTCPConfiguration().getEventMeshTcpPushFailIsolateTimeInMills(); - session.setIsolateTime(isolateTime); - log.warn("isolate client:{},isolateTime:{}", session.getClient(), isolateTime); - - // retry - long delayTime = SubscriptionType.SYNC == downStreamMsgContext.getSubscriptionItem().getType() - ? session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetrySyncDelayInMills() - : session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryAsyncDelayInMills(); - Objects.requireNonNull(session.getClientGroupWrapper().get()).getTcpRetryer() - .newTimeout(downStreamMsgContext, delayTime, TimeUnit.MILLISECONDS); - } else { - deliveredMsgsCount.incrementAndGet(); - log.info("downstreamMsg success,seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq, - downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event)); - - if (session.isIsolated()) { - log.info("cancel isolated,client:{}", session.getClient()); - session.setIsolateTime(System.currentTimeMillis()); + Package finalPkg = pkg; + session.getContext().channel().eventLoop().execute(() -> { + session.getContext().writeAndFlush(finalPkg).addListener( + (ChannelFutureListener) future -> { + if (!future.isSuccess()) { + log.error("downstreamMsg fail,seq:{}, retryTimes:{}, event:{}", downStreamMsgContext.seq, + downStreamMsgContext.retryTimes, downStreamMsgContext.event); + deliverFailMsgsCount.incrementAndGet(); + + // how long to isolate client when push fail + long isolateTime = System.currentTimeMillis() + + session.getEventMeshTCPConfiguration().getEventMeshTcpPushFailIsolateTimeInMills(); + session.setIsolateTime(isolateTime); + log.warn("isolate client:{},isolateTime:{}", session.getClient(), isolateTime); + + // retry + long delayTime = SubscriptionType.SYNC == downStreamMsgContext.getSubscriptionItem().getType() + ? session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetrySyncDelayInMills() + : session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryAsyncDelayInMills(); + Objects.requireNonNull(session.getClientGroupWrapper().get()).getTcpRetryer() + .newTimeout(downStreamMsgContext, delayTime, TimeUnit.MILLISECONDS); + } else { + deliveredMsgsCount.incrementAndGet(); + log.info("downstreamMsg success,seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq, + downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event)); + + if (session.isIsolated()) { + log.info("cancel isolated,client:{}", session.getClient()); + session.setIsolateTime(System.currentTimeMillis()); + } } - } - }); + }); + }); } finally { TraceUtils.finishSpan(span, downStreamMsgContext.event); } - } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java index 63f5d232cb..8f35c7de25 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java @@ -64,8 +64,8 @@ public static void writeAndFlush(final Package pkg, long startTime, long taskExe new Exception("the session has been closed")); return; } - ctx.writeAndFlush(pkg).addListener( - (ChannelFutureListener) future -> { + ctx.channel().eventLoop().execute(() -> { + ctx.writeAndFlush(pkg).addListener((ChannelFutureListener) future -> { if (!future.isSuccess()) { logFailedMessageFlow(future, pkg, user, startTime, taskExecuteTime); } else { @@ -77,6 +77,7 @@ public static void writeAndFlush(final Package pkg, long startTime, long taskExe } } }); + }); } catch (Exception e) { log.error("exception while sending message to client", e); }