Skip to content

Commit cb5413b

Browse files
committed
update
1 parent e6d9624 commit cb5413b

File tree

5 files changed

+1063
-107
lines changed

5 files changed

+1063
-107
lines changed

src/query/ee/src/storages/fuse/operations/vacuum_table_v2_branch_cleanup.rs

Lines changed: 46 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,11 @@ use databend_common_meta_app::schema::ListHistoryTableBranchesReq;
3333
use databend_common_meta_app::schema::ListIndexesByIdReq;
3434
use databend_common_meta_app::schema::ListTableTagsReq;
3535
use databend_common_storages_fuse::FuseTable;
36-
use databend_common_storages_fuse::io::SegmentsIO;
3736
use databend_common_storages_fuse::io::SnapshotsIO;
3837
use databend_common_storages_fuse::io::TableMetaLocationGenerator;
3938
use databend_common_storages_fuse::operations::SnapshotGcSelection;
4039
use databend_meta_types::MatchSeq;
41-
use databend_storages_common_cache::CacheAccessor;
42-
use databend_storages_common_cache::CacheManager;
4340
use databend_storages_common_io::Files;
44-
use databend_storages_common_table_meta::meta::CompactSegmentInfo;
4541
use databend_storages_common_table_meta::meta::Location;
4642
use databend_storages_common_table_meta::meta::TableSnapshot;
4743
use databend_storages_common_table_meta::meta::try_extract_uuid_str_from_path;
@@ -283,14 +279,14 @@ pub async fn do_vacuum2(
283279
branch_table.as_ref(),
284280
ctx.clone(),
285281
None,
286-
list_owner_files_to_gc(
287-
branch_table.as_ref(),
288-
branch_table
289-
.meta_location_generator()
290-
.snapshot_location_prefix(),
291-
None,
292-
)
293-
.await?,
282+
branch_table
283+
.list_files_for_gc(
284+
branch_table
285+
.meta_location_generator()
286+
.snapshot_location_prefix(),
287+
None,
288+
)
289+
.await?,
294290
protected_segments,
295291
protected_blocks,
296292
)
@@ -434,17 +430,8 @@ async fn select_owner_gc_root(
434430
}
435431
None => {
436432
let gc_root_meta_ts = fuse_table
437-
.get_operator_ref()
438-
.stat(&latest_snapshot_loc)
439-
.await
440-
.map_err(ErrorCode::from)?
441-
.last_modified()
442-
.ok_or_else(|| {
443-
ErrorCode::StorageOther(format!(
444-
"Failed to get `last_modified` metadata of the snapshot object '{}'",
445-
latest_snapshot_loc
446-
))
447-
})?;
433+
.snapshot_last_modified(&latest_snapshot_loc)
434+
.await?;
448435
(
449436
latest_snapshot,
450437
latest_snapshot_loc.clone(),
@@ -465,26 +452,6 @@ async fn select_owner_gc_root(
465452
}))
466453
}
467454

468-
async fn remove_snapshots(
469-
fuse_table: &FuseTable,
470-
ctx: Arc<dyn TableContext>,
471-
snapshots_to_gc: &[String],
472-
) -> Result<()> {
473-
if snapshots_to_gc.is_empty() {
474-
return Ok(());
475-
}
476-
477-
if let Some(snapshot_cache) = CacheManager::instance().get_table_snapshot_cache() {
478-
for path in snapshots_to_gc {
479-
snapshot_cache.evict(path);
480-
}
481-
}
482-
Files::create(ctx, fuse_table.get_operator())
483-
.remove_file_in_batch(snapshots_to_gc)
484-
.await?;
485-
Ok(())
486-
}
487-
488455
fn merge_protected_segments(
489456
owner_prefixes: &OwnerPrefixes,
490457
owner_result: &OwnerSnapshotResult,
@@ -507,31 +474,23 @@ async fn collect_protected_blocks_by_owner(
507474
owner_prefixes: &OwnerPrefixes,
508475
protected_segments_by_owner: &ProtectedSegmentsByOwner,
509476
) -> Result<ProtectedBlocksByOwner> {
510-
let segments_io = SegmentsIO::create(
511-
ctx.clone(),
512-
base_fuse_table.get_operator(),
513-
base_fuse_table.schema(),
514-
);
515477
let mut protected_blocks_by_owner: ProtectedBlocksByOwner = HashMap::new();
516478

517479
for protected_segments in protected_segments_by_owner.values() {
518480
if protected_segments.is_empty() {
519481
continue;
520482
}
521483

522-
let locations = protected_segments.iter().cloned().collect::<Vec<_>>();
523-
let segments = segments_io
524-
.read_segments::<Arc<CompactSegmentInfo>>(&locations, false)
484+
let segment_refs = protected_segments.iter().collect::<Vec<_>>();
485+
let block_locations = base_fuse_table
486+
.get_block_locations(ctx.clone(), &segment_refs, false, false)
525487
.await?;
526-
for segment in segments {
527-
for block in segment?.block_metas()?.iter() {
528-
let block_path = &block.location.0;
529-
let owner_id = owner_id_by_path(owner_prefixes, block_path)?;
530-
protected_blocks_by_owner
531-
.entry(owner_id)
532-
.or_default()
533-
.insert(try_extract_uuid_str_from_path(block_path)?.to_string());
534-
}
488+
for block_path in block_locations.block_location {
489+
let owner_id = owner_id_by_path(owner_prefixes, &block_path)?;
490+
protected_blocks_by_owner
491+
.entry(owner_id)
492+
.or_default()
493+
.insert(try_extract_uuid_str_from_path(&block_path)?.to_string());
535494
}
536495
}
537496

@@ -550,29 +509,29 @@ async fn cleanup_owner_data(
550509
let fuse_table = FuseTable::try_from_table(table)?;
551510
let table_info = fuse_table.get_table_info();
552511

553-
let segments_to_gc = list_owner_files_to_gc(
554-
fuse_table,
555-
fuse_table
556-
.meta_location_generator()
557-
.segment_location_prefix(),
558-
cutoff,
559-
)
560-
.await?
561-
.into_iter()
562-
.filter(|path| {
563-
!protected_segments.contains(&(
564-
path.clone(),
565-
TableMetaLocationGenerator::snapshot_version(path),
566-
))
567-
})
568-
.collect::<Vec<_>>();
569-
let blocks_to_gc = filter_blocks_to_gc(
570-
list_owner_files_to_gc(
571-
fuse_table,
572-
fuse_table.meta_location_generator().block_location_prefix(),
512+
let segments_to_gc = fuse_table
513+
.list_files_for_gc(
514+
fuse_table
515+
.meta_location_generator()
516+
.segment_location_prefix(),
573517
cutoff,
574518
)
575-
.await?,
519+
.await?
520+
.into_iter()
521+
.filter(|path| {
522+
!protected_segments.contains(&(
523+
path.clone(),
524+
TableMetaLocationGenerator::snapshot_version(path),
525+
))
526+
})
527+
.collect::<Vec<_>>();
528+
let blocks_to_gc = filter_blocks_to_gc(
529+
fuse_table
530+
.list_files_for_gc(
531+
fuse_table.meta_location_generator().block_location_prefix(),
532+
cutoff,
533+
)
534+
.await?,
576535
&protected_blocks,
577536
)?;
578537

@@ -636,7 +595,9 @@ async fn finalize_owner_snapshot_phase(
636595
protected_segments: HashSet<Location>,
637596
snapshot_files_to_gc: Vec<String>,
638597
) -> Result<OwnerSnapshotResult> {
639-
remove_snapshots(fuse_table, ctx, &snapshot_files_to_gc).await?;
598+
fuse_table
599+
.cleanup_snapshot_files(&ctx, &snapshot_files_to_gc, false)
600+
.await?;
640601
let gc_root_snapshot_ts = gc_root.gc_root.timestamp.ok_or_else(|| {
641602
ErrorCode::IllegalReference(format!(
642603
"Table {} snapshot lacks required timestamp",
@@ -654,23 +615,6 @@ async fn finalize_owner_snapshot_phase(
654615
})
655616
}
656617

657-
#[async_backtrace::framed]
658-
async fn list_owner_files_to_gc(
659-
fuse_table: &FuseTable,
660-
prefix: &str,
661-
cutoff: Option<(DateTime<Utc>, DateTime<Utc>)>,
662-
) -> Result<Vec<String>> {
663-
match cutoff {
664-
Some((gc_root_timestamp, gc_root_meta_ts)) => Ok(fuse_table
665-
.list_files_until_timestamp(prefix, gc_root_timestamp, false, Some(gc_root_meta_ts))
666-
.await?
667-
.into_iter()
668-
.map(|entry| entry.path().to_string())
669-
.collect()),
670-
None => fuse_table.list_files(prefix.to_string(), |_, _| true).await,
671-
}
672-
}
673-
674618
#[async_backtrace::framed]
675619
async fn cleanup_owner_files(
676620
fuse_table: &FuseTable,
@@ -728,7 +672,9 @@ async fn cleanup_owner_files(
728672
let op = Files::create(ctx.clone(), fuse_table.get_operator());
729673
op.remove_file_in_batch(&indexes_to_gc).await?;
730674
op.remove_file_in_batch(&subject_files_to_gc).await?;
731-
remove_snapshots(fuse_table, ctx, &snapshots_to_gc).await?;
675+
fuse_table
676+
.cleanup_snapshot_files(&ctx, &snapshots_to_gc, false)
677+
.await?;
732678

733679
Ok(subject_files_to_gc
734680
.into_iter()

src/query/storages/fuse/src/fuse_table.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -976,17 +976,15 @@ impl Table for FuseTable {
976976
num_snapshot_limit: Option<usize>,
977977
dry_run: bool,
978978
) -> Result<Option<Vec<String>>> {
979-
match self.navigate_for_purge(&ctx, instant).await {
980-
Ok((table, files)) => {
981-
table
982-
.do_purge(&ctx, files, num_snapshot_limit, dry_run)
983-
.await
984-
}
979+
match self
980+
.do_ref_aware_purge(&ctx, instant, num_snapshot_limit, dry_run)
981+
.await
982+
{
985983
Err(e) if e.code() == ErrorCode::TABLE_HISTORICAL_DATA_NOT_FOUND => {
986984
warn!("navigate failed: {:?}", e);
987985
if dry_run { Ok(Some(vec![])) } else { Ok(None) }
988986
}
989-
Err(e) => Err(e),
987+
res => res,
990988
}
991989
}
992990

src/query/storages/fuse/src/operations/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,12 @@ mod merge_into;
2626
mod mutation;
2727
mod mutation_source;
2828
mod navigate;
29+
mod purge;
2930
pub mod read;
3031
mod read_data;
3132
mod read_partitions;
3233
mod recluster;
34+
mod ref_gc;
3335
mod replace;
3436
mod replace_into;
3537
mod revert;

0 commit comments

Comments
 (0)