|
22 | 22 | import org.apache.fluss.config.Configuration; |
23 | 23 | import org.apache.fluss.exception.FlussRuntimeException; |
24 | 24 | import org.apache.fluss.exception.LogStorageException; |
| 25 | +import org.apache.fluss.exception.SchemaNotExistException; |
25 | 26 | import org.apache.fluss.metadata.LogFormat; |
26 | 27 | import org.apache.fluss.metadata.PhysicalTablePath; |
27 | 28 | import org.apache.fluss.metadata.TableBucket; |
@@ -189,24 +190,8 @@ private void loadLogs() { |
189 | 190 | final boolean cleanShutdown = isCleanShutdown; |
190 | 191 | // set runnable job. |
191 | 192 | Runnable[] jobsForDir = |
192 | | - tabletsToLoad.stream() |
193 | | - .map( |
194 | | - tabletDir -> |
195 | | - (Runnable) |
196 | | - () -> { |
197 | | - LOG.debug("Loading log {}", tabletDir); |
198 | | - try { |
199 | | - loadLog( |
200 | | - tabletDir, |
201 | | - cleanShutdown, |
202 | | - finalRecoveryPoints, |
203 | | - conf, |
204 | | - clock); |
205 | | - } catch (Exception e) { |
206 | | - throw new FlussRuntimeException(e); |
207 | | - } |
208 | | - }) |
209 | | - .toArray(Runnable[]::new); |
| 193 | + createLogLoadingJobs( |
| 194 | + tabletsToLoad, cleanShutdown, finalRecoveryPoints, conf, clock); |
210 | 195 |
|
211 | 196 | long startTime = System.currentTimeMillis(); |
212 | 197 |
|
@@ -471,6 +456,70 @@ public void shutdown() { |
471 | 456 | LOG.info("Shut down LogManager complete."); |
472 | 457 | } |
473 | 458 |
|
| 459 | + /** Create runnable jobs for loading logs from tablet directories. */ |
| 460 | + private Runnable[] createLogLoadingJobs( |
| 461 | + List<File> tabletsToLoad, |
| 462 | + boolean cleanShutdown, |
| 463 | + Map<TableBucket, Long> recoveryPoints, |
| 464 | + Configuration conf, |
| 465 | + Clock clock) { |
| 466 | + Runnable[] jobs = new Runnable[tabletsToLoad.size()]; |
| 467 | + for (int i = 0; i < tabletsToLoad.size(); i++) { |
| 468 | + final File tabletDir = tabletsToLoad.get(i); |
| 469 | + jobs[i] = createLogLoadingJob(tabletDir, cleanShutdown, recoveryPoints, conf, clock); |
| 470 | + } |
| 471 | + return jobs; |
| 472 | + } |
| 473 | + |
| 474 | + /** Create a runnable job for loading log from a single tablet directory. */ |
| 475 | + private Runnable createLogLoadingJob( |
| 476 | + File tabletDir, |
| 477 | + boolean cleanShutdown, |
| 478 | + Map<TableBucket, Long> recoveryPoints, |
| 479 | + Configuration conf, |
| 480 | + Clock clock) { |
| 481 | + return new Runnable() { |
| 482 | + @Override |
| 483 | + public void run() { |
| 484 | + LOG.debug("Loading log {}", tabletDir); |
| 485 | + try { |
| 486 | + loadLog(tabletDir, cleanShutdown, recoveryPoints, conf, clock); |
| 487 | + } catch (Exception e) { |
| 488 | + LOG.error("Fail to loadLog from {}", tabletDir, e); |
| 489 | + if (e instanceof SchemaNotExistException) { |
| 490 | + LOG.error( |
| 491 | + "schema not exist, table for {} has already been dropped, the residual data will be removed.", |
| 492 | + tabletDir, |
| 493 | + e); |
| 494 | + FileUtils.deleteDirectoryQuietly(tabletDir); |
| 495 | + |
| 496 | + // Also delete corresponding KV tablet directory if it exists |
| 497 | + try { |
| 498 | + Tuple2<PhysicalTablePath, TableBucket> pathAndBucket = |
| 499 | + FlussPaths.parseTabletDir(tabletDir); |
| 500 | + File kvTabletDir = |
| 501 | + FlussPaths.kvTabletDir( |
| 502 | + dataDir, pathAndBucket.f0, pathAndBucket.f1); |
| 503 | + if (kvTabletDir.exists()) { |
| 504 | + LOG.info( |
| 505 | + "Also removing corresponding KV tablet directory: {}", |
| 506 | + kvTabletDir); |
| 507 | + FileUtils.deleteDirectoryQuietly(kvTabletDir); |
| 508 | + } |
| 509 | + } catch (Exception kvDeleteException) { |
| 510 | + LOG.warn( |
| 511 | + "Failed to delete corresponding KV tablet directory for log {}: {}", |
| 512 | + tabletDir, |
| 513 | + kvDeleteException.getMessage()); |
| 514 | + } |
| 515 | + return; |
| 516 | + } |
| 517 | + throw new FlussRuntimeException(e); |
| 518 | + } |
| 519 | + } |
| 520 | + }; |
| 521 | + } |
| 522 | + |
474 | 523 | @VisibleForTesting |
475 | 524 | void checkpointRecoveryOffsets() { |
476 | 525 | // Assuming TableBucket and LogTablet are actual types used in your application |
|
0 commit comments