Skip to content

Commit 425226a

Browse files
authored
[network] Make sure the response from server return in order (#681)
1 parent 0ee04af commit 425226a

12 files changed

Lines changed: 594 additions & 130 deletions

File tree

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/FlussRequest.java

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
import com.alibaba.fluss.rpc.protocol.ApiMethod;
2121
import com.alibaba.fluss.rpc.protocol.RequestType;
2222
import com.alibaba.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
23-
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelHandlerContext;
23+
24+
import java.util.concurrent.CompletableFuture;
2425

2526
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
2627

@@ -34,10 +35,13 @@ public final class FlussRequest implements RpcRequest {
3435
private final ApiMessage message;
3536
private final ByteBuf buffer;
3637
private final String listenerName;
37-
private final ChannelHandlerContext channelContext;
38+
private final CompletableFuture<ApiMessage> responseFuture;
3839

3940
// the time when the request is received by server
4041
private final long startTimeMs;
42+
private volatile long requestDequeTimeMs;
43+
private volatile long requestCompletedTimeMs;
44+
private volatile boolean cancelled = false;
4145

4246
public FlussRequest(
4347
short apiKey,
@@ -47,15 +51,15 @@ public FlussRequest(
4751
ApiMessage message,
4852
ByteBuf buffer,
4953
String listenerName,
50-
ChannelHandlerContext channelContext) {
54+
CompletableFuture<ApiMessage> responseFuture) {
5155
this.apiKey = apiKey;
5256
this.apiVersion = apiVersion;
5357
this.requestId = requestId;
5458
this.apiMethod = apiMethod;
5559
this.message = message;
5660
this.buffer = checkNotNull(buffer);
5761
this.listenerName = listenerName;
58-
this.channelContext = channelContext;
62+
this.responseFuture = responseFuture;
5963
this.startTimeMs = System.currentTimeMillis();
6064
}
6165

@@ -90,10 +94,6 @@ public void releaseBuffer() {
9094
}
9195
}
9296

93-
public ChannelHandlerContext getChannelContext() {
94-
return channelContext;
95-
}
96-
9797
public long getStartTimeMs() {
9898
return startTimeMs;
9999
}
@@ -102,6 +102,42 @@ public String getListenerName() {
102102
return listenerName;
103103
}
104104

105+
public CompletableFuture<ApiMessage> getResponseFuture() {
106+
return responseFuture;
107+
}
108+
109+
public void complete(ApiMessage response) {
110+
responseFuture.complete(response);
111+
}
112+
113+
public void fail(Throwable t) {
114+
responseFuture.completeExceptionally(t);
115+
}
116+
117+
public void setRequestDequeTimeMs(long requestDequeTimeMs) {
118+
this.requestDequeTimeMs = requestDequeTimeMs;
119+
}
120+
121+
public long getRequestDequeTimeMs() {
122+
return requestDequeTimeMs;
123+
}
124+
125+
public void setRequestCompletedTimeMs(long requestCompletedTimeMs) {
126+
this.requestCompletedTimeMs = requestCompletedTimeMs;
127+
}
128+
129+
public long getRequestCompletedTimeMs() {
130+
return requestCompletedTimeMs;
131+
}
132+
133+
public void cancel() {
134+
cancelled = true;
135+
}
136+
137+
public boolean cancelled() {
138+
return cancelled;
139+
}
140+
105141
@Override
106142
public String toString() {
107143
return "FlussRequest{"

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/FlussRequestHandler.java

Lines changed: 7 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,27 @@
1616

1717
package com.alibaba.fluss.rpc.netty.server;
1818

19-
import com.alibaba.fluss.record.send.Send;
2019
import com.alibaba.fluss.rpc.RpcGatewayService;
2120
import com.alibaba.fluss.rpc.messages.ApiMessage;
22-
import com.alibaba.fluss.rpc.messages.FetchLogRequest;
23-
import com.alibaba.fluss.rpc.protocol.ApiError;
24-
import com.alibaba.fluss.rpc.protocol.ApiKeys;
2521
import com.alibaba.fluss.rpc.protocol.ApiMethod;
2622
import com.alibaba.fluss.rpc.protocol.RequestType;
27-
import com.alibaba.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
28-
import com.alibaba.fluss.shaded.netty4.io.netty.buffer.ByteBufAllocator;
29-
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelHandlerContext;
3023

3124
import org.slf4j.Logger;
3225
import org.slf4j.LoggerFactory;
3326

3427
import java.lang.reflect.InvocationTargetException;
35-
import java.util.Optional;
3628
import java.util.concurrent.CompletableFuture;
3729

38-
import static com.alibaba.fluss.rpc.protocol.MessageCodec.encodeErrorResponse;
39-
import static com.alibaba.fluss.rpc.protocol.MessageCodec.encodeSuccessResponse;
4030
import static com.alibaba.fluss.utils.ExceptionUtils.stripException;
4131

4232
/** A handler that processes and answers incoming {@link FlussRequest}. */
4333
public class FlussRequestHandler implements RequestHandler<FlussRequest> {
4434
private static final Logger LOG = LoggerFactory.getLogger(FlussRequestHandler.class);
4535

4636
private final RpcGatewayService service;
47-
private final RequestsMetrics requestsMetrics;
4837

49-
public FlussRequestHandler(RpcGatewayService service, RequestsMetrics requestsMetrics) {
38+
public FlussRequestHandler(RpcGatewayService service) {
5039
this.service = service;
51-
this.requestsMetrics = requestsMetrics;
5240
}
5341

5442
@Override
@@ -58,7 +46,7 @@ public RequestType requestType() {
5846

5947
@Override
6048
public void processRequest(FlussRequest request) {
61-
long requestDequeTimeMs = System.currentTimeMillis();
49+
request.setRequestDequeTimeMs(System.currentTimeMillis());
6250
ApiMethod api = request.getApiMethod();
6351
ApiMessage message = request.getMessage();
6452
try {
@@ -69,14 +57,14 @@ public void processRequest(FlussRequest request) {
6957
(CompletableFuture<?>) api.getMethod().invoke(service, message);
7058
responseFuture.whenComplete(
7159
(response, throwable) -> {
60+
request.setRequestCompletedTimeMs(System.currentTimeMillis());
7261
if (throwable != null) {
73-
sendError(request, throwable);
62+
request.fail(throwable);
7463
} else {
7564
if (response instanceof ApiMessage) {
76-
sendResponse(request, requestDequeTimeMs, (ApiMessage) response);
65+
request.complete((ApiMessage) response);
7766
} else {
78-
sendError(
79-
request,
67+
request.fail(
8068
new ClassCastException(
8169
"The response "
8270
+ response.getClass().getName()
@@ -86,72 +74,7 @@ public void processRequest(FlussRequest request) {
8674
});
8775
} catch (Throwable t) {
8876
LOG.debug("Error while executing RPC {}", api, t);
89-
sendError(request, stripException(t, InvocationTargetException.class));
77+
request.fail(stripException(t, InvocationTargetException.class));
9078
}
9179
}
92-
93-
private void sendResponse(
94-
FlussRequest request, long requestDequeTimeMs, ApiMessage responseMessage) {
95-
long requestCompletedTimeMs = System.currentTimeMillis();
96-
// TODO: use a memory managed allocator
97-
ChannelHandlerContext channelContext = request.getChannelContext();
98-
ByteBufAllocator alloc = channelContext.alloc();
99-
try {
100-
Send send = encodeSuccessResponse(alloc, request.getRequestId(), responseMessage);
101-
send.writeTo(channelContext);
102-
channelContext.flush();
103-
long requestEndTimeMs = System.currentTimeMillis();
104-
updateRequestMetrics(
105-
request, requestDequeTimeMs, requestCompletedTimeMs, requestEndTimeMs);
106-
} catch (Throwable t) {
107-
LOG.error("Failed to send response to client.", t);
108-
sendError(request, t);
109-
}
110-
}
111-
112-
private void updateRequestMetrics(
113-
FlussRequest request,
114-
long requestDequeTimeMs,
115-
long requestCompletedTimeMs,
116-
long requestEndTimeMs) {
117-
// get the metrics to be updated for this kind of request
118-
Optional<RequestsMetrics.Metrics> optMetrics = getMetrics(request);
119-
// no any metrics registered for the kind of request
120-
if (!optMetrics.isPresent()) {
121-
return;
122-
}
123-
124-
// now, we need to update metrics
125-
RequestsMetrics.Metrics metrics = optMetrics.get();
126-
127-
metrics.getRequestsCount().inc();
128-
metrics.getRequestBytes().update(request.getMessage().totalSize());
129-
130-
// update metrics related to time
131-
metrics.getRequestQueueTimeMs().update(requestDequeTimeMs - request.getStartTimeMs());
132-
metrics.getRequestProcessTimeMs().update(requestCompletedTimeMs - requestDequeTimeMs);
133-
metrics.getResponseSendTimeMs().update(requestEndTimeMs - requestCompletedTimeMs);
134-
metrics.getTotalTimeMs().update(requestEndTimeMs - request.getStartTimeMs());
135-
}
136-
137-
private void sendError(FlussRequest request, Throwable t) {
138-
ApiError error = ApiError.fromThrowable(t);
139-
// TODO: use a memory managed allocator
140-
ByteBufAllocator alloc = request.getChannelContext().alloc();
141-
ByteBuf byteBuf = encodeErrorResponse(alloc, request.getRequestId(), error);
142-
request.getChannelContext().writeAndFlush(byteBuf);
143-
144-
getMetrics(request).ifPresent(metrics -> metrics.getErrorsCount().inc());
145-
}
146-
147-
private Optional<RequestsMetrics.Metrics> getMetrics(FlussRequest request) {
148-
boolean isFromFollower = false;
149-
ApiMessage requestMessage = request.getMessage();
150-
if (request.getApiKey() == ApiKeys.FETCH_LOG.id) {
151-
// for fetch, we need to identify it's from client or follower
152-
FetchLogRequest fetchLogRequest = (FetchLogRequest) requestMessage;
153-
isFromFollower = fetchLogRequest.getFollowerServerId() >= 0;
154-
}
155-
return requestsMetrics.getMetrics(request.getApiKey(), isFromFollower);
156-
}
15780
}

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/NettyServer.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public final class NettyServer implements RpcServer {
7676
private final List<NetworkProtocolPlugin> protocols;
7777
private final List<Channel> bindChannels;
7878
private final List<Endpoint> bindEndpoints;
79+
private final RequestsMetrics requestsMetrics;
7980

8081
private EventLoopGroup acceptorGroup;
8182
private EventLoopGroup selectorGroup;
@@ -89,6 +90,7 @@ public NettyServer(
8990
MetricGroup serverMetricGroup,
9091
RequestsMetrics requestsMetrics) {
9192
this.conf = checkNotNull(conf, "conf");
93+
this.requestsMetrics = requestsMetrics;
9294
this.serverMetricGroup = checkNotNull(serverMetricGroup, "serverMetricGroup");
9395
this.apiManager = new ApiManager(service.providerType());
9496
this.endpoints = checkNotNull(endpoints, "endpoints");
@@ -171,10 +173,10 @@ private void startEndpoint(Endpoint endpoint, @Nullable NetworkProtocolPlugin pr
171173
protocolName = "FLUSS";
172174
channelHandler =
173175
new ServerChannelInitializer(
174-
new NettyServerHandler(
175-
workerPool.getRequestChannels(),
176-
apiManager,
177-
endpoint.getListenerName()),
176+
workerPool.getRequestChannels(),
177+
apiManager,
178+
endpoint.getListenerName(),
179+
requestsMetrics,
178180
conf.get(ConfigOptions.NETTY_CONNECTION_MAX_IDLE_TIME).getSeconds());
179181
} else {
180182
// plugin protocol

0 commit comments

Comments
 (0)