Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion api-description.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,28 @@ paths:
tags:
- "File upload"
summary: "Upload a file part"
description:
"Append a single chunk to the upload identified by `uuid`.\n\n
**Idempotent retry contract.** A chunk PUT whose response was lost
in flight can be safely retried with the *previous* `cryptifytoken`
value (the one the client sent on the failed attempt) at the same
`Content-Range` offset and with the same body. The server detects
this case — the request matches the cached `(prev_token, offset,
length, sha256)` of the most recently committed chunk — and
replays the previously returned `cryptifytoken` without
re-writing the file or double-counting against quotas. If the
request looks like a retry but the body bytes differ, the server
responds 400; clients must not retry the same offset with
different bytes.\n\n
Retries are only honoured for the *most recently committed*
chunk. If a client falls behind by more than one chunk it must
start a new upload."
operationId: "uploadFilePart"
parameters:
- in: "header"
name: "cryptifytoken"
description:
"Identifies the version of the upload file parts. Part of the header from the last fileupload response."
"Identifies the version of the upload file parts; taken from the header of the last file-upload response. On a retry of a chunk whose response was lost, send the *previous* token (the one originally sent on the failed PUT)."
schema:
type: "string"
required: true
Expand Down
266 changes: 249 additions & 17 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use rocket_cors::{AllowedOrigins, CorsOptions};

use serde::{Deserialize, Serialize};
use std::time::Duration;
use store::{FileState, Store};
use store::{FileState, LastChunkRecord, Store};

#[derive(Serialize, Deserialize)]
struct InitBody {
Expand Down Expand Up @@ -285,6 +285,7 @@ async fn upload_init(
notify_recipients: request.notify_recipients,
api_key_tenant: api_key.tenant,
api_key_validation_failed: api_key.validation_failed,
last_chunk: None,
},
);

Expand Down Expand Up @@ -456,7 +457,7 @@ async fn upload_chunk(
.end
.ok_or_else(|| Error::BadRequest(Some("Could not read Content-Range start".to_owned())))?;

if start >= end || state.uploaded != start {
if start >= end {
return Err(Error::BadRequest(Some(
"Incorrect Content-Range header".to_owned(),
)));
Expand All @@ -469,6 +470,31 @@ async fn upload_chunk(
))));
}

let body = data
.open((end - start).bytes())
.into_bytes()
.await
.map_err(|_| Error::BadRequest(Some("Could not read data from request".to_owned())))?;
if !body.is_complete() || body.len() as u64 != end - start {
return Err(Error::BadRequest(Some("Data not complete".to_owned())));
}
let body = body.into_inner();

// Three branches: normal next chunk, idempotent retry of the last
// committed chunk, or rejection.
match classify_chunk_request(&state, &headers.cryptify_token, start, end, &body) {
ChunkClassification::NormalNext => {}
ChunkClassification::ReplayLastChunk(token) => {
drop(state);
store.touch(uuid);
return Ok(UploadResponder {
body: (),
cryptify_token: CryptifyToken(token),
});
}
ChunkClassification::Reject(err) => return Err(err),
}

let per_upload_limit = if state.api_key_tenant.is_some() {
API_KEY_PER_UPLOAD_LIMIT
} else {
Expand Down Expand Up @@ -497,8 +523,6 @@ async fn upload_chunk(
}));
}

check_cryptify_token(&headers.cryptify_token, &state.cryptify_token)?;

let mut file = match OpenOptions::new()
.write(true)
.open(Path::new(config.data_dir()).join(uuid))
Expand All @@ -512,24 +536,22 @@ async fn upload_chunk(
.await
.map_err(|_| Error::InternalServerError(Some("Could not write file".to_owned())))?;

let data = data
.open((end - start).bytes())
.into_bytes()
.await
.map_err(|_| Error::BadRequest(Some("Could not read data from request".to_owned())))?;
if !data.is_complete() || data.len() as u64 != end - start {
return Err(Error::BadRequest(Some("Data not complete".to_owned())));
}

let data = data.into_inner();
file.write_all(&data)
file.write_all(&body)
.await
.map_err(|_| Error::InternalServerError(Some("Could not write file".to_owned())))?;

let shasum = compute_hash(&headers.cryptify_token.into_bytes(), &data);
let prev_token = headers.cryptify_token.clone();
let chunk_sha256 = sha256_of(&body);
let shasum = compute_hash(prev_token.as_bytes(), &body);
state.cryptify_token = shasum.clone();

state.uploaded += end - start;
state.last_chunk = Some(LastChunkRecord {
prev_token,
prev_uploaded: start,
chunk_len: end - start,
chunk_sha256,
response_token: shasum.clone(),
});

drop(state);
store.touch(uuid);
Expand All @@ -540,6 +562,67 @@ async fn upload_chunk(
})
}

/// Outcome of inspecting a chunk PUT against the current `FileState`.
///
/// Produced by `classify_chunk_request`; `upload_chunk` dispatches on the
/// variant to pick the write, replay, or error path.
enum ChunkClassification {
    /// The expected next chunk in the rolling-token chain — caller proceeds
    /// to the normal write path.
    NormalNext,
    /// The just-completed chunk being retried after a lost response. Caller
    /// returns this token (the one originally issued for that chunk) to the
    /// client without re-writing or double-counting.
    ReplayLastChunk(String),
    /// Reject the request with this error — the standard 400 you'd get
    /// before idempotent-retry support, plus stricter 400s when the request
    /// looks like a retry but the body or length diverges (almost certainly
    /// a client bug, never accept different bytes for the same offset).
    Reject(Error),
}

fn classify_chunk_request(
state: &FileState,
request_token: &str,
start: u64,
end: u64,
body: &[u8],
) -> ChunkClassification {
if state.uploaded == start && request_token == state.cryptify_token {
return ChunkClassification::NormalNext;
}

if let Some(last) = state.last_chunk.as_ref() {
if request_token == last.prev_token && start == last.prev_uploaded {
if end - start != last.chunk_len {
return ChunkClassification::Reject(Error::BadRequest(Some(
"Idempotent retry: chunk length differs from the original".to_owned(),
)));
}
if sha256_of(body) != last.chunk_sha256 {
return ChunkClassification::Reject(Error::BadRequest(Some(
"Idempotent retry: body hash differs from the original chunk".to_owned(),
)));
}
return ChunkClassification::ReplayLastChunk(last.response_token.clone());
}
}

if state.uploaded != start {
return ChunkClassification::Reject(Error::BadRequest(Some(
"Incorrect Content-Range header".to_owned(),
)));
}

// Right offset but wrong token: the existing `check_cryptify_token`
// wording, so error messages don't change for non-retry callers.
ChunkClassification::Reject(Error::BadRequest(Some(
"Cryptify Token header does not match".to_owned(),
)))
}

/// SHA-256 digest of `data` as a plain 32-byte array.
///
/// One-shot form of the hash; used to fingerprint chunk bodies so a retry
/// can be compared against the cached record without keeping the bytes.
fn sha256_of(data: &[u8]) -> [u8; 32] {
    sha2::Sha256::digest(data).into()
}

struct FinalizeHeaders {
cryptify_token: String,
content_range: ContentRange,
Expand Down Expand Up @@ -1040,6 +1123,155 @@ mod tests {
let _ = std::fs::remove_dir_all(&data_dir);
}

/// Minimal `FileState` for classification tests: `uploaded` bytes already
/// committed, `current_token` as the rolling token, every other field at a
/// neutral value, and no last-chunk record.
fn empty_filestate(uploaded: u64, current_token: &str) -> FileState {
    FileState {
        cryptify_token: current_token.to_owned(),
        uploaded,
        last_chunk: None,
        expires: 0,
        recipients: lettre::message::Mailboxes::new(),
        mail_content: String::new(),
        mail_lang: email::Language::En,
        sender: None,
        sender_attributes: Vec::new(),
        confirm: false,
        notify_recipients: true,
        api_key_tenant: None,
        api_key_validation_failed: false,
    }
}

fn filestate_with_last_chunk(
uploaded: u64,
current_token: &str,
last: LastChunkRecord,
) -> FileState {
let mut s = empty_filestate(uploaded, current_token);
s.last_chunk = Some(last);
s
}

#[test]
fn classify_normal_next_chunk() {
    // Offset and token both line up with the current state → write path.
    let state = empty_filestate(100, "tok-current");
    let outcome = classify_chunk_request(&state, "tok-current", 100, 200, b"chunk");
    assert!(
        matches!(outcome, ChunkClassification::NormalNext),
        "expected NormalNext"
    );
}

#[test]
fn classify_replays_last_chunk_on_matching_retry() {
    // A byte-identical retry of the last committed chunk must replay the
    // previously issued token.
    let body: &[u8] = b"hello world";
    let end = 100 + body.len() as u64;
    let state = filestate_with_last_chunk(
        end,
        "tok-after",
        LastChunkRecord {
            prev_token: "tok-prev".into(),
            prev_uploaded: 100,
            chunk_len: body.len() as u64,
            chunk_sha256: sha256_of(body),
            response_token: "tok-after".into(),
        },
    );
    if let ChunkClassification::ReplayLastChunk(token) =
        classify_chunk_request(&state, "tok-prev", 100, end, body)
    {
        assert_eq!(token, "tok-after");
    } else {
        panic!("expected ReplayLastChunk");
    }
}

#[test]
fn classify_rejects_retry_with_different_body() {
    let original: &[u8] = b"original";
    let state = filestate_with_last_chunk(
        100 + original.len() as u64,
        "tok-after",
        LastChunkRecord {
            prev_token: "tok-prev".into(),
            prev_uploaded: 100,
            chunk_len: original.len() as u64,
            chunk_sha256: sha256_of(original),
            response_token: "tok-after".into(),
        },
    );
    // Same prev_token, offset, and length — different bytes. Must be 400.
    let tampered: &[u8] = b"tampered";
    if let ChunkClassification::Reject(Error::BadRequest(Some(msg))) = classify_chunk_request(
        &state,
        "tok-prev",
        100,
        100 + tampered.len() as u64,
        tampered,
    ) {
        assert!(msg.contains("body hash"), "got: {}", msg);
    } else {
        panic!("expected BadRequest about body hash");
    }
}

#[test]
fn classify_rejects_retry_with_different_length() {
    let body: &[u8] = b"original";
    let state = filestate_with_last_chunk(
        100 + body.len() as u64,
        "tok-after",
        LastChunkRecord {
            prev_token: "tok-prev".into(),
            prev_uploaded: 100,
            chunk_len: body.len() as u64,
            chunk_sha256: sha256_of(body),
            response_token: "tok-after".into(),
        },
    );
    // Matching prev_token and start, but a shorter Content-Range than the
    // cached record — the length check fires before the body hash check.
    if let ChunkClassification::Reject(Error::BadRequest(Some(msg))) =
        classify_chunk_request(&state, "tok-prev", 100, 100 + 5, b"short")
    {
        assert!(msg.contains("chunk length"), "got: {}", msg);
    } else {
        panic!("expected BadRequest about chunk length");
    }
}

#[test]
fn classify_rejects_offset_mismatch_with_no_replay() {
    // Without a recorded last chunk, a wrong offset is the plain 400.
    let state = empty_filestate(100, "tok-current");
    if let ChunkClassification::Reject(Error::BadRequest(Some(msg))) =
        classify_chunk_request(&state, "tok-current", 50, 60, b"abc")
    {
        assert_eq!(msg, "Incorrect Content-Range header");
    } else {
        panic!("expected BadRequest about Content-Range");
    }
}

#[test]
fn classify_rejects_token_mismatch_at_correct_offset() {
    // Correct offset with a wrong token keeps the historical wording.
    let state = empty_filestate(100, "tok-current");
    if let ChunkClassification::Reject(Error::BadRequest(Some(msg))) =
        classify_chunk_request(&state, "tok-wrong", 100, 110, b"chunk")
    {
        assert_eq!(msg, "Cryptify Token header does not match");
    } else {
        panic!("expected BadRequest about token mismatch");
    }
}

#[test]
fn classify_does_not_replay_when_prev_token_does_not_match() {
    // A last chunk is recorded, but the request presents a token that is
    // neither the current nor the cached previous one — it falls through
    // to the regular offset-mismatch rejection.
    let body: &[u8] = b"original";
    let state = filestate_with_last_chunk(
        100 + body.len() as u64,
        "tok-after",
        LastChunkRecord {
            prev_token: "tok-prev".into(),
            prev_uploaded: 100,
            chunk_len: body.len() as u64,
            chunk_sha256: sha256_of(body),
            response_token: "tok-after".into(),
        },
    );
    if let ChunkClassification::Reject(Error::BadRequest(Some(msg))) =
        classify_chunk_request(&state, "tok-something-else", 100, 108, body)
    {
        assert_eq!(msg, "Incorrect Content-Range header");
    } else {
        panic!("expected BadRequest about Content-Range");
    }
}

#[test]
fn extract_pg_bearer_accepts_pg_prefixed_token() {
assert_eq!(
Expand Down
Loading