Skip to content

Commit b10b35f

Browse files
authored
Fix S3RepositoryAnalysisRestIT (#126593)
- Translate a 404 during a multipart copy into a `FileNotFoundException` - Use multiple threads in `S3HttpHandler` to avoid `CopyObject`/`PutObject` deadlock Closes #126576
1 parent 78ac5d5 commit b10b35f

File tree

4 files changed

+93
-49
lines changed

4 files changed

+93
-49
lines changed

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

+10-8
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.repositories.s3;
1111

1212
import com.amazonaws.AmazonClientException;
13+
import com.amazonaws.AmazonServiceException;
1314
import com.amazonaws.services.s3.AmazonS3;
1415
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
1516
import com.amazonaws.services.s3.model.AmazonS3Exception;
@@ -376,12 +377,11 @@ public void copyBlob(
376377
SocketAccess.doPrivilegedVoid(() -> { clientReference.client().copyObject(copyRequest); });
377378
}
378379
}
379-
} catch (final AmazonClientException e) {
380-
if (e instanceof AmazonS3Exception amazonS3Exception) {
381-
if (amazonS3Exception.getStatusCode() == RestStatus.NOT_FOUND.getStatus()) {
382-
final var sourceKey = s3SourceBlobContainer.buildKey(sourceBlobName);
383-
throw new NoSuchFileException("Copy source [" + sourceKey + "] not found: " + amazonS3Exception.getMessage());
384-
}
380+
} catch (final Exception e) {
381+
if (e instanceof AmazonServiceException ase && ase.getStatusCode() == RestStatus.NOT_FOUND.getStatus()) {
382+
throw new NoSuchFileException(
383+
"Copy source [" + s3SourceBlobContainer.buildKey(sourceBlobName) + "] not found: " + ase.getMessage()
384+
);
385385
}
386386
throw new IOException("Unable to copy object [" + blobName + "] from [" + sourceBlobContainer + "][" + sourceBlobName + "]", e);
387387
}
@@ -618,8 +618,10 @@ private void executeMultipart(
618618
SocketAccess.doPrivilegedVoid(() -> clientReference.client().completeMultipartUpload(complRequest));
619619
}
620620
success = true;
621-
622-
} catch (final AmazonClientException e) {
621+
} catch (final Exception e) {
622+
if (e instanceof AmazonServiceException ase && ase.getStatusCode() == RestStatus.NOT_FOUND.getStatus()) {
623+
throw new NoSuchFileException(blobName, null, e.getMessage());
624+
}
623625
throw new IOException("Unable to upload or copy object [" + blobName + "] using multipart upload", e);
624626
} finally {
625627
if ((success == false) && Strings.hasLength(uploadId.get())) {

muted-tests.yml

-3
Original file line numberDiff line numberDiff line change
@@ -420,9 +420,6 @@ tests:
420420
- class: org.elasticsearch.repositories.blobstore.testkit.rest.SnapshotRepoTestKitClientYamlTestSuiteIT
421421
method: test {p0=/10_analyze/Analysis without details}
422422
issue: https://github.com/elastic/elasticsearch/issues/126569
423-
- class: org.elasticsearch.repositories.blobstore.testkit.analyze.S3RepositoryAnalysisRestIT
424-
method: testRepositoryAnalysis
425-
issue: https://github.com/elastic/elasticsearch/issues/126576
426423
- class: org.elasticsearch.xpack.esql.action.ForkIT
427424
method: testWithStatsSimple
428425
issue: https://github.com/elastic/elasticsearch/issues/126607

test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixture.java

+25
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,18 @@
1313
import com.sun.net.httpserver.HttpServer;
1414

1515
import org.elasticsearch.ExceptionsHelper;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.common.util.concurrent.EsExecutors;
18+
import org.elasticsearch.common.util.concurrent.ThreadContext;
19+
import org.elasticsearch.logging.LogManager;
20+
import org.elasticsearch.logging.Logger;
21+
import org.elasticsearch.threadpool.ThreadPool;
1622
import org.junit.rules.ExternalResource;
1723

1824
import java.io.IOException;
1925
import java.util.Objects;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.TimeUnit;
2028
import java.util.function.BiPredicate;
2129

2230
import static fixture.aws.AwsCredentialsUtils.ANY_REGION;
@@ -26,7 +34,10 @@
2634

2735
public class S3HttpFixture extends ExternalResource {
2836

37+
private static final Logger logger = LogManager.getLogger(S3HttpFixture.class);
38+
2939
private HttpServer server;
40+
private ExecutorService executorService;
3041

3142
private final boolean enabled;
3243
private final String bucket;
@@ -71,16 +82,30 @@ public void stop(int delay) {
7182

7283
protected void before() throws Throwable {
7384
if (enabled) {
85+
this.executorService = EsExecutors.newScaling(
86+
"s3-http-fixture",
87+
1,
88+
100,
89+
30,
90+
TimeUnit.SECONDS,
91+
true,
92+
EsExecutors.daemonThreadFactory("s3-http-fixture"),
93+
new ThreadContext(Settings.EMPTY)
94+
);
95+
7496
this.server = HttpServer.create(getLocalFixtureAddress(), 0);
7597
this.server.createContext("/", Objects.requireNonNull(createHandler()));
98+
this.server.setExecutor(executorService);
7699
server.start();
100+
logger.info("running S3HttpFixture at " + getAddress());
77101
}
78102
}
79103

80104
@Override
81105
protected void after() {
82106
if (enabled) {
83107
stop(0);
108+
ThreadPool.terminate(executorService, 10, TimeUnit.SECONDS);
84109
}
85110
}
86111
}

test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java

+58-38
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import static org.elasticsearch.test.fixture.HttpHeaderParser.parseRangeHeader;
5353
import static org.junit.Assert.assertEquals;
5454
import static org.junit.Assert.assertNotNull;
55+
import static org.junit.Assert.assertNull;
5556
import static org.w3c.dom.Node.ELEMENT_NODE;
5657

5758
/**
@@ -121,9 +122,11 @@ public void handle(final HttpExchange exchange) throws IOException {
121122
uploadsList.append("<MaxUploads>10000</MaxUploads>");
122123
uploadsList.append("<IsTruncated>false</IsTruncated>");
123124

124-
for (final var multipartUpload : uploads.values()) {
125-
if (multipartUpload.getPath().startsWith(prefix)) {
126-
multipartUpload.appendXml(uploadsList);
125+
synchronized (uploads) {
126+
for (final var multipartUpload : uploads.values()) {
127+
if (multipartUpload.getPath().startsWith(prefix)) {
128+
multipartUpload.appendXml(uploadsList);
129+
}
127130
}
128131
}
129132

@@ -135,9 +138,7 @@ public void handle(final HttpExchange exchange) throws IOException {
135138
exchange.getResponseBody().write(response);
136139

137140
} else if (request.isInitiateMultipartUploadRequest()) {
138-
final var upload = new MultipartUpload(UUIDs.randomBase64UUID(), request.path().substring(bucket.length() + 2));
139-
uploads.put(upload.getUploadId(), upload);
140-
141+
final var upload = putUpload(request.path().substring(bucket.length() + 2));
141142
final var uploadResult = new StringBuilder();
142143
uploadResult.append("<?xml version='1.0' encoding='UTF-8'?>");
143144
uploadResult.append("<InitiateMultipartUploadResult>");
@@ -152,7 +153,7 @@ public void handle(final HttpExchange exchange) throws IOException {
152153
exchange.getResponseBody().write(response);
153154

154155
} else if (request.isUploadPartRequest()) {
155-
final var upload = uploads.get(request.getQueryParamOnce("uploadId"));
156+
final var upload = getUpload(request.getQueryParamOnce("uploadId"));
156157
if (upload == null) {
157158
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
158159
} else {
@@ -187,42 +188,45 @@ public void handle(final HttpExchange exchange) throws IOException {
187188
}
188189

189190
} else if (request.isCompleteMultipartUploadRequest()) {
190-
final var upload = uploads.remove(request.getQueryParamOnce("uploadId"));
191-
if (upload == null) {
192-
if (Randomness.get().nextBoolean()) {
193-
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
191+
final byte[] responseBody;
192+
synchronized (uploads) {
193+
final var upload = removeUpload(request.getQueryParamOnce("uploadId"));
194+
if (upload == null) {
195+
if (Randomness.get().nextBoolean()) {
196+
responseBody = null;
197+
} else {
198+
responseBody = """
199+
<?xml version="1.0" encoding="UTF-8"?>
200+
<Error>
201+
<Code>NoSuchUpload</Code>
202+
<Message>No such upload</Message>
203+
<RequestId>test-request-id</RequestId>
204+
<HostId>test-host-id</HostId>
205+
</Error>""".getBytes(StandardCharsets.UTF_8);
206+
}
194207
} else {
195-
byte[] response = ("""
196-
<?xml version="1.0" encoding="UTF-8"?>
197-
<Error>
198-
<Code>NoSuchUpload</Code>
199-
<Message>No such upload</Message>
200-
<RequestId>test-request-id</RequestId>
201-
<HostId>test-host-id</HostId>
202-
</Error>""").getBytes(StandardCharsets.UTF_8);
203-
exchange.getResponseHeaders().add("Content-Type", "application/xml");
204-
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
205-
exchange.getResponseBody().write(response);
208+
final var blobContents = upload.complete(extractPartEtags(Streams.readFully(exchange.getRequestBody())));
209+
blobs.put(request.path(), blobContents);
210+
responseBody = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
211+
+ "<CompleteMultipartUploadResult>\n"
212+
+ "<Bucket>"
213+
+ bucket
214+
+ "</Bucket>\n"
215+
+ "<Key>"
216+
+ request.path()
217+
+ "</Key>\n"
218+
+ "</CompleteMultipartUploadResult>").getBytes(StandardCharsets.UTF_8);
206219
}
220+
}
221+
if (responseBody == null) {
222+
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
207223
} else {
208-
final var blobContents = upload.complete(extractPartEtags(Streams.readFully(exchange.getRequestBody())));
209-
blobs.put(request.path(), blobContents);
210-
211-
byte[] response = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
212-
+ "<CompleteMultipartUploadResult>\n"
213-
+ "<Bucket>"
214-
+ bucket
215-
+ "</Bucket>\n"
216-
+ "<Key>"
217-
+ request.path()
218-
+ "</Key>\n"
219-
+ "</CompleteMultipartUploadResult>").getBytes(StandardCharsets.UTF_8);
220224
exchange.getResponseHeaders().add("Content-Type", "application/xml");
221-
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
222-
exchange.getResponseBody().write(response);
225+
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), responseBody.length);
226+
exchange.getResponseBody().write(responseBody);
223227
}
224228
} else if (request.isAbortMultipartUploadRequest()) {
225-
final var upload = uploads.remove(request.getQueryParamOnce("uploadId"));
229+
final var upload = removeUpload(request.getQueryParamOnce("uploadId"));
226230
exchange.sendResponseHeaders((upload == null ? RestStatus.NOT_FOUND : RestStatus.NO_CONTENT).getStatus(), -1);
227231

228232
} else if (request.isPutObjectRequest()) {
@@ -521,8 +525,24 @@ private static HttpHeaderParser.Range parsePartRange(final HttpExchange exchange
521525
return parseRangeHeader(sourceRangeHeaders.getFirst());
522526
}
523527

528+
MultipartUpload putUpload(String path) {
529+
final var upload = new MultipartUpload(UUIDs.randomBase64UUID(), path);
530+
synchronized (uploads) {
531+
assertNull("upload " + upload.getUploadId() + " should not exist", uploads.put(upload.getUploadId(), upload));
532+
return upload;
533+
}
534+
}
535+
524536
MultipartUpload getUpload(String uploadId) {
525-
return uploads.get(uploadId);
537+
synchronized (uploads) {
538+
return uploads.get(uploadId);
539+
}
540+
}
541+
542+
MultipartUpload removeUpload(String uploadId) {
543+
synchronized (uploads) {
544+
return uploads.remove(uploadId);
545+
}
526546
}
527547

528548
public S3Request parseRequest(HttpExchange exchange) {

0 commit comments

Comments
 (0)