Commit 0f454b2

Merge pull request #5 from Aiven-Open/storage-write-api-cleanup-conversion
Reuse SinkConverter class and remove duplication in Storage Write API code path
2 parents: 97be285 + 0cd4010

8 files changed (+101, -124 lines)

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java (+1, -2)

@@ -290,8 +290,7 @@ private void writeSinkRecords(Collection<SinkRecord> records) {
         tableWriterBuilder = new StorageWriteApiWriter.Builder(
             storageApiWriter,
             TableNameUtils.tableName(table.getBaseTableId()),
-            config.getRecordConverter(),
-            config,
+            recordConverter,
             batchHandler
         );
       } else if (config.getList(BigQuerySinkConfig.ENABLE_BATCH_CONFIG).contains(record.topic())) {
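
This is the wiring change the PR title describes: the Storage Write API builder now receives the task's existing SinkRecordConverter (the recordConverter passed above) instead of a raw RecordConverter plus the whole config, so both write paths share one row-conversion object. A minimal, self-contained sketch of the pattern; the types below (RowConverter, the two builders) are illustrative stand-ins, not the connector's real classes:

// Sketch: one shared row converter handed to whichever writer builder is used.
import java.util.Map;

public class SharedConverterSketch {

  // Stand-in for SinkRecordConverter: owns all record-to-row logic in one place.
  interface RowConverter {
    Map<String, Object> toRow(String rec);
  }

  static class StorageWriteApiBuilder {
    private final RowConverter converter;
    StorageWriteApiBuilder(RowConverter converter) { this.converter = converter; }
    Map<String, Object> addRow(String rec) { return converter.toRow(rec); }
  }

  static class InsertAllBuilder {
    private final RowConverter converter;
    InsertAllBuilder(RowConverter converter) { this.converter = converter; }
    Map<String, Object> addRow(String rec) { return converter.toRow(rec); }
  }

  public static void main(String[] args) {
    RowConverter shared = rec -> Map.of("value", rec); // built once per task
    System.out.println(new StorageWriteApiBuilder(shared).addRow("a")); // {value=a}
    System.out.println(new InsertAllBuilder(shared).addRow("b"));       // {value=b}
  }
}

The payoff is that any future change to row conversion (key fields, metadata fields, sanitization) only has to be made in the shared converter.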

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/SinkRecordConverter.java (+16, -12)

@@ -63,19 +63,14 @@ public SinkRecordConverter(BigQuerySinkTaskConfig config,
         config.getBoolean(config.BIGQUERY_MESSAGE_TIME_PARTITIONING_CONFIG);
     this.usePartitionDecorator =
         config.getBoolean(config.BIGQUERY_PARTITION_DECORATOR_CONFIG);
-
   }
 
   public InsertAllRequest.RowToInsert getRecordRow(SinkRecord record, TableId table) {
     Map<String, Object> convertedRecord = config.isUpsertDeleteEnabled()
         ? getUpsertDeleteRow(record, table)
         : getRegularRow(record);
 
-    Map<String, Object> result = config.getBoolean(config.SANITIZE_FIELD_NAME_CONFIG)
-        ? FieldNameSanitizer.replaceInvalidKeys(convertedRecord)
-        : convertedRecord;
-
-    return InsertAllRequest.RowToInsert.of(getRowId(record), result);
+    return InsertAllRequest.RowToInsert.of(getRowId(record), convertedRecord);
   }
 
   private Map<String, Object> getUpsertDeleteRow(SinkRecord record, TableId table) {

@@ -119,22 +114,31 @@ private Map<String, Object> getUpsertDeleteRow(SinkRecord record, TableId table)
       result.put(MergeQueries.INTERMEDIATE_TABLE_PARTITION_TIME_FIELD_NAME, System.currentTimeMillis() / 1000);
     }
 
-    return result;
+    return maybeSanitize(result);
   }
 
-  private Map<String, Object> getRegularRow(SinkRecord record) {
+  public Map<String, Object> getRegularRow(SinkRecord record) {
     Map<String, Object> result = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE);
 
-    config.getKafkaDataFieldName().ifPresent(
-        fieldName -> result.put(fieldName, KafkaDataBuilder.buildKafkaDataRecord(record))
-    );
+    config.getKafkaDataFieldName().ifPresent(fieldName -> {
+      Map<String, Object> kafkaDataField = config.getBoolean(config.USE_STORAGE_WRITE_API_CONFIG)
+          ? KafkaDataBuilder.buildKafkaDataRecordStorageApi(record)
+          : KafkaDataBuilder.buildKafkaDataRecord(record);
+      result.put(fieldName, kafkaDataField);
+    });
 
     config.getKafkaKeyFieldName().ifPresent(fieldName -> {
       Map<String, Object> keyData = recordConverter.convertRecord(record, KafkaSchemaRecordType.KEY);
       result.put(fieldName, keyData);
     });
 
-    return result;
+    return maybeSanitize(result);
+  }
+
+  private Map<String, Object> maybeSanitize(Map<String, Object> convertedRecord) {
+    return config.getBoolean(config.SANITIZE_FIELD_NAME_CONFIG)
+        ? FieldNameSanitizer.replaceInvalidKeys(convertedRecord)
+        : convertedRecord;
   }
 
   private String getRowId(SinkRecord record) {
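
The effect of the change above is that Kafka key/metadata handling and field-name sanitization now live in one place, and getRegularRow() is public so the Storage Write API path can call it directly. A small, self-contained sketch of the "sanitize once, at the end" idea; SanitizeOnceSketch, buildRow, and the regex are illustrative stand-ins, not the connector's FieldNameSanitizer:

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of applying sanitization once, at the end of row building.
public class SanitizeOnceSketch {

  private final boolean sanitizeFieldNames;

  public SanitizeOnceSketch(boolean sanitizeFieldNames) {
    this.sanitizeFieldNames = sanitizeFieldNames;
  }

  // Both the upsert/delete path and the regular path would funnel through this.
  public Map<String, Object> buildRow(Map<String, Object> converted) {
    return maybeSanitize(converted);
  }

  private Map<String, Object> maybeSanitize(Map<String, Object> row) {
    if (!sanitizeFieldNames) {
      return row;
    }
    Map<String, Object> sanitized = new LinkedHashMap<>();
    // Stand-in for FieldNameSanitizer.replaceInvalidKeys: replace characters
    // BigQuery does not allow in field names with underscores.
    row.forEach((key, value) -> sanitized.put(key.replaceAll("[^a-zA-Z0-9_]", "_"), value));
    return sanitized;
  }

  public static void main(String[] args) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("user-id", 42);
    row.put("payload", "ok");
    System.out.println(new SanitizeOnceSketch(true).buildRow(row)); // {user_id=42, payload=ok}
  }
}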

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/storage/StorageWriteApiBase.java (+64, -65)

@@ -104,80 +104,79 @@ public void initializeAndWriteRecords(TableName tableName, List<ConvertedRecord>
     StorageWriteApiRetryHandler retryHandler = new StorageWriteApiRetryHandler(tableName, getSinkRecords(rows), retry, retryWait, time);
     logger.debug("Sending {} records to write Api Application stream {}", rows.size(), streamName);
     RecordBatches<ConvertedRecord> batches = new RecordBatches<>(rows);
-    try (StreamWriter writer = streamWriter(tableName, streamName, rows)) {
-      do {
-        try {
-          List<ConvertedRecord> batch = batches.currentBatch();
-          JSONArray jsonRecords = getJsonRecords(batch);
-          logger.trace("Sending records to Storage API writer for batch load");
-          ApiFuture<AppendRowsResponse> response = writer.appendRows(jsonRecords);
-          AppendRowsResponse writeResult = response.get();
-          logger.trace("Received response from Storage API writer batch");
-
-          if (writeResult.hasUpdatedSchema()) {
-            logger.warn("Sent records schema does not match with table schema, will attempt to update schema");
-            if (!canAttemptSchemaUpdate()) {
-              throw new BigQueryStorageWriteApiConnectException("Connector is not configured to perform schema updates.");
-            }
-            retryHandler.attemptTableOperation(schemaManager::updateSchema);
-          } else if (writeResult.hasError()) {
-            Status errorStatus = writeResult.getError();
-            String errorMessage = String.format("Failed to write rows on table %s due to %s", tableName, writeResult.getError().getMessage());
-            retryHandler.setMostRecentException(new BigQueryStorageWriteApiConnectException(errorMessage));
-            if (BigQueryStorageWriteApiErrorResponses.isMalformedRequest(errorMessage)) {
-              rows = maybeHandleDlqRoutingAndFilterRecords(rows, convertToMap(writeResult.getRowErrorsList()), tableName.getTable());
-              if (rows.isEmpty()) {
-                writer.onSuccess();
-                return;
-              }
-            } else if (!BigQueryStorageWriteApiErrorResponses.isRetriableError(errorStatus.getMessage())) {
-              failTask(retryHandler.getMostRecentException());
-            }
-            logger.warn(errorMessage + " Retry attempt " + retryHandler.getAttempt());
-          } else {
-            if (!writeResult.hasAppendResult()) {
-              logger.warn(
-                  "Write result did not report any errors, but also did not succeed. "
-                      + "This may be indicative of a bug in the BigQuery Java client library or back end; "
-                      + "please report it to the maintainers of the connector to investigate."
-              );
-            }
-            logger.trace("Append call completed successfully on stream {}", streamName);
-            writer.onSuccess();
-            return;
+    StreamWriter writer = streamWriter(tableName, streamName, rows);
+    do {
+      try {
+        List<ConvertedRecord> batch = batches.currentBatch();
+        JSONArray jsonRecords = getJsonRecords(batch);
+        logger.trace("Sending records to Storage API writer for batch load");
+        ApiFuture<AppendRowsResponse> response = writer.appendRows(jsonRecords);
+        AppendRowsResponse writeResult = response.get();
+        logger.trace("Received response from Storage API writer batch");
+
+        if (writeResult.hasUpdatedSchema()) {
+          logger.warn("Sent records schema does not match with table schema, will attempt to update schema");
+          if (!canAttemptSchemaUpdate()) {
+            throw new BigQueryStorageWriteApiConnectException("Connector is not configured to perform schema updates.");
           }
-      } catch (BigQueryStorageWriteApiConnectException exception) {
-        throw exception;
-      } catch (Exception e) {
-        String message = e.getMessage();
-        String errorMessage = String.format("Failed to write rows on table %s due to %s", tableName, message);
-        retryHandler.setMostRecentException(new BigQueryStorageWriteApiConnectException(errorMessage, e));
-
-        if (shouldHandleSchemaMismatch(e)) {
-          logger.warn("Sent records schema does not match with table schema, will attempt to update schema");
-          retryHandler.attemptTableOperation(schemaManager::updateSchema);
-        } else if (BigQueryStorageWriteApiErrorResponses.isMalformedRequest(message)) {
-          rows = maybeHandleDlqRoutingAndFilterRecords(rows, getRowErrorMapping(e), tableName.getTable());
+          retryHandler.attemptTableOperation(schemaManager::updateSchema);
+        } else if (writeResult.hasError()) {
+          Status errorStatus = writeResult.getError();
+          String errorMessage = String.format("Failed to write rows on table %s due to %s", tableName, writeResult.getError().getMessage());
+          retryHandler.setMostRecentException(new BigQueryStorageWriteApiConnectException(errorMessage));
+          if (BigQueryStorageWriteApiErrorResponses.isMalformedRequest(errorMessage)) {
+            rows = maybeHandleDlqRoutingAndFilterRecords(rows, convertToMap(writeResult.getRowErrorsList()), tableName.getTable());
            if (rows.isEmpty()) {
              writer.onSuccess();
              return;
            }
-        } else if (BigQueryStorageWriteApiErrorResponses.isStreamClosed(message)) {
-          writer.refresh();
-        } else if (BigQueryStorageWriteApiErrorResponses.isTableMissing(message) && getAutoCreateTables()) {
-          retryHandler.attemptTableOperation(schemaManager::createTable);
-        } else if (!BigQueryStorageWriteApiErrorResponses.isRetriableError(e.getMessage())
-            && BigQueryStorageWriteApiErrorResponses.isNonRetriableStorageError(e)
-        ) {
+          } else if (!BigQueryStorageWriteApiErrorResponses.isRetriableError(errorStatus.getMessage())) {
            failTask(retryHandler.getMostRecentException());
          }
          logger.warn(errorMessage + " Retry attempt " + retryHandler.getAttempt());
+        } else {
+          if (!writeResult.hasAppendResult()) {
+            logger.warn(
+                "Write result did not report any errors, but also did not succeed. "
+                    + "This may be indicative of a bug in the BigQuery Java client library or back end; "
+                    + "please report it to the maintainers of the connector to investigate."
+            );
+          }
+          logger.trace("Append call completed successfully on stream {}", streamName);
+          writer.onSuccess();
+          return;
        }
-    } while (retryHandler.maybeRetry());
-    throw new BigQueryStorageWriteApiConnectException(
-        String.format("Exceeded %s attempts to write to table %s ", retryHandler.getAttempt(), tableName),
-        retryHandler.getMostRecentException());
-  }
+      } catch (BigQueryStorageWriteApiConnectException exception) {
+        throw exception;
+      } catch (Exception e) {
+        String message = e.getMessage();
+        String errorMessage = String.format("Failed to write rows on table %s due to %s", tableName, message);
+        retryHandler.setMostRecentException(new BigQueryStorageWriteApiConnectException(errorMessage, e));
+
+        if (shouldHandleSchemaMismatch(e)) {
+          logger.warn("Sent records schema does not match with table schema, will attempt to update schema");
+          retryHandler.attemptTableOperation(schemaManager::updateSchema);
+        } else if (BigQueryStorageWriteApiErrorResponses.isMalformedRequest(message)) {
+          rows = maybeHandleDlqRoutingAndFilterRecords(rows, getRowErrorMapping(e), tableName.getTable());
+          if (rows.isEmpty()) {
+            writer.onSuccess();
+            return;
+          }
+        } else if (BigQueryStorageWriteApiErrorResponses.isStreamClosed(message)) {
+          writer.refresh();
+        } else if (BigQueryStorageWriteApiErrorResponses.isTableMissing(message) && getAutoCreateTables()) {
+          retryHandler.attemptTableOperation(schemaManager::createTable);
+        } else if (!BigQueryStorageWriteApiErrorResponses.isRetriableError(e.getMessage())
+            && BigQueryStorageWriteApiErrorResponses.isNonRetriableStorageError(e)
+        ) {
+          failTask(retryHandler.getMostRecentException());
+        }
+        logger.warn(errorMessage + " Retry attempt " + retryHandler.getAttempt());
+      }
+    } while (retryHandler.maybeRetry());
+    throw new BigQueryStorageWriteApiConnectException(
+        String.format("Exceeded %s attempts to write to table %s ", retryHandler.getAttempt(), tableName),
+        retryHandler.getMostRecentException());
   }
 
   /**
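
The behavioral change in this file is the removal of try-with-resources: the StreamWriter returned by streamWriter(...) is no longer closed when the method exits, and the loop only touches it via refresh() when a closed-stream error is detected. A minimal, self-contained sketch of that loop shape; all names and the simplified error handling are illustrative, not the connector's API:

// Sketch of the new loop shape: the writer is obtained once, is never
// auto-closed per call, and the do/while retries until the handler says stop.
public class RetryLoopSketch {

  interface Writer {
    boolean append(String batch) throws Exception; // true = success
    void refresh();                                 // recreate the underlying stream
  }

  static final class RetryHandler {
    private int attempt = 0;
    private final int maxAttempts;
    RetryHandler(int maxAttempts) { this.maxAttempts = maxAttempts; }
    boolean maybeRetry() { return ++attempt < maxAttempts; }
    int attempts() { return attempt; }
  }

  static void writeWithRetries(Writer writer, String batch, RetryHandler retries) {
    do {
      try {
        if (writer.append(batch)) {
          return; // success: no close(); the writer outlives this call
        }
      } catch (Exception e) {
        // Simplified: treat any failure as a closed stream and refresh the writer.
        writer.refresh();
      }
    } while (retries.maybeRetry());
    throw new IllegalStateException("Exceeded " + retries.attempts() + " attempts");
  }

  public static void main(String[] args) {
    Writer flaky = new Writer() {
      int calls = 0;
      public boolean append(String batch) throws Exception {
        if (++calls < 3) throw new Exception("stream closed");
        return true;
      }
      public void refresh() { /* would recreate the JSON stream writer */ }
    };
    writeWithRetries(flaky, "[{\"f\":1}]", new RetryHandler(5));
    System.out.println("wrote after retries");
  }
}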

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/storage/StorageWriteApiBatchApplicationStream.java (-5)

@@ -391,11 +391,6 @@ public void onSuccess() {
     public void refresh() {
       // No-op; handled internally by ApplicationStream class
     }
-
-    @Override
-    public void close() {
-      // No-op
-    }
   }
 
 }

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/storage/StorageWriteApiDefaultStream.java (-5)

@@ -144,11 +144,6 @@ public void refresh() {
       closeAndDelete(tableName.toString());
       jsonStreamWriter = null;
     }
-
-    @Override
-    public void close() {
-      jsonStreamWriter.close();
-    }
   }
 
 }

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/storage/StorageWriteApiWriter.java (+6, -22)

@@ -8,6 +8,7 @@
 import com.wepay.kafka.connect.bigquery.convert.KafkaDataBuilder;
 import com.wepay.kafka.connect.bigquery.convert.RecordConverter;
 import com.wepay.kafka.connect.bigquery.utils.FieldNameSanitizer;
+import com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter;
 import com.wepay.kafka.connect.bigquery.write.batch.TableWriterBuilder;
 import java.util.ArrayList;
 import java.util.List;

@@ -55,21 +56,18 @@ public void run() {
 
   public static class Builder implements TableWriterBuilder {
     private final List<ConvertedRecord> records = new ArrayList<>();
-    private final RecordConverter<Map<String, Object>> recordConverter;
-    private final BigQuerySinkTaskConfig config;
+    private final SinkRecordConverter recordConverter;
     private final TableName tableName;
     private final StorageWriteApiBase streamWriter;
     private final StorageApiBatchModeHandler batchModeHandler;
 
     public Builder(StorageWriteApiBase streamWriter,
                    TableName tableName,
-                   RecordConverter<Map<String, Object>> storageApiRecordConverter,
-                   BigQuerySinkTaskConfig config,
+                   SinkRecordConverter recordConverter,
                    StorageApiBatchModeHandler batchModeHandler) {
       this.streamWriter = streamWriter;
       this.tableName = tableName;
-      this.recordConverter = storageApiRecordConverter;
-      this.config = config;
+      this.recordConverter = recordConverter;
       this.batchModeHandler = batchModeHandler;
     }
 

@@ -90,22 +88,8 @@ public void addRow(SinkRecord sinkRecord, TableId tableId) {
     * @return converted record as JSONObject
     */
    private JSONObject convertRecord(SinkRecord record) {
-      Map<String, Object> convertedRecord = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE);
-
-      config.getKafkaDataFieldName().ifPresent(
-          fieldName -> convertedRecord.put(fieldName, KafkaDataBuilder.buildKafkaDataRecordStorageApi(record))
-      );
-
-      config.getKafkaKeyFieldName().ifPresent(fieldName -> {
-        Map<String, Object> keyData = recordConverter.convertRecord(record, KafkaSchemaRecordType.KEY);
-        convertedRecord.put(fieldName, keyData);
-      });
-
-      Map<String, Object> result = config.getBoolean(BigQuerySinkConfig.SANITIZE_FIELD_NAME_CONFIG)
-          ? FieldNameSanitizer.replaceInvalidKeys(convertedRecord)
-          : convertedRecord;
-
-      return getJsonFromMap(result);
+      Map<String, Object> convertedRecord = recordConverter.getRegularRow(record);
+      return getJsonFromMap(convertedRecord);
    }
 
    /**

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/storage/StreamWriter.java (+1, -6)

@@ -6,7 +6,7 @@
 import java.io.IOException;
 import org.json.JSONArray;
 
-public interface StreamWriter extends AutoCloseable {
+public interface StreamWriter {
 
   /**
    * Write the provided rows

@@ -30,9 +30,4 @@ ApiFuture<AppendRowsResponse> appendRows(
    */
   void onSuccess();
 
-  /**
-   * Inherited from {@link AutoCloseable}, but overridden to remove the throws clause
-   */
-  void close();
-
 }
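
Dropping extends AutoCloseable matches the change in StorageWriteApiBase: nothing closes a StreamWriter per call any more, so the interface no longer advertises a close() contract that invites try-with-resources, and the per-implementation close() methods removed above go with it. A tiny illustrative sketch of the lifetime distinction, with hypothetical types:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: a writer that is cached and reused across calls should not extend
// AutoCloseable, because closing it after every batch would defeat the cache.
public class WriterLifetimeSketch {

  interface LongLivedWriter {          // no AutoCloseable: lives as long as the task
    void append(String rows);
  }

  private final Map<String, LongLivedWriter> writersByTable = new ConcurrentHashMap<>();

  LongLivedWriter writerFor(String table) {
    return writersByTable.computeIfAbsent(table, t -> {
      System.out.println("creating writer for " + t); // happens once per table
      return rows -> System.out.println("appending to " + t + ": " + rows);
    });
  }

  public static void main(String[] args) {
    WriterLifetimeSketch sketch = new WriterLifetimeSketch();
    sketch.writerFor("dataset.table").append("[{\"f\":1}]");
    sketch.writerFor("dataset.table").append("[{\"f\":2}]"); // same cached writer
  }
}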

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/storage/StorageWriteApiWriterTest.java (+13, -7)

@@ -14,6 +14,7 @@
 import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig;
 import com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter;
 import com.wepay.kafka.connect.bigquery.convert.RecordConverter;
+import com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter;
 import com.wepay.kafka.connect.bigquery.write.batch.TableWriterBuilder;
 import java.util.HashSet;
 import java.util.List;

@@ -42,14 +43,17 @@ public class StorageWriteApiWriterTest {
   public void testRecordConversion() {
     StorageWriteApiBase mockStreamWriter = Mockito.mock(StorageWriteApiBase.class);
     BigQuerySinkTaskConfig mockedConfig = Mockito.mock(BigQuerySinkTaskConfig.class);
-    StorageApiBatchModeHandler batchModeHandler = mock(StorageApiBatchModeHandler.class);
-    RecordConverter<Map<String, Object>> mockedRecordConverter = new BigQueryRecordConverter(
+    when(mockedConfig.getBoolean(BigQuerySinkConfig.USE_STORAGE_WRITE_API_CONFIG)).thenReturn(true);
+    RecordConverter<Map<String, Object>> recordConverter = new BigQueryRecordConverter(
         false,
         false,
         true
     );
+    when(mockedConfig.getRecordConverter()).thenReturn(recordConverter);
+    StorageApiBatchModeHandler batchModeHandler = mock(StorageApiBatchModeHandler.class);
+    SinkRecordConverter sinkRecordConverter = new SinkRecordConverter(mockedConfig, null, null);
     TableWriterBuilder builder = new StorageWriteApiWriter.Builder(
-        mockStreamWriter, null, mockedRecordConverter, mockedConfig, batchModeHandler);
+        mockStreamWriter, null, sinkRecordConverter, batchModeHandler);
     @SuppressWarnings("unchecked")
     ArgumentCaptor<List<ConvertedRecord>> records = ArgumentCaptor.forClass(List.class);
     String expectedKafkaKey = "{\"key\":\"12345\"}";

@@ -90,14 +94,16 @@ public void testBatchLoadStreamName() {
     TableName tableName = TableName.of("p", "d", "t");
     StorageWriteApiBase mockStreamWriter = Mockito.mock(StorageWriteApiBatchApplicationStream.class);
     BigQuerySinkTaskConfig mockedConfig = Mockito.mock(BigQuerySinkTaskConfig.class);
-    StorageApiBatchModeHandler batchModeHandler = mock(StorageApiBatchModeHandler.class);
-    RecordConverter mockedRecordConverter = new BigQueryRecordConverter(
+    when(mockedConfig.getBoolean(BigQuerySinkConfig.USE_STORAGE_WRITE_API_CONFIG)).thenReturn(true);
+    RecordConverter<Map<String, Object>> recordConverter = new BigQueryRecordConverter(
         false, false, false);
+    when (mockedConfig.getRecordConverter()).thenReturn(recordConverter);
+    StorageApiBatchModeHandler batchModeHandler = mock(StorageApiBatchModeHandler.class);
+    SinkRecordConverter sinkRecordConverter = new SinkRecordConverter(mockedConfig, null, null);
     ArgumentCaptor<String> streamName = ArgumentCaptor.forClass(String.class);
     String expectedStreamName = tableName.toString() + "_s1";
     TableWriterBuilder builder = new StorageWriteApiWriter.Builder(
-        mockStreamWriter, tableName, mockedRecordConverter, mockedConfig, batchModeHandler);
-
+        mockStreamWriter, tableName, sinkRecordConverter, batchModeHandler);
 
     Mockito.when(mockedConfig.getKafkaDataFieldName()).thenReturn(Optional.empty());
     Mockito.when(mockedConfig.getKafkaKeyFieldName()).thenReturn(Optional.of("i_am_kafka_key"));
