Skip to content

Make GoogleCloudStorageRetryingInputStream request same generation on resume #127626

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
private List<StorageException> failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS);
private long currentOffset;
private boolean closed;
private Long lastGeneration;

// Used for testing only
GoogleCloudStorageRetryingInputStream(OperationPurpose purpose, MeteredStorage client, BlobId blobId) throws IOException {
Expand Down Expand Up @@ -84,13 +85,22 @@ private InputStream openStream() throws IOException {
return SocketAccess.doPrivilegedIOException(() -> {
final var meteredGet = client.meteredObjectsGet(purpose, blobId.getBucket(), blobId.getName());
meteredGet.setReturnRawInputStream(true);
if (lastGeneration != null) {
meteredGet.setGeneration(lastGeneration);
}

if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) {
if (meteredGet.getRequestHeaders() != null) {
meteredGet.getRequestHeaders().setRange("bytes=" + Math.addExact(start, currentOffset) + "-" + end);
}
}
final HttpResponse resp = meteredGet.executeMedia();
// Store the generation of the first response we received, so that if we need
// to resume we can detect whether the object has changed
if (lastGeneration == null) {
lastGeneration = parseGenerationHeader(resp);
}

final Long contentLength = resp.getHeaders().getContentLength();
InputStream content = resp.getContent();
if (contentLength != null) {
Expand All @@ -107,9 +117,22 @@ private InputStream openStream() throws IOException {
}
} catch (StorageException storageException) {
if (storageException.getCode() == RestStatus.NOT_FOUND.getStatus()) {
throw addSuppressedExceptions(
new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + storageException.getMessage())
);
if (lastGeneration != null) {
throw addSuppressedExceptions(
new NoSuchFileException(
"Blob object ["
+ blobId.getName()
+ "] generation ["
+ lastGeneration
+ "] unavailable on resume (contents changed, or object deleted): "
+ storageException.getMessage()
)
);
} else {
throw addSuppressedExceptions(
new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + storageException.getMessage())
);
}
}
if (storageException.getCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()) {
long currentPosition = Math.addExact(start, currentOffset);
Expand All @@ -126,6 +149,24 @@ private InputStream openStream() throws IOException {
}
}

/**
 * Extracts the object generation from the {@code x-goog-generation} header of a download response.
 * <p>
 * A missing or non-numeric header is logged at WARN and trips an assertion (when assertions
 * are enabled); with assertions disabled the method falls through and returns {@code null}.
 *
 * @param response the download response whose headers are inspected
 * @return the parsed generation, or {@code null} if the header is absent or not a valid long
 */
private Long parseGenerationHeader(HttpResponse response) {
    final String rawGeneration = response.getHeaders().getFirstHeaderStringValue("x-goog-generation");
    if (rawGeneration == null) {
        final String warning = "Missing x-goog-generation header";
        logger.warn(warning);
        assert false : warning;
        return null;
    }
    try {
        return Long.parseLong(rawGeneration);
    } catch (NumberFormatException e) {
        final String warning = "Unexpected value for x-goog-generation header: " + rawGeneration;
        logger.warn(warning);
        assert false : warning;
        return null;
    }
}

// Google's SDK ignores the Content-Length header when no bytes are sent, see NetHttpResponse.SizeValidatingInputStream
// We have to implement our own validation logic here
static final class ContentLengthValidatingInputStream extends FilterInputStream {
Expand Down Expand Up @@ -212,7 +253,6 @@ private void ensureOpen() {
}
}

// TODO: check that object did not change when stream is reopened (e.g. based on etag)
private void reopenStreamOrFail(StorageException e) throws IOException {
if (attempt >= maxAttempts) {
throw addSuppressedExceptions(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ public void setReturnRawInputStream(boolean b) {
get.setReturnRawInputStream(b);
}

/**
 * Sets the generation on the wrapped {@code get} request by delegating to its {@code setGeneration}.
 * NOTE(review): presumably this pins the download to a specific object generation — confirm against the GCS client docs.
 *
 * @param generation the generation value to pass through to the wrapped request
 */
public void setGeneration(Long generation) {
get.setGeneration(generation);
}

/**
 * Returns the request headers of the wrapped {@code get} request.
 * NOTE(review): the caller in GoogleCloudStorageRetryingInputStream null-checks this result before setting a range — confirm whether null is actually possible.
 *
 * @return whatever the wrapped request's {@code getRequestHeaders()} returns
 */
public HttpHeaders getRequestHeaders() {
return get.getRequestHeaders();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

import org.apache.http.HttpStatus;
Expand All @@ -45,6 +46,7 @@
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.http.ResponseInjectingHttpHandler;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -56,6 +58,7 @@
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -71,6 +74,7 @@

import static fixture.gcs.TestUtils.createServiceAccount;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.elasticsearch.common.io.Streams.readFully;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageBlobStore.MAX_DELETES_PER_BATCH;
Expand All @@ -86,6 +90,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;

@SuppressForbidden(reason = "use a http server")
public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobContainerRetriesTestCase {
Expand Down Expand Up @@ -212,6 +217,11 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions<?, ?> ser
);
}

/**
 * Adds an {@code x-goog-generation} response header with the fixed value {@code "1"} to the exchange.
 * The retrying input stream logs a warning and trips an assertion when this header is missing from a download response.
 */
@Override
protected void addSuccessfulDownloadHeaders(HttpExchange exchange) {
exchange.getResponseHeaders().add("x-goog-generation", "1");
}

public void testShouldRetryOnConnectionRefused() {
// port 1 should never be open
endpointUrlOverride = "http://127.0.0.1:1";
Expand Down Expand Up @@ -242,6 +252,7 @@ public void testReadLargeBlobWithRetries() throws Exception {
httpServer.createContext(downloadStorageEndpoint(blobContainer, "large_blob_retries"), exchange -> {
Streams.readFully(exchange.getRequestBody());
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
addSuccessfulDownloadHeaders(exchange);
final HttpHeaderParser.Range range = getRange(exchange);
final int offset = Math.toIntExact(range.start());
final byte[] chunk = Arrays.copyOfRange(bytes, offset, Math.toIntExact(Math.min(range.end() + 1, bytes.length)));
Expand Down Expand Up @@ -570,6 +581,54 @@ public void testCompareAndExchangeWhenThrottled() throws IOException {
container.delete(randomPurpose());
}

/**
 * Verifies that a read which has to resume after the blob was overwritten fails with a
 * {@link NoSuchFileException} naming the originally-read generation, instead of silently
 * continuing with bytes from the new contents.
 * <p>
 * Flow: write a blob, read it fully once as a sanity check, start a second read, restart the
 * HTTP server mid-stream, overwrite the blob, then drain the stream and expect the failure.
 */
public void testContentsChangeWhileStreaming() throws IOException {
    GoogleCloudStorageHttpHandler handler = new GoogleCloudStorageHttpHandler("bucket");
    httpServer.createContext("/", handler);
    final int enoughBytesToTriggerChunkedDownload = Math.toIntExact(ByteSizeValue.ofMb(30).getBytes());

    final BlobContainer container = createBlobContainer(1, null, null, null, null, null, null);

    final String key = randomIdentifier();
    byte[] initialValue = randomByteArrayOfLength(enoughBytesToTriggerChunkedDownload);
    container.writeBlob(randomPurpose(), key, new BytesArray(initialValue), true);

    // Sanity check: an uninterrupted read returns exactly what was written
    BytesReference reference = readFully(container.readBlob(randomPurpose(), key));
    assertEquals(new BytesArray(initialValue), reference);

    try (InputStream inputStream = container.readBlob(randomPurpose(), key)) {
        // Trigger the first chunk to load
        int read = inputStream.read();
        // Use a test-framework assertion rather than the `assert` keyword so the check
        // cannot be skipped when JVM assertions are disabled
        assertNotEquals(-1, read);

        // Restart the server (this triggers a retry)
        restartHttpServer();
        // The replacement server starts with no contexts, so the handler must be registered again
        httpServer.createContext("/", handler);

        // Update the file
        byte[] updatedValue = randomByteArrayOfLength(enoughBytesToTriggerChunkedDownload);
        container.writeBlob(randomPurpose(), key, new BytesArray(updatedValue), false);

        // Read the rest of the stream, it should throw because the contents changed
        String message = assertThrows(NoSuchFileException.class, () -> readFully(inputStream)).getMessage();
        assertThat(
            message,
            startsWith(
                "Blob object ["
                    + container.path().buildAsString()
                    + key
                    + "] generation [1] unavailable on resume (contents changed, or object deleted):"
            )
        );
    }
}

/**
 * Stops the current mock HTTP server immediately and replaces it with a freshly started
 * server created for the same address. No contexts are registered on the replacement here;
 * callers must re-register their handlers afterwards.
 *
 * @throws IOException if the replacement server cannot be created
 */
private void restartHttpServer() throws IOException {
    // Capture the address before stopping so the replacement is created for the same endpoint
    final InetSocketAddress boundAddress = httpServer.getAddress();
    httpServer.stop(0);
    httpServer = MockHttpServer.createHttp(boundAddress, 0);
    httpServer.start();
}

private HttpHandler safeHandler(HttpHandler handler) {
final HttpHandler loggingHandler = ESMockAPIBasedRepositoryIntegTestCase.wrap(handler, logger);
return exchange -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
Expand Down Expand Up @@ -44,8 +42,8 @@
@SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint")
public class GoogleCloudStorageHttpHandler implements HttpHandler {

private static final Logger logger = LogManager.getLogger(GoogleCloudStorageHttpHandler.class);
private static final String IF_GENERATION_MATCH = "ifGenerationMatch";
private static final String GENERATION = "generation";

private final AtomicInteger defaultPageLimit = new AtomicInteger(1_000);
private final MockGcsBlobStore mockGcsBlobStore;
Expand Down Expand Up @@ -82,7 +80,8 @@ public void handle(final HttpExchange exchange) throws IOException {
} else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o/*", request)) {
final String key = exchange.getRequestURI().getPath().replace("/storage/v1/b/" + bucket + "/o/", "");
final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH);
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(key, ifGenerationMatch);
final Long generation = parseOptionalLongParameter(exchange, GENERATION);
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(key, ifGenerationMatch, generation);
writeBlobVersionAsJson(exchange, blob);
} else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) {
// List Objects https://cloud.google.com/storage/docs/json_api/v1/objects/list
Expand Down Expand Up @@ -116,7 +115,8 @@ public void handle(final HttpExchange exchange) throws IOException {
// Download Object https://cloud.google.com/storage/docs/request-body
final String path = exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", "");
final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH);
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(path, ifGenerationMatch);
final Long generation = parseOptionalLongParameter(exchange, GENERATION);
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(path, ifGenerationMatch, generation);
if (blob != null) {
final String rangeHeader = exchange.getRequestHeaders().getFirst("Range");
final BytesReference response;
Expand Down Expand Up @@ -144,6 +144,7 @@ public void handle(final HttpExchange exchange) throws IOException {
// we implement "metageneration", at that point we must incorporate both
// See: https://cloud.google.com/storage/docs/metadata#etags
exchange.getResponseHeaders().add("ETag", String.valueOf(blob.generation()));
exchange.getResponseHeaders().add("x-goog-generation", String.valueOf(blob.generation()));
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.sendResponseHeaders(statusCode, response.length());
response.writeTo(exchange.getResponseBody());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,31 @@ public int length() {
}
}

BlobVersion getBlob(String path, Long ifGenerationMatch) {
/**
* Get the blob at the specified path
*
* @param path The path
* @param ifGenerationMatch The ifGenerationMatch parameter value (if present)
* @param generation The generation parameter value (if present)
* @return The blob if it exists
* @throws BlobNotFoundException if there is no blob at the path, or its generation does not match the generation parameter
* @throws GcsRestException if the blob's generation does not match the ifGenerationMatch parameter
*/
BlobVersion getBlob(String path, Long ifGenerationMatch, Long generation) {
final BlobVersion blob = blobs.get(path);
if (blob == null) {
throw new BlobNotFoundException(path);
} else {
if (ifGenerationMatch != null) {
if (blob.generation != ifGenerationMatch) {
throw new GcsRestException(
RestStatus.PRECONDITION_FAILED,
"Generation mismatch, expected " + ifGenerationMatch + " but got " + blob.generation
);
}
}
return blob;
}
if (generation != null && generation != blob.generation) {
throw new BlobNotFoundException(blob.path, blob.generation);
}
if (ifGenerationMatch != null && ifGenerationMatch != blob.generation) {
throw new GcsRestException(
RestStatus.PRECONDITION_FAILED,
"Generation mismatch, expected " + ifGenerationMatch + " but got " + blob.generation
);
}
return blob;
}

BlobVersion updateBlob(String path, Long ifGenerationMatch, BytesReference contents) {
Expand Down Expand Up @@ -324,6 +334,10 @@ static class BlobNotFoundException extends GcsRestException {
/**
 * Creates a {@code NOT_FOUND} error for a path with no blob.
 *
 * @param path the blob path that was looked up; included in the message
 */
BlobNotFoundException(String path) {
super(RestStatus.NOT_FOUND, "Blob not found: " + path);
}

/**
 * Creates a {@code NOT_FOUND} error whose message includes both the path and a generation.
 * NOTE(review): the visible caller in getBlob passes the stored blob's generation, not the requested one — confirm which value the message is meant to report.
 *
 * @param path       the blob path that was looked up; included in the message
 * @param generation the generation value appended to the message
 */
BlobNotFoundException(String path, long generation) {
super(RestStatus.NOT_FOUND, "Blob not found: " + path + ", generation " + generation);
}
}

static class GcsRestException extends RuntimeException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public void tearDown() throws Exception {
super.tearDown();
}

/**
 * Hook for subclasses to add any response headers their client expects on a successful download.
 * The default implementation adds nothing. The test handlers in this class call it before
 * sending the response headers.
 *
 * @param exchange the exchange whose response headers may be extended
 */
protected void addSuccessfulDownloadHeaders(HttpExchange exchange) {}

protected abstract String downloadStorageEndpoint(BlobContainer container, String blob);

protected abstract String bytesContentType();
Expand Down Expand Up @@ -118,6 +123,7 @@ public void testReadBlobWithRetries() throws Exception {
if (countDown.countDown()) {
final int rangeStart = getRangeStart(exchange);
assertThat(rangeStart, lessThan(bytes.length));
addSuccessfulDownloadHeaders(exchange);
exchange.getResponseHeaders().add("Content-Type", bytesContentType());
exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length - rangeStart);
exchange.getResponseBody().write(bytes, rangeStart, bytes.length - rangeStart);
Expand Down Expand Up @@ -183,6 +189,7 @@ public void testReadRangeBlobWithRetries() throws Exception {
final int effectiveRangeEnd = Math.min(bytes.length - 1, rangeEnd);
final int length = (effectiveRangeEnd - rangeStart) + 1;
exchange.getResponseHeaders().add("Content-Type", bytesContentType());
addSuccessfulDownloadHeaders(exchange);
exchange.sendResponseHeaders(HttpStatus.SC_OK, length);
exchange.getResponseBody().write(bytes, rangeStart, length);
exchange.close();
Expand Down Expand Up @@ -401,6 +408,7 @@ protected int sendIncompleteContent(HttpExchange exchange, byte[] bytes) throws
length = bytes.length - rangeStart;
}
exchange.getResponseHeaders().add("Content-Type", bytesContentType());
addSuccessfulDownloadHeaders(exchange);
exchange.sendResponseHeaders(HttpStatus.SC_OK, length);
int minSend = Math.min(0, length - 1);
final int bytesToSend = randomIntBetween(minSend, length - 1);
Expand Down