CDAP-20393: Fix inconsistency in clustering for target/staging table #228

Open · wants to merge 1 commit into develop
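BigQueryEventConsumer creates BigQuery tables in several places: the DDL paths in handleDDL and the target/staging tables created in createLoadJob. Each of these previously built its own Clustering spec, and they did it differently: only the first CREATE_TABLE path filtered the primary keys through getClusteringSupportedKeys and skipped clustering when no usable key remained, while the other sites clustered on the unfiltered primary keys or sliced the filtered list by the wrong length. As a result the target and staging tables could end up with inconsistent, or invalid, clustering. This commit extracts the logic into a single nullable getClustering(schema, primaryKeys) helper and routes every table-creation site through it.

A condensed sketch of that helper follows; it lives inside BigQueryEventConsumer and relies on the class's existing maxClusteringColumns field and getClusteringSupportedKeys method, and the comments here are illustrative rather than part of the diff:

  @Nullable
  private Clustering getClustering(Schema recordSchema, List<String> primaryKeys) {
    // Keep only the primary-key columns whose types BigQuery can cluster on.
    List<String> supportedKeys = getClusteringSupportedKeys(primaryKeys, recordSchema);
    if (maxClusteringColumns <= 0 || supportedKeys.isEmpty()) {
      // Returning null creates the table without any clustering spec,
      // rather than with an empty or invalid one.
      return null;
    }
    // BigQuery caps the number of clustering columns, so truncate the list.
    return Clustering.newBuilder()
      .setFields(supportedKeys.subList(0, Math.min(maxClusteringColumns, supportedKeys.size())))
      .build();
  }

Every call site then just passes its record schema and primary keys, e.g. .setClustering(getClustering(event.getSchema(), primaryKeys)), so the target table, the staging table, and any table recreated by a DDL event all get the same clustering rules.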
39 changes: 16 additions & 23 deletions src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java
@@ -401,15 +401,9 @@ private void handleDDL(DDLEvent event, String normalizedDatabaseName, String nor
updatePrimaryKeys(tableId, normalizedPrimaryKeys);
// TODO: check schema of table if it exists already
if (table == null) {
List<String> clusteringSupportedKeys = getClusteringSupportedKeys(primaryKeys, event.getSchema());
Clustering clustering = maxClusteringColumns <= 0 || clusteringSupportedKeys.isEmpty() ? null :
Clustering.newBuilder()
.setFields(clusteringSupportedKeys.subList(0, Math.min(maxClusteringColumns,
clusteringSupportedKeys.size())))
.build();
TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
.setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema(), tableId)))
.setClustering(clustering)
.setClustering(getClustering(event.getSchema(), primaryKeys))
.build();

TableInfo.Builder builder = TableInfo.newBuilder(tableId, tableDefinition);
@@ -453,14 +447,9 @@ private void handleDDL(DDLEvent event, String normalizedDatabaseName, String nor
tableId = TableId.of(project, normalizedDatabaseName, normalizedTableName);
table = bigQuery.getTable(tableId);
primaryKeys = event.getPrimaryKey();
List<String> clusteringSupportedKeys = getClusteringSupportedKeys(primaryKeys, event.getSchema());
Clustering clustering = maxClusteringColumns <= 0 ? null :
Clustering.newBuilder()
.setFields(clusteringSupportedKeys.subList(0, Math.min(maxClusteringColumns, primaryKeys.size())))
.build();
TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
.setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema(), tableId)))
.setClustering(clustering)
.setClustering(getClustering(event.getSchema(), primaryKeys))
.build();
TableInfo.Builder builder = TableInfo.newBuilder(tableId, tableDefinition);
if (encryptionConfig != null) {
@@ -493,13 +482,9 @@ private void handleDDL(DDLEvent event, String normalizedDatabaseName, String nor
bigQuery.delete(tableId);
} else {
primaryKeys = event.getPrimaryKey();
clustering = maxClusteringColumns <= 0 ? null :
Clustering.newBuilder()
.setFields(primaryKeys.subList(0, Math.min(maxClusteringColumns, primaryKeys.size())))
.build();
tableDefinition = StandardTableDefinition.newBuilder()
.setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema(), tableId)))
.setClustering(clustering)
.setClustering(getClustering(event.getSchema(), primaryKeys))
.build();
}

@@ -513,6 +498,17 @@ private void handleDDL(DDLEvent event, String normalizedDatabaseName, String nor
}
}

@Nullable
private Clustering getClustering(Schema recordSchema, List<String> primaryKeys) {
List<String> clusteringSupportedKeys = getClusteringSupportedKeys(primaryKeys, recordSchema);
Clustering clustering = maxClusteringColumns <= 0 || clusteringSupportedKeys.isEmpty() ? null :
Clustering.newBuilder()
.setFields(clusteringSupportedKeys.subList(0, Math.min(maxClusteringColumns,
clusteringSupportedKeys.size())))
.build();
return clustering;
}

@VisibleForTesting
static List<String> getClusteringSupportedKeys(List<String> primaryKeys, Schema recordSchema) {
List<String> result = new ArrayList<>();
@@ -827,16 +823,13 @@ private Job createLoadJob(TableId tableId, TableBlob blob, int attemptNumber, Jo
throws IOException, DeltaFailureException {
Table table = bigQuery.getTable(tableId);
if (table == null) {
Schema schema = jobType.isForTargetTable() ? blob.getTargetSchema() : blob.getStagingSchema();
List<String> primaryKeys = getPrimaryKeys(TableId.of(project, blob.getDataset(), blob.getTable()));
Clustering clustering = maxClusteringColumns <= 0 ? null : Clustering.newBuilder()
.setFields(primaryKeys.subList(0, Math.min(maxClusteringColumns, primaryKeys.size())))
.build();

Schema schema = jobType.isForTargetTable() ? blob.getTargetSchema() : blob.getStagingSchema();
TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
.setLocation(bucket.getLocation())
.setSchema(Schemas.convert(schema))
.setClustering(clustering)
.setClustering(getClustering(schema, primaryKeys))
.build();
TableInfo.Builder builder = TableInfo.newBuilder(tableId, tableDefinition);
if (encryptionConfig != null) {
@@ -256,9 +256,10 @@ public void testCreateTableWithInvalidTypesForClustering() throws Exception {

// Primary keys with all un-supported types for clustering
List<String> primaryKeys = new ArrayList<>();
primaryKeys.add("id1");
String primaryKey1 = "id1";
primaryKeys.add(primaryKey1);
Schema schema = Schema.recordOf(allinvalidsTableName,
Schema.Field.of("id1", Schema.of(Schema.Type.BYTES)));
Schema.Field.of(primaryKey1, Schema.of(Schema.Type.BYTES)));

DDLEvent allInvalidsCreateTable = DDLEvent.builder()
.setOperation(DDLOperation.Type.CREATE_TABLE)
@@ -273,19 +274,56 @@ public void testCreateTableWithInvalidTypesForClustering() throws Exception {
Table table = bigQuery.getTable(allInvalidsTable);
StandardTableDefinition tableDefinition = table.getDefinition();
Clustering clustering = tableDefinition.getClustering();

// No clustering should be added
Assert.assertNull(clustering);

// test INSERT Operation
StructuredRecord record = StructuredRecord.builder(schema)
.set(primaryKey1, new byte[] {0, 1, 1, 0})
.build();

DMLEvent insertEvent = DMLEvent.builder()
.setOperationType(DMLOperation.Type.INSERT)
.setDatabaseName(dataset)
.setTableName(allinvalidsTableName)
.setRow(record)
.build();

eventConsumer.applyDML(new Sequenced<>(insertEvent, 1L));
eventConsumer.flush();

TableResult result = executeQuery(String.format("SELECT * from %s.%s", dataset, allinvalidsTableName));
Assert.assertEquals(1, result.getTotalRows());

DDLEvent allInvalidsTruncateTable = DDLEvent.builder()
.setOperation(DDLOperation.Type.TRUNCATE_TABLE)
.setDatabaseName(dataset)
.setTableName(allinvalidsTableName)
.setSchema(schema)
.setPrimaryKey(primaryKeys)
.setOffset(new Offset())
.build();
eventConsumer.applyDDL(new Sequenced<>(allInvalidsTruncateTable, 0));

table = bigQuery.getTable(allInvalidsTable);
tableDefinition = table.getDefinition();
clustering = tableDefinition.getClustering();

// No clustering should be added
Assert.assertNull(clustering);

bigQuery.delete(allInvalidsTable);

// Primary keys with some un-supported types for clustering
primaryKeys = new ArrayList<>();
primaryKeys.add("id1");
primaryKeys.add(primaryKey1);
primaryKeys.add("id2");
primaryKeys.add("id3");
primaryKeys.add("id4");
primaryKeys.add("id5");
schema = Schema.recordOf(allinvalidsTableName,
Schema.Field.of("id1", Schema.of(Schema.Type.BYTES)),
Schema.Field.of(primaryKey1, Schema.of(Schema.Type.BYTES)),
Schema.Field.of("id2", Schema.of(Schema.Type.BYTES)),
Schema.Field.of("id3", Schema.of(Schema.Type.BYTES)),
Schema.Field.of("id4", Schema.of(Schema.Type.BYTES)),
@@ -307,6 +345,29 @@ public void testCreateTableWithInvalidTypesForClustering() throws Exception {
clustering = tableDefinition.getClustering();
Assert.assertNotNull(clustering);
Assert.assertEquals(primaryKeys.subList(4, 5), clustering.getFields());

// test INSERT Operation
record = StructuredRecord.builder(schema)
.set(primaryKey1, new byte[] {0, 1, 1, 0})
.set("id2", new byte[] {0, 1, 1, 0})
.set("id3", new byte[] {0, 1, 1, 0})
.set("id4", new byte[] {0, 1, 1, 0})
.set("id5", 100)
.build();

insertEvent = DMLEvent.builder()
.setOperationType(DMLOperation.Type.INSERT)
.setDatabaseName(dataset)
.setTableName(someInvalidsTableName)
.setRow(record)
.build();

eventConsumer.applyDML(new Sequenced<>(insertEvent, 1L));
eventConsumer.flush();

result = executeQuery(String.format("SELECT * from %s.%s", dataset, someInvalidsTableName));
Assert.assertEquals(1, result.getTotalRows());

bigQuery.delete(someInvalidsTable);
} finally {
cleanupTest(bucket, dataset, eventConsumer);