Skip to content

Commit e1b342f

Browse files
committed
fix: repair native range transfer progress
1 parent 24e0f10 commit e1b342f

8 files changed

Lines changed: 285 additions & 31 deletions

File tree

crates/raria-cli/src/daemon.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ pub(crate) async fn run_daemon_with_config(
251251
let range_context = RangeExecutionContext {
252252
task_id: task_id.clone(),
253253
};
254+
let engine_for_failure = Arc::clone(&engine_ref);
254255
tokio::spawn(async move {
255256
if let Err(e) = run_job_download(
256257
engine_ref,
@@ -261,6 +262,10 @@ pub(crate) async fn run_daemon_with_config(
261262
.await
262263
{
263264
error!(%task_id, error = %e, "range download task failed");
265+
let _ = engine_for_failure.fail_native_task(
266+
&task_id,
267+
&classified_error_message(&e.to_string()),
268+
);
264269
}
265270
});
266271
}

crates/raria-cli/tests/native_api_smoke.rs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -985,6 +985,104 @@ async fn daemon_native_task_reports_effective_active_connections() {
985985
}
986986
}
987987

988+
#[tokio::test]
989+
async fn daemon_native_http_range_task_progresses_with_bounded_segments() {
990+
let payload = Arc::new(
991+
(0..(512 * 1024))
992+
.map(|i| (i % 251) as u8)
993+
.collect::<Vec<_>>(),
994+
);
995+
let server = MockServer::start().await;
996+
997+
Mock::given(method("HEAD"))
998+
.and(path("/bounded-range.bin"))
999+
.respond_with(RangeResponder::new(Arc::clone(&payload)))
1000+
.mount(&server)
1001+
.await;
1002+
Mock::given(method("GET"))
1003+
.and(path("/bounded-range.bin"))
1004+
.respond_with(RangeResponder::new(Arc::clone(&payload)))
1005+
.mount(&server)
1006+
.await;
1007+
1008+
let temp = tempdir().expect("tempdir");
1009+
let session_file = temp.path().join("native-bounded-range.session.redb");
1010+
let port = allocate_port();
1011+
let mut child = spawn_native_daemon(temp.path(), &session_file, port);
1012+
wait_for_native_api_ready(port, &mut child)
1013+
.await
1014+
.expect("native API ready");
1015+
1016+
let client = reqwest::Client::new();
1017+
let download_dir = temp.path().join("nested").join("downloads");
1018+
let created: serde_json::Value = client
1019+
.post(format!("http://127.0.0.1:{port}/api/v1/tasks"))
1020+
.json(&serde_json::json!({
1021+
"sources": [format!("{}/bounded-range.bin", server.uri())],
1022+
"downloadDir": download_dir,
1023+
"filename": "bounded-range.bin",
1024+
"segments": 4
1025+
}))
1026+
.send()
1027+
.await
1028+
.expect("create bounded range task")
1029+
.json()
1030+
.await
1031+
.expect("bounded range task json");
1032+
let task_id = created["taskId"].as_str().expect("task id");
1033+
1034+
let deadline = Instant::now() + Duration::from_secs(30);
1035+
let completed = loop {
1036+
let task: serde_json::Value = client
1037+
.get(format!("http://127.0.0.1:{port}/api/v1/tasks/{task_id}"))
1038+
.send()
1039+
.await
1040+
.expect("bounded range task detail request")
1041+
.json()
1042+
.await
1043+
.expect("bounded range task detail json");
1044+
if task["completedBytes"].as_u64().unwrap_or(0) > 0 {
1045+
assert!(
1046+
task["downloadBytesPerSecond"].as_u64().unwrap_or(0) > 0,
1047+
"bounded range task should expose a non-zero speed after progress: {task}"
1048+
);
1049+
}
1050+
if task["lifecycle"] == "completed" {
1051+
break task;
1052+
}
1053+
1054+
assert!(
1055+
Instant::now() < deadline,
1056+
"bounded range task never completed: {task}"
1057+
);
1058+
tokio::time::sleep(Duration::from_millis(50)).await;
1059+
};
1060+
assert_eq!(
1061+
completed["completedBytes"].as_u64(),
1062+
Some(payload.len() as u64)
1063+
);
1064+
assert_eq!(
1065+
std::fs::read(temp.path().join("nested/downloads/bounded-range.bin"))
1066+
.expect("read bounded range output"),
1067+
payload.as_ref().clone()
1068+
);
1069+
1070+
let requests = server.received_requests().await.expect("received requests");
1071+
let bounded_range_seen = requests.iter().any(|request| {
1072+
request.method.as_str() == "GET"
1073+
&& request.url.path() == "/bounded-range.bin"
1074+
&& request
1075+
.headers
1076+
.get("range")
1077+
.and_then(|value| value.to_str().ok())
1078+
.is_some_and(|range| range.starts_with("bytes=") && !range.ends_with('-'))
1079+
});
1080+
assert!(
1081+
bounded_range_seen,
1082+
"native HTTP range task should issue bounded Range requests"
1083+
);
1084+
}
1085+
9881086
#[tokio::test]
9891087
async fn daemon_native_task_stops_after_file_not_found_budget() {
9901088
let server = MockServer::start().await;

crates/raria-cli/tests/single_download.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1592,7 +1592,7 @@ async fn single_download_continue_resumes_from_existing_file_length() {
15921592

15931593
Mock::given(method("GET"))
15941594
.and(path("/continue.bin"))
1595-
.and(header("range", "bytes=4-"))
1595+
.and(header("range", "bytes=4-7"))
15961596
.respond_with(ResponseTemplate::new(206).set_body_bytes(b"5678"))
15971597
.mount(&server)
15981598
.await;

crates/raria-core/src/engine.rs

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use std::collections::{HashMap, HashSet};
3333
use std::path::PathBuf;
3434
use std::sync::Arc;
3535
use std::sync::atomic::{AtomicU64, Ordering};
36-
use std::time::Instant;
36+
use std::time::{Duration, Instant};
3737
use tokio::sync::Notify;
3838
use tokio_util::sync::CancellationToken;
3939
use tracing::{debug, error, info, warn};
@@ -42,6 +42,11 @@ fn native_task_log_fields(task_id: &TaskId) -> [(&'static str, String); 1] {
4242
[("task_id", task_id.to_string())]
4343
}
4444

45+
fn speed_from_delta(bytes: u64, elapsed: Duration) -> u64 {
46+
let nanos = elapsed.as_nanos().max(1);
47+
((bytes as u128).saturating_mul(1_000_000_000) / nanos) as u64
48+
}
49+
4550
/// Input for planning a native segmented range download.
4651
#[derive(Debug, Clone, Copy)]
4752
pub struct NativeSegmentPlanningInput<'a> {
@@ -155,6 +160,8 @@ pub struct Engine {
155160
global_upload_limit: AtomicU64,
156161
/// Per-job limiter handles layered on top of the global limiter.
157162
job_rate_limiters: Mutex<HashMap<Gid, Arc<SharedRateLimiter>>>,
163+
/// Per-job progress sampling state used to project native range speed.
164+
job_speed_samples: Mutex<HashMap<Gid, SpeedSample>>,
158165
/// Unique session identifier (random hex, persisted for lifetime of process).
159166
pub session_id: String,
160167
store: Option<Arc<Store>>,
@@ -164,6 +171,12 @@ pub struct Engine {
164171
started_at: Instant,
165172
}
166173

174+
#[derive(Debug, Clone, Copy)]
175+
struct SpeedSample {
176+
downloaded: u64,
177+
sampled_at: Instant,
178+
}
179+
167180
impl Engine {
168181
/// Create a new Engine with the given configuration (no persistence).
169182
pub fn new(config: GlobalConfig) -> Self {
@@ -180,6 +193,7 @@ impl Engine {
180193
global_rate_limiter,
181194
global_upload_limit: AtomicU64::new(global_upload_limit),
182195
job_rate_limiters: Mutex::new(HashMap::new()),
196+
job_speed_samples: Mutex::new(HashMap::new()),
183197
session_id: format!("{:016x}", rand::random::<u64>()),
184198
store: None,
185199
shutdown: CancellationToken::new(),
@@ -203,6 +217,7 @@ impl Engine {
203217
global_rate_limiter,
204218
global_upload_limit: AtomicU64::new(global_upload_limit),
205219
job_rate_limiters: Mutex::new(HashMap::new()),
220+
job_speed_samples: Mutex::new(HashMap::new()),
206221
session_id: format!("{:016x}", rand::random::<u64>()),
207222
store: Some(store),
208223
shutdown: CancellationToken::new(),
@@ -778,6 +793,7 @@ impl Engine {
778793
self.cancel_registry.cancel(task_id);
779794
self.scheduler.dequeue_task(task_id);
780795
self.clear_job_rate_limiter(gid);
796+
self.clear_job_speed_sample(gid);
781797
self.registry
782798
.update(gid, |job| {
783799
job.status = Status::Removed;
@@ -805,6 +821,7 @@ impl Engine {
805821
let gid = self
806822
.gid_for_task_id(task_id)
807823
.context("native task not found")?;
824+
self.clear_job_speed_sample(gid);
808825
self.registry
809826
.update(gid, |job| {
810827
job.status = Status::Waiting;
@@ -1065,6 +1082,7 @@ impl Engine {
10651082
[("gid", gid.to_string())],
10661083
);
10671084
self.clear_job_rate_limiter(gid);
1085+
self.clear_job_speed_sample(gid);
10681086
Ok(())
10691087
}
10701088

@@ -1155,6 +1173,7 @@ impl Engine {
11551173
[("gid", gid.to_string())],
11561174
);
11571175
self.clear_job_rate_limiter(gid);
1176+
self.clear_job_speed_sample(gid);
11581177
self.work_notify.notify_one();
11591178
Ok(())
11601179
}
@@ -1196,6 +1215,7 @@ impl Engine {
11961215
[("gid", gid.to_string()), ("error", error_msg.to_string())],
11971216
);
11981217
self.clear_job_rate_limiter(gid);
1218+
self.clear_job_speed_sample(gid);
11991219
self.work_notify.notify_one();
12001220
Ok(())
12011221
}
@@ -1374,10 +1394,45 @@ impl Engine {
13741394
self.job_rate_limiters.lock().remove(&gid);
13751395
}
13761396

1397+
fn clear_job_speed_sample(&self, gid: Gid) {
1398+
self.job_speed_samples.lock().remove(&gid);
1399+
}
1400+
13771401
/// Update download progress for a job.
13781402
pub fn update_progress(&self, gid: Gid, bytes: u64) {
13791403
let _ = self.registry.update(gid, |job| {
1380-
job.downloaded += bytes;
1404+
job.downloaded = job.downloaded.saturating_add(bytes);
1405+
let downloaded = job.downloaded;
1406+
let now = Instant::now();
1407+
let mut samples = self.job_speed_samples.lock();
1408+
let speed = if let Some(sample) = samples.get(&gid).copied() {
1409+
let elapsed = now.duration_since(sample.sampled_at);
1410+
if !elapsed.is_zero() && downloaded >= sample.downloaded {
1411+
let delta = downloaded - sample.downloaded;
1412+
let bps = speed_from_delta(delta, elapsed);
1413+
samples.insert(
1414+
gid,
1415+
SpeedSample {
1416+
downloaded,
1417+
sampled_at: now,
1418+
},
1419+
);
1420+
bps
1421+
} else {
1422+
job.download_speed
1423+
}
1424+
} else {
1425+
let speed = job.download_speed;
1426+
samples.insert(
1427+
gid,
1428+
SpeedSample {
1429+
downloaded,
1430+
sampled_at: now,
1431+
},
1432+
);
1433+
speed
1434+
};
1435+
job.download_speed = speed;
13811436
});
13821437
}
13831438

@@ -1705,6 +1760,7 @@ impl Engine {
17051760
job.connections = 0;
17061761
})
17071762
.context("native task not found")?;
1763+
self.clear_job_speed_sample(gid);
17081764
Ok(())
17091765
}
17101766

@@ -1948,6 +2004,7 @@ impl Engine {
19482004
self.scheduler.dequeue_task(&task_id);
19492005
}
19502006
self.clear_job_rate_limiter(gid);
2007+
self.clear_job_speed_sample(gid);
19512008

19522009
// Force transition to Removed regardless of current state.
19532010
self.registry
@@ -1975,6 +2032,7 @@ impl Engine {
19752032
Status::Complete | Status::Error | Status::Removed => {
19762033
self.registry.remove(gid);
19772034
self.clear_job_rate_limiter(gid);
2035+
self.clear_job_speed_sample(gid);
19782036
debug!(%gid, "download result removed");
19792037
Ok(())
19802038
}
@@ -1993,6 +2051,7 @@ impl Engine {
19932051
Status::Complete | Status::Error | Status::Removed => {
19942052
self.registry.remove(job.gid);
19952053
self.clear_job_rate_limiter(job.gid);
2054+
self.clear_job_speed_sample(job.gid);
19962055
purged += 1;
19972056
}
19982057
_ => {}
@@ -3089,6 +3148,19 @@ mod tests {
30893148
assert_eq!(job.downloaded, 3000);
30903149
}
30913150

3151+
#[test]
3152+
fn update_progress_projects_speed_after_delta() {
3153+
let engine = Engine::new(default_config());
3154+
let handle = engine.add_uri(&default_spec()).unwrap();
3155+
3156+
engine.update_progress(handle.gid, 1000);
3157+
std::thread::sleep(std::time::Duration::from_millis(1));
3158+
engine.update_progress(handle.gid, 1000);
3159+
3160+
let job = engine.registry.get(handle.gid).unwrap();
3161+
assert!(job.download_speed > 0);
3162+
}
3163+
30923164
#[test]
30933165
fn completion_frees_slot() {
30943166
let engine = Engine::new(GlobalConfig {

crates/raria-http/src/backend.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -399,8 +399,16 @@ impl ByteSourceBackend for HttpBackend {
399399
request = request.header(name, value);
400400
}
401401

402-
if offset > 0 {
403-
request = request.header(RANGE, format!("bytes={offset}-"));
402+
if offset > 0 || ctx.length.is_some() {
403+
let range = match ctx.length {
404+
Some(0) => format!("bytes={offset}-{offset}"),
405+
Some(length) => {
406+
let end = offset.saturating_add(length).saturating_sub(1);
407+
format!("bytes={offset}-{end}")
408+
}
409+
None => format!("bytes={offset}-"),
410+
};
411+
request = request.header(RANGE, range);
404412
if let Some(ref etag) = ctx.etag {
405413
request = request.header("If-Range", etag.as_str());
406414
}

crates/raria-range/src/backend.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ pub struct OpenContext {
4444
pub etag: Option<String>,
4545
/// Custom HTTP headers for stream requests.
4646
pub headers: Vec<(String, String)>,
47+
/// Maximum number of bytes the backend should stream when the protocol
48+
/// can express an upper bound. The executor still caps reads locally.
49+
pub length: Option<u64>,
4750
}
4851

4952
impl Default for OpenContext {
@@ -53,6 +56,7 @@ impl Default for OpenContext {
5356
timeout: Duration::from_secs(60),
5457
etag: None,
5558
headers: Vec::new(),
59+
length: None,
5660
}
5761
}
5862
}
@@ -93,12 +97,12 @@ pub type ByteStream = Pin<Box<dyn AsyncRead + Send>>;
9397
/// Implementations exist for HTTP (`raria-http`), FTP (`raria-ftp`),
9498
/// and SFTP (`raria-sftp`).
9599
///
96-
/// The key insight: `open_from(offset)` returns a forward-only stream
97-
/// with no upper bound. The caller (SegmentExecutor) is responsible for
98-
/// consuming only the bytes it needs and then dropping the stream.
100+
/// `open_from(offset)` returns a forward-only stream. Protocols that can
101+
/// express an upper bound should honor [`OpenContext::length`]. The caller
102+
/// still consumes only the bytes it needs and then drops the stream.
99103
///
100104
/// This matches the natural semantics of:
101-
/// - HTTP: `Range: bytes=offset-`
105+
/// - HTTP: `Range: bytes=offset-end` when length is known
102106
/// - FTP: `REST offset` + `RETR`
103107
/// - SFTP: `read_from(offset)`
104108
#[async_trait]
@@ -109,8 +113,8 @@ pub trait ByteSourceBackend: Send + Sync + fmt::Debug {
109113
/// Open a byte stream starting from the given offset.
110114
///
111115
/// The stream reads forward from `offset` until EOF or the caller
112-
/// stops consuming. There is no upper-bound parameter because the
113-
/// SegmentExecutor controls how many bytes to read.
116+
/// stops consuming. When [`OpenContext::length`] is set, backends should
117+
/// request that bounded span if the protocol supports it.
114118
async fn open_from(&self, uri: &Url, offset: u64, ctx: &OpenContext) -> Result<ByteStream>;
115119

116120
/// Human-readable name for this backend (e.g., "http", "ftp", "sftp").

0 commit comments

Comments
 (0)