Skip to content

Commit 0f67384

Browse files
authored
feat(services/lakefs): Implement write returns metadata (#6770)
1 parent 731d119 commit 0f67384

File tree

3 files changed

+72
-12
lines changed

3 files changed

+72
-12
lines changed

core/src/services/lakefs/backend.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -210,20 +210,13 @@ impl Access for LakefsBackend {
210210

211211
match status {
212212
StatusCode::OK => {
213-
let mut meta = parse_into_metadata(path, resp.headers())?;
214-
let bs = resp.clone().into_body();
213+
let bs = resp.into_body();
215214

216215
let decoded_response: LakefsStatus =
217216
serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
218-
if let Some(size_bytes) = decoded_response.size_bytes {
219-
meta.set_content_length(size_bytes);
220-
}
221-
meta.set_mode(EntryMode::FILE);
222-
if let Some(v) = parse_content_disposition(resp.headers())? {
223-
meta.set_content_disposition(v);
224-
}
225-
226-
meta.set_last_modified(Timestamp::from_second(decoded_response.mtime).unwrap());
217+
218+
// Use the helper function to parse LakefsStatus into Metadata
219+
let meta = LakefsCore::parse_lakefs_status_into_metadata(&decoded_response);
227220

228221
Ok(RpStat::new(meta))
229222
}

core/src/services/lakefs/core.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,41 @@ impl LakefsCore {
233233
.map_err(new_request_build_error)?;
234234
self.info.http_client().send(req).await
235235
}
236+
237+
/// Parse LakefsStatus into Metadata
238+
pub fn parse_lakefs_status_into_metadata(status: &LakefsStatus) -> Metadata {
239+
// Determine entry mode based on path_type
240+
// "common_prefix" indicates a directory in list operations
241+
let mode = if status.path_type == "common_prefix" {
242+
EntryMode::DIR
243+
} else {
244+
EntryMode::FILE
245+
};
246+
247+
let mut meta = Metadata::new(mode);
248+
249+
// Set checksum as etag
250+
if !status.checksum.is_empty() {
251+
meta.set_etag(&status.checksum);
252+
}
253+
254+
// Set content length
255+
if let Some(size) = status.size_bytes {
256+
meta.set_content_length(size);
257+
}
258+
259+
// Set content type
260+
if let Some(ref content_type) = status.content_type {
261+
meta.set_content_type(content_type);
262+
}
263+
264+
// Set last modified time
265+
if let Ok(timestamp) = Timestamp::from_second(status.mtime) {
266+
meta.set_last_modified(timestamp);
267+
}
268+
269+
meta
270+
}
236271
}
237272

238273
#[derive(Deserialize, Eq, PartialEq, Debug)]

core/src/services/lakefs/writer.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717

1818
use std::sync::Arc;
1919

20+
use bytes::Buf;
2021
use http::StatusCode;
2122

2223
use super::core::LakefsCore;
24+
use super::core::LakefsStatus;
2325
use super::error::parse_error;
2426
use crate::raw::*;
2527
use crate::*;
@@ -43,7 +45,37 @@ impl oio::OneShotWrite for LakefsWriter {
4345
let status = resp.status();
4446

4547
match status {
46-
StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()),
48+
StatusCode::CREATED | StatusCode::OK => {
49+
let body = resp.into_body();
50+
let body_bytes = body.to_bytes();
51+
52+
// Try to parse metadata from upload response body
53+
match serde_json::from_slice::<LakefsStatus>(&body_bytes) {
54+
Ok(lakefs_status) => {
55+
// Successfully parsed ObjectStats from upload response
56+
Ok(LakefsCore::parse_lakefs_status_into_metadata(
57+
&lakefs_status,
58+
))
59+
}
60+
Err(_) => {
61+
// Upload response doesn't contain ObjectStats, fetch via stat API
62+
let stat_resp = self.core.get_object_metadata(&self.path).await?;
63+
64+
match stat_resp.status() {
65+
StatusCode::OK => {
66+
let lakefs_status: LakefsStatus =
67+
serde_json::from_reader(stat_resp.into_body().reader())
68+
.map_err(new_json_deserialize_error)?;
69+
70+
Ok(LakefsCore::parse_lakefs_status_into_metadata(
71+
&lakefs_status,
72+
))
73+
}
74+
_ => Err(parse_error(stat_resp)),
75+
}
76+
}
77+
}
78+
}
4779
_ => Err(parse_error(resp)),
4880
}
4981
}

0 commit comments

Comments
 (0)