Skip to content

Commit a07e563

Browse files
[common] Fix thread-safety problem of PrimaryKeyLookuper and PrefixKeyLookuper (apache#1915)
1 parent 53b108b commit a07e563

File tree

16 files changed

+650
-176
lines changed

16 files changed

+650
-176
lines changed

fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@ public abstract class AbstractLookupQuery<T> {
2828

2929
private final TableBucket tableBucket;
3030
private final byte[] key;
31+
private int retries;
3132

3233
public AbstractLookupQuery(TableBucket tableBucket, byte[] key) {
3334
this.tableBucket = tableBucket;
3435
this.key = key;
36+
this.retries = 0;
3537
}
3638

3739
public byte[] key() {
@@ -42,6 +44,14 @@ public TableBucket tableBucket() {
4244
return tableBucket;
4345
}
4446

47+
public int retries() {
48+
return retries;
49+
}
50+
51+
public void incrementRetries() {
52+
retries++;
53+
}
54+
4555
public abstract LookupType lookupType();
4656

4757
public abstract CompletableFuture<T> future();

fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ public LookupClient(Configuration conf, MetadataUpdater metadataUpdater) {
6767
new LookupSender(
6868
metadataUpdater,
6969
lookupQueue,
70-
conf.getInt(ConfigOptions.CLIENT_LOOKUP_MAX_INFLIGHT_SIZE));
70+
conf.getInt(ConfigOptions.CLIENT_LOOKUP_MAX_INFLIGHT_SIZE),
71+
conf.getInt(ConfigOptions.CLIENT_LOOKUP_MAX_RETRIES));
7172
lookupSenderThreadPool.submit(lookupSender);
7273
}
7374

fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java

Lines changed: 85 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.fluss.exception.FlussRuntimeException;
2525
import org.apache.fluss.exception.InvalidMetadataException;
2626
import org.apache.fluss.exception.LeaderNotAvailableException;
27+
import org.apache.fluss.exception.RetriableException;
2728
import org.apache.fluss.metadata.PhysicalTablePath;
2829
import org.apache.fluss.metadata.TableBucket;
2930
import org.apache.fluss.metadata.TablePartition;
@@ -74,10 +75,17 @@ class LookupSender implements Runnable {
7475

7576
private final Semaphore maxInFlightReuqestsSemaphore;
7677

77-
LookupSender(MetadataUpdater metadataUpdater, LookupQueue lookupQueue, int maxFlightRequests) {
78+
private final int maxRetries;
79+
80+
LookupSender(
81+
MetadataUpdater metadataUpdater,
82+
LookupQueue lookupQueue,
83+
int maxFlightRequests,
84+
int maxRetries) {
7885
this.metadataUpdater = metadataUpdater;
7986
this.lookupQueue = lookupQueue;
8087
this.maxInFlightReuqestsSemaphore = new Semaphore(maxFlightRequests);
88+
this.maxRetries = maxRetries;
8189
this.running = true;
8290
}
8391

@@ -307,10 +315,8 @@ private void handleLookupResponse(
307315
pbLookupRespForBucket.getBucketId());
308316
LookupBatch lookupBatch = lookupsByBucket.get(tableBucket);
309317
if (pbLookupRespForBucket.hasErrorCode()) {
310-
// TODO for re-triable error, we should retry here instead of throwing exception.
311318
ApiError error = ApiError.fromErrorMessage(pbLookupRespForBucket);
312-
handleLookupExceptionForBucket(tableBucket, destination, error, "lookup");
313-
lookupBatch.completeExceptionally(error.exception());
319+
handleLookupError(tableBucket, destination, error, lookupBatch.lookups(), "lookup");
314320
} else {
315321
List<byte[]> byteValues =
316322
pbLookupRespForBucket.getValuesList().stream()
@@ -345,10 +351,13 @@ private void handlePrefixLookupResponse(
345351

346352
PrefixLookupBatch prefixLookupBatch = prefixLookupsByBucket.get(tableBucket);
347353
if (pbRespForBucket.hasErrorCode()) {
348-
// TODO for re-triable error, we should retry here instead of throwing exception.
349354
ApiError error = ApiError.fromErrorMessage(pbRespForBucket);
350-
handleLookupExceptionForBucket(tableBucket, destination, error, "prefixLookup");
351-
prefixLookupBatch.completeExceptionally(error.exception());
355+
handleLookupError(
356+
tableBucket,
357+
destination,
358+
error,
359+
prefixLookupBatch.lookups(),
360+
"prefix lookup");
352361
} else {
353362
List<List<byte[]>> result = new ArrayList<>(pbRespForBucket.getValueListsCount());
354363
for (int i = 0; i < pbRespForBucket.getValueListsCount(); i++) {
@@ -368,58 +377,106 @@ private void handleLookupRequestException(
368377
Throwable t, int destination, Map<TableBucket, LookupBatch> lookupsByBucket) {
369378
ApiError error = ApiError.fromThrowable(t);
370379
for (LookupBatch lookupBatch : lookupsByBucket.values()) {
371-
// TODO for re-triable error, we should retry here instead of throwing exception.
372-
handleLookupExceptionForBucket(lookupBatch.tableBucket(), destination, error, "lookup");
373-
lookupBatch.completeExceptionally(error.exception());
380+
handleLookupError(
381+
lookupBatch.tableBucket(), destination, error, lookupBatch.lookups(), "lookup");
374382
}
375383
}
376384

377385
private void handlePrefixLookupException(
378386
Throwable t, int destination, Map<TableBucket, PrefixLookupBatch> lookupsByBucket) {
379387
ApiError error = ApiError.fromThrowable(t);
380-
// TODO If error, we need to retry send the request instead of throw exception.
381388
for (PrefixLookupBatch lookupBatch : lookupsByBucket.values()) {
382-
handleLookupExceptionForBucket(
383-
lookupBatch.tableBucket(), destination, error, "prefixLookup");
384-
lookupBatch.completeExceptionally(error.exception());
389+
handleLookupError(
390+
lookupBatch.tableBucket(),
391+
destination,
392+
error,
393+
lookupBatch.lookups(),
394+
"prefix lookup");
385395
}
386396
}
387397

388-
void forceClose() {
389-
forceClose = true;
390-
initiateClose();
398+
private void reEnqueueLookup(AbstractLookupQuery<?> lookup) {
399+
lookup.incrementRetries();
400+
lookupQueue.appendLookup(lookup);
391401
}
392402

393-
void initiateClose() {
394-
// Ensure accumulator is closed first to guarantee that no more appends are accepted after
395-
// breaking from the sender loop. Otherwise, we may miss some callbacks when shutting down.
396-
lookupQueue.close();
397-
running = false;
403+
private boolean canRetry(AbstractLookupQuery<?> lookup, Exception exception) {
404+
return lookup.retries() < maxRetries
405+
&& !lookup.future().isDone()
406+
&& exception instanceof RetriableException;
398407
}
399408

400-
private void handleLookupExceptionForBucket(
401-
TableBucket tb, int destination, ApiError error, String lookupType) {
409+
/**
410+
* Handle lookup error with retry logic. For each lookup in the list, check if it can be
411+
* retried. If yes, re-enqueue it; otherwise, complete it exceptionally.
412+
*
413+
* @param tableBucket the table bucket
414+
* @param error the error from the server response or from the failed request
415+
* @param lookups the list of lookups to handle
416+
* @param lookupType the type of lookup, used in log messages ("lookup" for regular lookup, "prefix lookup" for prefix lookup)
417+
*/
418+
private void handleLookupError(
419+
TableBucket tableBucket,
420+
int destination,
421+
ApiError error,
422+
List<? extends AbstractLookupQuery<?>> lookups,
423+
String lookupType) {
402424
ApiException exception = error.error().exception();
403425
LOG.error(
404-
"Failed to {} from node {} for bucket {}", lookupType, destination, tb, exception);
426+
"Failed to {} from node {} for bucket {}",
427+
lookupType,
428+
destination,
429+
tableBucket,
430+
exception);
405431
if (exception instanceof InvalidMetadataException) {
406432
LOG.warn(
407433
"Invalid metadata error in {} request. Going to request metadata update.",
408434
lookupType,
409435
exception);
410-
long tableId = tb.getTableId();
436+
long tableId = tableBucket.getTableId();
411437
TableOrPartitions tableOrPartitions;
412-
if (tb.getPartitionId() == null) {
438+
if (tableBucket.getPartitionId() == null) {
413439
tableOrPartitions = new TableOrPartitions(Collections.singleton(tableId), null);
414440
} else {
415441
tableOrPartitions =
416442
new TableOrPartitions(
417443
null,
418444
Collections.singleton(
419-
new TablePartition(tableId, tb.getPartitionId())));
445+
new TablePartition(tableId, tableBucket.getPartitionId())));
420446
}
421447
invalidTableOrPartitions(tableOrPartitions);
422448
}
449+
450+
for (AbstractLookupQuery<?> lookup : lookups) {
451+
if (canRetry(lookup, error.exception())) {
452+
LOG.warn(
453+
"Get error {} response on table bucket {}, retrying ({} attempts left). Error: {}",
454+
lookupType,
455+
tableBucket,
456+
maxRetries - lookup.retries(),
457+
error.formatErrMsg());
458+
reEnqueueLookup(lookup);
459+
} else {
460+
LOG.warn(
461+
"Get error {} response on table bucket {}, fail. Error: {}",
462+
lookupType,
463+
tableBucket,
464+
error.formatErrMsg());
465+
lookup.future().completeExceptionally(error.exception());
466+
}
467+
}
468+
}
469+
470+
void forceClose() {
471+
forceClose = true;
472+
initiateClose();
473+
}
474+
475+
void initiateClose() {
476+
// Ensure accumulator is closed first to guarantee that no more appends are accepted after
477+
// breaking from the sender loop. Otherwise, we may miss some callbacks when shutting down.
478+
lookupQueue.close();
479+
running = false;
423480
}
424481

425482
/** A helper class to hold table ids or table partitions. */

fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,21 @@
2020
import org.apache.fluss.annotation.PublicEvolving;
2121
import org.apache.fluss.row.InternalRow;
2222

23+
import javax.annotation.concurrent.NotThreadSafe;
24+
2325
import java.util.concurrent.CompletableFuture;
2426

2527
/**
26-
* The lookup-er is used to lookup row of a primary key table by primary key or prefix key.
28+
* The lookuper is used to look up rows of a primary key table by primary key or prefix key. It
29+
* retries lookups that fail with transient errors; the maximum number of retries is
30+
* configured by {@link org.apache.fluss.config.ConfigOptions#CLIENT_LOOKUP_MAX_RETRIES}.
31+
*
32+
* <p>Note: Lookuper instances are not thread-safe.
2733
*
2834
* @since 0.6
2935
*/
3036
@PublicEvolving
37+
@NotThreadSafe
3138
public interface Lookuper {
3239

3340
/**

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.fluss.types.RowType;
3333

3434
import javax.annotation.Nullable;
35+
import javax.annotation.concurrent.NotThreadSafe;
3536

3637
import java.util.ArrayList;
3738
import java.util.Collections;
@@ -46,6 +47,7 @@
4647
* An implementation of {@link Lookuper} that looks up by prefix key. A prefix key is a prefix subset
4748
* of the primary key.
4849
*/
50+
@NotThreadSafe
4951
class PrefixKeyLookuper implements Lookuper {
5052

5153
private final TableInfo tableInfo;

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.fluss.types.RowType;
3333

3434
import javax.annotation.Nullable;
35+
import javax.annotation.concurrent.NotThreadSafe;
3536

3637
import java.util.Collections;
3738
import java.util.concurrent.CompletableFuture;
@@ -40,6 +41,7 @@
4041
import static org.apache.fluss.utils.Preconditions.checkArgument;
4142

4243
/** An implementation of {@link Lookuper} that looks up by primary key. */
44+
@NotThreadSafe
4345
class PrimaryKeyLookuper implements Lookuper {
4446

4547
private final TableInfo tableInfo;

0 commit comments

Comments
 (0)