1818package org .apache .fluss .client .lookup ;
1919
2020import org .apache .fluss .annotation .Internal ;
21+ import org .apache .fluss .annotation .VisibleForTesting ;
2122import org .apache .fluss .client .metadata .MetadataUpdater ;
23+ import org .apache .fluss .exception .ApiException ;
2224import org .apache .fluss .exception .FlussRuntimeException ;
25+ import org .apache .fluss .exception .InvalidMetadataException ;
2326import org .apache .fluss .exception .LeaderNotAvailableException ;
27+ import org .apache .fluss .metadata .PhysicalTablePath ;
2428import org .apache .fluss .metadata .TableBucket ;
29+ import org .apache .fluss .metadata .TablePartition ;
2530import org .apache .fluss .rpc .gateway .TabletServerGateway ;
2631import org .apache .fluss .rpc .messages .LookupRequest ;
2732import org .apache .fluss .rpc .messages .LookupResponse ;
3641import org .slf4j .Logger ;
3742import org .slf4j .LoggerFactory ;
3843
44+ import javax .annotation .Nullable ;
45+
3946import java .util .ArrayList ;
47+ import java .util .Collections ;
4048import java .util .HashMap ;
4149import java .util .List ;
4250import java .util .Map ;
51+ import java .util .Set ;
4352import java .util .concurrent .Semaphore ;
4453import java .util .stream .Collectors ;
4554
@@ -145,7 +154,8 @@ private Map<Tuple2<Integer, LookupType>, List<AbstractLookupQuery<?>>> groupByLe
145154 return lookupBatchesByLeader ;
146155 }
147156
148- private void sendLookups (
157+ @ VisibleForTesting
158+ void sendLookups (
149159 int destination , LookupType lookupType , List <AbstractLookupQuery <?>> lookupBatches ) {
150160 TabletServerGateway gateway = metadataUpdater .newTabletServerClientForNode (destination );
151161 if (gateway == null ) {
@@ -155,16 +165,16 @@ private void sendLookups(
155165 }
156166
157167 if (lookupType == LookupType .LOOKUP ) {
158- sendLookupRequest (gateway , lookupBatches );
168+ sendLookupRequest (destination , gateway , lookupBatches );
159169 } else if (lookupType == LookupType .PREFIX_LOOKUP ) {
160- sendPrefixLookupRequest (gateway , lookupBatches );
170+ sendPrefixLookupRequest (destination , gateway , lookupBatches );
161171 } else {
162172 throw new IllegalArgumentException ("Unsupported lookup type: " + lookupType );
163173 }
164174 }
165175
166176 private void sendLookupRequest (
167- TabletServerGateway gateway , List <AbstractLookupQuery <?>> lookups ) {
177+ int destination , TabletServerGateway gateway , List <AbstractLookupQuery <?>> lookups ) {
168178 // table id -> (bucket -> lookups)
169179 Map <Long , Map <TableBucket , LookupBatch >> lookupByTableId = new HashMap <>();
170180 for (AbstractLookupQuery <?> abstractLookupQuery : lookups ) {
@@ -180,14 +190,17 @@ private void sendLookupRequest(
180190 lookupByTableId .forEach (
181191 (tableId , lookupsByBucket ) ->
182192 sendLookupRequestAndHandleResponse (
193+ destination ,
183194 gateway ,
184195 makeLookupRequest (tableId , lookupsByBucket .values ()),
185196 tableId ,
186197 lookupsByBucket ));
187198 }
188199
189200 private void sendPrefixLookupRequest (
190- TabletServerGateway gateway , List <AbstractLookupQuery <?>> prefixLookups ) {
201+ int destination ,
202+ TabletServerGateway gateway ,
203+ List <AbstractLookupQuery <?>> prefixLookups ) {
191204 // table id -> (bucket -> lookups)
192205 Map <Long , Map <TableBucket , PrefixLookupBatch >> lookupByTableId = new HashMap <>();
193206 for (AbstractLookupQuery <?> abstractLookupQuery : prefixLookups ) {
@@ -203,13 +216,15 @@ private void sendPrefixLookupRequest(
203216 lookupByTableId .forEach (
204217 (tableId , prefixLookupBatch ) ->
205218 sendPrefixLookupRequestAndHandleResponse (
219+ destination ,
206220 gateway ,
207221 makePrefixLookupRequest (tableId , prefixLookupBatch .values ()),
208222 tableId ,
209223 prefixLookupBatch ));
210224 }
211225
212226 private void sendLookupRequestAndHandleResponse (
227+ int destination ,
213228 TabletServerGateway gateway ,
214229 LookupRequest lookupRequest ,
215230 long tableId ,
@@ -224,15 +239,16 @@ private void sendLookupRequestAndHandleResponse(
224239 .thenAccept (
225240 lookupResponse -> {
226241 try {
227- handleLookupResponse (tableId , lookupResponse , lookupsByBucket );
242+ handleLookupResponse (
243+ tableId , destination , lookupResponse , lookupsByBucket );
228244 } finally {
229245 maxInFlightReuqestsSemaphore .release ();
230246 }
231247 })
232248 .exceptionally (
233249 e -> {
234250 try {
235- handleLookupRequestException (e , lookupsByBucket );
251+ handleLookupRequestException (e , destination , lookupsByBucket );
236252 return null ;
237253 } finally {
238254 maxInFlightReuqestsSemaphore .release ();
@@ -241,6 +257,7 @@ private void sendLookupRequestAndHandleResponse(
241257 }
242258
243259 private void sendPrefixLookupRequestAndHandleResponse (
260+ int destination ,
244261 TabletServerGateway gateway ,
245262 PrefixLookupRequest prefixLookupRequest ,
246263 long tableId ,
@@ -256,15 +273,18 @@ private void sendPrefixLookupRequestAndHandleResponse(
256273 prefixLookupResponse -> {
257274 try {
258275 handlePrefixLookupResponse (
259- tableId , prefixLookupResponse , lookupsByBucket );
276+ tableId ,
277+ destination ,
278+ prefixLookupResponse ,
279+ lookupsByBucket );
260280 } finally {
261281 maxInFlightReuqestsSemaphore .release ();
262282 }
263283 })
264284 .exceptionally (
265285 e -> {
266286 try {
267- handlePrefixLookupException (e , lookupsByBucket );
287+ handlePrefixLookupException (e , destination , lookupsByBucket );
268288 return null ;
269289 } finally {
270290 maxInFlightReuqestsSemaphore .release ();
@@ -274,6 +294,7 @@ private void sendPrefixLookupRequestAndHandleResponse(
274294
275295 private void handleLookupResponse (
276296 long tableId ,
297+ int destination ,
277298 LookupResponse lookupResponse ,
278299 Map <TableBucket , LookupBatch > lookupsByBucket ) {
279300 for (PbLookupRespForBucket pbLookupRespForBucket : lookupResponse .getBucketsRespsList ()) {
@@ -288,10 +309,7 @@ private void handleLookupResponse(
288309 if (pbLookupRespForBucket .hasErrorCode ()) {
289310 // TODO for re-triable error, we should retry here instead of throwing exception.
290311 ApiError error = ApiError .fromErrorMessage (pbLookupRespForBucket );
291- LOG .warn (
292- "Get error lookup response on table bucket {}, fail. Error: {}" ,
293- tableBucket ,
294- error .formatErrMsg ());
312+ handleLookupExceptionForBucket (tableBucket , destination , error , "lookup" );
295313 lookupBatch .completeExceptionally (error .exception ());
296314 } else {
297315 List <byte []> byteValues =
@@ -312,6 +330,7 @@ private void handleLookupResponse(
312330
313331 private void handlePrefixLookupResponse (
314332 long tableId ,
333+ int destination ,
315334 PrefixLookupResponse prefixLookupResponse ,
316335 Map <TableBucket , PrefixLookupBatch > prefixLookupsByBucket ) {
317336 for (PbPrefixLookupRespForBucket pbRespForBucket :
@@ -328,10 +347,7 @@ private void handlePrefixLookupResponse(
328347 if (pbRespForBucket .hasErrorCode ()) {
329348 // TODO for re-triable error, we should retry here instead of throwing exception.
330349 ApiError error = ApiError .fromErrorMessage (pbRespForBucket );
331- LOG .warn (
332- "Get error prefix lookup response on table bucket {}, fail. Error: {}" ,
333- tableBucket ,
334- error .formatErrMsg ());
350+ handleLookupExceptionForBucket (tableBucket , destination , error , "prefixLookup" );
335351 prefixLookupBatch .completeExceptionally (error .exception ());
336352 } else {
337353 List <List <byte []>> result = new ArrayList <>(pbRespForBucket .getValueListsCount ());
@@ -349,24 +365,22 @@ private void handlePrefixLookupResponse(
349365 }
350366
351367 private void handleLookupRequestException (
352- Throwable t , Map <TableBucket , LookupBatch > lookupsByBucket ) {
368+ Throwable t , int destination , Map <TableBucket , LookupBatch > lookupsByBucket ) {
353369 ApiError error = ApiError .fromThrowable (t );
354370 for (LookupBatch lookupBatch : lookupsByBucket .values ()) {
355371 // TODO for re-triable error, we should retry here instead of throwing exception.
356- LOG .warn (
357- "Get error lookup response on table bucket {}, fail. Error: {}" ,
358- lookupBatch .tableBucket (),
359- error .formatErrMsg ());
372+ handleLookupExceptionForBucket (lookupBatch .tableBucket (), destination , error , "lookup" );
360373 lookupBatch .completeExceptionally (error .exception ());
361374 }
362375 }
363376
364377 private void handlePrefixLookupException (
365- Throwable t , Map <TableBucket , PrefixLookupBatch > lookupsByBucket ) {
378+ Throwable t , int destination , Map <TableBucket , PrefixLookupBatch > lookupsByBucket ) {
366379 ApiError error = ApiError .fromThrowable (t );
367380 // TODO If error, we need to retry send the request instead of throw exception.
368- LOG .warn ("Get error prefix lookup response. Error: {}" , error .formatErrMsg ());
369381 for (PrefixLookupBatch lookupBatch : lookupsByBucket .values ()) {
382+ handleLookupExceptionForBucket (
383+ lookupBatch .tableBucket (), destination , error , "prefixLookup" );
370384 lookupBatch .completeExceptionally (error .exception ());
371385 }
372386 }
@@ -382,4 +396,48 @@ void initiateClose() {
382396 lookupQueue .close ();
383397 running = false ;
384398 }
399+
400+ private void handleLookupExceptionForBucket (
401+ TableBucket tb , int destination , ApiError error , String lookupType ) {
402+ ApiException exception = error .error ().exception ();
403+ LOG .error (
404+ "Failed to {} from node {} for bucket {}" , lookupType , destination , tb , exception );
405+ if (exception instanceof InvalidMetadataException ) {
406+ LOG .warn (
407+ "Invalid metadata error in {} request. Going to request metadata update." ,
408+ lookupType ,
409+ exception );
410+ long tableId = tb .getTableId ();
411+ TableOrPartitions tableOrPartitions ;
412+ if (tb .getPartitionId () == null ) {
413+ tableOrPartitions = new TableOrPartitions (Collections .singleton (tableId ), null );
414+ } else {
415+ tableOrPartitions =
416+ new TableOrPartitions (
417+ null ,
418+ Collections .singleton (
419+ new TablePartition (tableId , tb .getPartitionId ())));
420+ }
421+ invalidTableOrPartitions (tableOrPartitions );
422+ }
423+ }
424+
425+ /** A helper class to hold table ids or table partitions. */
426+ private static class TableOrPartitions {
427+ private final @ Nullable Set <Long > tableIds ;
428+ private final @ Nullable Set <TablePartition > tablePartitions ;
429+
430+ TableOrPartitions (
431+ @ Nullable Set <Long > tableIds , @ Nullable Set <TablePartition > tablePartitions ) {
432+ this .tableIds = tableIds ;
433+ this .tablePartitions = tablePartitions ;
434+ }
435+ }
436+
437+ private void invalidTableOrPartitions (TableOrPartitions tableOrPartitions ) {
438+ Set <PhysicalTablePath > physicalTablePaths =
439+ metadataUpdater .getPhysicalTablePathByIds (
440+ tableOrPartitions .tableIds , tableOrPartitions .tablePartitions );
441+ metadataUpdater .invalidPhysicalTableBucketMeta (physicalTablePaths );
442+ }
385443}
0 commit comments