
Commit c9f581d

Merge pull request #6 from Aiven-Open/storage-write-api-throughput
Add automatic batch reduction and throughput tests
2 parents 0f454b2 + 5b718f5 commit c9f581d

10 files changed: +369 -113 lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/exception/BigQueryStorageWriteApiErrorResponses.java

+7 -4

@@ -19,11 +19,10 @@
 public class BigQueryStorageWriteApiErrorResponses {
 
   private static final Logger logger = LoggerFactory.getLogger(BigQueryStorageWriteApiErrorResponses.class);
-  private static final int INVALID_ARGUMENT_CODE = 3;
-  private static final String PERMISSION_DENIED = "PERMISSION_DENIED";
   private static final String NOT_EXIST = "(or it may not exist)";
   private static final String NOT_FOUND = "Not found: table";
   private static final String TABLE_IS_DELETED = "Table is deleted";
+  private static final String MESSAGE_TOO_LARGE = "MessageSize is too large";
   private static final String[] retriableCodes = {Code.INTERNAL.name(), Code.ABORTED.name(), Code.CANCELLED.name()};
   /*
   Below list is taken from :
@@ -50,7 +49,7 @@ public class BigQueryStorageWriteApiErrorResponses {
    * @return Returns true if message contains table missing substrings
    */
   public static boolean isTableMissing(String errorMessage) {
-    return (errorMessage.contains(PERMISSION_DENIED) && errorMessage.contains(NOT_EXIST))
+    return (errorMessage.contains(Code.PERMISSION_DENIED.name()) && errorMessage.contains(NOT_EXIST))
         || (errorMessage.contains(StorageError.StorageErrorCode.TABLE_NOT_FOUND.name()))
         || errorMessage.contains(NOT_FOUND)
         || errorMessage.contains(Code.NOT_FOUND.name())
@@ -75,7 +74,6 @@ public static boolean isRetriableError(String errorMessage) {
    */
   public static boolean isMalformedRequest(String errorMessage) {
     return errorMessage.contains(Code.INVALID_ARGUMENT.name());
-
   }
 
   /**
@@ -126,4 +124,9 @@ public static boolean isNonRetriableStorageError(Exception exception) {
     return nonRetriableStreamFailureCodes.contains(errorCode);
   }
 
+  public static boolean isMessageTooLargeError(String errorMessage) {
+    return isMalformedRequest(errorMessage)
+        && errorMessage.contains(MESSAGE_TOO_LARGE);
+  }
+
 }
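
Note on the new classifier: isMessageTooLargeError only fires when the message carries both the INVALID_ARGUMENT status name and the "MessageSize is too large" detail, so ordinary malformed-row errors keep taking the DLQ path. A quick sketch of the intended behavior (the error strings below are invented stand-ins for real Storage Write API messages, not captured responses):

// Hypothetical error strings, shaped like Storage Write API failures.
String tooLarge = "INVALID_ARGUMENT: MessageSize is too large. Max allowed: 10000000 bytes.";
String malformed = "INVALID_ARGUMENT: JSONObject does not have the required field f1";

BigQueryStorageWriteApiErrorResponses.isMessageTooLargeError(tooLarge);   // true  -> shrink the batch
BigQueryStorageWriteApiErrorResponses.isMessageTooLargeError(malformed);  // false
BigQueryStorageWriteApiErrorResponses.isMalformedRequest(malformed);      // true  -> DLQ routing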

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/RecordBatches.java

+3 -7

@@ -16,16 +16,16 @@ public RecordBatches(List<E> records) {
   }
 
   public List<E> currentBatch() {
-    int size = Math.max(records.size() - batchStart, batchSize);
-    return records.subList(batchStart, size);
+    int size = Math.min(records.size() - batchStart, batchSize);
+    return records.subList(batchStart, batchStart + size);
   }
 
   public void advanceToNextBatch() {
     batchStart += batchSize;
   }
 
   public void reduceBatchSize() {
-    if (!canReduceBatchSize()) {
+    if (batchSize <= 1) {
       throw new IllegalStateException("Cannot reduce batch size any further");
     }
     batchSize /= 2;
@@ -35,8 +35,4 @@ public boolean completed() {
     return batchStart >= records.size();
   }
 
-  public boolean canReduceBatchSize() {
-    return batchSize <= 1;
-  }
-
 }
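
Two genuine bugs are fixed here: currentBatch used Math.max where it needed Math.min (so the computed size could overrun a short tail of the list), and it passed that size to subList as if it were an end index, when subList expects batchStart + size; separately, the old guard threw when canReduceBatchSize() returned false, but that method returned true exactly when the size could not be reduced, inverting the check. A usage sketch, assuming the constructor (not shown in this diff) initializes batchSize to records.size():

import java.util.Arrays;

RecordBatches<String> batches = new RecordBatches<>(Arrays.asList("a", "b", "c", "d"));
batches.currentBatch();        // [a, b, c, d]
batches.reduceBatchSize();     // batchSize: 4 -> 2
batches.currentBatch();        // [a, b]
batches.advanceToNextBatch();  // batchStart: 0 -> 2
batches.currentBatch();        // [c, d]
batches.advanceToNextBatch();  // batchStart: 2 -> 4
batches.completed();           // true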

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/storage/StorageWriteApiBase.java

+146 -64

@@ -105,78 +105,160 @@ public void initializeAndWriteRecords(TableName tableName, List<ConvertedRecord>
     logger.debug("Sending {} records to write Api Application stream {}", rows.size(), streamName);
     RecordBatches<ConvertedRecord> batches = new RecordBatches<>(rows);
     StreamWriter writer = streamWriter(tableName, streamName, rows);
-    do {
-      try {
-        List<ConvertedRecord> batch = batches.currentBatch();
-        JSONArray jsonRecords = getJsonRecords(batch);
-        logger.trace("Sending records to Storage API writer for batch load");
-        ApiFuture<AppendRowsResponse> response = writer.appendRows(jsonRecords);
-        AppendRowsResponse writeResult = response.get();
-        logger.trace("Received response from Storage API writer batch");
-
-        if (writeResult.hasUpdatedSchema()) {
-          logger.warn("Sent records schema does not match with table schema, will attempt to update schema");
-          if (!canAttemptSchemaUpdate()) {
-            throw new BigQueryStorageWriteApiConnectException("Connector is not configured to perform schema updates.");
+    while (!batches.completed()) {
+      List<ConvertedRecord> batch = batches.currentBatch();
+
+      while (!batch.isEmpty()) {
+        try {
+          writeBatch(writer, batch, retryHandler, tableName);
+          batch = Collections.emptyList(); // Can't do batch.clear(); it'll mess with the batch tracking logic in RecordBatches
+        } catch (RetryException e) {
+          retryHandler.maybeRetry("write to table " + tableName);
+          if (e.getMessage() != null) {
+            logger.warn(e.getMessage() + " Retry attempt " + retryHandler.getAttempt());
           }
-          retryHandler.attemptTableOperation(schemaManager::updateSchema);
-        } else if (writeResult.hasError()) {
-          Status errorStatus = writeResult.getError();
-          String errorMessage = String.format("Failed to write rows on table %s due to %s", tableName, writeResult.getError().getMessage());
-          retryHandler.setMostRecentException(new BigQueryStorageWriteApiConnectException(errorMessage));
-          if (BigQueryStorageWriteApiErrorResponses.isMalformedRequest(errorMessage)) {
-            rows = maybeHandleDlqRoutingAndFilterRecords(rows, convertToMap(writeResult.getRowErrorsList()), tableName.getTable());
-            if (rows.isEmpty()) {
-              writer.onSuccess();
-              return;
+        } catch (BatchTooLargeException e) {
+          if (batch.size() <= 1) {
+            Map<Integer, String> rowErrorMapping = Collections.singletonMap(
+                0, e.getMessage()
+            );
+            batch = maybeHandleDlqRoutingAndFilterRecords(batch, rowErrorMapping, tableName.getTable());
+            if (!batch.isEmpty()) {
+              retryHandler.maybeRetry("write to table " + tableName);
             }
-          } else if (!BigQueryStorageWriteApiErrorResponses.isRetriableError(errorStatus.getMessage())) {
-            failTask(retryHandler.getMostRecentException());
+          } else {
+            int previousSize = batch.size();
+            batches.reduceBatchSize();
+            batch = batches.currentBatch();
+            logger.debug("Reducing batch size for table {} from {} to {}", tableName, previousSize, batch.size());
           }
-          logger.warn(errorMessage + " Retry attempt " + retryHandler.getAttempt());
-        } else {
-          if (!writeResult.hasAppendResult()) {
-            logger.warn(
-                "Write result did not report any errors, but also did not succeed. "
-                    + "This may be indicative of a bug in the BigQuery Java client library or back end; "
-                    + "please report it to the maintainers of the connector to investigate."
-            );
+        } catch (MalformedRowsException e) {
+          batch = maybeHandleDlqRoutingAndFilterRecords(batch, e.getRowErrorMapping(), tableName.getTable());
+          if (!batch.isEmpty()) {
+            // TODO: Does this actually make sense? Should we count this as part of our retry logic?
+            //       As long as we're guaranteed that the number of rows in the batch is decreasing, it
+            //       may make sense to skip the maybeRetry invocation
+            retryHandler.maybeRetry("write to table " + tableName);
           }
-          logger.trace("Append call completed successfully on stream {}", streamName);
-          writer.onSuccess();
-          return;
         }
-      } catch (BigQueryStorageWriteApiConnectException exception) {
-        throw exception;
-      } catch (Exception e) {
-        String message = e.getMessage();
-        String errorMessage = String.format("Failed to write rows on table %s due to %s", tableName, message);
-        retryHandler.setMostRecentException(new BigQueryStorageWriteApiConnectException(errorMessage, e));
-
-        if (shouldHandleSchemaMismatch(e)) {
-          logger.warn("Sent records schema does not match with table schema, will attempt to update schema");
-          retryHandler.attemptTableOperation(schemaManager::updateSchema);
-        } else if (BigQueryStorageWriteApiErrorResponses.isMalformedRequest(message)) {
-          rows = maybeHandleDlqRoutingAndFilterRecords(rows, getRowErrorMapping(e), tableName.getTable());
-          if (rows.isEmpty()) {
-            writer.onSuccess();
-            return;
-          }
-        } else if (BigQueryStorageWriteApiErrorResponses.isStreamClosed(message)) {
-          writer.refresh();
-        } else if (BigQueryStorageWriteApiErrorResponses.isTableMissing(message) && getAutoCreateTables()) {
-          retryHandler.attemptTableOperation(schemaManager::createTable);
-        } else if (!BigQueryStorageWriteApiErrorResponses.isRetriableError(e.getMessage())
-            && BigQueryStorageWriteApiErrorResponses.isNonRetriableStorageError(e)
-        ) {
+      }
+
+      batches.advanceToNextBatch();
+    }
+
+    writer.onSuccess();
+  }
+
+  private void writeBatch(
+      StreamWriter writer,
+      List<ConvertedRecord> batch,
+      StorageWriteApiRetryHandler retryHandler,
+      TableName tableName
+  ) throws BatchTooLargeException, MalformedRowsException, RetryException {
+    try {
+      JSONArray jsonRecords = getJsonRecords(batch);
+      logger.trace("Sending records to Storage API writer for batch load");
+      ApiFuture<AppendRowsResponse> response = writer.appendRows(jsonRecords);
+      AppendRowsResponse writeResult = response.get();
+      logger.trace("Received response from Storage API writer batch");
+
+      if (writeResult.hasUpdatedSchema()) {
+        logger.warn("Sent records schema does not match with table schema, will attempt to update schema");
+        if (!canAttemptSchemaUpdate()) {
+          throw new BigQueryStorageWriteApiConnectException("Connector is not configured to perform schema updates.");
+        }
+        retryHandler.attemptTableOperation(schemaManager::updateSchema);
+        throw new RetryException();
+      } else if (writeResult.hasError()) {
+        Status errorStatus = writeResult.getError();
+        String errorMessage = String.format("Failed to write rows on table %s due to %s", tableName, writeResult.getError().getMessage());
+        retryHandler.setMostRecentException(new BigQueryStorageWriteApiConnectException(errorMessage));
+        if (BigQueryStorageWriteApiErrorResponses.isMalformedRequest(errorMessage)) {
+          throw new MalformedRowsException(convertToMap(writeResult.getRowErrorsList()));
+        } else if (!BigQueryStorageWriteApiErrorResponses.isRetriableError(errorStatus.getMessage())) {
           failTask(retryHandler.getMostRecentException());
         }
-        logger.warn(errorMessage + " Retry attempt " + retryHandler.getAttempt());
+        throw new RetryException(errorMessage);
+      } else {
+        if (!writeResult.hasAppendResult()) {
+          logger.warn(
+              "Write result did not report any errors, but also did not succeed. "
+                  + "This may be indicative of a bug in the BigQuery Java client library or back end; "
+                  + "please report it to the maintainers of the connector to investigate."
+          );
+        }
+        logger.trace("Append call completed successfully on stream {}", writer.streamName());
+      }
+    } catch (BigQueryStorageWriteApiConnectException | BatchWriteException exception) {
+      throw exception;
+    } catch (Exception e) {
+      String message = e.getMessage();
+      String errorMessage = String.format("Failed to write rows on table %s due to %s", tableName, message);
+      retryHandler.setMostRecentException(new BigQueryStorageWriteApiConnectException(errorMessage, e));
+
+      if (shouldHandleSchemaMismatch(e)) {
+        logger.warn("Sent records schema does not match with table schema, will attempt to update schema");
+        retryHandler.attemptTableOperation(schemaManager::updateSchema);
+      } else if (BigQueryStorageWriteApiErrorResponses.isMessageTooLargeError(message)) {
+        throw new BatchTooLargeException(errorMessage);
+      } else if (BigQueryStorageWriteApiErrorResponses.isMalformedRequest(message)) {
+        throw new MalformedRowsException(getRowErrorMapping(e));
+      } else if (BigQueryStorageWriteApiErrorResponses.isStreamClosed(message)) {
+        writer.refresh();
+      } else if (BigQueryStorageWriteApiErrorResponses.isTableMissing(message) && getAutoCreateTables()) {
+        retryHandler.attemptTableOperation(schemaManager::createTable);
+      } else if (!BigQueryStorageWriteApiErrorResponses.isRetriableError(e.getMessage())
+          && BigQueryStorageWriteApiErrorResponses.isNonRetriableStorageError(e)
+      ) {
+        failTask(retryHandler.getMostRecentException());
       }
-    } while (retryHandler.maybeRetry());
-    throw new BigQueryStorageWriteApiConnectException(
-        String.format("Exceeded %s attempts to write to table %s ", retryHandler.getAttempt(), tableName),
-        retryHandler.getMostRecentException());
+      throw new RetryException(errorMessage);
+    }
+  }
+
+  private abstract static class BatchWriteException extends Exception {
+
+    protected BatchWriteException() {
+      super();
+    }
+
+    protected BatchWriteException(String message) {
+      super(message);
+    }
+
+  }
+
+  private static class BatchTooLargeException extends BatchWriteException {
+
+    public BatchTooLargeException(String message) {
+      super(message);
+    }
+
+  }
+
+  private static class MalformedRowsException extends BatchWriteException {
+
+    private final Map<Integer, String> rowErrorMapping;
+
+    public MalformedRowsException(Map<Integer, String> rowErrorMapping) {
+      this.rowErrorMapping = rowErrorMapping;
+    }
+
+    public Map<Integer, String> getRowErrorMapping() {
+      return rowErrorMapping;
+    }
+
+  }
+
+  private static class RetryException extends BatchWriteException {
+
+    public RetryException() {
+      super();
+    }
+
+    public RetryException(String message) {
+      super(message);
    }
   }
 
   /**
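
The structural change above: the single do/while with inline error handling becomes an outer loop over batches plus writeBatch, which converts every outcome into one of three typed exceptions — RetryException (count an attempt and try again), MalformedRowsException (route the offending rows to the DLQ), and BatchTooLargeException (halve the batch, or DLQ/fail once it is down to a single row). A standalone sketch of the halving arithmetic only, with an invented maxRows cap standing in for the server's message-size limit (which is actually in bytes):

// Illustrative only: mirrors the RecordBatches bookkeeping for a batch
// that draws a "too large" rejection from a fake size check.
int total = 1000;       // rows buffered for one table
int maxRows = 300;      // hypothetical per-append cap
int batchSize = total;  // start with everything in one batch
int batchStart = 0;
while (batchStart < total) {                          // batches.completed()
  int size = Math.min(total - batchStart, batchSize); // currentBatch()
  if (size > maxRows && size > 1) {                   // BatchTooLargeException
    batchSize /= 2;                                   // batches.reduceBatchSize()
    continue;
  }
  System.out.printf("appended rows %d..%d%n", batchStart, batchStart + size - 1);
  batchStart += batchSize;                            // advanceToNextBatch()
}
// Prints four appends of 250 rows each: the size halves 1000 -> 500 -> 250
// before the first append fits, and later batches reuse the reduced size.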

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/storage/StorageWriteApiBatchApplicationStream.java

+8 -6

@@ -235,12 +235,8 @@ ApplicationStream createApplicationStream(String tableName, List<ConvertedRecord
         }
         logger.warn(baseErrorMessage + " Retry attempt {}", retryHandler.getAttempt());
       }
-    } while (retryHandler.maybeRetry());
-    throw new BigQueryStorageWriteApiConnectException(
-        String.format(
-            "Exceeded %s attempts to create Application stream on table %s ",
-            retryHandler.getAttempt(), tableName),
-        retryHandler.getMostRecentException());
+      retryHandler.maybeRetry("create application stream on table " + tableName);
+    } while (true);
   }
 
   /**
@@ -391,6 +387,12 @@ public void onSuccess() {
     public void refresh() {
       // No-op; handled internally by ApplicationStream class
     }
+
+    @Override
+    public String streamName() {
+      return streamName;
+    }
+
   }
 
 }

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/storage/StorageWriteApiDefaultStream.java

+7 -6

@@ -95,12 +95,8 @@ JsonStreamWriter getDefaultStream(TableName table, List<ConvertedRecord> rows) {
         }
         logger.warn(baseErrorMessage + " Retry attempt {}", retryHandler.getAttempt());
       }
-      } while (retryHandler.maybeRetry());
-      throw new BigQueryStorageWriteApiConnectException(
-          String.format(
-              "Exceeded %s attempts to create Default stream writer on table %s ",
-              retryHandler.getAttempt(), tableName),
-          retryHandler.getMostRecentException());
+        retryHandler.maybeRetry("create default stream on table " + tableName);
+      } while (true);
     });
   }
 
@@ -144,6 +140,11 @@ public void refresh() {
       closeAndDelete(tableName.toString());
       jsonStreamWriter = null;
     }
+
+    @Override
+    public String streamName() {
+      return StorageWriteApiWriter.DEFAULT;
+    }
   }
 
 }

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/storage/StorageWriteApiRetryHandler.java

+5 -3

@@ -80,17 +80,19 @@ private void waitRandomTime() throws InterruptedException {
     time.sleep(userConfiguredRetryWait + additionalWait + random.nextInt(1000));
   }
 
-  public boolean maybeRetry() {
+  public void maybeRetry(String operation) {
     if (currentAttempt < (userConfiguredRetry + additionalRetries)) {
       currentAttempt++;
       try {
         waitRandomTime();
       } catch (InterruptedException e) {
        logger.warn("Thread interrupted while waiting for random time");
       }
-      return true;
     } else {
-      return false;
+      throw new BigQueryStorageWriteApiConnectException(
+          String.format("Exceeded %s attempts to %s ", getAttempt(), operation),
+          getMostRecentException()
+      );
     }
   }
 
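
With this change, maybeRetry either sleeps and returns (another attempt is permitted) or throws the terminal BigQueryStorageWriteApiConnectException itself, so the "Exceeded N attempts" message is built in one place and every call site collapses to the same do/while (true) shape. A sketch of the resulting caller pattern (attemptOperation is a hypothetical stand-in for the guarded work):

do {
  try {
    return attemptOperation();
  } catch (Exception e) {
    retryHandler.setMostRecentException(
        new BigQueryStorageWriteApiConnectException(e.getMessage(), e));
    // Either returns (another attempt is allowed) or throws the
    // terminal connect exception, ending the loop.
    retryHandler.maybeRetry("create application stream on table " + tableName);
  }
} while (true);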

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/storage/StreamWriter.java

+2

@@ -30,4 +30,6 @@ ApiFuture<AppendRowsResponse> appendRows(
    */
   void onSuccess();
 
+  String streamName();
+
 }
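
streamName() is added so the shared writeBatch logging in StorageWriteApiBase can name the stream without every caller threading it through. A minimal fake implementing the interface for unit tests — a sketch that assumes appendRows takes the JSONArray argument seen at the call sites above:

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import org.json.JSONArray;

// Hypothetical test double: accepts every append and reports success.
class InMemoryStreamWriter implements StreamWriter {
  int appendCalls = 0;

  @Override
  public ApiFuture<AppendRowsResponse> appendRows(JSONArray jsonRecords) {
    appendCalls++;
    return ApiFutures.immediateFuture(AppendRowsResponse.getDefaultInstance());
  }

  @Override
  public void onSuccess() {
    // Nothing to commit in the fake.
  }

  @Override
  public void refresh() {
    // Nothing to refresh in the fake.
  }

  @Override
  public String streamName() {
    return "in-memory-test-stream";
  }
}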
