Skip to content

Commit 1064f9b

Browse files
committed
refactor(hf): remove the option to disable xet in runtime
1 parent 2c4e74d commit 1064f9b

File tree

5 files changed

+60
-129
lines changed

5 files changed

+60
-129
lines changed

core/services/hf/src/backend.rs

Lines changed: 2 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -121,21 +121,6 @@ impl HfBuilder {
121121
self
122122
}
123123

124-
/// Enable XET storage protocol for reads.
125-
///
126-
/// When enabled, reads will check for XET-backed files and use the
127-
/// XET protocol for downloading. Default is disabled.
128-
pub fn enable_xet(mut self) -> Self {
129-
self.config.xet = true;
130-
self
131-
}
132-
133-
/// Disable XET storage protocol for reads.
134-
pub fn disable_xet(mut self) -> Self {
135-
self.config.xet = false;
136-
self
137-
}
138-
139124
/// Set the maximum number of retries for commit operations.
140125
///
141126
/// Retries on commit conflicts (HTTP 412) and transient server
@@ -211,14 +196,13 @@ impl Builder for HfBuilder {
211196
debug!("backend max_retries: {}", max_retries);
212197

213198
Ok(HfBackend {
214-
core: Arc::new(HfCore::new(
199+
core: Arc::new(HfCore::build(
215200
info,
216201
repo,
217202
root,
218203
token,
219204
endpoint,
220205
max_retries,
221-
self.config.xet,
222206
)?),
223207
})
224208
}
@@ -318,31 +302,14 @@ pub(super) mod test_utils {
318302
finish_operator(op)
319303
}
320304

321-
pub fn testing_xet_operator() -> Operator {
322-
let (repo_id, token) = testing_credentials();
323-
let op = Operator::new(
324-
HfBuilder::default()
325-
.repo_type("dataset")
326-
.repo_id(&repo_id)
327-
.token(&token)
328-
.enable_xet()
329-
.max_retries(10),
330-
)
331-
.unwrap()
332-
.finish();
333-
finish_operator(op)
334-
}
335-
336305
/// Operator for a bucket requiring HF_OPENDAL_BUCKET and HF_OPENDAL_TOKEN.
337-
/// Buckets always use XET for writes.
338306
pub fn testing_bucket_operator() -> Operator {
339307
let (repo_id, token) = testing_bucket_credentials();
340308
let op = Operator::new(
341309
HfBuilder::default()
342310
.repo_type("bucket")
343311
.repo_id(&repo_id)
344312
.token(&token)
345-
.enable_xet()
346313
.max_retries(10),
347314
)
348315
.unwrap()
@@ -371,24 +338,11 @@ pub(super) mod test_utils {
371338
.finish();
372339
finish_operator(op)
373340
}
374-
375-
pub fn mbpp_xet_operator() -> Operator {
376-
let mut builder = HfBuilder::default()
377-
.repo_type("dataset")
378-
.repo_id("google-research-datasets/mbpp")
379-
.enable_xet();
380-
if let Ok(token) = std::env::var("HF_OPENDAL_TOKEN") {
381-
builder = builder.token(&token);
382-
}
383-
let op = Operator::new(builder).unwrap().finish();
384-
finish_operator(op)
385-
}
386341
}
387342

388343
#[cfg(test)]
389344
mod tests {
390345
use super::test_utils::mbpp_operator;
391-
use super::test_utils::mbpp_xet_operator;
392346
use super::*;
393347

394348
#[test]
@@ -462,7 +416,7 @@ mod tests {
462416
#[tokio::test]
463417
#[ignore = "requires network access"]
464418
async fn test_read_parquet_xet() {
465-
let op = mbpp_xet_operator();
419+
let op = mbpp_operator();
466420
let path = "full/train-00000-of-00001.parquet";
467421

468422
// Full read via XET and verify parquet magic at both ends

core/services/hf/src/config.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,6 @@ pub struct HfConfig {
5252
///
5353
/// Default is "https://huggingface.co".
5454
pub endpoint: Option<String>,
55-
/// Enable XET storage protocol for reads.
56-
///
57-
/// When true and the `xet` feature is compiled in, reads will
58-
/// check for XET-backed files and use the XET protocol for
59-
/// downloading. Default is false.
60-
pub xet: bool,
6155
/// Maximum number of retries for commit operations.
6256
///
6357
/// Retries on commit conflicts (HTTP 412) and transient server
@@ -125,7 +119,6 @@ impl opendal_core::Configurator for HfConfig {
125119
root: opts.get("root").cloned(),
126120
token: opts.get("token").cloned(),
127121
endpoint: opts.get("endpoint").cloned(),
128-
xet: opts.get("xet").is_some_and(|v| v == "true"),
129122
..Default::default()
130123
})
131124
}

core/services/hf/src/core.rs

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -221,24 +221,18 @@ pub struct HfCore {
221221
pub endpoint: String,
222222
pub max_retries: usize,
223223

224-
// Whether XET storage protocol is enabled for reads. When true,
225-
// reads will check for XET-backed files and use the XET protocol
226-
// for downloading.
227-
pub xet_enabled: bool,
228-
229224
/// HTTP client with redirects disabled, used by XET probes to
230225
/// inspect headers on 302 responses.
231226
pub no_redirect_client: HttpClient,
232227
}
233228

234229
impl Debug for HfCore {
235230
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236-
let mut s = f.debug_struct("HfCore");
237-
s.field("repo", &self.repo)
231+
f.debug_struct("HfCore")
232+
.field("repo", &self.repo)
238233
.field("root", &self.root)
239-
.field("endpoint", &self.endpoint);
240-
s.field("xet_enabled", &self.xet_enabled);
241-
s.finish_non_exhaustive()
234+
.field("endpoint", &self.endpoint)
235+
.finish_non_exhaustive()
242236
}
243237
}
244238

@@ -250,33 +244,44 @@ impl HfCore {
250244
token: Option<String>,
251245
endpoint: String,
252246
max_retries: usize,
253-
xet_enabled: bool,
247+
no_redirect_client: HttpClient,
248+
) -> Self {
249+
Self {
250+
info,
251+
repo,
252+
root,
253+
token,
254+
endpoint,
255+
max_retries,
256+
no_redirect_client,
257+
}
258+
}
259+
260+
/// Build HfCore with dedicated reqwest HTTP clients.
261+
///
262+
/// Uses separate clients for standard and no-redirect requests to
263+
/// avoid "dispatch task is gone" errors with multiple tokio runtimes.
264+
pub fn build(
265+
info: Arc<AccessorInfo>,
266+
repo: HfRepo,
267+
root: String,
268+
token: Option<String>,
269+
endpoint: String,
270+
max_retries: usize,
254271
) -> Result<Self> {
255-
// When xet is enabled at runtime, use dedicated reqwest clients instead
256-
// of the global one. This avoids "dispatch task is gone" errors when
257-
// multiple tokio runtimes exist (e.g. in tests) and ensures the
258-
// no-redirect client shares the same runtime as the standard client.
259-
// When xet is disabled, preserve whatever HTTP client is already set
260-
// on `info` (important for mock-based unit tests).
261-
let no_redirect_client = if xet_enabled {
262-
let standard = HttpClient::with(build_reqwest(reqwest::redirect::Policy::default())?);
263-
let no_redirect = HttpClient::with(build_reqwest(reqwest::redirect::Policy::none())?);
264-
info.update_http_client(|_| standard);
265-
no_redirect
266-
} else {
267-
info.http_client()
268-
};
272+
let standard = HttpClient::with(build_reqwest(reqwest::redirect::Policy::default())?);
273+
let no_redirect = HttpClient::with(build_reqwest(reqwest::redirect::Policy::none())?);
274+
info.update_http_client(|_| standard);
269275

270-
Ok(Self {
276+
Ok(Self::new(
271277
info,
272278
repo,
273279
root,
274280
token,
275281
endpoint,
276282
max_retries,
277-
xet_enabled,
278-
no_redirect_client,
279-
})
283+
no_redirect,
284+
))
280285
}
281286

282287
/// Build an authenticated HTTP request.
@@ -620,9 +625,8 @@ pub(crate) mod test_utils {
620625
None,
621626
endpoint.to_string(),
622627
3,
623-
false,
624-
)
625-
.unwrap();
628+
HttpClient::with(mock_client.clone()),
629+
);
626630

627631
(core, mock_client)
628632
}

core/services/hf/src/reader.rs

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -38,26 +38,19 @@ pub enum HfReader {
3838
impl HfReader {
3939
/// Create a reader, automatically choosing between XET and HTTP.
4040
///
41-
/// Buckets always use XET. For other repo types, when XET is enabled
42-
/// a HEAD request probes for the `X-Xet-Hash` header. Files stored on
43-
/// XET are downloaded via the CAS protocol; all others fall back to HTTP GET.
41+
/// Buckets always use XET. For other repo types, a HEAD request
42+
/// probes for the `X-Xet-Hash` header. Files stored on XET are
43+
/// downloaded via the CAS protocol; all others fall back to HTTP GET.
4444
pub async fn try_new(core: &HfCore, path: &str, range: BytesRange) -> Result<Self> {
45-
if core.xet_enabled {
46-
// Buckets always use XET
47-
if core.repo.repo_type == RepoType::Bucket {
48-
if let Some(xet_file) = core.maybe_xet_file(path).await? {
49-
return Self::try_new_xet(core, &xet_file, range).await;
50-
}
51-
return Err(Error::new(
52-
ErrorKind::Unexpected,
53-
"bucket file is missing XET metadata",
54-
));
55-
}
45+
if let Some(xet_file) = core.maybe_xet_file(path).await? {
46+
return Self::try_new_xet(core, &xet_file, range).await;
47+
}
5648

57-
// For other repos, probe for XET
58-
if let Some(xet_file) = core.maybe_xet_file(path).await? {
59-
return Self::try_new_xet(core, &xet_file, range).await;
60-
}
49+
if core.repo.repo_type == RepoType::Bucket {
50+
return Err(Error::new(
51+
ErrorKind::Unexpected,
52+
"bucket file is missing XET metadata",
53+
));
6154
}
6255

6356
Self::try_new_http(core, path, range).await
@@ -126,7 +119,6 @@ impl oio::Read for HfReader {
126119

127120
#[cfg(test)]
128121
mod tests {
129-
use super::super::backend::test_utils::mbpp_xet_operator;
130122
use super::super::backend::test_utils::{gpt2_operator, mbpp_operator};
131123

132124
/// Parquet magic bytes: "PAR1"
@@ -155,7 +147,7 @@ mod tests {
155147
#[tokio::test]
156148
#[ignore = "requires network access"]
157149
async fn test_read_xet_parquet() {
158-
let op = mbpp_xet_operator();
150+
let op = mbpp_operator();
159151
let data = op
160152
.read("full/train-00000-of-00001.parquet")
161153
.await
@@ -169,7 +161,7 @@ mod tests {
169161
#[tokio::test]
170162
#[ignore = "requires network access"]
171163
async fn test_read_xet_range() {
172-
let op = mbpp_xet_operator();
164+
let op = mbpp_operator();
173165
let data = op
174166
.read_with("full/train-00000-of-00001.parquet")
175167
.range(0..4)

core/services/hf/src/writer.rs

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,6 @@ impl HfWriter {
4747
pub async fn try_new(core: Arc<HfCore>, path: String) -> Result<Self> {
4848
// Buckets always use XET and don't have a preupload endpoint
4949
if core.repo.repo_type == RepoType::Bucket {
50-
if !core.xet_enabled {
51-
return Err(Error::new(
52-
ErrorKind::Unsupported,
53-
"buckets require XET to be enabled",
54-
));
55-
}
5650
let client = core.xet_client("write").await?;
5751
let writer = client.write(None).await.map_err(map_xet_error)?;
5852
return Ok(HfWriter::Xet {
@@ -65,19 +59,13 @@ impl HfWriter {
6559
let mode_str = core.determine_upload_mode(&path).await?;
6660

6761
if mode_str == "lfs" {
68-
if core.xet_enabled {
69-
let client = core.xet_client("write").await?;
70-
let writer = client.write(None).await.map_err(map_xet_error)?;
71-
return Ok(HfWriter::Xet {
72-
core,
73-
path,
74-
writer: Mutex::new(writer),
75-
});
76-
}
77-
return Err(Error::new(
78-
ErrorKind::Unsupported,
79-
"file requires LFS; call enable_xet() on the builder for large file support",
80-
));
62+
let client = core.xet_client("write").await?;
63+
let writer = client.write(None).await.map_err(map_xet_error)?;
64+
return Ok(HfWriter::Xet {
65+
core,
66+
path,
67+
writer: Mutex::new(writer),
68+
});
8169
}
8270

8371
Ok(HfWriter::Regular {
@@ -191,8 +179,8 @@ impl oio::Write for HfWriter {
191179

192180
#[cfg(test)]
193181
mod tests {
182+
use super::super::backend::test_utils::testing_bucket_operator;
194183
use super::super::backend::test_utils::testing_operator;
195-
use super::super::backend::test_utils::{testing_bucket_operator, testing_xet_operator};
196184
use super::*;
197185
use base64::Engine;
198186

@@ -247,7 +235,7 @@ mod tests {
247235
#[tokio::test]
248236
#[ignore]
249237
async fn test_write_xet() {
250-
let op = testing_xet_operator();
238+
let op = testing_operator();
251239
op.write("test-xet.bin", b"Binary data for XET test".as_slice())
252240
.await
253241
.expect("xet write should succeed");

0 commit comments

Comments
 (0)