Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.fluss.client.lookup;

import org.apache.fluss.annotation.Internal;
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;

Expand All @@ -26,6 +27,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -38,13 +41,18 @@ class LookupQueue {

private volatile boolean closed;
// buffering both the Lookup and PrefixLookup.
// TODO This queue could be refactored into a memory-managed queue similar to
// RecordAccumulator, which would significantly improve the efficiency of lookup batching. Trace
// by https://github.com/apache/fluss/issues/2124
private final ArrayBlockingQueue<AbstractLookupQuery<?>> lookupQueue;
private final BlockingQueue<AbstractLookupQuery<?>> reEnqueuedLookupQueue;
private final int maxBatchSize;
private final long batchTimeoutNanos;

LookupQueue(Configuration conf) {
this.lookupQueue =
new ArrayBlockingQueue<>(conf.get(ConfigOptions.CLIENT_LOOKUP_QUEUE_SIZE));
this.reEnqueuedLookupQueue = new LinkedBlockingQueue<>();
this.maxBatchSize = conf.get(ConfigOptions.CLIENT_LOOKUP_MAX_BATCH_SIZE);
this.batchTimeoutNanos = conf.get(ConfigOptions.CLIENT_LOOKUP_BATCH_TIMEOUT).toNanos();
this.closed = false;
Expand All @@ -63,8 +71,21 @@ void appendLookup(AbstractLookupQuery<?> lookup) {
}
}

void reEnqueue(AbstractLookupQuery<?> lookup) {
if (closed) {
throw new IllegalStateException(
"Can not re-enqueue lookup operation since the LookupQueue is closed.");
}

try {
reEnqueuedLookupQueue.put(lookup);
} catch (InterruptedException e) {
lookup.future().completeExceptionally(e);
}
}

boolean hasUnDrained() {
return !lookupQueue.isEmpty();
return !lookupQueue.isEmpty() || !reEnqueuedLookupQueue.isEmpty();
}

/** Drain a batch of {@link LookupQuery}s from the lookup queue. */
Expand All @@ -78,6 +99,16 @@ List<AbstractLookupQuery<?>> drain() throws Exception {
break;
}

while (!reEnqueuedLookupQueue.isEmpty() && count < maxBatchSize) {
AbstractLookupQuery<?> lookup =
reEnqueuedLookupQueue.poll(waitNanos, TimeUnit.NANOSECONDS);
if (lookup == null) {
break;
}
lookupOperations.add(lookup);
count++;
}

AbstractLookupQuery<?> lookup = lookupQueue.poll(waitNanos, TimeUnit.NANOSECONDS);
if (lookup == null) {
break;
Expand All @@ -97,10 +128,21 @@ List<AbstractLookupQuery<?>> drain() throws Exception {
List<AbstractLookupQuery<?>> drainAll() {
List<AbstractLookupQuery<?>> lookupOperations = new ArrayList<>(lookupQueue.size());
lookupQueue.drainTo(lookupOperations);
reEnqueuedLookupQueue.drainTo(lookupOperations);
return lookupOperations;
}

public void close() {
closed = true;
}

@VisibleForTesting
ArrayBlockingQueue<AbstractLookupQuery<?>> getLookupQueue() {
return lookupQueue;
}

@VisibleForTesting
BlockingQueue<AbstractLookupQuery<?>> getReEnqueuedLookupQueue() {
return reEnqueuedLookupQueue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ private Map<Tuple2<Integer, LookupType>, List<AbstractLookupQuery<?>>> groupByLe
// lookup the leader node
TableBucket tb = lookup.tableBucket();
try {
// TODO Metadata requests are being sent too frequently here. consider first
// collecting the tables that need to be updated and then sending them together in
// one request.
leader = metadataUpdater.leaderFor(lookup.tablePath(), tb);
} catch (Exception e) {
// if leader is not found, re-enqueue the lookup to send again.
Expand Down Expand Up @@ -207,6 +210,7 @@ private void sendLookupRequest(int destination, List<AbstractLookupQuery<?>> loo
+ " is not found in metadata cache."),
destination,
lookupsByBucket));
return;
}

lookupByTableId.forEach(
Expand Down Expand Up @@ -244,6 +248,7 @@ private void sendPrefixLookupRequest(
+ " is not found in metadata cache."),
destination,
lookupsByBucket));
return;
}

lookupByTableId.forEach(
Expand Down Expand Up @@ -421,7 +426,7 @@ private void handlePrefixLookupException(
}

private void reEnqueueLookup(AbstractLookupQuery<?> lookup) {
lookupQueue.appendLookup(lookup);
lookupQueue.reEnqueue(lookup);
}

private boolean canRetry(AbstractLookupQuery<?> lookup, Exception exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@

import org.junit.jupiter.api.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.apache.fluss.config.ConfigOptions.CLIENT_LOOKUP_BATCH_TIMEOUT;
import static org.apache.fluss.config.ConfigOptions.CLIENT_LOOKUP_MAX_BATCH_SIZE;
import static org.apache.fluss.config.ConfigOptions.CLIENT_LOOKUP_QUEUE_SIZE;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -59,6 +63,61 @@ void testDrainMaxBatchSize() throws Exception {
assertThat(queue.hasUnDrained()).isFalse();
}

@Test
void testAppendLookupBlocksWhenQueueIsFull() throws Exception {
Configuration conf = new Configuration();
conf.set(CLIENT_LOOKUP_QUEUE_SIZE, 5);
LookupQueue queue = new LookupQueue(conf);

appendLookups(queue, 5);
assertThat(queue.getLookupQueue()).hasSize(5);

CompletableFuture<Void> future =
CompletableFuture.runAsync(
() -> {
appendLookups(queue, 1); // will be blocked.
});

// appendLookup should block and not complete immediately.
assertThat(future.isDone()).isFalse();

Thread.sleep(100);
// Still blocked after 100ms.
assertThat(future.isDone()).isFalse();

queue.drain();
future.get(1, TimeUnit.SECONDS);
assertThat(future.isDone()).isTrue();
}

@Test
void testReEnqueueNotBlock() throws Exception {
Configuration conf = new Configuration();
conf.set(CLIENT_LOOKUP_QUEUE_SIZE, 5);
conf.set(CLIENT_LOOKUP_MAX_BATCH_SIZE, 5);
LookupQueue queue = new LookupQueue(conf);

appendLookups(queue, 5);
assertThat(queue.getLookupQueue()).hasSize(5);
assertThat(queue.getReEnqueuedLookupQueue()).hasSize(0);

queue.reEnqueue(
new LookupQuery(DATA1_TABLE_PATH_PK, new TableBucket(1, 1), new byte[] {0}));
assertThat(queue.getLookupQueue()).hasSize(5);
// This batch will be put into re-enqueued lookup queue.
assertThat(queue.getReEnqueuedLookupQueue()).hasSize(1);
assertThat(queue.hasUnDrained()).isTrue();

assertThat(queue.drain()).hasSize(5);
// drain re-enqueued lookup first.
assertThat(queue.getReEnqueuedLookupQueue().isEmpty()).isTrue();
assertThat(queue.getLookupQueue()).hasSize(1);
assertThat(queue.hasUnDrained()).isTrue();

assertThat(queue.drain()).hasSize(1);
assertThat(queue.hasUnDrained()).isFalse();
}

private static void appendLookups(LookupQueue queue, int count) {
for (int i = 0; i < count; i++) {
queue.appendLookup(
Expand Down