Skip to content

Commit 4ce849f

Browse files
[rpc] Prevent EventLoop blocking and implement TCP-level backpressure in RequestChannel (#2065)
1 parent c5c9e58 commit 4ce849f

File tree

3 files changed

+793
-13
lines changed

3 files changed

+793
-13
lines changed

fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/NettyServerHandler.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,21 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
170170
authenticator.isCompleted()
171171
? ConnectionState.READY
172172
: ConnectionState.AUTHENTICATING);
173+
174+
// Register this channel with its RequestChannel. The RequestChannel will manage this
175+
// channel's lifecycle and backpressure state.
176+
requestChannel.registerChannel(ctx.channel());
177+
173178
// TODO: connection metrics (count, client tags, receive request avg idle time, etc.)
174179
}
175180

176181
@Override
177182
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
178183
super.channelInactive(ctx);
184+
185+
// Unregister this channel from its RequestChannel. The RequestChannel will clean up both
186+
// the association and any paused state.
187+
requestChannel.unregisterChannel(ctx.channel());
179188
}
180189

181190
@Override

fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/RequestChannel.java

Lines changed: 247 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,59 +17,293 @@
1717

1818
package org.apache.fluss.rpc.netty.server;
1919

20+
import org.apache.fluss.shaded.netty4.io.netty.channel.Channel;
21+
2022
import org.slf4j.Logger;
2123
import org.slf4j.LoggerFactory;
2224

2325
import javax.annotation.concurrent.ThreadSafe;
2426

25-
import java.util.concurrent.ArrayBlockingQueue;
27+
import java.util.Set;
2628
import java.util.concurrent.BlockingQueue;
29+
import java.util.concurrent.ConcurrentHashMap;
30+
import java.util.concurrent.LinkedBlockingQueue;
2731
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.locks.ReentrantLock;
2833

29-
/** A blocking queue channel that can receive requests and send responses. */
34+
/**
35+
* A queue channel that can receive requests and send responses.
36+
*
37+
* <p>Uses an unbounded LinkedBlockingQueue to ensure that putRequest() never blocks, preventing
38+
* EventLoop threads from being blocked. Backpressure is applied at the TCP level by pausing channel
39+
* reads when the queue size exceeds the backpressure threshold.
40+
*
41+
* <p>Each RequestChannel instance manages its own associated Netty channels (those hashed to this
42+
* RequestChannel) and independently controls their backpressure state. This design encapsulates all
43+
* backpressure logic within the RequestChannel, eliminating the need for global state management.
44+
*/
3045
@ThreadSafe
3146
public class RequestChannel {
3247
private static final Logger LOG = LoggerFactory.getLogger(RequestChannel.class);
3348

49+
/** Unbounded blocking queue to hold incoming requests. Never blocks on put. */
3450
protected final BlockingQueue<RpcRequest> requestQueue;
3551

36-
public RequestChannel(int queueCapacity) {
37-
this.requestQueue = new ArrayBlockingQueue<>(queueCapacity);
52+
/**
53+
* The threshold at which backpressure should be applied (pausing channel reads). When queue
54+
* size exceeds this value, channels should be paused to prevent memory exhaustion.
55+
*/
56+
private final int backpressureThreshold;
57+
58+
/**
59+
* The threshold at which to resume paused channels. Set to 50% of backpressureThreshold to
60+
* provide hysteresis and avoid thrashing.
61+
*/
62+
private final int resumeThreshold;
63+
64+
/**
65+
* All Netty channels that are hashed to this RequestChannel. Channels are registered when they
66+
* become active and unregistered when they become inactive.
67+
*
68+
* <p>When backpressure is applied, ALL channels in this set are paused simultaneously. When
69+
* backpressure is released, ALL channels are resumed simultaneously.
70+
*/
71+
private final Set<Channel> associatedChannels = ConcurrentHashMap.newKeySet();
72+
73+
/**
74+
* Indicates whether backpressure is currently active. When true, all associated channels have
75+
* been paused (setAutoRead(false)). When false, all channels are running normally.
76+
*
77+
* <p>Volatile ensures visibility for fast-path reads (outside the lock). All modifications are
78+
* protected by backpressureLock, so atomicity is guaranteed by the lock, not by atomic
79+
* operations.
80+
*/
81+
private volatile boolean isBackpressureActive = false;
82+
83+
/**
84+
* Lock to protect backpressure state transitions and task submissions. This lock ensures that:
85+
* 1. State checks and task submissions are atomic (preventing permanent channel blocking) 2.
86+
* Pause and resume operations are mutually exclusive 3. New channel registration correctly
87+
* synchronizes with current backpressure state
88+
*
89+
* <p>The lock eliminates the need for CAS operations - simple boolean checks and assignments
90+
* under the lock are sufficient for correctness.
91+
*/
92+
private final ReentrantLock backpressureLock = new ReentrantLock();
93+
94+
public RequestChannel(int backpressureThreshold) {
95+
this.requestQueue = new LinkedBlockingQueue<>();
96+
this.backpressureThreshold = backpressureThreshold;
97+
this.resumeThreshold = backpressureThreshold / 2;
3898
}
3999

40100
/**
41-
* Send a request to be handled, potentially blocking until there is room in the queue for the
42-
* request.
101+
* Send a request to be handled. Since this uses an unbounded queue, this method never blocks,
102+
* ensuring EventLoop threads are never blocked by queue operations.
103+
*
104+
* <p>After adding the request, automatically checks if backpressure should be applied. If the
105+
* queue size exceeds the backpressure threshold, ALL channels associated with this
106+
* RequestChannel will be paused to prevent further memory growth.
107+
*
108+
* <p>OPTIMIZATION: Only check backpressure if not already active (avoid redundant checks).
43109
*/
44-
public void putRequest(RpcRequest request) throws Exception {
45-
requestQueue.put(request);
110+
public void putRequest(RpcRequest request) {
111+
requestQueue.add(request);
112+
113+
// CRITICAL OPTIMIZATION: Skip check if already in backpressure state.
114+
// This avoids lock contention on every putRequest() call when system is under pressure.
115+
// The volatile read is very cheap compared to lock acquisition.
116+
if (!isBackpressureActive) {
117+
pauseAllChannelsIfNeeded();
118+
}
46119
}
47120

48121
/**
49122
* Sends a shutdown request to the channel. This can allow request processor gracefully
50123
* shutdown.
51124
*/
52-
public void putShutdownRequest() throws Exception {
125+
public void putShutdownRequest() {
53126
putRequest(ShutdownRequest.INSTANCE);
54127
}
55128

56129
/**
57-
* Get the next request or block until specified time has elapsed.
130+
* Get the next request, waiting up to the specified timeout if the queue is empty. After
131+
* successfully polling a request, attempts to resume paused channels if the queue size has
132+
* dropped below the resume threshold.
58133
*
59134
* @return the head of this queue, or null if the specified waiting time elapses before an
60135
* element is available.
61136
*/
62137
public RpcRequest pollRequest(long timeoutMs) {
63138
try {
64-
return requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
139+
RpcRequest request = requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
140+
if (isBackpressureActive) {
141+
tryResumeChannels();
142+
}
143+
return request;
65144
} catch (InterruptedException e) {
66-
LOG.warn("Interrupted while polling requests from channel queue.", e);
145+
Thread.currentThread().interrupt();
67146
return null;
68147
}
69148
}
70149

71150
/** Get the number of requests in the queue. */
72-
int requestsCount() {
151+
public int requestsCount() {
73152
return requestQueue.size();
74153
}
154+
155+
/**
156+
* Registers a Netty channel as being associated with this RequestChannel. This is called when a
157+
* channel becomes active and is hashed to this RequestChannel.
158+
*
159+
* <p>IMPORTANT: New channels are NOT immediately paused even if backpressure is active. This is
160+
* critical for system health: 1. Health check connections must not be blocked at startup 2. New
161+
* connections will naturally be controlled by the next backpressure check 3. Immediately
162+
* pausing new connections can cause deadlock at startup when queue is full but processing is
163+
* slow
164+
*
165+
* @param channel the channel to register
166+
*/
167+
public void registerChannel(Channel channel) {
168+
associatedChannels.add(channel);
169+
170+
if (LOG.isDebugEnabled()) {
171+
LOG.debug(
172+
"Registered channel {} to RequestChannel (backpressure threshold: {}, associated channels: {}, backpressure active: {})",
173+
channel.remoteAddress(),
174+
backpressureThreshold,
175+
associatedChannels.size(),
176+
isBackpressureActive);
177+
}
178+
}
179+
180+
/**
181+
* Unregisters a Netty channel from this RequestChannel. This is called when a channel becomes
182+
* inactive.
183+
*
184+
* @param channel the channel to unregister
185+
*/
186+
public void unregisterChannel(Channel channel) {
187+
associatedChannels.remove(channel);
188+
if (LOG.isDebugEnabled()) {
189+
LOG.debug(
190+
"Unregistered channel {} from RequestChannel (associated channels: {}, backpressure active: {})",
191+
channel.remoteAddress(),
192+
associatedChannels.size(),
193+
isBackpressureActive);
194+
}
195+
}
196+
197+
/**
198+
* Check if the queue size has exceeded the backpressure threshold. When true, channel reads
199+
* should be paused to prevent memory exhaustion.
200+
*/
201+
private boolean shouldApplyBackpressure() {
202+
return requestQueue.size() >= backpressureThreshold;
203+
}
204+
205+
/**
206+
* Check if the queue size has dropped below the resume threshold. When true, paused channels
207+
* can be resumed to accept new requests.
208+
*/
209+
private boolean shouldResumeChannels() {
210+
return requestQueue.size() <= resumeThreshold;
211+
}
212+
213+
/**
214+
* Pauses ALL channels associated with this RequestChannel if the queue size exceeds the
215+
* backpressure threshold. This ensures that when the queue is full, all channels stop sending
216+
* requests to prevent memory exhaustion.
217+
*
218+
* <p>Uses a lock to protect the entire operation (state check + state change + task submission)
219+
* as an atomic unit. This prevents race conditions with resume operations and channel
220+
* registrations.
221+
*
222+
* <p>TODO: In the future, consider pausing only a subset of channels instead of all channels to
223+
* reduce the impact on upstream traffic. A selective pause strategy could minimize disruption
224+
* to the overall system while still providing effective backpressure control.
225+
*/
226+
private void pauseAllChannelsIfNeeded() {
227+
if (!shouldApplyBackpressure()) {
228+
return;
229+
}
230+
231+
// Lock protects: state check + state change + task submission as atomic operation
232+
backpressureLock.lock();
233+
try {
234+
// Check if already in backpressure state
235+
if (isBackpressureActive) {
236+
return; // Already paused, nothing to do
237+
}
238+
239+
// Activate backpressure and pause all channels
240+
isBackpressureActive = true;
241+
242+
for (Channel channel : associatedChannels) {
243+
if (channel.isActive()) {
244+
// Submit to the channel's EventLoop to ensure thread safety
245+
channel.eventLoop()
246+
.execute(
247+
() -> {
248+
if (channel.isActive() && channel.config().isAutoRead()) {
249+
channel.config().setAutoRead(false);
250+
LOG.warn(
251+
"Queue size ({}) exceeded backpressure threshold ({}), paused channel: {}",
252+
requestsCount(),
253+
backpressureThreshold,
254+
channel.remoteAddress());
255+
}
256+
});
257+
}
258+
}
259+
} finally {
260+
backpressureLock.unlock();
261+
}
262+
}
263+
264+
/**
265+
* Attempts to resume all associated channels if the queue size has dropped below the resume
266+
* threshold. This method is called automatically after a request is dequeued.
267+
*
268+
* <p>Uses a lock to protect the entire operation (state check + state change + task submission)
269+
* as an atomic unit. This prevents race conditions with pause operations and channel
270+
* registrations.
271+
*/
272+
private void tryResumeChannels() {
273+
if (!shouldResumeChannels()) {
274+
return;
275+
}
276+
277+
// Lock protects: state check + state change + task submission as atomic operation
278+
backpressureLock.lock();
279+
try {
280+
// Check if backpressure is not active
281+
if (!isBackpressureActive) {
282+
return; // Already resumed, nothing to do
283+
}
284+
285+
// Deactivate backpressure and resume all channels
286+
isBackpressureActive = false;
287+
288+
for (Channel channel : associatedChannels) {
289+
if (channel.isActive()) {
290+
// Submit resume task to the channel's EventLoop to ensure thread safety
291+
channel.eventLoop()
292+
.execute(
293+
() -> {
294+
if (channel.isActive() && !channel.config().isAutoRead()) {
295+
channel.config().setAutoRead(true);
296+
LOG.info(
297+
"Queue size ({}) dropped below resume threshold ({}), resumed channel: {}",
298+
requestsCount(),
299+
resumeThreshold,
300+
channel.remoteAddress());
301+
}
302+
});
303+
}
304+
}
305+
} finally {
306+
backpressureLock.unlock();
307+
}
308+
}
75309
}

0 commit comments

Comments
 (0)