Skip to content

Make GoogleCloudStorageRetryingInputStream request same generation on resume #127626

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6238090
Specify generation on resume
nicktindall May 2, 2025
e94b246
Add test for contents changed on resume
nicktindall May 2, 2025
bcab09b
Update docs/changelog/127626.yaml
nicktindall May 2, 2025
98a80c3
Assert we get a generation header
nicktindall May 2, 2025
08e1b95
Delete docs/changelog/127626.yaml
nicktindall May 2, 2025
09c6e6e
Add random path when present, don't assert generation header is present
nicktindall May 2, 2025
576109c
Update test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MockGcsBlo…
nicktindall May 4, 2025
4feb87d
Update test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MockGcsBlo…
nicktindall May 4, 2025
0886ba7
Reduce indenting
nicktindall May 4, 2025
cdfffa5
Assert that x-goog-generation is present
nicktindall May 4, 2025
d3c0e6a
Assert that x-goog-generation is present
nicktindall May 5, 2025
f3371e4
Log a warning if we see a missing or malformed generation header
nicktindall May 5, 2025
89b926a
Merge branch 'main' into ES-11432_check_generation_on_resume
nicktindall May 17, 2025
fca2161
Add x-goog-generation header to test
nicktindall May 18, 2025
2d2960c
Randomise x-goog-generation
nicktindall May 20, 2025
ce03dae
Tidy, improve naming
nicktindall May 20, 2025
eebcb0c
Test in third-party test
nicktindall May 21, 2025
4fd9b81
Merge branch 'main' into ES-11432_check_generation_on_resume
nicktindall May 21, 2025
a8826fb
Clean proxied repo after test
nicktindall May 21, 2025
aeea9f6
Try SO_REUSEADDR to allow restarting proxy
nicktindall May 21, 2025
03a9b8d
Make httpServer field volatile for thread-safe restarts
nicktindall May 22, 2025
289a594
Merge branch 'main' into ES-11432_check_generation_on_resume
nicktindall May 22, 2025
a60d7e0
Include repoName in base_path
nicktindall May 22, 2025
b0f9298
Add logging in proxy
nicktindall May 22, 2025
caaf45c
Don't use proxy to test resume
nicktindall May 22, 2025
b800710
Merge remote-tracking branch 'origin/main' into ES-11432_check_genera…
nicktindall May 22, 2025
0f8796b
Don't use proxy to test resume
nicktindall May 22, 2025
16d6c39
Merge remote-tracking branch 'origin/ES-11432_check_generation_on_res…
nicktindall May 22, 2025
a70e2ad
Don't use proxy to test resume
nicktindall May 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
private List<StorageException> failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS);
private long currentOffset;
private boolean closed;
private Long lastGeneration;

// Used for testing only
GoogleCloudStorageRetryingInputStream(OperationPurpose purpose, MeteredStorage client, BlobId blobId) throws IOException {
Expand Down Expand Up @@ -84,13 +85,22 @@ private InputStream openStream() throws IOException {
return SocketAccess.doPrivilegedIOException(() -> {
final var meteredGet = client.meteredObjectsGet(purpose, blobId.getBucket(), blobId.getName());
meteredGet.setReturnRawInputStream(true);
if (lastGeneration != null) {
meteredGet.setGeneration(lastGeneration);
}

if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) {
if (meteredGet.getRequestHeaders() != null) {
meteredGet.getRequestHeaders().setRange("bytes=" + Math.addExact(start, currentOffset) + "-" + end);
}
}
final HttpResponse resp = meteredGet.executeMedia();
// Store the generation of the first response we received, so we can detect
// if the file has changed if we need to resume
if (lastGeneration == null) {
lastGeneration = parseGenerationHeader(resp);
}

final Long contentLength = resp.getHeaders().getContentLength();
InputStream content = resp.getContent();
if (contentLength != null) {
Expand All @@ -107,9 +117,22 @@ private InputStream openStream() throws IOException {
}
} catch (StorageException storageException) {
if (storageException.getCode() == RestStatus.NOT_FOUND.getStatus()) {
throw addSuppressedExceptions(
new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + storageException.getMessage())
);
if (lastGeneration != null) {
throw addSuppressedExceptions(
new NoSuchFileException(
"Blob object ["
+ blobId.getName()
+ "] generation ["
+ lastGeneration
+ "] unavailable on resume (contents changed, or object deleted): "
+ storageException.getMessage()
)
);
} else {
throw addSuppressedExceptions(
new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + storageException.getMessage())
);
}
}
if (storageException.getCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()) {
long currentPosition = Math.addExact(start, currentOffset);
Expand All @@ -126,6 +149,19 @@ private InputStream openStream() throws IOException {
}
}

/**
 * Reads the {@code x-goog-generation} header from a response and converts it to a number.
 *
 * @param response the response whose headers are inspected
 * @return the parsed generation, or {@code null} when the header is absent or is not a valid long
 */
private Long parseGenerationHeader(HttpResponse response) {
    final String rawGeneration = response.getHeaders().getFirstHeaderStringValue("x-goog-generation");
    // No header at all: nothing to parse
    if (rawGeneration == null) {
        return null;
    }
    try {
        return Long.parseLong(rawGeneration);
    } catch (NumberFormatException e) {
        // Trips when assertions are enabled; otherwise treated the same as a missing header
        assert false : "Unexpected value for x-goog-generation header: " + rawGeneration;
        return null;
    }
}

// Google's SDK ignores the Content-Length header when no bytes are sent, see NetHttpResponse.SizeValidatingInputStream
// We have to implement our own validation logic here
static final class ContentLengthValidatingInputStream extends FilterInputStream {
Expand Down Expand Up @@ -212,7 +248,6 @@ private void ensureOpen() {
}
}

// TODO: check that object did not change when stream is reopened (e.g. based on etag)
private void reopenStreamOrFail(StorageException e) throws IOException {
if (attempt >= maxAttempts) {
throw addSuppressedExceptions(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ public void setReturnRawInputStream(boolean b) {
get.setReturnRawInputStream(b);
}

/**
 * Forwards the given generation to the wrapped {@code get} request.
 *
 * @param generation the generation value to set on the wrapped request
 */
public void setGeneration(Long generation) {
get.setGeneration(generation);
}

public HttpHeaders getRequestHeaders() {
return get.getRequestHeaders();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.http.ResponseInjectingHttpHandler;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -56,6 +57,7 @@
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -71,6 +73,7 @@

import static fixture.gcs.TestUtils.createServiceAccount;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.elasticsearch.common.io.Streams.readFully;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageBlobStore.MAX_DELETES_PER_BATCH;
Expand All @@ -86,6 +89,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;

@SuppressForbidden(reason = "use a http server")
public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobContainerRetriesTestCase {
Expand Down Expand Up @@ -570,6 +574,54 @@ public void testCompareAndExchangeWhenThrottled() throws IOException {
container.delete(randomPurpose());
}

public void testContentsChangeWhileStreaming() throws IOException {
GoogleCloudStorageHttpHandler handler = new GoogleCloudStorageHttpHandler("bucket");
httpServer.createContext("/", handler);
final int enoughBytesToTriggerChunkedDownload = Math.toIntExact(ByteSizeValue.ofMb(30).getBytes());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mind adding a comment to explain briefly how the size of 30mb is decided?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming is actually incorrect (I don't think the current client does any chunking for downloads, only uploads). We just need to ensure the blob is large enough that it doesn't get entirely buffered when we read the first byte. This is probably a TCP-level thing because, as far as I can see, the Google or Java HTTP infrastructure merely relies on the underlying JVM/OS behaviour. I did some experiments locally and 2MB is usually enough to exceed the buffer, so 30MB should be fairly conservative.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in ce03dae


final BlobContainer container = createBlobContainer(1, null, null, null, null, null, null);

final String key = randomIdentifier();
byte[] initialValue = randomByteArrayOfLength(enoughBytesToTriggerChunkedDownload);
container.writeBlob(randomPurpose(), key, new BytesArray(initialValue), true);

BytesReference reference = readFully(container.readBlob(randomPurpose(), key));
assertEquals(new BytesArray(initialValue), reference);

try (InputStream inputStream = container.readBlob(randomPurpose(), key)) {
// Trigger the first chunk to load
int read = inputStream.read();
assert read != -1;

// Restart the server (this triggers a retry)
restartHttpServer();
httpServer.createContext("/", handler);

// Update the file
byte[] updatedValue = randomByteArrayOfLength(enoughBytesToTriggerChunkedDownload);
container.writeBlob(randomPurpose(), key, new BytesArray(updatedValue), false);

// Read the rest of the stream, it should throw because the contents changed
String message = assertThrows(NoSuchFileException.class, () -> readFully(inputStream)).getMessage();
assertThat(
message,
startsWith(
"Blob object ["
+ container.path().buildAsString()
+ key
+ "] generation [1] unavailable on resume (contents changed, or object deleted):"
)
);
}
}

private void restartHttpServer() throws IOException {
InetSocketAddress currentAddress = httpServer.getAddress();
httpServer.stop(0);
httpServer = MockHttpServer.createHttp(currentAddress, 0);
Comment on lines +627 to +629
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether rebinding to the same address can be flaky sometimes. We can have it as is for now and see how it goes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be OK as long as we have SO_REUSEADDR set? It seemed to work locally so perhaps that's the default?

httpServer.start();
}

private HttpHandler safeHandler(HttpHandler handler) {
final HttpHandler loggingHandler = ESMockAPIBasedRepositoryIntegTestCase.wrap(handler, logger);
return exchange -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
Expand Down Expand Up @@ -44,8 +42,8 @@
@SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint")
public class GoogleCloudStorageHttpHandler implements HttpHandler {

private static final Logger logger = LogManager.getLogger(GoogleCloudStorageHttpHandler.class);
private static final String IF_GENERATION_MATCH = "ifGenerationMatch";
private static final String GENERATION = "generation";

private final AtomicInteger defaultPageLimit = new AtomicInteger(1_000);
private final MockGcsBlobStore mockGcsBlobStore;
Expand Down Expand Up @@ -82,7 +80,8 @@ public void handle(final HttpExchange exchange) throws IOException {
} else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o/*", request)) {
final String key = exchange.getRequestURI().getPath().replace("/storage/v1/b/" + bucket + "/o/", "");
final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH);
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(key, ifGenerationMatch);
final Long generation = parseOptionalLongParameter(exchange, GENERATION);
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(key, ifGenerationMatch, generation);
writeBlobVersionAsJson(exchange, blob);
} else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) {
// List Objects https://cloud.google.com/storage/docs/json_api/v1/objects/list
Expand Down Expand Up @@ -116,7 +115,8 @@ public void handle(final HttpExchange exchange) throws IOException {
// Download Object https://cloud.google.com/storage/docs/request-body
final String path = exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", "");
final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH);
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(path, ifGenerationMatch);
final Long generation = parseOptionalLongParameter(exchange, GENERATION);
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(path, ifGenerationMatch, generation);
if (blob != null) {
final String rangeHeader = exchange.getRequestHeaders().getFirst("Range");
final BytesReference response;
Expand Down Expand Up @@ -144,6 +144,7 @@ public void handle(final HttpExchange exchange) throws IOException {
// we implement "metageneration", at that point we must incorporate both
// See: https://cloud.google.com/storage/docs/metadata#etags
exchange.getResponseHeaders().add("ETag", String.valueOf(blob.generation()));
exchange.getResponseHeaders().add("X-Goog-Generation", String.valueOf(blob.generation()));
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.sendResponseHeaders(statusCode, response.length());
response.writeTo(exchange.getResponseBody());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,29 @@ public int length() {
}
}

BlobVersion getBlob(String path, Long ifGenerationMatch) {
/**
* Get the blob at the specified path
*
* @param path The path
* @param ifGenerationMatch The ifGenerationMatch parameter value (if present)
* @param generation The generation parameter value (if present)
* @return The blob if it exists
* @throws BlobNotFoundException if there is no blob at the path, or its generation does not match the generation parameter
* @throws GcsRestException if the blob's generation does not match the ifGenerationMatch parameter
*/
BlobVersion getBlob(String path, Long ifGenerationMatch, Long generation) {
final BlobVersion blob = blobs.get(path);
if (blob == null) {
throw new BlobNotFoundException(path);
} else {
if (ifGenerationMatch != null) {
if (blob.generation != ifGenerationMatch) {
throw new GcsRestException(
RestStatus.PRECONDITION_FAILED,
"Generation mismatch, expected " + ifGenerationMatch + " but got " + blob.generation
);
}
if (generation != null && generation != blob.generation) {
throw new BlobNotFoundException(blob.path, blob.generation);
}
if (ifGenerationMatch != null && blob.generation != ifGenerationMatch) {
throw new GcsRestException(
RestStatus.PRECONDITION_FAILED,
"Generation mismatch, expected " + ifGenerationMatch + " but got " + blob.generation
);
}
return blob;
}
Expand Down Expand Up @@ -324,6 +335,10 @@ static class BlobNotFoundException extends GcsRestException {
/**
 * Creates a {@code NOT_FOUND} error whose message names the blob path.
 *
 * @param path the blob path to include in the message
 */
BlobNotFoundException(String path) {
super(RestStatus.NOT_FOUND, "Blob not found: " + path);
}

/**
 * Creates a {@code NOT_FOUND} error whose message names both the blob path and a generation.
 *
 * @param path the blob path to include in the message
 * @param generation the generation to include in the message
 */
BlobNotFoundException(String path, long generation) {
super(RestStatus.NOT_FOUND, "Blob not found: " + path + ", generation " + generation);
}
}

static class GcsRestException extends RuntimeException {
Expand Down