Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion api-description.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,28 @@ paths:
tags:
- "File upload"
summary: "Upload a file part"
description:
"Append a single chunk to the upload identified by `uuid`.\n\n
**Idempotent retry contract.** A chunk PUT whose response was lost
in flight can be safely retried with the *previous* `cryptifytoken`
value (the one the client sent on the failed attempt) at the same
`Content-Range` offset and with the same body. The server detects
this case — the request carries the cached previous token at the
cached offset, and the rolling hash recomputed over the body
matches the recorded response token of the most recently
committed chunk — and replays the previously returned
`cryptifytoken` without re-writing the file or double-counting
against quotas. If the request looks like a retry but the body
bytes differ (in content or length), the server responds 400;
clients must not retry the same offset with different bytes.\n\n
Retries are only honoured for the *most recently committed*
chunk. If a client falls behind by more than one chunk it must
start a new upload."
operationId: "uploadFilePart"
parameters:
- in: "header"
name: "cryptifytoken"
description:
"Identifies the version of the upload file parts. Part of the header from the last fileupload response."
"Identifies the version of the upload file parts. Part of the header from the last fileupload response. On a retry of a chunk whose response was lost, send the *previous* token (the one originally sent on the failed PUT)."
schema:
type: "string"
required: true
Expand Down
270 changes: 250 additions & 20 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use rocket_cors::{AllowedOrigins, CorsOptions};

use serde::{Deserialize, Serialize};
use std::time::Duration;
use store::{FileState, Store};
use store::{FileState, LastChunkRecord, Store};

#[derive(Serialize, Deserialize)]
struct InitBody {
Expand Down Expand Up @@ -285,6 +285,7 @@ async fn upload_init(
notify_recipients: request.notify_recipients,
api_key_tenant: api_key.tenant,
api_key_validation_failed: api_key.validation_failed,
last_chunk: None,
},
);

Expand Down Expand Up @@ -420,11 +421,14 @@ fn compute_hash(cryptify_token: &[u8], data: &[u8]) -> String {
bytes_to_hex(&hash.finalize())
}

/// Wire-level error message for a `CryptifyToken` mismatch. Reused by both
/// `check_cryptify_token` (the finalize path) and the chunk classifier so the
/// message can't drift silently between call sites.
const TOKEN_MISMATCH_MSG: &str = "Cryptify Token header does not match";

fn check_cryptify_token(header: &str, expected: &str) -> Result<(), Error> {
if header != expected {
return Err(Error::BadRequest(Some(
"Cryptify Token header does not match".to_owned(),
)));
return Err(Error::BadRequest(Some(TOKEN_MISMATCH_MSG.to_owned())));
}
Ok(())
}
Expand Down Expand Up @@ -456,7 +460,7 @@ async fn upload_chunk(
.end
.ok_or_else(|| Error::BadRequest(Some("Could not read Content-Range start".to_owned())))?;

if start >= end || state.uploaded != start {
if start >= end {
return Err(Error::BadRequest(Some(
"Incorrect Content-Range header".to_owned(),
)));
Expand All @@ -469,6 +473,50 @@ async fn upload_chunk(
))));
}

// Cheap pre-check before reading the body, so a leaked UUID can't be
// used to force the server to buffer up to `chunk_size` bytes per
// request just to be rejected. Mirrors the structural part of
// `classify_chunk_request` — we only commit to reading the body when
// the request looks like either a normal next chunk or a candidate
// replay of the last committed chunk.
let is_normal_next = state.uploaded == start && headers.cryptify_token == state.cryptify_token;
let is_replay_candidate = state.last_chunk.as_ref().is_some_and(|last| {
last.prev_uploaded == start && headers.cryptify_token == last.prev_token
});
if !is_normal_next && !is_replay_candidate {
if state.uploaded != start {
return Err(Error::BadRequest(Some(
"Incorrect Content-Range header".to_owned(),
)));
}
return Err(Error::BadRequest(Some(TOKEN_MISMATCH_MSG.to_owned())));
}

let body = data
.open((end - start).bytes())
.into_bytes()
.await
.map_err(|_| Error::BadRequest(Some("Could not read data from request".to_owned())))?;
if !body.is_complete() || body.len() as u64 != end - start {
return Err(Error::BadRequest(Some("Data not complete".to_owned())));
}
let body = body.into_inner();

// Three branches: normal next chunk, idempotent retry of the last
// committed chunk, or rejection.
match classify_chunk_request(&state, &headers.cryptify_token, start, &body) {
ChunkClassification::NormalNext => {}
ChunkClassification::ReplayLastChunk(token) => {
drop(state);
store.touch(uuid);
return Ok(UploadResponder {
body: (),
cryptify_token: CryptifyToken(token),
});
}
ChunkClassification::Reject(err) => return Err(err),
}

let per_upload_limit = if state.api_key_tenant.is_some() {
API_KEY_PER_UPLOAD_LIMIT
} else {
Expand Down Expand Up @@ -497,8 +545,6 @@ async fn upload_chunk(
}));
}

check_cryptify_token(&headers.cryptify_token, &state.cryptify_token)?;

let mut file = match OpenOptions::new()
.write(true)
.open(Path::new(config.data_dir()).join(uuid))
Expand All @@ -512,24 +558,19 @@ async fn upload_chunk(
.await
.map_err(|_| Error::InternalServerError(Some("Could not write file".to_owned())))?;

let data = data
.open((end - start).bytes())
.into_bytes()
.await
.map_err(|_| Error::BadRequest(Some("Could not read data from request".to_owned())))?;
if !data.is_complete() || data.len() as u64 != end - start {
return Err(Error::BadRequest(Some("Data not complete".to_owned())));
}

let data = data.into_inner();
file.write_all(&data)
file.write_all(&body)
.await
.map_err(|_| Error::InternalServerError(Some("Could not write file".to_owned())))?;

let shasum = compute_hash(&headers.cryptify_token.into_bytes(), &data);
let prev_token = headers.cryptify_token;
let shasum = compute_hash(prev_token.as_bytes(), &body);
state.cryptify_token = shasum.clone();

state.uploaded += end - start;
state.last_chunk = Some(LastChunkRecord {
prev_token,
prev_uploaded: start,
response_token: shasum.clone(),
});

drop(state);
store.touch(uuid);
Expand All @@ -540,6 +581,58 @@ async fn upload_chunk(
})
}

/// Outcome of inspecting a chunk PUT against the current `FileState`.
///
/// Produced by `classify_chunk_request`; the upload handler maps each
/// variant to a concrete response (proceed to the write path, replay the
/// cached token, or return the contained error).
enum ChunkClassification {
    /// The expected next chunk in the rolling-token chain — caller proceeds
    /// to the normal write path.
    NormalNext,
    /// The just-completed chunk being retried after a lost response. Caller
    /// returns this token to the client without re-writing or double-counting.
    /// The payload is the cached `response_token` originally issued for that
    /// chunk.
    ReplayLastChunk(String),
    /// Reject the request with this error — the standard 400 you'd get
    /// before idempotent-retry support, plus a stricter 400 when the
    /// request looks like a retry but the body bytes (or their length)
    /// diverge from the cached chunk. Never accept different bytes for
    /// the same offset.
    Reject(Error),
}

fn classify_chunk_request(
state: &FileState,
request_token: &str,
start: u64,
body: &[u8],
) -> ChunkClassification {
if state.uploaded == start && request_token == state.cryptify_token {
return ChunkClassification::NormalNext;
}

if let Some(last) = state.last_chunk.as_ref() {
if request_token == last.prev_token && start == last.prev_uploaded {
// Recompute the rolling hash over the incoming body. Identity
// is implicit in the rolling-token construction itself: if the
// hash matches `response_token`, the body is byte-identical to
// the original chunk (modulo a SHA-256 collision, which would
// also break the rolling chain). Length divergence surfaces
// here too.
if compute_hash(last.prev_token.as_bytes(), body) == last.response_token {
return ChunkClassification::ReplayLastChunk(last.response_token.clone());
}
return ChunkClassification::Reject(Error::BadRequest(Some(
"Idempotent retry: body differs from the original chunk".to_owned(),
)));
}
}

if state.uploaded != start {
return ChunkClassification::Reject(Error::BadRequest(Some(
"Incorrect Content-Range header".to_owned(),
)));
}

ChunkClassification::Reject(Error::BadRequest(Some(TOKEN_MISMATCH_MSG.to_owned())))
}

struct FinalizeHeaders {
cryptify_token: String,
content_range: ContentRange,
Expand Down Expand Up @@ -1040,6 +1133,143 @@ mod tests {
let _ = std::fs::remove_dir_all(&data_dir);
}

/// Minimal `FileState` for classifier tests: the given `uploaded` offset and
/// current rolling token, `last_chunk: None`, and inert defaults for every
/// field the classifier never reads.
fn empty_filestate(uploaded: u64, current_token: &str) -> FileState {
    FileState {
        uploaded,
        cryptify_token: current_token.to_owned(),
        expires: 0,
        recipients: lettre::message::Mailboxes::new(),
        mail_content: String::new(),
        mail_lang: email::Language::En,
        sender: None,
        sender_attributes: Vec::new(),
        confirm: false,
        notify_recipients: true,
        api_key_tenant: None,
        api_key_validation_failed: false,
        last_chunk: None,
    }
}

fn filestate_with_last_chunk(
uploaded: u64,
current_token: &str,
last: LastChunkRecord,
) -> FileState {
let mut s = empty_filestate(uploaded, current_token);
s.last_chunk = Some(last);
s
}

/// Build a `LastChunkRecord` whose `response_token` is derived exactly the
/// way the production handler derives it: the rolling hash of
/// `prev_token + body`. Tests that use this constructor get a replay-path
/// hash check that passes on a genuine retry and fails on a tampered body.
fn last_chunk_for(prev_token: &str, prev_uploaded: u64, body: &[u8]) -> LastChunkRecord {
    let response_token = compute_hash(prev_token.as_bytes(), body);
    LastChunkRecord {
        prev_token: prev_token.to_owned(),
        prev_uploaded,
        response_token,
    }
}

#[test]
fn classify_normal_next_chunk() {
    // Offset and token both line up with the live state → normal write path.
    let state = empty_filestate(100, "tok-current");
    let outcome = classify_chunk_request(&state, "tok-current", 100, b"chunk");
    assert!(
        matches!(outcome, ChunkClassification::NormalNext),
        "expected NormalNext"
    );
}

#[test]
fn classify_replays_last_chunk_on_matching_retry() {
    // A byte-identical retry of the last committed chunk is answered with
    // the cached response token.
    let body = b"hello world";
    let last = last_chunk_for("tok-prev", 100, body);
    let response_token = last.response_token.clone();
    let state = filestate_with_last_chunk(100 + body.len() as u64, &response_token, last);
    let ChunkClassification::ReplayLastChunk(t) =
        classify_chunk_request(&state, "tok-prev", 100, body)
    else {
        panic!("expected ReplayLastChunk");
    };
    assert_eq!(t, response_token);
}

#[test]
fn classify_rejects_retry_with_different_body() {
    // Same prev_token + offset but tampered bytes → hard 400, never accepted.
    let body = b"original";
    let last = last_chunk_for("tok-prev", 100, body);
    let response_token = last.response_token.clone();
    let state = filestate_with_last_chunk(100 + body.len() as u64, &response_token, last);
    let ChunkClassification::Reject(Error::BadRequest(Some(msg))) =
        classify_chunk_request(&state, "tok-prev", 100, b"tampered")
    else {
        panic!("expected BadRequest about body differs");
    };
    assert!(msg.contains("body differs"), "got: {}", msg);
}

#[test]
fn classify_rejects_retry_with_different_length() {
    // Same prev_token + start, shorter body: the recomputed rolling hash
    // cannot match, so the generic body-differs rejection also covers length
    // divergence — no length-specific record is needed.
    let body = b"original";
    let last = last_chunk_for("tok-prev", 100, body);
    let response_token = last.response_token.clone();
    let state = filestate_with_last_chunk(100 + body.len() as u64, &response_token, last);
    let ChunkClassification::Reject(Error::BadRequest(Some(msg))) =
        classify_chunk_request(&state, "tok-prev", 100, b"short")
    else {
        panic!("expected BadRequest about body differs");
    };
    assert!(msg.contains("body differs"), "got: {}", msg);
}

#[test]
fn classify_rejects_offset_mismatch_with_no_replay() {
    // With no last_chunk recorded, a wrong offset is the ordinary 400.
    let state = empty_filestate(100, "tok-current");
    let ChunkClassification::Reject(Error::BadRequest(Some(msg))) =
        classify_chunk_request(&state, "tok-current", 50, b"abc")
    else {
        panic!("expected BadRequest about Content-Range");
    };
    assert_eq!(msg, "Incorrect Content-Range header");
}

#[test]
fn classify_rejects_token_mismatch_at_correct_offset() {
    // Correct offset, wrong token → the shared token-mismatch 400.
    let state = empty_filestate(100, "tok-current");
    let ChunkClassification::Reject(Error::BadRequest(Some(msg))) =
        classify_chunk_request(&state, "tok-wrong", 100, b"chunk")
    else {
        panic!("expected BadRequest about token mismatch");
    };
    assert_eq!(msg, TOKEN_MISMATCH_MSG);
}

#[test]
fn classify_does_not_replay_when_prev_token_does_not_match() {
    // A last chunk is recorded, but the caller presents some other token:
    // the replay branch must not fire, leaving the plain offset-mismatch 400.
    let body = b"original";
    let last = last_chunk_for("tok-prev", 100, body);
    let response_token = last.response_token.clone();
    let state = filestate_with_last_chunk(100 + body.len() as u64, &response_token, last);
    let ChunkClassification::Reject(Error::BadRequest(Some(msg))) =
        classify_chunk_request(&state, "tok-something-else", 100, body)
    else {
        panic!("expected BadRequest about Content-Range");
    };
    assert_eq!(msg, "Incorrect Content-Range header");
}

#[test]
fn extract_pg_bearer_accepts_pg_prefixed_token() {
assert_eq!(
Expand Down
Loading