Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 47 additions & 6 deletions core/core/src/raw/oio/list/flat_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,31 @@ where
);
continue;
}
Err(e) if e.kind() == ErrorKind::NotFound => {
// Skip directories that are deleted while listing.
log::warn!(
"FlatLister skipping directory due to not found during listing: {}",
de.path()
);
continue;
}
Err(e) => return Err(e),
};
if let Some(v) = l.next().await? {
let first = loop {
match l.next().await {
Ok(v) => break v,
Err(e) if e.kind() == ErrorKind::NotFound => {
// Skip entries that are deleted during listing.
log::warn!(
"FlatLister skipping entry due to not found during listing: {}",
de.path()
);
continue;
}
Err(e) => return Err(e),
}
};
if let Some(v) = first {
self.active_lister.push((Some(de.clone()), l));

if v.mode().is_dir() {
Expand All @@ -120,21 +142,40 @@ where
}
}

if matches!(self.active_lister.last(), Some((None, _))) {
let _ = self.active_lister.pop();
continue;
}

let (de, lister) = match self.active_lister.last_mut() {
Some((de, lister)) => (de, lister),
None => return Ok(None),
};

match lister.next().await? {
Some(v) if v.mode().is_dir() => {
match lister.next().await {
Err(e) if e.kind() == ErrorKind::NotFound => {
let path = de.as_ref().map(|entry| entry.path()).unwrap_or("<unknown>");
log::warn!(
"FlatLister skipping entry due to not found during recursive listing: {}",
path
);
continue;
}
Err(e) => return Err(e),
Ok(Some(v)) if v.mode().is_dir() => {
// should not loop itself again
if v.path() != de.as_ref().expect("de should not be none here").path() {
if v.path()
!= de
.as_ref()
.expect("de must be present before listing")
.path()
{
self.next_dir = Some(v);
continue;
}
}
Some(v) => return Ok(Some(v)),
None => match de.take() {
Ok(Some(v)) => return Ok(Some(v)),
Ok(None) => match de.take() {
Some(de) => {
return Ok(Some(de));
}
Expand Down
91 changes: 59 additions & 32 deletions core/services/fs/src/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,42 +48,69 @@ unsafe impl<P> Sync for FsLister<P> {}

impl oio::List for FsLister<tokio::fs::ReadDir> {
async fn next(&mut self) -> Result<Option<oio::Entry>> {
// since list should return path itself, we return it first
if let Some(path) = self.current_path.take() {
let e = oio::Entry::new(path.as_str(), Metadata::new(EntryMode::DIR));
return Ok(Some(e));
}
loop {
// since list should return path itself, we return it first
if let Some(path) = self.current_path.take() {
let e = oio::Entry::new(path.as_str(), Metadata::new(EntryMode::DIR));
return Ok(Some(e));
}

let Some(de) = self.rd.next_entry().await.map_err(new_std_io_error)? else {
return Ok(None);
};
let de = match self.rd.next_entry().await {
Ok(Some(de)) => de,
Ok(None) => return Ok(None),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
// Directory can be removed during listing; stop gracefully.
return Ok(None);
}
Err(e) => return Err(new_std_io_error(e)),
};

let entry_path = de.path();
let rel_path = normalize_path(
&entry_path
.strip_prefix(&self.root)
.expect("cannot fail because the prefix is iterated")
.to_string_lossy()
.replace('\\', "/"),
);
let entry_path = de.path();
let rel_path = normalize_path(
&entry_path
.strip_prefix(&self.root)
.expect("cannot fail because the prefix is iterated")
.to_string_lossy()
.replace('\\', "/"),
);

let ft = de.file_type().await.map_err(new_std_io_error)?;
let (path, mode) = if ft.is_dir() {
// Make sure we are returning the correct path.
(&format!("{rel_path}/"), EntryMode::DIR)
} else if ft.is_file() {
(&rel_path, EntryMode::FILE)
} else {
(&rel_path, EntryMode::Unknown)
};
let ft = match de.file_type().await {
Ok(ft) => ft,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
// Entry can be deleted between readdir and stat calls.
continue;
}
Err(e) => return Err(new_std_io_error(e)),
};
let (path, mode) = if ft.is_dir() {
// Make sure we are returning the correct path.
(&format!("{rel_path}/"), EntryMode::DIR)
} else if ft.is_file() {
(&rel_path, EntryMode::FILE)
} else {
(&rel_path, EntryMode::Unknown)
};

let de_metadata = de.metadata().await.map_err(new_std_io_error)?;
let metadata = Metadata::new(mode)
.with_content_length(de_metadata.len())
.with_last_modified(Timestamp::try_from(
de_metadata.modified().map_err(new_std_io_error)?,
)?);
let de_metadata = match de.metadata().await {
Ok(de_metadata) => de_metadata,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
// Entry can be deleted between readdir and metadata calls.
continue;
}
Err(e) => return Err(new_std_io_error(e)),
};
let last_modified = match de_metadata.modified() {
Ok(v) => v,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
continue;
}
Err(e) => return Err(new_std_io_error(e)),
};
let metadata = Metadata::new(mode)
.with_content_length(de_metadata.len())
.with_last_modified(Timestamp::try_from(last_modified)?);

Ok(Some(oio::Entry::new(path, metadata)))
return Ok(Some(oio::Entry::new(path, metadata)));
}
}
}
Loading