Skip to content

Commit f3366f8

Browse files
committed
MLE-27388: Refactor: Renamed the "checkFirstRequest" stuff
The new names actually convey what is going on as opposed to making you wonder what "check" might mean. Also modified sleepIfNeeded to catch the interrupted exception instead of repeating that catch.
1 parent aaa7c20 commit f3366f8

File tree

2 files changed

+77
-81
lines changed

2 files changed

+77
-81
lines changed

marklogic-client-api/src/main/java/com/marklogic/client/impl/OkHttpServices.java

Lines changed: 66 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,8 @@ public class OkHttpServices implements RESTServices {
120120
private static final Set<Integer> RETRYABLE_STATUS_CODES =
121121
Set.of(STATUS_BAD_GATEWAY, STATUS_SERVICE_UNAVAILABLE, STATUS_GATEWAY_TIMEOUT);
122122

123-
private boolean checkFirstRequest = true;
123+
// When true (digest auth), ping server before streaming non-resendable content to avoid wasted uploads
124+
private boolean useDigestAuthPing = true;
124125

125126
static protected class ThreadState {
126127
boolean isFirstRequest;
@@ -131,7 +132,7 @@ static protected class ThreadState {
131132
}
132133

133134
private final ThreadLocal<ThreadState> threadState =
134-
ThreadLocal.withInitial(() -> new ThreadState(checkFirstRequest));
135+
ThreadLocal.withInitial(() -> new ThreadState(useDigestAuthPing));
135136

136137
public record ConnectionConfig(String host, int port, String basePath, String database,
137138
SecurityContext securityContext, List<OkHttpClientConfigurator> clientConfigurators) {
@@ -197,7 +198,7 @@ private OkHttpClient connect(ConnectionConfig config) {
197198
throw new IllegalArgumentException("No security context provided");
198199
}
199200

200-
this.checkFirstRequest = config.securityContext instanceof DigestAuthContext;
201+
this.useDigestAuthPing = config.securityContext instanceof DigestAuthContext;
201202
this.database = config.database;
202203
this.baseUri = HttpUrlBuilder.newBaseUrl(config.host, config.port, config.basePath, config.securityContext.getSSLContext());
203204

@@ -283,15 +284,17 @@ private void setFirstRequest(boolean value) {
283284
threadState.get().isFirstRequest = value;
284285
}
285286

286-
private void checkFirstRequest() {
287-
if (checkFirstRequest) setFirstRequest(true);
287+
private void resetFirstRequestFlag() {
288+
if (useDigestAuthPing) {
289+
setFirstRequest(true);
290+
}
288291
}
289292

290-
private int makeFirstRequest(int retry) {
291-
return makeFirstRequest(baseUri, "ping", retry);
293+
private int pingServerBeforeStreaming(int retry) {
294+
return pingServer(baseUri, "ping", retry);
292295
}
293296

294-
private int makeFirstRequest(HttpUrl requestUri, String path, int retry) {
297+
private int pingServer(HttpUrl requestUri, String path, int retry) {
295298
Response response = sendRequestOnce(setupRequest(requestUri, path, null).head());
296299
int statusCode = response.code();
297300
if (!RETRYABLE_STATUS_CODES.contains(statusCode)) {
@@ -515,19 +518,20 @@ private Response sendRequestWithRetry(Request.Builder requestBldr, Function<Requ
515518
private Response sendRequestWithRetry(
516519
Request.Builder requestBldr, boolean isRetryable, Function<Request.Builder, Response> doFunction, Consumer<Boolean> resendableConsumer
517520
) {
518-
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest);
521+
// If the thread is already interrupted, fail fast rather than attempting a request
522+
if (Thread.currentThread().isInterrupted()) {
523+
throw new MarkLogicIOException("Request cancelled: thread was interrupted before request could be sent");
524+
}
525+
526+
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag);
519527
Response response = null;
520528
int status = -1;
521529

522530
/*
523531
* This loop is for retrying the request if the service is unavailable
524532
*/
525533
for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) {
526-
try {
527-
retryContext.sleepIfNeeded();
528-
} catch (InterruptedException e) {
529-
// Ignore interruption
530-
}
534+
retryContext.sleepIfNeeded();
531535

532536
/*
533537
* Execute the function which is passed as an argument
@@ -1191,16 +1195,12 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
11911195
}
11921196
boolean isResendable = handleBase.isResendable();
11931197

1194-
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest);
1198+
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag);
11951199
Response response = null;
11961200
int status = -1;
11971201
Headers responseHeaders = null;
11981202
for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) {
1199-
try {
1200-
retryContext.sleepIfNeeded();
1201-
} catch (InterruptedException e) {
1202-
// Ignore interruption
1203-
}
1203+
retryContext.sleepIfNeeded();
12041204

12051205
Object value = handleBase.sendContent();
12061206
if (value == null) {
@@ -1209,7 +1209,7 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
12091209
}
12101210

12111211
if (isFirstRequest() && !isResendable && isStreaming(value)) {
1212-
int firstRequestDelay = makeFirstRequest(retryContext.getRetry());
1212+
int firstRequestDelay = pingServerBeforeStreaming(retryContext.getRetry());
12131213
if (firstRequestDelay != 0) {
12141214
retryContext.calculateNextDelay(0, firstRequestDelay);
12151215
continue;
@@ -1247,7 +1247,7 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
12471247
closeResponse(response);
12481248

12491249
if (!isResendable) {
1250-
checkFirstRequest();
1250+
resetFirstRequestFlag();
12511251
throw new ResourceNotResendableException(
12521252
"Cannot retry request for " +
12531253
((uri != null) ? uri : "new document"));
@@ -1348,24 +1348,20 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
13481348
requestBldr = addVersionHeader(desc, requestBldr, "If-Match");
13491349
}
13501350

1351-
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest);
1351+
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag);
13521352
Response response = null;
13531353
int status = -1;
13541354
Headers responseHeaders = null;
13551355
for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) {
1356-
try {
1357-
retryContext.sleepIfNeeded();
1358-
} catch (InterruptedException e) {
1359-
// Ignore interruption
1360-
}
1356+
retryContext.sleepIfNeeded();
13611357

13621358
MultipartBody.Builder multiPart = new MultipartBody.Builder();
13631359
boolean hasStreamingPart = addParts(multiPart, reqlog,
13641360
new String[]{metadataMimetype, contentMimetype},
13651361
new AbstractWriteHandle[]{metadataHandle, contentHandle});
13661362

13671363
if (isFirstRequest() && hasStreamingPart) {
1368-
int firstRequestDelay = makeFirstRequest(retryContext.getRetry());
1364+
int firstRequestDelay = pingServerBeforeStreaming(retryContext.getRetry());
13691365
if (firstRequestDelay != 0) {
13701366
retryContext.calculateNextDelay(0, firstRequestDelay);
13711367
continue;
@@ -1923,7 +1919,6 @@ public <T extends SearchReadHandle> T search(RequestLogger reqlog, T searchHandl
19231919

19241920
addPointInTimeQueryParam(params, searchHandle);
19251921

1926-
@SuppressWarnings("rawtypes")
19271922
HandleImplementation searchBase = HandleAccessor.checkHandle(searchHandle, "search");
19281923

19291924
Format searchFormat = searchBase.getFormat();
@@ -2112,15 +2107,11 @@ void init() {
21122107
}
21132108

21142109
Response getResponse() {
2115-
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, OkHttpServices.this::checkFirstRequest);
2110+
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, OkHttpServices.this::resetFirstRequestFlag);
21162111
Response response = null;
21172112
int status = -1;
21182113
for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) {
2119-
try {
2120-
retryContext.sleepIfNeeded();
2121-
} catch (InterruptedException e) {
2122-
// Ignore interruption
2123-
}
2114+
retryContext.sleepIfNeeded();
21242115

21252116
if (queryDef instanceof StructuredQueryDefinition && !(queryDef instanceof RawQueryDefinition)) {
21262117
response = doPost(reqlog, requestBldr, structure);
@@ -2644,15 +2635,11 @@ private void putPostValueImpl(RequestLogger reqlog, String method,
26442635
String connectPath = null;
26452636
Request.Builder requestBldr = null;
26462637

2647-
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest);
2638+
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag);
26482639
Response response = null;
26492640
int status = -1;
26502641
for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) {
2651-
try {
2652-
retryContext.sleepIfNeeded();
2653-
} catch (InterruptedException e) {
2654-
// Ignore interruption
2655-
}
2642+
retryContext.sleepIfNeeded();
26562643

26572644
Object nextValue = (handle != null) ? handle.sendContent() : value;
26582645

@@ -2673,7 +2660,7 @@ private void putPostValueImpl(RequestLogger reqlog, String method,
26732660
boolean isResendable = (handle == null) ? !isStreaming : handle.isResendable();
26742661

26752662
if (isFirstRequest() && !isResendable && isStreaming) {
2676-
int firstRequestDelay = makeFirstRequest(retryContext.getRetry());
2663+
int firstRequestDelay = pingServerBeforeStreaming(retryContext.getRetry());
26772664
if (firstRequestDelay != 0) {
26782665
retryContext.calculateNextDelay(0, firstRequestDelay);
26792666
continue;
@@ -2720,7 +2707,7 @@ private void putPostValueImpl(RequestLogger reqlog, String method,
27202707
closeResponse(response);
27212708

27222709
if (!isResendable) {
2723-
checkFirstRequest();
2710+
resetFirstRequestFlag();
27242711
throw new ResourceNotResendableException(
27252712
"Cannot retry request for " + connectPath);
27262713
}
@@ -3029,7 +3016,7 @@ public <R extends AbstractReadHandle> R putResource(RequestLogger reqlog,
30293016

30303017
Consumer<Boolean> resendableConsumer = (resendable) -> {
30313018
if (!isResendable) {
3032-
checkFirstRequest();
3019+
resetFirstRequestFlag();
30333020
throw new ResourceNotResendableException(
30343021
"Cannot retry request for " + path);
30353022
}
@@ -3073,15 +3060,11 @@ public <R extends AbstractReadHandle, W extends AbstractWriteHandle> R putResour
30733060
String outputMimetype = outputBase.getMimetype();
30743061
Class as = outputBase.receiveAs();
30753062

3076-
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest);
3063+
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag);
30773064
Response response = null;
30783065
int status = -1;
30793066
for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) {
3080-
try {
3081-
retryContext.sleepIfNeeded();
3082-
} catch (InterruptedException e) {
3083-
// Ignore interruption
3084-
}
3067+
retryContext.sleepIfNeeded();
30853068

30863069
MultipartBody.Builder multiPart = new MultipartBody.Builder();
30873070
boolean hasStreamingPart = addParts(multiPart, reqlog, input);
@@ -3182,7 +3165,7 @@ public <R extends AbstractReadHandle> R postResource(RequestLogger reqlog,
31823165
Consumer<Boolean> resendableConsumer = new Consumer<Boolean>() {
31833166
public void accept(Boolean resendable) {
31843167
if (!isResendable) {
3185-
checkFirstRequest();
3168+
resetFirstRequestFlag();
31863169
throw new ResourceNotResendableException("Cannot retry request for " + path);
31873170
}
31883171
}
@@ -3240,15 +3223,11 @@ public <R extends AbstractReadHandle, W extends AbstractWriteHandle> R postResou
32403223
String outputMimetype = outputBase != null ? outputBase.getMimetype() : null;
32413224
Class as = outputBase != null ? outputBase.receiveAs() : null;
32423225

3243-
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest);
3226+
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag);
32443227
Response response = null;
32453228
int status = -1;
32463229
for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) {
3247-
try {
3248-
retryContext.sleepIfNeeded();
3249-
} catch (InterruptedException e) {
3250-
// Ignore interruption
3251-
}
3230+
retryContext.sleepIfNeeded();
32523231

32533232
MultipartBody.Builder multiPart = new MultipartBody.Builder();
32543233
boolean hasStreamingPart = addParts(multiPart, reqlog, null, input, requestHeaders);
@@ -3778,7 +3757,7 @@ private <U extends OkHttpResultIterator> U postIteratedResourceImpl(
37783757

37793758
Consumer<Boolean> resendableConsumer = resendable -> {
37803759
if (!isResendable) {
3781-
checkFirstRequest();
3760+
resetFirstRequestFlag();
37823761
throw new ResourceNotResendableException(
37833762
"Cannot retry request for " + path);
37843763
}
@@ -3843,15 +3822,11 @@ private <W extends AbstractWriteHandle, U extends OkHttpResultIterator> U postIt
38433822
throws ResourceNotFoundException, ResourceNotResendableException, ForbiddenUserException, FailedRequestException {
38443823
if (params == null) params = new RequestParameters();
38453824
if (transaction != null) params.add("txid", transaction.getTransactionId());
3846-
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest);
3825+
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag);
38473826
Response response = null;
38483827
int status = -1;
38493828
for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) {
3850-
try {
3851-
retryContext.sleepIfNeeded();
3852-
} catch (InterruptedException e) {
3853-
// Ignore interruption
3854-
}
3829+
retryContext.sleepIfNeeded();
38553830

38563831
MultipartBody.Builder multiPart = new MultipartBody.Builder();
38573832
boolean hasStreamingPart = addParts(multiPart, reqlog, input);
@@ -3966,7 +3941,7 @@ private Request.Builder makePutWebResource(String path,
39663941
private Response doPut(RequestLogger reqlog, Request.Builder requestBldr, Object value) {
39673942
if (value == null) throw new IllegalArgumentException("Resource write with null value");
39683943

3969-
if (isFirstRequest() && isStreaming(value)) makeFirstRequest(0);
3944+
if (isFirstRequest() && isStreaming(value)) pingServerBeforeStreaming(0);
39703945

39713946
MediaType mediaType = makeType(requestBldr.build().header(HEADER_CONTENT_TYPE));
39723947
if (value instanceof OutputStreamSender) {
@@ -3987,7 +3962,7 @@ private Response doPut(RequestLogger reqlog, Request.Builder requestBldr, Object
39873962

39883963
private Response doPut(Request.Builder requestBldr,
39893964
MultipartBody.Builder multiPart, boolean hasStreamingPart) {
3990-
if (isFirstRequest() && hasStreamingPart) makeFirstRequest(0);
3965+
if (isFirstRequest() && hasStreamingPart) pingServerBeforeStreaming(0);
39913966

39923967
requestBldr = requestBldr.put(multiPart.build());
39933968
Response response = sendRequestOnce(requestBldr);
@@ -4007,7 +3982,7 @@ private Request.Builder makePostWebResource(String path, RequestParameters param
40073982

40083983
private Response doPost(RequestLogger reqlog, Request.Builder requestBldr, Object value) {
40093984
if (isFirstRequest() && isStreaming(value)) {
4010-
makeFirstRequest(0);
3985+
pingServerBeforeStreaming(0);
40113986
}
40123987

40133988
MediaType mediaType = makeType(requestBldr.build().header(HEADER_CONTENT_TYPE));
@@ -4034,7 +4009,7 @@ private Response doPost(RequestLogger reqlog, Request.Builder requestBldr, Objec
40344009

40354010
private Response doPost(Request.Builder requestBldr,
40364011
MultipartBody.Builder multiPart, boolean hasStreamingPart) {
4037-
if (isFirstRequest() && hasStreamingPart) makeFirstRequest(0);
4012+
if (isFirstRequest() && hasStreamingPart) pingServerBeforeStreaming(0);
40384013

40394014
Response response = sendRequestOnce(requestBldr.post(multiPart.build()));
40404015

@@ -4953,17 +4928,11 @@ public InputStream match(QueryDefinition queryDef,
49534928
}
49544929
requestBldr = addTelemetryAgentId(requestBldr);
49554930

4956-
MediaType mediaType = makeType(requestBldr.build().header(HEADER_CONTENT_TYPE));
4957-
4958-
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest);
4931+
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag);
49594932
Response response = null;
49604933
int status = -1;
49614934
for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) {
4962-
try {
4963-
retryContext.sleepIfNeeded();
4964-
} catch (InterruptedException e) {
4965-
// Ignore interruption
4966-
}
4935+
retryContext.sleepIfNeeded();
49674936

49684937
if (queryDef instanceof StructuredQueryDefinition) {
49694938
response = doPost(null, requestBldr, structure);
@@ -5599,14 +5568,32 @@ private void executeRequest(CallResponseImpl responseImpl) {
55995568
boolean hasStreamingPart = hasStreamingPart();
56005569
Consumer<Boolean> resendableConsumer = resendable -> {
56015570
if (hasStreamingPart) {
5602-
checkFirstRequest();
5571+
resetFirstRequestFlag();
56035572
throw new ResourceNotResendableException(
56045573
"Cannot retry request for " + getEndpoint());
56055574
}
56065575
};
56075576

56085577
Function<Request.Builder, Response> sendRequestFunction = requestBldr -> {
5609-
if (isFirstRequest() && hasStreamingPart) makeFirstRequest(callBaseUri, "", 0);
5578+
if (isFirstRequest() && hasStreamingPart) {
5579+
// Ping the server before streaming; if unavailable, wait and retry the ping
5580+
int pingDelay = pingServer(callBaseUri, "", 0);
5581+
int pingRetries = 0;
5582+
int maxPingRetries = 10; // Prevent infinite loop
5583+
while (pingDelay > 0 && pingRetries < maxPingRetries && !Thread.currentThread().isInterrupted()) {
5584+
try {
5585+
Thread.sleep(pingDelay);
5586+
} catch (InterruptedException e) {
5587+
Thread.currentThread().interrupt();
5588+
throw new RuntimeException("Interrupted while waiting to ping server before streaming", e);
5589+
}
5590+
pingRetries++;
5591+
pingDelay = pingServer(callBaseUri, "", 0);
5592+
}
5593+
if (pingRetries >= maxPingRetries) {
5594+
logger.warn("Server still unavailable after {} ping attempts before streaming", maxPingRetries);
5595+
}
5596+
}
56105597
Response response = sendRequestOnce(requestBldr);
56115598
if (isFirstRequest()) setFirstRequest(false);
56125599
return response;

0 commit comments

Comments
 (0)