Skip to content

Commit 107c5f5

Browse files
authored
Check for stale writer before checking for schema update in StorageWriteApiBase error-handling logic (#17)
1 parent 91cfa83 commit 107c5f5

File tree

2 files changed

+3
-4
lines changed

2 files changed

+3
-4
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/storage/StorageWriteApiBase.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -219,15 +219,15 @@ private void writeBatch(
219219
String errorMessage = String.format("Failed to write rows on table %s due to %s", tableName, message);
220220
retryHandler.setMostRecentException(new BigQueryStorageWriteApiConnectException(errorMessage, e));
221221

222-
if (shouldHandleSchemaMismatch(e)) {
222+
if (BigQueryStorageWriteApiErrorResponses.isStreamClosed(message)) {
223+
writer.refresh();
224+
} else if (shouldHandleSchemaMismatch(e)) {
223225
logger.warn("Sent records schema does not match with table schema, will attempt to update schema");
224226
retryHandler.attemptTableOperation(schemaManager::updateSchema);
225227
} else if (BigQueryStorageWriteApiErrorResponses.isMessageTooLargeError(message)) {
226228
throw new BatchTooLargeException(errorMessage);
227229
} else if (BigQueryStorageWriteApiErrorResponses.isMalformedRequest(message)) {
228230
throw new MalformedRowsException(getRowErrorMapping(e));
229-
} else if (BigQueryStorageWriteApiErrorResponses.isStreamClosed(message)) {
230-
writer.refresh();
231231
} else if (BigQueryStorageWriteApiErrorResponses.isTableMissing(message) && getAutoCreateTables()) {
232232
retryHandler.attemptTableOperation(schemaManager::createTable);
233233
} else if (!BigQueryStorageWriteApiErrorResponses.isRetriableError(e.getMessage())

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/integration/StorageWriteApiBigQuerySinkConnectorIT.java

-1
Original file line numberDiff line numberDiff line change
@@ -684,7 +684,6 @@ private void produceAvroRecords(
684684

685685
// send prepared records
686686
schemaRegistry.produceRecordsWithKey(keyConverter, valueConverter, records, topic);
687-
688687
}
689688

690689
private void initialiseAvroConverters() {

0 commit comments

Comments
 (0)