
Commit 14260b7

[client] Add an unbounded LinkedBlockingQueue to LookupQueue for re-enqueued lookups, alongside the bounded ArrayBlockingQueue, to avoid deadlocks during re-enqueue
1 parent 56fe593 commit 14260b7
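
The deadlock this change avoids: the sender thread is both the consumer that drains LookupQueue and, on retriable failures, a producer that re-enqueues lookups into it. When re-enqueue went through the same bounded ArrayBlockingQueue, a full queue made the blocking put() stall the only thread that could drain it. Below is a minimal sketch of the two-queue idea, using illustrative names rather than the actual Fluss classes:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Illustrative sketch only (not Fluss code) of the single-consumer re-enqueue problem. */
public class ReEnqueueSketch {
    // Bounded queue filled by caller threads; a single sender thread drains it.
    private final BlockingQueue<String> pending = new ArrayBlockingQueue<>(2);
    // Unbounded queue filled only by the sender thread when an item must be retried.
    private final BlockingQueue<String> retries = new LinkedBlockingQueue<>();

    void append(String item) throws InterruptedException {
        // Caller threads may block here when the bounded queue is full; that is fine,
        // because the sender thread will eventually drain it.
        pending.put(item);
    }

    void reEnqueue(String item) {
        // Must never block: if this were pending.put(item) and pending were full,
        // the sender thread would wait on the very queue only it can drain.
        retries.add(item);
    }

    List<String> drainBatch(int maxBatch) {
        List<String> batch = new ArrayList<>(maxBatch);
        retries.drainTo(batch, maxBatch);                // retried items first
        pending.drainTo(batch, maxBatch - batch.size()); // then fresh items
        return batch;
    }
}

The commit applies the same idea to LookupQueue: appendLookup() keeps the bounded ArrayBlockingQueue, while reEnqueue() feeds an unbounded LinkedBlockingQueue that drain() and drainAll() empty first.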

3 files changed: +106 -2 lines


fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQueue.java

Lines changed: 43 additions & 1 deletion
@@ -18,6 +18,7 @@
 package org.apache.fluss.client.lookup;

 import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;

@@ -26,6 +27,8 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;

 /**
@@ -38,13 +41,18 @@ class LookupQueue {

     private volatile boolean closed;
     // buffering both the Lookup and PrefixLookup.
+    // TODO This queue could be refactored into a memory-managed queue similar to
+    // RecordAccumulator, which would significantly improve the efficiency of lookup batching. Trace
+    // by https://github.com/apache/fluss/issues/2124
     private final ArrayBlockingQueue<AbstractLookupQuery<?>> lookupQueue;
+    private final BlockingQueue<AbstractLookupQuery<?>> reEnqueuedLookupQueue;
     private final int maxBatchSize;
     private final long batchTimeoutNanos;

     LookupQueue(Configuration conf) {
         this.lookupQueue =
                 new ArrayBlockingQueue<>(conf.get(ConfigOptions.CLIENT_LOOKUP_QUEUE_SIZE));
+        this.reEnqueuedLookupQueue = new LinkedBlockingQueue<>();
         this.maxBatchSize = conf.get(ConfigOptions.CLIENT_LOOKUP_MAX_BATCH_SIZE);
         this.batchTimeoutNanos = conf.get(ConfigOptions.CLIENT_LOOKUP_BATCH_TIMEOUT).toNanos();
         this.closed = false;
@@ -63,8 +71,21 @@ void appendLookup(AbstractLookupQuery<?> lookup) {
         }
     }

+    void reEnqueue(AbstractLookupQuery<?> lookup) {
+        if (closed) {
+            throw new IllegalStateException(
+                    "Can not re-enqueue lookup operation since the LookupQueue is closed.");
+        }
+
+        try {
+            reEnqueuedLookupQueue.put(lookup);
+        } catch (InterruptedException e) {
+            lookup.future().completeExceptionally(e);
+        }
+    }
+
     boolean hasUnDrained() {
-        return !lookupQueue.isEmpty();
+        return !lookupQueue.isEmpty() || !reEnqueuedLookupQueue.isEmpty();
     }

     /** Drain a batch of {@link LookupQuery}s from the lookup queue. */
@@ -78,6 +99,16 @@ List<AbstractLookupQuery<?>> drain() throws Exception {
                 break;
             }

+            while (!lookupQueue.isEmpty() && count < maxBatchSize) {
+                AbstractLookupQuery<?> lookup =
+                        reEnqueuedLookupQueue.poll(waitNanos, TimeUnit.NANOSECONDS);
+                if (lookup == null) {
+                    break;
+                }
+                lookupOperations.add(lookup);
+                count++;
+            }
+
             AbstractLookupQuery<?> lookup = lookupQueue.poll(waitNanos, TimeUnit.NANOSECONDS);
             if (lookup == null) {
                 break;
@@ -97,10 +128,21 @@ List<AbstractLookupQuery<?>> drain() throws Exception {
     List<AbstractLookupQuery<?>> drainAll() {
         List<AbstractLookupQuery<?>> lookupOperations = new ArrayList<>(lookupQueue.size());
         lookupQueue.drainTo(lookupOperations);
+        reEnqueuedLookupQueue.drainTo(lookupOperations);
         return lookupOperations;
     }

     public void close() {
         closed = true;
     }
+
+    @VisibleForTesting
+    ArrayBlockingQueue<AbstractLookupQuery<?>> getLookupQueue() {
+        return lookupQueue;
+    }
+
+    @VisibleForTesting
+    BlockingQueue<AbstractLookupQuery<?>> getReEnqueuedLookupQueue() {
+        return reEnqueuedLookupQueue;
+    }
 }
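
For orientation, a rough sketch of how a sender loop might consume this queue, using only the methods visible in the diff above (drain(), reEnqueue(), future()) and the canRetry helper from LookupSender.java below. isRunning() and send() are hypothetical stand-ins; the real LookupSender adds leader resolution, per-bucket batching, and retry limits:

// Illustrative consumer-loop fragment, not the actual LookupSender implementation.
void runLoop(LookupQueue lookupQueue) throws Exception {
    while (isRunning()) { // hypothetical running flag
        List<AbstractLookupQuery<?>> batch = lookupQueue.drain();
        for (AbstractLookupQuery<?> lookup : batch) {
            try {
                send(lookup); // hypothetical: issue the lookup RPC for this bucket
            } catch (Exception e) {
                if (canRetry(lookup, e)) {
                    // Retries go to the unbounded queue, so this thread never
                    // blocks on the bounded queue only it can drain.
                    lookupQueue.reEnqueue(lookup);
                } else {
                    lookup.future().completeExceptionally(e);
                }
            }
        }
    }
}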

fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java

Lines changed: 4 additions & 1 deletion
@@ -157,6 +157,9 @@ private Map<Tuple2<Integer, LookupType>, List<AbstractLookupQuery<?>>> groupByLe
             // lookup the leader node
             TableBucket tb = lookup.tableBucket();
             try {
+                // TODO Metadata requests are being sent too frequently here. consider first
+                // collecting the tables that need to be updated and then sending them together in
+                // one request.
                 leader = metadataUpdater.leaderFor(lookup.tablePath(), tb);
             } catch (Exception e) {
                 // if leader is not found, re-enqueue the lookup to send again.
@@ -423,7 +426,7 @@ private void handlePrefixLookupException(
     }

     private void reEnqueueLookup(AbstractLookupQuery<?> lookup) {
-        lookupQueue.appendLookup(lookup);
+        lookupQueue.reEnqueue(lookup);
     }

     private boolean canRetry(AbstractLookupQuery<?> lookup, Exception exception) {
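
The TODO added in the first hunk above suggests refreshing metadata once per batch instead of once per failing lookup. A hedged sketch of that idea follows; hasLeader and updateTableMetadata are hypothetical names, and TablePath is assumed to be the type returned by lookup.tablePath(), since the real MetadataUpdater API is not shown in this diff:

// Illustrative only: collect every table whose leader is unknown, then refresh
// metadata once for the whole set instead of sending one request per lookup.
Set<TablePath> staleTables = new HashSet<>();
for (AbstractLookupQuery<?> lookup : lookups) {
    if (!metadataUpdater.hasLeader(lookup.tablePath(), lookup.tableBucket())) { // hypothetical check
        staleTables.add(lookup.tablePath());
    }
}
if (!staleTables.isEmpty()) {
    metadataUpdater.updateTableMetadata(staleTables); // hypothetical batched refresh
}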

fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupQueueTest.java

Lines changed: 59 additions & 0 deletions
@@ -22,8 +22,12 @@

 import org.junit.jupiter.api.Test;

+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
 import static org.apache.fluss.config.ConfigOptions.CLIENT_LOOKUP_BATCH_TIMEOUT;
 import static org.apache.fluss.config.ConfigOptions.CLIENT_LOOKUP_MAX_BATCH_SIZE;
+import static org.apache.fluss.config.ConfigOptions.CLIENT_LOOKUP_QUEUE_SIZE;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
 import static org.assertj.core.api.Assertions.assertThat;

@@ -59,6 +63,61 @@ void testDrainMaxBatchSize() throws Exception {
         assertThat(queue.hasUnDrained()).isFalse();
     }

+    @Test
+    void testAppendLookupBlocksWhenQueueIsFull() throws Exception {
+        Configuration conf = new Configuration();
+        conf.set(CLIENT_LOOKUP_QUEUE_SIZE, 5);
+        LookupQueue queue = new LookupQueue(conf);
+
+        appendLookups(queue, 5);
+        assertThat(queue.getLookupQueue()).hasSize(5);
+
+        CompletableFuture<Void> future =
+                CompletableFuture.runAsync(
+                        () -> {
+                            appendLookups(queue, 1); // will be blocked.
+                        });
+
+        // appendLookup should block and not complete immediately.
+        assertThat(future.isDone()).isFalse();
+
+        Thread.sleep(100);
+        // Still blocked after 100ms.
+        assertThat(future.isDone()).isFalse();
+
+        queue.drain();
+        future.get(1, TimeUnit.SECONDS);
+        assertThat(future.isDone()).isTrue();
+    }
+
+    @Test
+    void testReEnqueueNotBlock() throws Exception {
+        Configuration conf = new Configuration();
+        conf.set(CLIENT_LOOKUP_QUEUE_SIZE, 5);
+        conf.set(CLIENT_LOOKUP_MAX_BATCH_SIZE, 5);
+        LookupQueue queue = new LookupQueue(conf);
+
+        appendLookups(queue, 5);
+        assertThat(queue.getLookupQueue()).hasSize(5);
+        assertThat(queue.getReEnqueuedLookupQueue()).hasSize(0);
+
+        queue.reEnqueue(
+                new LookupQuery(DATA1_TABLE_PATH_PK, new TableBucket(1, 1), new byte[] {0}));
+        assertThat(queue.getLookupQueue()).hasSize(5);
+        // This batch will be put into re-enqueued lookup queue.
+        assertThat(queue.getReEnqueuedLookupQueue()).hasSize(1);
+        assertThat(queue.hasUnDrained()).isTrue();
+
+        assertThat(queue.drain()).hasSize(5);
+        // drain re-enqueued lookup first.
+        assertThat(queue.getReEnqueuedLookupQueue().isEmpty()).isTrue();
+        assertThat(queue.getLookupQueue()).hasSize(1);
+        assertThat(queue.hasUnDrained()).isTrue();
+
+        assertThat(queue.drain()).hasSize(1);
+        assertThat(queue.hasUnDrained()).isFalse();
+    }
+
     private static void appendLookups(LookupQueue queue, int count) {
         for (int i = 0; i < count; i++) {
             queue.appendLookup(
