Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ public abstract class AbstractLookupQuery<T> {

private final TableBucket tableBucket;
private final byte[] key;
private int retries;

public AbstractLookupQuery(TableBucket tableBucket, byte[] key) {
this.tableBucket = tableBucket;
this.key = key;
this.retries = 0;
}

public byte[] key() {
Expand All @@ -42,6 +44,14 @@ public TableBucket tableBucket() {
return tableBucket;
}

public int retries() {
return retries;
}

public void incrementRetries() {
retries++;
}

public abstract LookupType lookupType();

public abstract CompletableFuture<T> future();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public LookupClient(Configuration conf, MetadataUpdater metadataUpdater) {
new LookupSender(
metadataUpdater,
lookupQueue,
conf.getInt(ConfigOptions.CLIENT_LOOKUP_MAX_INFLIGHT_SIZE));
conf.getInt(ConfigOptions.CLIENT_LOOKUP_MAX_INFLIGHT_SIZE),
conf.getInt(ConfigOptions.CLIENT_LOOKUP_MAX_RETRIES));
lookupSenderThreadPool.submit(lookupSender);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.InvalidMetadataException;
import org.apache.fluss.exception.LeaderNotAvailableException;
import org.apache.fluss.exception.RetriableException;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePartition;
Expand Down Expand Up @@ -74,10 +75,17 @@ class LookupSender implements Runnable {

private final Semaphore maxInFlightReuqestsSemaphore;

LookupSender(MetadataUpdater metadataUpdater, LookupQueue lookupQueue, int maxFlightRequests) {
private final int maxRetries;

LookupSender(
MetadataUpdater metadataUpdater,
LookupQueue lookupQueue,
int maxFlightRequests,
int maxRetries) {
this.metadataUpdater = metadataUpdater;
this.lookupQueue = lookupQueue;
this.maxInFlightReuqestsSemaphore = new Semaphore(maxFlightRequests);
this.maxRetries = maxRetries;
this.running = true;
}

Expand Down Expand Up @@ -307,10 +315,8 @@ private void handleLookupResponse(
pbLookupRespForBucket.getBucketId());
LookupBatch lookupBatch = lookupsByBucket.get(tableBucket);
if (pbLookupRespForBucket.hasErrorCode()) {
// TODO for re-triable error, we should retry here instead of throwing exception.
ApiError error = ApiError.fromErrorMessage(pbLookupRespForBucket);
handleLookupExceptionForBucket(tableBucket, destination, error, "lookup");
lookupBatch.completeExceptionally(error.exception());
handleLookupError(tableBucket, destination, error, lookupBatch.lookups(), "lookup");
} else {
List<byte[]> byteValues =
pbLookupRespForBucket.getValuesList().stream()
Expand Down Expand Up @@ -345,10 +351,13 @@ private void handlePrefixLookupResponse(

PrefixLookupBatch prefixLookupBatch = prefixLookupsByBucket.get(tableBucket);
if (pbRespForBucket.hasErrorCode()) {
// TODO for re-triable error, we should retry here instead of throwing exception.
ApiError error = ApiError.fromErrorMessage(pbRespForBucket);
handleLookupExceptionForBucket(tableBucket, destination, error, "prefixLookup");
prefixLookupBatch.completeExceptionally(error.exception());
handleLookupError(
tableBucket,
destination,
error,
prefixLookupBatch.lookups(),
"prefix lookup");
} else {
List<List<byte[]>> result = new ArrayList<>(pbRespForBucket.getValueListsCount());
for (int i = 0; i < pbRespForBucket.getValueListsCount(); i++) {
Expand All @@ -368,58 +377,106 @@ private void handleLookupRequestException(
Throwable t, int destination, Map<TableBucket, LookupBatch> lookupsByBucket) {
ApiError error = ApiError.fromThrowable(t);
for (LookupBatch lookupBatch : lookupsByBucket.values()) {
// TODO for re-triable error, we should retry here instead of throwing exception.
handleLookupExceptionForBucket(lookupBatch.tableBucket(), destination, error, "lookup");
lookupBatch.completeExceptionally(error.exception());
handleLookupError(
lookupBatch.tableBucket(), destination, error, lookupBatch.lookups(), "lookup");
}
}

private void handlePrefixLookupException(
Throwable t, int destination, Map<TableBucket, PrefixLookupBatch> lookupsByBucket) {
ApiError error = ApiError.fromThrowable(t);
// TODO If error, we need to retry send the request instead of throw exception.
for (PrefixLookupBatch lookupBatch : lookupsByBucket.values()) {
handleLookupExceptionForBucket(
lookupBatch.tableBucket(), destination, error, "prefixLookup");
lookupBatch.completeExceptionally(error.exception());
handleLookupError(
lookupBatch.tableBucket(),
destination,
error,
lookupBatch.lookups(),
"prefix lookup");
}
}

void forceClose() {
forceClose = true;
initiateClose();
private void reEnqueueLookup(AbstractLookupQuery<?> lookup) {
lookup.incrementRetries();
lookupQueue.appendLookup(lookup);
}

void initiateClose() {
// Ensure accumulator is closed first to guarantee that no more appends are accepted after
// breaking from the sender loop. Otherwise, we may miss some callbacks when shutting down.
lookupQueue.close();
running = false;
private boolean canRetry(AbstractLookupQuery<?> lookup, Exception exception) {
return lookup.retries() < maxRetries
&& !lookup.future().isDone()
&& exception instanceof RetriableException;
}

private void handleLookupExceptionForBucket(
TableBucket tb, int destination, ApiError error, String lookupType) {
/**
* Handle lookup error with retry logic. For each lookup in the list, check if it can be
* retried. If yes, re-enqueue it; otherwise, complete it exceptionally.
*
* @param tableBucket the table bucket
* @param error the error from server response
* @param lookups the list of lookups to handle
* @param lookupType the type of lookup ("" for regular lookup, "prefix " for prefix lookup)
*/
private void handleLookupError(
TableBucket tableBucket,
int destination,
ApiError error,
List<? extends AbstractLookupQuery<?>> lookups,
String lookupType) {
ApiException exception = error.error().exception();
LOG.error(
"Failed to {} from node {} for bucket {}", lookupType, destination, tb, exception);
"Failed to {} from node {} for bucket {}",
lookupType,
destination,
tableBucket,
exception);
if (exception instanceof InvalidMetadataException) {
LOG.warn(
"Invalid metadata error in {} request. Going to request metadata update.",
lookupType,
exception);
long tableId = tb.getTableId();
long tableId = tableBucket.getTableId();
TableOrPartitions tableOrPartitions;
if (tb.getPartitionId() == null) {
if (tableBucket.getPartitionId() == null) {
tableOrPartitions = new TableOrPartitions(Collections.singleton(tableId), null);
} else {
tableOrPartitions =
new TableOrPartitions(
null,
Collections.singleton(
new TablePartition(tableId, tb.getPartitionId())));
new TablePartition(tableId, tableBucket.getPartitionId())));
}
invalidTableOrPartitions(tableOrPartitions);
}

for (AbstractLookupQuery<?> lookup : lookups) {
if (canRetry(lookup, error.exception())) {
LOG.warn(
"Get error {} response on table bucket {}, retrying ({} attempts left). Error: {}",
lookupType,
tableBucket,
maxRetries - lookup.retries(),
error.formatErrMsg());
reEnqueueLookup(lookup);
} else {
LOG.warn(
"Get error {} response on table bucket {}, fail. Error: {}",
lookupType,
tableBucket,
error.formatErrMsg());
lookup.future().completeExceptionally(error.exception());
}
}
}

void forceClose() {
forceClose = true;
initiateClose();
}

void initiateClose() {
// Ensure accumulator is closed first to guarantee that no more appends are accepted after
// breaking from the sender loop. Otherwise, we may miss some callbacks when shutting down.
lookupQueue.close();
running = false;
}

/** A helper class to hold table ids or table partitions. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,21 @@
import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.row.InternalRow;

import javax.annotation.concurrent.NotThreadSafe;

import java.util.concurrent.CompletableFuture;

/**
* The lookup-er is used to lookup row of a primary key table by primary key or prefix key.
* The lookup-er is used to lookup row of a primary key table by primary key or prefix key. The
* lookuper has retriable ability to handle transient errors during lookup operations which is
* configured by {@link org.apache.fluss.config.ConfigOptions#CLIENT_LOOKUP_MAX_RETRIES}.
*
* <p>Note: Lookuper instances are not thread-safe.
*
* @since 0.6
*/
@PublicEvolving
@NotThreadSafe
public interface Lookuper {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.fluss.types.RowType;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -46,6 +47,7 @@
* An implementation of {@link Lookuper} that lookups by prefix key. A prefix key is a prefix subset
* of the primary key.
*/
@NotThreadSafe
class PrefixKeyLookuper implements Lookuper {

private final TableInfo tableInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.fluss.types.RowType;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
Expand All @@ -40,6 +41,7 @@
import static org.apache.fluss.utils.Preconditions.checkArgument;

/** An implementation of {@link Lookuper} that lookups by primary key. */
@NotThreadSafe
class PrimaryKeyLookuper implements Lookuper {

private final TableInfo tableInfo;
Expand Down
Loading