@@ -120,7 +120,8 @@ public class OkHttpServices implements RESTServices {
120120 private static final Set <Integer > RETRYABLE_STATUS_CODES =
121121 Set .of (STATUS_BAD_GATEWAY , STATUS_SERVICE_UNAVAILABLE , STATUS_GATEWAY_TIMEOUT );
122122
123- private boolean checkFirstRequest = true ;
123+ // When true (digest auth), ping server before streaming non-resendable content to avoid wasted uploads
124+ private boolean useDigestAuthPing = true ;
124125
125126 static protected class ThreadState {
126127 boolean isFirstRequest ;
@@ -131,7 +132,7 @@ static protected class ThreadState {
131132 }
132133
133134 private final ThreadLocal <ThreadState > threadState =
134- ThreadLocal .withInitial (() -> new ThreadState (checkFirstRequest ));
135+ ThreadLocal .withInitial (() -> new ThreadState (useDigestAuthPing ));
135136
136137 public record ConnectionConfig (String host , int port , String basePath , String database ,
137138 SecurityContext securityContext , List <OkHttpClientConfigurator > clientConfigurators ) {
@@ -197,7 +198,7 @@ private OkHttpClient connect(ConnectionConfig config) {
197198 throw new IllegalArgumentException ("No security context provided" );
198199 }
199200
200- this .checkFirstRequest = config .securityContext instanceof DigestAuthContext ;
201+ this .useDigestAuthPing = config .securityContext instanceof DigestAuthContext ;
201202 this .database = config .database ;
202203 this .baseUri = HttpUrlBuilder .newBaseUrl (config .host , config .port , config .basePath , config .securityContext .getSSLContext ());
203204
@@ -283,15 +284,17 @@ private void setFirstRequest(boolean value) {
283284 threadState .get ().isFirstRequest = value ;
284285 }
285286
286- private void checkFirstRequest () {
287- if (checkFirstRequest ) setFirstRequest (true );
287+ private void resetFirstRequestFlag () {
288+ if (useDigestAuthPing ) {
289+ setFirstRequest (true );
290+ }
288291 }
289292
290- private int makeFirstRequest (int retry ) {
291- return makeFirstRequest (baseUri , "ping" , retry );
293+ private int pingServerBeforeStreaming (int retry ) {
294+ return pingServer (baseUri , "ping" , retry );
292295 }
293296
294- private int makeFirstRequest (HttpUrl requestUri , String path , int retry ) {
297+ private int pingServer (HttpUrl requestUri , String path , int retry ) {
295298 Response response = sendRequestOnce (setupRequest (requestUri , path , null ).head ());
296299 int statusCode = response .code ();
297300 if (!RETRYABLE_STATUS_CODES .contains (statusCode )) {
@@ -515,19 +518,20 @@ private Response sendRequestWithRetry(Request.Builder requestBldr, Function<Requ
515518 private Response sendRequestWithRetry (
516519 Request .Builder requestBldr , boolean isRetryable , Function <Request .Builder , Response > doFunction , Consumer <Boolean > resendableConsumer
517520 ) {
518- RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::checkFirstRequest );
521+ // If the thread is already interrupted, fail fast rather than attempting a request
522+ if (Thread .currentThread ().isInterrupted ()) {
523+ throw new MarkLogicIOException ("Request cancelled: thread was interrupted before request could be sent" );
524+ }
525+
526+ RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::resetFirstRequestFlag );
519527 Response response = null ;
520528 int status = -1 ;
521529
522530 /*
523531 * This loop is for retrying the request if the service is unavailable
524532 */
525533 for (; retryContext .shouldContinueRetrying (minRetryAttempts , maxDelayForRetries ); retryContext .incrementRetry ()) {
526- try {
527- retryContext .sleepIfNeeded ();
528- } catch (InterruptedException e ) {
529- // Ignore interruption
530- }
534+ retryContext .sleepIfNeeded ();
531535
532536 /*
533537 * Execute the function which is passed as an argument
@@ -1191,16 +1195,12 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
11911195 }
11921196 boolean isResendable = handleBase .isResendable ();
11931197
1194- RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::checkFirstRequest );
1198+ RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::resetFirstRequestFlag );
11951199 Response response = null ;
11961200 int status = -1 ;
11971201 Headers responseHeaders = null ;
11981202 for (; retryContext .shouldContinueRetrying (minRetryAttempts , maxDelayForRetries ); retryContext .incrementRetry ()) {
1199- try {
1200- retryContext .sleepIfNeeded ();
1201- } catch (InterruptedException e ) {
1202- // Ignore interruption
1203- }
1203+ retryContext .sleepIfNeeded ();
12041204
12051205 Object value = handleBase .sendContent ();
12061206 if (value == null ) {
@@ -1209,7 +1209,7 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
12091209 }
12101210
12111211 if (isFirstRequest () && !isResendable && isStreaming (value )) {
1212- int firstRequestDelay = makeFirstRequest (retryContext .getRetry ());
1212+ int firstRequestDelay = pingServerBeforeStreaming (retryContext .getRetry ());
12131213 if (firstRequestDelay != 0 ) {
12141214 retryContext .calculateNextDelay (0 , firstRequestDelay );
12151215 continue ;
@@ -1247,7 +1247,7 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
12471247 closeResponse (response );
12481248
12491249 if (!isResendable ) {
1250- checkFirstRequest ();
1250+ resetFirstRequestFlag ();
12511251 throw new ResourceNotResendableException (
12521252 "Cannot retry request for " +
12531253 ((uri != null ) ? uri : "new document" ));
@@ -1348,24 +1348,20 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
13481348 requestBldr = addVersionHeader (desc , requestBldr , "If-Match" );
13491349 }
13501350
1351- RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::checkFirstRequest );
1351+ RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::resetFirstRequestFlag );
13521352 Response response = null ;
13531353 int status = -1 ;
13541354 Headers responseHeaders = null ;
13551355 for (; retryContext .shouldContinueRetrying (minRetryAttempts , maxDelayForRetries ); retryContext .incrementRetry ()) {
1356- try {
1357- retryContext .sleepIfNeeded ();
1358- } catch (InterruptedException e ) {
1359- // Ignore interruption
1360- }
1356+ retryContext .sleepIfNeeded ();
13611357
13621358 MultipartBody .Builder multiPart = new MultipartBody .Builder ();
13631359 boolean hasStreamingPart = addParts (multiPart , reqlog ,
13641360 new String []{metadataMimetype , contentMimetype },
13651361 new AbstractWriteHandle []{metadataHandle , contentHandle });
13661362
13671363 if (isFirstRequest () && hasStreamingPart ) {
1368- int firstRequestDelay = makeFirstRequest (retryContext .getRetry ());
1364+ int firstRequestDelay = pingServerBeforeStreaming (retryContext .getRetry ());
13691365 if (firstRequestDelay != 0 ) {
13701366 retryContext .calculateNextDelay (0 , firstRequestDelay );
13711367 continue ;
@@ -1923,7 +1919,6 @@ public <T extends SearchReadHandle> T search(RequestLogger reqlog, T searchHandl
19231919
19241920 addPointInTimeQueryParam (params , searchHandle );
19251921
1926- @ SuppressWarnings ("rawtypes" )
19271922 HandleImplementation searchBase = HandleAccessor .checkHandle (searchHandle , "search" );
19281923
19291924 Format searchFormat = searchBase .getFormat ();
@@ -2112,15 +2107,11 @@ void init() {
21122107 }
21132108
21142109 Response getResponse () {
2115- RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , OkHttpServices .this ::checkFirstRequest );
2110+ RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , OkHttpServices .this ::resetFirstRequestFlag );
21162111 Response response = null ;
21172112 int status = -1 ;
21182113 for (; retryContext .shouldContinueRetrying (minRetryAttempts , maxDelayForRetries ); retryContext .incrementRetry ()) {
2119- try {
2120- retryContext .sleepIfNeeded ();
2121- } catch (InterruptedException e ) {
2122- // Ignore interruption
2123- }
2114+ retryContext .sleepIfNeeded ();
21242115
21252116 if (queryDef instanceof StructuredQueryDefinition && !(queryDef instanceof RawQueryDefinition )) {
21262117 response = doPost (reqlog , requestBldr , structure );
@@ -2644,15 +2635,11 @@ private void putPostValueImpl(RequestLogger reqlog, String method,
26442635 String connectPath = null ;
26452636 Request .Builder requestBldr = null ;
26462637
2647- RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::checkFirstRequest );
2638+ RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::resetFirstRequestFlag );
26482639 Response response = null ;
26492640 int status = -1 ;
26502641 for (; retryContext .shouldContinueRetrying (minRetryAttempts , maxDelayForRetries ); retryContext .incrementRetry ()) {
2651- try {
2652- retryContext .sleepIfNeeded ();
2653- } catch (InterruptedException e ) {
2654- // Ignore interruption
2655- }
2642+ retryContext .sleepIfNeeded ();
26562643
26572644 Object nextValue = (handle != null ) ? handle .sendContent () : value ;
26582645
@@ -2673,7 +2660,7 @@ private void putPostValueImpl(RequestLogger reqlog, String method,
26732660 boolean isResendable = (handle == null ) ? !isStreaming : handle .isResendable ();
26742661
26752662 if (isFirstRequest () && !isResendable && isStreaming ) {
2676- int firstRequestDelay = makeFirstRequest (retryContext .getRetry ());
2663+ int firstRequestDelay = pingServerBeforeStreaming (retryContext .getRetry ());
26772664 if (firstRequestDelay != 0 ) {
26782665 retryContext .calculateNextDelay (0 , firstRequestDelay );
26792666 continue ;
@@ -2720,7 +2707,7 @@ private void putPostValueImpl(RequestLogger reqlog, String method,
27202707 closeResponse (response );
27212708
27222709 if (!isResendable ) {
2723- checkFirstRequest ();
2710+ resetFirstRequestFlag ();
27242711 throw new ResourceNotResendableException (
27252712 "Cannot retry request for " + connectPath );
27262713 }
@@ -3029,7 +3016,7 @@ public <R extends AbstractReadHandle> R putResource(RequestLogger reqlog,
30293016
30303017 Consumer <Boolean > resendableConsumer = (resendable ) -> {
30313018 if (!isResendable ) {
3032- checkFirstRequest ();
3019+ resetFirstRequestFlag ();
30333020 throw new ResourceNotResendableException (
30343021 "Cannot retry request for " + path );
30353022 }
@@ -3073,15 +3060,11 @@ public <R extends AbstractReadHandle, W extends AbstractWriteHandle> R putResour
30733060 String outputMimetype = outputBase .getMimetype ();
30743061 Class as = outputBase .receiveAs ();
30753062
3076- RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::checkFirstRequest );
3063+ RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::resetFirstRequestFlag );
30773064 Response response = null ;
30783065 int status = -1 ;
30793066 for (; retryContext .shouldContinueRetrying (minRetryAttempts , maxDelayForRetries ); retryContext .incrementRetry ()) {
3080- try {
3081- retryContext .sleepIfNeeded ();
3082- } catch (InterruptedException e ) {
3083- // Ignore interruption
3084- }
3067+ retryContext .sleepIfNeeded ();
30853068
30863069 MultipartBody .Builder multiPart = new MultipartBody .Builder ();
30873070 boolean hasStreamingPart = addParts (multiPart , reqlog , input );
@@ -3182,7 +3165,7 @@ public <R extends AbstractReadHandle> R postResource(RequestLogger reqlog,
31823165 Consumer <Boolean > resendableConsumer = new Consumer <Boolean >() {
31833166 public void accept (Boolean resendable ) {
31843167 if (!isResendable ) {
3185- checkFirstRequest ();
3168+ resetFirstRequestFlag ();
31863169 throw new ResourceNotResendableException ("Cannot retry request for " + path );
31873170 }
31883171 }
@@ -3240,15 +3223,11 @@ public <R extends AbstractReadHandle, W extends AbstractWriteHandle> R postResou
32403223 String outputMimetype = outputBase != null ? outputBase .getMimetype () : null ;
32413224 Class as = outputBase != null ? outputBase .receiveAs () : null ;
32423225
3243- RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::checkFirstRequest );
3226+ RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::resetFirstRequestFlag );
32443227 Response response = null ;
32453228 int status = -1 ;
32463229 for (; retryContext .shouldContinueRetrying (minRetryAttempts , maxDelayForRetries ); retryContext .incrementRetry ()) {
3247- try {
3248- retryContext .sleepIfNeeded ();
3249- } catch (InterruptedException e ) {
3250- // Ignore interruption
3251- }
3230+ retryContext .sleepIfNeeded ();
32523231
32533232 MultipartBody .Builder multiPart = new MultipartBody .Builder ();
32543233 boolean hasStreamingPart = addParts (multiPart , reqlog , null , input , requestHeaders );
@@ -3778,7 +3757,7 @@ private <U extends OkHttpResultIterator> U postIteratedResourceImpl(
37783757
37793758 Consumer <Boolean > resendableConsumer = resendable -> {
37803759 if (!isResendable ) {
3781- checkFirstRequest ();
3760+ resetFirstRequestFlag ();
37823761 throw new ResourceNotResendableException (
37833762 "Cannot retry request for " + path );
37843763 }
@@ -3843,15 +3822,11 @@ private <W extends AbstractWriteHandle, U extends OkHttpResultIterator> U postIt
38433822 throws ResourceNotFoundException , ResourceNotResendableException , ForbiddenUserException , FailedRequestException {
38443823 if (params == null ) params = new RequestParameters ();
38453824 if (transaction != null ) params .add ("txid" , transaction .getTransactionId ());
3846- RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::checkFirstRequest );
3825+ RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::resetFirstRequestFlag );
38473826 Response response = null ;
38483827 int status = -1 ;
38493828 for (; retryContext .shouldContinueRetrying (minRetryAttempts , maxDelayForRetries ); retryContext .incrementRetry ()) {
3850- try {
3851- retryContext .sleepIfNeeded ();
3852- } catch (InterruptedException e ) {
3853- // Ignore interruption
3854- }
3829+ retryContext .sleepIfNeeded ();
38553830
38563831 MultipartBody .Builder multiPart = new MultipartBody .Builder ();
38573832 boolean hasStreamingPart = addParts (multiPart , reqlog , input );
@@ -3966,7 +3941,7 @@ private Request.Builder makePutWebResource(String path,
39663941 private Response doPut (RequestLogger reqlog , Request .Builder requestBldr , Object value ) {
39673942 if (value == null ) throw new IllegalArgumentException ("Resource write with null value" );
39683943
3969- if (isFirstRequest () && isStreaming (value )) makeFirstRequest (0 );
3944+ if (isFirstRequest () && isStreaming (value )) pingServerBeforeStreaming (0 );
39703945
39713946 MediaType mediaType = makeType (requestBldr .build ().header (HEADER_CONTENT_TYPE ));
39723947 if (value instanceof OutputStreamSender ) {
@@ -3987,7 +3962,7 @@ private Response doPut(RequestLogger reqlog, Request.Builder requestBldr, Object
39873962
39883963 private Response doPut (Request .Builder requestBldr ,
39893964 MultipartBody .Builder multiPart , boolean hasStreamingPart ) {
3990- if (isFirstRequest () && hasStreamingPart ) makeFirstRequest (0 );
3965+ if (isFirstRequest () && hasStreamingPart ) pingServerBeforeStreaming (0 );
39913966
39923967 requestBldr = requestBldr .put (multiPart .build ());
39933968 Response response = sendRequestOnce (requestBldr );
@@ -4007,7 +3982,7 @@ private Request.Builder makePostWebResource(String path, RequestParameters param
40073982
40083983 private Response doPost (RequestLogger reqlog , Request .Builder requestBldr , Object value ) {
40093984 if (isFirstRequest () && isStreaming (value )) {
4010- makeFirstRequest (0 );
3985+ pingServerBeforeStreaming (0 );
40113986 }
40123987
40133988 MediaType mediaType = makeType (requestBldr .build ().header (HEADER_CONTENT_TYPE ));
@@ -4034,7 +4009,7 @@ private Response doPost(RequestLogger reqlog, Request.Builder requestBldr, Objec
40344009
40354010 private Response doPost (Request .Builder requestBldr ,
40364011 MultipartBody .Builder multiPart , boolean hasStreamingPart ) {
4037- if (isFirstRequest () && hasStreamingPart ) makeFirstRequest (0 );
4012+ if (isFirstRequest () && hasStreamingPart ) pingServerBeforeStreaming (0 );
40384013
40394014 Response response = sendRequestOnce (requestBldr .post (multiPart .build ()));
40404015
@@ -4953,17 +4928,11 @@ public InputStream match(QueryDefinition queryDef,
49534928 }
49544929 requestBldr = addTelemetryAgentId (requestBldr );
49554930
4956- MediaType mediaType = makeType (requestBldr .build ().header (HEADER_CONTENT_TYPE ));
4957-
4958- RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::checkFirstRequest );
4931+ RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::resetFirstRequestFlag );
49594932 Response response = null ;
49604933 int status = -1 ;
49614934 for (; retryContext .shouldContinueRetrying (minRetryAttempts , maxDelayForRetries ); retryContext .incrementRetry ()) {
4962- try {
4963- retryContext .sleepIfNeeded ();
4964- } catch (InterruptedException e ) {
4965- // Ignore interruption
4966- }
4935+ retryContext .sleepIfNeeded ();
49674936
49684937 if (queryDef instanceof StructuredQueryDefinition ) {
49694938 response = doPost (null , requestBldr , structure );
@@ -5599,14 +5568,32 @@ private void executeRequest(CallResponseImpl responseImpl) {
55995568 boolean hasStreamingPart = hasStreamingPart ();
56005569 Consumer <Boolean > resendableConsumer = resendable -> {
56015570 if (hasStreamingPart ) {
5602- checkFirstRequest ();
5571+ resetFirstRequestFlag ();
56035572 throw new ResourceNotResendableException (
56045573 "Cannot retry request for " + getEndpoint ());
56055574 }
56065575 };
56075576
56085577 Function <Request .Builder , Response > sendRequestFunction = requestBldr -> {
5609- if (isFirstRequest () && hasStreamingPart ) makeFirstRequest (callBaseUri , "" , 0 );
5578+ if (isFirstRequest () && hasStreamingPart ) {
5579+ // Ping the server before streaming; if unavailable, wait and retry the ping
5580+ int pingDelay = pingServer (callBaseUri , "" , 0 );
5581+ int pingRetries = 0 ;
5582+ int maxPingRetries = 10 ; // Prevent infinite loop
5583+ while (pingDelay > 0 && pingRetries < maxPingRetries && !Thread .currentThread ().isInterrupted ()) {
5584+ try {
5585+ Thread .sleep (pingDelay );
5586+ } catch (InterruptedException e ) {
5587+ Thread .currentThread ().interrupt ();
5588+ throw new RuntimeException ("Interrupted while waiting to ping server before streaming" , e );
5589+ }
5590+ pingRetries ++;
5591+ pingDelay = pingServer (callBaseUri , "" , 0 );
5592+ }
5593+ if (pingRetries >= maxPingRetries ) {
5594+ logger .warn ("Server still unavailable after {} ping attempts before streaming" , maxPingRetries );
5595+ }
5596+ }
56105597 Response response = sendRequestOnce (requestBldr );
56115598 if (isFirstRequest ()) setFirstRequest (false );
56125599 return response ;
0 commit comments