Skip to content

Commit bedb3c1

Browse files
committed
[parquet] Apply Parquet 1.15 changes
1 parent 93f03c2 commit bedb3c1

File tree

3 files changed

+136
-56
lines changed

3 files changed

+136
-56
lines changed

paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java

Lines changed: 97 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.paimon.fs.VectoredReadable;
2424
import org.apache.paimon.utils.RoaringBitmap32;
2525

26-
import org.apache.hadoop.classification.InterfaceAudience.Private;
2726
import org.apache.hadoop.fs.Path;
2827
import org.apache.parquet.ParquetReadOptions;
2928
import org.apache.parquet.Preconditions;
@@ -79,11 +78,13 @@
7978
import org.apache.parquet.io.SeekableInputStream;
8079
import org.apache.parquet.schema.MessageType;
8180
import org.apache.parquet.schema.PrimitiveType;
81+
import org.apache.parquet.util.AutoCloseables;
8282
import org.slf4j.Logger;
8383
import org.slf4j.LoggerFactory;
8484

8585
import javax.annotation.Nullable;
8686

87+
import java.io.ByteArrayInputStream;
8788
import java.io.Closeable;
8889
import java.io.IOException;
8990
import java.io.InputStream;
@@ -246,7 +247,7 @@ private static final ParquetMetadata readFooter(
246247
private ColumnChunkPageReadStore currentRowGroup = null;
247248
private DictionaryPageReader nextDictionaryReader = null;
248249

249-
private InternalFileDecryptor fileDecryptor;
250+
private InternalFileDecryptor fileDecryptor = null;
250251

251252
public ParquetFileReader(
252253
InputFile file, ParquetReadOptions options, @Nullable RoaringBitmap32 selection)
@@ -285,6 +286,7 @@ public ParquetFileReader(
285286
for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
286287
paths.put(ColumnPath.get(col.getPath()), col);
287288
}
289+
288290
if (options.usePageChecksumVerification()) {
289291
this.crc = new CRC32();
290292
this.crcAllocator = ReusingByteBufferAllocator.strict(options.getAllocator());
@@ -298,18 +300,6 @@ private static <T> List<T> listWithNulls(int size) {
298300
return new ArrayList<>(Collections.nCopies(size, null));
299301
}
300302

301-
private boolean checkRowIndexOffsetExists(List<BlockMetaData> blocks) {
302-
for (BlockMetaData block : blocks) {
303-
if (block.getRowIndexOffset() == -1) {
304-
LOG.warn(
305-
"Row index offset was not found in block metadata of file {}, skip applying filter in order to get the correct row position",
306-
file.getPath());
307-
return false;
308-
}
309-
}
310-
return true;
311-
}
312-
313303
public ParquetMetadata getFooter() {
314304
if (footer == null) {
315305
try {
@@ -364,35 +354,44 @@ public String getFile() {
364354

365355
private List<BlockMetaData> filterRowGroups(List<BlockMetaData> blocks) throws IOException {
366356
FilterCompat.Filter recordFilter = options.getRecordFilter();
367-
if (checkRowIndexOffsetExists(blocks)) {
368-
if (FilterCompat.isFilteringRequired(recordFilter)) {
369-
// set up data filters based on configured levels
370-
List<RowGroupFilter.FilterLevel> levels = new ArrayList<>();
371357

372-
if (options.useStatsFilter()) {
373-
levels.add(STATISTICS);
374-
}
358+
for (BlockMetaData block : blocks) {
359+
if (block.getRowIndexOffset() == -1) {
360+
LOG.warn(
361+
"Row index offset was not found in block metadata of file {}, "
362+
+ "skip applying filter in order to get the correct row position",
363+
file.getPath());
364+
return blocks;
365+
}
366+
}
375367

376-
if (options.useDictionaryFilter()) {
377-
levels.add(DICTIONARY);
378-
}
368+
if (FilterCompat.isFilteringRequired(recordFilter)) {
369+
// set up data filters based on configured levels
370+
List<RowGroupFilter.FilterLevel> levels = new ArrayList<>();
379371

380-
if (options.useBloomFilter()) {
381-
levels.add(BLOOMFILTER);
382-
}
383-
blocks = RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this);
372+
if (options.useStatsFilter()) {
373+
levels.add(STATISTICS);
384374
}
385375

386-
if (selection != null) {
387-
blocks =
388-
blocks.stream()
389-
.filter(
390-
it ->
391-
selection.intersects(
392-
it.getRowIndexOffset(),
393-
it.getRowIndexOffset() + it.getRowCount()))
394-
.collect(Collectors.toList());
376+
if (options.useDictionaryFilter()) {
377+
levels.add(DICTIONARY);
395378
}
379+
380+
if (options.useBloomFilter()) {
381+
levels.add(BLOOMFILTER);
382+
}
383+
blocks = RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this);
384+
}
385+
386+
if (selection != null) {
387+
blocks =
388+
blocks.stream()
389+
.filter(
390+
it ->
391+
selection.intersects(
392+
it.getRowIndexOffset(),
393+
it.getRowIndexOffset() + it.getRowCount()))
394+
.collect(Collectors.toList());
396395
}
397396

398397
return blocks;
@@ -421,8 +420,8 @@ public void appendTo(ParquetFileWriter writer) throws IOException {
421420
* Reads all the columns requested from the row group at the specified block.
422421
*
423422
* @param blockIndex the index of the requested block
424-
* @throws IOException if an error occurs while reading
425423
* @return the PageReadStore which can provide PageReaders for each column.
424+
* @throws IOException if an error occurs while reading
426425
*/
427426
public PageReadStore readRowGroup(int blockIndex) throws IOException {
428427
return internalReadRowGroup(blockIndex);
@@ -431,8 +430,8 @@ public PageReadStore readRowGroup(int blockIndex) throws IOException {
431430
/**
432431
* Reads all the columns requested from the row group at the current file position.
433432
*
434-
* @throws IOException if an error occurs while reading
435433
* @return the PageReadStore which can provide PageReaders for each column.
434+
* @throws IOException if an error occurs while reading
436435
*/
437436
public PageReadStore readNextRowGroup() throws IOException {
438437
ColumnChunkPageReadStore rowGroup = null;
@@ -584,7 +583,8 @@ public ColumnChunkPageReadStore readFilteredRowGroup(int blockIndex, RowRanges r
584583
*/
585584
private void readAllPartsVectoredOrNormal(
586585
List<ConsecutivePartList> allParts, ChunkListBuilder builder) throws IOException {
587-
if (shouldUseVectoredIo()) {
586+
587+
if (shouldUseVectoredIo(allParts)) {
588588
try {
589589
readVectored(allParts, builder);
590590
return;
@@ -599,9 +599,35 @@ private void readAllPartsVectoredOrNormal(
599599
}
600600
}
601601

602-
/** Should the read use vectored IO. */
603-
private boolean shouldUseVectoredIo() {
604-
return f.in() instanceof VectoredReadable;
602+
/**
603+
* Decides whether the given parts should be read with vectored IO.
604+
*
605+
* <p>Vectored IO is used only when the stream returned by {@code f.in()} implements Paimon's {@link VectoredReadable}.
606+
*
607+
* @param allParts all parts to read; each one is checked by {@link #arePartsValidForVectoredIo(List)}.
608+
* @return {@code true} only if the stream is a {@link VectoredReadable} and every part is valid for vectored IO.
609+
*/
610+
private boolean shouldUseVectoredIo(final List<ConsecutivePartList> allParts) {
611+
return f.in() instanceof VectoredReadable && arePartsValidForVectoredIo(allParts);
612+
}
613+
614+
/**
615+
* Validate the parts for vectored IO. Vectored IO doesn't support reading ranges of size
616+
* greater than Integer.MAX_VALUE. NOTE(review): the check below uses {@code >=}, so a length equal to Integer.MAX_VALUE is rejected too, although the debug message says "greater than"; confirm the intended bound.
617+
*
618+
* @param allParts all parts to read; only the {@code length} of each part is inspected.
619+
* @return {@code false} (after a debug log) as soon as one part has {@code length >= Integer.MAX_VALUE}; {@code true} otherwise, including for an empty list.
620+
*/
621+
private boolean arePartsValidForVectoredIo(List<ConsecutivePartList> allParts) {
622+
for (ConsecutivePartList consecutivePart : allParts) {
623+
if (consecutivePart.length >= Integer.MAX_VALUE) {
624+
LOG.debug(
625+
"Part length {} greater than Integer.MAX_VALUE thus disabling vectored IO",
626+
consecutivePart.length);
627+
return false;
628+
}
629+
}
630+
return true;
605631
}
606632

607633
/**
@@ -621,7 +647,6 @@ private boolean shouldUseVectoredIo() {
621647
* @throws IllegalArgumentException arguments are invalid.
622648
* @throws UnsupportedOperationException if the filesystem does not support vectored IO.
623649
*/
624-
@SuppressWarnings("checkstyle:JavadocParagraph")
625650
private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
626651
throws IOException {
627652

@@ -730,7 +755,6 @@ private ColumnChunkPageReadStore internalReadFilteredRowGroup(
730755
}
731756
}
732757
}
733-
// actually read all the chunks
734758
readAllPartsVectoredOrNormal(allParts, builder);
735759
rowGroup.setReleaser(builder.releaser);
736760
for (Chunk chunk : builder.build()) {
@@ -802,6 +826,10 @@ private boolean advanceToNextBlock() {
802826

803827
// update the current block and instantiate a dictionary reader for it
804828
++currentBlock;
829+
830+
if (nextDictionaryReader != null) {
831+
nextDictionaryReader.close();
832+
}
805833
this.nextDictionaryReader = null;
806834

807835
return true;
@@ -977,12 +1005,25 @@ public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException
9771005
}
9781006
}
9791007

980-
// Read Bloom filter data header.
1008+
// Seek to Bloom filter offset.
9811009
f.seek(bloomFilterOffset);
1010+
1011+
// Read Bloom filter length.
1012+
int bloomFilterLength = meta.getBloomFilterLength();
1013+
1014+
// If it is set, read Bloom filter header and bitset together.
1015+
// Otherwise, read Bloom filter header first and then bitset.
1016+
InputStream in = f;
1017+
if (bloomFilterLength > 0) {
1018+
byte[] headerAndBitSet = new byte[bloomFilterLength];
1019+
f.readFully(headerAndBitSet);
1020+
in = new ByteArrayInputStream(headerAndBitSet);
1021+
}
1022+
9821023
BloomFilterHeader bloomFilterHeader;
9831024
try {
9841025
bloomFilterHeader =
985-
Util.readBloomFilterHeader(f, bloomFilterDecryptor, bloomFilterHeaderAAD);
1026+
Util.readBloomFilterHeader(in, bloomFilterDecryptor, bloomFilterHeaderAAD);
9861027
} catch (IOException e) {
9871028
LOG.warn("read no bloom filter");
9881029
return null;
@@ -1010,9 +1051,9 @@ public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException
10101051
byte[] bitset;
10111052
if (null == bloomFilterDecryptor) {
10121053
bitset = new byte[numBytes];
1013-
f.readFully(bitset);
1054+
in.read(bitset);
10141055
} else {
1015-
bitset = bloomFilterDecryptor.decrypt(f, bloomFilterBitsetAAD);
1056+
bitset = bloomFilterDecryptor.decrypt(in, bloomFilterBitsetAAD);
10161057
if (bitset.length != numBytes) {
10171058
throw new ParquetCryptoRuntimeException(
10181059
"Wrong length of decrypted bloom filter bitset");
@@ -1022,11 +1063,12 @@ public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException
10221063
}
10231064

10241065
/**
1066+
* This method should be considered private.
1067+
*
10251068
* @param column the column chunk which the column index is to be returned for
10261069
* @return the column index for the specified column chunk or {@code null} if there is no index
10271070
* @throws IOException if any I/O error occurs during reading the file
10281071
*/
1029-
@Private
10301072
public ColumnIndex readColumnIndex(ColumnChunkMetaData column) throws IOException {
10311073
IndexReference ref = column.getColumnIndexReference();
10321074
if (ref == null) {
@@ -1056,11 +1098,12 @@ public ColumnIndex readColumnIndex(ColumnChunkMetaData column) throws IOExceptio
10561098
}
10571099

10581100
/**
1101+
* This method should be considered private.
1102+
*
10591103
* @param column the column chunk which the offset index is to be returned for
10601104
* @return the offset index for the specified column chunk or {@code null} if there is no index
10611105
* @throws IOException if any I/O error occurs during reading the file
10621106
*/
1063-
@Private
10641107
public OffsetIndex readOffsetIndex(ColumnChunkMetaData column) throws IOException {
10651108
IndexReference ref = column.getOffsetIndexReference();
10661109
if (ref == null) {
@@ -1095,6 +1138,7 @@ public void close() throws IOException {
10951138
f.close();
10961139
}
10971140
} finally {
1141+
AutoCloseables.uncheckedClose(nextDictionaryReader, crcAllocator);
10981142
options.getCodecFactory().release();
10991143
}
11001144
}
@@ -1152,7 +1196,7 @@ List<Chunk> build() {
11521196
}
11531197
}
11541198

1155-
/** The data for a column chunk. */
1199+
/** The data for a column chunk */
11561200
private class Chunk {
11571201

11581202
protected final ChunkDescriptor descriptor;
@@ -1200,7 +1244,7 @@ private void verifyCrc(int referenceCrc, BytesInput bytes, String exceptionMsg)
12001244
}
12011245

12021246
/**
1203-
* Read all the pages in a given column chunk.
1247+
* Read all of the pages in a given column chunk.
12041248
*
12051249
* @return the list of pages
12061250
*/

paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030

3131
import java.io.Closeable;
3232
import java.io.IOException;
33+
import java.util.HashMap;
34+
import java.util.Map;
3335

3436
/**
3537
* Write records to a Parquet file.
@@ -67,7 +69,6 @@ public class ParquetWriter<T> implements Closeable {
6769
int maxPaddingSize,
6870
ParquetProperties encodingProps)
6971
throws IOException {
70-
7172
WriteSupport.WriteContext writeContext = writeSupport.init(conf);
7273
MessageType schema = writeContext.getSchema();
7374

@@ -86,12 +87,40 @@ public class ParquetWriter<T> implements Closeable {
8687

8788
this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
8889
CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName);
90+
91+
final Map<String, String> extraMetadata;
92+
if (encodingProps.getExtraMetaData() == null
93+
|| encodingProps.getExtraMetaData().isEmpty()) {
94+
extraMetadata = writeContext.getExtraMetaData();
95+
} else {
96+
extraMetadata = new HashMap<>(writeContext.getExtraMetaData());
97+
98+
encodingProps
99+
.getExtraMetaData()
100+
.forEach(
101+
(metadataKey, metadataValue) -> {
102+
if (metadataKey.equals(OBJECT_MODEL_NAME_PROP)) {
103+
throw new IllegalArgumentException(
104+
"Cannot overwrite metadata key "
105+
+ OBJECT_MODEL_NAME_PROP
106+
+ ". Please use another key name.");
107+
}
108+
109+
if (extraMetadata.put(metadataKey, metadataValue) != null) {
110+
throw new IllegalArgumentException(
111+
"Duplicate metadata key "
112+
+ metadataKey
113+
+ ". Please use another key name.");
114+
}
115+
});
116+
}
117+
89118
this.writer =
90119
new InternalParquetRecordWriter<T>(
91120
fileWriter,
92121
writeSupport,
93122
schema,
94-
writeContext.getExtraMetaData(),
123+
extraMetadata,
95124
rowGroupSize,
96125
compressor,
97126
validating,

0 commit comments

Comments
 (0)