[ISSUE #5185] Enhancement for http server and tcp server #5187


Merged (1 commit) on Mar 18, 2025
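This PR applies one pattern throughout the HTTP and TCP servers: every outbound ctx.writeAndFlush(...) call is wrapped in ctx.channel().eventLoop().execute(...), so the write-and-flush is submitted as a task to the channel's own event loop instead of running directly on the calling thread. The TCP server's worker group is also switched from NioEventLoopGroup to DefaultEventLoopGroup. A minimal sketch of the write pattern, using illustrative class and method names rather than anything from the EventMesh codebase:

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpResponse;

// Sketch only: the class and method names are illustrative.
final class EventLoopWriteSketch {

    // Before: writeAndFlush may be invoked from any caller thread; Netty then
    // has to hand the actual I/O work over to the event loop internally.
    static void writeDirect(ChannelHandlerContext ctx, HttpResponse response) {
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    // After: the whole write-and-flush is queued as a task on the event loop
    // that owns the channel, so it always executes on that I/O thread.
    static void writeOnEventLoop(ChannelHandlerContext ctx, HttpResponse response) {
        ctx.channel().eventLoop().execute(() ->
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE));
    }
}

One consequence of the execute(...) form: the task runs asynchronously, so an exception thrown inside it surfaces on the event loop rather than at the call site.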
@@ -95,7 +95,9 @@ protected void writeInternalServerError(ChannelHandlerContext ctx, String messag
* Use {@link HttpResponseUtils#buildHttpResponse} to build {@link HttpResponse} param.
*/
protected void write(ChannelHandlerContext ctx, HttpResponse response) {
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
ctx.channel().eventLoop().execute(() -> {
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
});
}

@Override
@@ -245,16 +245,19 @@ public void sendError(final ChannelHandlerContext ctx, final HttpResponseStatus
HttpHeaderNames.CONTENT_TYPE, String.format("text/plain; charset=%s", EventMeshConstants.DEFAULT_CHARSET));
responseHeaders.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
responseHeaders.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);

ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
ctx.channel().eventLoop().execute(() -> {
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
});
}

public void sendResponse(final ChannelHandlerContext ctx, final DefaultFullHttpResponse response) {
ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
if (!f.isSuccess()) {
log.warn("send response to [{}] fail, will close this channel", RemotingHelper.parseChannelRemoteAddr(f.channel()));
f.channel().close();
}
ctx.channel().eventLoop().execute(() -> {
ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
if (!f.isSuccess()) {
log.warn("send response to [{}] fail, will close this channel", RemotingHelper.parseChannelRemoteAddr(f.channel()));
f.channel().close();
}
});
});
}

@@ -24,6 +24,7 @@

import java.util.concurrent.TimeUnit;

import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
@@ -76,7 +77,7 @@ private void buildIOGroup(final String threadPrefix) {
}

private void buildWorkerGroup(final String threadPrefix) {
workerGroup = new NioEventLoopGroup(MAX_THREADS, new EventMeshThreadFactory(threadPrefix + "-worker"));
workerGroup = new DefaultEventLoopGroup(MAX_THREADS, new EventMeshThreadFactory(threadPrefix + "-worker"));
}

protected void initProducerManager() throws Exception {
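The buildWorkerGroup change above is the one structural change in this PR: the worker group becomes a DefaultEventLoopGroup, whose event loops are plain single-threaded task executors with no NIO selector behind them. That fits a group used to run handler logic rather than socket I/O, since a DefaultEventLoopGroup cannot register NIO socket channels. A sketch of how such a group is typically wired in a Netty bootstrap (generic Netty usage, not the exact EventMesh code):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

final class WorkerGroupSketch {

    static ServerBootstrap bootstrap() {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1); // accepts connections
        EventLoopGroup ioGroup = new NioEventLoopGroup();    // drives socket I/O
        // Task-only loops: no selector, just a queue and a thread each.
        EventLoopGroup workerGroup = new DefaultEventLoopGroup(4);

        return new ServerBootstrap()
            .group(bossGroup, ioGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    // Binding the handler to workerGroup moves its callbacks
                    // off the I/O loop onto the task-only loops.
                    ch.pipeline().addLast(workerGroup, new ChannelInboundHandlerAdapter());
                }
            });
    }
}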
@@ -311,7 +311,7 @@ private void writeToClient(Command cmd, Package pkg, ChannelHandlerContext ctx,
Package res = new Package();
res.setHeader(new Header(getReplyCommand(cmd), OPStatus.FAIL.getCode(), e.toString(),
pkg.getHeader().getSeq()));
ctx.writeAndFlush(res);
ctx.channel().eventLoop().execute(() -> ctx.writeAndFlush(res));
} catch (Exception ex) {
log.warn("writeToClient failed", ex);
}
@@ -170,11 +170,16 @@ private void parseHttpRequest(ChannelHandlerContext ctx, HttpRequest httpRequest
httpHandlerOpt.get().handle(httpRequest, ctx);
} catch (Exception e) {
log.error("admin server channelRead error", e);
ctx.writeAndFlush(HttpResponseUtils.buildHttpResponse(Objects.requireNonNull(e.getMessage()), ctx,
HttpHeaderValues.APPLICATION_JSON, HttpResponseStatus.INTERNAL_SERVER_ERROR)).addListener(ChannelFutureListener.CLOSE);
ctx.channel().eventLoop().execute(() -> {
ctx.writeAndFlush(HttpResponseUtils.buildHttpResponse(Objects.requireNonNull(e.getMessage()), ctx,
HttpHeaderValues.APPLICATION_JSON, HttpResponseStatus.INTERNAL_SERVER_ERROR)).addListener(ChannelFutureListener.CLOSE);
});
}
} else {
ctx.writeAndFlush(HttpResponseUtils.createNotFound()).addListener(ChannelFutureListener.CLOSE);
ctx.channel().eventLoop().execute(() -> {
ctx.writeAndFlush(HttpResponseUtils.createNotFound()).addListener(ChannelFutureListener.CLOSE);
}
);
}
}
}
@@ -163,14 +163,16 @@ private void sendResponse(ChannelHandlerContext ctx, HttpRequest request, HttpRe
*/
private void sendPersistentResponse(ChannelHandlerContext ctx, HttpRequest httpRequest, HttpResponse response, boolean isClose) {
ReferenceCountUtil.release(httpRequest);
ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
if (!f.isSuccess()) {
HTTP_LOGGER.warn("send response to [{}] fail, will close this channel",
RemotingHelper.parseChannelRemoteAddr(f.channel()));
if (isClose) {
f.channel().close();
ctx.channel().eventLoop().execute(() -> {
ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
if (!f.isSuccess()) {
HTTP_LOGGER.warn("send response to [{}] fail, will close this channel",
RemotingHelper.parseChannelRemoteAddr(f.channel()));
if (isClose) {
f.channel().close();
}
}
}
});
});
}

@@ -179,12 +181,14 @@ private void sendPersistentResponse(ChannelHandlerContext ctx, HttpRequest httpR
*/
private void sendShortResponse(ChannelHandlerContext ctx, HttpRequest httpRequest, HttpResponse response) {
ReferenceCountUtil.release(httpRequest);
ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
if (!f.isSuccess()) {
HTTP_LOGGER.warn("send response to [{}] with short-lived connection fail, will close this channel",
RemotingHelper.parseChannelRemoteAddr(f.channel()));
}
}).addListener(ChannelFutureListener.CLOSE);
ctx.channel().eventLoop().execute(() -> {
ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
if (!f.isSuccess()) {
HTTP_LOGGER.warn("send response to [{}] with short-lived connection fail, will close this channel",
RemotingHelper.parseChannelRemoteAddr(f.channel()));
}
}).addListener(ChannelFutureListener.CLOSE);
});
}

private HttpEventWrapper parseHttpRequest(HttpRequest httpRequest) throws IOException {
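In sendShortResponse above, two listeners are chained on the same write future: the first logs a failed write, and ChannelFutureListener.CLOSE then closes the channel whether or not the write succeeded, which is the intended behavior for a short-lived connection. A condensed illustration of the chaining semantics (the System.err call stands in for HTTP_LOGGER):

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpResponse;

final class ListenerChainSketch {

    // Both listeners fire when the single write future completes; CLOSE runs
    // regardless of success, ending the short-lived connection either way.
    static void send(ChannelHandlerContext ctx, HttpResponse response) {
        ctx.writeAndFlush(response)
            .addListener(f -> {
                if (!f.isSuccess()) {
                    System.err.println("write failed: " + f.cause());
                }
            })
            .addListener(ChannelFutureListener.CLOSE);
    }
}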
@@ -95,13 +95,15 @@ public static void goodBye2Client(ChannelHandlerContext ctx, String errMsg, Clie
Package pkg = new Package(new Header(SERVER_GOODBYE_REQUEST, OPStatus.FAIL.getCode(), errMsg, null));
eventMeshTcpMetricsManager.eventMesh2clientMsgNumIncrement(IPUtils.parseChannelRemoteAddr(ctx.channel()));
log.info("goodBye2Client client[{}]", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
ctx.writeAndFlush(pkg).addListener((ChannelFutureListener) future -> {
Utils.logSucceedMessageFlow(pkg, null, startTime, startTime);
try {
mapping.closeSession(ctx);
} catch (Exception e) {
log.warn("close session failed!", e);
}
ctx.channel().eventLoop().execute(() -> {
ctx.writeAndFlush(pkg).addListener((ChannelFutureListener) future -> {
Utils.logSucceedMessageFlow(pkg, null, startTime, startTime);
try {
mapping.closeSession(ctx);
} catch (Exception e) {
log.warn("close session failed!", e);
}
});
});
}

@@ -94,16 +94,18 @@ public void process(final Package pkg, final ChannelHandlerContext ctx, long sta
MESSAGE_LOGGER.error("HelloTask failed|address={}", ctx.channel().remoteAddress(), e);
res.setHeader(new Header(HELLO_RESPONSE, OPStatus.FAIL.getCode(), Arrays.toString(e.getStackTrace()), pkg
.getHeader().getSeq()));
ctx.writeAndFlush(res).addListener(
(ChannelFutureListener) future -> {
if (!future.isSuccess()) {
Utils.logFailedMessageFlow(future, res, user, startTime, taskExecuteTime);
} else {
Utils.logSucceedMessageFlow(res, user, startTime, taskExecuteTime);
}
log.warn("HelloTask failed,close session,addr:{}", ctx.channel().remoteAddress());
eventMeshTCPServer.getClientSessionGroupMapping().closeSession(ctx);
});
ctx.channel().eventLoop().execute(() -> {
ctx.writeAndFlush(res).addListener(
(ChannelFutureListener) future -> {
if (!future.isSuccess()) {
Utils.logFailedMessageFlow(future, res, user, startTime, taskExecuteTime);
} else {
Utils.logSucceedMessageFlow(res, user, startTime, taskExecuteTime);
}
log.warn("HelloTask failed,close session,addr:{}", ctx.channel().remoteAddress());
eventMeshTCPServer.getClientSessionGroupMapping().closeSession(ctx);
});
});
}
}

@@ -143,8 +143,10 @@ public void process(final Package pkg, final ChannelHandlerContext ctx, long sta
.tryAcquire(TRY_PERMIT_TIME_OUT, TimeUnit.MILLISECONDS)) {

msg.setHeader(new Header(replyCmd, OPStatus.FAIL.getCode(), "Tps overload, global flow control", pkg.getHeader().getSeq()));
ctx.writeAndFlush(msg).addListener(
(ChannelFutureListener) future -> Utils.logSucceedMessageFlow(msg, session.getClient(), startTime, taskExecuteTime));
ctx.channel().eventLoop().execute(() -> {
ctx.writeAndFlush(msg).addListener(
(ChannelFutureListener) future -> Utils.logSucceedMessageFlow(msg, session.getClient(), startTime, taskExecuteTime));
});

TraceUtils.finishSpanWithException(ctx, event, "Tps overload, global flow control", null);

@@ -120,40 +120,42 @@ public void push(final DownStreamMsgContext downStreamMsgContext) {
EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_CLIENT_SPAN, false);

try {
session.getContext().writeAndFlush(pkg).addListener(
(ChannelFutureListener) future -> {
if (!future.isSuccess()) {
log.error("downstreamMsg fail,seq:{}, retryTimes:{}, event:{}", downStreamMsgContext.seq,
downStreamMsgContext.retryTimes, downStreamMsgContext.event);
deliverFailMsgsCount.incrementAndGet();

// how long to isolate client when push fail
long isolateTime = System.currentTimeMillis()
+ session.getEventMeshTCPConfiguration().getEventMeshTcpPushFailIsolateTimeInMills();
session.setIsolateTime(isolateTime);
log.warn("isolate client:{},isolateTime:{}", session.getClient(), isolateTime);

// retry
long delayTime = SubscriptionType.SYNC == downStreamMsgContext.getSubscriptionItem().getType()
? session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetrySyncDelayInMills()
: session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryAsyncDelayInMills();
Objects.requireNonNull(session.getClientGroupWrapper().get()).getTcpRetryer()
.newTimeout(downStreamMsgContext, delayTime, TimeUnit.MILLISECONDS);
} else {
deliveredMsgsCount.incrementAndGet();
log.info("downstreamMsg success,seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq,
downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event));

if (session.isIsolated()) {
log.info("cancel isolated,client:{}", session.getClient());
session.setIsolateTime(System.currentTimeMillis());
Package finalPkg = pkg;
session.getContext().channel().eventLoop().execute(() -> {
session.getContext().writeAndFlush(finalPkg).addListener(
(ChannelFutureListener) future -> {
if (!future.isSuccess()) {
log.error("downstreamMsg fail,seq:{}, retryTimes:{}, event:{}", downStreamMsgContext.seq,
downStreamMsgContext.retryTimes, downStreamMsgContext.event);
deliverFailMsgsCount.incrementAndGet();

// how long to isolate client when push fail
long isolateTime = System.currentTimeMillis()
+ session.getEventMeshTCPConfiguration().getEventMeshTcpPushFailIsolateTimeInMills();
session.setIsolateTime(isolateTime);
log.warn("isolate client:{},isolateTime:{}", session.getClient(), isolateTime);

// retry
long delayTime = SubscriptionType.SYNC == downStreamMsgContext.getSubscriptionItem().getType()
? session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetrySyncDelayInMills()
: session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryAsyncDelayInMills();
Objects.requireNonNull(session.getClientGroupWrapper().get()).getTcpRetryer()
.newTimeout(downStreamMsgContext, delayTime, TimeUnit.MILLISECONDS);
} else {
deliveredMsgsCount.incrementAndGet();
log.info("downstreamMsg success,seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq,
downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event));

if (session.isIsolated()) {
log.info("cancel isolated,client:{}", session.getClient());
session.setIsolateTime(System.currentTimeMillis());
}
}
}
});
});
});
} finally {
TraceUtils.finishSpan(span, downStreamMsgContext.event);
}

}
}

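One detail worth noting in the push hunk above is the added Package finalPkg = pkg; line. A lambda can only capture local variables that are effectively final, so if pkg is reassigned earlier in the method it has to be snapshotted into a fresh local before the eventLoop().execute(...) lambda can use it. A minimal illustration with hypothetical types and helpers:

import io.netty.channel.ChannelHandlerContext;

final class CaptureSketch {

    static void send(ChannelHandlerContext ctx, Object initial) {
        Object pkg = initial;
        pkg = decorate(pkg);          // reassignment: pkg is no longer effectively final
        final Object finalPkg = pkg;  // fresh local restores capturability
        ctx.channel().eventLoop().execute(() -> ctx.writeAndFlush(finalPkg));
    }

    // Hypothetical stand-in for whatever rebuilds or wraps the package.
    static Object decorate(Object p) {
        return p;
    }
}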
@@ -64,8 +64,8 @@ public static void writeAndFlush(final Package pkg, long startTime, long taskExe
new Exception("the session has been closed"));
return;
}
ctx.writeAndFlush(pkg).addListener(
(ChannelFutureListener) future -> {
ctx.channel().eventLoop().execute(() -> {
ctx.writeAndFlush(pkg).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
logFailedMessageFlow(future, pkg, user, startTime, taskExecuteTime);
} else {
@@ -77,6 +77,7 @@ public static void writeAndFlush(final Package pkg, long startTime, long taskExe
}
}
});
});
} catch (Exception e) {
log.error("exception while sending message to client", e);
}