
Commit 5b718f5

Add currently-ignored test case for high-throughput loads
1 parent: 286c13c

File tree

1 file changed (+95, -1)

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/integration/StorageWriteApiBigQuerySinkConnectorIT.java (+95, -1)
@@ -52,6 +52,7 @@
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
@@ -382,6 +383,9 @@ public void testAvroLargeBatches() throws InterruptedException {
     final String topic = suffixedTableOrTopic("storage-api-append-large-batches" + System.nanoTime());
     final String table = sanitizedTable(topic);
 
+    // pre-create the table
+    createPerformanceTestingTable(table);
+
     int tasksMax = 1;
     long numRecords = 100_000;

@@ -440,6 +444,97 @@ public void testAvroLargeBatches() throws InterruptedException {
     );
   }
 
+  @Test
+  @Ignore("TODO: Handle 'java.lang.RuntimeException: Request has waited in inflight queue for <duration> for writer <writer>, which is over maximum wait time PT5M'")
+  public void testAvroHighThroughput() throws InterruptedException {
+    // create topic in Kafka
+    final String topic = suffixedTableOrTopic("storage-api-append-high-throughput" + System.nanoTime());
+    final String table = sanitizedTable(topic);
+
+    // pre-create the table
+    createPerformanceTestingTable(table);
+
+    int tasksMax = 10;
+    long numRecords = 10_000_000;
+
+    // create topic
+    connect.kafka().createTopic(topic, tasksMax);
+
+    // clean table
+    TableClearer.clearTables(bigQuery, dataset(), table);
+
+    // Instantiate the converters we'll use to send records to the connector
+    initialiseAvroConverters();
+
+    // produce records, each with a 100-byte value
+    produceAvroRecords(topic, numRecords, 100);
+
+    // set up props for the sink connector
+    Map<String, String> props = configs(topic);
+    props.put(TASKS_MAX_CONFIG, Integer.toString(tasksMax));
+
+    // the Storage Write API allows for 10MB per write; try to get close to that
+    // for each poll
+    props.put(
+        CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + FETCH_MAX_BYTES_CONFIG,
+        Integer.toString(9 * 1024 * 1024)
+    );
+    props.put(
+        CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + MAX_PARTITION_FETCH_BYTES_CONFIG,
+        Integer.toString(Integer.MAX_VALUE)
+    );
+    props.put(
+        CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + MAX_POLL_RECORDS_CONFIG,
+        Long.toString(numRecords)
+    );
+
+    // start a sink connector
+    connect.configureConnector(CONNECTOR_NAME, props);
+
+    // wait for tasks to spin up
+    waitForConnectorToStart(CONNECTOR_NAME, tasksMax);
+
+    // wait for tasks to write to BigQuery and commit offsets for their records
+    waitForCommittedRecords(
+        CONNECTOR_NAME,
+        Collections.singleton(topic),
+        numRecords,
+        tasksMax,
+        TimeUnit.MINUTES.toMillis(10)
+    );
+
+    connect.deleteConnector(CONNECTOR_NAME);
+
+    final AtomicLong numRows = new AtomicLong();
+    TestUtils.waitForCondition(
+        () -> {
+          numRows.set(countRows(bigQuery, table));
+          assertEquals(numRecords, numRows.get());
+          return true;
+        },
+        10_000L,
+        () -> "Table should contain " + numRecords
+            + " rows, but has " + numRows.get() + " instead"
+    );
+  }
+
+  private void createPerformanceTestingTable(String table) {
+    // pre-create the table
+    com.google.cloud.bigquery.Schema tableSchema = com.google.cloud.bigquery.Schema.of(
+        Field.of("f1", StandardSQLTypeName.STRING),
+        Field.of(KAFKA_FIELD_NAME, StandardSQLTypeName.STRUCT, Field.of("k1", StandardSQLTypeName.STRING))
+    );
+    try {
+      BigQueryTestUtils.createPartitionedTable(bigQuery, dataset(), table, tableSchema);
+    } catch (BigQueryException ex) {
+      if (!ex.getError().getReason().equalsIgnoreCase("duplicate")) {
+        throw new ConnectException("Failed to create table: ", ex);
+      } else {
+        logger.info("Table {} already exists", table);
+      }
+    }
+  }
+
   private void createTable(String table, boolean incompleteSchema) {
     com.google.cloud.bigquery.Schema tableSchema;
     if (incompleteSchema) {
@@ -509,7 +604,6 @@ private void produceAvroRecords(String topic) {
     produceAvroRecords(topic, NUM_RECORDS_PRODUCED);
   }
 
-
   private void produceAvroRecords(String topic, long numRecords) {
     produceAvroRecords(
         topic,
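A note on the consumer.override.* settings in the new test: in Kafka Connect, CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX resolves to the literal "consumer.override.", and the worker strips that prefix and applies the remainder to the consumer it creates for each sink task (provided the worker's connector.client.config.override.policy permits the override). A minimal, self-contained sketch of that prefix handling follows; the ConsumerOverrideSketch class and consumerOverrides helper are illustrative names, not part of this commit:

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class ConsumerOverrideSketch {
  // The literal that ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX
  // resolves to in Apache Kafka
  static final String PREFIX = "consumer.override.";

  // Strip the prefix to obtain the configs the worker would hand to the
  // task's consumer
  static Map<String, String> consumerOverrides(Map<String, String> connectorProps) {
    return connectorProps.entrySet().stream()
        .filter(e -> e.getKey().startsWith(PREFIX))
        .collect(Collectors.toMap(
            e -> e.getKey().substring(PREFIX.length()),
            Map.Entry::getValue));
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put(PREFIX + "fetch.max.bytes", Integer.toString(9 * 1024 * 1024));
    props.put(PREFIX + "max.poll.records", Long.toString(10_000_000L));
    // Prints {fetch.max.bytes=9437184, max.poll.records=10000000}
    System.out.println(consumerOverrides(props));
  }
}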

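The final assertion in testAvroHighThroughput relies on a countRows helper from the project's test utilities, whose implementation is outside this diff. As a rough sketch only, assuming the google-cloud-bigquery client library, a plain COUNT(*) query is one plausible way such a helper could work (the RowCounter class and its signature are hypothetical):

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;

public class RowCounter {
  // Hypothetical stand-in for countRows(bigQuery, table); runs a COUNT(*)
  // query and reads the single INT64 cell it returns
  public static long countRows(BigQuery bigQuery, String dataset, String table)
      throws InterruptedException {
    String sql = String.format("SELECT COUNT(*) FROM `%s`.`%s`", dataset, table);
    TableResult result = bigQuery.query(QueryJobConfiguration.newBuilder(sql).build());
    return result.iterateAll().iterator().next().get(0).getLongValue();
  }
}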
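For a sense of the load this test asks for (simple arithmetic on the numbers in the diff, not a measured result):

10,000,000 records x 100 B/record ≈ 1 GB of value data
1 GB / 600 s (10-minute timeout)  ≈ 1.7 MB/s aggregate
1.7 MB/s / 10 tasks               ≈ 0.17 MB/s per task

Sustained load of this shape is evidently what backs up the Storage Write API writer's inflight request queue past its five-minute maximum wait (PT5M), producing the RuntimeException quoted in the @Ignore annotation.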