34 changes: 31 additions & 3 deletions docs/src/main/sphinx/connector/iceberg.md
@@ -1490,9 +1490,9 @@ SELECT * FROM "test_table$files";
```

```text
content | file_path | record_count | file_format | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids
----------+-------------------------------------------------------------------------------------------------------------------------------+-----------------+---------------+----------------------+----------------------+-------------------+--------------------+-------------------+-----------------------------+-----------------------------+----------------+----------------+---------------
0 | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/data/c1=3/c2=2021-01-14/af9872b2-40f3-428f-9c87-186d2750d84e.parquet | 1 | PARQUET | 442 | {1=40, 2=40, 3=44} | {1=1, 2=1, 3=1} | {1=0, 2=0, 3=0} | <null> | {1=3, 2=2021-01-14, 3=1.3} | {1=3, 2=2021-01-14, 3=1.3} | <null> | <null> | <null>
content | file_path | record_count | file_format | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | file_sequence_number | data_sequence_number | referenced_data_file | pos | manifest_location | first_row_id | content_offset | content_size_in_bytes
----------+-------------------------------------------------------------------------------------------------------------------------------+-----------------+---------------+----------------------+----------------------+-------------------+--------------------+-------------------+-----------------------------+-----------------------------+----------------+----------------+----------------+----------------------+----------------------+----------------------+-----+------------------------------------------------------------------------------------------------------------------------------+--------------+----------------+----------------------
0 | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/data/c1=3/c2=2021-01-14/af9872b2-40f3-428f-9c87-186d2750d84e.parquet | 1 | PARQUET | 442 | {1=40, 2=40, 3=44} | {1=1, 2=1, 3=1} | {1=0, 2=0, 3=0} | <null> | {1=3, 2=2021-01-14, 3=1.3} | {1=3, 2=2021-01-14, 3=1.3} | <null> | <null> | <null> | 1 | 1 | <null> | 0 | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/metadata/snap-6116016324956900164-0-3c1b2496-0670-4e37-81f6.avro | <null> | <null> | <null>
```

The output of the query has the following columns:
@@ -1570,6 +1570,34 @@ The output of the query has the following columns:
* - `readable_metrics`
- `JSON`
- File metrics in human-readable form.
* - `file_sequence_number`
- `BIGINT`
- The sequence number of the file, tracking when the file was added.
* - `data_sequence_number`
- `BIGINT`
- The data sequence number of the file, used to determine which row-level
deletes apply to it.
* - `referenced_data_file`
- `VARCHAR`
- The path of the data file that a delete file applies to. Only set for
position delete files and deletion vectors, `NULL` for data files.
* - `pos`
- `BIGINT`
- The ordinal position of the file in the manifest.
* - `manifest_location`
- `VARCHAR`
- The location of the manifest that contains this file.
* - `first_row_id`
- `BIGINT`
- The ID of the first row in the data file.
* - `content_offset`
- `BIGINT`
- The offset in the file where the content starts. Only set for deletion
vectors, `NULL` for data files.
* - `content_size_in_bytes`
- `BIGINT`
- The size of the content in bytes. Only set for deletion vectors, `NULL` for
data files.
Comment on lines +1593 to +1600
Contributor:

I think the current name is misleading, as the functionality is clearly valuable for data files as well. I understand the intent behind it, but in that case I would recommend incorporating "delete" or "deletion" into the name for clarity; otherwise it is easily confused with `file_size_in_bytes`.

Contributor (Author):

I don't think it's valuable for data files, as in Iceberg this property is set to `null` for data files.

I could rename that and `content_offset` with a "delete" or "deletion" prefix, but this would differ from how Iceberg exposes it.

:::
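
The new columns can be inspected directly. The following is a hypothetical session against the `test_table` example used earlier in this page; actual values depend on the table's snapshot history:

```sql
SELECT
    file_path,
    file_sequence_number,
    data_sequence_number,
    pos,
    manifest_location,
    referenced_data_file,    -- NULL for data files
    content_offset,          -- only set for deletion vectors
    content_size_in_bytes    -- only set for deletion vectors
FROM "test_table$files";
```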

##### `$entries` and `$all_entries` tables
@@ -73,6 +73,14 @@ public final class FilesTable
public static final String EQUALITY_IDS_COLUMN_NAME = "equality_ids";
public static final String SORT_ORDER_ID_COLUMN_NAME = "sort_order_id";
public static final String READABLE_METRICS_COLUMN_NAME = "readable_metrics";
public static final String FILE_SEQUENCE_NUMBER_COLUMN_NAME = "file_sequence_number";
public static final String DATA_SEQUENCE_NUMBER_COLUMN_NAME = "data_sequence_number";
public static final String REFERENCED_DATA_FILE_COLUMN_NAME = "referenced_data_file";
public static final String POS_COLUMN_NAME = "pos";
public static final String MANIFEST_LOCATION_COLUMN_NAME = "manifest_location";
public static final String FIRST_ROW_ID_COLUMN_NAME = "first_row_id";
public static final String CONTENT_OFFSET_COLUMN_NAME = "content_offset";
public static final String CONTENT_SIZE_IN_BYTES_COLUMN_NAME = "content_size_in_bytes";

private static final List<String> COLUMN_NAMES = ImmutableList.of(
CONTENT_COLUMN_NAME,
@@ -92,7 +100,15 @@ public final class FilesTable
SPLIT_OFFSETS_COLUMN_NAME,
EQUALITY_IDS_COLUMN_NAME,
SORT_ORDER_ID_COLUMN_NAME,
READABLE_METRICS_COLUMN_NAME);
READABLE_METRICS_COLUMN_NAME,
FILE_SEQUENCE_NUMBER_COLUMN_NAME,
DATA_SEQUENCE_NUMBER_COLUMN_NAME,
REFERENCED_DATA_FILE_COLUMN_NAME,
POS_COLUMN_NAME,
MANIFEST_LOCATION_COLUMN_NAME,
FIRST_ROW_ID_COLUMN_NAME,
CONTENT_OFFSET_COLUMN_NAME,
CONTENT_SIZE_IN_BYTES_COLUMN_NAME);

private final ConnectorTableMetadata tableMetadata;
private final Table icebergTable;
@@ -159,9 +175,17 @@ public static Type getColumnType(String columnName, TypeManager typeManager)
SORT_ORDER_ID_COLUMN_NAME,
SPEC_ID_COLUMN_NAME -> INTEGER;
case FILE_PATH_COLUMN_NAME,
FILE_FORMAT_COLUMN_NAME -> VARCHAR;
FILE_FORMAT_COLUMN_NAME,
REFERENCED_DATA_FILE_COLUMN_NAME,
MANIFEST_LOCATION_COLUMN_NAME -> VARCHAR;
case RECORD_COUNT_COLUMN_NAME,
FILE_SIZE_IN_BYTES_COLUMN_NAME -> BIGINT;
FILE_SIZE_IN_BYTES_COLUMN_NAME,
FILE_SEQUENCE_NUMBER_COLUMN_NAME,
DATA_SEQUENCE_NUMBER_COLUMN_NAME,
POS_COLUMN_NAME,
FIRST_ROW_ID_COLUMN_NAME,
CONTENT_OFFSET_COLUMN_NAME,
CONTENT_SIZE_IN_BYTES_COLUMN_NAME -> BIGINT;
case COLUMN_SIZES_COLUMN_NAME,
NULL_VALUE_COUNTS_COLUMN_NAME,
VALUE_COUNTS_COLUMN_NAME,
@@ -32,6 +32,7 @@
import io.trino.spi.type.RowType;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.MetricsUtil;
import org.apache.iceberg.PartitionField;
@@ -188,79 +189,51 @@ public SourcePage getNextSourcePage()
long start = System.nanoTime();
ContentFile<?> contentFile = contentIterator.next();

// content
writeValueOrNull(pageBuilder, CONTENT_COLUMN_NAME, () -> contentFile.content().id(), INTEGER::writeInt);
// file_path
writeValueOrNull(pageBuilder, FILE_PATH_COLUMN_NAME, contentFile::location, VARCHAR::writeString);
// file_format
writeValueOrNull(pageBuilder, FILE_FORMAT_COLUMN_NAME, () -> contentFile.format().toString(), VARCHAR::writeString);
// spec_id
writeValueOrNull(pageBuilder, SPEC_ID_COLUMN_NAME, contentFile::specId, INTEGER::writeInt);
// partitions
if (partitionColumnType.isPresent() && columnNameToIndex.containsKey(FilesTable.PARTITION_COLUMN_NAME)) {
PartitionSpec partitionSpec = idToPartitionSpecMapping.get(contentFile.specId());
StructLikeWrapperWithFieldIdToIndex partitionStruct = createStructLikeWrapper(partitionSpec.partitionType(), contentFile.partition());
List<Type> partitionTypes = partitionTypes(partitionFields, idToTypeMapping);
List<? extends Class<?>> partitionColumnClass = partitionTypes.stream()
.map(type -> type.typeId().javaClass())
.collect(toImmutableList());
List<io.trino.spi.type.Type> partitionColumnTypes = partitionColumnType.orElseThrow().rowType().getFields().stream()
.map(RowType.Field::getType)
.collect(toImmutableList());

if (pageBuilder.getBlockBuilder(columnNameToIndex.get(FilesTable.PARTITION_COLUMN_NAME)) instanceof RowBlockBuilder rowBlockBuilder) {
rowBlockBuilder.buildEntry(fields -> {
for (int i = 0; i < partitionColumnTypes.size(); i++) {
io.trino.spi.type.Type trinoType = partitionColumnType.get().rowType().getFields().get(i).getType();
Object value = null;
Integer fieldId = partitionColumnType.get().fieldIds().get(i);
if (partitionStruct.getFieldIdToIndex().containsKey(fieldId)) {
value = convertIcebergValueToTrino(
partitionTypes.get(i),
partitionStruct.getStructLikeWrapper().get().get(partitionStruct.getFieldIdToIndex().get(fieldId), partitionColumnClass.get(i)));
}
writeNativeValue(trinoType, fields.get(i), value);
}
});
}
}
// record_count
writePartitionColumns(contentFile);
writeValueOrNull(pageBuilder, RECORD_COUNT_COLUMN_NAME, contentFile::recordCount, BIGINT::writeLong);
// file_size_in_bytes
writeValueOrNull(pageBuilder, FILE_SIZE_IN_BYTES_COLUMN_NAME, contentFile::fileSizeInBytes, BIGINT::writeLong);
// column_sizes
writeValueOrNull(pageBuilder, COLUMN_SIZES_COLUMN_NAME, contentFile::columnSizes,
FilesTablePageSource::writeIntegerBigintInMap);
// value_counts
writeValueOrNull(pageBuilder, VALUE_COUNTS_COLUMN_NAME, contentFile::valueCounts,
FilesTablePageSource::writeIntegerBigintInMap);
// null_value_counts
writeValueOrNull(pageBuilder, NULL_VALUE_COUNTS_COLUMN_NAME, contentFile::nullValueCounts,
FilesTablePageSource::writeIntegerBigintInMap);
// nan_value_counts
writeValueOrNull(pageBuilder, NAN_VALUE_COUNTS_COLUMN_NAME, contentFile::nanValueCounts,
FilesTablePageSource::writeIntegerBigintInMap);
// lower_bounds
writeValueOrNull(pageBuilder, LOWER_BOUNDS_COLUMN_NAME, contentFile::lowerBounds,
this::writeIntegerVarcharInMap);
// upper_bounds
writeValueOrNull(pageBuilder, UPPER_BOUNDS_COLUMN_NAME, contentFile::upperBounds,
this::writeIntegerVarcharInMap);
// key_metadata
writeValueOrNull(pageBuilder, KEY_METADATA_COLUMN_NAME, contentFile::keyMetadata,
(blkBldr, value) -> VARBINARY.writeSlice(blkBldr, Slices.wrappedHeapBuffer(value)));
// split_offset
writeValueOrNull(pageBuilder, SPLIT_OFFSETS_COLUMN_NAME, contentFile::splitOffsets,
FilesTablePageSource::writeLongInArray);
// equality_ids
writeValueOrNull(pageBuilder, EQUALITY_IDS_COLUMN_NAME, contentFile::equalityFieldIds,
FilesTablePageSource::writeIntegerInArray);
// sort_order_id
writeValueOrNull(pageBuilder, SORT_ORDER_ID_COLUMN_NAME, contentFile::sortOrderId,
(blkBldr, value) -> INTEGER.writeLong(blkBldr, value));
// readable_metrics
Contributor:

is it intentional?

Contributor (Author):

What do you mean? I see the comment attached to `// readable_metrics`, and I was removing all those redundant comments in this file.
writeValueOrNull(pageBuilder, READABLE_METRICS_COLUMN_NAME, () -> metadataSchema.findField(MetricsUtil.READABLE_METRICS),
(blkBldr, value) -> VARCHAR.writeString(blkBldr, readableMetricsToJson(readableMetricsStruct(schema, contentFile, value.type().asStructType()), primitiveFields)));
writeValueOrNull(pageBuilder, FilesTable.FILE_SEQUENCE_NUMBER_COLUMN_NAME, contentFile::fileSequenceNumber, BIGINT::writeLong);
writeValueOrNull(pageBuilder, FilesTable.DATA_SEQUENCE_NUMBER_COLUMN_NAME, contentFile::dataSequenceNumber, BIGINT::writeLong);
if (contentFile instanceof DeleteFile deleteFile) {
writeValueOrNull(pageBuilder, FilesTable.REFERENCED_DATA_FILE_COLUMN_NAME, deleteFile::referencedDataFile, VARCHAR::writeString);
writeValueOrNull(pageBuilder, FilesTable.CONTENT_OFFSET_COLUMN_NAME, deleteFile::contentOffset, BIGINT::writeLong);
writeValueOrNull(pageBuilder, FilesTable.CONTENT_SIZE_IN_BYTES_COLUMN_NAME, deleteFile::contentSizeInBytes, BIGINT::writeLong);
}
else {
// For non-delete files, these columns should be null
writeNull(pageBuilder, FilesTable.REFERENCED_DATA_FILE_COLUMN_NAME);
writeNull(pageBuilder, FilesTable.CONTENT_OFFSET_COLUMN_NAME);
writeNull(pageBuilder, FilesTable.CONTENT_SIZE_IN_BYTES_COLUMN_NAME);
}
writeValueOrNull(pageBuilder, FilesTable.POS_COLUMN_NAME, contentFile::pos, BIGINT::writeLong);
writeValueOrNull(pageBuilder, FilesTable.MANIFEST_LOCATION_COLUMN_NAME, contentFile::manifestLocation, VARCHAR::writeString);
writeValueOrNull(pageBuilder, FilesTable.FIRST_ROW_ID_COLUMN_NAME, contentFile::firstRowId, BIGINT::writeLong);
readTimeNanos += System.nanoTime() - start;
}

@@ -297,6 +270,46 @@ public void close()
}
}

private void writePartitionColumns(ContentFile<?> contentFile)
{
if (partitionColumnType.isPresent() && columnNameToIndex.containsKey(FilesTable.PARTITION_COLUMN_NAME)) {
PartitionSpec partitionSpec = idToPartitionSpecMapping.get(contentFile.specId());
StructLikeWrapperWithFieldIdToIndex partitionStruct = createStructLikeWrapper(partitionSpec.partitionType(), contentFile.partition());
List<Type> partitionTypes = partitionTypes(partitionFields, idToTypeMapping);
List<? extends Class<?>> partitionColumnClass = partitionTypes.stream()
.map(type -> type.typeId().javaClass())
.collect(toImmutableList());
List<io.trino.spi.type.Type> partitionColumnTypes = partitionColumnType.orElseThrow().rowType().getFields().stream()
.map(RowType.Field::getType)
.collect(toImmutableList());

if (pageBuilder.getBlockBuilder(columnNameToIndex.get(FilesTable.PARTITION_COLUMN_NAME)) instanceof RowBlockBuilder rowBlockBuilder) {
rowBlockBuilder.buildEntry(fields -> {
for (int i = 0; i < partitionColumnTypes.size(); i++) {
io.trino.spi.type.Type trinoType = partitionColumnType.get().rowType().getFields().get(i).getType();
Object value = null;
Integer fieldId = partitionColumnType.get().fieldIds().get(i);
if (partitionStruct.getFieldIdToIndex().containsKey(fieldId)) {
value = convertIcebergValueToTrino(
partitionTypes.get(i),
partitionStruct.getStructLikeWrapper().get().get(partitionStruct.getFieldIdToIndex().get(fieldId), partitionColumnClass.get(i)));
}
writeNativeValue(trinoType, fields.get(i), value);
}
});
}
}
}

private void writeNull(PageBuilder pageBuilder, String columnName)
{
Integer channel = columnNameToIndex.get(columnName);
if (channel == null) {
return;
}
pageBuilder.getBlockBuilder(channel).appendNull();
}
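
The null-handling convention above can be sketched in isolation. This is a simplified, hypothetical model (the `ColumnWriterSketch` class and its `List<Object>` channels are stand-ins, not Trino types) of how `writeValueOrNull` skips columns the query did not project and writes `NULL` when the value supplier returns nothing:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Hypothetical stand-in for Trino's PageBuilder: one List<Object> per projected column.
class ColumnWriterSketch
{
    private final Map<String, Integer> columnNameToIndex = new HashMap<>();
    private final List<List<Object>> channels = new ArrayList<>();

    ColumnWriterSketch(List<String> projectedColumns)
    {
        for (String name : projectedColumns) {
            columnNameToIndex.put(name, channels.size());
            channels.add(new ArrayList<>());
        }
    }

    // Mirrors the diff's writeValueOrNull: skip unprojected columns,
    // append null when the supplier yields null, otherwise delegate to the writer.
    <T> void writeValueOrNull(String columnName, Supplier<T> valueSupplier, BiConsumer<List<Object>, T> valueWriter)
    {
        Integer channel = columnNameToIndex.get(columnName);
        if (channel == null) {
            return; // column not projected by the query
        }
        T value = valueSupplier.get();
        if (value == null) {
            channels.get(channel).add(null);
        }
        else {
            valueWriter.accept(channels.get(channel), value);
        }
    }

    List<Object> channel(String columnName)
    {
        return channels.get(columnNameToIndex.get(columnName));
    }
}
```

This is why the `DeleteFile` branch above can call `writeValueOrNull` unconditionally for every row: a `null` from the accessor, or a column missing from the projection, is handled in one place instead of at each call site.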

private <T> void writeValueOrNull(PageBuilder pageBuilder, String columnName, Supplier<T> valueSupplier, BiConsumer<BlockBuilder, T> valueWriter)
{
Integer channel = columnNameToIndex.get(columnName);
@@ -452,7 +452,15 @@ public void testFilesPartitionTable()
"('split_offsets', 'array(bigint)', '', '')," +
"('equality_ids', 'array(integer)', '', '')," +
"('sort_order_id', 'integer', '', '')," +
"('readable_metrics', 'json', '', '')");
"('readable_metrics', 'json', '', '')," +
"('file_sequence_number', 'bigint', '', '')," +
"('data_sequence_number', 'bigint', '', '')," +
"('referenced_data_file', 'varchar', '', '')," +
"('pos', 'bigint', '', '')," +
"('manifest_location', 'varchar', '', '')," +
"('first_row_id', 'bigint', '', '')," +
"('content_offset', 'bigint', '', '')," +
"('content_size_in_bytes', 'bigint', '', '')");
assertQuerySucceeds("SELECT * FROM test_schema.\"test_table$files\"");

long offset = format == PARQUET ? 4L : 3L;
@@ -37,7 +37,7 @@ public class DataFileRecord
@SuppressWarnings("unchecked")
public static DataFileRecord toDataFileRecord(MaterializedRow row)
{
assertThat(row.getFieldCount()).isEqualTo(17);
assertThat(row.getFieldCount()).isEqualTo(25);
return new DataFileRecord(
(int) row.getField(0),
(String) row.getField(1),