49
49
import io .trino .plugin .hive .TransformConnectorPageSource ;
50
50
import io .trino .plugin .hive .acid .AcidTransaction ;
51
51
import io .trino .plugin .hive .coercions .TypeCoercer ;
52
+ import io .trino .plugin .hive .util .ValueAdjuster ;
53
+ import io .trino .plugin .hive .util .ValueAdjusters ;
52
54
import io .trino .spi .TrinoException ;
53
55
import io .trino .spi .block .Block ;
54
56
import io .trino .spi .connector .ConnectorPageSource ;
55
57
import io .trino .spi .connector .ConnectorSession ;
56
58
import io .trino .spi .connector .SourcePage ;
57
59
import io .trino .spi .predicate .Domain ;
58
60
import io .trino .spi .predicate .TupleDomain ;
61
+ import io .trino .spi .type .TimestampType ;
59
62
import org .apache .parquet .column .ColumnDescriptor ;
60
63
import org .apache .parquet .io .ColumnIO ;
61
64
import org .apache .parquet .io .MessageColumnIO ;
78
81
import java .util .OptionalLong ;
79
82
import java .util .Set ;
80
83
import java .util .function .Function ;
84
+ import java .util .stream .Stream ;
81
85
82
86
import static com .google .common .base .Preconditions .checkArgument ;
83
87
import static com .google .common .base .Preconditions .checkState ;
107
111
import static io .trino .plugin .hive .parquet .ParquetPageSource .handleException ;
108
112
import static io .trino .plugin .hive .parquet .ParquetTypeTranslator .createCoercer ;
109
113
import static io .trino .spi .type .BigintType .BIGINT ;
114
+ import static io .trino .spi .type .DateType .DATE ;
115
+ import static java .lang .Boolean .parseBoolean ;
110
116
import static java .lang .String .format ;
111
117
import static java .util .Objects .requireNonNull ;
112
118
@@ -126,6 +132,8 @@ public class ParquetPageSourceFactory
126
132
Optional .empty (),
127
133
HiveColumnHandle .ColumnType .SYNTHESIZED ,
128
134
Optional .empty ());
135
+ // Hive's key used in the file footer's metadata to document which calendar (hybrid or proleptic Gregorian) was used to write Date type values
136
+ public static final String HIVE_METADATA_KEY_WRITER_DATE_PROLEPTIC = "writer.date.proleptic" ;
129
137
130
138
private static final Set <String > PARQUET_SERDE_CLASS_NAMES = ImmutableSet .<String >builder ()
131
139
.add (PARQUET_HIVE_SERDE_CLASS )
@@ -232,6 +240,8 @@ public static ConnectorPageSource createPageSource(
232
240
FileMetadata fileMetaData = parquetMetadata .getFileMetaData ();
233
241
fileSchema = fileMetaData .getSchema ();
234
242
243
+ boolean convertDateToProleptic = shouldConvertDateToProleptic (fileMetaData .getKeyValueMetaData ());
244
+
235
245
Optional <MessageType > message = getParquetMessageType (columns , useColumnNames , fileSchema );
236
246
237
247
requestedSchema = message .orElse (new MessageType (fileSchema .getName (), ImmutableList .of ()));
@@ -284,7 +294,7 @@ public static ConnectorPageSource createPageSource(
284
294
// are not present in the Parquet files which are read with disjunct predicates.
285
295
parquetPredicates .size () == 1 ? Optional .of (parquetPredicates .getFirst ()) : Optional .empty (),
286
296
parquetWriteValidation );
287
- return createParquetPageSource (columns , fileSchema , messageColumn , useColumnNames , parquetReaderProvider );
297
+ return createParquetPageSource (columns , fileSchema , messageColumn , useColumnNames , parquetReaderProvider , convertDateToProleptic );
288
298
}
289
299
catch (Exception e ) {
290
300
try {
@@ -471,6 +481,18 @@ public static ConnectorPageSource createParquetPageSource(
471
481
boolean useColumnNames ,
472
482
ParquetReaderProvider parquetReaderProvider )
473
483
throws IOException
484
+ {
485
+ return createParquetPageSource (columnHandles , fileSchema , messageColumn , useColumnNames , parquetReaderProvider , false );
486
+ }
487
+
488
+ public static ConnectorPageSource createParquetPageSource (
489
+ List <HiveColumnHandle > columnHandles ,
490
+ MessageType fileSchema ,
491
+ MessageColumnIO messageColumn ,
492
+ boolean useColumnNames ,
493
+ ParquetReaderProvider parquetReaderProvider ,
494
+ boolean convertDateToProleptic )
495
+ throws IOException
474
496
{
475
497
List <Column > parquetColumnFieldsBuilder = new ArrayList <>(columnHandles .size ());
476
498
Map <String , Integer > baseColumnIdToOrdinal = new HashMap <>();
@@ -492,12 +514,16 @@ public static ConnectorPageSource createParquetPageSource(
492
514
String baseColumnName = useColumnNames ? baseColumn .getBaseColumnName () : fileSchema .getFields ().get (baseColumn .getBaseHiveColumnIndex ()).getName ();
493
515
494
516
Optional <TypeCoercer <?, ?>> coercer = Optional .empty ();
517
+ Optional <ValueAdjuster <?>> valueAdjuster = Optional .empty ();
495
518
Integer ordinal = baseColumnIdToOrdinal .get (baseColumnName );
496
519
if (ordinal == null ) {
497
520
ColumnIO columnIO = lookupColumnByName (messageColumn , baseColumnName );
498
521
if (columnIO != null && columnIO .getType ().isPrimitive ()) {
499
522
PrimitiveType primitiveType = columnIO .getType ().asPrimitiveType ();
500
523
coercer = createCoercer (primitiveType .getPrimitiveTypeName (), primitiveType .getLogicalTypeAnnotation (), baseColumn .getBaseType ());
524
+ if (convertDateToProleptic && (column .getBaseType ().equals (DATE ) || column .getBaseType () instanceof TimestampType )) {
525
+ valueAdjuster = ValueAdjusters .createValueAdjuster (column .getBaseType ());
526
+ }
501
527
}
502
528
io .trino .spi .type .Type readType = coercer .map (TypeCoercer ::getFromType ).orElseGet (baseColumn ::getBaseType );
503
529
@@ -509,26 +535,41 @@ public static ConnectorPageSource createParquetPageSource(
509
535
510
536
ordinal = parquetColumnFieldsBuilder .size ();
511
537
parquetColumnFieldsBuilder .add (new Column (baseColumnName , field .get ()));
538
+
512
539
baseColumnIdToOrdinal .put (baseColumnName , ordinal );
513
540
}
514
541
515
542
if (column .isBaseColumn ()) {
516
- transforms .column (ordinal , coercer .map (Function .identity ()));
543
+ transforms .column (ordinal , chain ( valueAdjuster . map ( Function . identity ()), coercer .map (Function .identity () )));
517
544
}
518
545
else {
519
546
transforms .dereferenceField (
520
547
ImmutableList .<Integer >builder ()
521
548
.add (ordinal )
522
549
.addAll (getProjection (column , baseColumn ))
523
550
.build (),
524
- coercer .map (Function .identity ()));
551
+ chain ( valueAdjuster . map ( Function . identity ()), coercer .map (Function .identity () )));
525
552
}
526
553
}
527
554
ParquetReader parquetReader = parquetReaderProvider .createParquetReader (parquetColumnFieldsBuilder , appendRowNumberColumn );
528
555
ConnectorPageSource pageSource = new ParquetPageSource (parquetReader );
529
556
return transforms .build (pageSource );
530
557
}
531
558
559
+ private static Optional <Function <Block , Block >> chain (Optional <Function <Block , Block >> valueAdjuster , Optional <Function <Block , Block >> typeCoercer )
560
+ {
561
+ return Optional .of (
562
+ Stream .of (valueAdjuster , typeCoercer )
563
+ .map (function -> function .orElse (Function .identity ()))
564
+ .reduce (Function .identity (), Function ::andThen ));
565
+ }
566
+
567
+ private static boolean shouldConvertDateToProleptic (Map <String , String > keyValueMetaData )
568
+ {
569
+ // if entry exists and explicitly states 'false' then we should convert to Proleptic, in other case no
570
+ return keyValueMetaData .containsKey (HIVE_METADATA_KEY_WRITER_DATE_PROLEPTIC ) && !parseBoolean (keyValueMetaData .get (HIVE_METADATA_KEY_WRITER_DATE_PROLEPTIC ));
571
+ }
572
+
532
573
private static Optional <org .apache .parquet .schema .Type > getBaseColumnParquetType (HiveColumnHandle column , MessageType messageType , boolean useParquetColumnNames )
533
574
{
534
575
if (useParquetColumnNames ) {
0 commit comments