Skip to content

Commit 4e48829

Browse files
rpc backpressure
1 parent 6457b99 commit 4e48829

File tree

2 files changed

+259
-12
lines changed

2 files changed

+259
-12
lines changed

fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/NettyServerHandler.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,12 +202,21 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
202202
authenticator.isCompleted()
203203
? ConnectionState.READY
204204
: ConnectionState.AUTHENTICATING);
205+
206+
// Register this channel with its RequestChannel. The RequestChannel will manage this
207+
// channel's lifecycle and backpressure state.
208+
requestChannel.registerChannel(ctx.channel());
209+
205210
// TODO: connection metrics (count, client tags, receive request avg idle time, etc.)
206211
}
207212

208213
@Override
209214
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
210215
super.channelInactive(ctx);
216+
217+
// Unregister this channel from its RequestChannel. The RequestChannel will clean up both
218+
// the association and any paused state.
219+
requestChannel.unregisterChannel(ctx.channel());
211220
}
212221

213222
@Override

fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/RequestChannel.java

Lines changed: 250 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,59 +17,297 @@
1717

1818
package org.apache.fluss.rpc.netty.server;
1919

20+
import org.apache.fluss.shaded.netty4.io.netty.channel.Channel;
21+
2022
import org.slf4j.Logger;
2123
import org.slf4j.LoggerFactory;
2224

2325
import javax.annotation.concurrent.ThreadSafe;
2426

25-
import java.util.concurrent.ArrayBlockingQueue;
27+
import java.util.Set;
2628
import java.util.concurrent.BlockingQueue;
29+
import java.util.concurrent.ConcurrentHashMap;
30+
import java.util.concurrent.LinkedBlockingQueue;
2731
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.locks.ReentrantLock;
2833

29-
/** A blocking queue channel that can receive requests and send responses. */
34+
/**
35+
* A blocking queue channel that can receive requests and send responses.
36+
*
37+
* <p>Uses an unbounded queue to ensure that putRequest() never blocks, preventing EventLoop threads
38+
* from being blocked. Backpressure is applied at the TCP level by pausing channel reads when the
39+
* queue size exceeds the backpressure threshold.
40+
*
41+
* <p>Each RequestChannel instance manages its own associated Netty channels (those hashed to this
42+
* RequestChannel) and independently controls their backpressure state. This design encapsulates all
43+
* backpressure logic within the RequestChannel, eliminating the need for global state management.
44+
*/
3045
@ThreadSafe
3146
public class RequestChannel {
3247
private static final Logger LOG = LoggerFactory.getLogger(RequestChannel.class);
3348

49+
/** Unbounded queue to hold incoming requests. Never blocks on put. */
3450
protected final BlockingQueue<RpcRequest> requestQueue;
3551

36-
public RequestChannel(int queueCapacity) {
37-
this.requestQueue = new ArrayBlockingQueue<>(queueCapacity);
52+
/**
53+
* The threshold at which backpressure should be applied (pausing channel reads). When queue
54+
* size reaches or exceeds this value, channels should be paused to prevent memory exhaustion.
55+
*/
56+
private final int backpressureThreshold;
57+
58+
/**
59+
* The threshold at which to resume paused channels. Set to 50% of backpressureThreshold to
60+
* provide hysteresis and avoid thrashing.
61+
*/
62+
private final int resumeThreshold;
63+
64+
/**
65+
* All Netty channels that are hashed to this RequestChannel. Channels are registered when they
66+
* become active and unregistered when they become inactive.
67+
*
68+
* <p>When backpressure is applied, ALL channels in this set are paused simultaneously. When
69+
* backpressure is released, ALL channels are resumed simultaneously.
70+
*/
71+
private final Set<Channel> associatedChannels = ConcurrentHashMap.newKeySet();
72+
73+
/**
74+
* Indicates whether backpressure is currently active. When true, all associated channels have
75+
* been paused (setAutoRead(false)). When false, all channels are running normally.
76+
*
77+
* <p>Volatile ensures visibility for fast-path reads (outside the lock). All modifications are
78+
* protected by backpressureLock, so atomicity is guaranteed by the lock, not by atomic
79+
* operations.
80+
*/
81+
private volatile boolean isBackpressureActive = false;
82+
83+
/**
84+
* Lock to protect backpressure state transitions and task submissions. This lock ensures that
85+
* (1) state checks and task submissions are atomic (preventing permanent channel blocking) and
86+
* (2) pause and resume operations are mutually exclusive. (3) Channel registration does not take
87+
* this lock; a newly registered channel is picked up by the next pause or resume pass.
88+
*
89+
* <p>The lock eliminates the need for CAS operations - simple boolean checks and assignments
90+
* under the lock are sufficient for correctness.
91+
*/
92+
private final ReentrantLock backpressureLock = new ReentrantLock();
93+
94+
/**
 * Creates a request channel backed by an unbounded queue.
 *
 * @param backpressureThreshold queue size at which channel reads are paused; must be positive.
 *     Reads are resumed once the queue size is at or below half of this value (integer
 *     division).
 * @throws IllegalArgumentException if {@code backpressureThreshold} is not positive
 */
public RequestChannel(int backpressureThreshold) {
    // A non-positive threshold makes the pause condition (size >= threshold) always true,
    // and for values below -1 the resume condition (size <= threshold / 2) can never hold,
    // which would leave every associated channel paused forever.
    if (backpressureThreshold <= 0) {
        throw new IllegalArgumentException(
                "backpressureThreshold must be positive, but was " + backpressureThreshold);
    }
    this.requestQueue = new LinkedBlockingQueue<>();
    this.backpressureThreshold = backpressureThreshold;
    this.resumeThreshold = backpressureThreshold / 2;
}
3999

40100
/**
41-
* Send a request to be handled, potentially blocking until there is room in the queue for the
42-
* request.
101+
* Send a request to be handled. Since this uses an unbounded queue, this method never blocks,
102+
* ensuring EventLoop threads are never blocked by queue operations.
103+
*
104+
* <p>After adding the request, automatically checks if backpressure should be applied. If the
105+
* queue size exceeds the backpressure threshold, ALL channels associated with this
106+
* RequestChannel will be paused to prevent further memory growth.
107+
*
108+
* <p>OPTIMIZATION: Only check backpressure if not already active (avoid redundant checks).
43109
*/
44-
public void putRequest(RpcRequest request) throws Exception {
45-
requestQueue.put(request);
110+
public void putRequest(RpcRequest request) {
111+
requestQueue.add(request);
112+
113+
// CRITICAL OPTIMIZATION: Skip check if already in backpressure state.
114+
// This avoids lock contention on every putRequest() call when system is under pressure.
115+
// The volatile read is very cheap compared to lock acquisition.
116+
if (!isBackpressureActive) {
117+
pauseAllChannelsIfNeeded();
118+
}
46119
}
47120

48121
/**
49122
* Sends a shutdown request to the channel. This can allow request processor gracefully
50123
* shutdown.
51124
*/
52-
public void putShutdownRequest() throws Exception {
125+
public void putShutdownRequest() {
53126
putRequest(ShutdownRequest.INSTANCE);
54127
}
55128

56129
/**
57-
* Get the next request or block until specified time has elapsed.
130+
* Get the next request or block until specified time has elapsed. After successfully polling a
131+
* request, attempts to resume paused channels if the queue size has dropped below the resume
132+
* threshold.
58133
*
59134
* @return the head of this queue, or null if the specified waiting time elapses before an
60135
* element is available.
61136
*/
62137
public RpcRequest pollRequest(long timeoutMs) {
63138
try {
64-
return requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
139+
RpcRequest request = requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
140+
if (request != null) {
141+
// After dequeuing, check if we can resume paused channels
142+
tryResumeChannels();
143+
}
144+
return request;
65145
} catch (InterruptedException e) {
66146
LOG.warn("Interrupted while polling requests from channel queue.", e);
147+
Thread.currentThread().interrupt();
67148
return null;
68149
}
69150
}
70151

71152
/** Get the number of requests in the queue. */
72-
int requestsCount() {
153+
public int requestsCount() {
73154
return requestQueue.size();
74155
}
156+
157+
/**
158+
* Registers a Netty channel as being associated with this RequestChannel. This is called when a
159+
* channel becomes active and is hashed to this RequestChannel.
160+
*
161+
* <p>IMPORTANT: New channels are NOT immediately paused even if backpressure is active. This is
162+
* critical for system health: 1. Health check connections must not be blocked at startup 2. New
163+
* connections will naturally be controlled by the next backpressure check 3. Immediately
164+
* pausing new connections can cause deadlock at startup when queue is full but processing is
165+
* slow
166+
*
167+
* @param channel the channel to register
168+
*/
169+
public void registerChannel(Channel channel) {
170+
associatedChannels.add(channel);
171+
172+
LOG.debug(
173+
"Registered channel {} to RequestChannel (backpressure threshold: {}, associated channels: {}, backpressure active: {})",
174+
channel.remoteAddress(),
175+
backpressureThreshold,
176+
associatedChannels.size(),
177+
isBackpressureActive);
178+
}
179+
180+
/**
181+
* Unregisters a Netty channel from this RequestChannel. This is called when a channel becomes
182+
* inactive.
183+
*
184+
* @param channel the channel to unregister
185+
*/
186+
public void unregisterChannel(Channel channel) {
187+
associatedChannels.remove(channel);
188+
LOG.debug(
189+
"Unregistered channel {} from RequestChannel (associated channels: {}, backpressure active: {})",
190+
channel.remoteAddress(),
191+
associatedChannels.size(),
192+
isBackpressureActive);
193+
}
194+
195+
/**
196+
* Check if the queue size has exceeded the backpressure threshold. When true, channel reads
197+
* should be paused to prevent memory exhaustion.
198+
*/
199+
private boolean shouldApplyBackpressure() {
200+
return requestQueue.size() >= backpressureThreshold;
201+
}
202+
203+
/**
204+
* Check if the queue size has dropped below the resume threshold. When true, paused channels
205+
* can be resumed to accept new requests.
206+
*/
207+
private boolean shouldResumeChannels() {
208+
return requestQueue.size() <= resumeThreshold;
209+
}
210+
211+
/**
212+
* Pauses ALL channels associated with this RequestChannel if the queue size exceeds the
213+
* backpressure threshold. This ensures that when the queue is full, all channels stop sending
214+
* requests to prevent memory exhaustion.
215+
*
216+
* <p>Uses a lock to protect the entire operation (state check + state change + task submission)
217+
* as an atomic unit. This prevents race conditions with resume operations and channel
218+
* registrations.
219+
*/
220+
private void pauseAllChannelsIfNeeded() {
221+
// Fast path: quick return if queue not full yet (no lock contention)
222+
if (!shouldApplyBackpressure()) {
223+
return;
224+
}
225+
226+
// Lock protects: state check + state change + task submission as atomic operation
227+
backpressureLock.lock();
228+
try {
229+
// Check if already in backpressure state
230+
if (isBackpressureActive) {
231+
return; // Already paused, nothing to do
232+
}
233+
234+
// Double-check: queue size might have changed while waiting for lock
235+
if (!shouldApplyBackpressure()) {
236+
return; // Condition no longer met
237+
}
238+
239+
// Activate backpressure and pause all channels
240+
isBackpressureActive = true;
241+
242+
for (Channel channel : associatedChannels) {
243+
// Submit to the channel's EventLoop to ensure thread safety
244+
channel.eventLoop()
245+
.execute(
246+
() -> {
247+
if (channel.config().isAutoRead()) {
248+
channel.config().setAutoRead(false);
249+
LOG.warn(
250+
"Queue size ({}) exceeded backpressure threshold ({}), paused channel: {}",
251+
requestsCount(),
252+
backpressureThreshold,
253+
channel.remoteAddress());
254+
}
255+
});
256+
}
257+
} finally {
258+
backpressureLock.unlock();
259+
}
260+
}
261+
262+
/**
263+
* Attempts to resume all associated channels if the queue size has dropped below the resume
264+
* threshold. This method is called automatically after a request is dequeued.
265+
*
266+
* <p>Uses a lock to protect the entire operation (state check + state change + task submission)
267+
* as an atomic unit. This prevents race conditions with pause operations and channel
268+
* registrations.
269+
*/
270+
private void tryResumeChannels() {
271+
// Fast path: quick return if queue still too full (no lock contention)
272+
if (!shouldResumeChannels()) {
273+
return;
274+
}
275+
276+
// Lock protects: state check + state change + task submission as atomic operation
277+
backpressureLock.lock();
278+
try {
279+
// Check if backpressure is not active
280+
if (!isBackpressureActive) {
281+
return; // Already resumed, nothing to do
282+
}
283+
284+
// Double-check: queue size might have changed while waiting for lock
285+
if (!shouldResumeChannels()) {
286+
return; // Condition no longer met
287+
}
288+
289+
// Deactivate backpressure and resume all channels
290+
isBackpressureActive = false;
291+
292+
for (Channel channel : associatedChannels) {
293+
if (channel.isActive()) {
294+
// Submit resume task to the channel's EventLoop to ensure thread safety
295+
channel.eventLoop()
296+
.execute(
297+
() -> {
298+
if (channel.isActive() && !channel.config().isAutoRead()) {
299+
channel.config().setAutoRead(true);
300+
LOG.info(
301+
"Queue size ({}) dropped below resume threshold ({}), resumed channel: {}",
302+
requestsCount(),
303+
resumeThreshold,
304+
channel.remoteAddress());
305+
}
306+
});
307+
}
308+
}
309+
} finally {
310+
backpressureLock.unlock();
311+
}
312+
}
75313
}

0 commit comments

Comments
 (0)