Skip to content

Commit 62d02f7

Browse files
authored
fix(core): enforce list file content length in complete layer (#7201)
* fix(core): enforce list file content length in complete layer * fix(services/etcd): fill content length from list response * fix(etcd): keep list keys-only and skip stale entries * refactor(core): short-circuit list content length completion * refactor(core): inline list completion fast-path check
1 parent c0802f1 commit 62d02f7

File tree

4 files changed

+80
-4
lines changed

4 files changed

+80
-4
lines changed

core/core/src/layers/complete.rs

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
5757
type Inner = A;
5858
type Reader = CompleteReader<A::Reader>;
5959
type Writer = CompleteWriter<A::Writer>;
60-
type Lister = A::Lister;
60+
type Lister = CompleteLister<A>;
6161
type Deleter = A::Deleter;
6262

6363
fn inner(&self) -> &Self::Inner {
@@ -95,14 +95,76 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
9595
}
9696

9797
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
98-
self.inner.list(path, args).await
98+
let (rp, lister) = self.inner.list(path, args).await?;
99+
let lister = CompleteLister::new(self.inner.clone(), self.info.clone(), lister);
100+
Ok((rp, lister))
99101
}
100102

101103
async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
102104
self.inner.presign(path, args).await
103105
}
104106
}
105107

108+
pub struct CompleteLister<A: Access> {
109+
inner: A::Lister,
110+
acc: Arc<A>,
111+
info: Arc<AccessorInfo>,
112+
}
113+
114+
impl<A: Access> CompleteLister<A> {
115+
fn new(acc: Arc<A>, info: Arc<AccessorInfo>, inner: A::Lister) -> Self {
116+
Self { inner, acc, info }
117+
}
118+
119+
async fn ensure_file_content_length(&self, entry: &mut oio::Entry) -> Result<()> {
120+
let path = entry.path().to_string();
121+
let version = entry.metadata().version().map(str::to_owned);
122+
let mut op = OpStat::new();
123+
if let Some(version) = version.as_deref() {
124+
op = op.with_version(version);
125+
}
126+
127+
let stat_metadata = self.acc.stat(&path, op).await?.into_metadata();
128+
if !stat_metadata.has_content_length() {
129+
return Err(Error::new(
130+
ErrorKind::Unexpected,
131+
"content length is required for list file entries",
132+
)
133+
.with_operation("CompleteLister::ensure_file_content_length")
134+
.with_context("service", self.info.scheme().to_string())
135+
.with_context("path", path));
136+
}
137+
138+
entry
139+
.metadata_mut()
140+
.set_content_length(stat_metadata.content_length());
141+
Ok(())
142+
}
143+
}
144+
145+
impl<A: Access> oio::List for CompleteLister<A> {
146+
async fn next(&mut self) -> Result<Option<oio::Entry>> {
147+
loop {
148+
let Some(mut entry) = self.inner.next().await? else {
149+
return Ok(None);
150+
};
151+
152+
if !entry.mode().is_file()
153+
|| entry.metadata().is_deleted()
154+
|| entry.metadata().has_content_length()
155+
{
156+
return Ok(Some(entry));
157+
}
158+
159+
match self.ensure_file_content_length(&mut entry).await {
160+
Ok(()) => return Ok(Some(entry)),
161+
Err(err) if err.kind() == ErrorKind::NotFound => continue,
162+
Err(err) => return Err(err),
163+
}
164+
}
165+
}
166+
}
167+
106168
pub struct CompleteReader<R> {
107169
inner: R,
108170
size: Option<u64>,

core/core/src/raw/oio/entry.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,14 @@ impl Entry {
8686
pub(crate) fn into_entry(self) -> crate::Entry {
8787
crate::Entry::new(self.path, self.meta)
8888
}
89+
90+
/// Get metadata of entry.
91+
pub(crate) fn metadata(&self) -> &Metadata {
92+
&self.meta
93+
}
94+
95+
/// Get mutable metadata of entry.
96+
pub(crate) fn metadata_mut(&mut self) -> &mut Metadata {
97+
&mut self.meta
98+
}
8999
}

core/core/src/types/metadata.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,11 @@ impl Metadata {
222222
self.content_length.unwrap_or_default()
223223
}
224224

225+
/// Returns `true` if this metadata contains an explicit content length.
226+
pub(crate) fn has_content_length(&self) -> bool {
227+
self.content_length.is_some()
228+
}
229+
225230
/// Set content length of this entry.
226231
pub fn set_content_length(&mut self, v: u64) -> &mut Self {
227232
self.content_length = Some(v);

core/tests/behavior/async_list.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,9 @@ pub async fn test_list_dir(op: Operator) -> Result<()> {
7979
let mut obs = op.lister(&format!("{parent}/")).await?;
8080
let mut found = false;
8181
while let Some(de) = obs.try_next().await? {
82-
let meta = op.stat(de.path()).await?;
8382
if de.path() == path {
83+
let meta = de.metadata();
8484
assert_eq!(meta.mode(), EntryMode::FILE);
85-
8685
assert_eq!(meta.content_length(), size as u64);
8786

8887
found = true

0 commit comments

Comments
 (0)