Skip to content

Commit 6457b99

Browse files
fix crc & slow request detector
1 parent b85de0f commit 6457b99

File tree

15 files changed

+211
-89
lines changed

15 files changed

+211
-89
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ public class ConfigOptions {
463463
public static final ConfigOption<Duration> RPC_SERVER_SLOW_REQUEST_THRESHOLD =
464464
key("rpc.server.slow-request.threshold")
465465
.durationType()
466-
.defaultValue(Duration.ofSeconds(30))
466+
.defaultValue(Duration.ofSeconds(60))
467467
.withDescription(
468468
"The threshold for identifying slow RPC requests on the server side. "
469469
+ "When a request takes longer than this threshold to process, "

fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -333,29 +333,18 @@ private CompletableFuture<ApiMessage> doSend(
333333

334334
int currentRequestId = requestCount++;
335335

336-
// Create timeout timer task
337-
RequestTimeoutTask timeoutTask =
338-
new RequestTimeoutTask(currentRequestId, DEFAULT_REQUEST_TIMEOUT_MS);
339-
336+
// Create InflightRequest without timeout task initially
337+
// Timeout will be started only after the request is successfully sent to network
340338
InflightRequest inflight =
341339
new InflightRequest(
342-
apiKey.id,
343-
version,
344-
currentRequestId,
345-
rawRequest,
346-
responseFuture,
347-
timeoutTask);
340+
apiKey.id, version, currentRequestId, rawRequest, responseFuture, null);
348341
inflightRequests.put(inflight.requestId, inflight);
349342
ByteBuf byteBuf;
350343
try {
351344
byteBuf = inflight.toByteBuf(channel.alloc());
352345
} catch (Exception e) {
353346
LOG.error("Failed to encode request for '{}'.", ApiKeys.forId(inflight.apiKey), e);
354347
inflightRequests.remove(inflight.requestId);
355-
// Cancel timeout task
356-
if (inflight.timeoutTask != null) {
357-
inflight.timeoutTask.cancel();
358-
}
359348
responseFuture.completeExceptionally(
360349
new FlussRuntimeException(
361350
String.format(
@@ -367,19 +356,27 @@ private CompletableFuture<ApiMessage> doSend(
367356

368357
connectionMetrics.updateMetricsBeforeSendRequest(apiKey, rawRequest.totalSize());
369358

370-
// Add timeout task to timer
371-
timer.add(timeoutTask);
359+
// Send request to network
372360
channel.writeAndFlush(byteBuf)
373361
.addListener(
374362
(ChannelFutureListener)
375363
future -> {
376-
if (!future.isSuccess()) {
364+
if (future.isSuccess()) {
365+
// Start timeout timer only after successfully sent to
366+
// network
367+
// This ensures timeout only measures network round-trip
368+
// + server processing time,
369+
// excluding client-side queuing time (e.g., semaphore
370+
// waiting)
371+
RequestTimeoutTask timeoutTask =
372+
new RequestTimeoutTask(
373+
currentRequestId,
374+
DEFAULT_REQUEST_TIMEOUT_MS);
375+
inflight.timeoutTask = timeoutTask;
376+
timer.add(timeoutTask);
377+
} else {
377378
connectionMetrics.updateMetricsAfterGetResponse(
378379
apiKey, inflight.requestStartTime, 0);
379-
// Cancel timeout task
380-
if (inflight.timeoutTask != null) {
381-
inflight.timeoutTask.cancel();
382-
}
383380
Throwable cause = future.cause();
384381
if (cause instanceof IOException) {
385382
// when server close the channel, the cause will be
@@ -569,7 +566,7 @@ private static class InflightRequest {
569566
final ApiMessage request;
570567
final long requestStartTime;
571568
final CompletableFuture<ApiMessage> responseFuture;
572-
final TimerTask timeoutTask;
569+
volatile TimerTask timeoutTask;
573570

574571
InflightRequest(
575572
short apiKey,

fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/FlussProtocolPlugin.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@ public List<String> listenerNames() {
7171
@Override
7272
public ChannelHandler createChannelHandler(
7373
RequestChannel[] requestChannels, String listenerName) {
74+
boolean slowRequestMonitoringEnabled =
75+
conf.get(ConfigOptions.RPC_SERVER_SLOW_REQUEST_MONITORING_ENABLED);
76+
long slowRequestThresholdMs =
77+
conf.get(ConfigOptions.RPC_SERVER_SLOW_REQUEST_THRESHOLD).toMillis();
78+
boolean dumpStack = conf.get(ConfigOptions.RPC_SERVER_SLOW_REQUEST_DUMP_STACK);
79+
7480
return new ServerChannelInitializer(
7581
requestChannels,
7682
apiManager,
@@ -81,7 +87,11 @@ public ChannelHandler createChannelHandler(
8187
Optional.ofNullable(
8288
AuthenticationFactory.loadServerAuthenticatorSuppliers(conf)
8389
.get(listenerName))
84-
.orElse(PlainTextAuthenticationPlugin.PlainTextServerAuthenticator::new));
90+
.orElse(PlainTextAuthenticationPlugin.PlainTextServerAuthenticator::new),
91+
slowRequestMonitoringEnabled,
92+
slowRequestThresholdMs,
93+
dumpStack,
94+
timer);
8595
}
8696

8797
@Override

fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/FlussRequest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@
2323
import org.apache.fluss.security.acl.FlussPrincipal;
2424
import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
2525
import org.apache.fluss.shaded.netty4.io.netty.util.ReferenceCountUtil;
26+
import org.apache.fluss.timer.TimerTask;
2627

2728
import org.slf4j.Logger;
2829
import org.slf4j.LoggerFactory;
2930

31+
import javax.annotation.Nullable;
32+
3033
import java.net.InetAddress;
3134
import java.util.concurrent.CompletableFuture;
3235

@@ -54,6 +57,9 @@ public final class FlussRequest implements RpcRequest {
5457
private volatile long requestCompletedTimeMs;
5558
private volatile boolean cancelled = false;
5659

60+
// slow request detector, started when request is received
61+
@Nullable private volatile TimerTask slowRequestDetector;
62+
5763
public FlussRequest(
5864
short apiKey,
5965
short apiVersion,
@@ -167,6 +173,15 @@ public boolean isInternal() {
167173
return isInternal;
168174
}
169175

176+
public void setSlowRequestDetector(TimerTask slowRequestDetector) {
177+
this.slowRequestDetector = slowRequestDetector;
178+
}
179+
180+
@Nullable
181+
public TimerTask getSlowRequestDetector() {
182+
return slowRequestDetector;
183+
}
184+
170185
@Override
171186
public String toString() {
172187
return "FlussRequest{"

fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/FlussRequestHandler.java

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,6 @@ public class FlussRequestHandler implements RequestHandler<FlussRequest> {
3939
private static final Logger LOG = LoggerFactory.getLogger(FlussRequestHandler.class);
4040

4141
private final RpcGatewayService service;
42-
private final boolean slowRequestMonitoringEnabled;
43-
private final long slowRequestThresholdMs;
44-
private final boolean dumpStack;
45-
@Nullable private final Timer timer;
4642

4743
public FlussRequestHandler(
4844
RpcGatewayService service,
@@ -51,10 +47,8 @@ public FlussRequestHandler(
5147
boolean dumpStack,
5248
@Nullable Timer timer) {
5349
this.service = service;
54-
this.slowRequestMonitoringEnabled = slowRequestMonitoringEnabled;
55-
this.slowRequestThresholdMs = slowRequestThresholdMs;
56-
this.dumpStack = dumpStack;
57-
this.timer = timer;
50+
// These parameters are kept for backward compatibility but not used anymore
51+
// The slow request monitoring is now done in NettyServerHandler
5852
}
5953

6054
@Override
@@ -68,21 +62,14 @@ public void processRequest(FlussRequest request) {
6862
ApiMethod api = request.getApiMethod();
6963
ApiMessage message = request.getMessage();
7064

71-
// Schedule slow request detection if enabled
72-
TimerTask slowRequestDetector = null;
73-
if (slowRequestMonitoringEnabled && timer != null && slowRequestThresholdMs > 0) {
74-
slowRequestDetector =
75-
new SlowRequestDetector(
76-
request,
77-
Thread.currentThread(),
78-
slowRequestThresholdMs,
79-
dumpStack,
80-
slowRequestThresholdMs);
81-
timer.add(slowRequestDetector);
65+
// Update the processing thread in the existing slow request detector
66+
// The detector was already started when the request was received (in NettyServerHandler)
67+
// Now we update it with the actual processing thread
68+
TimerTask detectorTask = request.getSlowRequestDetector();
69+
if (detectorTask instanceof SlowRequestDetector) {
70+
((SlowRequestDetector) detectorTask).setProcessingThread(Thread.currentThread());
8271
}
8372

84-
final TimerTask detectorTask = slowRequestDetector;
85-
8673
try {
8774
service.setCurrentSession(
8875
new Session(

fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/NettyServerHandler.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,16 @@
3939
import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
4040
import org.apache.fluss.shaded.netty4.io.netty.handler.timeout.IdleState;
4141
import org.apache.fluss.shaded.netty4.io.netty.handler.timeout.IdleStateEvent;
42+
import org.apache.fluss.timer.Timer;
43+
import org.apache.fluss.timer.TimerTask;
4244
import org.apache.fluss.utils.ExceptionUtils;
4345
import org.apache.fluss.utils.IOUtils;
4446

4547
import org.slf4j.Logger;
4648
import org.slf4j.LoggerFactory;
4749

50+
import javax.annotation.Nullable;
51+
4852
import java.net.InetSocketAddress;
4953
import java.net.SocketAddress;
5054
import java.util.ArrayDeque;
@@ -71,6 +75,12 @@ public final class NettyServerHandler extends ChannelInboundHandlerAdapter {
7175

7276
private final ServerAuthenticator authenticator;
7377

78+
// Slow request monitoring configuration
79+
private final boolean slowRequestMonitoringEnabled;
80+
private final long slowRequestThresholdMs;
81+
private final boolean dumpStack;
82+
@Nullable private final Timer timer;
83+
7484
private volatile ConnectionState state;
7585
private volatile boolean initialized = false;
7686

@@ -80,13 +90,21 @@ public NettyServerHandler(
8090
String listenerName,
8191
boolean isInternal,
8292
RequestsMetrics requestsMetrics,
83-
ServerAuthenticator authenticator) {
93+
ServerAuthenticator authenticator,
94+
boolean slowRequestMonitoringEnabled,
95+
long slowRequestThresholdMs,
96+
boolean dumpStack,
97+
@Nullable Timer timer) {
8498
this.requestChannel = requestChannel;
8599
this.apiManager = apiManager;
86100
this.listenerName = listenerName;
87101
this.isInternal = isInternal;
88102
this.requestsMetrics = requestsMetrics;
89103
this.authenticator = authenticator;
104+
this.slowRequestMonitoringEnabled = slowRequestMonitoringEnabled;
105+
this.slowRequestThresholdMs = slowRequestThresholdMs;
106+
this.dumpStack = dumpStack;
107+
this.timer = timer;
90108
this.state = ConnectionState.START;
91109
}
92110

@@ -133,6 +151,20 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
133151
((InetSocketAddress) ctx.channel().remoteAddress()).getAddress(),
134152
future);
135153

154+
// Schedule slow request detection immediately when request is received
155+
// This ensures we monitor the total server-side time including queue waiting
156+
if (slowRequestMonitoringEnabled && timer != null && slowRequestThresholdMs > 0) {
157+
TimerTask slowRequestDetector =
158+
new SlowRequestDetector(
159+
request,
160+
null,
161+
slowRequestThresholdMs,
162+
dumpStack,
163+
slowRequestThresholdMs);
164+
request.setSlowRequestDetector(slowRequestDetector);
165+
timer.add(slowRequestDetector);
166+
}
167+
136168
future.whenCompleteAsync((r, t) -> sendResponse(ctx, request), ctx.executor());
137169
if (apiKey == ApiKeys.AUTHENTICATE.id
138170
|| (state.isAuthenticating() && apiKey != ApiKeys.API_VERSIONS.id)) {

fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/ServerChannelInitializer.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,14 @@
2323
import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelHandlerContext;
2424
import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelInitializer;
2525
import org.apache.fluss.shaded.netty4.io.netty.channel.socket.SocketChannel;
26+
import org.apache.fluss.timer.Timer;
2627
import org.apache.fluss.utils.MathUtils;
2728

2829
import org.slf4j.Logger;
2930
import org.slf4j.LoggerFactory;
3031

32+
import javax.annotation.Nullable;
33+
3134
import java.util.function.Supplier;
3235

3336
/**
@@ -43,6 +46,10 @@ final class ServerChannelInitializer extends NettyChannelInitializer {
4346
private final boolean isInternal;
4447
private final RequestsMetrics requestsMetrics;
4548
private final Supplier<ServerAuthenticator> authenticatorSupplier;
49+
private final boolean slowRequestMonitoringEnabled;
50+
private final long slowRequestThresholdMs;
51+
private final boolean dumpStack;
52+
@Nullable private final Timer timer;
4653

4754
public ServerChannelInitializer(
4855
RequestChannel[] requestChannels,
@@ -51,14 +58,22 @@ public ServerChannelInitializer(
5158
boolean isInternal,
5259
RequestsMetrics requestsMetrics,
5360
long maxIdleTimeSeconds,
54-
Supplier<ServerAuthenticator> authenticatorSupplier) {
61+
Supplier<ServerAuthenticator> authenticatorSupplier,
62+
boolean slowRequestMonitoringEnabled,
63+
long slowRequestThresholdMs,
64+
boolean dumpStack,
65+
@Nullable Timer timer) {
5566
super(maxIdleTimeSeconds);
5667
this.requestChannels = requestChannels;
5768
this.apiManager = apiManager;
5869
this.endpointListenerName = endpointListenerName;
5970
this.isInternal = isInternal;
6071
this.requestsMetrics = requestsMetrics;
6172
this.authenticatorSupplier = authenticatorSupplier;
73+
this.slowRequestMonitoringEnabled = slowRequestMonitoringEnabled;
74+
this.slowRequestThresholdMs = slowRequestThresholdMs;
75+
this.dumpStack = dumpStack;
76+
this.timer = timer;
6277
}
6378

6479
@Override
@@ -84,7 +99,11 @@ protected void initChannel(SocketChannel ch) throws Exception {
8499
endpointListenerName,
85100
isInternal,
86101
requestsMetrics,
87-
serverAuthenticator));
102+
serverAuthenticator,
103+
slowRequestMonitoringEnabled,
104+
slowRequestThresholdMs,
105+
dumpStack,
106+
timer));
88107
}
89108

90109
@Override

fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/SlowRequestDetector.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
2525

26+
import javax.annotation.Nullable;
27+
2628
import java.util.concurrent.atomic.AtomicBoolean;
2729

2830
/**
@@ -44,7 +46,7 @@ class SlowRequestDetector extends TimerTask {
4446
private static final Logger LOG = LoggerFactory.getLogger(SlowRequestDetector.class);
4547

4648
private final FlussRequest request;
47-
private final Thread processingThread;
49+
private volatile Thread processingThread;
4850
private final boolean dumpStack;
4951
private final long thresholdMs;
5052
private final AtomicBoolean executed = new AtomicBoolean(false);
@@ -53,14 +55,15 @@ class SlowRequestDetector extends TimerTask {
5355
* Creates a slow request detector.
5456
*
5557
* @param request the request to monitor
56-
* @param processingThread the thread processing this request
58+
* @param processingThread the thread processing this request (can be null initially and set
59+
* later)
5760
* @param thresholdMs the threshold in milliseconds after which the request is considered slow
5861
* @param dumpStack whether to dump the thread stack trace
5962
* @param delayMs the delay in milliseconds before checking if the request is slow
6063
*/
6164
SlowRequestDetector(
6265
FlussRequest request,
63-
Thread processingThread,
66+
@Nullable Thread processingThread,
6467
long thresholdMs,
6568
boolean dumpStack,
6669
long delayMs) {
@@ -71,6 +74,17 @@ class SlowRequestDetector extends TimerTask {
7174
this.dumpStack = dumpStack;
7275
}
7376

77+
/**
78+
* Sets the processing thread. This allows the detector to be created when the request is
79+
* received (to monitor queue time) and updated with the actual processing thread when the
80+
* request starts being processed.
81+
*
82+
* @param processingThread the thread that is processing the request
83+
*/
84+
void setProcessingThread(Thread processingThread) {
85+
this.processingThread = processingThread;
86+
}
87+
7488
@Override
7589
public void run() {
7690
// Ensure this task only executes once even if there are race conditions

0 commit comments

Comments
 (0)