2424import org .apache .fluss .exception .FlussRuntimeException ;
2525import org .apache .fluss .exception .InvalidMetadataException ;
2626import org .apache .fluss .exception .LeaderNotAvailableException ;
27+ import org .apache .fluss .exception .RetriableException ;
2728import org .apache .fluss .metadata .PhysicalTablePath ;
2829import org .apache .fluss .metadata .TableBucket ;
2930import org .apache .fluss .metadata .TablePartition ;
@@ -74,10 +75,17 @@ class LookupSender implements Runnable {
7475
7576 private final Semaphore maxInFlightReuqestsSemaphore ;
7677
77- LookupSender (MetadataUpdater metadataUpdater , LookupQueue lookupQueue , int maxFlightRequests ) {
78+ private final int maxRetries ;
79+
80+ LookupSender (
81+ MetadataUpdater metadataUpdater ,
82+ LookupQueue lookupQueue ,
83+ int maxFlightRequests ,
84+ int maxRetries ) {
7885 this .metadataUpdater = metadataUpdater ;
7986 this .lookupQueue = lookupQueue ;
8087 this .maxInFlightReuqestsSemaphore = new Semaphore (maxFlightRequests );
88+ this .maxRetries = maxRetries ;
8189 this .running = true ;
8290 }
8391
@@ -307,10 +315,8 @@ private void handleLookupResponse(
307315 pbLookupRespForBucket .getBucketId ());
308316 LookupBatch lookupBatch = lookupsByBucket .get (tableBucket );
309317 if (pbLookupRespForBucket .hasErrorCode ()) {
310- // TODO for re-triable error, we should retry here instead of throwing exception.
311318 ApiError error = ApiError .fromErrorMessage (pbLookupRespForBucket );
312- handleLookupExceptionForBucket (tableBucket , destination , error , "lookup" );
313- lookupBatch .completeExceptionally (error .exception ());
319+ handleLookupError (tableBucket , destination , error , lookupBatch .lookups (), "lookup" );
314320 } else {
315321 List <byte []> byteValues =
316322 pbLookupRespForBucket .getValuesList ().stream ()
@@ -345,10 +351,13 @@ private void handlePrefixLookupResponse(
345351
346352 PrefixLookupBatch prefixLookupBatch = prefixLookupsByBucket .get (tableBucket );
347353 if (pbRespForBucket .hasErrorCode ()) {
348- // TODO for re-triable error, we should retry here instead of throwing exception.
349354 ApiError error = ApiError .fromErrorMessage (pbRespForBucket );
350- handleLookupExceptionForBucket (tableBucket , destination , error , "prefixLookup" );
351- prefixLookupBatch .completeExceptionally (error .exception ());
355+ handleLookupError (
356+ tableBucket ,
357+ destination ,
358+ error ,
359+ prefixLookupBatch .lookups (),
360+ "prefix lookup" );
352361 } else {
353362 List <List <byte []>> result = new ArrayList <>(pbRespForBucket .getValueListsCount ());
354363 for (int i = 0 ; i < pbRespForBucket .getValueListsCount (); i ++) {
@@ -368,58 +377,106 @@ private void handleLookupRequestException(
368377 Throwable t , int destination , Map <TableBucket , LookupBatch > lookupsByBucket ) {
369378 ApiError error = ApiError .fromThrowable (t );
370379 for (LookupBatch lookupBatch : lookupsByBucket .values ()) {
371- // TODO for re-triable error, we should retry here instead of throwing exception.
372- handleLookupExceptionForBucket (lookupBatch .tableBucket (), destination , error , "lookup" );
373- lookupBatch .completeExceptionally (error .exception ());
380+ handleLookupError (
381+ lookupBatch .tableBucket (), destination , error , lookupBatch .lookups (), "lookup" );
374382 }
375383 }
376384
377385 private void handlePrefixLookupException (
378386 Throwable t , int destination , Map <TableBucket , PrefixLookupBatch > lookupsByBucket ) {
379387 ApiError error = ApiError .fromThrowable (t );
380- // TODO If error, we need to retry send the request instead of throw exception.
381388 for (PrefixLookupBatch lookupBatch : lookupsByBucket .values ()) {
382- handleLookupExceptionForBucket (
383- lookupBatch .tableBucket (), destination , error , "prefixLookup" );
384- lookupBatch .completeExceptionally (error .exception ());
389+ handleLookupError (
390+ lookupBatch .tableBucket (),
391+ destination ,
392+ error ,
393+ lookupBatch .lookups (),
394+ "prefix lookup" );
385395 }
386396 }
387397
388- void forceClose ( ) {
389- forceClose = true ;
390- initiateClose ( );
398+ private void reEnqueueLookup ( AbstractLookupQuery <?> lookup ) {
399+ lookup . incrementRetries () ;
400+ lookupQueue . appendLookup ( lookup );
391401 }
392402
393- void initiateClose () {
394- // Ensure accumulator is closed first to guarantee that no more appends are accepted after
395- // breaking from the sender loop. Otherwise, we may miss some callbacks when shutting down.
396- lookupQueue .close ();
397- running = false ;
403+ private boolean canRetry (AbstractLookupQuery <?> lookup , Exception exception ) {
404+ return lookup .retries () < maxRetries
405+ && !lookup .future ().isDone ()
406+ && exception instanceof RetriableException ;
398407 }
399408
400- private void handleLookupExceptionForBucket (
401- TableBucket tb , int destination , ApiError error , String lookupType ) {
409+ /**
410+ * Handle lookup error with retry logic. For each lookup in the list, check if it can be
411+ * retried. If yes, re-enqueue it; otherwise, complete it exceptionally.
412+ *
413+ * @param tableBucket the table bucket
414+ * @param error the error from server response
415+ * @param lookups the list of lookups to handle
416+ * @param lookupType the type of lookup ("" for regular lookup, "prefix " for prefix lookup)
417+ */
418+ private void handleLookupError (
419+ TableBucket tableBucket ,
420+ int destination ,
421+ ApiError error ,
422+ List <? extends AbstractLookupQuery <?>> lookups ,
423+ String lookupType ) {
402424 ApiException exception = error .error ().exception ();
403425 LOG .error (
404- "Failed to {} from node {} for bucket {}" , lookupType , destination , tb , exception );
426+ "Failed to {} from node {} for bucket {}" ,
427+ lookupType ,
428+ destination ,
429+ tableBucket ,
430+ exception );
405431 if (exception instanceof InvalidMetadataException ) {
406432 LOG .warn (
407433 "Invalid metadata error in {} request. Going to request metadata update." ,
408434 lookupType ,
409435 exception );
410- long tableId = tb .getTableId ();
436+ long tableId = tableBucket .getTableId ();
411437 TableOrPartitions tableOrPartitions ;
412- if (tb .getPartitionId () == null ) {
438+ if (tableBucket .getPartitionId () == null ) {
413439 tableOrPartitions = new TableOrPartitions (Collections .singleton (tableId ), null );
414440 } else {
415441 tableOrPartitions =
416442 new TableOrPartitions (
417443 null ,
418444 Collections .singleton (
419- new TablePartition (tableId , tb .getPartitionId ())));
445+ new TablePartition (tableId , tableBucket .getPartitionId ())));
420446 }
421447 invalidTableOrPartitions (tableOrPartitions );
422448 }
449+
450+ for (AbstractLookupQuery <?> lookup : lookups ) {
451+ if (canRetry (lookup , error .exception ())) {
452+ LOG .warn (
453+ "Get error {} response on table bucket {}, retrying ({} attempts left). Error: {}" ,
454+ lookupType ,
455+ tableBucket ,
456+ maxRetries - lookup .retries (),
457+ error .formatErrMsg ());
458+ reEnqueueLookup (lookup );
459+ } else {
460+ LOG .warn (
461+ "Get error {} response on table bucket {}, fail. Error: {}" ,
462+ lookupType ,
463+ tableBucket ,
464+ error .formatErrMsg ());
465+ lookup .future ().completeExceptionally (error .exception ());
466+ }
467+ }
468+ }
469+
470+ void forceClose () {
471+ forceClose = true ;
472+ initiateClose ();
473+ }
474+
475+ void initiateClose () {
476+ // Ensure accumulator is closed first to guarantee that no more appends are accepted after
477+ // breaking from the sender loop. Otherwise, we may miss some callbacks when shutting down.
478+ lookupQueue .close ();
479+ running = false ;
423480 }
424481
425482 /** A helper class to hold table ids or table partitions. */
0 commit comments