Support for reading legacy date in Hive for Parquet #24611

Status: Open. Wants to merge 4 commits into base: master.
ParquetReaderOptions.java
@@ -120,127 +120,159 @@ public DataSize getSmallFileThreshold()

public ParquetReaderOptions withIgnoreStatistics(boolean ignoreStatistics)
{
return new ParquetReaderOptions(
ignoreStatistics,
maxReadBlockSize,
maxReadBlockRowCount,
maxMergeDistance,
maxBufferSize,
useColumnIndex,
useBloomFilter,
smallFileThreshold,
vectorizedDecodingEnabled);
return new Builder(this)
.withIgnoreStatistics(ignoreStatistics)
.build();
}

public ParquetReaderOptions withMaxReadBlockSize(DataSize maxReadBlockSize)
{
return new ParquetReaderOptions(
ignoreStatistics,
maxReadBlockSize,
maxReadBlockRowCount,
maxMergeDistance,
maxBufferSize,
useColumnIndex,
useBloomFilter,
smallFileThreshold,
vectorizedDecodingEnabled);
return new Builder(this)
.withMaxReadBlockSize(maxReadBlockSize)
.build();
}

public ParquetReaderOptions withMaxReadBlockRowCount(int maxReadBlockRowCount)
{
return new ParquetReaderOptions(
ignoreStatistics,
maxReadBlockSize,
maxReadBlockRowCount,
maxMergeDistance,
maxBufferSize,
useColumnIndex,
useBloomFilter,
smallFileThreshold,
vectorizedDecodingEnabled);
return new Builder(this)
.withMaxReadBlockRowCount(maxReadBlockRowCount)
.build();
}

public ParquetReaderOptions withMaxMergeDistance(DataSize maxMergeDistance)
{
return new ParquetReaderOptions(
ignoreStatistics,
maxReadBlockSize,
maxReadBlockRowCount,
maxMergeDistance,
maxBufferSize,
useColumnIndex,
useBloomFilter,
smallFileThreshold,
vectorizedDecodingEnabled);
return new Builder(this)
.withMaxMergeDistance(maxMergeDistance)
.build();
}

public ParquetReaderOptions withMaxBufferSize(DataSize maxBufferSize)
{
return new ParquetReaderOptions(
ignoreStatistics,
maxReadBlockSize,
maxReadBlockRowCount,
maxMergeDistance,
maxBufferSize,
useColumnIndex,
useBloomFilter,
smallFileThreshold,
vectorizedDecodingEnabled);
return new Builder(this)
.withMaxBufferSize(maxBufferSize)
.build();
}

public ParquetReaderOptions withUseColumnIndex(boolean useColumnIndex)
{
return new ParquetReaderOptions(
ignoreStatistics,
maxReadBlockSize,
maxReadBlockRowCount,
maxMergeDistance,
maxBufferSize,
useColumnIndex,
useBloomFilter,
smallFileThreshold,
vectorizedDecodingEnabled);
return new Builder(this)
.withUseColumnIndex(useColumnIndex)
.build();
}

public ParquetReaderOptions withBloomFilter(boolean useBloomFilter)
{
return new ParquetReaderOptions(
ignoreStatistics,
maxReadBlockSize,
maxReadBlockRowCount,
maxMergeDistance,
maxBufferSize,
useColumnIndex,
useBloomFilter,
smallFileThreshold,
vectorizedDecodingEnabled);
return new Builder(this)
.withBloomFilter(useBloomFilter)
.build();
}

public ParquetReaderOptions withSmallFileThreshold(DataSize smallFileThreshold)
{
return new ParquetReaderOptions(
ignoreStatistics,
maxReadBlockSize,
maxReadBlockRowCount,
maxMergeDistance,
maxBufferSize,
useColumnIndex,
useBloomFilter,
smallFileThreshold,
vectorizedDecodingEnabled);
return new Builder(this)
.withSmallFileThreshold(smallFileThreshold)
.build();
}

public ParquetReaderOptions withVectorizedDecodingEnabled(boolean vectorizedDecodingEnabled)
{
return new ParquetReaderOptions(
ignoreStatistics,
maxReadBlockSize,
maxReadBlockRowCount,
maxMergeDistance,
maxBufferSize,
useColumnIndex,
useBloomFilter,
smallFileThreshold,
vectorizedDecodingEnabled);
return new Builder(this)
.withVectorizedDecodingEnabled(vectorizedDecodingEnabled)
.build();
}

private static class Builder
{
private boolean ignoreStatistics;
private DataSize maxReadBlockSize;
private int maxReadBlockRowCount;
private DataSize maxMergeDistance;
private DataSize maxBufferSize;
private boolean useColumnIndex;
private boolean useBloomFilter;
private DataSize smallFileThreshold;
private boolean vectorizedDecodingEnabled;

public Builder(ParquetReaderOptions parquetReaderOptions)
{
requireNonNull(parquetReaderOptions, "parquetReaderOptions is null");
this.ignoreStatistics = parquetReaderOptions.ignoreStatistics;
this.maxReadBlockSize = parquetReaderOptions.maxReadBlockSize;
this.maxReadBlockRowCount = parquetReaderOptions.maxReadBlockRowCount;
this.maxMergeDistance = parquetReaderOptions.maxMergeDistance;
this.maxBufferSize = parquetReaderOptions.maxBufferSize;
this.useColumnIndex = parquetReaderOptions.useColumnIndex;
this.useBloomFilter = parquetReaderOptions.useBloomFilter;
this.smallFileThreshold = parquetReaderOptions.smallFileThreshold;
this.vectorizedDecodingEnabled = parquetReaderOptions.vectorizedDecodingEnabled;
}

public Builder withIgnoreStatistics(boolean ignoreStatistics)
{
this.ignoreStatistics = ignoreStatistics;
return this;
}

public Builder withMaxReadBlockSize(DataSize maxReadBlockSize)
{
this.maxReadBlockSize = requireNonNull(maxReadBlockSize, "maxReadBlockSize is null");
return this;
}

public Builder withMaxReadBlockRowCount(int maxReadBlockRowCount)
{
this.maxReadBlockRowCount = maxReadBlockRowCount;
return this;
}

public Builder withMaxMergeDistance(DataSize maxMergeDistance)
{
this.maxMergeDistance = requireNonNull(maxMergeDistance, "maxMergeDistance is null");
return this;
}

public Builder withMaxBufferSize(DataSize maxBufferSize)
{
this.maxBufferSize = requireNonNull(maxBufferSize, "maxBufferSize is null");
return this;
}

public Builder withUseColumnIndex(boolean useColumnIndex)
{
this.useColumnIndex = useColumnIndex;
return this;
}

public Builder withBloomFilter(boolean useBloomFilter)
{
this.useBloomFilter = useBloomFilter;
return this;
}

public Builder withSmallFileThreshold(DataSize smallFileThreshold)
{
this.smallFileThreshold = requireNonNull(smallFileThreshold, "smallFileThreshold is null");
return this;
}

public Builder withVectorizedDecodingEnabled(boolean vectorizedDecodingEnabled)
{
this.vectorizedDecodingEnabled = vectorizedDecodingEnabled;
return this;
}

private ParquetReaderOptions build()
{
return new ParquetReaderOptions(
ignoreStatistics,
maxReadBlockSize,
maxReadBlockRowCount,
maxMergeDistance,
maxBufferSize,
useColumnIndex,
useBloomFilter,
smallFileThreshold,
vectorizedDecodingEnabled);
}
}
}
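The change above replaces the repeated nine-argument constructor calls in every with* method with a private copy-constructor Builder: each with* method copies the current options into a Builder, overrides a single field, and rebuilds. A minimal sketch of that pattern, reduced to two fields (ReaderOptions is an illustrative stand-in, not the actual Trino class):

// Minimal sketch of the copy-and-modify Builder pattern used above (illustrative).
final class ReaderOptions
{
    private final boolean ignoreStatistics;
    private final int maxReadBlockRowCount;

    private ReaderOptions(boolean ignoreStatistics, int maxReadBlockRowCount)
    {
        this.ignoreStatistics = ignoreStatistics;
        this.maxReadBlockRowCount = maxReadBlockRowCount;
    }

    public ReaderOptions withIgnoreStatistics(boolean ignoreStatistics)
    {
        // Copy all current values into a builder, override one field, rebuild.
        return new Builder(this)
                .withIgnoreStatistics(ignoreStatistics)
                .build();
    }

    private static final class Builder
    {
        private boolean ignoreStatistics;
        private int maxReadBlockRowCount;

        Builder(ReaderOptions options)
        {
            this.ignoreStatistics = options.ignoreStatistics;
            this.maxReadBlockRowCount = options.maxReadBlockRowCount;
        }

        Builder withIgnoreStatistics(boolean ignoreStatistics)
        {
            this.ignoreStatistics = ignoreStatistics;
            return this;
        }

        ReaderOptions build()
        {
            return new ReaderOptions(ignoreStatistics, maxReadBlockRowCount);
        }
    }
}

With this shape, adding a new option only touches the constructor, the Builder fields, and build(), instead of every with* method.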
ParquetPageSourceFactory.java
@@ -49,13 +49,16 @@
import io.trino.plugin.hive.TransformConnectorPageSource;
import io.trino.plugin.hive.acid.AcidTransaction;
import io.trino.plugin.hive.coercions.TypeCoercer;
import io.trino.plugin.hive.util.ValueAdjuster;
import io.trino.plugin.hive.util.ValueAdjusters;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SourcePage;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TimestampType;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.io.ColumnIO;
import org.apache.parquet.io.MessageColumnIO;
@@ -78,6 +81,7 @@
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
@@ -107,6 +111,8 @@
import static io.trino.plugin.hive.parquet.ParquetPageSource.handleException;
import static io.trino.plugin.hive.parquet.ParquetTypeTranslator.createCoercer;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DateType.DATE;
import static java.lang.Boolean.parseBoolean;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

@@ -126,6 +132,8 @@ public class ParquetPageSourceFactory
Optional.empty(),
HiveColumnHandle.ColumnType.SYNTHESIZED,
Optional.empty());
// Hive's key in the file footer's metadata that records which calendar (hybrid or proleptic Gregorian) was used to write Date type values
public static final String HIVE_METADATA_KEY_WRITER_DATE_PROLEPTIC = "writer.date.proleptic";

private static final Set<String> PARQUET_SERDE_CLASS_NAMES = ImmutableSet.<String>builder()
.add(PARQUET_HIVE_SERDE_CLASS)
@@ -232,6 +240,8 @@ public static ConnectorPageSource createPageSource(
FileMetadata fileMetaData = parquetMetadata.getFileMetaData();
fileSchema = fileMetaData.getSchema();

boolean convertDateToProleptic = shouldConvertDateToProleptic(fileMetaData.getKeyValueMetaData());

Optional<MessageType> message = getParquetMessageType(columns, useColumnNames, fileSchema);

requestedSchema = message.orElse(new MessageType(fileSchema.getName(), ImmutableList.of()));
@@ -282,9 +292,9 @@ public static ConnectorPageSource createPageSource(
exception -> handleException(dataSourceId, exception),
// We avoid using disjuncts of parquetPredicate for page pruning in ParquetReader as currently column indexes
// are not present in the Parquet files which are read with disjunct predicates.
parquetPredicates.size() == 1 ? Optional.of(parquetPredicates.get(0)) : Optional.empty(),
parquetPredicates.size() == 1 ? Optional.of(parquetPredicates.getFirst()) : Optional.empty(),
parquetWriteValidation);
return createParquetPageSource(columns, fileSchema, messageColumn, useColumnNames, parquetReaderProvider);
return createParquetPageSource(columns, fileSchema, messageColumn, useColumnNames, parquetReaderProvider, convertDateToProleptic);
}
catch (Exception e) {
try {
@@ -471,6 +481,18 @@ public static ConnectorPageSource createParquetPageSource(
boolean useColumnNames,
ParquetReaderProvider parquetReaderProvider)
throws IOException
{
return createParquetPageSource(columnHandles, fileSchema, messageColumn, useColumnNames, parquetReaderProvider, false);
}

public static ConnectorPageSource createParquetPageSource(
List<HiveColumnHandle> columnHandles,
MessageType fileSchema,
MessageColumnIO messageColumn,
boolean useColumnNames,
ParquetReaderProvider parquetReaderProvider,
boolean convertDateToProleptic)
throws IOException
{
List<Column> parquetColumnFieldsBuilder = new ArrayList<>(columnHandles.size());
Map<String, Integer> baseColumnIdToOrdinal = new HashMap<>();
@@ -492,12 +514,16 @@ public static ConnectorPageSource createParquetPageSource(
String baseColumnName = useColumnNames ? baseColumn.getBaseColumnName() : fileSchema.getFields().get(baseColumn.getBaseHiveColumnIndex()).getName();

Optional<TypeCoercer<?, ?>> coercer = Optional.empty();
Optional<ValueAdjuster<?>> valueAdjuster = Optional.empty();
Integer ordinal = baseColumnIdToOrdinal.get(baseColumnName);
if (ordinal == null) {
ColumnIO columnIO = lookupColumnByName(messageColumn, baseColumnName);
if (columnIO != null && columnIO.getType().isPrimitive()) {
PrimitiveType primitiveType = columnIO.getType().asPrimitiveType();
coercer = createCoercer(primitiveType.getPrimitiveTypeName(), primitiveType.getLogicalTypeAnnotation(), baseColumn.getBaseType());
if (convertDateToProleptic && (column.getBaseType().equals(DATE) || column.getBaseType() instanceof TimestampType)) {
valueAdjuster = ValueAdjusters.createValueAdjuster(column.getBaseType());
}
}
io.trino.spi.type.Type readType = coercer.map(TypeCoercer::getFromType).orElseGet(baseColumn::getBaseType);

@@ -509,26 +535,41 @@ public static ConnectorPageSource createParquetPageSource(

ordinal = parquetColumnFieldsBuilder.size();
parquetColumnFieldsBuilder.add(new Column(baseColumnName, field.get()));

baseColumnIdToOrdinal.put(baseColumnName, ordinal);
}

if (column.isBaseColumn()) {
transforms.column(ordinal, coercer.map(Function.identity()));
transforms.column(ordinal, chain(valueAdjuster.map(Function.identity()), coercer.map(Function.identity())));
}
else {
transforms.dereferenceField(
ImmutableList.<Integer>builder()
.add(ordinal)
.addAll(getProjection(column, baseColumn))
.build(),
coercer.map(Function.identity()));
chain(valueAdjuster.map(Function.identity()), coercer.map(Function.identity())));
}
}
ParquetReader parquetReader = parquetReaderProvider.createParquetReader(parquetColumnFieldsBuilder, appendRowNumberColumn);
ConnectorPageSource pageSource = new ParquetPageSource(parquetReader);
return transforms.build(pageSource);
}

private static Optional<Function<Block, Block>> chain(Optional<Function<Block, Block>> valueAdjuster, Optional<Function<Block, Block>> typeCoercer)
{
return Optional.of(
Stream.of(valueAdjuster, typeCoercer)
.map(function -> function.orElse(Function.identity()))
.reduce(Function.identity(), Function::andThen));
}

private static boolean shouldConvertDateToProleptic(Map<String, String> keyValueMetaData)
{
// Convert to proleptic only when the entry exists and explicitly states 'false'; otherwise do not convert
return keyValueMetaData.containsKey(HIVE_METADATA_KEY_WRITER_DATE_PROLEPTIC) && !parseBoolean(keyValueMetaData.get(HIVE_METADATA_KEY_WRITER_DATE_PROLEPTIC));
}

private static Optional<org.apache.parquet.schema.Type> getBaseColumnParquetType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames)
{
if (useParquetColumnNames) {
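The ParquetPageSourceFactory change wires the new behavior into the Hive Parquet reader: it inspects the file footer's key-value metadata for Hive's writer.date.proleptic key and, only when that key is present and explicitly set to false, chains a value adjuster in front of any type coercer for DATE and TIMESTAMP columns. A minimal, self-contained sketch of the footer check and of the Optional-function chaining, assuming both transforms are plain java.util.function.Function values (the class and variable names are illustrative; only the metadata key and the parseBoolean/containsKey logic come from the change above, and the real chain() wraps its result in an Optional rather than returning it directly):

import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Stream;

// Illustrative sketch, not the actual Trino code.
final class LegacyDateSketch
{
    // Hive footer key recording whether dates were written with the proleptic Gregorian calendar.
    static final String WRITER_DATE_PROLEPTIC = "writer.date.proleptic";

    // Convert only when the writer explicitly recorded "false" (legacy hybrid Julian/Gregorian calendar).
    static boolean shouldConvertDateToProleptic(Map<String, String> footerMetadata)
    {
        return footerMetadata.containsKey(WRITER_DATE_PROLEPTIC)
                && !Boolean.parseBoolean(footerMetadata.get(WRITER_DATE_PROLEPTIC));
    }

    // Compose an optional value adjuster with an optional type coercer; absent transforms become identity.
    // The adjuster runs first, then the coercer, mirroring chain() in the change above.
    static <T> Function<T, T> chain(Optional<Function<T, T>> valueAdjuster, Optional<Function<T, T>> typeCoercer)
    {
        return Stream.of(valueAdjuster, typeCoercer)
                .map(function -> function.orElse(Function.identity()))
                .reduce(Function.identity(), Function::andThen);
    }

    public static void main(String[] args)
    {
        // Footer written by a legacy Hive writer that used the hybrid calendar.
        System.out.println(shouldConvertDateToProleptic(Map.of(WRITER_DATE_PROLEPTIC, "false"))); // true
        // Missing key, or "true", means no conversion is needed.
        System.out.println(shouldConvertDateToProleptic(Map.of()));                               // false
        System.out.println(shouldConvertDateToProleptic(Map.of(WRITER_DATE_PROLEPTIC, "true")));  // false

        // Adjuster applies before the coercer: (2 + 1) * 10 = 30.
        Optional<Function<Integer, Integer>> adjuster = Optional.of(x -> x + 1);
        Optional<Function<Integer, Integer>> coercer = Optional.of(x -> x * 10);
        System.out.println(chain(adjuster, coercer).apply(2)); // 30
    }
}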