Skip to content

Commit 4d13a47

Browse files
committed
feat(hf): add support for deleting files
1 parent 01671de commit 4d13a47

File tree

6 files changed

+152
-49
lines changed

6 files changed

+152
-49
lines changed

core/services/huggingface/src/backend.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use log::debug;
2222
use super::HUGGINGFACE_SCHEME;
2323
use super::config::HfConfig;
2424
use super::core::HfCore;
25+
use super::deleter::HfDeleter;
2526
use super::lister::HfLister;
2627
use super::reader::HfReader;
2728
use super::uri::{HfRepo, RepoType};
@@ -180,6 +181,7 @@ impl Builder for HfBuilder {
180181
stat: true,
181182
read: true,
182183
write: token.is_some(),
184+
delete: token.is_some(),
183185
list: true,
184186
list_with_recursive: true,
185187
shared: true,
@@ -212,7 +214,7 @@ impl Access for HfBackend {
212214
type Reader = HfReader;
213215
type Writer = oio::OneShotWriter<HfWriter>;
214216
type Lister = oio::PageLister<HfLister>;
215-
type Deleter = ();
217+
type Deleter = oio::OneShotDeleter<HfDeleter>;
216218

217219
fn info(&self) -> Arc<AccessorInfo> {
218220
self.core.info.clone()
@@ -243,6 +245,13 @@ impl Access for HfBackend {
243245
let writer = HfWriter::new(&self.core, path, args);
244246
Ok((RpWrite::default(), oio::OneShotWriter::new(writer)))
245247
}
248+
249+
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
250+
Ok((
251+
RpDelete::default(),
252+
oio::OneShotDeleter::new(HfDeleter::new(self.core.clone())),
253+
))
254+
}
246255
}
247256

248257
#[cfg(test)]

core/services/huggingface/src/core.rs

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::sync::Arc;
2121
use bytes::Buf;
2222
use bytes::Bytes;
2323
use http::Request;
24+
use http::StatusCode;
2425
use http::header;
2526
use serde::Deserialize;
2627

@@ -63,13 +64,20 @@ pub(super) struct LfsFile {
6364
pub size: u64,
6465
}
6566

67+
#[derive(Debug, serde::Serialize)]
68+
pub(super) struct DeletedFile {
69+
pub path: String,
70+
}
71+
6672
#[derive(serde::Serialize)]
6773
pub(super) struct MixedCommitPayload {
6874
pub summary: String,
6975
#[serde(skip_serializing_if = "Vec::is_empty")]
7076
pub files: Vec<CommitFile>,
7177
#[serde(rename = "lfsFiles", skip_serializing_if = "Vec::is_empty")]
7278
pub lfs_files: Vec<LfsFile>,
79+
#[serde(rename = "deletedFiles", skip_serializing_if = "Vec::is_empty")]
80+
pub deleted_files: Vec<DeletedFile>,
7381
}
7482

7583
// API response types
@@ -381,46 +389,37 @@ impl HfCore {
381389
Ok(resp)
382390
}
383391

384-
/// Commit uploaded files to the repository.
392+
/// Commit file changes (uploads and/or deletions) to the repository.
385393
pub(super) async fn commit_files(
386394
&self,
387395
regular_files: Vec<CommitFile>,
388396
lfs_files: Vec<LfsFile>,
389-
) -> Result<http::Response<Buffer>> {
397+
deleted_files: Vec<DeletedFile>,
398+
) -> Result<()> {
390399
let _token = self.token.as_deref().ok_or_else(|| {
391400
Error::new(
392401
ErrorKind::PermissionDenied,
393-
"token is required for write operations",
402+
"token is required for commit operations",
394403
)
395404
.with_operation("commit")
396405
})?;
397406

398-
let mut summary_paths = Vec::new();
399-
for file in &regular_files {
400-
summary_paths.push(file.path.clone());
401-
}
402-
for file in &lfs_files {
403-
summary_paths.push(file.path.clone());
404-
}
405-
406-
let summary = if summary_paths.len() == 1 {
407-
format!("Upload {} via OpenDAL", summary_paths[0])
408-
} else {
409-
format!("Upload {} files via OpenDAL", summary_paths.len())
410-
};
411-
412-
let client = self.info.http_client();
413-
// Use the first file's path to determine the commit URL
414-
let first_path = summary_paths
407+
let first_path = regular_files
415408
.first()
409+
.map(|f| f.path.as_str())
410+
.or_else(|| lfs_files.first().map(|f| f.path.as_str()))
411+
.or_else(|| deleted_files.first().map(|f| f.path.as_str()))
416412
.ok_or_else(|| Error::new(ErrorKind::Unexpected, "no files to commit"))?;
413+
414+
let client = self.info.http_client();
417415
let uri = self.repo.uri(&self.root, first_path);
418416
let url = uri.commit_url(&self.endpoint);
419417

420418
let payload = MixedCommitPayload {
421-
summary,
419+
summary: "Commit via OpenDAL".to_string(),
422420
files: regular_files,
423421
lfs_files,
422+
deleted_files,
424423
};
425424

426425
let json_body = serde_json::to_vec(&payload).map_err(new_json_serialize_error)?;
@@ -432,7 +431,11 @@ impl HfCore {
432431
.body(Buffer::from(json_body))
433432
.map_err(new_request_build_error)?;
434433

435-
client.send(req).await
434+
let resp = client.send(req).await?;
435+
match resp.status() {
436+
StatusCode::OK | StatusCode::CREATED => Ok(()),
437+
_ => Err(parse_error(resp)),
438+
}
436439
}
437440
}
438441

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use super::core::{DeletedFile, HfCore};
21+
use opendal_core::raw::*;
22+
use opendal_core::*;
23+
24+
pub struct HfDeleter {
25+
core: Arc<HfCore>,
26+
}
27+
28+
impl HfDeleter {
29+
pub fn new(core: Arc<HfCore>) -> Self {
30+
Self { core }
31+
}
32+
}
33+
34+
impl oio::OneShotDelete for HfDeleter {
35+
async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
36+
let deleted = vec![DeletedFile { path }];
37+
match self.core.commit_files(vec![], vec![], deleted).await {
38+
Ok(()) => Ok(()),
39+
Err(err) if err.kind() == ErrorKind::NotFound => Ok(()),
40+
Err(err) => Err(err),
41+
}
42+
}
43+
}
44+
45+
#[cfg(test)]
46+
mod tests {
47+
use super::super::core::HfCore;
48+
use super::super::uri::{HfRepo, RepoType};
49+
use super::super::writer::HfWriter;
50+
use super::*;
51+
use oio::OneShotDelete;
52+
use oio::OneShotWrite;
53+
54+
fn testing_core() -> HfCore {
55+
let repo_id = std::env::var("HF_OPENDAL_DATASET").expect("HF_OPENDAL_DATASET must be set");
56+
57+
let info = AccessorInfo::default();
58+
info.set_scheme("huggingface")
59+
.set_native_capability(Capability {
60+
write: true,
61+
delete: true,
62+
..Default::default()
63+
});
64+
65+
HfCore {
66+
info: info.into(),
67+
repo: HfRepo::new(RepoType::Dataset, repo_id, Some("main".to_string())),
68+
root: "/".to_string(),
69+
token: std::env::var("HF_OPENDAL_TOKEN").ok(),
70+
endpoint: "https://huggingface.co".to_string(),
71+
#[cfg(feature = "xet")]
72+
xet_enabled: false,
73+
}
74+
}
75+
76+
#[tokio::test]
77+
#[ignore]
78+
async fn test_delete_once() {
79+
let core = Arc::new(testing_core());
80+
81+
// First write a file so we have something to delete
82+
let writer = HfWriter::new(&core, "delete-test.txt", OpWrite::default());
83+
writer
84+
.write_once(Buffer::from("temporary content"))
85+
.await
86+
.expect("write should succeed");
87+
88+
// Now delete it
89+
let deleter = HfDeleter::new(core);
90+
deleter
91+
.delete_once("delete-test.txt".to_string(), OpDelete::default())
92+
.await
93+
.expect("delete should succeed");
94+
}
95+
96+
#[tokio::test]
97+
#[ignore]
98+
async fn test_delete_nonexistent() {
99+
let core = Arc::new(testing_core());
100+
101+
let deleter = HfDeleter::new(core);
102+
deleter
103+
.delete_once("nonexistent-file.txt".to_string(), OpDelete::default())
104+
.await
105+
.expect("deleting nonexistent file should succeed");
106+
}
107+
}

core/services/huggingface/src/docs.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ This service can be used to:
1111
- [x] stat
1212
- [x] read
1313
- [x] write
14-
- [ ] delete
14+
- [x] delete
1515
- [x] list
1616
- [ ] copy
1717
- [ ] rename

core/services/huggingface/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ pub fn register_huggingface_service(registry: &opendal_core::OperatorRegistry) {
2828
mod backend;
2929
mod config;
3030
mod core;
31+
mod deleter;
3132
mod error;
3233
mod lister;
3334
mod reader;

core/services/huggingface/src/writer.rs

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,11 @@
1818
use std::sync::Arc;
1919

2020
use base64::Engine;
21-
use http::StatusCode;
2221
use sha2::{Digest, Sha256};
2322

24-
use super::core::{CommitFile, HfCore, LfsFile, PreuploadFile};
23+
use super::core::{CommitFile, HfCore, PreuploadFile};
2524
#[cfg(feature = "xet")]
26-
use super::core::{XetTokenRefresher, map_xet_error};
25+
use super::core::{LfsFile, XetTokenRefresher, map_xet_error};
2726
use opendal_core::raw::*;
2827
use opendal_core::*;
2928

@@ -181,12 +180,10 @@ impl HfWriter {
181180
// Commit the files
182181
let regular_files: Vec<_> = commit_file.into_iter().collect();
183182
let lfs_files: Vec<_> = lfs_file.into_iter().collect();
184-
let resp = self.core.commit_files(regular_files, lfs_files).await?;
185-
186-
match resp.status() {
187-
StatusCode::OK | StatusCode::CREATED => Ok(Metadata::default()),
188-
_ => Err(super::error::parse_error(resp)),
189-
}
183+
self.core
184+
.commit_files(regular_files, lfs_files, vec![])
185+
.await?;
186+
Ok(Metadata::default())
190187
}
191188
}
192189

@@ -236,16 +233,9 @@ mod tests {
236233
.await
237234
.expect("upload should succeed");
238235

239-
let resp = core
240-
.commit_files(vec![commit_file], vec![])
236+
core.commit_files(vec![commit_file], vec![], vec![])
241237
.await
242238
.expect("commit should succeed");
243-
244-
assert!(
245-
resp.status() == StatusCode::OK || resp.status() == StatusCode::CREATED,
246-
"expected OK or CREATED status, got {}",
247-
resp.status()
248-
);
249239
}
250240

251241
#[tokio::test]
@@ -288,16 +278,9 @@ mod tests {
288278
.await
289279
.expect("xet upload should succeed");
290280

291-
let resp = core
292-
.commit_files(vec![], vec![lfs_file])
281+
core.commit_files(vec![], vec![lfs_file], vec![])
293282
.await
294283
.expect("commit should succeed");
295-
296-
assert!(
297-
resp.status() == StatusCode::OK || resp.status() == StatusCode::CREATED,
298-
"expected OK or CREATED status, got {}",
299-
resp.status()
300-
);
301284
}
302285

303286
#[cfg(feature = "xet")]

0 commit comments

Comments
 (0)