Reuse SinkConverter class and remove duplication in Storage Write API code path #5

Merged
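At a high level, this change routes the Storage Write API path through the task's existing SinkRecordConverter instead of handing StorageWriteApiWriter.Builder a raw RecordConverter plus the sink config, so Kafka key/data-field handling and field-name sanitization live in one place for both write paths. A minimal sketch of the new wiring, assembled from the hunks below (storageApiWriter, table, recordConverter, and batchHandler are existing BigQuerySinkTask state assumed from context, not defined here):

```java
// Sketch only: pieced together from this PR's hunks, not a complete task.
// recordConverter is the task's shared SinkRecordConverter instance.
TableWriterBuilder tableWriterBuilder = new StorageWriteApiWriter.Builder(
    storageApiWriter,
    TableNameUtils.tableName(table.getBaseTableId()),
    recordConverter,   // replaces the former (config.getRecordConverter(), config) pair
    batchHandler
);
```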
@@ -290,8 +290,7 @@ private void writeSinkRecords(Collection<SinkRecord> records) {
tableWriterBuilder = new StorageWriteApiWriter.Builder(
storageApiWriter,
TableNameUtils.tableName(table.getBaseTableId()),
config.getRecordConverter(),
config,
recordConverter,
batchHandler
);
} else if (config.getList(BigQuerySinkConfig.ENABLE_BATCH_CONFIG).contains(record.topic())) {
@@ -63,19 +63,14 @@ public SinkRecordConverter(BigQuerySinkTaskConfig config,
config.getBoolean(config.BIGQUERY_MESSAGE_TIME_PARTITIONING_CONFIG);
this.usePartitionDecorator =
config.getBoolean(config.BIGQUERY_PARTITION_DECORATOR_CONFIG);

}

public InsertAllRequest.RowToInsert getRecordRow(SinkRecord record, TableId table) {
Map<String, Object> convertedRecord = config.isUpsertDeleteEnabled()
? getUpsertDeleteRow(record, table)
: getRegularRow(record);

Map<String, Object> result = config.getBoolean(config.SANITIZE_FIELD_NAME_CONFIG)
? FieldNameSanitizer.replaceInvalidKeys(convertedRecord)
: convertedRecord;

return InsertAllRequest.RowToInsert.of(getRowId(record), result);
return InsertAllRequest.RowToInsert.of(getRowId(record), convertedRecord);
}

private Map<String, Object> getUpsertDeleteRow(SinkRecord record, TableId table) {
@@ -119,22 +114,31 @@ private Map<String, Object> getUpsertDeleteRow(SinkRecord record, TableId table)
result.put(MergeQueries.INTERMEDIATE_TABLE_PARTITION_TIME_FIELD_NAME, System.currentTimeMillis() / 1000);
}

return result;
return maybeSanitize(result);
}

private Map<String, Object> getRegularRow(SinkRecord record) {
public Map<String, Object> getRegularRow(SinkRecord record) {
Map<String, Object> result = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE);

config.getKafkaDataFieldName().ifPresent(
fieldName -> result.put(fieldName, KafkaDataBuilder.buildKafkaDataRecord(record))
);
config.getKafkaDataFieldName().ifPresent(fieldName -> {
Map<String, Object> kafkaDataField = config.getBoolean(config.USE_STORAGE_WRITE_API_CONFIG)
? KafkaDataBuilder.buildKafkaDataRecordStorageApi(record)
: KafkaDataBuilder.buildKafkaDataRecord(record);
result.put(fieldName, kafkaDataField);
});

config.getKafkaKeyFieldName().ifPresent(fieldName -> {
Map<String, Object> keyData = recordConverter.convertRecord(record, KafkaSchemaRecordType.KEY);
result.put(fieldName, keyData);
});

return result;
return maybeSanitize(result);
}

private Map<String, Object> maybeSanitize(Map<String, Object> convertedRecord) {
return config.getBoolean(config.SANITIZE_FIELD_NAME_CONFIG)
? FieldNameSanitizer.replaceInvalidKeys(convertedRecord)
: convertedRecord;
}

private String getRowId(SinkRecord record) {
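Taken together, the hunk above makes SinkRecordConverter the single conversion path: getRegularRow() is now public, it chooses the Storage Write API flavour of the Kafka data field when USE_STORAGE_WRITE_API_CONFIG is enabled, and sanitization happens once in maybeSanitize(). A rough usage sketch, with the two trailing constructor arguments nulled out as in the updated unit tests (in the task they are real collaborators):

```java
// Sketch only: mirrors the updated tests' setup; config, sinkRecord and tableId
// are assumed to exist in the caller.
SinkRecordConverter converter = new SinkRecordConverter(config, null, null);

// Storage Write API path (used by StorageWriteApiWriter.Builder.convertRecord):
Map<String, Object> row = converter.getRegularRow(sinkRecord);

// Legacy insertAll path, unchanged for callers:
InsertAllRequest.RowToInsert insertRow = converter.getRecordRow(sinkRecord, tableId);
```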
@@ -104,80 +104,79 @@ public void initializeAndWriteRecords(TableName tableName, List<ConvertedRecord>
StorageWriteApiRetryHandler retryHandler = new StorageWriteApiRetryHandler(tableName, getSinkRecords(rows), retry, retryWait, time);
logger.debug("Sending {} records to write Api Application stream {}", rows.size(), streamName);
RecordBatches<ConvertedRecord> batches = new RecordBatches<>(rows);
try (StreamWriter writer = streamWriter(tableName, streamName, rows)) {
do {
try {
List<ConvertedRecord> batch = batches.currentBatch();
JSONArray jsonRecords = getJsonRecords(batch);
logger.trace("Sending records to Storage API writer for batch load");
ApiFuture<AppendRowsResponse> response = writer.appendRows(jsonRecords);
AppendRowsResponse writeResult = response.get();
logger.trace("Received response from Storage API writer batch");

if (writeResult.hasUpdatedSchema()) {
logger.warn("Sent records schema does not match with table schema, will attempt to update schema");
if (!canAttemptSchemaUpdate()) {
throw new BigQueryStorageWriteApiConnectException("Connector is not configured to perform schema updates.");
}
retryHandler.attemptTableOperation(schemaManager::updateSchema);
} else if (writeResult.hasError()) {
Status errorStatus = writeResult.getError();
String errorMessage = String.format("Failed to write rows on table %s due to %s", tableName, writeResult.getError().getMessage());
retryHandler.setMostRecentException(new BigQueryStorageWriteApiConnectException(errorMessage));
if (BigQueryStorageWriteApiErrorResponses.isMalformedRequest(errorMessage)) {
rows = maybeHandleDlqRoutingAndFilterRecords(rows, convertToMap(writeResult.getRowErrorsList()), tableName.getTable());
if (rows.isEmpty()) {
writer.onSuccess();
return;
}
} else if (!BigQueryStorageWriteApiErrorResponses.isRetriableError(errorStatus.getMessage())) {
failTask(retryHandler.getMostRecentException());
}
logger.warn(errorMessage + " Retry attempt " + retryHandler.getAttempt());
} else {
if (!writeResult.hasAppendResult()) {
logger.warn(
"Write result did not report any errors, but also did not succeed. "
+ "This may be indicative of a bug in the BigQuery Java client library or back end; "
+ "please report it to the maintainers of the connector to investigate."
);
}
logger.trace("Append call completed successfully on stream {}", streamName);
writer.onSuccess();
return;
StreamWriter writer = streamWriter(tableName, streamName, rows);
do {
try {
List<ConvertedRecord> batch = batches.currentBatch();
JSONArray jsonRecords = getJsonRecords(batch);
logger.trace("Sending records to Storage API writer for batch load");
ApiFuture<AppendRowsResponse> response = writer.appendRows(jsonRecords);
AppendRowsResponse writeResult = response.get();
logger.trace("Received response from Storage API writer batch");

if (writeResult.hasUpdatedSchema()) {
logger.warn("Sent records schema does not match with table schema, will attempt to update schema");
if (!canAttemptSchemaUpdate()) {
throw new BigQueryStorageWriteApiConnectException("Connector is not configured to perform schema updates.");
}
} catch (BigQueryStorageWriteApiConnectException exception) {
throw exception;
} catch (Exception e) {
String message = e.getMessage();
String errorMessage = String.format("Failed to write rows on table %s due to %s", tableName, message);
retryHandler.setMostRecentException(new BigQueryStorageWriteApiConnectException(errorMessage, e));

if (shouldHandleSchemaMismatch(e)) {
logger.warn("Sent records schema does not match with table schema, will attempt to update schema");
retryHandler.attemptTableOperation(schemaManager::updateSchema);
} else if (BigQueryStorageWriteApiErrorResponses.isMalformedRequest(message)) {
rows = maybeHandleDlqRoutingAndFilterRecords(rows, getRowErrorMapping(e), tableName.getTable());
retryHandler.attemptTableOperation(schemaManager::updateSchema);
} else if (writeResult.hasError()) {
Status errorStatus = writeResult.getError();
String errorMessage = String.format("Failed to write rows on table %s due to %s", tableName, writeResult.getError().getMessage());
retryHandler.setMostRecentException(new BigQueryStorageWriteApiConnectException(errorMessage));
if (BigQueryStorageWriteApiErrorResponses.isMalformedRequest(errorMessage)) {
rows = maybeHandleDlqRoutingAndFilterRecords(rows, convertToMap(writeResult.getRowErrorsList()), tableName.getTable());
if (rows.isEmpty()) {
writer.onSuccess();
return;
}
} else if (BigQueryStorageWriteApiErrorResponses.isStreamClosed(message)) {
writer.refresh();
} else if (BigQueryStorageWriteApiErrorResponses.isTableMissing(message) && getAutoCreateTables()) {
retryHandler.attemptTableOperation(schemaManager::createTable);
} else if (!BigQueryStorageWriteApiErrorResponses.isRetriableError(e.getMessage())
&& BigQueryStorageWriteApiErrorResponses.isNonRetriableStorageError(e)
) {
} else if (!BigQueryStorageWriteApiErrorResponses.isRetriableError(errorStatus.getMessage())) {
failTask(retryHandler.getMostRecentException());
}
logger.warn(errorMessage + " Retry attempt " + retryHandler.getAttempt());
} else {
if (!writeResult.hasAppendResult()) {
logger.warn(
"Write result did not report any errors, but also did not succeed. "
+ "This may be indicative of a bug in the BigQuery Java client library or back end; "
+ "please report it to the maintainers of the connector to investigate."
);
}
logger.trace("Append call completed successfully on stream {}", streamName);
writer.onSuccess();
return;
}
} while (retryHandler.maybeRetry());
throw new BigQueryStorageWriteApiConnectException(
String.format("Exceeded %s attempts to write to table %s ", retryHandler.getAttempt(), tableName),
retryHandler.getMostRecentException());
}
} catch (BigQueryStorageWriteApiConnectException exception) {
throw exception;
} catch (Exception e) {
String message = e.getMessage();
String errorMessage = String.format("Failed to write rows on table %s due to %s", tableName, message);
retryHandler.setMostRecentException(new BigQueryStorageWriteApiConnectException(errorMessage, e));

if (shouldHandleSchemaMismatch(e)) {
logger.warn("Sent records schema does not match with table schema, will attempt to update schema");
retryHandler.attemptTableOperation(schemaManager::updateSchema);
} else if (BigQueryStorageWriteApiErrorResponses.isMalformedRequest(message)) {
rows = maybeHandleDlqRoutingAndFilterRecords(rows, getRowErrorMapping(e), tableName.getTable());
if (rows.isEmpty()) {
writer.onSuccess();
return;
}
} else if (BigQueryStorageWriteApiErrorResponses.isStreamClosed(message)) {
writer.refresh();
} else if (BigQueryStorageWriteApiErrorResponses.isTableMissing(message) && getAutoCreateTables()) {
retryHandler.attemptTableOperation(schemaManager::createTable);
} else if (!BigQueryStorageWriteApiErrorResponses.isRetriableError(e.getMessage())
&& BigQueryStorageWriteApiErrorResponses.isNonRetriableStorageError(e)
) {
failTask(retryHandler.getMostRecentException());
}
logger.warn(errorMessage + " Retry attempt " + retryHandler.getAttempt());
}
} while (retryHandler.maybeRetry());
throw new BigQueryStorageWriteApiConnectException(
String.format("Exceeded %s attempts to write to table %s ", retryHandler.getAttempt(), tableName),
retryHandler.getMostRecentException());
}

/**
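Since StreamWriter no longer extends AutoCloseable (see the interface hunk further down), the try-with-resources wrapper around the append loop is dropped here. Because the before/after lines above interleave in this view, the new control flow is condensed below; schema updates, DLQ routing, and error classification are elided and behave as shown in the full hunk:

```java
// Condensed sketch of the reworked initializeAndWriteRecords() loop.
StreamWriter writer = streamWriter(tableName, streamName, rows);  // no try-with-resources
do {
  try {
    JSONArray jsonRecords = getJsonRecords(batches.currentBatch());
    AppendRowsResponse writeResult = writer.appendRows(jsonRecords).get();
    if (!writeResult.hasUpdatedSchema() && !writeResult.hasError()) {
      writer.onSuccess();   // success: nothing to close any more
      return;
    }
    // otherwise: attempt a schema update, route malformed rows to the DLQ,
    // or fail the task on non-retriable errors (see the full hunk above)
  } catch (BigQueryStorageWriteApiConnectException e) {
    throw e;
  } catch (Exception e) {
    // classified as in the full hunk: stream closed -> writer.refresh(),
    // table missing -> create it, non-retriable -> failTask(...)
  }
} while (retryHandler.maybeRetry());
throw new BigQueryStorageWriteApiConnectException(
    String.format("Exceeded %s attempts to write to table %s ", retryHandler.getAttempt(), tableName),
    retryHandler.getMostRecentException());
```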
@@ -391,11 +391,6 @@ public void onSuccess() {
public void refresh() {
// No-op; handled internally by ApplicationStream class
}

@Override
public void close() {
// No-op
}
}

}
@@ -144,11 +144,6 @@ public void refresh() {
closeAndDelete(tableName.toString());
jsonStreamWriter = null;
}

@Override
public void close() {
jsonStreamWriter.close();
}
}

}
@@ -8,6 +8,7 @@
import com.wepay.kafka.connect.bigquery.convert.KafkaDataBuilder;
import com.wepay.kafka.connect.bigquery.convert.RecordConverter;
import com.wepay.kafka.connect.bigquery.utils.FieldNameSanitizer;
import com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter;
import com.wepay.kafka.connect.bigquery.write.batch.TableWriterBuilder;
import java.util.ArrayList;
import java.util.List;
@@ -55,21 +56,18 @@ public void run() {

public static class Builder implements TableWriterBuilder {
private final List<ConvertedRecord> records = new ArrayList<>();
private final RecordConverter<Map<String, Object>> recordConverter;
private final BigQuerySinkTaskConfig config;
private final SinkRecordConverter recordConverter;
private final TableName tableName;
private final StorageWriteApiBase streamWriter;
private final StorageApiBatchModeHandler batchModeHandler;

public Builder(StorageWriteApiBase streamWriter,
TableName tableName,
RecordConverter<Map<String, Object>> storageApiRecordConverter,
BigQuerySinkTaskConfig config,
SinkRecordConverter recordConverter,
StorageApiBatchModeHandler batchModeHandler) {
this.streamWriter = streamWriter;
this.tableName = tableName;
this.recordConverter = storageApiRecordConverter;
this.config = config;
this.recordConverter = recordConverter;
this.batchModeHandler = batchModeHandler;
}

@@ -90,22 +88,8 @@ public void addRow(SinkRecord sinkRecord, TableId tableId) {
* @return converted record as JSONObject
*/
private JSONObject convertRecord(SinkRecord record) {
Map<String, Object> convertedRecord = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE);

config.getKafkaDataFieldName().ifPresent(
fieldName -> convertedRecord.put(fieldName, KafkaDataBuilder.buildKafkaDataRecordStorageApi(record))
);

config.getKafkaKeyFieldName().ifPresent(fieldName -> {
Map<String, Object> keyData = recordConverter.convertRecord(record, KafkaSchemaRecordType.KEY);
convertedRecord.put(fieldName, keyData);
});

Map<String, Object> result = config.getBoolean(BigQuerySinkConfig.SANITIZE_FIELD_NAME_CONFIG)
? FieldNameSanitizer.replaceInvalidKeys(convertedRecord)
: convertedRecord;

return getJsonFromMap(result);
Map<String, Object> convertedRecord = recordConverter.getRegularRow(record);
return getJsonFromMap(convertedRecord);
}

/**
@@ -6,7 +6,7 @@
import java.io.IOException;
import org.json.JSONArray;

public interface StreamWriter extends AutoCloseable {
public interface StreamWriter {

/**
* Write the provided rows
@@ -30,9 +30,4 @@ ApiFuture<AppendRowsResponse> appendRows(
*/
void onSuccess();

/**
* Inherited from {@link AutoCloseable}, but overridden to remove the throws clause
*/
void close();

}
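For reference, after this hunk the interface reduces to roughly the following; the exact appendRows parameter list, javadoc, and any throws clauses are abridged, and refresh() is inferred from the implementations shown earlier rather than from this hunk:

```java
// Abridged reconstruction, not verbatim source.
public interface StreamWriter {
  // Write the provided rows to the underlying stream
  ApiFuture<AppendRowsResponse> appendRows(JSONArray rows);

  // Invoked once an append has fully succeeded
  void onSuccess();

  // Re-create the underlying stream, e.g. after a "stream closed" error
  void refresh();
}
```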
@@ -14,6 +14,7 @@
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig;
import com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter;
import com.wepay.kafka.connect.bigquery.convert.RecordConverter;
import com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter;
import com.wepay.kafka.connect.bigquery.write.batch.TableWriterBuilder;
import java.util.HashSet;
import java.util.List;
@@ -42,14 +43,17 @@ public class StorageWriteApiWriterTest {
public void testRecordConversion() {
StorageWriteApiBase mockStreamWriter = Mockito.mock(StorageWriteApiBase.class);
BigQuerySinkTaskConfig mockedConfig = Mockito.mock(BigQuerySinkTaskConfig.class);
StorageApiBatchModeHandler batchModeHandler = mock(StorageApiBatchModeHandler.class);
RecordConverter<Map<String, Object>> mockedRecordConverter = new BigQueryRecordConverter(
when(mockedConfig.getBoolean(BigQuerySinkConfig.USE_STORAGE_WRITE_API_CONFIG)).thenReturn(true);
RecordConverter<Map<String, Object>> recordConverter = new BigQueryRecordConverter(
false,
false,
true
);
when(mockedConfig.getRecordConverter()).thenReturn(recordConverter);
StorageApiBatchModeHandler batchModeHandler = mock(StorageApiBatchModeHandler.class);
SinkRecordConverter sinkRecordConverter = new SinkRecordConverter(mockedConfig, null, null);
TableWriterBuilder builder = new StorageWriteApiWriter.Builder(
mockStreamWriter, null, mockedRecordConverter, mockedConfig, batchModeHandler);
mockStreamWriter, null, sinkRecordConverter, batchModeHandler);
@SuppressWarnings("unchecked")
ArgumentCaptor<List<ConvertedRecord>> records = ArgumentCaptor.forClass(List.class);
String expectedKafkaKey = "{\"key\":\"12345\"}";
@@ -90,14 +94,16 @@ public void testBatchLoadStreamName() {
TableName tableName = TableName.of("p", "d", "t");
StorageWriteApiBase mockStreamWriter = Mockito.mock(StorageWriteApiBatchApplicationStream.class);
BigQuerySinkTaskConfig mockedConfig = Mockito.mock(BigQuerySinkTaskConfig.class);
StorageApiBatchModeHandler batchModeHandler = mock(StorageApiBatchModeHandler.class);
RecordConverter mockedRecordConverter = new BigQueryRecordConverter(
when(mockedConfig.getBoolean(BigQuerySinkConfig.USE_STORAGE_WRITE_API_CONFIG)).thenReturn(true);
RecordConverter<Map<String, Object>> recordConverter = new BigQueryRecordConverter(
false, false, false);
when (mockedConfig.getRecordConverter()).thenReturn(recordConverter);
StorageApiBatchModeHandler batchModeHandler = mock(StorageApiBatchModeHandler.class);
SinkRecordConverter sinkRecordConverter = new SinkRecordConverter(mockedConfig, null, null);
ArgumentCaptor<String> streamName = ArgumentCaptor.forClass(String.class);
String expectedStreamName = tableName.toString() + "_s1";
TableWriterBuilder builder = new StorageWriteApiWriter.Builder(
mockStreamWriter, tableName, mockedRecordConverter, mockedConfig, batchModeHandler);

mockStreamWriter, tableName, sinkRecordConverter, batchModeHandler);

Mockito.when(mockedConfig.getKafkaDataFieldName()).thenReturn(Optional.empty());
Mockito.when(mockedConfig.getKafkaKeyFieldName()).thenReturn(Optional.of("i_am_kafka_key"));