1818package org .apache .fluss .client .lookup ;
1919
2020import org .apache .fluss .annotation .Internal ;
21+ import org .apache .fluss .annotation .VisibleForTesting ;
2122import org .apache .fluss .client .metadata .MetadataUpdater ;
23+ import org .apache .fluss .exception .ApiException ;
2224import org .apache .fluss .exception .FlussRuntimeException ;
25+ import org .apache .fluss .exception .InvalidMetadataException ;
2326import org .apache .fluss .exception .LeaderNotAvailableException ;
27+ import org .apache .fluss .metadata .PhysicalTablePath ;
2428import org .apache .fluss .metadata .TableBucket ;
29+ import org .apache .fluss .metadata .TablePartition ;
2530import org .apache .fluss .rpc .gateway .TabletServerGateway ;
2631import org .apache .fluss .rpc .messages .LookupRequest ;
2732import org .apache .fluss .rpc .messages .LookupResponse ;
3641import org .slf4j .Logger ;
3742import org .slf4j .LoggerFactory ;
3843
44+ import javax .annotation .Nullable ;
45+
3946import java .util .ArrayList ;
47+ import java .util .Collections ;
4048import java .util .HashMap ;
4149import java .util .List ;
4250import java .util .Map ;
51+ import java .util .Set ;
4352import java .util .concurrent .Semaphore ;
4453import java .util .stream .Collectors ;
4554
@@ -145,7 +154,8 @@ private Map<Tuple2<Integer, LookupType>, List<AbstractLookupQuery<?>>> groupByLe
145154 return lookupBatchesByLeader ;
146155 }
147156
148- private void sendLookups (
157+ @ VisibleForTesting
158+ void sendLookups (
149159 int destination , LookupType lookupType , List <AbstractLookupQuery <?>> lookupBatches ) {
150160 TabletServerGateway gateway = metadataUpdater .newTabletServerClientForNode (destination );
151161 if (gateway == null ) {
@@ -155,16 +165,16 @@ private void sendLookups(
155165 }
156166
157167 if (lookupType == LookupType .LOOKUP ) {
158- sendLookupRequest (gateway , lookupBatches );
168+ sendLookupRequest (destination , gateway , lookupBatches );
159169 } else if (lookupType == LookupType .PREFIX_LOOKUP ) {
160- sendPrefixLookupRequest (gateway , lookupBatches );
170+ sendPrefixLookupRequest (destination , gateway , lookupBatches );
161171 } else {
162172 throw new IllegalArgumentException ("Unsupported lookup type: " + lookupType );
163173 }
164174 }
165175
166176 private void sendLookupRequest (
167- TabletServerGateway gateway , List <AbstractLookupQuery <?>> lookups ) {
177+ int destination , TabletServerGateway gateway , List <AbstractLookupQuery <?>> lookups ) {
168178 // table id -> (bucket -> lookups)
169179 Map <Long , Map <TableBucket , LookupBatch >> lookupByTableId = new HashMap <>();
170180 for (AbstractLookupQuery <?> abstractLookupQuery : lookups ) {
@@ -180,14 +190,17 @@ private void sendLookupRequest(
180190 lookupByTableId .forEach (
181191 (tableId , lookupsByBucket ) ->
182192 sendLookupRequestAndHandleResponse (
193+ destination ,
183194 gateway ,
184195 makeLookupRequest (tableId , lookupsByBucket .values ()),
185196 tableId ,
186197 lookupsByBucket ));
187198 }
188199
189200 private void sendPrefixLookupRequest (
190- TabletServerGateway gateway , List <AbstractLookupQuery <?>> prefixLookups ) {
201+ int destination ,
202+ TabletServerGateway gateway ,
203+ List <AbstractLookupQuery <?>> prefixLookups ) {
191204 // table id -> (bucket -> lookups)
192205 Map <Long , Map <TableBucket , PrefixLookupBatch >> lookupByTableId = new HashMap <>();
193206 for (AbstractLookupQuery <?> abstractLookupQuery : prefixLookups ) {
@@ -203,13 +216,15 @@ private void sendPrefixLookupRequest(
203216 lookupByTableId .forEach (
204217 (tableId , prefixLookupBatch ) ->
205218 sendPrefixLookupRequestAndHandleResponse (
219+ destination ,
206220 gateway ,
207221 makePrefixLookupRequest (tableId , prefixLookupBatch .values ()),
208222 tableId ,
209223 prefixLookupBatch ));
210224 }
211225
212226 private void sendLookupRequestAndHandleResponse (
227+ int destination ,
213228 TabletServerGateway gateway ,
214229 LookupRequest lookupRequest ,
215230 long tableId ,
@@ -224,15 +239,16 @@ private void sendLookupRequestAndHandleResponse(
224239 .thenAccept (
225240 lookupResponse -> {
226241 try {
227- handleLookupResponse (tableId , lookupResponse , lookupsByBucket );
242+ handleLookupResponse (
243+ tableId , destination , lookupResponse , lookupsByBucket );
228244 } finally {
229245 maxInFlightReuqestsSemaphore .release ();
230246 }
231247 })
232248 .exceptionally (
233249 e -> {
234250 try {
235- handleLookupRequestException (e , lookupsByBucket );
251+ handleLookupRequestException (e , destination , lookupsByBucket );
236252 return null ;
237253 } finally {
238254 maxInFlightReuqestsSemaphore .release ();
@@ -241,6 +257,7 @@ private void sendLookupRequestAndHandleResponse(
241257 }
242258
243259 private void sendPrefixLookupRequestAndHandleResponse (
260+ int destination ,
244261 TabletServerGateway gateway ,
245262 PrefixLookupRequest prefixLookupRequest ,
246263 long tableId ,
@@ -256,15 +273,18 @@ private void sendPrefixLookupRequestAndHandleResponse(
256273 prefixLookupResponse -> {
257274 try {
258275 handlePrefixLookupResponse (
259- tableId , prefixLookupResponse , lookupsByBucket );
276+ tableId ,
277+ destination ,
278+ prefixLookupResponse ,
279+ lookupsByBucket );
260280 } finally {
261281 maxInFlightReuqestsSemaphore .release ();
262282 }
263283 })
264284 .exceptionally (
265285 e -> {
266286 try {
267- handlePrefixLookupException (e , lookupsByBucket );
287+ handlePrefixLookupException (e , destination , lookupsByBucket );
268288 return null ;
269289 } finally {
270290 maxInFlightReuqestsSemaphore .release ();
@@ -274,6 +294,7 @@ private void sendPrefixLookupRequestAndHandleResponse(
274294
275295 private void handleLookupResponse (
276296 long tableId ,
297+ int destination ,
277298 LookupResponse lookupResponse ,
278299 Map <TableBucket , LookupBatch > lookupsByBucket ) {
279300 for (PbLookupRespForBucket pbLookupRespForBucket : lookupResponse .getBucketsRespsList ()) {
@@ -288,10 +309,7 @@ private void handleLookupResponse(
288309 if (pbLookupRespForBucket .hasErrorCode ()) {
289310 // TODO for re-triable error, we should retry here instead of throwing exception.
290311 ApiError error = ApiError .fromErrorMessage (pbLookupRespForBucket );
291- LOG .warn (
292- "Get error lookup response on table bucket {}, fail. Error: {}" ,
293- tableBucket ,
294- error .formatErrMsg ());
312+ handleLookupExceptionForBucket (tableBucket , destination , error );
295313 lookupBatch .completeExceptionally (error .exception ());
296314 } else {
297315 List <byte []> byteValues =
@@ -312,6 +330,7 @@ private void handleLookupResponse(
312330
313331 private void handlePrefixLookupResponse (
314332 long tableId ,
333+ int destination ,
315334 PrefixLookupResponse prefixLookupResponse ,
316335 Map <TableBucket , PrefixLookupBatch > prefixLookupsByBucket ) {
317336 for (PbPrefixLookupRespForBucket pbRespForBucket :
@@ -328,10 +347,7 @@ private void handlePrefixLookupResponse(
328347 if (pbRespForBucket .hasErrorCode ()) {
329348 // TODO for re-triable error, we should retry here instead of throwing exception.
330349 ApiError error = ApiError .fromErrorMessage (pbRespForBucket );
331- LOG .warn (
332- "Get error prefix lookup response on table bucket {}, fail. Error: {}" ,
333- tableBucket ,
334- error .formatErrMsg ());
350+ handleLookupExceptionForBucket (tableBucket , destination , error );
335351 prefixLookupBatch .completeExceptionally (error .exception ());
336352 } else {
337353 List <List <byte []>> result = new ArrayList <>(pbRespForBucket .getValueListsCount ());
@@ -349,24 +365,21 @@ private void handlePrefixLookupResponse(
349365 }
350366
351367 private void handleLookupRequestException (
352- Throwable t , Map <TableBucket , LookupBatch > lookupsByBucket ) {
368+ Throwable t , int destination , Map <TableBucket , LookupBatch > lookupsByBucket ) {
353369 ApiError error = ApiError .fromThrowable (t );
354370 for (LookupBatch lookupBatch : lookupsByBucket .values ()) {
355371 // TODO for re-triable error, we should retry here instead of throwing exception.
356- LOG .warn (
357- "Get error lookup response on table bucket {}, fail. Error: {}" ,
358- lookupBatch .tableBucket (),
359- error .formatErrMsg ());
372+ handleLookupExceptionForBucket (lookupBatch .tableBucket (), destination , error );
360373 lookupBatch .completeExceptionally (error .exception ());
361374 }
362375 }
363376
364377 private void handlePrefixLookupException (
365- Throwable t , Map <TableBucket , PrefixLookupBatch > lookupsByBucket ) {
378+ Throwable t , int destination , Map <TableBucket , PrefixLookupBatch > lookupsByBucket ) {
366379 ApiError error = ApiError .fromThrowable (t );
367380 // TODO If error, we need to retry send the request instead of throw exception.
368- LOG .warn ("Get error prefix lookup response. Error: {}" , error .formatErrMsg ());
369381 for (PrefixLookupBatch lookupBatch : lookupsByBucket .values ()) {
382+ handleLookupExceptionForBucket (lookupBatch .tableBucket (), destination , error );
370383 lookupBatch .completeExceptionally (error .exception ());
371384 }
372385 }
@@ -382,4 +395,46 @@ void initiateClose() {
382395 lookupQueue .close ();
383396 running = false ;
384397 }
398+
399+ private void handleLookupExceptionForBucket (TableBucket tb , int destination , ApiError error ) {
400+ ApiException exception = error .error ().exception ();
401+ LOG .error ("Failed to lookup from node {} for bucket {}" , destination , tb , exception );
402+ if (exception instanceof InvalidMetadataException ) {
403+ LOG .warn (
404+ "Invalid metadata error in fetch log request. "
405+ + "Going to request metadata update." ,
406+ exception );
407+ long tableId = tb .getTableId ();
408+ TableOrPartitions tableOrPartitions ;
409+ if (tb .getPartitionId () == null ) {
410+ tableOrPartitions = new TableOrPartitions (Collections .singleton (tableId ), null );
411+ } else {
412+ tableOrPartitions =
413+ new TableOrPartitions (
414+ null ,
415+ Collections .singleton (
416+ new TablePartition (tableId , tb .getPartitionId ())));
417+ }
418+ invalidTableOrPartitions (tableOrPartitions );
419+ }
420+ }
421+
422+ /** A helper class to hold table ids or table partitions. */
423+ private static class TableOrPartitions {
424+ private final @ Nullable Set <Long > tableIds ;
425+ private final @ Nullable Set <TablePartition > tablePartitions ;
426+
427+ TableOrPartitions (
428+ @ Nullable Set <Long > tableIds , @ Nullable Set <TablePartition > tablePartitions ) {
429+ this .tableIds = tableIds ;
430+ this .tablePartitions = tablePartitions ;
431+ }
432+ }
433+
434+ private void invalidTableOrPartitions (TableOrPartitions tableOrPartitions ) {
435+ Set <PhysicalTablePath > physicalTablePaths =
436+ metadataUpdater .getPhysicalTablePathByIds (
437+ tableOrPartitions .tableIds , tableOrPartitions .tablePartitions );
438+ metadataUpdater .invalidPhysicalTableBucketMeta (physicalTablePaths );
439+ }
385440}
0 commit comments