Changes from all commits
Commits
76 commits
e12dc44
Adjusted dependencies to remove conflicting and unnecessary runtime d…
chrisfontana Oct 16, 2020
9bc0e00
Initial BigQuery schema evolution support
chrisfontana Oct 16, 2020
9a4d646
Added delayed callback queues to AbstractBufferedTransportProvider
chrisfontana Nov 2, 2020
6640d9c
Added logic to retry schema evolution if the evolution failed due to …
chrisfontana Nov 9, 2020
461057f
Tweaked simple BigQuery schema evolution logic to retain field mode w…
chrisfontana Nov 16, 2020
76bb2d8
Minor refactoring to BigQuery schema evolution implementation based o…
chrisfontana Nov 16, 2020
3810933
Adjusted BigQuery schema evolver implementation to better support dat…
chrisfontana Nov 17, 2020
705985f
Added logic to catch when a concurrent schema update exception would …
chrisfontana Nov 24, 2020
c6be5e3
Changed BigQuery transport provider to only create/update the destina…
chrisfontana Nov 30, 2020
87be75a
Bumped version to 1.0.2-19-SNAPSHOT for a future release
chrisfontana Dec 8, 2020
edfa6cc
Merge branch 'cfontana_bigquery_transport_schema_evolution' into bigq…
chrisfontana Dec 8, 2020
f143b3e
Merge tag '1.0.2-18' into bigquery_transport_schema_evolution_milesto…
chrisfontana Dec 10, 2020
4206027
Minor changes to fix checkstyle and license build errors
chrisfontana Dec 10, 2020
da97773
Implemented exception handling logic for BigQuery transport provider
chrisfontana Nov 18, 2020
a41e78c
- Implemented type coercion support for BigQuery transport provider s…
chrisfontana Dec 3, 2020
9cba9d2
- Implemented Datastream specific configurations for the Bigquery tra…
chrisfontana Dec 8, 2020
3964b79
Added support to set/update labels on BigQuery tables in the BigQuery…
chrisfontana Dec 14, 2020
40cd2fc
Implemented exponential backoff when batch size limits are encountere…
chrisfontana Dec 15, 2020
ac14d05
Adjusted default max batch size for the BigQuery transport provider
chrisfontana Dec 16, 2020
d2d7aa8
Small change to make BigquerySchemaEvolver implementation constructor…
chrisfontana Dec 17, 2020
89001c0
Minor change to always mark packages as delivered when building batches
chrisfontana Dec 17, 2020
c778575
Minor change to support default dataset for the Bigquery transport pr…
chrisfontana Jan 4, 2021
f2cd7d8
Refactored and reworked BigQuery transport provider parameters and co…
chrisfontana Jan 16, 2021
3765347
New endpoint to allow checkpoints to be viewed and updated
Jan 19, 2021
aadf56c
New endpoint to allow checkpoints to be viewed and updated
Jan 19, 2021
8f11e51
New endpoint to allow checkpoints to be viewed and updated
Jan 20, 2021
01a2e8c
Fixed partition timezone issue in BigqueryTransportProviderTests
chrisfontana Jan 25, 2021
94bcda0
Changed Bigquery exception record topic to use real exception destina…
chrisfontana Jan 25, 2021
e5ade90
Fixed a typo in comments
chrisfontana Jan 25, 2021
85da4fb
Added comment to clarify use of shared datastream configuration map
chrisfontana Jan 25, 2021
ff65545
Improved handling of legacy Datastreams
chrisfontana Jan 25, 2021
0c2dde2
Minor refactoring and code cleanup
chrisfontana Jan 25, 2021
5d9758b
Added a comment to document the exception handling logic in the BigQu…
chrisfontana Jan 25, 2021
9aa5081
Additional test coverage for handling of complex schemas in the BigQu…
chrisfontana Jan 26, 2021
059a33d
Merge pull request #14 from wayfair-contribs/bigquery_transport_schem…
chrisfontana Feb 8, 2021
a62e49b
Changed the BigQuery Datastream Destination to support destination/to…
chrisfontana Feb 8, 2021
c055a0c
Merge pull request #17 from wayfair-contribs/bigquery_datastream_dest…
chrisfontana Feb 8, 2021
065257e
Added additional logging and metrics to the BigQuery transport provider
chrisfontana Feb 16, 2021
51e228f
Apply Santosh's feedback
Feb 17, 2021
82b7414
Fix checkstyles
Feb 17, 2021
0776ac3
Removed cast in customCheckPointProvider
Feb 18, 2021
e6a050b
Merge pull request #20 from wayfair-contribs/arli_checkpoint_19
r2d2li Feb 18, 2021
2388942
Merge pull request #19 from wayfair-contribs/cfontana_logging_and_met…
chrisfontana Feb 19, 2021
243ebab
Forked the CachedSchemaRegistryClient to gain control over the Avro s…
chrisfontana Feb 10, 2021
117e608
Added the ability to relax Avro schema validation by Datastream in th…
chrisfontana Feb 19, 2021
c36872e
Fixed a bug where a BigQuery Transport Provider is closed prematurely…
chrisfontana Feb 23, 2021
f4bc948
Small tweak to make assigning tasks to the BigQuery transport provide…
chrisfontana Feb 24, 2021
038f97d
Merge pull request #18 from wayfair-contribs/cfontana_invalid_schema_…
chrisfontana Feb 24, 2021
7fcd0b2
Merge pull request #21 from wayfair-contribs/cfontana_bigquery_transp…
chrisfontana Feb 24, 2021
16ea123
Minor fix to tests that broke after merging
chrisfontana Feb 24, 2021
f0f3347
Bumped version to 1.0.2-20-SNAPSHOT to prepare for the next release
chrisfontana Feb 24, 2021
90e2edf
Merge branch 'bigquery_transport_schema_evolution_milestone_1' into w…
chrisfontana Feb 24, 2021
448685d
Relaxed validation of destination connection string when using the Ka…
chrisfontana Mar 3, 2021
ddd722d
Merge pull request #22 from wayfair-contribs/cfontana_kafka_mirrormak…
chrisfontana Mar 3, 2021
81c99c2
Locking version number for release build
chrisfontana Mar 3, 2021
21cdcbb
Bumped version to prepare for new development
chrisfontana Mar 3, 2021
ef4a4c5
CDC implementation
Mar 15, 2021
63d6458
Merge with release branch (1)
Mar 15, 2021
32d8b7c
Resolve conflict with resetting checkpoint
Mar 16, 2021
2314a67
Added metadata option to disable dead-letter table for a datastream u…
chrisfontana Mar 16, 2021
b960dc1
Small tweak to avoid setting a new BigQuery schema when the evolved s…
chrisfontana Mar 16, 2021
efd1df0
Added Noop schema evolver for the BigQuery transport provider
chrisfontana Mar 16, 2021
38ed640
Fix NPE when produce timestamp is not available for JDBC
Mar 17, 2021
785a6f0
Merge branch 'cfontana_gbq_cdc_support' into 1_0_2-20-cdc
Mar 18, 2021
11bef24
Fix all checkstyle issues and findbugs issues
Mar 19, 2021
1ac3f8d
Fix checkstyle and findbugs issue
Mar 22, 2021
ee33c6f
Don't cache schema for schema change
Mar 22, 2021
ee31195
Read credentialname property before password property
Mar 23, 2021
96c831b
Make record key as byte array
Mar 24, 2021
abcc36e
Query capture instance name from db
Mar 24, 2021
6ee2a60
Use source table name to create datastream
Mar 25, 2021
758b585
SQL Datetime converted to BQ Datetime rather than Timestamp
Apr 2, 2021
befbb60
Add ts_ms to root level
Apr 5, 2021
a870f71
Convert date to utc
Apr 6, 2021
49c7a91
Upgrade gbq to latest
Apr 6, 2021
727a360
Upgrade gcs and gbq client; minor changes
Apr 9, 2021
9 changes: 9 additions & 0 deletions .gitignore
@@ -14,3 +14,12 @@ mainGeneratedRest/
mainGeneratedDataTemplate/
.DS_Store
ligradle/

# Kafka Web View data
kafkawebview_data/

# Local server properties
server-local.properties

# Local Brooklin data
.brooklin/
23 changes: 20 additions & 3 deletions build.gradle
@@ -154,18 +154,25 @@ project(':datastream-common') {
compile "com.intellij:annotations:$intellijAnnotationsVersion"
compile "com.google.guava:guava:$guavaVersion"
compile "tech.allegro.schema.json2avro:converter:$json2avroVersion"
compile "com.wayfair.commons:commons-metrics:$wayfairMetricsVersion"

testCompile "org.mockito:mockito-core:$mockitoVersion"
}
}

project(':datastream-bigquery') {
checkstyle {
configFile = file("${project.rootDir}/checkstyle/checkstyle.xml")
sourceSets = [ getProject().sourceSets.main ]
toolVersion = "6.7"
ignoreFailures = false
}
dependencies {

compile "org.apache.avro:avro:$avroVersion"
compile "org.apache.kafka:kafka-clients:$kafkaClientsVersion"

compile "io.confluent:kafka-avro-serializer:$kafkaSchemaRegistryClientVersion"
compile "io.confluent:kafka-schema-registry:$kafkaSchemaRegistryVersion"
compile "io.confluent:kafka-schema-registry-client:$kafkaSchemaRegistryClientVersion"

compile "com.google.cloud:google-cloud-storage:$googleCloudStorageVersion"
@@ -174,6 +181,7 @@ project(':datastream-bigquery') {
compile project(':datastream-server')
compile project(':datastream-utils')

testCompile project(':datastream-testcommon')
testCompile "org.mockito:mockito-core:$mockitoVersion"

tasks.create(name: "copyDependentLibs", type: Copy) {
@@ -199,7 +207,6 @@ project(':datastream-cloud-storage') {
compile "org.apache.parquet:parquet-column:$parquetWriterVersion"

compile "io.confluent:kafka-avro-serializer:$kafkaSchemaRegistryClientVersion"
compile "io.confluent:kafka-schema-registry:$kafkaSchemaRegistryVersion"
compile "io.confluent:kafka-schema-registry-client:$kafkaSchemaRegistryClientVersion"

compile "com.google.cloud:google-cloud-storage:$googleCloudStorageVersion"
@@ -225,6 +232,7 @@ project(':datastream-server-api') {
dependencies {
compile project(':datastream-common')
compile project(':datastream-utils')
testCompile "org.mockito:mockito-core:$mockitoVersion"
}
}

@@ -352,7 +360,7 @@ project(':datastream-tools') {
compile "commons-cli:commons-cli:$commonsCliVersion"
}

tasks.create(name: "releaseTarGz", dependsOn: configurations.archives.artifacts, type: Tar) {
tasks.create(name: "releaseTarGz", group: "distribution", dependsOn: configurations.archives.artifacts, type: Tar) {
description = "Create Brooklin release tarball"
baseName = "${rootProject.name}"
into { "${property('baseName')}-$rootProject.version" }
@@ -387,6 +395,15 @@ project(':datastream-tools') {
from(project(':datastream-kafka-connector').configurations.runtime) { into("libs/") }
duplicatesStrategy 'exclude'
}
tasks.create(name: "copyDependentLibs", type: Copy) {
from (configurations.runtime) {
}
into "$buildDir/dependent-libs"
}

jar {
dependsOn 'copyDependentLibs'
}
}

project(':datastream-client') {
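The copyDependentLibs task added to the datastream-tools project above, together with the jar task's dependsOn wiring, stages the module's runtime dependencies under build/dependent-libs whenever the module jar is built; assuming a standard Gradle wrapper setup, an invocation along the lines of ./gradlew :datastream-tools:jar should populate that directory.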
6 changes: 4 additions & 2 deletions config/log4j.properties
@@ -5,7 +5,9 @@ log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.datastreamServiceAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.datastreamServiceAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.datastreamServiceAppender.File=${datastream.logs.dir}/server.log
log4j.appender.file.MaxFileSize=200MB
log4j.appender.datastreamServiceAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.datastreamServiceAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.datastreamServiceAppender.layout.ConversionPattern=[%d{dd MMM yyyy HH:mm:ss,SSS}] [%p] [%t] [%c:%L] %m %n

log4j.rootLogger=INFO, datastreamServiceAppender , stdout
log4j.rootLogger=INFO, datastreamServiceAppender
log4j.logger.com.linkedin.datastream=DEBUG
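The new datastreamServiceAppender conversion pattern adds the level, thread, and logging class with its line number to each entry; under that pattern a line would render roughly as [06 Apr 2021 10:15:30,123] [INFO] [main] [com.linkedin.datastream.bigquery.Batch:95] Nothing to flush. (the timestamp, class, line number, and message here are only illustrative). The rootLogger change also drops the stdout appender, while com.linkedin.datastream loggers are raised to DEBUG.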
Changes to Batch.java (package com.linkedin.datastream.bigquery, datastream-bigquery module)
@@ -5,30 +5,33 @@
*/
package com.linkedin.datastream.bigquery;

import java.nio.BufferUnderflowException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.Schema;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;

import com.linkedin.datastream.bigquery.schema.BigquerySchemaEvolver;
import com.linkedin.datastream.bigquery.translator.RecordTranslator;
import com.linkedin.datastream.bigquery.translator.SchemaTranslator;
import com.linkedin.datastream.common.DatastreamRecordMetadata;
import com.linkedin.datastream.common.Package;
import com.linkedin.datastream.metrics.DynamicMetricsManager;
import com.linkedin.datastream.serde.Deserializer;
import com.linkedin.datastream.server.api.transport.buffered.AbstractBatch;

/**
* This class populates a batch of BQ rows to be committed.
*/
public class Batch extends AbstractBatch {

private static final Logger _logger = LoggerFactory.getLogger(Batch.class);

private final int _maxBatchSize;
private final int _maxBatchAge;
private final BigqueryBatchCommitter _committer;
@@ -38,38 +41,44 @@ public class Batch extends AbstractBatch {

private org.apache.avro.Schema _avroSchema;
private Schema _schema;
private SchemaRegistry _schemaRegistry;
private String _destination;
private KafkaAvroDeserializer _deserializer;
private final BigqueryDatastreamDestination _destination;
private final Deserializer _valueDeserializer;
private final BigquerySchemaEvolver _schemaEvolver;
private final org.apache.avro.Schema _fixedAvroSchema;

/**
* Constructor for Batch.
* @param maxBatchSize any batch bigger than this threshold will be committed to BQ.
* @param maxBatchAge any batch older than this threshold will be committed to BQ.
* @param maxInflightWriteLogCommits maximum allowed batches in the commit backlog
* @param schemaRegistry schema registry client object
* @param valueDeserializer a Deserializer
* @param committer committer object
*/
public Batch(int maxBatchSize,
int maxBatchAge,
int maxInflightWriteLogCommits,
SchemaRegistry schemaRegistry,
BigqueryBatchCommitter committer) {
public Batch(
final BigqueryDatastreamDestination destination,
final int maxBatchSize,
final int maxBatchAge,
final int maxInflightWriteLogCommits,
final Deserializer valueDeserializer,
final BigqueryBatchCommitter committer,
final BigquerySchemaEvolver schemaEvolver,
final org.apache.avro.Schema fixedAvroSchema
) {
super(maxInflightWriteLogCommits);
this._destination = destination;
this._maxBatchSize = maxBatchSize;
this._maxBatchAge = maxBatchAge;
this._committer = committer;
this._valueDeserializer = valueDeserializer;
this._schemaEvolver = schemaEvolver;
this._fixedAvroSchema = fixedAvroSchema;
this._batch = new ArrayList<>();
this._batchCreateTimeStamp = System.currentTimeMillis();
this._schema = null;
this._destination = null;
this._schemaRegistry = schemaRegistry;
this._deserializer = _schemaRegistry.getDeserializer();
}

private void reset() {
_batch.clear();
_destination = null;
_ackCallbacks.clear();
_recordMetadata.clear();
_sourceTimestamps.clear();
@@ -82,7 +91,7 @@ public void write(Package aPackage) throws InterruptedException {

// skip null records
if (aPackage.getRecord().getValue() == null) {
LOG.info("Null record received from topic {}, partition {}, and offset {}",
_logger.info("Null record received from topic {}, partition {}, and offset {}",
aPackage.getTopic(), aPackage.getPartition(), aPackage.getOffset());
DynamicMetricsManager.getInstance().createOrUpdateMeter(
this.getClass().getSimpleName(),
@@ -94,81 +103,84 @@ public void write(Package aPackage) throws InterruptedException {
return;
}

if (_destination == null) {
String[] datasetRetentionTableSuffix = aPackage.getDestination().split("/");
if (datasetRetentionTableSuffix.length == 3) {
_destination = datasetRetentionTableSuffix[0] +
"/" + aPackage.getTopic() + datasetRetentionTableSuffix[2] +
"/" + datasetRetentionTableSuffix[1];
} else {
_destination = datasetRetentionTableSuffix[0] +
"/" + aPackage.getTopic() +
"/" + datasetRetentionTableSuffix[1];
}
}

if (_schema == null) {
_avroSchema = _schemaRegistry.getSchemaByTopic(aPackage.getTopic());
_schema = SchemaTranslator.translate(_avroSchema);
_committer.setDestTableSchema(_destination, _schema);
}

GenericRecord record;
try {
record = (GenericRecord) _deserializer.deserialize(
record = (GenericRecord) _valueDeserializer.deserialize((byte[]) aPackage.getRecord().getValue());
} catch (Exception e) {
_logger.warn("Error deserializing message at Topic {} - Partition {} - Offset {} - Reason {} - Exception {} for destination {}",
aPackage.getTopic(),
(byte[]) aPackage.getRecord().getValue());
} catch (SerializationException e) {
if (e.getCause() instanceof BufferUnderflowException) {
LOG.warn("Skipping message at Topic {} - Partition {} - Offset {} - Reason {} - Exception {}",
aPackage.getTopic(),
aPackage.getPartition(),
aPackage.getOffset(),
e.getMessage(),
e.getCause().getClass());
DynamicMetricsManager.getInstance().createOrUpdateMeter(
this.getClass().getSimpleName(),
aPackage.getTopic(),
"deserializerErrorCount",
1);
aPackage.getAckCallback().onCompletion(new DatastreamRecordMetadata(
aPackage.getCheckpoint(), aPackage.getTopic(), aPackage.getPartition()), null);
return;
} else {
throw e;
}
aPackage.getPartition(),
aPackage.getOffset(),
e.getMessage(),
e.getCause().getClass(),
_destination,
e);
DynamicMetricsManager.getInstance().createOrUpdateMeter(
this.getClass().getSimpleName(),
aPackage.getTopic(),
"deserializerErrorCount",
1);
throw e;
}

_batch.add(RecordTranslator.translate(record, _avroSchema));
if (_fixedAvroSchema == null) {
processAvroSchema(record.getSchema());
} else {
processAvroSchema(_fixedAvroSchema);
}

_batch.add(RecordTranslator.translate(record));

_ackCallbacks.add(aPackage.getAckCallback());
_recordMetadata.add(new DatastreamRecordMetadata(aPackage.getCheckpoint(),
aPackage.getTopic(),
aPackage.getPartition()));
_sourceTimestamps.add(aPackage.getTimestamp());
} else if (aPackage.isTryFlushSignal() || aPackage.isForceFlushSignal()) {
if (_batch.isEmpty()) {
LOG.debug("Nothing to flush.");
return;
}

if (aPackage.isForceFlushSignal() || _batch.size() >= _maxBatchSize ||
System.currentTimeMillis() - _batchCreateTimeStamp >= _maxBatchAge) {
flush(aPackage.isForceFlushSignal());
}
}

private void processAvroSchema(final org.apache.avro.Schema avroSchema) {
final Optional<Schema> newBQSchema;
if (_avroSchema == null) {
newBQSchema = Optional.of(SchemaTranslator.translate(avroSchema));
} else if (!_avroSchema.equals(avroSchema)) {
final Schema evolvedSchema = _schemaEvolver.evolveSchema(_schema, SchemaTranslator.translate(avroSchema));
if (!_schema.equals(evolvedSchema)) {
newBQSchema = Optional.of(evolvedSchema);
} else {
newBQSchema = Optional.empty();
}
} else {
newBQSchema = Optional.empty();
}
newBQSchema.ifPresent(schema -> {
_avroSchema = avroSchema;
_schema = schema;
_committer.setDestTableSchema(_destination, _schema);
});
}

if (_batch.size() >= _maxBatchSize ||
System.currentTimeMillis() - _batchCreateTimeStamp >= _maxBatchAge ||
aPackage.isForceFlushSignal()) {
private void flush(final boolean force) throws InterruptedException {
if (_batch.isEmpty()) {
_logger.debug("Nothing to flush.");
} else {
waitForRoomInCommitBacklog();
incrementInflightWriteLogCommits();
_committer.commit(
new ArrayList<>(_batch),
_destination,
_destination.toString(),
new ArrayList<>(_ackCallbacks),
new ArrayList<>(_recordMetadata),
new ArrayList<>(_sourceTimestamps),
() -> decrementInflightWriteLogCommitsAndNotify()
this::decrementInflightWriteLogCommitsAndNotify
);
reset();

if (aPackage.isForceFlushSignal()) {
if (force) {
waitForCommitBacklogToClear();
}
}
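For reviewers tracing the reworked Batch constructor above, the sketch below shows one way it could be wired up. Only the constructor signature and the meaning of a null fixedAvroSchema come from this diff; the helper class, collaborator instances, and tuning values are hypothetical placeholders.

package com.linkedin.datastream.bigquery;

import com.linkedin.datastream.bigquery.schema.BigquerySchemaEvolver;
import com.linkedin.datastream.serde.Deserializer;

// Illustrative wiring of the reworked Batch constructor; not part of this pull request.
class BatchWiringSketch {
    // The collaborators are assumed to be constructed elsewhere (e.g. by the transport provider);
    // only the parameter order of new Batch(...) is taken from the diff above.
    static Batch buildBatch(final BigqueryDatastreamDestination destination,
                            final Deserializer valueDeserializer,
                            final BigqueryBatchCommitter committer,
                            final BigquerySchemaEvolver schemaEvolver) {
        final int maxBatchSize = 100_000;         // illustrative: rows buffered before a commit is triggered
        final int maxBatchAge = 30_000;           // illustrative: milliseconds before an aged batch is committed
        final int maxInflightWriteLogCommits = 4; // illustrative: allowed commit backlog before writes wait
        final org.apache.avro.Schema fixedAvroSchema = null; // null: the schema is derived from each record instead
        return new Batch(destination, maxBatchSize, maxBatchAge, maxInflightWriteLogCommits,
                valueDeserializer, committer, schemaEvolver, fixedAvroSchema);
    }
}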