Skip to content

Commit 839455a

Browse files
committed
chore(hf): more explicit error handling
1 parent 1064f9b commit 839455a

File tree

3 files changed

+52
-38
lines changed

3 files changed

+52
-38
lines changed

core/services/hf/src/core.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,9 @@ impl HfCore {
382382
Some(refresher),
383383
"opendal/1.0".to_string(),
384384
)
385-
.map_err(map_xet_error)
385+
.map_err(|err| {
386+
Error::new(ErrorKind::Unexpected, "failed to create xet client").set_source(err)
387+
})
386388
}
387389

388390
/// Issue a HEAD request and extract XET file info (hash and size).
@@ -719,10 +721,6 @@ mod tests {
719721
}
720722
}
721723

722-
pub(super) fn map_xet_error(err: impl std::error::Error + Send + Sync + 'static) -> Error {
723-
Error::new(ErrorKind::Unexpected, "xet operation failed").set_source(err)
724-
}
725-
726724
fn build_reqwest(policy: reqwest::redirect::Policy) -> Result<reqwest::Client> {
727725
reqwest::Client::builder()
728726
.redirect(policy)

core/services/hf/src/reader.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,12 @@ use futures::StreamExt;
2323
use subxet::cas_types::FileRange;
2424

2525
use super::core::HfCore;
26-
use super::core::map_xet_error;
2726
use super::uri::RepoType;
2827
use opendal_core::raw::*;
2928
use opendal_core::*;
29+
use subxet::cas_client::CasClientError;
3030
use subxet::data::XetFileInfo;
31+
use subxet::data::errors::DataProcessingError;
3132
use subxet::data::streaming::XetReader;
3233

3334
pub enum HfReader {
@@ -99,7 +100,9 @@ impl HfReader {
99100

100101
let reader = client
101102
.read(file_info.clone(), file_range, None, 256)
102-
.map_err(map_xet_error)?;
103+
.map_err(|err| {
104+
Error::new(ErrorKind::Unexpected, "failed to create xet reader").set_source(err)
105+
})?;
103106
Ok(Self::Xet(reader))
104107
}
105108
}
@@ -110,7 +113,20 @@ impl oio::Read for HfReader {
110113
Self::Http(body) => body.read().await,
111114
Self::Xet(stream) => match stream.next().await {
112115
Some(Ok(bytes)) => Ok(Buffer::from(bytes)),
113-
Some(Err(e)) => Err(map_xet_error(e)),
116+
Some(Err(e)) => {
117+
let kind = match &e {
118+
DataProcessingError::CasClientError(
119+
CasClientError::FileNotFound(_) | CasClientError::XORBNotFound(_),
120+
)
121+
| DataProcessingError::HashNotFound => ErrorKind::NotFound,
122+
DataProcessingError::CasClientError(CasClientError::InvalidRange) => {
123+
ErrorKind::RangeNotSatisfied
124+
}
125+
_ => ErrorKind::Unexpected,
126+
};
127+
let err = Error::new(kind, "xet read error").set_source(e);
128+
Err(err)
129+
}
114130
None => Ok(Buffer::new()),
115131
},
116132
}

core/services/hf/src/writer.rs

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@ use std::sync::Mutex;
2020

2121
use base64::Engine;
2222

23-
use super::core::{BucketOperation, CommitFile, HfCore, LfsFile, map_xet_error};
23+
use super::core::{BucketOperation, CommitFile, HfCore, LfsFile};
2424
use super::uri::RepoType;
2525
use opendal_core::raw::*;
2626
use opendal_core::*;
27+
use subxet::data::errors::DataProcessingError;
2728
use subxet::data::streaming::XetWriter;
2829

2930
/// Writer that handles both regular (small) and XET (large) file uploads.
@@ -45,34 +46,33 @@ pub enum HfWriter {
4546
impl HfWriter {
4647
/// Create a new writer by determining the upload mode from the API.
4748
pub async fn try_new(core: Arc<HfCore>, path: String) -> Result<Self> {
48-
// Buckets always use XET and don't have a preupload endpoint
49-
if core.repo.repo_type == RepoType::Bucket {
49+
// Buckets always use XET and don't have a preupload endpoint;
50+
// other repo types check the preupload API.
51+
let use_xet = core.repo.repo_type == RepoType::Bucket
52+
|| core.determine_upload_mode(&path).await? == "lfs";
53+
54+
let writer = if use_xet {
5055
let client = core.xet_client("write").await?;
51-
let writer = client.write(None).await.map_err(map_xet_error)?;
52-
return Ok(HfWriter::Xet {
56+
let writer = client.write(None).await.map_err(|err| {
57+
let kind = match &err {
58+
DataProcessingError::AuthError(_) => ErrorKind::PermissionDenied,
59+
_ => ErrorKind::Unexpected,
60+
};
61+
Error::new(kind, "failed to create xet writer").set_source(err)
62+
})?;
63+
HfWriter::Xet {
5364
core,
5465
path,
5566
writer: Mutex::new(writer),
56-
});
57-
}
58-
59-
let mode_str = core.determine_upload_mode(&path).await?;
60-
61-
if mode_str == "lfs" {
62-
let client = core.xet_client("write").await?;
63-
let writer = client.write(None).await.map_err(map_xet_error)?;
64-
return Ok(HfWriter::Xet {
67+
}
68+
} else {
69+
HfWriter::Regular {
6570
core,
6671
path,
67-
writer: Mutex::new(writer),
68-
});
69-
}
70-
71-
Ok(HfWriter::Regular {
72-
core,
73-
path,
74-
buf: Vec::new(),
75-
})
72+
buf: Vec::new(),
73+
}
74+
};
75+
Ok(writer)
7676
}
7777

7878
fn prepare_commit_file(path: &str, body: &[u8]) -> CommitFile {
@@ -97,7 +97,10 @@ impl oio::Write for HfWriter {
9797
.unwrap()
9898
.write(bs.to_bytes())
9999
.await
100-
.map_err(map_xet_error),
100+
.map_err(|err| {
101+
Error::new(ErrorKind::Unexpected, "failed to write chunk to xet stream")
102+
.set_source(err)
103+
}),
101104
}
102105
}
103106

@@ -122,12 +125,9 @@ impl oio::Write for HfWriter {
122125
Ok(meta)
123126
}
124127
HfWriter::Xet { core, path, writer } => {
125-
let file_info = writer
126-
.get_mut()
127-
.unwrap()
128-
.close()
129-
.await
130-
.map_err(map_xet_error)?;
128+
let file_info = writer.get_mut().unwrap().close().await.map_err(|err| {
129+
Error::new(ErrorKind::Unexpected, "failed to close xet writer").set_source(err)
130+
})?;
131131

132132
let meta = Metadata::default().with_content_length(file_info.file_size());
133133

0 commit comments

Comments
 (0)