@@ -12,6 +12,7 @@ import (
1212 "fmt"
1313 "io"
1414 "log"
15+ "net"
1516 "net/http"
1617 "net/url"
1718 "os"
@@ -37,6 +38,17 @@ const (
3738 publishSuppressAfterApply = 2 * time .Second
3839)
3940
// Retry and per-request budgets used while initialising a mount.
var (
	// initRetryWindow is added to the start time in runInitMount to form the
	// deadline after which a failed attempt is no longer retried.
	initRetryWindow = 2 * time.Minute
	// initRetryBackoff is the pause between two init attempts.
	initRetryBackoff = 2 * time.Second
	// initLatestRequestTTL is the timeout applied to the latest-manifest
	// lookup of a single init attempt.
	initLatestRequestTTL = 15 * time.Second
	// initApplyRequestTTL is the timeout applied to applying the revision
	// within a single init attempt.
	initApplyRequestTTL = 60 * time.Second
)

// Settings for the HTTP transport of the shared-mount client built in main.
var (
	// sharedMountDialTimeout is used both as the dialer timeout and as the
	// TLS handshake timeout.
	sharedMountDialTimeout = 5 * time.Second
	// sharedMountKeepAlive is the dialer's keep-alive setting.
	sharedMountKeepAlive = 30 * time.Second
	// sharedMountHeaderTTL is the transport's response header timeout.
	sharedMountHeaderTTL = 30 * time.Second
	// sharedMountIdleConnTTL is the transport's idle connection timeout.
	sharedMountIdleConnTTL = 90 * time.Second
)
51+
4052type sharedMountClient struct {
4153 baseURL string
4254 token string
@@ -71,7 +83,19 @@ func main() {
7183 token : token ,
7284 // Long-polling calls can legitimately hold the connection open.
7385 // Prefer per-request timeouts (via context) over a tight global client timeout.
74- client : & http.Client {Timeout : 5 * time .Minute },
86+ client : & http.Client {
87+ Timeout : 5 * time .Minute ,
88+ Transport : & http.Transport {
89+ Proxy : http .ProxyFromEnvironment ,
90+ DialContext : (& net.Dialer {Timeout : sharedMountDialTimeout , KeepAlive : sharedMountKeepAlive }).DialContext ,
91+ ForceAttemptHTTP2 : true ,
92+ MaxIdleConns : 100 ,
93+ IdleConnTimeout : sharedMountIdleConnTTL ,
94+ TLSHandshakeTimeout : sharedMountDialTimeout ,
95+ ExpectContinueTimeout : 1 * time .Second ,
96+ ResponseHeaderTimeout : sharedMountHeaderTTL ,
97+ },
98+ },
7599 }
76100
77101 state := make ([]* sharedMountState , 0 , len (mounts ))
@@ -134,23 +158,99 @@ func runInit(ctx context.Context, logger *log.Logger, client *sharedMountClient,
134158 if err := ensureMountPath (state .spec .MountPath ); err != nil {
135159 return err
136160 }
137- manifest , found , err := client .latest (ctx , ownerID , state .spec .Name )
138- if err != nil {
161+ if err := runInitMount (ctx , logger , client , ownerID , state ); err != nil {
139162 return err
140163 }
141- if ! found {
142- continue
164+ }
165+ logger .Print ("init complete" )
166+ return nil
167+ }
168+
169+ func runInitMount (ctx context.Context , logger * log.Logger , client * sharedMountClient , ownerID string , state * sharedMountState ) error {
170+ deadline := time .Now ().Add (initRetryWindow )
171+ attempt := 0
172+ for {
173+ attempt ++
174+ err := runInitMountAttempt (ctx , client , ownerID , state )
175+ if err == nil {
176+ return nil
143177 }
144- if err := applyRevision ( ctx , client , ownerID , state . spec , manifest . Revision ); err != nil {
178+ if ! isRetryableInitError ( err ) || time . Now (). After ( deadline ) {
145179 return err
146180 }
147- state .currentRevision = manifest .Revision
148- state .currentChecksum = manifest .Checksum
181+ logger .Printf ("init retry for %s attempt=%d after error: %v" , state .spec .Name , attempt , err )
182+ select {
183+ case <- ctx .Done ():
184+ return ctx .Err ()
185+ case <- time .After (initRetryBackoff ):
186+ }
149187 }
150- logger .Print ("init complete" )
188+ }
189+
190+ func runInitMountAttempt (ctx context.Context , client * sharedMountClient , ownerID string , state * sharedMountState ) error {
191+ latestCtx , cancelLatest := context .WithTimeout (ctx , initLatestRequestTTL )
192+ defer cancelLatest ()
193+
194+ manifest , found , err := client .latest (latestCtx , ownerID , state .spec .Name )
195+ if err != nil {
196+ return err
197+ }
198+ if ! found {
199+ return nil
200+ }
201+
202+ applyCtx , cancelApply := context .WithTimeout (ctx , initApplyRequestTTL )
203+ defer cancelApply ()
204+
205+ if err := applyRevision (applyCtx , client , ownerID , state .spec , manifest .Revision ); err != nil {
206+ return err
207+ }
208+ state .currentRevision = manifest .Revision
209+ state .currentChecksum = manifest .Checksum
151210 return nil
152211}
153212
// remoteHTTPError is an error that keeps an HTTP status code next to its
// message, so callers can inspect the code via errors.As instead of parsing
// the message text.
type remoteHTTPError struct {
	// StatusCode is the HTTP status code associated with the failure.
	StatusCode int
	// Message is the full text returned by Error.
	Message string
}

// Compile-time check that *remoteHTTPError satisfies the error interface.
var _ error = (*remoteHTTPError)(nil)

// Error returns Message unchanged.
func (r *remoteHTTPError) Error() string {
	return r.Message
}
221+
222+ func isRetryableInitError (err error ) bool {
223+ if err == nil {
224+ return false
225+ }
226+ if errors .Is (err , context .DeadlineExceeded ) {
227+ return true
228+ }
229+ var netErr net.Error
230+ if errors .As (err , & netErr ) && netErr .Timeout () {
231+ return true
232+ }
233+ var urlErr * url.Error
234+ if errors .As (err , & urlErr ) {
235+ if errors .Is (urlErr .Err , context .DeadlineExceeded ) {
236+ return true
237+ }
238+ if errors .As (urlErr .Err , & netErr ) && netErr .Timeout () {
239+ return true
240+ }
241+ }
242+ var httpErr * remoteHTTPError
243+ if errors .As (err , & httpErr ) {
244+ return httpErr .StatusCode == http .StatusTooManyRequests || httpErr .StatusCode >= http .StatusInternalServerError
245+ }
246+ message := strings .ToLower (err .Error ())
247+ return strings .Contains (message , "i/o timeout" ) ||
248+ strings .Contains (message , "connection reset by peer" ) ||
249+ strings .Contains (message , "connection refused" ) ||
250+ strings .Contains (message , "no route to host" ) ||
251+ strings .Contains (message , "unexpected eof" )
252+ }
253+
154254func runSidecar (ctx context.Context , logger * log.Logger , client * sharedMountClient , ownerID string , mounts []* sharedMountState ) {
155255 for _ , state := range mounts {
156256 state := state
@@ -876,7 +976,10 @@ func (c *sharedMountClient) latest(ctx context.Context, ownerID, mount string) (
876976 }
877977 if resp .StatusCode != http .StatusOK {
878978 body , _ := io .ReadAll (resp .Body )
879- return sharedmounts.LatestManifest {}, false , fmt .Errorf ("latest fetch failed: %s" , strings .TrimSpace (string (body )))
979+ return sharedmounts.LatestManifest {}, false , & remoteHTTPError {
980+ StatusCode : resp .StatusCode ,
981+ Message : fmt .Sprintf ("latest fetch failed (%d): %s" , resp .StatusCode , strings .TrimSpace (string (body ))),
982+ }
880983 }
881984 body , err := io .ReadAll (resp .Body )
882985 if err != nil {
@@ -985,7 +1088,10 @@ func (c *sharedMountClient) downloadRevision(ctx context.Context, ownerID, mount
9851088 defer resp .Body .Close ()
9861089 if resp .StatusCode != http .StatusOK {
9871090 body , _ := io .ReadAll (resp .Body )
988- return fmt .Errorf ("revision fetch failed: %s" , strings .TrimSpace (string (body )))
1091+ return & remoteHTTPError {
1092+ StatusCode : resp .StatusCode ,
1093+ Message : fmt .Sprintf ("revision fetch failed (%d): %s" , resp .StatusCode , strings .TrimSpace (string (body ))),
1094+ }
9891095 }
9901096 _ , err = io .Copy (dest , resp .Body )
9911097 return err
@@ -1016,7 +1122,10 @@ func (c *sharedMountClient) uploadRevision(ctx context.Context, ownerID, mount,
10161122 defer resp .Body .Close ()
10171123 if resp .StatusCode != http .StatusOK {
10181124 body , _ := io .ReadAll (resp .Body )
1019- return fmt .Errorf ("revision upload failed: %s" , strings .TrimSpace (string (body )))
1125+ return & remoteHTTPError {
1126+ StatusCode : resp .StatusCode ,
1127+ Message : fmt .Sprintf ("revision upload failed (%d): %s" , resp .StatusCode , strings .TrimSpace (string (body ))),
1128+ }
10201129 }
10211130 return nil
10221131}
@@ -1046,7 +1155,10 @@ func (c *sharedMountClient) updateLatest(ctx context.Context, ownerID, mount str
10461155 }
10471156 if resp .StatusCode != http .StatusOK {
10481157 body , _ := io .ReadAll (resp .Body )
1049- return fmt .Errorf ("latest update failed: %s" , strings .TrimSpace (string (body )))
1158+ return & remoteHTTPError {
1159+ StatusCode : resp .StatusCode ,
1160+ Message : fmt .Sprintf ("latest update failed (%d): %s" , resp .StatusCode , strings .TrimSpace (string (body ))),
1161+ }
10501162 }
10511163 return nil
10521164}
0 commit comments