Skip to content

Commit f6d1487

Browse files
committed
feat(upload): abort clamd + S3 streams on client disconnect
Propagate request.signal end-to-end through createClamdStream, createMultipartUpload, handleStreamingUpload and runUploadPipeline. Combine with AbortSignal.timeout(UPLOAD_REQUEST_TIMEOUT_MS = 120s) via AbortSignal.any so a hung scan or S3 socket cannot tie up the handler indefinitely. On abort the socket is destroyed, the multipart upload is aborted and the reader is cancelled. Also normalises the file extension appended to the S3 key to alphanumeric characters only (defence-in-depth on top of path.extname).
1 parent 0008e82 commit f6d1487

6 files changed

Lines changed: 144 additions & 8 deletions

File tree

packages/app/src/modules/shared/uploadConfig.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,13 @@ export const ALLOWED_UPLOAD_MIME_TYPES = [
1717
/** ClamAV scan timeout in milliseconds (30s). */
1818
export const SCAN_TIMEOUT_MS = 30_000;
1919

20+
/**
21+
* Overall upload pipeline timeout (2min). Combined via AbortSignal.any with
22+
* the incoming request signal so a hung clamd or S3 socket aborts the whole
23+
* pipeline instead of tying up the request indefinitely.
24+
*/
25+
export const UPLOAD_REQUEST_TIMEOUT_MS = 120_000;
26+
2027
/** Minimum S3 multipart part size in bytes (5 MB, required by S3 except for the last part). */
2128
export const S3_PART_MIN_SIZE = 5 * 1024 * 1024;
2229

packages/app/src/server/services/__tests__/clamav.test.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,34 @@ describe("clamav service", () => {
155155
expect(socket.destroy).toHaveBeenCalled();
156156
});
157157

158+
it("destroys the clamd socket immediately when the abort signal fires", async () => {
159+
// Fresh socket for this test.
160+
net.createConnection({ host: "x", port: 0 });
161+
const socket = getSocket();
162+
socket.on.mockImplementation(() => socket);
163+
164+
const { createClamdStream } = await import("../clamav");
165+
const controller = new AbortController();
166+
createClamdStream("localhost", 3310, controller.signal);
167+
168+
socket.destroy.mockClear();
169+
controller.abort();
170+
expect(socket.destroy).toHaveBeenCalled();
171+
});
172+
173+
it("rejects sendChunk if the abort signal is already aborted at construction", async () => {
174+
net.createConnection({ host: "x", port: 0 });
175+
const socket = getSocket();
176+
socket.on.mockImplementation(() => socket);
177+
178+
const { createClamdStream } = await import("../clamav");
179+
const controller = new AbortController();
180+
controller.abort();
181+
const clamd = createClamdStream("localhost", 3310, controller.signal);
182+
183+
await expect(clamd.sendChunk(Buffer.from("x"))).rejects.toThrow(/aborted/i);
184+
});
185+
158186
it("surfaces a connection error thrown by the socket during sendChunk", async () => {
159187
// Trigger `createConnection` once to populate the mock's results
160188
// array, then grab the shared socket instance before `createClamdStream`

packages/app/src/server/services/clamav.ts

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,37 @@ export type ScanResult = { clean: true } | { clean: false; virus: string };
1616
* → [4 bytes big-endian = chunk size][chunk data] (repeated)
1717
* → [4 bytes zero] (end of stream)
1818
* ← "stream: OK\0" or "stream: <virus> FOUND\0"
19+
*
20+
* When an `AbortSignal` is provided, an abort (client disconnect, request
21+
* timeout) immediately destroys the socket. Any pending `sendChunk` or
22+
* `finish` call rejects with the abort reason so the pipeline surfaces a
23+
* clean failure instead of hanging on the clamd read.
1924
*/
20-
export function createClamdStream(host: string, port: number) {
25+
export function createClamdStream(
26+
host: string,
27+
port: number,
28+
signal?: AbortSignal,
29+
) {
2130
const socket = net.createConnection({ host, port });
2231

2332
let socketError: Error | null = null;
2433
socket.on("error", (err) => {
2534
socketError = err;
2635
});
2736

37+
const onAbort = () => {
38+
socketError =
39+
socketError ?? new Error("clamd connection aborted by request signal");
40+
socket.destroy();
41+
};
42+
if (signal) {
43+
if (signal.aborted) {
44+
onAbort();
45+
} else {
46+
signal.addEventListener("abort", onAbort, { once: true });
47+
}
48+
}
49+
2850
// Send the INSTREAM command (null-terminated)
2951
socket.write("zINSTREAM\0");
3052

@@ -97,8 +119,11 @@ export function createClamdStream(host: string, port: number) {
97119
/**
98120
* Convenience wrapper: scan a complete buffer using the INSTREAM protocol.
99121
*/
100-
export async function scanBuffer(buffer: Buffer): Promise<ScanResult> {
101-
const clamd = createClamdStream(env.CLAMAV_HOST, env.CLAMAV_PORT);
122+
export async function scanBuffer(
123+
buffer: Buffer,
124+
signal?: AbortSignal,
125+
): Promise<ScanResult> {
126+
const clamd = createClamdStream(env.CLAMAV_HOST, env.CLAMAV_PORT, signal);
102127

103128
try {
104129
await clamd.sendChunk(buffer);

packages/app/src/server/services/fileUpload.ts

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ type UploadOptions = {
2222
maxSize?: number;
2323
/** MIME types to allow, validated via magic bytes on the first chunk. */
2424
allowedMimeTypes: readonly string[];
25+
/**
26+
* Request / timeout signal. On abort, the clamd socket is destroyed and
27+
* the S3 multipart is aborted, so no orphan parts are left behind.
28+
*/
29+
signal?: AbortSignal;
2530
};
2631

2732
/**
@@ -36,7 +41,8 @@ export type UploadFailureReason =
3641
| "wrong_type"
3742
| "empty"
3843
| "virus"
39-
| "scan_unavailable";
44+
| "scan_unavailable"
45+
| "aborted";
4046

4147
export type UploadResult =
4248
| { ok: true; key: string; fileId: string }
@@ -60,12 +66,16 @@ export async function handleStreamingUpload(
6066
options: UploadOptions,
6167
): Promise<UploadResult> {
6268
const maxSize = options.maxSize ?? MAX_FILE_SIZE;
63-
const ext = path.extname(options.fileName) || ".bin";
69+
const signal = options.signal;
70+
// Normalise ext to alnum only — `path.extname` rules out `/` traversal
71+
// (basename suffix) but exotic bytes would survive and land in the S3 key.
72+
const rawExt = path.extname(options.fileName);
73+
const ext = /^\.[A-Za-z0-9]{1,10}$/.test(rawExt) ? rawExt : ".bin";
6474
const fileId = crypto.randomUUID();
6575
const key = `${options.siren}/${options.year}/${fileId}${ext}`;
6676

67-
const clamd = createClamdStream(env.CLAMAV_HOST, env.CLAMAV_PORT);
68-
const s3Upload = createMultipartUpload(key, options.contentType);
77+
const clamd = createClamdStream(env.CLAMAV_HOST, env.CLAMAV_PORT, signal);
78+
const s3Upload = createMultipartUpload(key, options.contentType, signal);
6979
await s3Upload.init();
7080

7181
let totalBytes = 0;
@@ -74,6 +84,7 @@ export async function handleStreamingUpload(
7484

7585
try {
7686
while (true) {
87+
if (signal?.aborted) throw new UploadAbortedError();
7788
const { done, value } = await reader.read();
7889
if (done) break;
7990

@@ -103,19 +114,29 @@ export async function handleStreamingUpload(
103114
s3Upload.sendChunk(buf),
104115
]);
105116
if (clamdResult.status === "rejected") {
117+
if (signal?.aborted) throw new UploadAbortedError();
106118
throw new ClamdScanError(clamdResult.reason);
107119
}
108120
if (s3Result.status === "rejected") {
121+
if (signal?.aborted) throw new UploadAbortedError();
109122
throw s3Result.reason;
110123
}
111124
}
112125
} catch (err) {
113126
clamd.destroy();
114127
await s3Upload.abort().catch(() => {});
128+
await reader.cancel().catch(() => {});
115129

116130
if (err instanceof FileTooLargeError) {
117131
return { ok: false, reason: "too_large", error: FILE_TOO_LARGE_ERROR };
118132
}
133+
if (err instanceof UploadAbortedError || signal?.aborted) {
134+
return {
135+
ok: false,
136+
reason: "aborted",
137+
error: "L'upload a été interrompu.",
138+
};
139+
}
119140
if (err instanceof ClamdScanError) {
120141
console.error("[fileUpload] clamd sendChunk failed", err.cause);
121142
return {
@@ -144,6 +165,13 @@ export async function handleStreamingUpload(
144165
console.error("[fileUpload] clamd scan failed", err);
145166
clamd.destroy();
146167
await s3Upload.abort().catch(() => {});
168+
if (signal?.aborted) {
169+
return {
170+
ok: false,
171+
reason: "aborted",
172+
error: "L'upload a été interrompu.",
173+
};
174+
}
147175
return {
148176
ok: false,
149177
reason: "scan_unavailable",
@@ -180,6 +208,17 @@ class FileTooLargeError extends Error {
180208
}
181209
}
182210

211+
/**
212+
* Sentinel thrown when the request-scoped AbortSignal fires (client
213+
* disconnect or request-level timeout). Lets the outer catch distinguish an
214+
* interruption from a genuine scan/S3 failure.
215+
*/
216+
class UploadAbortedError extends Error {
217+
constructor() {
218+
super("Upload aborted by request signal");
219+
}
220+
}
221+
183222
/**
184223
* Wraps a failure from `clamd.sendChunk()` so the outer catch can distinguish
185224
* an antivirus-availability issue from a generic I/O failure and surface it

packages/app/src/server/services/s3.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,17 @@ import { S3_PART_MIN_SIZE } from "~/modules/shared/uploadConfig";
117117
/**
118118
* Creates an incremental multipart upload to S3.
119119
* Chunks are buffered until they reach S3_PART_MIN_SIZE, then flushed as a part.
120+
*
121+
* The optional `signal` is forwarded to every `s3Client.send()` call so an
122+
* aborted request (client disconnect, request-level timeout) cancels the
123+
* in-flight HTTP request at the AWS SDK layer. Calling `abort()` is still
124+
* required on the caller side to release the multipart upload on S3.
120125
*/
121-
export function createMultipartUpload(key: string, contentType: string) {
126+
export function createMultipartUpload(
127+
key: string,
128+
contentType: string,
129+
signal?: AbortSignal,
130+
) {
122131
let uploadId: string;
123132
let partNumber = 1;
124133
const parts: { ETag: string; PartNumber: number }[] = [];
@@ -132,6 +141,7 @@ export function createMultipartUpload(key: string, contentType: string) {
132141
Key: key,
133142
ContentType: contentType,
134143
}),
144+
{ abortSignal: signal },
135145
);
136146
if (!res.UploadId) {
137147
throw new Error("S3 CreateMultipartUpload returned no UploadId");
@@ -157,6 +167,7 @@ export function createMultipartUpload(key: string, contentType: string) {
157167
PartNumber: partNumber,
158168
Body: buffer,
159169
}),
170+
{ abortSignal: signal },
160171
);
161172
if (!res.ETag) {
162173
throw new Error("S3 UploadPart returned no ETag");
@@ -175,10 +186,15 @@ export function createMultipartUpload(key: string, contentType: string) {
175186
UploadId: uploadId,
176187
MultipartUpload: { Parts: parts },
177188
}),
189+
{ abortSignal: signal },
178190
);
179191
},
180192

181193
async abort() {
194+
// Intentionally not forwarding `signal` here: abort() runs in the
195+
// catch/cleanup path where the outer signal is usually already
196+
// aborted, which would itself short-circuit the AbortMultipartUpload
197+
// call and leave the upload alive on S3.
182198
return s3Client.send(
183199
new AbortMultipartUploadCommand({
184200
Bucket: env.S3_BUCKET_NAME,

packages/app/src/server/services/uploadPipeline.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { MAX_CSE_FILES } from "~/modules/cseOpinion/types";
66
import {
77
ALLOWED_UPLOAD_MIME_TYPES,
88
type FlowType,
9+
UPLOAD_REQUEST_TIMEOUT_MS,
910
} from "~/modules/shared/uploadConfig";
1011
import { db } from "~/server/db";
1112
import { declarations, files } from "~/server/db/schema";
@@ -50,6 +51,12 @@ export type UploadPipelineInput = {
5051
contentType: string;
5152
stream: ReadableStream<Uint8Array>;
5253
flowType: FlowType;
54+
/**
55+
* Abort signal from the HTTP request. Combined with a pipeline-level
56+
* timeout and forwarded to clamd + S3 so a client disconnect or hung
57+
* socket aborts the whole stream instead of tying up the request.
58+
*/
59+
signal?: AbortSignal;
5360
};
5461

5562
/**
@@ -101,12 +108,15 @@ export async function runUploadPipeline(
101108
}
102109
}
103110

111+
const pipelineSignal = combineSignals(input.signal);
112+
104113
const streamResult = await handleStreamingUpload(input.stream, {
105114
siren: input.siren,
106115
year: input.year,
107116
fileName: input.fileName,
108117
contentType: input.contentType,
109118
allowedMimeTypes: ALLOWED_UPLOAD_MIME_TYPES,
119+
signal: pipelineSignal,
110120
});
111121

112122
if (!streamResult.ok) {
@@ -284,3 +294,14 @@ export class MaxFilesReachedError extends Error {
284294
super("Max files reached (race condition)");
285295
}
286296
}
297+
298+
/**
299+
* Combine the incoming request signal with a pipeline-level timeout. Either
300+
* one firing aborts the clamd + S3 streams. Requires Node 20+ (native
301+
* AbortSignal.any + AbortSignal.timeout).
302+
*/
303+
function combineSignals(requestSignal: AbortSignal | undefined): AbortSignal {
304+
const timeoutSignal = AbortSignal.timeout(UPLOAD_REQUEST_TIMEOUT_MS);
305+
if (!requestSignal) return timeoutSignal;
306+
return AbortSignal.any([requestSignal, timeoutSignal]);
307+
}

0 commit comments

Comments
 (0)