Add automatic batch reduction and throughput tests #6

Merged · 2 commits · Mar 13, 2024
@@ -19,11 +19,10 @@
public class BigQueryStorageWriteApiErrorResponses {

private static final Logger logger = LoggerFactory.getLogger(BigQueryStorageWriteApiErrorResponses.class);
private static final int INVALID_ARGUMENT_CODE = 3;
private static final String PERMISSION_DENIED = "PERMISSION_DENIED";
private static final String NOT_EXIST = "(or it may not exist)";
private static final String NOT_FOUND = "Not found: table";
private static final String TABLE_IS_DELETED = "Table is deleted";
private static final String MESSAGE_TOO_LARGE = "MessageSize is too large";
private static final String[] retriableCodes = {Code.INTERNAL.name(), Code.ABORTED.name(), Code.CANCELLED.name()};
/*
Below list is taken from :
@@ -50,7 +49,7 @@ public class BigQueryStorageWriteApiErrorResponses {
* @return Returns true if message contains table missing substrings
*/
public static boolean isTableMissing(String errorMessage) {
return (errorMessage.contains(PERMISSION_DENIED) && errorMessage.contains(NOT_EXIST))
return (errorMessage.contains(Code.PERMISSION_DENIED.name()) && errorMessage.contains(NOT_EXIST))
|| (errorMessage.contains(StorageError.StorageErrorCode.TABLE_NOT_FOUND.name()))
|| errorMessage.contains(NOT_FOUND)
|| errorMessage.contains(Code.NOT_FOUND.name())
@@ -75,7 +74,6 @@ public static boolean isRetriableError(String errorMessage) {
*/
public static boolean isMalformedRequest(String errorMessage) {
return errorMessage.contains(Code.INVALID_ARGUMENT.name());

}

/**
@@ -126,4 +124,9 @@ public static boolean isNonRetriableStorageError(Exception exception) {
return nonRetriableStreamFailureCodes.contains(errorCode);
}

public static boolean isMessageTooLargeError(String errorMessage) {
return isMalformedRequest(errorMessage)
&& errorMessage.contains(MESSAGE_TOO_LARGE);
}

}
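
Note: the new `isMessageTooLargeError` check only fires when an error is both an INVALID_ARGUMENT and contains the "MessageSize is too large" substring, which is what lets callers distinguish an oversized batch (halve and retry) from other malformed requests (route rows to the DLQ). A minimal sketch of the intended classification; the error strings below are fabricated examples shaped like Storage Write API messages, not verbatim responses:

```java
// Fabricated error strings for illustration only.
String tooLarge = "INVALID_ARGUMENT: MessageSize is too large.";
String badRow = "INVALID_ARGUMENT: JSONObject has fields unknown to BigQuery.";

// Oversized batch: malformed AND mentions message size -> triggers batch reduction.
assert BigQueryStorageWriteApiErrorResponses.isMessageTooLargeError(tooLarge);

// Other malformed requests: handled via row-error / DLQ routing instead.
assert BigQueryStorageWriteApiErrorResponses.isMalformedRequest(badRow)
    && !BigQueryStorageWriteApiErrorResponses.isMessageTooLargeError(badRow);
```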
@@ -16,16 +16,16 @@ public RecordBatches(List<E> records) {
}

public List<E> currentBatch() {
int size = Math.max(records.size() - batchStart, batchSize);
return records.subList(batchStart, size);
int size = Math.min(records.size() - batchStart, batchSize);
return records.subList(batchStart, batchStart + size);
}

public void advanceToNextBatch() {
batchStart += batchSize;
}

public void reduceBatchSize() {
if (!canReduceBatchSize()) {
if (batchSize <= 1) {
throw new IllegalStateException("Cannot reduce batch size any further");
}
batchSize /= 2;
@@ -35,8 +35,4 @@ public boolean completed() {
return batchStart >= records.size();
}

public boolean canReduceBatchSize() {
return batchSize <= 1;
}

}
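
Note: the old `currentBatch()` had two bugs fixed here: `Math.max` where `Math.min` was intended, and passing `size` as the `subList` end index instead of `batchStart + size`. A hypothetical walk-through of the corrected windowing (assuming `batchSize` is currently 2; its initialization is outside this hunk):

```java
import java.util.Arrays;

// Five records with batchSize = 2 (assumed for illustration):
RecordBatches<String> batches = new RecordBatches<>(Arrays.asList("a", "b", "c", "d", "e"));
// currentBatch() -> subList(0, 0 + min(5 - 0, 2)) = [a, b]
// advanceToNextBatch(); currentBatch() -> subList(2, 2 + min(5 - 2, 2)) = [c, d]
// advanceToNextBatch(); currentBatch() -> subList(4, 4 + min(5 - 4, 2)) = [e]
// advanceToNextBatch(); completed()    -> true (batchStart 6 >= records.size() 5)
// reduceBatchSize() halves batchSize; once it reaches 1, the next call throws
// IllegalStateException (callers check batch.size() <= 1 first, so this guards misuse).
```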
@@ -105,78 +105,160 @@ public void initializeAndWriteRecords(TableName tableName, List<ConvertedRecord>
logger.debug("Sending {} records to write Api Application stream {}", rows.size(), streamName);
RecordBatches<ConvertedRecord> batches = new RecordBatches<>(rows);
StreamWriter writer = streamWriter(tableName, streamName, rows);
do {
try {
List<ConvertedRecord> batch = batches.currentBatch();
JSONArray jsonRecords = getJsonRecords(batch);
logger.trace("Sending records to Storage API writer for batch load");
ApiFuture<AppendRowsResponse> response = writer.appendRows(jsonRecords);
AppendRowsResponse writeResult = response.get();
logger.trace("Received response from Storage API writer batch");

if (writeResult.hasUpdatedSchema()) {
logger.warn("Sent records schema does not match with table schema, will attempt to update schema");
if (!canAttemptSchemaUpdate()) {
throw new BigQueryStorageWriteApiConnectException("Connector is not configured to perform schema updates.");
while (!batches.completed()) {
List<ConvertedRecord> batch = batches.currentBatch();

while (!batch.isEmpty()) {
try {
writeBatch(writer, batch, retryHandler, tableName);
batch = Collections.emptyList(); // Can't do batch.clear(); it'll mess with the batch tracking logic in RecordBatches
} catch (RetryException e) {
retryHandler.maybeRetry("write to table " + tableName);
if (e.getMessage() != null) {
logger.warn(e.getMessage() + " Retry attempt " + retryHandler.getAttempt());
}
retryHandler.attemptTableOperation(schemaManager::updateSchema);
} else if (writeResult.hasError()) {
Status errorStatus = writeResult.getError();
String errorMessage = String.format("Failed to write rows on table %s due to %s", tableName, writeResult.getError().getMessage());
retryHandler.setMostRecentException(new BigQueryStorageWriteApiConnectException(errorMessage));
if (BigQueryStorageWriteApiErrorResponses.isMalformedRequest(errorMessage)) {
rows = maybeHandleDlqRoutingAndFilterRecords(rows, convertToMap(writeResult.getRowErrorsList()), tableName.getTable());
if (rows.isEmpty()) {
writer.onSuccess();
return;
} catch (BatchTooLargeException e) {
if (batch.size() <= 1) {
Map<Integer, String> rowErrorMapping = Collections.singletonMap(
0, e.getMessage()
);
batch = maybeHandleDlqRoutingAndFilterRecords(batch, rowErrorMapping, tableName.getTable());
if (!batch.isEmpty()) {
retryHandler.maybeRetry("write to table " + tableName);
}
} else if (!BigQueryStorageWriteApiErrorResponses.isRetriableError(errorStatus.getMessage())) {
failTask(retryHandler.getMostRecentException());
} else {
int previousSize = batch.size();
batches.reduceBatchSize();
batch = batches.currentBatch();
logger.debug("Reducing batch size for table {} from {} to {}", tableName, previousSize, batch.size());
}
logger.warn(errorMessage + " Retry attempt " + retryHandler.getAttempt());
} else {
if (!writeResult.hasAppendResult()) {
logger.warn(
"Write result did not report any errors, but also did not succeed. "
+ "This may be indicative of a bug in the BigQuery Java client library or back end; "
+ "please report it to the maintainers of the connector to investigate."
);
} catch (MalformedRowsException e) {
batch = maybeHandleDlqRoutingAndFilterRecords(batch, e.getRowErrorMapping(), tableName.getTable());
if (!batch.isEmpty()) {
// TODO: Does this actually make sense? Should we count this as part of our retry logic?
// As long as we're guaranteed that the number of rows in the batch is decreasing, it
// may make sense to skip the maybeRetry invocation
retryHandler.maybeRetry("write to table " + tableName);
}
logger.trace("Append call completed successfully on stream {}", streamName);
writer.onSuccess();
return;
}
} catch (BigQueryStorageWriteApiConnectException exception) {
throw exception;
} catch (Exception e) {
String message = e.getMessage();
String errorMessage = String.format("Failed to write rows on table %s due to %s", tableName, message);
retryHandler.setMostRecentException(new BigQueryStorageWriteApiConnectException(errorMessage, e));

if (shouldHandleSchemaMismatch(e)) {
logger.warn("Sent records schema does not match with table schema, will attempt to update schema");
retryHandler.attemptTableOperation(schemaManager::updateSchema);
} else if (BigQueryStorageWriteApiErrorResponses.isMalformedRequest(message)) {
rows = maybeHandleDlqRoutingAndFilterRecords(rows, getRowErrorMapping(e), tableName.getTable());
if (rows.isEmpty()) {
writer.onSuccess();
return;
}
} else if (BigQueryStorageWriteApiErrorResponses.isStreamClosed(message)) {
writer.refresh();
} else if (BigQueryStorageWriteApiErrorResponses.isTableMissing(message) && getAutoCreateTables()) {
retryHandler.attemptTableOperation(schemaManager::createTable);
} else if (!BigQueryStorageWriteApiErrorResponses.isRetriableError(e.getMessage())
&& BigQueryStorageWriteApiErrorResponses.isNonRetriableStorageError(e)
) {
}

batches.advanceToNextBatch();
}

writer.onSuccess();
}

private void writeBatch(
StreamWriter writer,
List<ConvertedRecord> batch,
StorageWriteApiRetryHandler retryHandler,
TableName tableName
) throws BatchTooLargeException, MalformedRowsException, RetryException {
try {
JSONArray jsonRecords = getJsonRecords(batch);
logger.trace("Sending records to Storage API writer for batch load");
ApiFuture<AppendRowsResponse> response = writer.appendRows(jsonRecords);
AppendRowsResponse writeResult = response.get();
logger.trace("Received response from Storage API writer batch");

if (writeResult.hasUpdatedSchema()) {
logger.warn("Sent records schema does not match with table schema, will attempt to update schema");
if (!canAttemptSchemaUpdate()) {
throw new BigQueryStorageWriteApiConnectException("Connector is not configured to perform schema updates.");
}
retryHandler.attemptTableOperation(schemaManager::updateSchema);
throw new RetryException();
} else if (writeResult.hasError()) {
Status errorStatus = writeResult.getError();
String errorMessage = String.format("Failed to write rows on table %s due to %s", tableName, writeResult.getError().getMessage());
retryHandler.setMostRecentException(new BigQueryStorageWriteApiConnectException(errorMessage));
if (BigQueryStorageWriteApiErrorResponses.isMalformedRequest(errorMessage)) {
throw new MalformedRowsException(convertToMap(writeResult.getRowErrorsList()));
} else if (!BigQueryStorageWriteApiErrorResponses.isRetriableError(errorStatus.getMessage())) {
failTask(retryHandler.getMostRecentException());
}
logger.warn(errorMessage + " Retry attempt " + retryHandler.getAttempt());
throw new RetryException(errorMessage);
} else {
if (!writeResult.hasAppendResult()) {
logger.warn(
"Write result did not report any errors, but also did not succeed. "
+ "This may be indicative of a bug in the BigQuery Java client library or back end; "
+ "please report it to the maintainers of the connector to investigate."
);
}
logger.trace("Append call completed successfully on stream {}", writer.streamName());
}
} catch (BigQueryStorageWriteApiConnectException | BatchWriteException exception) {
throw exception;
} catch (Exception e) {
String message = e.getMessage();
String errorMessage = String.format("Failed to write rows on table %s due to %s", tableName, message);
retryHandler.setMostRecentException(new BigQueryStorageWriteApiConnectException(errorMessage, e));

if (shouldHandleSchemaMismatch(e)) {
logger.warn("Sent records schema does not match with table schema, will attempt to update schema");
retryHandler.attemptTableOperation(schemaManager::updateSchema);
} else if (BigQueryStorageWriteApiErrorResponses.isMessageTooLargeError(message)) {
throw new BatchTooLargeException(errorMessage);
} else if (BigQueryStorageWriteApiErrorResponses.isMalformedRequest(message)) {
throw new MalformedRowsException(getRowErrorMapping(e));
} else if (BigQueryStorageWriteApiErrorResponses.isStreamClosed(message)) {
writer.refresh();
} else if (BigQueryStorageWriteApiErrorResponses.isTableMissing(message) && getAutoCreateTables()) {
retryHandler.attemptTableOperation(schemaManager::createTable);
} else if (!BigQueryStorageWriteApiErrorResponses.isRetriableError(e.getMessage())
&& BigQueryStorageWriteApiErrorResponses.isNonRetriableStorageError(e)
) {
failTask(retryHandler.getMostRecentException());
}
} while (retryHandler.maybeRetry());
throw new BigQueryStorageWriteApiConnectException(
String.format("Exceeded %s attempts to write to table %s ", retryHandler.getAttempt(), tableName),
retryHandler.getMostRecentException());
throw new RetryException(errorMessage);
}
}

private abstract static class BatchWriteException extends Exception {

protected BatchWriteException() {
super();
}

protected BatchWriteException(String message) {
super(message);
}

}

private static class BatchTooLargeException extends BatchWriteException {

public BatchTooLargeException(String message) {
super(message);
}

}

private static class MalformedRowsException extends BatchWriteException {

private final Map<Integer, String> rowErrorMapping;

public MalformedRowsException(Map<Integer, String> rowErrorMapping) {
this.rowErrorMapping = rowErrorMapping;
}

public Map<Integer, String> getRowErrorMapping() {
return rowErrorMapping;
}

}

private static class RetryException extends BatchWriteException {

public RetryException() {
super();
}

public RetryException(String message) {
super(message);
}
}
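
Note: with the `BatchWriteException` hierarchy above, the refactored `initializeAndWriteRecords` reduces to a small state machine: the outer loop advances through batches, the inner loop reacts to the three failure types thrown by `writeBatch`. A condensed sketch of that control flow (names match the diff; bodies elided):

```java
while (!batches.completed()) {
    List<ConvertedRecord> batch = batches.currentBatch();
    while (!batch.isEmpty()) {
        try {
            writeBatch(writer, batch, retryHandler, tableName);
            batch = Collections.emptyList();  // success: exit inner loop without clearing the shared list
        } catch (RetryException e) {
            retryHandler.maybeRetry("write to table " + tableName);  // throws once attempts are exhausted
        } catch (BatchTooLargeException e) {
            // Halve the batch and re-read the window; a batch already down to one
            // row is routed to the DLQ path instead (see the diff above).
            batches.reduceBatchSize();
            batch = batches.currentBatch();
        } catch (MalformedRowsException e) {
            // Drop or DLQ-route the bad rows, then retry whatever survives.
            batch = maybeHandleDlqRoutingAndFilterRecords(batch, e.getRowErrorMapping(), tableName.getTable());
        }
    }
    batches.advanceToNextBatch();
}
writer.onSuccess();
```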

/**
@@ -235,12 +235,8 @@ ApplicationStream createApplicationStream(String tableName, List<ConvertedRecord
}
logger.warn(baseErrorMessage + " Retry attempt {}", retryHandler.getAttempt());
}
} while (retryHandler.maybeRetry());
throw new BigQueryStorageWriteApiConnectException(
String.format(
"Exceeded %s attempts to create Application stream on table %s ",
retryHandler.getAttempt(), tableName),
retryHandler.getMostRecentException());
retryHandler.maybeRetry("create application stream on table " + tableName);
} while (true);
}

/**
@@ -391,6 +387,12 @@ public void onSuccess() {
public void refresh() {
// No-op; handled internally by ApplicationStream class
}

@Override
public String streamName() {
return streamName;
}

}

}
@@ -95,12 +95,8 @@ JsonStreamWriter getDefaultStream(TableName table, List<ConvertedRecord> rows) {
}
logger.warn(baseErrorMessage + " Retry attempt {}", retryHandler.getAttempt());
}
} while (retryHandler.maybeRetry());
throw new BigQueryStorageWriteApiConnectException(
String.format(
"Exceeded %s attempts to create Default stream writer on table %s ",
retryHandler.getAttempt(), tableName),
retryHandler.getMostRecentException());
retryHandler.maybeRetry("create default stream on table " + tableName);
} while (true);
});
}

@@ -144,6 +140,11 @@ public void refresh() {
closeAndDelete(tableName.toString());
jsonStreamWriter = null;
}

@Override
public String streamName() {
return StorageWriteApiWriter.DEFAULT;
}
}

}
@@ -80,17 +80,19 @@ private void waitRandomTime() throws InterruptedException {
time.sleep(userConfiguredRetryWait + additionalWait + random.nextInt(1000));
}

public boolean maybeRetry() {
public void maybeRetry(String operation) {
if (currentAttempt < (userConfiguredRetry + additionalRetries)) {
currentAttempt++;
try {
waitRandomTime();
} catch (InterruptedException e) {
logger.warn("Thread interrupted while waiting for random time");
}
return true;
} else {
return false;
throw new BigQueryStorageWriteApiConnectException(
String.format("Exceeded %s attempts to %s ", getAttempt(), operation),
getMostRecentException()
);
}
}
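
Note: `maybeRetry` now owns the "exceeded attempts" failure: instead of returning `false` and forcing every call site to build its own `BigQueryStorageWriteApiConnectException`, it throws once attempts run out. Call sites collapse accordingly; a before/after sketch of the pattern (illustrative, mirroring the stream-creation diffs above):

```java
// Before: boolean-returning retry, with the failure duplicated at each call site.
do {
    // ... attempt the operation, return on success ...
} while (retryHandler.maybeRetry());
throw new BigQueryStorageWriteApiConnectException(/* "Exceeded N attempts to ..." */);

// After: the handler throws, so the loop exits only via success or that exception.
do {
    // ... attempt the operation, return on success ...
    retryHandler.maybeRetry("create application stream on table " + tableName);
} while (true);
```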

@@ -30,4 +30,6 @@ ApiFuture<AppendRowsResponse> appendRows(
*/
void onSuccess();

String streamName();

}