Skip to content

Commit 527ce13

Browse files
authored
[ISSUE #5185] Enhancement for http server and tcp server (#5187)
1 parent 8237eca commit 527ce13

File tree

11 files changed

+101
-77
lines changed

11 files changed

+101
-77
lines changed

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AbstractHttpHandler.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,9 @@ protected void writeInternalServerError(ChannelHandlerContext ctx, String messag
9595
* Use {@link HttpResponseUtils#buildHttpResponse} to build {@link HttpResponse} param.
9696
*/
9797
protected void write(ChannelHandlerContext ctx, HttpResponse response) {
98-
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
98+
ctx.channel().eventLoop().execute(() -> {
99+
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
100+
});
99101
}
100102

101103
@Override

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java

+10-7
Original file line numberDiff line numberDiff line change
@@ -245,16 +245,19 @@ public void sendError(final ChannelHandlerContext ctx, final HttpResponseStatus
245245
HttpHeaderNames.CONTENT_TYPE, String.format("text/plain; charset=%s", EventMeshConstants.DEFAULT_CHARSET));
246246
responseHeaders.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
247247
responseHeaders.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
248-
249-
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
248+
ctx.channel().eventLoop().execute(() -> {
249+
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
250+
});
250251
}
251252

252253
public void sendResponse(final ChannelHandlerContext ctx, final DefaultFullHttpResponse response) {
253-
ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
254-
if (!f.isSuccess()) {
255-
log.warn("send response to [{}] fail, will close this channel", RemotingHelper.parseChannelRemoteAddr(f.channel()));
256-
f.channel().close();
257-
}
254+
ctx.channel().eventLoop().execute(() -> {
255+
ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
256+
if (!f.isSuccess()) {
257+
log.warn("send response to [{}] fail, will close this channel", RemotingHelper.parseChannelRemoteAddr(f.channel()));
258+
f.channel().close();
259+
}
260+
});
258261
});
259262
}
260263

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import java.util.concurrent.TimeUnit;
2626

27+
import io.netty.channel.DefaultEventLoopGroup;
2728
import io.netty.channel.EventLoopGroup;
2829
import io.netty.channel.epoll.Epoll;
2930
import io.netty.channel.epoll.EpollEventLoopGroup;
@@ -76,7 +77,7 @@ private void buildIOGroup(final String threadPrefix) {
7677
}
7778

7879
private void buildWorkerGroup(final String threadPrefix) {
79-
workerGroup = new NioEventLoopGroup(MAX_THREADS, new EventMeshThreadFactory(threadPrefix + "-worker"));
80+
workerGroup = new DefaultEventLoopGroup(MAX_THREADS, new EventMeshThreadFactory(threadPrefix + "-worker"));
8081
}
8182

8283
protected void initProducerManager() throws Exception {

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ private void writeToClient(Command cmd, Package pkg, ChannelHandlerContext ctx,
311311
Package res = new Package();
312312
res.setHeader(new Header(getReplyCommand(cmd), OPStatus.FAIL.getCode(), e.toString(),
313313
pkg.getHeader().getSeq()));
314-
ctx.writeAndFlush(res);
314+
ctx.channel().eventLoop().execute(() -> ctx.writeAndFlush(res));
315315
} catch (Exception ex) {
316316
log.warn("writeToClient failed", ex);
317317
}

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -170,11 +170,16 @@ private void parseHttpRequest(ChannelHandlerContext ctx, HttpRequest httpRequest
170170
httpHandlerOpt.get().handle(httpRequest, ctx);
171171
} catch (Exception e) {
172172
log.error("admin server channelRead error", e);
173-
ctx.writeAndFlush(HttpResponseUtils.buildHttpResponse(Objects.requireNonNull(e.getMessage()), ctx,
174-
HttpHeaderValues.APPLICATION_JSON, HttpResponseStatus.INTERNAL_SERVER_ERROR)).addListener(ChannelFutureListener.CLOSE);
173+
ctx.channel().eventLoop().execute(() -> {
174+
ctx.writeAndFlush(HttpResponseUtils.buildHttpResponse(Objects.requireNonNull(e.getMessage()), ctx,
175+
HttpHeaderValues.APPLICATION_JSON, HttpResponseStatus.INTERNAL_SERVER_ERROR)).addListener(ChannelFutureListener.CLOSE);
176+
});
175177
}
176178
} else {
177-
ctx.writeAndFlush(HttpResponseUtils.createNotFound()).addListener(ChannelFutureListener.CLOSE);
179+
ctx.channel().eventLoop().execute(() -> {
180+
ctx.writeAndFlush(HttpResponseUtils.createNotFound()).addListener(ChannelFutureListener.CLOSE);
181+
}
182+
);
178183
}
179184
}
180185
}

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java

+17-13
Original file line numberDiff line numberDiff line change
@@ -163,14 +163,16 @@ private void sendResponse(ChannelHandlerContext ctx, HttpRequest request, HttpRe
163163
*/
164164
private void sendPersistentResponse(ChannelHandlerContext ctx, HttpRequest httpRequest, HttpResponse response, boolean isClose) {
165165
ReferenceCountUtil.release(httpRequest);
166-
ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
167-
if (!f.isSuccess()) {
168-
HTTP_LOGGER.warn("send response to [{}] fail, will close this channel",
169-
RemotingHelper.parseChannelRemoteAddr(f.channel()));
170-
if (isClose) {
171-
f.channel().close();
166+
ctx.channel().eventLoop().execute(() -> {
167+
ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
168+
if (!f.isSuccess()) {
169+
HTTP_LOGGER.warn("send response to [{}] fail, will close this channel",
170+
RemotingHelper.parseChannelRemoteAddr(f.channel()));
171+
if (isClose) {
172+
f.channel().close();
173+
}
172174
}
173-
}
175+
});
174176
});
175177
}
176178

@@ -179,12 +181,14 @@ private void sendPersistentResponse(ChannelHandlerContext ctx, HttpRequest httpR
179181
*/
180182
private void sendShortResponse(ChannelHandlerContext ctx, HttpRequest httpRequest, HttpResponse response) {
181183
ReferenceCountUtil.release(httpRequest);
182-
ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
183-
if (!f.isSuccess()) {
184-
HTTP_LOGGER.warn("send response to [{}] with short-lived connection fail, will close this channel",
185-
RemotingHelper.parseChannelRemoteAddr(f.channel()));
186-
}
187-
}).addListener(ChannelFutureListener.CLOSE);
184+
ctx.channel().eventLoop().execute(() -> {
185+
ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
186+
if (!f.isSuccess()) {
187+
HTTP_LOGGER.warn("send response to [{}] with short-lived connection fail, will close this channel",
188+
RemotingHelper.parseChannelRemoteAddr(f.channel()));
189+
}
190+
}).addListener(ChannelFutureListener.CLOSE);
191+
});
188192
}
189193

190194
private HttpEventWrapper parseHttpRequest(HttpRequest httpRequest) throws IOException {

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcp2Client.java

+9-7
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,15 @@ public static void goodBye2Client(ChannelHandlerContext ctx, String errMsg, Clie
9595
Package pkg = new Package(new Header(SERVER_GOODBYE_REQUEST, OPStatus.FAIL.getCode(), errMsg, null));
9696
eventMeshTcpMetricsManager.eventMesh2clientMsgNumIncrement(IPUtils.parseChannelRemoteAddr(ctx.channel()));
9797
log.info("goodBye2Client client[{}]", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
98-
ctx.writeAndFlush(pkg).addListener((ChannelFutureListener) future -> {
99-
Utils.logSucceedMessageFlow(pkg, null, startTime, startTime);
100-
try {
101-
mapping.closeSession(ctx);
102-
} catch (Exception e) {
103-
log.warn("close session failed!", e);
104-
}
98+
ctx.channel().eventLoop().execute(() -> {
99+
ctx.writeAndFlush(pkg).addListener((ChannelFutureListener) future -> {
100+
Utils.logSucceedMessageFlow(pkg, null, startTime, startTime);
101+
try {
102+
mapping.closeSession(ctx);
103+
} catch (Exception e) {
104+
log.warn("close session failed!", e);
105+
}
106+
});
105107
});
106108
}
107109

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/HelloProcessor.java

+12-10
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,18 @@ public void process(final Package pkg, final ChannelHandlerContext ctx, long sta
9494
MESSAGE_LOGGER.error("HelloTask failed|address={}", ctx.channel().remoteAddress(), e);
9595
res.setHeader(new Header(HELLO_RESPONSE, OPStatus.FAIL.getCode(), Arrays.toString(e.getStackTrace()), pkg
9696
.getHeader().getSeq()));
97-
ctx.writeAndFlush(res).addListener(
98-
(ChannelFutureListener) future -> {
99-
if (!future.isSuccess()) {
100-
Utils.logFailedMessageFlow(future, res, user, startTime, taskExecuteTime);
101-
} else {
102-
Utils.logSucceedMessageFlow(res, user, startTime, taskExecuteTime);
103-
}
104-
log.warn("HelloTask failed,close session,addr:{}", ctx.channel().remoteAddress());
105-
eventMeshTCPServer.getClientSessionGroupMapping().closeSession(ctx);
106-
});
97+
ctx.channel().eventLoop().execute(() -> {
98+
ctx.writeAndFlush(res).addListener(
99+
(ChannelFutureListener) future -> {
100+
if (!future.isSuccess()) {
101+
Utils.logFailedMessageFlow(future, res, user, startTime, taskExecuteTime);
102+
} else {
103+
Utils.logSucceedMessageFlow(res, user, startTime, taskExecuteTime);
104+
}
105+
log.warn("HelloTask failed,close session,addr:{}", ctx.channel().remoteAddress());
106+
eventMeshTCPServer.getClientSessionGroupMapping().closeSession(ctx);
107+
});
108+
});
107109
}
108110
}
109111

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,10 @@ public void process(final Package pkg, final ChannelHandlerContext ctx, long sta
143143
.tryAcquire(TRY_PERMIT_TIME_OUT, TimeUnit.MILLISECONDS)) {
144144

145145
msg.setHeader(new Header(replyCmd, OPStatus.FAIL.getCode(), "Tps overload, global flow control", pkg.getHeader().getSeq()));
146-
ctx.writeAndFlush(msg).addListener(
147-
(ChannelFutureListener) future -> Utils.logSucceedMessageFlow(msg, session.getClient(), startTime, taskExecuteTime));
146+
ctx.channel().eventLoop().execute(() -> {
147+
ctx.writeAndFlush(msg).addListener(
148+
(ChannelFutureListener) future -> Utils.logSucceedMessageFlow(msg, session.getClient(), startTime, taskExecuteTime));
149+
});
148150

149151
TraceUtils.finishSpanWithException(ctx, event, "Tps overload, global flow control", null);
150152

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java

+32-30
Original file line numberDiff line numberDiff line change
@@ -120,40 +120,42 @@ public void push(final DownStreamMsgContext downStreamMsgContext) {
120120
EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_CLIENT_SPAN, false);
121121

122122
try {
123-
session.getContext().writeAndFlush(pkg).addListener(
124-
(ChannelFutureListener) future -> {
125-
if (!future.isSuccess()) {
126-
log.error("downstreamMsg fail,seq:{}, retryTimes:{}, event:{}", downStreamMsgContext.seq,
127-
downStreamMsgContext.retryTimes, downStreamMsgContext.event);
128-
deliverFailMsgsCount.incrementAndGet();
129-
130-
// how long to isolate client when push fail
131-
long isolateTime = System.currentTimeMillis()
132-
+ session.getEventMeshTCPConfiguration().getEventMeshTcpPushFailIsolateTimeInMills();
133-
session.setIsolateTime(isolateTime);
134-
log.warn("isolate client:{},isolateTime:{}", session.getClient(), isolateTime);
135-
136-
// retry
137-
long delayTime = SubscriptionType.SYNC == downStreamMsgContext.getSubscriptionItem().getType()
138-
? session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetrySyncDelayInMills()
139-
: session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryAsyncDelayInMills();
140-
Objects.requireNonNull(session.getClientGroupWrapper().get()).getTcpRetryer()
141-
.newTimeout(downStreamMsgContext, delayTime, TimeUnit.MILLISECONDS);
142-
} else {
143-
deliveredMsgsCount.incrementAndGet();
144-
log.info("downstreamMsg success,seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq,
145-
downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event));
146-
147-
if (session.isIsolated()) {
148-
log.info("cancel isolated,client:{}", session.getClient());
149-
session.setIsolateTime(System.currentTimeMillis());
123+
Package finalPkg = pkg;
124+
session.getContext().channel().eventLoop().execute(() -> {
125+
session.getContext().writeAndFlush(finalPkg).addListener(
126+
(ChannelFutureListener) future -> {
127+
if (!future.isSuccess()) {
128+
log.error("downstreamMsg fail,seq:{}, retryTimes:{}, event:{}", downStreamMsgContext.seq,
129+
downStreamMsgContext.retryTimes, downStreamMsgContext.event);
130+
deliverFailMsgsCount.incrementAndGet();
131+
132+
// how long to isolate client when push fail
133+
long isolateTime = System.currentTimeMillis()
134+
+ session.getEventMeshTCPConfiguration().getEventMeshTcpPushFailIsolateTimeInMills();
135+
session.setIsolateTime(isolateTime);
136+
log.warn("isolate client:{},isolateTime:{}", session.getClient(), isolateTime);
137+
138+
// retry
139+
long delayTime = SubscriptionType.SYNC == downStreamMsgContext.getSubscriptionItem().getType()
140+
? session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetrySyncDelayInMills()
141+
: session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryAsyncDelayInMills();
142+
Objects.requireNonNull(session.getClientGroupWrapper().get()).getTcpRetryer()
143+
.newTimeout(downStreamMsgContext, delayTime, TimeUnit.MILLISECONDS);
144+
} else {
145+
deliveredMsgsCount.incrementAndGet();
146+
log.info("downstreamMsg success,seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq,
147+
downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event));
148+
149+
if (session.isIsolated()) {
150+
log.info("cancel isolated,client:{}", session.getClient());
151+
session.setIsolateTime(System.currentTimeMillis());
152+
}
150153
}
151-
}
152-
});
154+
});
155+
});
153156
} finally {
154157
TraceUtils.finishSpan(span, downStreamMsgContext.event);
155158
}
156-
157159
}
158160
}
159161

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ public static void writeAndFlush(final Package pkg, long startTime, long taskExe
6464
new Exception("the session has been closed"));
6565
return;
6666
}
67-
ctx.writeAndFlush(pkg).addListener(
68-
(ChannelFutureListener) future -> {
67+
ctx.channel().eventLoop().execute(() -> {
68+
ctx.writeAndFlush(pkg).addListener((ChannelFutureListener) future -> {
6969
if (!future.isSuccess()) {
7070
logFailedMessageFlow(future, pkg, user, startTime, taskExecuteTime);
7171
} else {
@@ -77,6 +77,7 @@ public static void writeAndFlush(final Package pkg, long startTime, long taskExe
7777
}
7878
}
7979
});
80+
});
8081
} catch (Exception e) {
8182
log.error("exception while sending message to client", e);
8283
}

0 commit comments

Comments
 (0)