@@ -101,23 +101,52 @@ func (h *HTTPFileStager) EnsureRemote(ctx context.Context, nodeID, localPath, ke
101101 fileSize := fi .Size ()
102102
103103 url := fmt .Sprintf ("http://%s/v1/files/%s" , addr , key )
104+
105+ // Compute the SHA-256 of the local file once and bind it to every PUT
106+ // attempt — the server uses it to detect mid-flight content drift and
107+ // reject (409) if a partial upload claims a new identity, forcing a clean
108+ // restart.
109+ localHash , err := downloader .CalculateSHA (localPath )
110+ if err != nil {
111+ // Hash failure isn't fatal — we can still upload; we just lose
112+ // resume-safety and end-of-transfer integrity checks.
113+ xlog .Warn ("Failed to hash local file for upload integrity check" , "localPath" , localPath , "error" , err )
114+ localHash = ""
115+ }
116+
104117 xlog .Info ("Uploading file to remote node" , "node" , nodeID , "file" , filepath .Base (localPath ), "size" , humanFileSize (fileSize ), "url" , url )
105118
119+ // Outer time budget: bound the total resumable-upload duration so a
120+ // permanently-unreachable worker doesn't hold the request forever. Default
121+ // matches the existing per-response timeout.
122+ outerBudget := h .resumeBudget ()
123+
124+ resumeCtx , cancel := context .WithTimeout (ctx , outerBudget )
125+ defer cancel ()
126+
106127 var lastErr error
107- attempts := h .maxRetries + 1 // maxRetries=3 means 4 total attempts (1 initial + 3 retries)
108- for attempt := 1 ; attempt <= attempts ; attempt ++ {
128+ attempt := 0
129+ for {
130+ attempt ++
109131 if attempt > 1 {
110- backoff := time . Duration ( 5 << ( attempt - 2 )) * time . Second // 5s, 10s, 20s
132+ backoff := nextBackoff ( attempt )
111133 xlog .Warn ("Retrying file upload" , "node" , nodeID , "file" , filepath .Base (localPath ),
112- "attempt" , attempt , "of" , attempts , " backoff" , backoff , "lastError" , lastErr )
134+ "attempt" , attempt , "backoff" , backoff , "lastError" , lastErr )
113135 select {
114- case <- ctx .Done ():
115- return "" , fmt .Errorf ("upload cancelled during retry backoff: %w" , ctx .Err ())
136+ case <- resumeCtx .Done ():
137+ return "" , fmt .Errorf ("upload cancelled during retry backoff (after %d attempts) : %w (last: %v) " , attempt - 1 , resumeCtx .Err (), lastErr )
116138 case <- time .After (backoff ):
117139 }
118140 }
119141
120- result , err := h .doUpload (ctx , addr , nodeID , localPath , key , url , fileSize )
142+ // Determine resume offset from the server before each attempt. A
143+ // HEAD response that reports an in-progress upload (X-Target-SHA256)
144+ // matching ours unlocks resume from the reported size; any other
145+ // outcome (missing file, hash mismatch, partial-of-different-file)
146+ // resets to 0 and uploads the entire file.
147+ startOffset := h .resumeOffset (resumeCtx , addr , key , localHash , fileSize )
148+
149+ result , err := h .doUpload (ctx , resumeCtx , addr , nodeID , localPath , key , url , fileSize , startOffset , localHash )
121150 if err == nil {
122151 if attempt > 1 {
123152 xlog .Info ("File upload succeeded after retry" , "node" , nodeID , "file" , filepath .Base (localPath ), "attempt" , attempt )
@@ -126,50 +155,190 @@ func (h *HTTPFileStager) EnsureRemote(ctx context.Context, nodeID, localPath, ke
126155 }
127156 lastErr = err
128157
158+ // Non-transient failures (4xx other than 416, hard auth, etc.) abort
159+ // immediately — retrying won't help.
129160 if ! isTransientError (err ) {
130161 xlog .Error ("File upload failed with non-transient error" , "node" , nodeID , "file" , filepath .Base (localPath ), "error" , err )
131162 return "" , err
132163 }
164+
165+ // Caller-cancelled (not deadline) — give up.
166+ if errors .Is (ctx .Err (), context .Canceled ) {
167+ return "" , fmt .Errorf ("upload cancelled by caller after %d attempts: %w" , attempt , lastErr )
168+ }
169+
170+ // Outer budget exhausted.
171+ if errors .Is (resumeCtx .Err (), context .DeadlineExceeded ) {
172+ return "" , fmt .Errorf ("uploading %s to node %s failed after %d attempts within %s budget: %w" ,
173+ localPath , nodeID , attempt , outerBudget , lastErr )
174+ }
175+
133176 xlog .Warn ("File upload failed with transient error" , "node" , nodeID , "file" , filepath .Base (localPath ),
134- "attempt" , attempt , "of" , attempts , " error" , err )
177+ "attempt" , attempt , "error" , err )
135178 }
179+ }
180+
181+ // resumeBudget returns the maximum total time the resumable upload loop will
182+ // spend retrying transient failures end-to-end. Past this budget the upload
183+ // fails rather than spinning forever — 1h covers multi-GB transfers on
184+ // pathological links without letting a wedged server jam the master.
185+ func (h * HTTPFileStager ) resumeBudget () time.Duration {
186+ return 1 * time .Hour
187+ }
136188
137- return "" , fmt .Errorf ("uploading %s to node %s failed after %d attempts: %w" , localPath , nodeID , attempts , lastErr )
189+ // nextBackoff returns the sleep before retry #attempt: 1s, 2s, 4s, ..., capped
190+ // at 30s, with the first sleep (attempt=2) being 1s.
191+ func nextBackoff (attempt int ) time.Duration {
192+ if attempt < 2 {
193+ return 0
194+ }
195+ const (
196+ base = 1 * time .Second
197+ ceiling = 30 * time .Second
198+ )
199+ shift := uint (attempt - 2 )
200+ if shift > 30 {
201+ shift = 30 // saturate before time.Duration overflows
202+ }
203+ b := base << shift
204+ if b > ceiling || b < 0 {
205+ b = ceiling
206+ }
207+ return b
138208}
139209
140- // doUpload performs a single upload attempt.
141- func (h * HTTPFileStager ) doUpload (ctx context.Context , addr , nodeID , localPath , key , url string , fileSize int64 ) (string , error ) {
210+ // resumeOffset asks the server (via HEAD) how many bytes of the current upload
211+ // are already on disk. It returns 0 if the server has no usable partial state
212+ // (no file, finished file with a different hash, or a partial under a
213+ // different target hash). It returns the server-reported size when the
214+ // server's X-Target-SHA256 matches our expected final hash AND the size is
215+ // strictly less than the local file size.
216+ func (h * HTTPFileStager ) resumeOffset (ctx context.Context , addr , key , localHash string , fileSize int64 ) int64 {
217+ if localHash == "" || fileSize <= 0 {
218+ return 0
219+ }
220+ url := fmt .Sprintf ("http://%s/v1/files/%s" , addr , key )
221+ req , err := http .NewRequestWithContext (ctx , http .MethodHead , url , nil )
222+ if err != nil {
223+ return 0
224+ }
225+ if h .token != "" {
226+ req .Header .Set ("Authorization" , "Bearer " + h .token )
227+ }
228+ resp , err := h .client .Do (req )
229+ if err != nil {
230+ return 0
231+ }
232+ defer func () { _ = resp .Body .Close () }()
233+ if resp .StatusCode != http .StatusOK {
234+ return 0
235+ }
236+
237+ sizeStr := resp .Header .Get (HeaderFileSize )
238+ if sizeStr == "" {
239+ return 0
240+ }
241+ size , err := strconv .ParseInt (sizeStr , 10 , 64 )
242+ if err != nil || size <= 0 || size >= fileSize {
243+ return 0
244+ }
245+
246+ target := resp .Header .Get (HeaderTargetSHA256 )
247+ if target == "" || ! strings .EqualFold (target , localHash ) {
248+ // No partial-upload metadata, or it's for a different target.
249+ return 0
250+ }
251+
252+ xlog .Info ("Resuming upload from server-reported offset" , "key" , key , "offset" , size , "total" , fileSize )
253+ return size
254+ }
255+
256+ // doUpload performs a single upload attempt. When startOffset > 0 the request
257+ // is sent as a resumable PUT with a Content-Range header, transferring only
258+ // the bytes from startOffset to fileSize-1. The outerCtx is the long-lived
259+ // resume budget; reqCtx is what's bound to the request (currently the same as
260+ // the parent ctx, since http.Client doesn't expose a per-request timeout).
261+ func (h * HTTPFileStager ) doUpload (ctx , outerCtx context.Context , addr , nodeID , localPath , key , url string , fileSize , startOffset int64 , expectedHash string ) (string , error ) {
262+ if startOffset < 0 || startOffset > fileSize {
263+ startOffset = 0
264+ }
265+
142266 f , err := os .Open (localPath )
143267 if err != nil {
144268 return "" , fmt .Errorf ("opening local file %s: %w" , localPath , err )
145269 }
146270 defer f .Close ()
147271
272+ if startOffset > 0 {
273+ if _ , err := f .Seek (startOffset , io .SeekStart ); err != nil {
274+ return "" , fmt .Errorf ("seeking to offset %d in %s: %w" , startOffset , localPath , err )
275+ }
276+ }
277+
278+ chunkLen := fileSize - startOffset
279+
148280 var body io.Reader = f
149281 cb := StagingProgressFromContext (ctx )
150- // For files > 100MB or when a progress callback is set, wrap with progress reporting
282+ // For files > 100MB or when a progress callback is set, wrap with progress reporting.
283+ // We report against the FULL fileSize (not the chunkLen) so a resumed upload's
284+ // progress bar starts from the actual completed fraction rather than at 0%.
151285 const progressThreshold = 100 << 20
152286 if fileSize > progressThreshold || cb != nil {
153- body = newProgressReader (f , fileSize , filepath .Base (localPath ), nodeID , cb )
287+ pr := newProgressReader (f , fileSize , filepath .Base (localPath ), nodeID , cb )
288+ pr .read = startOffset // seed prior progress
289+ body = pr
154290 }
155291
156- req , err := http .NewRequestWithContext (ctx , http .MethodPut , url , body )
292+ // The body length we actually send.
293+ limitedBody := io .LimitReader (body , chunkLen )
294+
295+ req , err := http .NewRequestWithContext (outerCtx , http .MethodPut , url , limitedBody )
157296 if err != nil {
158297 return "" , fmt .Errorf ("creating request: %w" , err )
159298 }
160- req .ContentLength = fileSize // explicit Content-Length for progress tracking
299+ req .ContentLength = chunkLen
161300 req .Header .Set ("Content-Type" , "application/octet-stream" )
162301 if h .token != "" {
163302 req .Header .Set ("Authorization" , "Bearer " + h .token )
164303 }
304+ if expectedHash != "" {
305+ // Lets the server detect cross-attempt content drift and reject
306+ // resume with 409 if the local file changed identity.
307+ req .Header .Set (HeaderContentSHA256 , expectedHash )
308+ }
309+ if startOffset > 0 || (expectedHash != "" && fileSize > 0 ) {
310+ // Send Content-Range even on the first chunk (0-...) when we have an
311+ // expected hash, so the server's range-aware branch records the
312+ // target-hash sidecar for future resume attempts.
313+ req .Header .Set ("Content-Range" , fmt .Sprintf ("bytes %d-%d/%d" , startOffset , fileSize - 1 , fileSize ))
314+ }
165315
166316 resp , err := h .client .Do (req )
167317 if err != nil {
168- xlog .Error ("File upload failed" , "node" , nodeID , "file" , filepath .Base (localPath ), "size" , humanFileSize (fileSize ), "error" , err )
318+ xlog .Error ("File upload failed" , "node" , nodeID , "file" , filepath .Base (localPath ),
319+ "size" , humanFileSize (fileSize ), "offset" , startOffset , "error" , err )
169320 return "" , fmt .Errorf ("uploading %s to node %s: %w" , localPath , nodeID , err )
170321 }
171322 defer resp .Body .Close ()
172323
324+ // 308 Permanent Redirect ("Resume Incomplete") means the chunk landed but
325+ // the upload as a whole hasn't completed. From our perspective the
326+ // connection survived and the server has more bytes than before — but
327+ // since we always send the whole remainder, hitting 308 means the server
328+ // truncated us. Treat as transient so the retry loop re-HEADs and tries
329+ // again from the new offset.
330+ if resp .StatusCode == http .StatusPermanentRedirect {
331+ body , _ := io .ReadAll (resp .Body )
332+ return "" , & transientStatusError {status : resp .StatusCode , msg : fmt .Sprintf ("server reports resume-incomplete: %s" , string (body ))}
333+ }
334+
335+ // 416 Range Not Satisfiable: client/server disagree on offset. Treat as
336+ // transient — the next iteration re-HEADs to learn the correct offset.
337+ if resp .StatusCode == http .StatusRequestedRangeNotSatisfiable {
338+ body , _ := io .ReadAll (resp .Body )
339+ return "" , & transientStatusError {status : resp .StatusCode , msg : fmt .Sprintf ("range not satisfiable: %s" , string (body ))}
340+ }
341+
173342 if resp .StatusCode != http .StatusOK {
174343 respBody , _ := io .ReadAll (resp .Body )
175344 xlog .Error ("File upload rejected by remote node" , "node" , nodeID , "file" , filepath .Base (localPath ), "status" , resp .StatusCode , "response" , string (respBody ))
@@ -187,11 +356,30 @@ func (h *HTTPFileStager) doUpload(ctx context.Context, addr, nodeID, localPath,
187356 return result .LocalPath , nil
188357}
189358
359+ // transientStatusError wraps an HTTP status that should be treated as
360+ // transient by the upload retry loop.
361+ type transientStatusError struct {
362+ status int
363+ msg string
364+ }
365+
366+ func (e * transientStatusError ) Error () string {
367+ return fmt .Sprintf ("HTTP %d: %s" , e .status , e .msg )
368+ }
369+
370+ func (e * transientStatusError ) Transient () bool { return true }
371+
190372// isTransientError returns true if the error is likely transient and worth retrying.
191373func isTransientError (err error ) bool {
192374 if err == nil {
193375 return false
194376 }
377+ // Errors that explicitly opt into transient semantics (e.g. 308/416 from
378+ // the resumable-upload protocol).
379+ var transient interface { Transient () bool }
380+ if errors .As (err , & transient ) && transient .Transient () {
381+ return true
382+ }
195383 // Connection reset by peer
196384 if errors .Is (err , syscall .ECONNRESET ) {
197385 return true
0 commit comments