Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -1559,6 +1559,14 @@ The output of the query has the following columns:
* - `sort_order_id`
- `INTEGER`
- ID representing sort order for this file.
* - `content_offset`
  - `BIGINT`
  - The offset in the file at which the referenced content starts. Only
    applicable to deletion vectors stored in shared Puffin files; `NULL` for
    other files.
* - `content_size_in_bytes`
  - `BIGINT`
  - The length, in bytes, of the referenced content stored in the file. Only
    applicable to deletion vectors stored in shared Puffin files; `NULL` for
    other files.
* - `readable_metrics`
- `JSON`
- File metrics in human-readable form.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public final class FilesTable
public static final String SPLIT_OFFSETS_COLUMN_NAME = "split_offsets";
public static final String EQUALITY_IDS_COLUMN_NAME = "equality_ids";
public static final String SORT_ORDER_ID_COLUMN_NAME = "sort_order_id";
public static final String CONTENT_OFFSET_COLUMN_NAME = "content_offset";
public static final String CONTENT_SIZE_IN_BYTES_COLUMN_NAME = "content_size_in_bytes";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update $files section in iceberg.md.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done @ebyhr

public static final String READABLE_METRICS_COLUMN_NAME = "readable_metrics";

private static final List<String> COLUMN_NAMES = ImmutableList.of(
Expand All @@ -92,6 +94,8 @@ public final class FilesTable
SPLIT_OFFSETS_COLUMN_NAME,
EQUALITY_IDS_COLUMN_NAME,
SORT_ORDER_ID_COLUMN_NAME,
CONTENT_OFFSET_COLUMN_NAME,
CONTENT_SIZE_IN_BYTES_COLUMN_NAME,
READABLE_METRICS_COLUMN_NAME);

private final ConnectorTableMetadata tableMetadata;
Expand Down Expand Up @@ -161,7 +165,9 @@ public static Type getColumnType(String columnName, TypeManager typeManager)
case FILE_PATH_COLUMN_NAME,
FILE_FORMAT_COLUMN_NAME -> VARCHAR;
case RECORD_COUNT_COLUMN_NAME,
FILE_SIZE_IN_BYTES_COLUMN_NAME -> BIGINT;
FILE_SIZE_IN_BYTES_COLUMN_NAME,
CONTENT_OFFSET_COLUMN_NAME,
CONTENT_SIZE_IN_BYTES_COLUMN_NAME -> BIGINT;
case COLUMN_SIZES_COLUMN_NAME,
NULL_VALUE_COUNTS_COLUMN_NAME,
VALUE_COUNTS_COLUMN_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.trino.spi.type.RowType;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.MetricsUtil;
import org.apache.iceberg.PartitionField;
Expand Down Expand Up @@ -67,6 +68,8 @@
import static io.trino.plugin.iceberg.StructLikeWrapperWithFieldIdToIndex.createStructLikeWrapper;
import static io.trino.plugin.iceberg.system.FilesTable.COLUMN_SIZES_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.CONTENT_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.CONTENT_OFFSET_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.CONTENT_SIZE_IN_BYTES_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.EQUALITY_IDS_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.FILE_FORMAT_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.FILE_PATH_COLUMN_NAME;
Expand Down Expand Up @@ -258,6 +261,14 @@ public SourcePage getNextSourcePage()
// sort_order_id
writeValueOrNull(pageBuilder, SORT_ORDER_ID_COLUMN_NAME, contentFile::sortOrderId,
(blkBldr, value) -> INTEGER.writeLong(blkBldr, value));
// content_offset
writeValueOrNull(pageBuilder, CONTENT_OFFSET_COLUMN_NAME,
() -> contentFile instanceof DeleteFile deleteFile ? deleteFile.contentOffset() : null,
BIGINT::writeLong);
// content_size_in_bytes
writeValueOrNull(pageBuilder, CONTENT_SIZE_IN_BYTES_COLUMN_NAME,
() -> contentFile instanceof DeleteFile deleteFile ? deleteFile.contentSizeInBytes() : null,
BIGINT::writeLong);
// readable_metrics
writeValueOrNull(pageBuilder, READABLE_METRICS_COLUMN_NAME, () -> metadataSchema.findField(MetricsUtil.READABLE_METRICS),
(blkBldr, value) -> VARCHAR.writeString(blkBldr, readableMetricsToJson(readableMetricsStruct(schema, contentFile, value.type().asStructType()), primitiveFields)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,87 @@ public void testFilesTable()
}
}

@Test
public void testFilesTableDeleteFileDeduplication()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add the test in v3?
I initially thought the request was to include offset and length to describe the delete file, so it would be good to have a test that demonstrates we currently don't support this
cc @findinpath

throws Exception
{
try (TestTable testTable = newTrinoTable("test_files_delete_dedup_",
"WITH (partitioning = ARRAY['regionkey']) AS SELECT * FROM tpch.tiny.nation")) {
String tableName = testTable.getName();
Table icebergTable = loadTable(tableName);

// Verify initial state: only data files, no delete files
assertThat(query("SELECT count(*) FROM \"" + tableName + "$files\" WHERE content = 0"))
.matches("VALUES BIGINT '5'"); // one data file per regionkey partition
assertThat(query("SELECT count(*) FROM \"" + tableName + "$files\" WHERE content != 0"))
.matches("VALUES BIGINT '0'");

// Write a position delete via MOR path
assertUpdate("DELETE FROM " + tableName + " WHERE nationkey = 7", 1);

// Write an equality delete file for regionkey=2
writeEqualityDeleteForTable(icebergTable, fileSystemFactory,
Optional.of(icebergTable.spec()),
Optional.of(new PartitionData(new Long[] {2L})),
ImmutableMap.of("regionkey", 2L),
Optional.empty());

// Verify: each file path should appear exactly once (no duplicates)
assertThat(query("SELECT count(*) FROM \"" + tableName + "$files\" WHERE content = 1"))
.matches("VALUES BIGINT '1'"); // exactly 1 position delete file
assertThat(query("SELECT count(*) FROM \"" + tableName + "$files\" WHERE content = 2"))
.matches("VALUES BIGINT '1'"); // exactly 1 equality delete file

// Verify no duplicate file paths exist
assertThat(query("SELECT count(file_path) - count(DISTINCT file_path) FROM \"" + tableName + "$files\""))
.matches("VALUES BIGINT '0'");
}
}

@Test
public void testFilesTableDeletionVectors()
{
    // Exercises format_version = 3 deletion vectors: several DVs share one
    // Puffin file, and content_offset / content_size_in_bytes identify each
    // DV's slice within that shared file.
    try (TestTable table = newTrinoTable("test_files_dv_",
            "(id INTEGER) WITH (format_version = 3, format = 'PARQUET')")) {
        String name = table.getName();
        String filesTable = "\"" + name + "$files\"";

        // Populate three separate data files of 100 rows each
        for (int batch = 0; batch < 3; batch++) {
            int lo = batch * 100 + 1;
            int hi = batch * 100 + 100;
            assertUpdate("INSERT INTO " + name + " SELECT x FROM UNNEST(sequence(" + lo + ", " + hi + ")) t(x)", 100);
        }

        // Before any deletes: three data files and nothing else
        assertThat(query("SELECT count(*) FROM " + filesTable + " WHERE content = 0"))
                .matches("VALUES BIGINT '3'");
        assertThat(query("SELECT count(*) FROM " + filesTable + " WHERE content != 0"))
                .matches("VALUES BIGINT '0'");

        // Deleting rows across all data files produces deletion vectors,
        // which v3 stores together in a single shared Puffin file
        assertUpdate("DELETE FROM " + name + " WHERE id % 2 = 0", 150);

        // $files exposes one row per DV (one per data file) ...
        assertThat(query("SELECT count(*) FROM " + filesTable + " WHERE content = 1"))
                .matches("VALUES BIGINT '3'"); // one DV entry per data file
        assertThat(query("SELECT count_if(file_format = 'PUFFIN') FROM " + filesTable + " WHERE content = 1"))
                .matches("VALUES BIGINT '3'");
        // ... and every DV row points at the same shared Puffin path
        assertThat(query("SELECT count(DISTINCT file_path) FROM " + filesTable + " WHERE content = 1"))
                .matches("VALUES BIGINT '1'");

        // Individual DVs are distinguished by their offset in the Puffin file
        assertThat(query("SELECT count(DISTINCT content_offset) FROM " + filesTable + " WHERE content = 1"))
                .matches("VALUES BIGINT '3'");
        // DV rows always carry both offset and size
        assertThat(query("SELECT count(*) FROM " + filesTable + " WHERE content = 1 AND content_offset IS NOT NULL AND content_size_in_bytes IS NOT NULL"))
                .matches("VALUES BIGINT '3'");
        // Plain data files leave both columns NULL
        assertThat(query("SELECT count(*) FROM " + filesTable + " WHERE content = 0 AND content_offset IS NULL AND content_size_in_bytes IS NULL"))
                .matches("VALUES BIGINT '3'");
    }
}

@Test
public void testFilesPartitionTable()
{
Expand All @@ -452,6 +533,8 @@ public void testFilesPartitionTable()
"('split_offsets', 'array(bigint)', '', '')," +
"('equality_ids', 'array(integer)', '', '')," +
"('sort_order_id', 'integer', '', '')," +
"('content_offset', 'bigint', '', '')," +
"('content_size_in_bytes', 'bigint', '', '')," +
"('readable_metrics', 'json', '', '')");
assertQuerySucceeds("SELECT * FROM test_schema.\"test_table$files\"");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class DataFileRecord
@SuppressWarnings("unchecked")
public static DataFileRecord toDataFileRecord(MaterializedRow row)
{
assertThat(row.getFieldCount()).isEqualTo(17);
assertThat(row.getFieldCount()).isEqualTo(19);
return new DataFileRecord(
(int) row.getField(0),
(String) row.getField(1),
Expand Down