diff --git a/core/core/src/raw/oio/list/flat_list.rs b/core/core/src/raw/oio/list/flat_list.rs index 69ff0edb5be0..424c916154ea 100644 --- a/core/core/src/raw/oio/list/flat_list.rs +++ b/core/core/src/raw/oio/list/flat_list.rs @@ -103,9 +103,31 @@ where ); continue; } + Err(e) if e.kind() == ErrorKind::NotFound => { + // Skip directories that are deleted while listing. + log::warn!( + "FlatLister skipping directory due to not found during listing: {}", + de.path() + ); + continue; + } Err(e) => return Err(e), }; - if let Some(v) = l.next().await? { + let first = loop { + match l.next().await { + Ok(v) => break v, + Err(e) if e.kind() == ErrorKind::NotFound => { + // Skip entries that are deleted during listing. + log::warn!( + "FlatLister skipping entry due to not found during listing: {}", + de.path() + ); + continue; + } + Err(e) => return Err(e), + } + }; + if let Some(v) = first { self.active_lister.push((Some(de.clone()), l)); if v.mode().is_dir() { @@ -120,21 +142,40 @@ where } } + if matches!(self.active_lister.last(), Some((None, _))) { + let _ = self.active_lister.pop(); + continue; + } + let (de, lister) = match self.active_lister.last_mut() { Some((de, lister)) => (de, lister), None => return Ok(None), }; - match lister.next().await? { - Some(v) if v.mode().is_dir() => { + match lister.next().await { + Err(e) if e.kind() == ErrorKind::NotFound => { + let path = de.as_ref().map(|entry| entry.path()).unwrap_or(""); + log::warn!( + "FlatLister skipping entry due to not found during recursive listing: {}", + path + ); + continue; + } + Err(e) => return Err(e), + Ok(Some(v)) if v.mode().is_dir() => { // should not loop itself again - if v.path() != de.as_ref().expect("de should not be none here").path() { + if v.path() + != de + .as_ref() + .expect("de must be present before listing") + .path() + { self.next_dir = Some(v); continue; } } - Some(v) => return Ok(Some(v)), - None => match de.take() { + Ok(Some(v)) => return Ok(Some(v)), + Ok(None) => match de.take() { Some(de) => { return Ok(Some(de)); } diff --git a/core/services/fs/src/lister.rs b/core/services/fs/src/lister.rs index 90aaf3075bb4..8b9cee5f2617 100644 --- a/core/services/fs/src/lister.rs +++ b/core/services/fs/src/lister.rs @@ -48,42 +48,69 @@ unsafe impl

Sync for FsLister

{} impl oio::List for FsLister { async fn next(&mut self) -> Result> { - // since list should return path itself, we return it first - if let Some(path) = self.current_path.take() { - let e = oio::Entry::new(path.as_str(), Metadata::new(EntryMode::DIR)); - return Ok(Some(e)); - } + loop { + // since list should return path itself, we return it first + if let Some(path) = self.current_path.take() { + let e = oio::Entry::new(path.as_str(), Metadata::new(EntryMode::DIR)); + return Ok(Some(e)); + } - let Some(de) = self.rd.next_entry().await.map_err(new_std_io_error)? else { - return Ok(None); - }; + let de = match self.rd.next_entry().await { + Ok(Some(de)) => de, + Ok(None) => return Ok(None), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + // Directory can be removed during listing; stop gracefully. + return Ok(None); + } + Err(e) => return Err(new_std_io_error(e)), + }; - let entry_path = de.path(); - let rel_path = normalize_path( - &entry_path - .strip_prefix(&self.root) - .expect("cannot fail because the prefix is iterated") - .to_string_lossy() - .replace('\\', "/"), - ); + let entry_path = de.path(); + let rel_path = normalize_path( + &entry_path + .strip_prefix(&self.root) + .expect("cannot fail because the prefix is iterated") + .to_string_lossy() + .replace('\\', "/"), + ); - let ft = de.file_type().await.map_err(new_std_io_error)?; - let (path, mode) = if ft.is_dir() { - // Make sure we are returning the correct path. - (&format!("{rel_path}/"), EntryMode::DIR) - } else if ft.is_file() { - (&rel_path, EntryMode::FILE) - } else { - (&rel_path, EntryMode::Unknown) - }; + let ft = match de.file_type().await { + Ok(ft) => ft, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + // Entry can be deleted between readdir and stat calls. + continue; + } + Err(e) => return Err(new_std_io_error(e)), + }; + let (path, mode) = if ft.is_dir() { + // Make sure we are returning the correct path. + (&format!("{rel_path}/"), EntryMode::DIR) + } else if ft.is_file() { + (&rel_path, EntryMode::FILE) + } else { + (&rel_path, EntryMode::Unknown) + }; - let de_metadata = de.metadata().await.map_err(new_std_io_error)?; - let metadata = Metadata::new(mode) - .with_content_length(de_metadata.len()) - .with_last_modified(Timestamp::try_from( - de_metadata.modified().map_err(new_std_io_error)?, - )?); + let de_metadata = match de.metadata().await { + Ok(de_metadata) => de_metadata, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + // Entry can be deleted between readdir and metadata calls. + continue; + } + Err(e) => return Err(new_std_io_error(e)), + }; + let last_modified = match de_metadata.modified() { + Ok(v) => v, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + continue; + } + Err(e) => return Err(new_std_io_error(e)), + }; + let metadata = Metadata::new(mode) + .with_content_length(de_metadata.len()) + .with_last_modified(Timestamp::try_from(last_modified)?); - Ok(Some(oio::Entry::new(path, metadata))) + return Ok(Some(oio::Entry::new(path, metadata))); + } } }