@@ -18,6 +18,10 @@
import io.trino.parquet.DictionaryPage;
import io.trino.parquet.ParquetEncoding;
import io.trino.parquet.PrimitiveField;
import io.trino.parquet.metadata.ColumnChunkMetadata;
import io.trino.parquet.metadata.PrunedBlockMetadata;
import io.trino.parquet.predicate.DictionaryDescriptor;
import io.trino.parquet.predicate.TupleDomainParquetPredicate;
import io.trino.parquet.reader.decoders.ValueDecoder;
import io.trino.parquet.reader.flat.ColumnAdapter;
import io.trino.parquet.reader.flat.DictionaryDecoder;
@@ -28,13 +32,19 @@
import io.trino.spi.type.DateType;
import io.trino.spi.type.Type;
import jakarta.annotation.Nullable;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.io.ParquetDecodingException;

import java.io.IOException;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;

import static com.google.common.base.Preconditions.checkState;
import static io.trino.parquet.ParquetEncoding.PLAIN_DICTIONARY;
import static io.trino.parquet.ParquetEncoding.RLE_DICTIONARY;
import static io.trino.parquet.ParquetReaderUtils.isOnlyDictionaryEncodingPages;
import static io.trino.parquet.reader.decoders.ValueDecoder.ValueDecodersProvider;
import static io.trino.parquet.reader.flat.DictionaryDecoder.DictionaryDecoderProvider;
import static io.trino.parquet.reader.flat.RowRangesIterator.createRowRangesIterator;
@@ -56,6 +66,8 @@ public abstract class AbstractColumnReader<BufferType>
@Nullable
protected DictionaryDecoder<BufferType> dictionaryDecoder;
private boolean produceDictionaryBlock;
@Nullable
private DictionaryPage dictionaryPage;

public AbstractColumnReader(
PrimitiveField field,
@@ -77,6 +89,7 @@ public void setPageReader(PageReader pageReader, Optional<FilteredRowRanges> row
// if it is partly or completely dictionary encoded. At most one dictionary page
// can be placed in a column chunk.
DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
this.dictionaryPage = dictionaryPage;

// For dictionary based encodings - https://github.com/apache/parquet-format/blob/master/Encodings.md
if (dictionaryPage != null) {
@@ -87,6 +100,29 @@ public void setPageReader(PageReader pageReader, Optional<FilteredRowRanges> row
this.rowRanges = createRowRangesIterator(rowRanges);
}

@Override
public boolean dictionaryPredicateMatch(RowGroupInfo rowGroupInfo)
throws IOException
{
checkState(hasPageReader(), "Don't have a pageReader yet, invoke setPageReader() first");
Optional<TupleDomainParquetPredicate> indexPredicate = rowGroupInfo.indexPredicate();
Optional<Set<ColumnDescriptor>> candidateColumnsForDictionaryMatching = rowGroupInfo.candidateColumnsForDictionaryMatching();
if (indexPredicate.isPresent() && candidateColumnsForDictionaryMatching.isPresent()) {
ColumnDescriptor descriptor = field.getDescriptor();
PrunedBlockMetadata prunedBlockMetadata = rowGroupInfo.prunedBlockMetadata();
ColumnChunkMetadata columnMetaData = prunedBlockMetadata.getColumnChunkMetaData(descriptor);
if (candidateColumnsForDictionaryMatching.get().contains(descriptor) && isOnlyDictionaryEncodingPages(columnMetaData)) {
Statistics<?> columnStatistics = columnMetaData.getStatistics();
boolean nullAllowed = columnStatistics == null || columnStatistics.getNumNulls() != 0;
return indexPredicate.get().matches(new DictionaryDescriptor(
descriptor,
nullAllowed,
Optional.ofNullable(dictionaryPage)));
}
}
return true;
}
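    // Illustrative sketch only, not part of this change (assumes java.util.Set and java.util.function.Predicate):
    // pruning on the dictionary is sound only when every data page of the column chunk is dictionary encoded,
    // because only then does the dictionary enumerate every value that can occur in the row group. Nulls never
    // appear in the dictionary, so a null-accepting predicate must keep the row group.
    private static <T> boolean canSkipRowGroup(
            Set<T> dictionaryValues,
            Predicate<T> predicate,
            boolean onlyDictionaryEncodedPages,
            boolean mayContainNulls,
            boolean predicateMatchesNull)
    {
        if (!onlyDictionaryEncodedPages) {
            return false; // a plain-encoded page may hold values that are absent from the dictionary
        }
        if (mayContainNulls && predicateMatchesNull) {
            return false; // nulls are not represented in the dictionary
        }
        return dictionaryValues.stream().noneMatch(predicate);
    }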

protected abstract boolean isNonNull();

protected boolean produceDictionaryBlock()
@@ -13,6 +13,7 @@
*/
package io.trino.parquet.reader;

import java.io.IOException;
import java.util.Optional;

public interface ColumnReader
@@ -21,6 +22,9 @@ public interface ColumnReader

void setPageReader(PageReader pageReader, Optional<FilteredRowRanges> rowRanges);

boolean dictionaryPredicateMatch(RowGroupInfo rowGroupInfo)
throws IOException;

void prepareNextRead(int batchSize);

ColumnChunk readPrimitive();
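// Hypothetical sketch, not from this change: returning true is always a correct implementation of the new
// method, since a match only means "do not prune"; an implementation that cannot inspect dictionary pages
// can therefore opt out of the optimization like this:
//
//     @Override
//     public boolean dictionaryPredicateMatch(RowGroupInfo rowGroupInfo)
//             throws IOException
//     {
//         return true; // never prune; the row group is read as usual
//     }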
@@ -109,6 +109,7 @@ public class ParquetReader
private static final int BATCH_SIZE_GROWTH_FACTOR = 2;
public static final String PARQUET_CODEC_METRIC_PREFIX = "ParquetReaderCompressionFormat_";
public static final String COLUMN_INDEX_ROWS_FILTERED = "ParquetColumnIndexRowsFiltered";
public static final String PARQUET_READER_DICTIONARY_FILTERED_ROWGROUPS = "ParquetReaderDictionaryFilteredRowGroups";

private final Optional<String> fileCreatedBy;
private final List<RowGroupInfo> rowGroups;
@@ -151,6 +152,7 @@ public class ParquetReader
private int currentPageId;

private long columnIndexRowsFiltered = -1;
private long dictionaryFilteredRowGroups;
private final Optional<FileDecryptionContext> decryptionContext;

public ParquetReader(
@@ -467,38 +469,67 @@ private int nextBatch()
private boolean advanceToNextRowGroup()
throws IOException
{
currentRowGroupMemoryContext.close();
currentRowGroupMemoryContext = memoryContext.newAggregatedMemoryContext();
freeCurrentRowGroupBuffers();

if (currentRowGroup >= 0 && rowGroupStatisticsValidation.isPresent()) {
StatisticsValidation statisticsValidation = rowGroupStatisticsValidation.get();
writeValidation.orElseThrow().validateRowGroupStatistics(dataSource.getId(), currentBlockMetadata, statisticsValidation.build());
statisticsValidation.reset();
}

currentRowGroup++;
if (currentRowGroup == rowGroups.size()) {
return false;
}
RowGroupInfo rowGroupInfo = rowGroups.get(currentRowGroup);
currentBlockMetadata = rowGroupInfo.prunedBlockMetadata();
firstRowIndexInGroup = rowGroupInfo.fileRowOffset();
currentGroupRowCount = currentBlockMetadata.getRowCount();
FilteredRowRanges currentGroupRowRanges = blockRowRanges[currentRowGroup];
log.debug("advanceToNextRowGroup dataSource %s, currentRowGroup %d, rowRanges %s, currentBlockMetadata %s", dataSource.getId(), currentRowGroup, currentGroupRowRanges, currentBlockMetadata);
if (currentGroupRowRanges != null) {
long rowCount = currentGroupRowRanges.getRowCount();
columnIndexRowsFiltered += currentGroupRowCount - rowCount;
if (rowCount == 0) {
// Filters on multiple columns with page indexes may yield non-overlapping row ranges and eliminate the entire row group.
// Advance to next row group to ensure that we don't return a null Page and close the page source before all row groups are processed
return advanceToNextRowGroup();
while (currentRowGroup < rowGroups.size()) {
currentRowGroupMemoryContext.close();
currentRowGroupMemoryContext = memoryContext.newAggregatedMemoryContext();
freeCurrentRowGroupBuffers();

if (currentRowGroup >= 0 && rowGroupStatisticsValidation.isPresent()) {
StatisticsValidation statisticsValidation = rowGroupStatisticsValidation.get();
writeValidation.orElseThrow().validateRowGroupStatistics(dataSource.getId(), currentBlockMetadata, statisticsValidation.build());
statisticsValidation.reset();
}

currentRowGroup++;
if (currentRowGroup == rowGroups.size()) {
return false;
}
RowGroupInfo rowGroupInfo = rowGroups.get(currentRowGroup);
currentBlockMetadata = rowGroupInfo.prunedBlockMetadata();
firstRowIndexInGroup = rowGroupInfo.fileRowOffset();
currentGroupRowCount = currentBlockMetadata.getRowCount();
FilteredRowRanges currentGroupRowRanges = blockRowRanges[currentRowGroup];
log.debug("advanceToNextRowGroup dataSource %s, currentRowGroup %d, rowRanges %s, currentBlockMetadata %s", dataSource.getId(), currentRowGroup, currentGroupRowRanges, currentBlockMetadata);
if (currentGroupRowRanges != null) {
long rowCount = currentGroupRowRanges.getRowCount();
columnIndexRowsFiltered += currentGroupRowCount - rowCount;
if (rowCount == 0) {
// Filters on multiple columns with page indexes may yield non-overlapping row ranges and eliminate the entire row group.
// Advance to next row group to ensure that we don't return a null Page and close the page source before all row groups are processed
continue;
}
currentGroupRowCount = rowCount;
}
nextRowInGroup = 0L;
initializeColumnReaders();

// Check whether the dictionary predicate matches; if not, skip this row group
if (!dictionaryPredicateMatch(rowGroupInfo)) {
dictionaryFilteredRowGroups++;
continue;
}
return true;
}
return false;
}

private boolean dictionaryPredicateMatch(RowGroupInfo rowGroupInfo)
{
for (PrimitiveField field : primitiveFields) {
// Check for the presence of indexPredicate first so that the page reader is not initialized eagerly when no predicate is present
if (rowGroupInfo.indexPredicate().isPresent()) {
try {
initializePageReader(field);
boolean match = columnReaders.get(field.getId()).dictionaryPredicateMatch(rowGroupInfo);
if (!match) {
return false;
}
}
catch (Exception e) {
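// Failures are logged and otherwise ignored: the field is simply not used for pruning and the
// row group is read normally, so dictionary matching affects performance, never correctness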
log.error(e, "Error while matching dictionary predicate for field " + field);
}
}
}
return true;
}

@@ -654,29 +685,10 @@ private FilteredOffsetIndex getFilteredOffsetIndex(FilteredRowRanges rowRanges,
private ColumnChunk readPrimitive(PrimitiveField field)
throws IOException
{
ColumnDescriptor columnDescriptor = field.getDescriptor();
int fieldId = field.getId();
ColumnReader columnReader = columnReaders.get(fieldId);
if (!columnReader.hasPageReader()) {
validateParquet(currentBlockMetadata.getRowCount() > 0, dataSource.getId(), "Row group has 0 rows");
ColumnChunkMetadata metadata = currentBlockMetadata.getColumnChunkMetaData(columnDescriptor);
FilteredRowRanges rowRanges = blockRowRanges[currentRowGroup];
OffsetIndex offsetIndex = null;
if (rowRanges != null) {
offsetIndex = getFilteredOffsetIndex(rowRanges, currentRowGroup, currentBlockMetadata.getRowCount(), metadata.getPath());
}
ChunkedInputStream columnChunkInputStream = chunkReaders.get(new ChunkKey(fieldId, currentRowGroup));
columnReader.setPageReader(
createPageReader(
dataSource.getId(),
columnChunkInputStream,
metadata,
columnDescriptor,
offsetIndex,
fileCreatedBy,
decryptionContext,
options.getMaxPageReadSize().toBytes()),
Optional.ofNullable(rowRanges));
initializePageReader(field);
}
ColumnChunk columnChunk = columnReader.readPrimitive();

@@ -692,6 +704,34 @@ private ColumnChunk readPrimitive(PrimitiveField field)
return columnChunk;
}

private void initializePageReader(PrimitiveField field)
throws ParquetCorruptionException
{
ColumnDescriptor columnDescriptor = field.getDescriptor();
int fieldId = field.getId();
ColumnReader columnReader = columnReaders.get(fieldId);
checkState(!columnReader.hasPageReader(), "Page reader already initialized");
validateParquet(currentBlockMetadata.getRowCount() > 0, dataSource.getId(), "Row group has 0 rows");
ColumnChunkMetadata metadata = currentBlockMetadata.getColumnChunkMetaData(columnDescriptor);
FilteredRowRanges rowRanges = blockRowRanges[currentRowGroup];
OffsetIndex offsetIndex = null;
if (rowRanges != null) {
offsetIndex = getFilteredOffsetIndex(rowRanges, currentRowGroup, currentBlockMetadata.getRowCount(), metadata.getPath());
}
ChunkedInputStream columnChunkInputStream = chunkReaders.get(new ChunkKey(fieldId, currentRowGroup));
columnReader.setPageReader(
createPageReader(
dataSource.getId(),
columnChunkInputStream,
metadata,
columnDescriptor,
offsetIndex,
fileCreatedBy,
decryptionContext,
options.getMaxPageReadSize().toBytes()),
Optional.ofNullable(rowRanges));
}

public List<Column> getColumnFields()
{
return columnFields;
@@ -704,6 +744,7 @@ public Metrics getMetrics()
if (columnIndexRowsFiltered >= 0) {
metrics.put(COLUMN_INDEX_ROWS_FILTERED, new LongCount(columnIndexRowsFiltered));
}
metrics.put(PARQUET_READER_DICTIONARY_FILTERED_ROWGROUPS, new LongCount(dictionaryFilteredRowGroups));
metrics.putAll(dataSource.getMetrics().getMetrics());

return new Metrics(metrics.buildOrThrow());
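// Hypothetical usage, not part of this change (assumes the io.trino.spi.metrics API where LongCount
// implements Count and exposes getTotal()): a test could assert on the new counter roughly like this:
//
//     Metrics metrics = parquetReader.getMetrics();
//     LongCount filtered = (LongCount) metrics.getMetrics().get(PARQUET_READER_DICTIONARY_FILTERED_ROWGROUPS);
//     assertThat(filtered.getTotal()).isEqualTo(expectedDictionaryFilteredRowGroups);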
@@ -14,8 +14,13 @@
package io.trino.parquet.reader;

import io.trino.parquet.metadata.PrunedBlockMetadata;
import io.trino.parquet.predicate.TupleDomainParquetPredicate;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;

import java.util.Optional;
import java.util.Set;

public record RowGroupInfo(PrunedBlockMetadata prunedBlockMetadata, long fileRowOffset, Optional<ColumnIndexStore> columnIndexStore) {}
public record RowGroupInfo(PrunedBlockMetadata prunedBlockMetadata, long fileRowOffset, Optional<ColumnIndexStore> columnIndexStore,
Optional<TupleDomainParquetPredicate> indexPredicate, Optional<Set<ColumnDescriptor>> candidateColumnsForDictionaryMatching)
{}
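// Hypothetical construction with the two new components (the variable names below are placeholders);
// call sites that do not need dictionary matching pass Optional.empty() for both, as the updated
// createParquetReader call below shows:
//
//     RowGroupInfo rowGroupInfo = new RowGroupInfo(
//             prunedBlockMetadata,
//             fileRowOffset,
//             Optional.of(columnIndexStore),
//             Optional.of(indexPredicate),
//             Optional.of(candidateColumnsForDictionaryMatching));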
@@ -277,7 +277,7 @@ private ParquetReader createParquetReader(ParquetDataSource input, ParquetMetada
long nextStart = 0;
ImmutableList.Builder<RowGroupInfo> rowGroupInfoBuilder = ImmutableList.builder();
for (BlockMetadata block : parquetMetadata.getBlocks()) {
rowGroupInfoBuilder.add(new RowGroupInfo(createPrunedColumnsMetadata(block, input.getId(), descriptorsByPath), nextStart, Optional.empty()));
rowGroupInfoBuilder.add(new RowGroupInfo(createPrunedColumnsMetadata(block, input.getId(), descriptorsByPath), nextStart, Optional.empty(), Optional.empty(), Optional.empty()));
nextStart += block.rowCount();
}
return new ParquetReader(
@@ -205,6 +205,20 @@ public static List<io.trino.spi.Page> generateInputPages(List<Type> types, int p
return pagesBuilder.build();
}

public static List<io.trino.spi.Page> generateInputPagesWithBlockData(List<Type> types, List<? extends List<?>> blockData, int pageCount)
{
checkArgument(blockData.size() == types.size(), "blockData must contain one list of values per type");
ImmutableList.Builder<io.trino.spi.Page> pagesBuilder = ImmutableList.builder();
for (int pageIndex = 0; pageIndex < pageCount; pageIndex++) {
Block[] blocks = new Block[types.size()];
for (int i = 0; i < types.size(); i++) {
blocks[i] = generateBlock(types.get(i), blockData.get(i));
}
pagesBuilder.add(new Page(blocks));
}
return pagesBuilder.build();
}
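// Hypothetical usage (illustrative values; assumes the usual INTEGER and ImmutableList static imports and
// that generateBlock accepts these value types): fixed per-column data makes it easy to produce
// low-cardinality, dictionary-friendly test pages, e.g.
//
//     List<io.trino.spi.Page> pages = generateInputPagesWithBlockData(
//             ImmutableList.of(INTEGER, INTEGER),
//             ImmutableList.of(
//                     ImmutableList.of(25, 30, 25, 30),
//                     ImmutableList.of(1, 2, 3, 4)),
//             10);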

public static List<Integer> generateGroupSizes(int positionsCount)
{
int maxGroupSize = 17;
@@ -475,20 +475,10 @@ void encryptedDictionaryPruningTwoColumns()
TupleDomainParquetPredicate predicateAge = new TupleDomainParquetPredicate(
domainAge, ImmutableList.of(age), UTC);

List<RowGroupInfo> groupsAge = getFilteredRowGroups(
0,
source.getEstimatedSize(),
source,
metadata,
List.of(domainAge),
List.of(predicateAge),
ImmutableMap.of(ImmutableList.of("age"), age),
UTC,
200,
ParquetReaderOptions.builder().build());
Map<String, List<Integer>> data = readTwoColumnFile(file, new TestingKeyRetriever(Optional.of(KEY_FOOT), Optional.of(KEY_AGE), Optional.of(KEY_ID)), domainAge, predicateAge);

// No row-groups should pass after dictionary pruning
assertThat(groupsAge).isEmpty();
// All rows should be filtered out by dictionary filtering in the reader
assertThat(data).containsValues(List.of(), List.of());

// ——— Predicate on inaccessible column (id = missingId) → should fail (no column key) ———
TupleDomain<ColumnDescriptor> domainId = TupleDomain.withColumnDomains(ImmutableMap.of(id, singleValue(INTEGER, (long) missingId)));
@@ -905,11 +895,28 @@ private static List<Integer> readSingleColumnFile(
}
}

private static Map<String, List<Integer>> readTwoColumnFile(
File file, DecryptionKeyRetriever retriever)
throws IOException
{
ColumnDescriptor ageDescriptor = new ColumnDescriptor(
new String[] {"age"},
Types.required(PrimitiveType.PrimitiveTypeName.INT32).named("age"), 0, 0);

ColumnDescriptor idDescriptor = new ColumnDescriptor(
new String[] {"id"},
Types.required(PrimitiveType.PrimitiveTypeName.INT32).named("id"), 0, 0);

TupleDomainParquetPredicate allPredicate = new TupleDomainParquetPredicate(
TupleDomain.all(), ImmutableList.of(ageDescriptor, idDescriptor), UTC);
return readTwoColumnFile(file, retriever, TupleDomain.all(), allPredicate);
}

/**
* Reads both columns and returns a map “age” → values, “id” → values.
*/
private static Map<String, List<Integer>> readTwoColumnFile(
File file, DecryptionKeyRetriever retriever)
File file, DecryptionKeyRetriever retriever, TupleDomain<ColumnDescriptor> domain, TupleDomainParquetPredicate predicate)
throws IOException
{
ParquetDataSource source = new FileParquetDataSource(file, ParquetReaderOptions.builder().build());
Expand All @@ -931,12 +938,9 @@ private static Map<String, List<Integer>> readTwoColumnFile(
ImmutableList.of("age"), ageDescriptor,
ImmutableList.of("id"), idDescriptor);

TupleDomainParquetPredicate predicate = new TupleDomainParquetPredicate(
TupleDomain.all(), ImmutableList.of(ageDescriptor, idDescriptor), UTC);

List<RowGroupInfo> groups = getFilteredRowGroups(
0, source.getEstimatedSize(), source, metadata,
List.of(TupleDomain.all()), List.of(predicate),
List.of(domain), List.of(predicate),
byPath, UTC, 200, ParquetReaderOptions.builder().build());

PrimitiveField ageField = new PrimitiveField(INTEGER, true, ageDescriptor, 0);