diff --git a/src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java b/src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java
index 511d288..49df41a 100644
--- a/src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java
+++ b/src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java
@@ -401,15 +401,9 @@ private void handleDDL(DDLEvent event, String normalizedDatabaseName, String nor
         updatePrimaryKeys(tableId, normalizedPrimaryKeys);
         // TODO: check schema of table if it exists already
         if (table == null) {
-          List<String> clusteringSupportedKeys = getClusteringSupportedKeys(primaryKeys, event.getSchema());
-          Clustering clustering = maxClusteringColumns <= 0 || clusteringSupportedKeys.isEmpty() ? null :
-            Clustering.newBuilder()
-              .setFields(clusteringSupportedKeys.subList(0, Math.min(maxClusteringColumns,
-                clusteringSupportedKeys.size())))
-              .build();
           TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
             .setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema(), tableId)))
-            .setClustering(clustering)
+            .setClustering(getClustering(event.getSchema(), primaryKeys))
             .build();
           TableInfo.Builder builder = TableInfo.newBuilder(tableId, tableDefinition);
@@ -453,14 +447,9 @@ private void handleDDL(DDLEvent event, String normalizedDatabaseName, String nor
         tableId = TableId.of(project, normalizedDatabaseName, normalizedTableName);
         table = bigQuery.getTable(tableId);
         primaryKeys = event.getPrimaryKey();
-        List<String> clusteringSupportedKeys = getClusteringSupportedKeys(primaryKeys, event.getSchema());
-        Clustering clustering = maxClusteringColumns <= 0 ? null :
-          Clustering.newBuilder()
-            .setFields(clusteringSupportedKeys.subList(0, Math.min(maxClusteringColumns, primaryKeys.size())))
-            .build();
         TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
           .setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema(), tableId)))
-          .setClustering(clustering)
+          .setClustering(getClustering(event.getSchema(), primaryKeys))
           .build();
         TableInfo.Builder builder = TableInfo.newBuilder(tableId, tableDefinition);
         if (encryptionConfig != null) {
@@ -493,13 +482,9 @@ private void handleDDL(DDLEvent event, String normalizedDatabaseName, String nor
           bigQuery.delete(tableId);
         } else {
           primaryKeys = event.getPrimaryKey();
-          clustering = maxClusteringColumns <= 0 ? null :
-            Clustering.newBuilder()
-              .setFields(primaryKeys.subList(0, Math.min(maxClusteringColumns, primaryKeys.size())))
-              .build();
           tableDefinition = StandardTableDefinition.newBuilder()
             .setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema(), tableId)))
-            .setClustering(clustering)
+            .setClustering(getClustering(event.getSchema(), primaryKeys))
             .build();
         }
@@ -513,6 +498,17 @@ private void handleDDL(DDLEvent event, String normalizedDatabaseName, String nor
     }
   }
 
+  @Nullable
+  private Clustering getClustering(Schema recordSchema, List<String> primaryKeys) {
+    List<String> clusteringSupportedKeys = getClusteringSupportedKeys(primaryKeys, recordSchema);
+    Clustering clustering = maxClusteringColumns <= 0 || clusteringSupportedKeys.isEmpty() ? null :
+      Clustering.newBuilder()
+        .setFields(clusteringSupportedKeys.subList(0, Math.min(maxClusteringColumns,
+          clusteringSupportedKeys.size())))
+        .build();
+    return clustering;
+  }
+
   @VisibleForTesting
   static List<String> getClusteringSupportedKeys(List<String> primaryKeys, Schema recordSchema) {
     List<String> result = new ArrayList<>();
@@ -827,16 +823,13 @@ private Job createLoadJob(TableId tableId, TableBlob blob, int attemptNumber, Jo
     throws IOException, DeltaFailureException {
     Table table = bigQuery.getTable(tableId);
     if (table == null) {
+      Schema schema = jobType.isForTargetTable() ? blob.getTargetSchema() : blob.getStagingSchema();
       List<String> primaryKeys = getPrimaryKeys(TableId.of(project, blob.getDataset(), blob.getTable()));
-      Clustering clustering = maxClusteringColumns <= 0 ? null : Clustering.newBuilder()
-        .setFields(primaryKeys.subList(0, Math.min(maxClusteringColumns, primaryKeys.size())))
-        .build();
-      Schema schema = jobType.isForTargetTable() ? blob.getTargetSchema() : blob.getStagingSchema();
       TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
         .setLocation(bucket.getLocation())
         .setSchema(Schemas.convert(schema))
-        .setClustering(clustering)
+        .setClustering(getClustering(schema, primaryKeys))
         .build();
       TableInfo.Builder builder = TableInfo.newBuilder(tableId, tableDefinition);
       if (encryptionConfig != null) {
diff --git a/src/test/java/io/cdap/delta/bigquery/BigQueryEventConsumerTest.java b/src/test/java/io/cdap/delta/bigquery/BigQueryEventConsumerTest.java
index 181400a..242e0c8 100644
--- a/src/test/java/io/cdap/delta/bigquery/BigQueryEventConsumerTest.java
+++ b/src/test/java/io/cdap/delta/bigquery/BigQueryEventConsumerTest.java
@@ -256,9 +256,10 @@ public void testCreateTableWithInvalidTypesForClustering() throws Exception {
 
       // Primary keys with all un-supported types for clustering
       List<String> primaryKeys = new ArrayList<>();
-      primaryKeys.add("id1");
+      String primaryKey1 = "id1";
+      primaryKeys.add(primaryKey1);
       Schema schema = Schema.recordOf(allinvalidsTableName,
-                                      Schema.Field.of("id1", Schema.of(Schema.Type.BYTES)));
+                                      Schema.Field.of(primaryKey1, Schema.of(Schema.Type.BYTES)));
 
       DDLEvent allInvalidsCreateTable = DDLEvent.builder()
         .setOperation(DDLOperation.Type.CREATE_TABLE)
@@ -273,19 +274,56 @@ public void testCreateTableWithInvalidTypesForClustering() throws Exception {
       Table table = bigQuery.getTable(allInvalidsTable);
       StandardTableDefinition tableDefinition = table.getDefinition();
       Clustering clustering = tableDefinition.getClustering();
+      // No clustering should be added
       Assert.assertNull(clustering);
+
+      // test INSERT operation
+      StructuredRecord record = StructuredRecord.builder(schema)
+        .set(primaryKey1, new byte[] {0, 1, 1, 0})
+        .build();
+
+      DMLEvent insertEvent = DMLEvent.builder()
+        .setOperationType(DMLOperation.Type.INSERT)
+        .setDatabaseName(dataset)
+        .setTableName(allinvalidsTableName)
+        .setRow(record)
+        .build();
+
+      eventConsumer.applyDML(new Sequenced<>(insertEvent, 1L));
+      eventConsumer.flush();
+
+      TableResult result = executeQuery(String.format("SELECT * from %s.%s", dataset, allinvalidsTableName));
+      Assert.assertEquals(1, result.getTotalRows());
+
+      DDLEvent allInvalidsTruncateTable = DDLEvent.builder()
+        .setOperation(DDLOperation.Type.TRUNCATE_TABLE)
+        .setDatabaseName(dataset)
+        .setTableName(allinvalidsTableName)
+        .setSchema(schema)
+        .setPrimaryKey(primaryKeys)
+        .setOffset(new Offset())
+        .build();
+      eventConsumer.applyDDL(new Sequenced<>(allInvalidsTruncateTable, 0));
+
+      table = bigQuery.getTable(allInvalidsTable);
+      tableDefinition = table.getDefinition();
+      clustering = tableDefinition.getClustering();
+
+      // No clustering should be added
+      Assert.assertNull(clustering);
+
       bigQuery.delete(allInvalidsTable);
 
       // Primary keys with some un-supported types for clustering
       primaryKeys = new ArrayList<>();
-      primaryKeys.add("id1");
+      primaryKeys.add(primaryKey1);
       primaryKeys.add("id2");
       primaryKeys.add("id3");
       primaryKeys.add("id4");
       primaryKeys.add("id5");
       schema = Schema.recordOf(allinvalidsTableName,
-                               Schema.Field.of("id1", Schema.of(Schema.Type.BYTES)),
+                               Schema.Field.of(primaryKey1, Schema.of(Schema.Type.BYTES)),
                                Schema.Field.of("id2", Schema.of(Schema.Type.BYTES)),
                                Schema.Field.of("id3", Schema.of(Schema.Type.BYTES)),
                                Schema.Field.of("id4", Schema.of(Schema.Type.BYTES)),
@@ -307,6 +345,29 @@ public void testCreateTableWithInvalidTypesForClustering() throws Exception {
       clustering = tableDefinition.getClustering();
       Assert.assertNotNull(clustering);
       Assert.assertEquals(primaryKeys.subList(4, 5), clustering.getFields());
+
+      // test INSERT operation
+      record = StructuredRecord.builder(schema)
+        .set(primaryKey1, new byte[] {0, 1, 1, 0})
+        .set("id2", new byte[] {0, 1, 1, 0})
+        .set("id3", new byte[] {0, 1, 1, 0})
+        .set("id4", new byte[] {0, 1, 1, 0})
+        .set("id5", 100)
+        .build();
+
+      insertEvent = DMLEvent.builder()
+        .setOperationType(DMLOperation.Type.INSERT)
+        .setDatabaseName(dataset)
+        .setTableName(someInvalidsTableName)
+        .setRow(record)
+        .build();
+
+      eventConsumer.applyDML(new Sequenced<>(insertEvent, 1L));
+      eventConsumer.flush();
+
+      result = executeQuery(String.format("SELECT * from %s.%s", dataset, someInvalidsTableName));
+      Assert.assertEquals(1, result.getTotalRows());
+      bigQuery.delete(someInvalidsTable);
     } finally {
       cleanupTest(bucket, dataset, eventConsumer);
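
Reviewer note (illustration only, not part of the patch): the hunks above fold four near-identical clustering blocks into a single getClustering(...) helper. Besides removing duplication, the consolidation makes the supported-key filtering and the isEmpty() guard uniform: two of the removed copies clustered on the unfiltered primary keys, and one capped the subList with primaryKeys.size() instead of clusteringSupportedKeys.size(). The standalone sketch below models that selection logic; the BYTES check and the MAX_CLUSTERING_COLUMNS constant are stand-ins (assumptions) for the real getClusteringSupportedKeys(...) and the consumer's configured maxClusteringColumns field, so treat it as a simplified model rather than the plugin's actual API.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class ClusteringSketch {
  // Stand-in for BigQueryEventConsumer.maxClusteringColumns (BigQuery allows at most 4 clustering columns).
  private static final int MAX_CLUSTERING_COLUMNS = 4;

  /**
   * Returns the clustering fields to set on the table, or null when clustering must be
   * skipped -- mirroring the nullable Clustering returned by the new getClustering(...) helper.
   */
  static List<String> chooseClusteringFields(List<String> primaryKeys, Map<String, String> fieldTypes) {
    List<String> supported = new ArrayList<>();
    for (String key : primaryKeys) {
      // Assumption for illustration: BYTES cannot be a clustering column; the real
      // filter is getClusteringSupportedKeys(primaryKeys, recordSchema).
      if (!"BYTES".equals(fieldTypes.get(key))) {
        supported.add(key);
      }
    }
    if (MAX_CLUSTERING_COLUMNS <= 0 || supported.isEmpty()) {
      return null;
    }
    // Cap using the *filtered* list's size -- the consistency fix the helper bakes in.
    return supported.subList(0, Math.min(MAX_CLUSTERING_COLUMNS, supported.size()));
  }

  public static void main(String[] args) {
    Map<String, String> types = Map.of("id1", "BYTES", "id2", "BYTES", "id5", "INT64");
    // All keys unsupported -> null, like the first test case above.
    System.out.println(chooseClusteringFields(Arrays.asList("id1", "id2"), types));        // null
    // Mixed keys -> only the supported column remains, like the second test case.
    System.out.println(chooseClusteringFields(Arrays.asList("id1", "id2", "id5"), types)); // [id5]
  }
}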