diff --git a/functions/webdav/post.ts b/functions/webdav/post.ts
index b64c76b..c1b6fe3 100644
--- a/functions/webdav/post.ts
+++ b/functions/webdav/post.ts
@@ -15,7 +15,9 @@ export async function handleRequestPostCreateMultipart({
   });
 
   const { key, uploadId } = multipartUpload;
-  return new Response(JSON.stringify({ key, uploadId }));
+  return new Response(JSON.stringify({ key, uploadId }), {
+    headers: { "Content-Type": "application/json" }
+  });
 }
 
 export async function handleRequestPostCompleteMultipart({
@@ -26,6 +28,7 @@ export async function handleRequestPostCompleteMultipart({
   const url = new URL(request.url);
   const uploadId = new URLSearchParams(url.search).get("uploadId");
   if (!uploadId) return notFound();
+
   const multipartUpload = bucket.resumeMultipartUpload(path, uploadId);
 
   const completeBody: { parts: Array<R2UploadedPart> } = await request.json();
@@ -33,13 +36,88 @@
   try {
     const object = await multipartUpload.complete(completeBody.parts);
     return new Response(null, {
-      headers: { etag: object.httpEtag },
+      headers: {
+        etag: object.httpEtag,
+        "Content-Type": "application/json"
+      },
     });
   } catch (error: any) {
     return new Response(error.message, { status: 400 });
   }
 }
 
+// New: handle a streamed part upload for a multipart session
+export async function handleRequestPostStreamChunk({
+  bucket,
+  path,
+  request,
+}: RequestHandlerParams) {
+  const url = new URL(request.url);
+  const uploadId = url.searchParams.get("uploadId");
+  const partNumber = url.searchParams.get("partNumber");
+
+  if (!uploadId || !partNumber) {
+    return new Response("Bad Request", { status: 400 });
+  }
+
+  const transferEncoding = request.headers.get("transfer-encoding");
+
+  if (transferEncoding === "chunked") {
+    // Handle the streamed part data
+    const reader = request.body?.getReader();
+    if (!reader) {
+      return new Response("Bad Request", { status: 400 });
+    }
+
+    const chunks: Uint8Array[] = [];
+
+    try {
+      while (true) {
+        const { done, value } = await reader.read();
+        if (done) break;
+        chunks.push(value);
+      }
+
+      const totalLength = chunks.reduce((sum, chunk) => sum + chunk.length, 0);
+      const combined = new Uint8Array(totalLength);
+      let offset = 0;
+
+      for (const chunk of chunks) {
+        combined.set(chunk, offset);
+        offset += chunk.length;
+      }
+
+      const multipartUpload = bucket.resumeMultipartUpload(path, uploadId);
+      const uploadedPart = await multipartUpload.uploadPart(
+        parseInt(partNumber),
+        combined
+      );
+
+      return new Response(null, {
+        headers: {
+          "Content-Type": "application/json",
+          etag: uploadedPart.etag
+        },
+      });
+
+    } catch (error) {
+      console.error("Stream chunk upload error:", error);
+      return new Response("Internal Server Error", { status: 500 });
+    }
+  }
+
+  // Fall back to a regular part upload
+  const multipartUpload = bucket.resumeMultipartUpload(path, uploadId);
+  const uploadedPart = await multipartUpload.uploadPart(
+    parseInt(partNumber),
+    request.body
+  );
+
+  return new Response(null, {
+    headers: { "Content-Type": "application/json", etag: uploadedPart.etag },
+  });
+}
+
 export const handleRequestPost = async function ({
   bucket,
   path,
@@ -53,8 +131,11 @@
   }
 
   if (searchParams.has("uploadId")) {
+    if (searchParams.has("partNumber")) {
+      return handleRequestPostStreamChunk({ bucket, path, request });
+    }
     return handleRequestPostCompleteMultipart({ bucket, path, request });
   }
 
   return new Response("Method not allowed", { status: 405 });
-};
+};
\ No newline at end of file
diff --git a/functions/webdav/put.ts b/functions/webdav/put.ts
index e76fa11..46a8244 100644
--- a/functions/webdav/put.ts
+++ b/functions/webdav/put.ts
@@ -24,6 +24,65 @@
   });
 }
 
+
+async function handleStreamedUpload({
+  bucket,
+  path,
+  request,
+}: RequestHandlerParams) {
+  const contentLength = request.headers.get("content-length");
+  const transferEncoding = request.headers.get("transfer-encoding");
+
+  if (transferEncoding === "chunked" || !contentLength) {
+    // Handle chunked transfer encoding
+    const reader = request.body?.getReader();
+    if (!reader) {
+      return new Response("Bad Request", { status: 400 });
+    }
+
+    // Collect the chunked body in memory
+    const chunks: Uint8Array[] = [];
+
+    try {
+      while (true) {
+        const { done, value } = await reader.read();
+        if (done) break;
+        chunks.push(value);
+      }
+
+      // Merge all chunks
+      const totalLength = chunks.reduce((sum, chunk) => sum + chunk.length, 0);
+      const combined = new Uint8Array(totalLength);
+      let offset = 0;
+
+      for (const chunk of chunks) {
+        combined.set(chunk, offset);
+        offset += chunk.length;
+      }
+
+      // Upload to R2
+      const thumbnail = request.headers.get("fd-thumbnail");
+      const customMetadata = thumbnail ? { thumbnail } : undefined;
+
+      const result = await bucket.put(path, combined, {
+        onlyIf: request.headers,
+        httpMetadata: request.headers,
+        customMetadata,
+      });
+
+      if (!result) return new Response("Preconditions failed", { status: 412 });
+      return new Response("", { status: 201 });
+
+    } catch (error) {
+      console.error("Stream upload error:", error);
+      return new Response("Internal Server Error", { status: 500 });
+    }
+  }
+
+  // Fall back to the regular upload
+  return handleRegularUpload({ bucket, path, request });
+}
+
 export async function handleRequestPut({
   bucket,
   path,
@@ -46,6 +105,20 @@
     if (parentDir === null) return new Response("Conflict", { status: 409 });
   }
 
+  // Check whether this is a streamed upload
+  const transferEncoding = request.headers.get("transfer-encoding");
+  if (transferEncoding === "chunked") {
+    return handleStreamedUpload({ bucket, path, request });
+  }
+
+  return handleRegularUpload({ bucket, path, request });
+}
+
+async function handleRegularUpload({
+  bucket,
+  path,
+  request,
+}: RequestHandlerParams) {
   const thumbnail = request.headers.get("fd-thumbnail");
   const customMetadata = thumbnail ? { thumbnail } : undefined;
 
@@ -56,6 +129,5 @@
   });
 
   if (!result) return new Response("Preconditions failed", { status: 412 });
-
   return new Response("", { status: 201 });
-}
+}
\ No newline at end of file
diff --git a/src/app/transfer.ts b/src/app/transfer.ts
index 69e52fb..4f4c7f9 100644
--- a/src/app/transfer.ts
+++ b/src/app/transfer.ts
@@ -111,114 +111,6 @@ export async function blobDigest(blob: Blob) {
 
 export const SIZE_LIMIT = 100 * 1000 * 1000; // 100MB
 
-function xhrFetch(
-  url: RequestInfo | URL,
-  requestInit: RequestInit & {
-    onUploadProgress?: (progressEvent: ProgressEvent) => void;
-  }
-) {
-  return new Promise<Response>((resolve, reject) => {
-    const xhr = new XMLHttpRequest();
-    xhr.upload.onprogress = requestInit.onUploadProgress ?? null;
-    xhr.open(
-      requestInit.method ?? "GET",
-      url instanceof Request ? url.url : url
-    );
-    const headers = new Headers(requestInit.headers);
-    headers.forEach((value, key) => xhr.setRequestHeader(key, value));
-    xhr.onload = () => {
-      const headers = xhr
-        .getAllResponseHeaders()
-        .trim()
-        .split("\r\n")
-        .reduce((acc, header) => {
-          const [key, value] = header.split(": ");
-          acc[key] = value;
-          return acc;
-        }, {} as Record<string, string>);
-      resolve(new Response(xhr.responseText, { status: xhr.status, headers }));
-    };
-    xhr.onerror = reject;
-    if (
-      requestInit.body instanceof Blob ||
-      typeof requestInit.body === "string"
-    ) {
-      xhr.send(requestInit.body);
-    }
-  });
-}
-
-export async function multipartUpload(
-  key: string,
-  file: File,
-  options?: {
-    headers?: Record<string, string>;
-    onUploadProgress?: (progressEvent: {
-      loaded: number;
-      total: number;
-    }) => void;
-  }
-) {
-  const headers = options?.headers || {};
-  headers["content-type"] = file.type;
-
-  const uploadResponse = await fetch(`/webdav/${encodeKey(key)}?uploads`, {
-    headers,
-    method: "POST",
-  });
-  const { uploadId } = await uploadResponse.json<{ uploadId: string }>();
-  const totalChunks = Math.ceil(file.size / SIZE_LIMIT);
-
-  const limit = pLimit(2);
-  const parts = Array.from({ length: totalChunks }, (_, i) => i + 1);
-  const partsLoaded = Array.from({ length: totalChunks + 1 }, () => 0);
-  const promises = parts.map((i) =>
-    limit(async () => {
-      const chunk = file.slice((i - 1) * SIZE_LIMIT, i * SIZE_LIMIT);
-      const searchParams = new URLSearchParams({
-        partNumber: i.toString(),
-        uploadId,
-      });
-      const uploadUrl = `/webdav/${encodeKey(key)}?${searchParams}`;
-      if (i === limit.concurrency)
-        await new Promise((resolve) => setTimeout(resolve, 1000));
-
-      const uploadPart = () =>
-        xhrFetch(uploadUrl, {
-          method: "PUT",
-          headers,
-          body: chunk,
-          onUploadProgress: (progressEvent) => {
-            partsLoaded[i] = progressEvent.loaded;
-            options?.onUploadProgress?.({
-              loaded: partsLoaded.reduce((a, b) => a + b),
-              total: file.size,
-            });
-          },
-        });
-
-      const retryReducer = (acc: Promise<Response>) =>
-        acc
-          .then((res) => {
-            const retryAfter = res.headers.get("retry-after");
-            if (!retryAfter) return res;
-            return uploadPart();
-          })
-          .catch(uploadPart);
-      const response = await [1, 2].reduce(retryReducer, uploadPart());
-      return { partNumber: i, etag: response.headers.get("etag")! };
-    })
-  );
-  const uploadedParts = await Promise.all(promises);
-  const completeParams = new URLSearchParams({ uploadId });
-  const response = await fetch(`/webdav/${encodeKey(key)}?${completeParams}`, {
-    method: "POST",
-    body: JSON.stringify({ parts: uploadedParts }),
-  });
-  if (!response.ok) throw new Error(await response.text());
-  return response;
-}
-
 export async function copyPaste(source: string, target: string, move = false) {
   const uploadUrl = `${WEBDAV_ENDPOINT}${encodeKey(source)}`;
   const destinationUrl = new URL(
@@ -247,6 +139,119 @@ export async function createFolder(cwd: string) {
   }
 }
 
+// New: streaming upload for whole objects
+export async function streamUpload(
+  key: string,
+  file: File,
+  options?: {
+    headers?: Record<string, string>;
+    onUploadProgress?: (progressEvent: {
+      loaded: number;
+      total: number;
+    }) => void;
+    chunkSize?: number;
+  }
+) {
+  const chunkSize = options?.chunkSize || 64 * 1024; // 64KB chunks
+  const headers = options?.headers || {};
+  headers["content-type"] = file.type;
+  headers["transfer-encoding"] = "chunked";
+
+  // Create a readable stream over the file
+  const stream = new ReadableStream({
+    start(controller) {
+      let offset = 0;
+
+      const pushChunk = async () => {
+        if (offset >= file.size) {
+          controller.close();
+          return;
+        }
+
+        const chunk = file.slice(offset, offset + chunkSize);
+        const arrayBuffer = await chunk.arrayBuffer();
+        controller.enqueue(new Uint8Array(arrayBuffer));
+
+        offset += chunkSize;
+        options?.onUploadProgress?.({
+          loaded: Math.min(offset, file.size),
+          total: file.size
+        });
+
+        // Schedule the next chunk
+        setTimeout(pushChunk, 0);
+      };
+
+      pushChunk();
+    }
+  });
+
+  const uploadUrl = `${WEBDAV_ENDPOINT}${encodeKey(key)}`;
+  return await fetch(uploadUrl, {
+    method: "PUT",
+    headers,
+    body: stream,
+    // @ts-ignore - enable streaming request bodies
+    duplex: 'half'
+  });
+}
+
+// Stream a single multipart part
+async function streamChunk(
+  url: string,
+  file: File | Blob,
+  options: {
+    headers?: Record<string, string>;
+    onProgress?: (event: { loaded: number; total: number }) => void;
+  }
+) {
+  const chunkSize = 64 * 1024; // 64KB
+  let loaded = 0;
+
+  const stream = new ReadableStream({
+    start(controller) {
+      let offset = 0;
+
+      const reader = new FileReader();
+
+      const readNextChunk = () => {
+        if (offset >= file.size) {
+          controller.close();
+          return;
+        }
+
+        const chunk = file.slice(offset, offset + chunkSize);
+        reader.onload = (e) => {
+          const arrayBuffer = e.target?.result as ArrayBuffer;
+          controller.enqueue(new Uint8Array(arrayBuffer));
+
+          loaded += arrayBuffer.byteLength;
+          options.onProgress?.({ loaded, total: file.size });
+
+          offset += chunkSize;
+          setTimeout(readNextChunk, 0);
+        };
+
+        reader.readAsArrayBuffer(chunk);
+      };
+
+      readNextChunk();
+    }
+  });
+
+  return await fetch(url, {
+    method: "PUT",
+    headers: {
+      ...options.headers,
+      "transfer-encoding": "chunked"
+    },
+    body: stream,
+    // @ts-ignore
+    duplex: 'half'
+  });
+}
+
+// Updated: processTransferTask now uses streaming uploads
 export async function processTransferTask({
   task,
   onTaskProgress,
@@ -284,18 +289,85 @@
   const headers: { "fd-thumbnail"?: string } = {};
   if (thumbnailDigest) headers["fd-thumbnail"] = thumbnailDigest;
+
   if (file.size >= SIZE_LIMIT) {
     return await multipartUpload(remoteKey, file, {
       headers,
       onUploadProgress: onTaskProgress,
     });
   } else {
-    const uploadUrl = `${WEBDAV_ENDPOINT}${encodeKey(remoteKey)}`;
-    return await xhrFetch(uploadUrl, {
-      method: "PUT",
+    // For small files, use a streaming upload instead of the old xhrFetch
+    return await streamUpload(remoteKey, file, {
       headers,
-      body: file,
       onUploadProgress: onTaskProgress,
     });
   }
 }
 
+
+// Rework multipartUpload to use streaming transfers
+export async function multipartUpload(
+  key: string,
+  file: File,
+  options?: {
+    headers?: Record<string, string>;
+    onUploadProgress?: (progressEvent: {
+      loaded: number;
+      total: number;
+    }) => void;
+  }
+) {
+  const headers = options?.headers || {};
+  headers["content-type"] = file.type;
+
+  // For large files, use a real multipart upload
+  if (file.size > SIZE_LIMIT) {
+    const uploadResponse = await fetch(`/webdav/${encodeKey(key)}?uploads`, {
+      headers,
+      method: "POST",
+    });
+    const { uploadId } = await uploadResponse.json<{ uploadId: string }>();
+    const totalChunks = Math.ceil(file.size / SIZE_LIMIT);
+
+    const limit = pLimit(2);
+    const parts = Array.from({ length: totalChunks }, (_, i) => i + 1);
+    const partsLoaded = Array.from({ length: totalChunks + 1 }, () => 0);
+
+    const promises = parts.map((i) =>
+      limit(async () => {
+        const chunk = file.slice((i - 1) * SIZE_LIMIT, i * SIZE_LIMIT);
+        const searchParams = new URLSearchParams({
+          partNumber: i.toString(),
+          uploadId,
+        });
+        const uploadUrl = `/webdav/${encodeKey(key)}?${searchParams}`;
+
+        // Stream each part
+        const response = await streamChunk(uploadUrl, chunk, {
+          headers,
+          onProgress: (progressEvent) => {
+            partsLoaded[i] = progressEvent.loaded;
+            options?.onUploadProgress?.({
+              loaded: partsLoaded.reduce((a, b) => a + b),
+              total: file.size,
+            });
+          }
+        });
+
+        return { partNumber: i, etag: response.headers.get("etag")! };
+      })
+    );
+
+    const uploadedParts = await Promise.all(promises);
+    const completeParams = new URLSearchParams({ uploadId });
+    const response = await fetch(`/webdav/${encodeKey(key)}?${completeParams}`, {
+      method: "POST",
+      body: JSON.stringify({ parts: uploadedParts }),
+    });
+
+    if (!response.ok) throw new Error(await response.text());
+    return response;
+  } else {
+    // Small files go straight through the streaming upload
+    return await streamUpload(key, file, options);
+  }
+}
\ No newline at end of file
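
For reference, a minimal sketch of how a caller might exercise the new streamUpload helper from src/app/transfer.ts after this change. The uploadPicked wrapper, its arguments, and the console logging are illustrative only and not part of the diff; streamUpload already sets the content-type and transfer-encoding headers itself, so callers only pass extra headers such as fd-thumbnail when needed.

    import { streamUpload } from "./transfer";

    // Hypothetical caller: stream a picked File to the WebDAV endpoint,
    // reporting progress as each 64 KB chunk is enqueued.
    async function uploadPicked(file: File, remoteKey: string) {
      const response = await streamUpload(remoteKey, file, {
        onUploadProgress: ({ loaded, total }) =>
          console.log(`uploaded ${loaded}/${total} bytes`),
      });
      if (!response.ok) throw new Error(await response.text());
      return response;
    }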