Skip to content

Commit 32dbf7c

Browse files
committed
[BugFix] remove load job after db had been dropped
Signed-off-by: luohaha <[email protected]>
1 parent a520ac2 commit 32dbf7c

File tree

3 files changed

+84
-0
lines changed

3 files changed

+84
-0
lines changed

fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadMgr.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,24 @@ public int compare(LoadJob o1, LoadJob o2) {
495495
}
496496
}
497497

498+
public void removeLoadJobsByDb(long dbId) {
499+
writeLock();
500+
try {
501+
// Get all jobs belonging to the database
502+
List<LoadJob> jobsToRemove = idToLoadJob.values().stream()
503+
.filter(job -> job.getDbId() == dbId)
504+
.collect(Collectors.toList());
505+
506+
// Remove each job
507+
for (LoadJob job : jobsToRemove) {
508+
LOG.info("remove load job {} of database {}", job.getLabel(), dbId);
509+
unprotectedRemoveJobReleatedMeta(job);
510+
}
511+
} finally {
512+
writeUnlock();
513+
}
514+
}
515+
498516
// only for those jobs which transaction is not started
499517
public void processTimeoutJobs() {
500518
idToLoadJob.values().stream().forEach(entity -> entity.processTimeout());

fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5051,6 +5051,8 @@ public void setPartitionVersion(AdminSetPartitionVersionStmt stmt) {
50515051
public void onEraseDatabase(long dbId) {
50525052
// remove database transaction manager
50535053
stateMgr.getGlobalTransactionMgr().removeDatabaseTransactionMgr(dbId);
5054+
// remove load jobs belonging to this database
5055+
stateMgr.getLoadMgr().removeLoadJobsByDb(dbId);
50545056
// unbind db to storage volume
50555057
stateMgr.getStorageVolumeMgr().unbindDbToStorageVolume(dbId);
50565058
}

fe/fe-core/src/test/java/com/starrocks/load/loadv2/LoadMgrTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,4 +387,68 @@ public void testReplayUpdateLoadJobStateInfoRemovesExpiredJob(@Mocked GlobalStat
387387
// recover config
388388
Config.label_keep_max_second = origKeep;
389389
}
390+
391+
@Test
392+
public void testRemoveLoadJobsByDb(@Mocked GlobalStateMgr globalStateMgr,
393+
@Injectable Database db1,
394+
@Injectable Database db2) throws Exception {
395+
LoadMgr loadMgr = new LoadMgr(new LoadJobScheduler());
396+
397+
// Create load jobs for database 1
398+
long dbId1 = 100L;
399+
InsertLoadJob job1 = new InsertLoadJob("label1", dbId1, 1L, System.currentTimeMillis(), "", "", null);
400+
job1.id = 1001L;
401+
job1.state = JobState.FINISHED;
402+
Deencapsulation.invoke(loadMgr, "addLoadJob", job1);
403+
404+
InsertLoadJob job2 = new InsertLoadJob("label2", dbId1, 1L, System.currentTimeMillis(), "", "", null);
405+
job2.id = 1002L;
406+
job2.state = JobState.LOADING;
407+
Deencapsulation.invoke(loadMgr, "addLoadJob", job2);
408+
409+
// Create load jobs for database 2
410+
long dbId2 = 200L;
411+
InsertLoadJob job3 = new InsertLoadJob("label3", dbId2, 2L, System.currentTimeMillis(), "", "", null);
412+
job3.id = 2001L;
413+
job3.state = JobState.FINISHED;
414+
Deencapsulation.invoke(loadMgr, "addLoadJob", job3);
415+
416+
InsertLoadJob job4 = new InsertLoadJob("label4", dbId2, 2L, System.currentTimeMillis(), "", "", null);
417+
job4.id = 2002L;
418+
job4.state = JobState.CANCELLED;
419+
Deencapsulation.invoke(loadMgr, "addLoadJob", job4);
420+
421+
// Verify all jobs are added
422+
Map<Long, LoadJob> idToLoadJob = Deencapsulation.getField(loadMgr, "idToLoadJob");
423+
Map<Long, Map<String, List<LoadJob>>> dbIdToLabelToLoadJobs = Deencapsulation.getField(
424+
loadMgr, "dbIdToLabelToLoadJobs");
425+
426+
Assertions.assertEquals(4, idToLoadJob.size());
427+
Assertions.assertTrue(dbIdToLabelToLoadJobs.containsKey(dbId1));
428+
Assertions.assertTrue(dbIdToLabelToLoadJobs.containsKey(dbId2));
429+
Assertions.assertEquals(2, dbIdToLabelToLoadJobs.get(dbId1).size());
430+
Assertions.assertEquals(2, dbIdToLabelToLoadJobs.get(dbId2).size());
431+
432+
// Remove all jobs for database 1
433+
loadMgr.removeLoadJobsByDb(dbId1);
434+
435+
// Verify jobs for database 1 are removed
436+
Assertions.assertEquals(2, idToLoadJob.size());
437+
Assertions.assertFalse(idToLoadJob.containsKey(job1.id));
438+
Assertions.assertFalse(idToLoadJob.containsKey(job2.id));
439+
Assertions.assertTrue(idToLoadJob.containsKey(job3.id));
440+
Assertions.assertTrue(idToLoadJob.containsKey(job4.id));
441+
442+
// Verify database 1 is removed from dbIdToLabelToLoadJobs
443+
Assertions.assertFalse(dbIdToLabelToLoadJobs.containsKey(dbId1));
444+
Assertions.assertTrue(dbIdToLabelToLoadJobs.containsKey(dbId2));
445+
446+
// Remove all jobs for database 2
447+
loadMgr.removeLoadJobsByDb(dbId2);
448+
449+
// Verify all jobs are removed
450+
Assertions.assertEquals(0, idToLoadJob.size());
451+
Assertions.assertFalse(dbIdToLabelToLoadJobs.containsKey(dbId2));
452+
Assertions.assertEquals(0, dbIdToLabelToLoadJobs.size());
453+
}
390454
}

0 commit comments

Comments
 (0)