diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md index 9bb3cd7e88ad..35232792ea79 100644 --- a/docs/src/main/sphinx/connector/iceberg.md +++ b/docs/src/main/sphinx/connector/iceberg.md @@ -1559,6 +1559,14 @@ The output of the query has the following columns: * - `sort_order_id` - `INTEGER` - ID representing sort order for this file. +* - `content_offset` + - `BIGINT` + - The offset in the file where the content starts. Applicable for deletion + vectors stored in shared Puffin files. +* - `content_size_in_bytes` + - `BIGINT` + - The length of referenced content stored in the file. Applicable for deletion + vectors stored in shared Puffin files. * - `readable_metrics` - `JSON` - File metrics in human-readable form. diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/FilesTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/FilesTable.java index 7181dc0ad92a..e307bcb4a2d2 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/FilesTable.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/FilesTable.java @@ -72,6 +72,8 @@ public final class FilesTable public static final String SPLIT_OFFSETS_COLUMN_NAME = "split_offsets"; public static final String EQUALITY_IDS_COLUMN_NAME = "equality_ids"; public static final String SORT_ORDER_ID_COLUMN_NAME = "sort_order_id"; + public static final String CONTENT_OFFSET_COLUMN_NAME = "content_offset"; + public static final String CONTENT_SIZE_IN_BYTES_COLUMN_NAME = "content_size_in_bytes"; public static final String READABLE_METRICS_COLUMN_NAME = "readable_metrics"; private static final List COLUMN_NAMES = ImmutableList.of( @@ -92,6 +94,8 @@ public final class FilesTable SPLIT_OFFSETS_COLUMN_NAME, EQUALITY_IDS_COLUMN_NAME, SORT_ORDER_ID_COLUMN_NAME, + CONTENT_OFFSET_COLUMN_NAME, + CONTENT_SIZE_IN_BYTES_COLUMN_NAME, READABLE_METRICS_COLUMN_NAME); private final ConnectorTableMetadata tableMetadata; @@ -161,7 +165,9 @@ public static Type getColumnType(String columnName, TypeManager typeManager) case FILE_PATH_COLUMN_NAME, FILE_FORMAT_COLUMN_NAME -> VARCHAR; case RECORD_COUNT_COLUMN_NAME, - FILE_SIZE_IN_BYTES_COLUMN_NAME -> BIGINT; + FILE_SIZE_IN_BYTES_COLUMN_NAME, + CONTENT_OFFSET_COLUMN_NAME, + CONTENT_SIZE_IN_BYTES_COLUMN_NAME -> BIGINT; case COLUMN_SIZES_COLUMN_NAME, NULL_VALUE_COUNTS_COLUMN_NAME, VALUE_COUNTS_COLUMN_NAME, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTablePageSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTablePageSource.java index 0c76b1a896f1..f3ab877b3fdd 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTablePageSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTablePageSource.java @@ -32,6 +32,7 @@ import io.trino.spi.type.RowType; import io.trino.spi.type.TypeManager; import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.ManifestReader; import org.apache.iceberg.MetricsUtil; import org.apache.iceberg.PartitionField; @@ -67,6 +68,8 @@ import static io.trino.plugin.iceberg.StructLikeWrapperWithFieldIdToIndex.createStructLikeWrapper; import static io.trino.plugin.iceberg.system.FilesTable.COLUMN_SIZES_COLUMN_NAME; import static io.trino.plugin.iceberg.system.FilesTable.CONTENT_COLUMN_NAME; +import static io.trino.plugin.iceberg.system.FilesTable.CONTENT_OFFSET_COLUMN_NAME; +import static io.trino.plugin.iceberg.system.FilesTable.CONTENT_SIZE_IN_BYTES_COLUMN_NAME; import static io.trino.plugin.iceberg.system.FilesTable.EQUALITY_IDS_COLUMN_NAME; import static io.trino.plugin.iceberg.system.FilesTable.FILE_FORMAT_COLUMN_NAME; import static io.trino.plugin.iceberg.system.FilesTable.FILE_PATH_COLUMN_NAME; @@ -258,6 +261,14 @@ public SourcePage getNextSourcePage() // sort_order_id writeValueOrNull(pageBuilder, SORT_ORDER_ID_COLUMN_NAME, contentFile::sortOrderId, (blkBldr, value) -> INTEGER.writeLong(blkBldr, value)); + // content_offset + writeValueOrNull(pageBuilder, CONTENT_OFFSET_COLUMN_NAME, + () -> contentFile instanceof DeleteFile deleteFile ? deleteFile.contentOffset() : null, + BIGINT::writeLong); + // content_size_in_bytes + writeValueOrNull(pageBuilder, CONTENT_SIZE_IN_BYTES_COLUMN_NAME, + () -> contentFile instanceof DeleteFile deleteFile ? deleteFile.contentSizeInBytes() : null, + BIGINT::writeLong); // readable_metrics writeValueOrNull(pageBuilder, READABLE_METRICS_COLUMN_NAME, () -> metadataSchema.findField(MetricsUtil.READABLE_METRICS), (blkBldr, value) -> VARCHAR.writeString(blkBldr, readableMetricsToJson(readableMetricsStruct(schema, contentFile, value.type().asStructType()), primitiveFields))); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java index cbf1f255a30e..321d6332e2aa 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java @@ -431,6 +431,87 @@ public void testFilesTable() } } + @Test + public void testFilesTableDeleteFileDeduplication() + throws Exception + { + try (TestTable testTable = newTrinoTable("test_files_delete_dedup_", + "WITH (partitioning = ARRAY['regionkey']) AS SELECT * FROM tpch.tiny.nation")) { + String tableName = testTable.getName(); + Table icebergTable = loadTable(tableName); + + // Verify initial state: only data files, no delete files + assertThat(query("SELECT count(*) FROM \"" + tableName + "$files\" WHERE content = 0")) + .matches("VALUES BIGINT '5'"); // one data file per regionkey partition + assertThat(query("SELECT count(*) FROM \"" + tableName + "$files\" WHERE content != 0")) + .matches("VALUES BIGINT '0'"); + + // Write a position delete via MOR path + assertUpdate("DELETE FROM " + tableName + " WHERE nationkey = 7", 1); + + // Write an equality delete file for regionkey=2 + writeEqualityDeleteForTable(icebergTable, fileSystemFactory, + Optional.of(icebergTable.spec()), + Optional.of(new PartitionData(new Long[] {2L})), + ImmutableMap.of("regionkey", 2L), + Optional.empty()); + + // Verify: each file path should appear exactly once (no duplicates) + assertThat(query("SELECT count(*) FROM \"" + tableName + "$files\" WHERE content = 1")) + .matches("VALUES BIGINT '1'"); // exactly 1 position delete file + assertThat(query("SELECT count(*) FROM \"" + tableName + "$files\" WHERE content = 2")) + .matches("VALUES BIGINT '1'"); // exactly 1 equality delete file + + // Verify no duplicate file paths exist + assertThat(query("SELECT count(file_path) - count(DISTINCT file_path) FROM \"" + tableName + "$files\"")) + .matches("VALUES BIGINT '0'"); + } + } + + @Test + public void testFilesTableDeletionVectors() + { + try (TestTable testTable = newTrinoTable("test_files_dv_", + "(id INTEGER) WITH (format_version = 3, format = 'PARQUET')")) { + String tableName = testTable.getName(); + + // Insert data across multiple data files + for (int i = 0; i < 3; i++) { + assertUpdate("INSERT INTO " + tableName + " SELECT x FROM UNNEST(sequence(%s, %s)) t(x)".formatted(i * 100 + 1, (i + 1) * 100), 100); + } + + // Verify initial state: 3 data files, no delete files + assertThat(query("SELECT count(*) FROM \"" + tableName + "$files\" WHERE content = 0")) + .matches("VALUES BIGINT '3'"); + assertThat(query("SELECT count(*) FROM \"" + tableName + "$files\" WHERE content != 0")) + .matches("VALUES BIGINT '0'"); + + // Delete rows to create deletion vectors (stored in shared Puffin files) + assertUpdate("DELETE FROM " + tableName + " WHERE id % 2 = 0", 150); + + // In v3, deletion vectors for multiple data files are stored in a single Puffin file. + // The $files table shows one entry per DV (one per data file), all sharing the same file_path. + // content_offset and content_size_in_bytes distinguish individual DVs within the shared Puffin file. + assertThat(query("SELECT count(*) FROM \"" + tableName + "$files\" WHERE content = 1")) + .matches("VALUES BIGINT '3'"); // one DV entry per data file + assertThat(query("SELECT count_if(file_format = 'PUFFIN') FROM \"" + tableName + "$files\" WHERE content = 1")) + .matches("VALUES BIGINT '3'"); + // All DV entries share the same Puffin file path + assertThat(query("SELECT count(DISTINCT file_path) FROM \"" + tableName + "$files\" WHERE content = 1")) + .matches("VALUES BIGINT '1'"); + + // Each DV entry has distinct content_offset within the shared Puffin file + assertThat(query("SELECT count(DISTINCT content_offset) FROM \"" + tableName + "$files\" WHERE content = 1")) + .matches("VALUES BIGINT '3'"); + // All DV entries have non-null content_offset and content_size_in_bytes + assertThat(query("SELECT count(*) FROM \"" + tableName + "$files\" WHERE content = 1 AND content_offset IS NOT NULL AND content_size_in_bytes IS NOT NULL")) + .matches("VALUES BIGINT '3'"); + // Data files have null content_offset and content_size_in_bytes + assertThat(query("SELECT count(*) FROM \"" + tableName + "$files\" WHERE content = 0 AND content_offset IS NULL AND content_size_in_bytes IS NULL")) + .matches("VALUES BIGINT '3'"); + } + } + @Test public void testFilesPartitionTable() { @@ -452,6 +533,8 @@ public void testFilesPartitionTable() "('split_offsets', 'array(bigint)', '', '')," + "('equality_ids', 'array(integer)', '', '')," + "('sort_order_id', 'integer', '', '')," + + "('content_offset', 'bigint', '', '')," + + "('content_size_in_bytes', 'bigint', '', '')," + "('readable_metrics', 'json', '', '')"); assertQuerySucceeds("SELECT * FROM test_schema.\"test_table$files\""); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/DataFileRecord.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/DataFileRecord.java index a5d1cbc701e0..30a4a7b0d53a 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/DataFileRecord.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/DataFileRecord.java @@ -37,7 +37,7 @@ public class DataFileRecord @SuppressWarnings("unchecked") public static DataFileRecord toDataFileRecord(MaterializedRow row) { - assertThat(row.getFieldCount()).isEqualTo(17); + assertThat(row.getFieldCount()).isEqualTo(19); return new DataFileRecord( (int) row.getField(0), (String) row.getField(1),