Skip to content

Commit 6e36baa

Browse files
committed
refactor(hf): disable progress reporting and make total_bytes optional during progress tracking
1 parent 38c6684 commit 6e36baa

File tree

11 files changed

+99
-97
lines changed

11 files changed

+99
-97
lines changed

data/src/bin/example.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ async fn clean(mut reader: impl Read, mut writer: impl Write, size: u64) -> Resu
9191
FileUploadSession::new(TranslatorConfig::local_config(std::env::current_dir()?)?.into(), None).await?;
9292

9393
let mut size_read = 0;
94-
let mut handle = translator.start_clean(None, size, None).await;
94+
let mut handle = translator.start_clean(None, Some(size), None).await;
9595

9696
loop {
9797
let bytes = reader.read(&mut read_buf)?;

data/src/data_client.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ pub async fn clean_bytes(
249249
bytes: Vec<u8>,
250250
name: Option<Arc<str>>,
251251
) -> errors::Result<(XetFileInfo, DeduplicationMetrics)> {
252-
let mut handle = processor.start_clean(name, bytes.len() as u64, None).await;
252+
let mut handle = processor.start_clean(name, Some(bytes.len() as u64), None).await;
253253
handle.add_data(&bytes).await?;
254254
handle.finish().await
255255
}
@@ -270,7 +270,11 @@ pub async fn clean_file(
270270
let mut buffer = vec![0u8; u64::min(filesize, *xet_config().data.ingestion_block_size) as usize];
271271

272272
let mut handle = processor
273-
.start_clean(Some(filename.as_ref().to_string_lossy().into()), filesize, Sha256::from_hex(sha256.as_ref()).ok())
273+
.start_clean(
274+
Some(filename.as_ref().to_string_lossy().into()),
275+
Some(filesize),
276+
Sha256::from_hex(sha256.as_ref()).ok(),
277+
)
274278
.await;
275279

276280
loop {

data/src/file_upload_session.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,10 @@ impl FileUploadSession {
162162

163163
// Get a new file id for the completion tracking. This also registers the size against the total bytes
164164
// of the file.
165-
let file_id = self.completion_tracker.register_new_file(file_name.clone(), file_size).await;
165+
let file_id = self
166+
.completion_tracker
167+
.register_new_file(file_name.clone(), Some(file_size))
168+
.await;
166169

167170
// Now, spawn a task
168171
let ingestion_concurrency_limiter =
@@ -260,7 +263,7 @@ impl FileUploadSession {
260263
pub async fn start_clean(
261264
self: &Arc<Self>,
262265
file_name: Option<Arc<str>>,
263-
size: u64,
266+
size: Option<u64>,
264267
sha256: Option<Sha256>,
265268
) -> SingleFileCleaner {
266269
// Get a new file id for the completion tracking
@@ -571,7 +574,7 @@ mod tests {
571574
.unwrap();
572575

573576
let mut cleaner = upload_session
574-
.start_clean(Some("test".into()), read_data.len() as u64, None)
577+
.start_clean(Some("test".into()), Some(read_data.len() as u64), None)
575578
.await;
576579

577580
// Read blocks from the source file and hand them to the cleaning handle

data/src/streaming.rs

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,8 @@ impl XetClient {
6666
///
6767
/// Each call creates a fresh [`FileUploadSession`] because sessions are
6868
/// consumed on finalization and cannot be reused.
69-
pub async fn write(
70-
&self,
71-
progress_updater: Option<Arc<dyn TrackingProgressUpdater>>,
72-
name: Option<Arc<str>>,
73-
size: u64,
74-
) -> errors::Result<XetWriter> {
75-
XetWriter::new(self.config.clone(), progress_updater, name, size).await
69+
pub async fn write(&self, name: Option<Arc<str>>) -> errors::Result<XetWriter> {
70+
XetWriter::new(self.config.clone(), name).await
7671
}
7772
}
7873

@@ -89,17 +84,9 @@ pub struct XetWriter {
8984

9085
impl XetWriter {
9186
/// Creates a new writer that will upload a single file.
92-
///
93-
/// `size` is the total number of bytes that will be written and is used for
94-
/// progress tracking. The caller must know the content length before writing.
95-
pub async fn new(
96-
config: Arc<TranslatorConfig>,
97-
progress_updater: Option<Arc<dyn TrackingProgressUpdater>>,
98-
name: Option<Arc<str>>,
99-
size: u64,
100-
) -> errors::Result<Self> {
101-
let session = FileUploadSession::new(config, progress_updater).await?;
102-
let handle = session.start_clean(name, size, None).await;
87+
pub async fn new(config: Arc<TranslatorConfig>, name: Option<Arc<str>>) -> errors::Result<Self> {
88+
let session = FileUploadSession::new(config, None).await?;
89+
let handle = session.start_clean(name, None, None).await;
10390
Ok(Self {
10491
session: Some(session),
10592
handle: Some(handle),
@@ -292,7 +279,7 @@ mod tests {
292279
/// Upload `content` via [`XetWriter`] and download it back via [`XetReader`],
293280
/// asserting the round-tripped bytes match the original.
294281
async fn assert_roundtrip(client: &XetClient, content: &[u8]) {
295-
let mut writer = client.write(None, None, content.len() as u64).await.unwrap();
282+
let mut writer = client.write(None).await.unwrap();
296283
for chunk in content.chunks(4096) {
297284
writer.write(Bytes::copy_from_slice(chunk)).await.unwrap();
298285
}
@@ -341,7 +328,7 @@ mod tests {
341328
let client = XetClient::new(Some(endpoint), None, None, "test".into()).unwrap();
342329

343330
let content: Vec<u8> = (0..1_000_000).map(|i| (i % 256) as u8).collect();
344-
let mut writer = client.write(None, None, content.len() as u64).await.unwrap();
331+
let mut writer = client.write(None).await.unwrap();
345332
for chunk in content.chunks(4096) {
346333
writer.write(Bytes::copy_from_slice(chunk)).await.unwrap();
347334
}
@@ -359,7 +346,7 @@ mod tests {
359346
let endpoint = format!("local://{}", temp_dir.path().display());
360347
let client = XetClient::new(Some(endpoint), None, None, "test".into()).unwrap();
361348

362-
let mut writer = client.write(None, None, 5).await.unwrap();
349+
let mut writer = client.write(None).await.unwrap();
363350
writer.write(Bytes::from_static(b"hello")).await.unwrap();
364351
let file_info = writer.close().await.unwrap();
365352

@@ -416,7 +403,7 @@ mod tests {
416403
let endpoint = format!("local://{}", temp_dir.path().display());
417404
let client = XetClient::new(Some(endpoint), None, None, "test".into()).unwrap();
418405

419-
let mut writer = client.write(None, None, 100).await.unwrap();
406+
let mut writer = client.write(None).await.unwrap();
420407
writer.write(Bytes::from_static(b"some data")).await.unwrap();
421408
writer.abort().await.unwrap();
422409

@@ -431,7 +418,7 @@ mod tests {
431418
let client = XetClient::new(Some(endpoint), None, None, "test".into()).unwrap();
432419

433420
let content = b"Hello, World!";
434-
let mut writer = client.write(None, None, Some(content.len() as u64)).await.unwrap();
421+
let mut writer = client.write(None).await.unwrap();
435422
writer.write(Bytes::from_static(content)).await.unwrap();
436423
let file_info = writer.close().await.unwrap();
437424

data/tests/test_session_resume.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ mod tests {
6767

6868
// Feed it half the data, and checkpoint.
6969
let mut cleaner = file_upload_session
70-
.start_clean(Some("data".into()), data.len() as u64, None)
70+
.start_clean(Some("data".into()), Some(data.len() as u64), None)
7171
.await;
7272
cleaner.add_data(&data[..half_n]).await.unwrap();
7373
cleaner.checkpoint().await.unwrap();
@@ -85,7 +85,7 @@ mod tests {
8585

8686
// Start a fresh cleaner, declaring the full data size up front.
8787
let mut cleaner = file_upload_session
88-
.start_clean(Some("data".into()), data.len() as u64, None)
88+
.start_clean(Some("data".into()), Some(data.len() as u64), None)
8989
.await;
9090

9191
// Add all the data. Roughly the first half should dedup.
@@ -140,7 +140,7 @@ mod tests {
140140

141141
// Feed it half the data, and checkpoint.
142142
let mut cleaner = file_upload_session
143-
.start_clean(Some("data".into()), data.len() as u64, None)
143+
.start_clean(Some("data".into()), Some(data.len() as u64), None)
144144
.await;
145145
cleaner.add_data(&data[..rn]).await.unwrap();
146146
cleaner.checkpoint().await.unwrap();
@@ -172,7 +172,7 @@ mod tests {
172172

173173
// Start a fresh cleaner, declaring the full data size up front.
174174
let mut cleaner = file_upload_session
175-
.start_clean(Some("data".into()), data.len() as u64, None)
175+
.start_clean(Some("data".into()), Some(data.len() as u64), None)
176176
.await;
177177

178178
// Add all the data. Roughly the first half should dedup.

hf_xet/src/progress_update.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ impl WrappedProgressUpdaterImpl {
254254
py,
255255
PyItemProgressUpdate {
256256
item_name: PyString::new(py, &u.item_name).into(),
257-
total_bytes: u.total_bytes,
257+
total_bytes: u.total_bytes.unwrap_or(0),
258258
bytes_completed: u.bytes_completed,
259259
bytes_completion_increment: u.bytes_completion_increment,
260260
},

progress_tracking/src/aggregator.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ mod tests {
276276
.register_updates(ProgressUpdate {
277277
item_updates: vec![ItemProgressUpdate {
278278
item_name: Arc::from("fileA.txt"),
279-
total_bytes: 100,
279+
total_bytes: Some(100),
280280
bytes_completed: 10,
281281
bytes_completion_increment: 10,
282282
}],
@@ -299,7 +299,7 @@ mod tests {
299299
.register_updates(ProgressUpdate {
300300
item_updates: vec![ItemProgressUpdate {
301301
item_name: Arc::from("fileB.txt"),
302-
total_bytes: 200,
302+
total_bytes: Some(200),
303303
bytes_completed: 50,
304304
bytes_completion_increment: 50,
305305
}],
@@ -323,13 +323,13 @@ mod tests {
323323
item_updates: vec![
324324
ItemProgressUpdate {
325325
item_name: Arc::from("fileC.txt"),
326-
total_bytes: 300,
326+
total_bytes: Some(300),
327327
bytes_completed: 90,
328328
bytes_completion_increment: 90,
329329
},
330330
ItemProgressUpdate {
331331
item_name: Arc::from("fileA.txt"),
332-
total_bytes: 100,
332+
total_bytes: Some(100),
333333
bytes_completed: 30,
334334
bytes_completion_increment: 20,
335335
},
@@ -368,19 +368,19 @@ mod tests {
368368

369369
let a = &flushed.item_updates[0];
370370
assert_eq!(a.item_name.as_ref(), "fileA.txt");
371-
assert_eq!(a.total_bytes, 100);
371+
assert_eq!(a.total_bytes, Some(100));
372372
assert_eq!(a.bytes_completed, 30);
373373
assert_eq!(a.bytes_completion_increment, 30);
374374

375375
let b = &flushed.item_updates[1];
376376
assert_eq!(b.item_name.as_ref(), "fileB.txt");
377-
assert_eq!(b.total_bytes, 200);
377+
assert_eq!(b.total_bytes, Some(200));
378378
assert_eq!(b.bytes_completed, 50);
379379
assert_eq!(b.bytes_completion_increment, 50);
380380

381381
let c = &flushed.item_updates[2];
382382
assert_eq!(c.item_name.as_ref(), "fileC.txt");
383-
assert_eq!(c.total_bytes, 300);
383+
assert_eq!(c.total_bytes, Some(300));
384384
assert_eq!(c.bytes_completed, 90);
385385
assert_eq!(c.bytes_completion_increment, 90);
386386
}

progress_tracking/src/item_tracking.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ impl SingleItemProgressUpdater {
109109
.clone()
110110
.do_item_update(ItemProgressUpdate {
111111
item_name: self.item_name.clone(),
112-
total_bytes: self.n_bytes.load(Ordering::Relaxed),
112+
total_bytes: Some(self.n_bytes.load(Ordering::Relaxed)),
113113
bytes_completed: old_completed_count + increment,
114114
bytes_completion_increment: increment,
115115
})

progress_tracking/src/progress_info.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::sync::Arc;
55
#[derive(Clone, Debug)]
66
pub struct ItemProgressUpdate {
77
pub item_name: Arc<str>,
8-
pub total_bytes: u64,
8+
pub total_bytes: Option<u64>,
99

1010
// Bytes completed are the total bytes completed, either through
1111
// deduplication, upload/download, loading from cache, etc.
@@ -19,7 +19,10 @@ impl ItemProgressUpdate {
1919

2020
// Just in case the total got updated, as can be the case when we don't know the
2121
// size ahead of time.
22-
self.total_bytes = self.total_bytes.max(other.total_bytes);
22+
self.total_bytes = match (self.total_bytes, other.total_bytes) {
23+
(Some(a), Some(b)) => Some(a.max(b)),
24+
(a, b) => a.or(b),
25+
};
2326
self.bytes_completed = self.bytes_completed.max(other.bytes_completed);
2427
self.bytes_completion_increment += other.bytes_completion_increment;
2528
}

0 commit comments

Comments
 (0)