Skip to content

Commit c8eba8f

Browse files
committed
[client] Avoid deadlocks during re-enqueue by adding a separate unbounded LinkedBlockingQueue for re-enqueued lookups alongside the bounded ArrayBlockingQueue
1 parent 56fe593 commit c8eba8f

File tree

4 files changed

+107
-7
lines changed

4 files changed

+107
-7
lines changed

fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQueue.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,16 @@
1818
package org.apache.fluss.client.lookup;
1919

2020
import org.apache.fluss.annotation.Internal;
21+
import org.apache.fluss.annotation.VisibleForTesting;
2122
import org.apache.fluss.config.ConfigOptions;
2223
import org.apache.fluss.config.Configuration;
2324

2425
import javax.annotation.concurrent.ThreadSafe;
25-
2626
import java.util.ArrayList;
2727
import java.util.List;
2828
import java.util.concurrent.ArrayBlockingQueue;
29+
import java.util.concurrent.BlockingQueue;
30+
import java.util.concurrent.LinkedBlockingQueue;
2931
import java.util.concurrent.TimeUnit;
3032

3133
/**
@@ -38,13 +40,18 @@ class LookupQueue {
3840

3941
private volatile boolean closed;
4042
// buffering both the Lookup and PrefixLookup.
43+
// TODO This queue could be refactored into a memory-managed queue similar to
44+
// RecordAccumulator, which would significantly improve the efficiency of lookup batching. Tracked
45+
// by https://github.com/apache/fluss/issues/2124
4146
private final ArrayBlockingQueue<AbstractLookupQuery<?>> lookupQueue;
47+
private final BlockingQueue<AbstractLookupQuery<?>> reEnqueuedLookupQueue;
4248
private final int maxBatchSize;
4349
private final long batchTimeoutNanos;
4450

4551
LookupQueue(Configuration conf) {
4652
this.lookupQueue =
4753
new ArrayBlockingQueue<>(conf.get(ConfigOptions.CLIENT_LOOKUP_QUEUE_SIZE));
54+
this.reEnqueuedLookupQueue = new LinkedBlockingQueue<>();
4855
this.maxBatchSize = conf.get(ConfigOptions.CLIENT_LOOKUP_MAX_BATCH_SIZE);
4956
this.batchTimeoutNanos = conf.get(ConfigOptions.CLIENT_LOOKUP_BATCH_TIMEOUT).toNanos();
5057
this.closed = false;
@@ -63,8 +70,21 @@ void appendLookup(AbstractLookupQuery<?> lookup) {
6370
}
6471
}
6572

73+
void reEnqueue(AbstractLookupQuery<?> lookup) {
74+
if (closed) {
75+
throw new IllegalStateException(
76+
"Can not re-enqueue lookup operation since the LookupQueue is closed.");
77+
}
78+
79+
try {
80+
reEnqueuedLookupQueue.put(lookup);
81+
} catch (InterruptedException e) {
82+
lookup.future().completeExceptionally(e);
83+
}
84+
}
85+
6686
boolean hasUnDrained() {
67-
return !lookupQueue.isEmpty();
87+
return !lookupQueue.isEmpty() || !reEnqueuedLookupQueue.isEmpty();
6888
}
6989

7090
/** Drain a batch of {@link LookupQuery}s from the lookup queue. */
@@ -78,6 +98,16 @@ List<AbstractLookupQuery<?>> drain() throws Exception {
7898
break;
7999
}
80100

101+
while (!reEnqueuedLookupQueue.isEmpty() && count < maxBatchSize) {
102+
AbstractLookupQuery<?> lookup =
103+
reEnqueuedLookupQueue.poll(waitNanos, TimeUnit.NANOSECONDS);
104+
if (lookup == null) {
105+
break;
106+
}
107+
lookupOperations.add(lookup);
108+
count++;
109+
}
110+
81111
AbstractLookupQuery<?> lookup = lookupQueue.poll(waitNanos, TimeUnit.NANOSECONDS);
82112
if (lookup == null) {
83113
break;
@@ -97,10 +127,21 @@ List<AbstractLookupQuery<?>> drain() throws Exception {
97127
List<AbstractLookupQuery<?>> drainAll() {
98128
List<AbstractLookupQuery<?>> lookupOperations = new ArrayList<>(lookupQueue.size());
99129
lookupQueue.drainTo(lookupOperations);
130+
reEnqueuedLookupQueue.drainTo(lookupOperations);
100131
return lookupOperations;
101132
}
102133

103134
public void close() {
104135
closed = true;
105136
}
137+
138+
@VisibleForTesting
139+
ArrayBlockingQueue<AbstractLookupQuery<?>> getLookupQueue() {
140+
return lookupQueue;
141+
}
142+
143+
@VisibleForTesting
144+
BlockingQueue<AbstractLookupQuery<?>> getReEnqueuedLookupQueue() {
145+
return reEnqueuedLookupQueue;
146+
}
106147
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,10 @@
3838
import org.apache.fluss.rpc.messages.PrefixLookupResponse;
3939
import org.apache.fluss.rpc.protocol.ApiError;
4040
import org.apache.fluss.utils.types.Tuple2;
41-
4241
import org.slf4j.Logger;
4342
import org.slf4j.LoggerFactory;
4443

4544
import javax.annotation.Nullable;
46-
4745
import java.util.ArrayList;
4846
import java.util.Collections;
4947
import java.util.HashMap;
@@ -157,6 +155,9 @@ private Map<Tuple2<Integer, LookupType>, List<AbstractLookupQuery<?>>> groupByLe
157155
// lookup the leader node
158156
TableBucket tb = lookup.tableBucket();
159157
try {
158+
// TODO Metadata requests are being sent too frequently here. Consider first
159+
// collecting the tables that need to be updated and then sending them together in
160+
// one request.
160161
leader = metadataUpdater.leaderFor(lookup.tablePath(), tb);
161162
} catch (Exception e) {
162163
// if leader is not found, re-enqueue the lookup to send again.
@@ -423,7 +424,7 @@ private void handlePrefixLookupException(
423424
}
424425

425426
private void reEnqueueLookup(AbstractLookupQuery<?> lookup) {
426-
lookupQueue.appendLookup(lookup);
427+
lookupQueue.reEnqueue(lookup);
427428
}
428429

429430
private boolean canRetry(AbstractLookupQuery<?> lookup, Exception exception) {

fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupQueueTest.java

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,18 @@
1919

2020
import org.apache.fluss.config.Configuration;
2121
import org.apache.fluss.metadata.TableBucket;
22-
2322
import org.junit.jupiter.api.Test;
23+
import org.junit.jupiter.api.Timeout;
24+
25+
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.TimeUnit;
2427

2528
import static org.apache.fluss.config.ConfigOptions.CLIENT_LOOKUP_BATCH_TIMEOUT;
2629
import static org.apache.fluss.config.ConfigOptions.CLIENT_LOOKUP_MAX_BATCH_SIZE;
30+
import static org.apache.fluss.config.ConfigOptions.CLIENT_LOOKUP_QUEUE_SIZE;
2731
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
2832
import static org.assertj.core.api.Assertions.assertThat;
33+
import static org.junit.jupiter.api.Assertions.assertFalse;
2934

3035
/** Tests for {@link LookupQueue}. */
3136
class LookupQueueTest {
@@ -59,6 +64,59 @@ void testDrainMaxBatchSize() throws Exception {
5964
assertThat(queue.hasUnDrained()).isFalse();
6065
}
6166

67+
@Test
68+
@Timeout(value = 10, unit = TimeUnit.SECONDS)
69+
void testAppendLookupBlocksWhenQueueIsFull() throws Exception {
70+
Configuration conf = new Configuration();
71+
conf.set(CLIENT_LOOKUP_QUEUE_SIZE, 5);
72+
LookupQueue queue = new LookupQueue(conf);
73+
74+
appendLookups(queue, 5);
75+
assertThat(queue.getLookupQueue()).hasSize(5);
76+
77+
CompletableFuture<Void> future =
78+
CompletableFuture.runAsync(
79+
() -> {
80+
appendLookups(queue, 1); // will be blocked.
81+
});
82+
83+
assertFalse(future.isDone(), "appendLookup should block and not complete immediately");
84+
85+
Thread.sleep(100);
86+
assertFalse(future.isDone(), "Still blocked after 100ms");
87+
88+
queue.drain();
89+
future.get(1, TimeUnit.SECONDS);
90+
}
91+
92+
@Test
93+
void testReEnqueueNotBlock() throws Exception {
94+
Configuration conf = new Configuration();
95+
conf.set(CLIENT_LOOKUP_QUEUE_SIZE, 5);
96+
conf.set(CLIENT_LOOKUP_MAX_BATCH_SIZE, 5);
97+
LookupQueue queue = new LookupQueue(conf);
98+
99+
appendLookups(queue, 5);
100+
assertThat(queue.getLookupQueue()).hasSize(5);
101+
assertThat(queue.getReEnqueuedLookupQueue()).hasSize(0);
102+
103+
queue.reEnqueue(
104+
new LookupQuery(DATA1_TABLE_PATH_PK, new TableBucket(1, 1), new byte[] {0}));
105+
assertThat(queue.getLookupQueue()).hasSize(5);
106+
// This batch will be put into re-enqueued lookup queue.
107+
assertThat(queue.getReEnqueuedLookupQueue()).hasSize(1);
108+
assertThat(queue.hasUnDrained()).isTrue();
109+
110+
assertThat(queue.drain()).hasSize(5);
111+
// drain re-enqueued lookup first.
112+
assertThat(queue.getReEnqueuedLookupQueue().isEmpty()).isTrue();
113+
assertThat(queue.getLookupQueue()).hasSize(1);
114+
assertThat(queue.hasUnDrained()).isTrue();
115+
116+
assertThat(queue.drain()).hasSize(1);
117+
assertThat(queue.hasUnDrained()).isFalse();
118+
}
119+
62120
private static void appendLookups(LookupQueue queue, int count) {
63121
for (int i = 0; i < count; i++) {
64122
queue.appendLookup(

website/docs/engine-flink/options.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ See more details about [ALTER TABLE ... SET](engine-flink/ddl.md#set-properties)
117117
| lookup.partial-cache.max-rows | Long | (None) | The maximum number of rows to store in the cache. |
118118
| client.lookup.queue-size | Integer | 25600 | The maximum number of pending lookup operations. |
119119
| client.lookup.max-batch-size | Integer | 128 | The maximum batch size of merging lookup operations to one lookup request. |
120-
| client.lookup.max-inflight-requests | Integer | 128 | The maximum number of unacknowledged lookup requests for lookup operations. |
120+
| client.lookup.max-inflight-requests | Integer | 128 | The maximum number of unacknowledged lookup requests for lookup operations. |
121121
| client.lookup.batch-timeout | Duration | 100ms | The maximum time to wait for the lookup batch to be full; if this timeout is reached, the lookup batch will be closed to send. |
122122
| client.lookup.max-retries | Integer | Integer.MAX_VALUE | Setting a value greater than zero will cause the client to resend any lookup request that fails with a potentially transient error. |
123123

0 commit comments

Comments
 (0)