Skip to content

Commit 13661c7

Browse files
authored
Merge pull request #145 from encryption4all/feat/idempotent-chunk-retry
feat(upload): idempotent retry of the last committed chunk
2 parents 51bf439 + 2a6dac1 commit 13661c7

3 files changed

Lines changed: 300 additions & 21 deletions

File tree

api-description.yaml

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,28 @@ paths:
9393
tags:
9494
- "File upload"
9595
summary: "Upload a file part"
96+
description:
97+
"Append a single chunk to the upload identified by `uuid`.\n\n
98+
**Idempotent retry contract.** A chunk PUT whose response was lost
99+
in flight can be safely retried with the *previous* `cryptifytoken`
100+
value (the one the client sent on the failed attempt) at the same
101+
`Content-Range` offset and with the same body. The server detects
102+
this case — the request matches the cached `(prev_token, offset,
103+
length, sha256)` of the most recently committed chunk — and
104+
replays the previously returned `cryptifytoken` without
105+
re-writing the file or double-counting against quotas. If the
106+
request looks like a retry but the body bytes differ, the server
107+
responds 400; clients must not retry the same offset with
108+
different bytes.\n\n
109+
Retries are only honoured for the *most recently committed*
110+
chunk. If a client falls behind by more than one chunk it must
111+
start a new upload."
96112
operationId: "uploadFilePart"
97113
parameters:
98114
- in: "header"
99115
name: "cryptifytoken"
100116
description:
101-
"Identifies the version of the upload file parts. Part of the header from the last fileupload response."
117+
"Identifies the version of the upload file parts. Part of the header from the last fileupload response. On a retry of a chunk whose response was lost, send the *previous* token (the one originally sent on the failed PUT)."
102118
schema:
103119
type: "string"
104120
required: true

src/main.rs

Lines changed: 250 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use rocket_cors::{AllowedOrigins, CorsOptions};
3939

4040
use serde::{Deserialize, Serialize};
4141
use std::time::Duration;
42-
use store::{FileState, Store};
42+
use store::{FileState, LastChunkRecord, Store};
4343

4444
#[derive(Serialize, Deserialize)]
4545
struct InitBody {
@@ -285,6 +285,7 @@ async fn upload_init(
285285
notify_recipients: request.notify_recipients,
286286
api_key_tenant: api_key.tenant,
287287
api_key_validation_failed: api_key.validation_failed,
288+
last_chunk: None,
288289
},
289290
);
290291

@@ -420,11 +421,14 @@ fn compute_hash(cryptify_token: &[u8], data: &[u8]) -> String {
420421
bytes_to_hex(&hash.finalize())
421422
}
422423

424+
/// Wire-level error message for a `CryptifyToken` mismatch. Reused by both
425+
/// `check_cryptify_token` (the finalize path) and the chunk classifier so the
426+
/// message can't drift silently between call sites.
427+
const TOKEN_MISMATCH_MSG: &str = "Cryptify Token header does not match";
428+
423429
fn check_cryptify_token(header: &str, expected: &str) -> Result<(), Error> {
424430
if header != expected {
425-
return Err(Error::BadRequest(Some(
426-
"Cryptify Token header does not match".to_owned(),
427-
)));
431+
return Err(Error::BadRequest(Some(TOKEN_MISMATCH_MSG.to_owned())));
428432
}
429433
Ok(())
430434
}
@@ -456,7 +460,7 @@ async fn upload_chunk(
456460
.end
457461
.ok_or_else(|| Error::BadRequest(Some("Could not read Content-Range start".to_owned())))?;
458462

459-
if start >= end || state.uploaded != start {
463+
if start >= end {
460464
return Err(Error::BadRequest(Some(
461465
"Incorrect Content-Range header".to_owned(),
462466
)));
@@ -469,6 +473,50 @@ async fn upload_chunk(
469473
))));
470474
}
471475

476+
// Cheap pre-check before reading the body, so a leaked UUID can't be
477+
// used to force the server to buffer up to `chunk_size` bytes per
478+
// request just to be rejected. Mirrors the structural part of
479+
// `classify_chunk_request` — we only commit to reading the body when
480+
// the request looks like either a normal next chunk or a candidate
481+
// replay of the last committed chunk.
482+
let is_normal_next = state.uploaded == start && headers.cryptify_token == state.cryptify_token;
483+
let is_replay_candidate = state.last_chunk.as_ref().is_some_and(|last| {
484+
last.prev_uploaded == start && headers.cryptify_token == last.prev_token
485+
});
486+
if !is_normal_next && !is_replay_candidate {
487+
if state.uploaded != start {
488+
return Err(Error::BadRequest(Some(
489+
"Incorrect Content-Range header".to_owned(),
490+
)));
491+
}
492+
return Err(Error::BadRequest(Some(TOKEN_MISMATCH_MSG.to_owned())));
493+
}
494+
495+
let body = data
496+
.open((end - start).bytes())
497+
.into_bytes()
498+
.await
499+
.map_err(|_| Error::BadRequest(Some("Could not read data from request".to_owned())))?;
500+
if !body.is_complete() || body.len() as u64 != end - start {
501+
return Err(Error::BadRequest(Some("Data not complete".to_owned())));
502+
}
503+
let body = body.into_inner();
504+
505+
// Three branches: normal next chunk, idempotent retry of the last
506+
// committed chunk, or rejection.
507+
match classify_chunk_request(&state, &headers.cryptify_token, start, &body) {
508+
ChunkClassification::NormalNext => {}
509+
ChunkClassification::ReplayLastChunk(token) => {
510+
drop(state);
511+
store.touch(uuid);
512+
return Ok(UploadResponder {
513+
body: (),
514+
cryptify_token: CryptifyToken(token),
515+
});
516+
}
517+
ChunkClassification::Reject(err) => return Err(err),
518+
}
519+
472520
let per_upload_limit = if state.api_key_tenant.is_some() {
473521
API_KEY_PER_UPLOAD_LIMIT
474522
} else {
@@ -497,8 +545,6 @@ async fn upload_chunk(
497545
}));
498546
}
499547

500-
check_cryptify_token(&headers.cryptify_token, &state.cryptify_token)?;
501-
502548
let mut file = match OpenOptions::new()
503549
.write(true)
504550
.open(Path::new(config.data_dir()).join(uuid))
@@ -512,24 +558,19 @@ async fn upload_chunk(
512558
.await
513559
.map_err(|_| Error::InternalServerError(Some("Could not write file".to_owned())))?;
514560

515-
let data = data
516-
.open((end - start).bytes())
517-
.into_bytes()
518-
.await
519-
.map_err(|_| Error::BadRequest(Some("Could not read data from request".to_owned())))?;
520-
if !data.is_complete() || data.len() as u64 != end - start {
521-
return Err(Error::BadRequest(Some("Data not complete".to_owned())));
522-
}
523-
524-
let data = data.into_inner();
525-
file.write_all(&data)
561+
file.write_all(&body)
526562
.await
527563
.map_err(|_| Error::InternalServerError(Some("Could not write file".to_owned())))?;
528564

529-
let shasum = compute_hash(&headers.cryptify_token.into_bytes(), &data);
565+
let prev_token = headers.cryptify_token;
566+
let shasum = compute_hash(prev_token.as_bytes(), &body);
530567
state.cryptify_token = shasum.clone();
531-
532568
state.uploaded += end - start;
569+
state.last_chunk = Some(LastChunkRecord {
570+
prev_token,
571+
prev_uploaded: start,
572+
response_token: shasum.clone(),
573+
});
533574

534575
drop(state);
535576
store.touch(uuid);
@@ -540,6 +581,58 @@ async fn upload_chunk(
540581
})
541582
}
542583

584+
/// Outcome of inspecting a chunk PUT against the current `FileState`.
585+
enum ChunkClassification {
586+
/// The expected next chunk in the rolling-token chain — caller proceeds
587+
/// to the normal write path.
588+
NormalNext,
589+
/// The just-completed chunk being retried after a lost response. Caller
590+
/// returns this token to the client without re-writing or double-counting.
591+
ReplayLastChunk(String),
592+
/// Reject the request with this error — the standard 400 you'd get
593+
/// before idempotent-retry support, plus a stricter 400 when the
594+
/// request looks like a retry but the body bytes (or their length)
595+
/// diverge from the cached chunk. Never accept different bytes for
596+
/// the same offset.
597+
Reject(Error),
598+
}
599+
600+
fn classify_chunk_request(
601+
state: &FileState,
602+
request_token: &str,
603+
start: u64,
604+
body: &[u8],
605+
) -> ChunkClassification {
606+
if state.uploaded == start && request_token == state.cryptify_token {
607+
return ChunkClassification::NormalNext;
608+
}
609+
610+
if let Some(last) = state.last_chunk.as_ref() {
611+
if request_token == last.prev_token && start == last.prev_uploaded {
612+
// Recompute the rolling hash over the incoming body. Identity
613+
// is implicit in the rolling-token construction itself: if the
614+
// hash matches `response_token`, the body is byte-identical to
615+
// the original chunk (modulo a SHA-256 collision, which would
616+
// also break the rolling chain). Length divergence surfaces
617+
// here too.
618+
if compute_hash(last.prev_token.as_bytes(), body) == last.response_token {
619+
return ChunkClassification::ReplayLastChunk(last.response_token.clone());
620+
}
621+
return ChunkClassification::Reject(Error::BadRequest(Some(
622+
"Idempotent retry: body differs from the original chunk".to_owned(),
623+
)));
624+
}
625+
}
626+
627+
if state.uploaded != start {
628+
return ChunkClassification::Reject(Error::BadRequest(Some(
629+
"Incorrect Content-Range header".to_owned(),
630+
)));
631+
}
632+
633+
ChunkClassification::Reject(Error::BadRequest(Some(TOKEN_MISMATCH_MSG.to_owned())))
634+
}
635+
543636
struct FinalizeHeaders {
544637
cryptify_token: String,
545638
content_range: ContentRange,
@@ -1040,6 +1133,143 @@ mod tests {
10401133
let _ = std::fs::remove_dir_all(&data_dir);
10411134
}
10421135

1136+
fn empty_filestate(uploaded: u64, current_token: &str) -> FileState {
1137+
FileState {
1138+
uploaded,
1139+
cryptify_token: current_token.to_owned(),
1140+
expires: 0,
1141+
recipients: lettre::message::Mailboxes::new(),
1142+
mail_content: String::new(),
1143+
mail_lang: email::Language::En,
1144+
sender: None,
1145+
sender_attributes: Vec::new(),
1146+
confirm: false,
1147+
notify_recipients: true,
1148+
api_key_tenant: None,
1149+
api_key_validation_failed: false,
1150+
last_chunk: None,
1151+
}
1152+
}
1153+
1154+
fn filestate_with_last_chunk(
1155+
uploaded: u64,
1156+
current_token: &str,
1157+
last: LastChunkRecord,
1158+
) -> FileState {
1159+
let mut s = empty_filestate(uploaded, current_token);
1160+
s.last_chunk = Some(last);
1161+
s
1162+
}
1163+
1164+
/// Build a `LastChunkRecord` whose `response_token` correctly encodes
1165+
/// `prev_token + body`, the same construction the production handler
1166+
/// uses. Tests use this so the replay path's hash check passes on a
1167+
/// genuine retry and fails when the body is tampered with.
1168+
fn last_chunk_for(prev_token: &str, prev_uploaded: u64, body: &[u8]) -> LastChunkRecord {
1169+
LastChunkRecord {
1170+
prev_token: prev_token.to_owned(),
1171+
prev_uploaded,
1172+
response_token: compute_hash(prev_token.as_bytes(), body),
1173+
}
1174+
}
1175+
1176+
#[test]
1177+
fn classify_normal_next_chunk() {
1178+
let state = empty_filestate(100, "tok-current");
1179+
match classify_chunk_request(&state, "tok-current", 100, b"chunk") {
1180+
ChunkClassification::NormalNext => {}
1181+
_ => panic!("expected NormalNext"),
1182+
}
1183+
}
1184+
1185+
#[test]
1186+
fn classify_replays_last_chunk_on_matching_retry() {
1187+
let body = b"hello world";
1188+
let last = last_chunk_for("tok-prev", 100, body);
1189+
let response_token = last.response_token.clone();
1190+
let state = filestate_with_last_chunk(100 + body.len() as u64, &response_token, last);
1191+
match classify_chunk_request(&state, "tok-prev", 100, body) {
1192+
ChunkClassification::ReplayLastChunk(t) => assert_eq!(t, response_token),
1193+
_ => panic!("expected ReplayLastChunk"),
1194+
}
1195+
}
1196+
1197+
#[test]
1198+
fn classify_rejects_retry_with_different_body() {
1199+
let body = b"original";
1200+
let last = last_chunk_for("tok-prev", 100, body);
1201+
let response_token = last.response_token.clone();
1202+
let state = filestate_with_last_chunk(100 + body.len() as u64, &response_token, last);
1203+
let tampered = b"tampered";
1204+
let result = classify_chunk_request(&state, "tok-prev", 100, tampered);
1205+
match result {
1206+
ChunkClassification::Reject(Error::BadRequest(Some(msg))) => {
1207+
assert!(msg.contains("body differs"), "got: {}", msg);
1208+
}
1209+
_ => panic!("expected BadRequest about body differs"),
1210+
}
1211+
}
1212+
1213+
#[test]
1214+
fn classify_rejects_retry_with_different_length() {
1215+
// Same prev_token + start, but a shorter body. The recomputed
1216+
// rolling hash won't match, so the body-differs path catches this
1217+
// case too — we no longer need a length-specific record.
1218+
let body = b"original";
1219+
let last = last_chunk_for("tok-prev", 100, body);
1220+
let response_token = last.response_token.clone();
1221+
let state = filestate_with_last_chunk(100 + body.len() as u64, &response_token, last);
1222+
let result = classify_chunk_request(&state, "tok-prev", 100, b"short");
1223+
match result {
1224+
ChunkClassification::Reject(Error::BadRequest(Some(msg))) => {
1225+
assert!(msg.contains("body differs"), "got: {}", msg);
1226+
}
1227+
_ => panic!("expected BadRequest about body differs"),
1228+
}
1229+
}
1230+
1231+
#[test]
1232+
fn classify_rejects_offset_mismatch_with_no_replay() {
1233+
// No last_chunk recorded → offset mismatch is just the regular 400.
1234+
let state = empty_filestate(100, "tok-current");
1235+
let result = classify_chunk_request(&state, "tok-current", 50, b"abc");
1236+
match result {
1237+
ChunkClassification::Reject(Error::BadRequest(Some(msg))) => {
1238+
assert_eq!(msg, "Incorrect Content-Range header");
1239+
}
1240+
_ => panic!("expected BadRequest about Content-Range"),
1241+
}
1242+
}
1243+
1244+
#[test]
1245+
fn classify_rejects_token_mismatch_at_correct_offset() {
1246+
let state = empty_filestate(100, "tok-current");
1247+
let result = classify_chunk_request(&state, "tok-wrong", 100, b"chunk");
1248+
match result {
1249+
ChunkClassification::Reject(Error::BadRequest(Some(msg))) => {
1250+
assert_eq!(msg, TOKEN_MISMATCH_MSG);
1251+
}
1252+
_ => panic!("expected BadRequest about token mismatch"),
1253+
}
1254+
}
1255+
1256+
#[test]
1257+
fn classify_does_not_replay_when_prev_token_does_not_match() {
1258+
// Last chunk exists but the retry presents a *different* prev_token.
1259+
// Falls through to the regular offset-mismatch rejection.
1260+
let body = b"original";
1261+
let last = last_chunk_for("tok-prev", 100, body);
1262+
let response_token = last.response_token.clone();
1263+
let state = filestate_with_last_chunk(100 + body.len() as u64, &response_token, last);
1264+
let result = classify_chunk_request(&state, "tok-something-else", 100, body);
1265+
match result {
1266+
ChunkClassification::Reject(Error::BadRequest(Some(msg))) => {
1267+
assert_eq!(msg, "Incorrect Content-Range header");
1268+
}
1269+
_ => panic!("expected BadRequest about Content-Range"),
1270+
}
1271+
}
1272+
10431273
#[test]
10441274
fn extract_pg_bearer_accepts_pg_prefixed_token() {
10451275
assert_eq!(

0 commit comments

Comments
 (0)