diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md index 736c45cc2066..98a453b8cf4f 100644 --- a/docs/src/main/sphinx/connector/iceberg.md +++ b/docs/src/main/sphinx/connector/iceberg.md @@ -1490,9 +1490,9 @@ SELECT * FROM "test_table$files"; ``` ```text - content | file_path | record_count | file_format | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids -----------+-------------------------------------------------------------------------------------------------------------------------------+-----------------+---------------+----------------------+----------------------+-------------------+--------------------+-------------------+-----------------------------+-----------------------------+----------------+----------------+--------------- - 0 | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/data/c1=3/c2=2021-01-14/af9872b2-40f3-428f-9c87-186d2750d84e.parquet | 1 | PARQUET | 442 | {1=40, 2=40, 3=44} | {1=1, 2=1, 3=1} | {1=0, 2=0, 3=0} | | {1=3, 2=2021-01-14, 3=1.3} | {1=3, 2=2021-01-14, 3=1.3} | | | + content | file_path | record_count | file_format | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | file_sequence_number | data_sequence_number | referenced_data_file | pos | manifest_location | first_row_id | content_offset | content_size_in_bytes +----------+-------------------------------------------------------------------------------------------------------------------------------+-----------------+---------------+----------------------+----------------------+-------------------+--------------------+-------------------+-----------------------------+-----------------------------+----------------+----------------+----------------+----------------------+----------------------+----------------------+-----+------------------------------------------------------------------------------------------------------------------------------+--------------+----------------+---------------------- + 0 | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/data/c1=3/c2=2021-01-14/af9872b2-40f3-428f-9c87-186d2750d84e.parquet | 1 | PARQUET | 442 | {1=40, 2=40, 3=44} | {1=1, 2=1, 3=1} | {1=0, 2=0, 3=0} | | {1=3, 2=2021-01-14, 3=1.3} | {1=3, 2=2021-01-14, 3=1.3} | | | | 1 | 1 | | 0 | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/metadata/snap-6116016324956900164-0-3c1b2496-0670-4e37-81f6.avro | | | ``` The output of the query has the following columns: @@ -1570,6 +1570,34 @@ The output of the query has the following columns: * - `readable_metrics` - `JSON` - File metrics in human-readable form. +* - `file_sequence_number` + - `BIGINT` + - The sequence number of the file, tracking when the file was added. +* - `data_sequence_number` + - `BIGINT` + - The data sequence number for the file, used for determining row-level deletes + applicability. +* - `referenced_data_file` + - `VARCHAR` + - The path of the data file that a delete file applies to. Only set for + position delete files and deletion vectors, `NULL` for data files. +* - `pos` + - `BIGINT` + - The ordinal position of the file in the manifest. +* - `manifest_location` + - `VARCHAR` + - The location of the manifest that contains this file. +* - `first_row_id` + - `BIGINT` + - The ID of the first row in the data file. +* - `content_offset` + - `BIGINT` + - The offset in the file where the content starts. Only set for deletion + vectors, `NULL` for data files. +* - `content_size_in_bytes` + - `BIGINT` + - The size of the content in bytes. Only set for deletion vectors, `NULL` for + data files. ::: ##### `$entries` and `$all_entries` tables diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/FilesTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/FilesTable.java index 7181dc0ad92a..150d5a8fb0a6 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/FilesTable.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/FilesTable.java @@ -73,6 +73,14 @@ public final class FilesTable public static final String EQUALITY_IDS_COLUMN_NAME = "equality_ids"; public static final String SORT_ORDER_ID_COLUMN_NAME = "sort_order_id"; public static final String READABLE_METRICS_COLUMN_NAME = "readable_metrics"; + public static final String FILE_SEQUENCE_NUMBER_COLUMN_NAME = "file_sequence_number"; + public static final String DATA_SEQUENCE_NUMBER_COLUMN_NAME = "data_sequence_number"; + public static final String REFERENCED_DATA_FILE_COLUMN_NAME = "referenced_data_file"; + public static final String POS_COLUMN_NAME = "pos"; + public static final String MANIFEST_LOCATION_COLUMN_NAME = "manifest_location"; + public static final String FIRST_ROW_ID_COLUMN_NAME = "first_row_id"; + public static final String CONTENT_OFFSET_COLUMN_NAME = "content_offset"; + public static final String CONTENT_SIZE_IN_BYTES_COLUMN_NAME = "content_size_in_bytes"; private static final List COLUMN_NAMES = ImmutableList.of( CONTENT_COLUMN_NAME, @@ -92,7 +100,15 @@ public final class FilesTable SPLIT_OFFSETS_COLUMN_NAME, EQUALITY_IDS_COLUMN_NAME, SORT_ORDER_ID_COLUMN_NAME, - READABLE_METRICS_COLUMN_NAME); + READABLE_METRICS_COLUMN_NAME, + FILE_SEQUENCE_NUMBER_COLUMN_NAME, + DATA_SEQUENCE_NUMBER_COLUMN_NAME, + REFERENCED_DATA_FILE_COLUMN_NAME, + POS_COLUMN_NAME, + MANIFEST_LOCATION_COLUMN_NAME, + FIRST_ROW_ID_COLUMN_NAME, + CONTENT_OFFSET_COLUMN_NAME, + CONTENT_SIZE_IN_BYTES_COLUMN_NAME); private final ConnectorTableMetadata tableMetadata; private final Table icebergTable; @@ -159,9 +175,17 @@ public static Type getColumnType(String columnName, TypeManager typeManager) SORT_ORDER_ID_COLUMN_NAME, SPEC_ID_COLUMN_NAME -> INTEGER; case FILE_PATH_COLUMN_NAME, - FILE_FORMAT_COLUMN_NAME -> VARCHAR; + FILE_FORMAT_COLUMN_NAME, + REFERENCED_DATA_FILE_COLUMN_NAME, + MANIFEST_LOCATION_COLUMN_NAME -> VARCHAR; case RECORD_COUNT_COLUMN_NAME, - FILE_SIZE_IN_BYTES_COLUMN_NAME -> BIGINT; + FILE_SIZE_IN_BYTES_COLUMN_NAME, + FILE_SEQUENCE_NUMBER_COLUMN_NAME, + DATA_SEQUENCE_NUMBER_COLUMN_NAME, + POS_COLUMN_NAME, + FIRST_ROW_ID_COLUMN_NAME, + CONTENT_OFFSET_COLUMN_NAME, + CONTENT_SIZE_IN_BYTES_COLUMN_NAME -> BIGINT; case COLUMN_SIZES_COLUMN_NAME, NULL_VALUE_COUNTS_COLUMN_NAME, VALUE_COUNTS_COLUMN_NAME, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTablePageSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTablePageSource.java index 0c76b1a896f1..a4f85e1a581b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTablePageSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTablePageSource.java @@ -32,6 +32,7 @@ import io.trino.spi.type.RowType; import io.trino.spi.type.TypeManager; import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.ManifestReader; import org.apache.iceberg.MetricsUtil; import org.apache.iceberg.PartitionField; @@ -188,79 +189,51 @@ public SourcePage getNextSourcePage() long start = System.nanoTime(); ContentFile contentFile = contentIterator.next(); - // content writeValueOrNull(pageBuilder, CONTENT_COLUMN_NAME, () -> contentFile.content().id(), INTEGER::writeInt); - // file_path writeValueOrNull(pageBuilder, FILE_PATH_COLUMN_NAME, contentFile::location, VARCHAR::writeString); - // file_format writeValueOrNull(pageBuilder, FILE_FORMAT_COLUMN_NAME, () -> contentFile.format().toString(), VARCHAR::writeString); - // spec_id writeValueOrNull(pageBuilder, SPEC_ID_COLUMN_NAME, contentFile::specId, INTEGER::writeInt); - // partitions - if (partitionColumnType.isPresent() && columnNameToIndex.containsKey(FilesTable.PARTITION_COLUMN_NAME)) { - PartitionSpec partitionSpec = idToPartitionSpecMapping.get(contentFile.specId()); - StructLikeWrapperWithFieldIdToIndex partitionStruct = createStructLikeWrapper(partitionSpec.partitionType(), contentFile.partition()); - List partitionTypes = partitionTypes(partitionFields, idToTypeMapping); - List> partitionColumnClass = partitionTypes.stream() - .map(type -> type.typeId().javaClass()) - .collect(toImmutableList()); - List partitionColumnTypes = partitionColumnType.orElseThrow().rowType().getFields().stream() - .map(RowType.Field::getType) - .collect(toImmutableList()); - - if (pageBuilder.getBlockBuilder(columnNameToIndex.get(FilesTable.PARTITION_COLUMN_NAME)) instanceof RowBlockBuilder rowBlockBuilder) { - rowBlockBuilder.buildEntry(fields -> { - for (int i = 0; i < partitionColumnTypes.size(); i++) { - io.trino.spi.type.Type trinoType = partitionColumnType.get().rowType().getFields().get(i).getType(); - Object value = null; - Integer fieldId = partitionColumnType.get().fieldIds().get(i); - if (partitionStruct.getFieldIdToIndex().containsKey(fieldId)) { - value = convertIcebergValueToTrino( - partitionTypes.get(i), - partitionStruct.getStructLikeWrapper().get().get(partitionStruct.getFieldIdToIndex().get(fieldId), partitionColumnClass.get(i))); - } - writeNativeValue(trinoType, fields.get(i), value); - } - }); - } - } - // record_count + writePartitionColumns(contentFile); writeValueOrNull(pageBuilder, RECORD_COUNT_COLUMN_NAME, contentFile::recordCount, BIGINT::writeLong); - // file_size_in_bytes writeValueOrNull(pageBuilder, FILE_SIZE_IN_BYTES_COLUMN_NAME, contentFile::fileSizeInBytes, BIGINT::writeLong); - // column_sizes writeValueOrNull(pageBuilder, COLUMN_SIZES_COLUMN_NAME, contentFile::columnSizes, FilesTablePageSource::writeIntegerBigintInMap); - // value_counts writeValueOrNull(pageBuilder, VALUE_COUNTS_COLUMN_NAME, contentFile::valueCounts, FilesTablePageSource::writeIntegerBigintInMap); - // null_value_counts writeValueOrNull(pageBuilder, NULL_VALUE_COUNTS_COLUMN_NAME, contentFile::nullValueCounts, FilesTablePageSource::writeIntegerBigintInMap); - // nan_value_counts writeValueOrNull(pageBuilder, NAN_VALUE_COUNTS_COLUMN_NAME, contentFile::nanValueCounts, FilesTablePageSource::writeIntegerBigintInMap); - // lower_bounds writeValueOrNull(pageBuilder, LOWER_BOUNDS_COLUMN_NAME, contentFile::lowerBounds, this::writeIntegerVarcharInMap); - // upper_bounds writeValueOrNull(pageBuilder, UPPER_BOUNDS_COLUMN_NAME, contentFile::upperBounds, this::writeIntegerVarcharInMap); - // key_metadata writeValueOrNull(pageBuilder, KEY_METADATA_COLUMN_NAME, contentFile::keyMetadata, (blkBldr, value) -> VARBINARY.writeSlice(blkBldr, Slices.wrappedHeapBuffer(value))); - // split_offset writeValueOrNull(pageBuilder, SPLIT_OFFSETS_COLUMN_NAME, contentFile::splitOffsets, FilesTablePageSource::writeLongInArray); - // equality_ids writeValueOrNull(pageBuilder, EQUALITY_IDS_COLUMN_NAME, contentFile::equalityFieldIds, FilesTablePageSource::writeIntegerInArray); - // sort_order_id writeValueOrNull(pageBuilder, SORT_ORDER_ID_COLUMN_NAME, contentFile::sortOrderId, (blkBldr, value) -> INTEGER.writeLong(blkBldr, value)); - // readable_metrics writeValueOrNull(pageBuilder, READABLE_METRICS_COLUMN_NAME, () -> metadataSchema.findField(MetricsUtil.READABLE_METRICS), (blkBldr, value) -> VARCHAR.writeString(blkBldr, readableMetricsToJson(readableMetricsStruct(schema, contentFile, value.type().asStructType()), primitiveFields))); + writeValueOrNull(pageBuilder, FilesTable.FILE_SEQUENCE_NUMBER_COLUMN_NAME, contentFile::fileSequenceNumber, BIGINT::writeLong); + writeValueOrNull(pageBuilder, FilesTable.DATA_SEQUENCE_NUMBER_COLUMN_NAME, contentFile::dataSequenceNumber, BIGINT::writeLong); + if (contentFile instanceof DeleteFile deleteFile) { + writeValueOrNull(pageBuilder, FilesTable.REFERENCED_DATA_FILE_COLUMN_NAME, deleteFile::referencedDataFile, VARCHAR::writeString); + writeValueOrNull(pageBuilder, FilesTable.CONTENT_OFFSET_COLUMN_NAME, deleteFile::contentOffset, BIGINT::writeLong); + writeValueOrNull(pageBuilder, FilesTable.CONTENT_SIZE_IN_BYTES_COLUMN_NAME, deleteFile::contentSizeInBytes, BIGINT::writeLong); + } + else { + // For non-delete files, these columns should be null + writeNull(pageBuilder, FilesTable.REFERENCED_DATA_FILE_COLUMN_NAME); + writeNull(pageBuilder, FilesTable.CONTENT_OFFSET_COLUMN_NAME); + writeNull(pageBuilder, FilesTable.CONTENT_SIZE_IN_BYTES_COLUMN_NAME); + } + writeValueOrNull(pageBuilder, FilesTable.POS_COLUMN_NAME, contentFile::pos, BIGINT::writeLong); + writeValueOrNull(pageBuilder, FilesTable.MANIFEST_LOCATION_COLUMN_NAME, contentFile::manifestLocation, VARCHAR::writeString); + writeValueOrNull(pageBuilder, FilesTable.FIRST_ROW_ID_COLUMN_NAME, contentFile::firstRowId, BIGINT::writeLong); readTimeNanos += System.nanoTime() - start; } @@ -297,6 +270,46 @@ public void close() } } + private void writePartitionColumns(ContentFile contentFile) + { + if (partitionColumnType.isPresent() && columnNameToIndex.containsKey(FilesTable.PARTITION_COLUMN_NAME)) { + PartitionSpec partitionSpec = idToPartitionSpecMapping.get(contentFile.specId()); + StructLikeWrapperWithFieldIdToIndex partitionStruct = createStructLikeWrapper(partitionSpec.partitionType(), contentFile.partition()); + List partitionTypes = partitionTypes(partitionFields, idToTypeMapping); + List> partitionColumnClass = partitionTypes.stream() + .map(type -> type.typeId().javaClass()) + .collect(toImmutableList()); + List partitionColumnTypes = partitionColumnType.orElseThrow().rowType().getFields().stream() + .map(RowType.Field::getType) + .collect(toImmutableList()); + + if (pageBuilder.getBlockBuilder(columnNameToIndex.get(FilesTable.PARTITION_COLUMN_NAME)) instanceof RowBlockBuilder rowBlockBuilder) { + rowBlockBuilder.buildEntry(fields -> { + for (int i = 0; i < partitionColumnTypes.size(); i++) { + io.trino.spi.type.Type trinoType = partitionColumnType.get().rowType().getFields().get(i).getType(); + Object value = null; + Integer fieldId = partitionColumnType.get().fieldIds().get(i); + if (partitionStruct.getFieldIdToIndex().containsKey(fieldId)) { + value = convertIcebergValueToTrino( + partitionTypes.get(i), + partitionStruct.getStructLikeWrapper().get().get(partitionStruct.getFieldIdToIndex().get(fieldId), partitionColumnClass.get(i))); + } + writeNativeValue(trinoType, fields.get(i), value); + } + }); + } + } + } + + private void writeNull(PageBuilder pageBuilder, String columnName) + { + Integer channel = columnNameToIndex.get(columnName); + if (channel == null) { + return; + } + pageBuilder.getBlockBuilder(channel).appendNull(); + } + private void writeValueOrNull(PageBuilder pageBuilder, String columnName, Supplier valueSupplier, BiConsumer valueWriter) { Integer channel = columnNameToIndex.get(columnName); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java index cbf1f255a30e..eec1be40cf15 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java @@ -452,7 +452,15 @@ public void testFilesPartitionTable() "('split_offsets', 'array(bigint)', '', '')," + "('equality_ids', 'array(integer)', '', '')," + "('sort_order_id', 'integer', '', '')," + - "('readable_metrics', 'json', '', '')"); + "('readable_metrics', 'json', '', '')," + + "('file_sequence_number', 'bigint', '', '')," + + "('data_sequence_number', 'bigint', '', '')," + + "('referenced_data_file', 'varchar', '', '')," + + "('pos', 'bigint', '', '')," + + "('manifest_location', 'varchar', '', '')," + + "('first_row_id', 'bigint', '', '')," + + "('content_offset', 'bigint', '', '')," + + "('content_size_in_bytes', 'bigint', '', '')"); assertQuerySucceeds("SELECT * FROM test_schema.\"test_table$files\""); long offset = format == PARQUET ? 4L : 3L; diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/DataFileRecord.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/DataFileRecord.java index a5d1cbc701e0..4f5bd9b9c2e0 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/DataFileRecord.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/DataFileRecord.java @@ -37,7 +37,7 @@ public class DataFileRecord @SuppressWarnings("unchecked") public static DataFileRecord toDataFileRecord(MaterializedRow row) { - assertThat(row.getFieldCount()).isEqualTo(17); + assertThat(row.getFieldCount()).isEqualTo(25); return new DataFileRecord( (int) row.getField(0), (String) row.getField(1), diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java index c240fd7ea3a6..468c8bfd556e 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java @@ -1123,7 +1123,13 @@ public void testFilesTable() "CAST(upper_bounds AS JSON), " + "key_metadata, " + "split_offsets, " + - "equality_ids " + + "equality_ids, " + + "file_sequence_number, " + + "data_sequence_number, " + + "referenced_data_file, " + + "first_row_id, " + + "content_offset, " + + "content_size_in_bytes " + "FROM \"" + tableName + "$files\"", """ VALUES @@ -1138,6 +1144,12 @@ public void testFilesTable() JSON '{"1":"24","2":"VIETNAM","3":"4","4":"y final packaget"}', null, ARRAY[4L], + null, + 1L, + 1L, + null, + null, + null, null), (0, 'ORC', @@ -1150,6 +1162,12 @@ public void testFilesTable() JSON '{"1":"4"}', X'54 72 69 6e 6f', ARRAY[4L], + null, + 2L, + 2L, + null, + null, + null, null), (2, 'PARQUET', @@ -1162,8 +1180,17 @@ public void testFilesTable() JSON '{"3":"1"}', null, ARRAY[4], - ARRAY[3]) + ARRAY[3], + 3L, + 3L, + null, + null, + null, + null) """); + // Verify columns with unpredictable exact values + assertThat(query("SELECT bool_and(manifest_location IS NOT NULL), bool_and(pos IS NOT NULL) FROM \"" + tableName + "$files\"")) + .matches("VALUES (true, true)"); } @Test diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java index 668ef2c7180f..8a591a88d3e6 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java @@ -1358,6 +1358,30 @@ void testIcebergWritesAndTrinoReadsDeletionVector() assertThat(query("SELECT * FROM " + tableName)) .matches("VALUES (1), (2), (4)"); + // Verify new columns for data files: delete-specific columns are NULL + assertThat(query( + "SELECT referenced_data_file, content_offset, content_size_in_bytes, " + + "file_sequence_number IS NOT NULL, data_sequence_number IS NOT NULL, " + + "manifest_location IS NOT NULL, pos IS NOT NULL " + + "FROM \"" + tableName + "$files\" WHERE content = 0")) + .matches("VALUES (CAST(NULL AS VARCHAR), CAST(NULL AS BIGINT), CAST(NULL AS BIGINT), true, true, true, true)"); + + // Verify new columns for deletion vector: referenced_data_file matches the data file, content_offset and content_size_in_bytes are set + assertThat(query( + "SELECT referenced_data_file IS NOT NULL, content_offset IS NOT NULL, " + + "content_size_in_bytes IS NOT NULL AND content_size_in_bytes > 0, " + + "file_sequence_number IS NOT NULL, data_sequence_number IS NOT NULL, " + + "manifest_location IS NOT NULL, pos IS NOT NULL " + + "FROM \"" + tableName + "$files\" WHERE content = 1")) + .matches("VALUES (true, true, true, true, true, true, true)"); + + // Verify referenced_data_file for the DV matches the data file path + assertThat(query( + "SELECT dv.referenced_data_file = df.file_path " + + "FROM \"" + tableName + "$files\" dv " + + "JOIN \"" + tableName + "$files\" df ON dv.content = 1 AND df.content = 0")) + .matches("VALUES (true)"); + assertUpdate("DROP TABLE " + tableName); } @@ -1394,6 +1418,31 @@ void testTrinoWritesAndReadsDeletionVector() assertThat(query("SELECT sum(record_count), count(*), count_if(file_format = 'PUFFIN'), count(distinct file_path) FROM \"" + table.getName() + "$files\" WHERE content = 1")) .matches("VALUES (BIGINT '500', BIGINT '10', BIGINT '10', BIGINT '1')"); + // Verify new columns for DVs: delete-specific columns are set, sequence numbers and manifest_location present + assertThat(query( + "SELECT bool_and(referenced_data_file IS NOT NULL), " + + "bool_and(content_offset IS NOT NULL), " + + "bool_and(content_size_in_bytes IS NOT NULL AND content_size_in_bytes > 0), " + + "bool_and(file_sequence_number IS NOT NULL), " + + "bool_and(manifest_location IS NOT NULL) " + + "FROM \"" + table.getName() + "$files\" WHERE content = 1")) + .matches("VALUES (true, true, true, true, true)"); + + // Verify data files have NULL for delete-specific columns + assertThat(query( + "SELECT bool_and(referenced_data_file IS NULL), " + + "bool_and(content_offset IS NULL), " + + "bool_and(content_size_in_bytes IS NULL) " + + "FROM \"" + table.getName() + "$files\" WHERE content = 0")) + .matches("VALUES (true, true, true)"); + + // Verify referenced_data_file for each DV matches an actual data file path + assertThat(query( + "SELECT count(*) FROM \"" + table.getName() + "$files\" dv " + + "WHERE dv.content = 1 AND dv.referenced_data_file IN " + + "(SELECT file_path FROM \"" + table.getName() + "$files\" WHERE content = 0)")) + .matches("SELECT count(*) FROM \"" + table.getName() + "$files\" WHERE content = 1"); + // delete multiples of 5 => 100 rows removed assertUpdate("DELETE FROM " + table.getName() + " WHERE id % 5 = 0", 100);