Skip to content

Commit 300cf0f

Browse files
authored
fix(storage): deduplicate file paths in remove_file_in_batch to prevent opendal hang (#19939)
When the file list passed to remove_file_in_batch contains duplicate paths (e.g. blocks with and without the vacuum2 'h' object-key prefix, or certain object storage implementations returning duplicates during listing pagination), opendal's Operator::delete_iter enters an infinite loop in Deleter::close(): the internal BatchDeleter uses a HashSet (deduplicating to N unique entries) while the outer Deleter tracks cur_size by call count. After flush returns N < cur_size, the buffer is empty but cur_size never reaches zero, spinning forever. Fix: deduplicate locations inside remove_file_in_batch before chunking and deletion. Log a warning with duplicate count and sample paths when duplicates are detected, to aid future investigation.
1 parent 347ccd5 commit 300cf0f

4 files changed

Lines changed: 81 additions & 7 deletions

File tree

src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ pub async fn do_vacuum2(
220220
.chain(blocks_to_gc.into_iter())
221221
.chain(stats_to_gc.into_iter())
222222
.collect();
223+
223224
let op = Files::create(ctx.clone(), fuse_table.get_operator());
224225

225226
// order is important

src/query/ee/tests/it/storages/fuse/operations/vacuum2.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use databend_enterprise_query::test_kits::context::EESetup;
2020
use databend_query::sessions::QueryContext;
2121
use databend_query::sessions::TableContextTableAccess;
2222
use databend_query::test_kits::TestFixture;
23+
use databend_storages_common_io::dedup_file_locations;
2324

2425
// TODO investigate this
2526
// NOTE: SHOULD specify flavor = "multi_thread", otherwise query execution might be hanged
@@ -116,3 +117,47 @@ async fn test_vacuum2_all() -> anyhow::Result<()> {
116117

117118
Ok(())
118119
}
120+
121+
/// Verifies that dedup_file_locations correctly removes duplicates and reports samples.
122+
#[test]
123+
fn test_dedup_file_locations() {
124+
// Simulate the vacuum2 scenario: bloom index paths generated from blocks
125+
// with and without the 'h' prefix map to the same location.
126+
let mut locations = vec![
127+
"548052/604310/_i_b_v2/019bdabd0292702a96f51f3b3ea64335_v4.parquet".to_string(),
128+
"548052/604310/_i_b_v2/019bdabd02927061af2ed18c2562b79e_v4.parquet".to_string(),
129+
"548052/604310/_i_b_v2/019c0df29e8a7016a6a1be466e094ec3_v4.parquet".to_string(),
130+
// Duplicates (same paths appearing again)
131+
"548052/604310/_i_b_v2/019bdabd0292702a96f51f3b3ea64335_v4.parquet".to_string(),
132+
"548052/604310/_i_b_v2/019bdabd02927061af2ed18c2562b79e_v4.parquet".to_string(),
133+
];
134+
135+
let (duplicates, samples) = dedup_file_locations(&mut locations);
136+
137+
assert_eq!(duplicates, 2);
138+
assert_eq!(locations.len(), 3);
139+
assert_eq!(samples.len(), 2);
140+
assert_eq!(
141+
samples[0],
142+
"548052/604310/_i_b_v2/019bdabd0292702a96f51f3b3ea64335_v4.parquet"
143+
);
144+
assert_eq!(
145+
samples[1],
146+
"548052/604310/_i_b_v2/019bdabd02927061af2ed18c2562b79e_v4.parquet"
147+
);
148+
}
149+
150+
#[test]
151+
fn test_dedup_file_locations_no_duplicates() {
152+
let mut locations = vec![
153+
"a/b/file1.parquet".to_string(),
154+
"a/b/file2.parquet".to_string(),
155+
"a/b/file3.parquet".to_string(),
156+
];
157+
158+
let (duplicates, samples) = dedup_file_locations(&mut locations);
159+
160+
assert_eq!(duplicates, 0);
161+
assert_eq!(locations.len(), 3);
162+
assert!(samples.is_empty());
163+
}

src/query/storages/common/io/src/files.rs

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,21 @@ use databend_common_exception::Result;
2121
use log::info;
2222
use opendal::Operator;
2323

24+
/// Deduplicate a list of file locations in place.
25+
/// Returns (number_of_duplicates_removed, up_to_5_sample_duplicate_paths).
26+
pub fn dedup_file_locations(locations: &mut Vec<String>) -> (usize, Vec<String>) {
27+
let len_before = locations.len();
28+
locations.sort_unstable();
29+
let dup_samples: Vec<String> = locations
30+
.windows(2)
31+
.filter(|w| w[0] == w[1])
32+
.map(|w| w[0].clone())
33+
.take(5)
34+
.collect();
35+
locations.dedup();
36+
(len_before - locations.len(), dup_samples)
37+
}
38+
2439
// File related operations.
2540
pub struct Files {
2641
ctx: Arc<dyn TableContext>,
@@ -40,12 +55,30 @@ impl Files {
4055
&self,
4156
file_locations: impl IntoIterator<Item = impl AsRef<str>>,
4257
) -> Result<()> {
43-
let locations = Vec::from_iter(file_locations.into_iter().map(|v| v.as_ref().to_string()));
58+
let mut locations: Vec<String> = file_locations
59+
.into_iter()
60+
.map(|v| v.as_ref().trim_start_matches('/').to_string())
61+
.filter(|v| !v.is_empty())
62+
.collect();
4463

4564
if locations.is_empty() {
4665
return Ok(());
4766
}
4867

68+
// Deduplicate: opendal's Deleter uses a HashSet internally but tracks size by
69+
// insertion count. Duplicates cause cur_size to diverge from the buffer, making
70+
// Deleter::close() loop forever.
71+
let (duplicates, dup_samples) = dedup_file_locations(&mut locations);
72+
if duplicates > 0 {
73+
info!(
74+
"remove_file_in_batch: deduplicated {} entries ({} -> {}), duplicate samples: {:?}",
75+
duplicates,
76+
duplicates + locations.len(),
77+
locations.len(),
78+
dup_samples
79+
);
80+
}
81+
4982
// adjusts batch_size according to the `max_threads` settings,
5083
// limits its min/max value to 1 and 1000.
5184
let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
@@ -90,12 +123,6 @@ impl Files {
90123
#[async_backtrace::framed]
91124
async fn delete_files(op: Operator, locations: Vec<String>) -> Result<()> {
92125
let start = Instant::now();
93-
// temporary fix for https://github.com/datafuselabs/databend/issues/13804
94-
let locations = locations
95-
.into_iter()
96-
.map(|loc| loc.trim_start_matches('/').to_owned())
97-
.filter(|loc| !loc.is_empty())
98-
.collect::<Vec<_>>();
99126
info!("deleting files {:?}", &locations);
100127
let num_of_files = locations.len();
101128

src/query/storages/common/io/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ mod read_settings;
2020

2121
pub use buffer_reader::BufferReader;
2222
pub use files::Files;
23+
pub use files::dedup_file_locations;
2324
pub use merge_io_reader::MergeIOReader;
2425
pub use merge_io_result::MergeIOReadResult;
2526
pub use merge_io_result::OwnerMemory;

0 commit comments

Comments
 (0)