@@ -120,7 +120,8 @@ public class OkHttpServices implements RESTServices {
120120 private static final Set <Integer > RETRYABLE_STATUS_CODES =
121121 Set .of (STATUS_BAD_GATEWAY , STATUS_SERVICE_UNAVAILABLE , STATUS_GATEWAY_TIMEOUT );
122122
123- private boolean checkFirstRequest = true ;
123+ // When true (digest auth), ping server before streaming non-resendable content to avoid wasted uploads
124+ private boolean useDigestAuthPing = true ;
124125
125126 static protected class ThreadState {
126127 boolean isFirstRequest ;
@@ -131,7 +132,7 @@ static protected class ThreadState {
131132 }
132133
133134 private final ThreadLocal <ThreadState > threadState =
134- ThreadLocal .withInitial (() -> new ThreadState (checkFirstRequest ));
135+ ThreadLocal .withInitial (() -> new ThreadState (useDigestAuthPing ));
135136
136137 public record ConnectionConfig (String host , int port , String basePath , String database ,
137138 SecurityContext securityContext , List <OkHttpClientConfigurator > clientConfigurators ) {
@@ -197,7 +198,7 @@ private OkHttpClient connect(ConnectionConfig config) {
197198 throw new IllegalArgumentException ("No security context provided" );
198199 }
199200
200- this .checkFirstRequest = config .securityContext instanceof DigestAuthContext ;
201+ this .useDigestAuthPing = config .securityContext instanceof DigestAuthContext ;
201202 this .database = config .database ;
202203 this .baseUri = HttpUrlBuilder .newBaseUrl (config .host , config .port , config .basePath , config .securityContext .getSSLContext ());
203204
@@ -283,15 +284,17 @@ private void setFirstRequest(boolean value) {
283284 threadState .get ().isFirstRequest = value ;
284285 }
285286
286- private void checkFirstRequest () {
287- if (checkFirstRequest ) setFirstRequest (true );
287+ private void resetFirstRequestFlag () {
288+ if (useDigestAuthPing ) {
289+ setFirstRequest (true );
290+ }
288291 }
289292
290- private int makeFirstRequest (int retry ) {
291- return makeFirstRequest (baseUri , "ping" , retry );
293+ private int pingServerBeforeStreaming (int retry ) {
294+ return pingServer (baseUri , "ping" , retry );
292295 }
293296
294- private int makeFirstRequest (HttpUrl requestUri , String path , int retry ) {
297+ private int pingServer (HttpUrl requestUri , String path , int retry ) {
295298 Response response = sendRequestOnce (setupRequest (requestUri , path , null ).head ());
296299 int statusCode = response .code ();
297300 if (!RETRYABLE_STATUS_CODES .contains (statusCode )) {
@@ -515,19 +518,15 @@ private Response sendRequestWithRetry(Request.Builder requestBldr, Function<Requ
515518 private Response sendRequestWithRetry (
516519 Request .Builder requestBldr , boolean isRetryable , Function <Request .Builder , Response > doFunction , Consumer <Boolean > resendableConsumer
517520 ) {
518- RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::checkFirstRequest );
521+ RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::resetFirstRequestFlag );
519522 Response response = null ;
520523 int status = -1 ;
521524
522525 /*
523526 * This loop is for retrying the request if the service is unavailable
524527 */
525528 for (; retryContext .shouldContinueRetrying (minRetryAttempts , maxDelayForRetries ); retryContext .incrementRetry ()) {
526- try {
527- retryContext .sleepIfNeeded ();
528- } catch (InterruptedException e ) {
529- // Ignore interruption
530- }
529+ retryContext .sleepIfNeeded ();
531530
532531 /*
533532 * Execute the function which is passed as an argument
@@ -1191,16 +1190,12 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
11911190 }
11921191 boolean isResendable = handleBase .isResendable ();
11931192
1194- RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::checkFirstRequest );
1193+ RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::resetFirstRequestFlag );
11951194 Response response = null ;
11961195 int status = -1 ;
11971196 Headers responseHeaders = null ;
11981197 for (; retryContext .shouldContinueRetrying (minRetryAttempts , maxDelayForRetries ); retryContext .incrementRetry ()) {
1199- try {
1200- retryContext .sleepIfNeeded ();
1201- } catch (InterruptedException e ) {
1202- // Ignore interruption
1203- }
1198+ retryContext .sleepIfNeeded ();
12041199
12051200 Object value = handleBase .sendContent ();
12061201 if (value == null ) {
@@ -1209,7 +1204,7 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
12091204 }
12101205
12111206 if (isFirstRequest () && !isResendable && isStreaming (value )) {
1212- int firstRequestDelay = makeFirstRequest (retryContext .getRetry ());
1207+ int firstRequestDelay = pingServerBeforeStreaming (retryContext .getRetry ());
12131208 if (firstRequestDelay != 0 ) {
12141209 retryContext .calculateNextDelay (0 , firstRequestDelay );
12151210 continue ;
@@ -1247,7 +1242,7 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
12471242 closeResponse (response );
12481243
12491244 if (!isResendable ) {
1250- checkFirstRequest ();
1245+ resetFirstRequestFlag ();
12511246 throw new ResourceNotResendableException (
12521247 "Cannot retry request for " +
12531248 ((uri != null ) ? uri : "new document" ));
@@ -1348,24 +1343,20 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
13481343 requestBldr = addVersionHeader (desc , requestBldr , "If-Match" );
13491344 }
13501345
1351- RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::checkFirstRequest );
1346+ RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::resetFirstRequestFlag );
13521347 Response response = null ;
13531348 int status = -1 ;
13541349 Headers responseHeaders = null ;
13551350 for (; retryContext .shouldContinueRetrying (minRetryAttempts , maxDelayForRetries ); retryContext .incrementRetry ()) {
1356- try {
1357- retryContext .sleepIfNeeded ();
1358- } catch (InterruptedException e ) {
1359- // Ignore interruption
1360- }
1351+ retryContext .sleepIfNeeded ();
13611352
13621353 MultipartBody .Builder multiPart = new MultipartBody .Builder ();
13631354 boolean hasStreamingPart = addParts (multiPart , reqlog ,
13641355 new String []{metadataMimetype , contentMimetype },
13651356 new AbstractWriteHandle []{metadataHandle , contentHandle });
13661357
13671358 if (isFirstRequest () && hasStreamingPart ) {
1368- int firstRequestDelay = makeFirstRequest (retryContext .getRetry ());
1359+ int firstRequestDelay = pingServerBeforeStreaming (retryContext .getRetry ());
13691360 if (firstRequestDelay != 0 ) {
13701361 retryContext .calculateNextDelay (0 , firstRequestDelay );
13711362 continue ;
@@ -1923,7 +1914,6 @@ public <T extends SearchReadHandle> T search(RequestLogger reqlog, T searchHandl
19231914
19241915 addPointInTimeQueryParam (params , searchHandle );
19251916
1926- @ SuppressWarnings ("rawtypes" )
19271917 HandleImplementation searchBase = HandleAccessor .checkHandle (searchHandle , "search" );
19281918
19291919 Format searchFormat = searchBase .getFormat ();
@@ -2112,15 +2102,11 @@ void init() {
21122102 }
21132103
21142104 Response getResponse () {
2115- RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , OkHttpServices .this ::checkFirstRequest );
2105+ RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , OkHttpServices .this ::resetFirstRequestFlag );
21162106 Response response = null ;
21172107 int status = -1 ;
21182108 for (; retryContext .shouldContinueRetrying (minRetryAttempts , maxDelayForRetries ); retryContext .incrementRetry ()) {
2119- try {
2120- retryContext .sleepIfNeeded ();
2121- } catch (InterruptedException e ) {
2122- // Ignore interruption
2123- }
2109+ retryContext .sleepIfNeeded ();
21242110
21252111 if (queryDef instanceof StructuredQueryDefinition && !(queryDef instanceof RawQueryDefinition )) {
21262112 response = doPost (reqlog , requestBldr , structure );
@@ -2644,15 +2630,11 @@ private void putPostValueImpl(RequestLogger reqlog, String method,
26442630 String connectPath = null ;
26452631 Request .Builder requestBldr = null ;
26462632
2647- RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::checkFirstRequest );
2633+ RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::resetFirstRequestFlag );
26482634 Response response = null ;
26492635 int status = -1 ;
26502636 for (; retryContext .shouldContinueRetrying (minRetryAttempts , maxDelayForRetries ); retryContext .incrementRetry ()) {
2651- try {
2652- retryContext .sleepIfNeeded ();
2653- } catch (InterruptedException e ) {
2654- // Ignore interruption
2655- }
2637+ retryContext .sleepIfNeeded ();
26562638
26572639 Object nextValue = (handle != null ) ? handle .sendContent () : value ;
26582640
@@ -2673,7 +2655,7 @@ private void putPostValueImpl(RequestLogger reqlog, String method,
26732655 boolean isResendable = (handle == null ) ? !isStreaming : handle .isResendable ();
26742656
26752657 if (isFirstRequest () && !isResendable && isStreaming ) {
2676- int firstRequestDelay = makeFirstRequest (retryContext .getRetry ());
2658+ int firstRequestDelay = pingServerBeforeStreaming (retryContext .getRetry ());
26772659 if (firstRequestDelay != 0 ) {
26782660 retryContext .calculateNextDelay (0 , firstRequestDelay );
26792661 continue ;
@@ -2720,7 +2702,7 @@ private void putPostValueImpl(RequestLogger reqlog, String method,
27202702 closeResponse (response );
27212703
27222704 if (!isResendable ) {
2723- checkFirstRequest ();
2705+ resetFirstRequestFlag ();
27242706 throw new ResourceNotResendableException (
27252707 "Cannot retry request for " + connectPath );
27262708 }
@@ -3029,7 +3011,7 @@ public <R extends AbstractReadHandle> R putResource(RequestLogger reqlog,
30293011
30303012 Consumer <Boolean > resendableConsumer = (resendable ) -> {
30313013 if (!isResendable ) {
3032- checkFirstRequest ();
3014+ resetFirstRequestFlag ();
30333015 throw new ResourceNotResendableException (
30343016 "Cannot retry request for " + path );
30353017 }
@@ -3073,15 +3055,11 @@ public <R extends AbstractReadHandle, W extends AbstractWriteHandle> R putResour
30733055 String outputMimetype = outputBase .getMimetype ();
30743056 Class as = outputBase .receiveAs ();
30753057
3076- RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::checkFirstRequest );
3058+ RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::resetFirstRequestFlag );
30773059 Response response = null ;
30783060 int status = -1 ;
30793061 for (; retryContext .shouldContinueRetrying (minRetryAttempts , maxDelayForRetries ); retryContext .incrementRetry ()) {
3080- try {
3081- retryContext .sleepIfNeeded ();
3082- } catch (InterruptedException e ) {
3083- // Ignore interruption
3084- }
3062+ retryContext .sleepIfNeeded ();
30853063
30863064 MultipartBody .Builder multiPart = new MultipartBody .Builder ();
30873065 boolean hasStreamingPart = addParts (multiPart , reqlog , input );
@@ -3182,7 +3160,7 @@ public <R extends AbstractReadHandle> R postResource(RequestLogger reqlog,
31823160 Consumer <Boolean > resendableConsumer = new Consumer <Boolean >() {
31833161 public void accept (Boolean resendable ) {
31843162 if (!isResendable ) {
3185- checkFirstRequest ();
3163+ resetFirstRequestFlag ();
31863164 throw new ResourceNotResendableException ("Cannot retry request for " + path );
31873165 }
31883166 }
@@ -3240,15 +3218,11 @@ public <R extends AbstractReadHandle, W extends AbstractWriteHandle> R postResou
32403218 String outputMimetype = outputBase != null ? outputBase .getMimetype () : null ;
32413219 Class as = outputBase != null ? outputBase .receiveAs () : null ;
32423220
3243- RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::checkFirstRequest );
3221+ RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::resetFirstRequestFlag );
32443222 Response response = null ;
32453223 int status = -1 ;
32463224 for (; retryContext .shouldContinueRetrying (minRetryAttempts , maxDelayForRetries ); retryContext .incrementRetry ()) {
3247- try {
3248- retryContext .sleepIfNeeded ();
3249- } catch (InterruptedException e ) {
3250- // Ignore interruption
3251- }
3225+ retryContext .sleepIfNeeded ();
32523226
32533227 MultipartBody .Builder multiPart = new MultipartBody .Builder ();
32543228 boolean hasStreamingPart = addParts (multiPart , reqlog , null , input , requestHeaders );
@@ -3778,7 +3752,7 @@ private <U extends OkHttpResultIterator> U postIteratedResourceImpl(
37783752
37793753 Consumer <Boolean > resendableConsumer = resendable -> {
37803754 if (!isResendable ) {
3781- checkFirstRequest ();
3755+ resetFirstRequestFlag ();
37823756 throw new ResourceNotResendableException (
37833757 "Cannot retry request for " + path );
37843758 }
@@ -3843,15 +3817,11 @@ private <W extends AbstractWriteHandle, U extends OkHttpResultIterator> U postIt
38433817 throws ResourceNotFoundException , ResourceNotResendableException , ForbiddenUserException , FailedRequestException {
38443818 if (params == null ) params = new RequestParameters ();
38453819 if (transaction != null ) params .add ("txid" , transaction .getTransactionId ());
3846- RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::checkFirstRequest );
3820+ RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::resetFirstRequestFlag );
38473821 Response response = null ;
38483822 int status = -1 ;
38493823 for (; retryContext .shouldContinueRetrying (minRetryAttempts , maxDelayForRetries ); retryContext .incrementRetry ()) {
3850- try {
3851- retryContext .sleepIfNeeded ();
3852- } catch (InterruptedException e ) {
3853- // Ignore interruption
3854- }
3824+ retryContext .sleepIfNeeded ();
38553825
38563826 MultipartBody .Builder multiPart = new MultipartBody .Builder ();
38573827 boolean hasStreamingPart = addParts (multiPart , reqlog , input );
@@ -3966,7 +3936,7 @@ private Request.Builder makePutWebResource(String path,
39663936 private Response doPut (RequestLogger reqlog , Request .Builder requestBldr , Object value ) {
39673937 if (value == null ) throw new IllegalArgumentException ("Resource write with null value" );
39683938
3969- if (isFirstRequest () && isStreaming (value )) makeFirstRequest (0 );
3939+ if (isFirstRequest () && isStreaming (value )) pingServerBeforeStreaming (0 );
39703940
39713941 MediaType mediaType = makeType (requestBldr .build ().header (HEADER_CONTENT_TYPE ));
39723942 if (value instanceof OutputStreamSender ) {
@@ -3987,7 +3957,7 @@ private Response doPut(RequestLogger reqlog, Request.Builder requestBldr, Object
39873957
39883958 private Response doPut (Request .Builder requestBldr ,
39893959 MultipartBody .Builder multiPart , boolean hasStreamingPart ) {
3990- if (isFirstRequest () && hasStreamingPart ) makeFirstRequest (0 );
3960+ if (isFirstRequest () && hasStreamingPart ) pingServerBeforeStreaming (0 );
39913961
39923962 requestBldr = requestBldr .put (multiPart .build ());
39933963 Response response = sendRequestOnce (requestBldr );
@@ -4007,7 +3977,7 @@ private Request.Builder makePostWebResource(String path, RequestParameters param
40073977
40083978 private Response doPost (RequestLogger reqlog , Request .Builder requestBldr , Object value ) {
40093979 if (isFirstRequest () && isStreaming (value )) {
4010- makeFirstRequest (0 );
3980+ pingServerBeforeStreaming (0 );
40113981 }
40123982
40133983 MediaType mediaType = makeType (requestBldr .build ().header (HEADER_CONTENT_TYPE ));
@@ -4034,7 +4004,7 @@ private Response doPost(RequestLogger reqlog, Request.Builder requestBldr, Objec
40344004
40354005 private Response doPost (Request .Builder requestBldr ,
40364006 MultipartBody .Builder multiPart , boolean hasStreamingPart ) {
4037- if (isFirstRequest () && hasStreamingPart ) makeFirstRequest (0 );
4007+ if (isFirstRequest () && hasStreamingPart ) pingServerBeforeStreaming (0 );
40384008
40394009 Response response = sendRequestOnce (requestBldr .post (multiPart .build ()));
40404010
@@ -4953,17 +4923,11 @@ public InputStream match(QueryDefinition queryDef,
49534923 }
49544924 requestBldr = addTelemetryAgentId (requestBldr );
49554925
4956- MediaType mediaType = makeType (requestBldr .build ().header (HEADER_CONTENT_TYPE ));
4957-
4958- RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::checkFirstRequest );
4926+ RetryContext retryContext = new RetryContext (logger , RETRYABLE_STATUS_CODES , this ::resetFirstRequestFlag );
49594927 Response response = null ;
49604928 int status = -1 ;
49614929 for (; retryContext .shouldContinueRetrying (minRetryAttempts , maxDelayForRetries ); retryContext .incrementRetry ()) {
4962- try {
4963- retryContext .sleepIfNeeded ();
4964- } catch (InterruptedException e ) {
4965- // Ignore interruption
4966- }
4930+ retryContext .sleepIfNeeded ();
49674931
49684932 if (queryDef instanceof StructuredQueryDefinition ) {
49694933 response = doPost (null , requestBldr , structure );
@@ -5599,14 +5563,16 @@ private void executeRequest(CallResponseImpl responseImpl) {
55995563 boolean hasStreamingPart = hasStreamingPart ();
56005564 Consumer <Boolean > resendableConsumer = resendable -> {
56015565 if (hasStreamingPart ) {
5602- checkFirstRequest ();
5566+ resetFirstRequestFlag ();
56035567 throw new ResourceNotResendableException (
56045568 "Cannot retry request for " + getEndpoint ());
56055569 }
56065570 };
56075571
56085572 Function <Request .Builder , Response > sendRequestFunction = requestBldr -> {
5609- if (isFirstRequest () && hasStreamingPart ) makeFirstRequest (callBaseUri , "" , 0 );
5573+ if (isFirstRequest () && hasStreamingPart ) {
5574+ pingServer (callBaseUri , "" , 0 );
5575+ }
56105576 Response response = sendRequestOnce (requestBldr );
56115577 if (isFirstRequest ()) setFirstRequest (false );
56125578 return response ;
0 commit comments