Skip to content

Commit 6ca2546

Browse files
committed
vendor: Update vendored sources to duckdb/duckdb@d3de141
Parquet Reader: Move decoding logic into separate Decoder classes (duckdb/duckdb#16100)
1 parent 46c33a9 commit 6ca2546

29 files changed

+592
-293
lines changed

src/duckdb/extension/parquet/column_reader.cpp

Lines changed: 32 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,8 @@ const uint8_t ParquetDecodeUtils::BITPACK_DLEN = 8;
108108
ColumnReader::ColumnReader(ParquetReader &reader, LogicalType type_p, const SchemaElement &schema_p, idx_t file_idx_p,
109109
idx_t max_define_p, idx_t max_repeat_p)
110110
: schema(schema_p), file_idx(file_idx_p), max_define(max_define_p), max_repeat(max_repeat_p), reader(reader),
111-
type(std::move(type_p)), page_rows_available(0), dictionary_selection_vector(STANDARD_VECTOR_SIZE),
112-
dictionary_size(0) {
111+
type(std::move(type_p)), page_rows_available(0), dictionary_decoder(*this), delta_binary_packed_decoder(*this),
112+
rle_decoder(*this), delta_byte_array_decoder(*this), byte_stream_split_decoder(*this) {
113113

114114
// dummies for Skip()
115115
dummy_define.resize(reader.allocator, STANDARD_VECTOR_SIZE);
@@ -202,20 +202,7 @@ void ColumnReader::Plain(shared_ptr<ByteBuffer> plain_data, uint8_t *defines, id
202202
throw NotImplementedException("Plain");
203203
}
204204

205-
void ColumnReader::PrepareDeltaLengthByteArray(ResizeableBuffer &buffer) {
206-
throw std::runtime_error("DELTA_LENGTH_BYTE_ARRAY encoding is only supported for text or binary data");
207-
}
208-
209-
void ColumnReader::PrepareDeltaByteArray(ResizeableBuffer &buffer) {
210-
throw std::runtime_error("DELTA_BYTE_ARRAY encoding is only supported for text or binary data");
211-
}
212-
213-
void ColumnReader::DeltaByteArray(uint8_t *defines, idx_t num_values, // NOLINT
214-
parquet_filter_t &filter, idx_t result_offset, Vector &result) {
215-
throw NotImplementedException("DeltaByteArray");
216-
}
217-
218-
void ColumnReader::PlainReference(shared_ptr<ByteBuffer>, Vector &result) { // NOLINT
205+
void ColumnReader::PlainReference(shared_ptr<ResizeableBuffer> &, Vector &result) { // NOLINT
219206
}
220207

221208
void ColumnReader::InitializeRead(idx_t row_group_idx_p, const vector<ColumnChunk> &columns, TProtocol &protocol_p) {
@@ -239,9 +226,8 @@ void ColumnReader::InitializeRead(idx_t row_group_idx_p, const vector<ColumnChun
239226
}
240227

241228
void ColumnReader::PrepareRead(parquet_filter_t &filter) {
242-
dict_decoder.reset();
229+
encoding = ColumnEncoding::INVALID;
243230
defined_decoder.reset();
244-
bss_decoder.reset();
245231
block.reset();
246232
PageHeader page_hdr;
247233
reader.Read(page_hdr, *protocol);
@@ -261,22 +247,11 @@ void ColumnReader::PrepareRead(parquet_filter_t &filter) {
261247
break;
262248
case PageType::DICTIONARY_PAGE: {
263249
PreparePage(page_hdr);
264-
if (page_hdr.dictionary_page_header.num_values < 0) {
250+
auto dictionary_size = page_hdr.dictionary_page_header.num_values;
251+
if (dictionary_size < 0) {
265252
throw std::runtime_error("Invalid dictionary page header (num_values < 0)");
266253
}
267-
auto old_dict_size = dictionary_size;
268-
// we use the first value in the dictionary to keep a NULL
269-
dictionary_size = page_hdr.dictionary_page_header.num_values;
270-
if (!dictionary) {
271-
dictionary = make_uniq<Vector>(type, dictionary_size + 1);
272-
} else if (dictionary_size > old_dict_size) {
273-
dictionary->Resize(old_dict_size, dictionary_size + 1);
274-
}
275-
dictionary_id = reader.file_name + "_" + schema.name + "_" + std::to_string(chunk_read_offset);
276-
// we use the first entry as a NULL, dictionary vectors don't have a separate validity mask
277-
FlatVector::Validity(*dictionary).SetInvalid(0);
278-
PlainReference(block, *dictionary);
279-
Plain(block, nullptr, dictionary_size, nullptr, 1, *dictionary);
254+
dictionary_decoder.InitializeDictionary(dictionary_size);
280255
break;
281256
}
282257
default:
@@ -459,75 +434,45 @@ void ColumnReader::PrepareDataPage(PageHeader &page_hdr) {
459434
switch (page_encoding) {
460435
case Encoding::RLE_DICTIONARY:
461436
case Encoding::PLAIN_DICTIONARY: {
462-
// where is it otherwise??
463-
auto dict_width = block->read<uint8_t>();
464-
// TODO somehow dict_width can be 0 ?
465-
dict_decoder = make_uniq<RleBpDecoder>(block->ptr, block->len, dict_width);
466-
block->inc(block->len);
437+
encoding = ColumnEncoding::DICTIONARY;
438+
dictionary_decoder.InitializePage();
467439
break;
468440
}
469441
case Encoding::RLE: {
470-
if (type.id() != LogicalTypeId::BOOLEAN) {
471-
throw std::runtime_error("RLE encoding is only supported for boolean data");
472-
}
473-
block->inc(sizeof(uint32_t));
474-
rle_decoder = make_uniq<RleBpDecoder>(block->ptr, block->len, 1);
442+
encoding = ColumnEncoding::RLE;
443+
rle_decoder.InitializePage();
475444
break;
476445
}
477446
case Encoding::DELTA_BINARY_PACKED: {
478-
dbp_decoder = make_uniq<DbpDecoder>(block->ptr, block->len);
479-
block->inc(block->len);
447+
encoding = ColumnEncoding::DELTA_BINARY_PACKED;
448+
delta_binary_packed_decoder.InitializePage();
480449
break;
481450
}
482451
case Encoding::DELTA_LENGTH_BYTE_ARRAY: {
483-
PrepareDeltaLengthByteArray(*block);
452+
encoding = ColumnEncoding::DELTA_LENGTH_BYTE_ARRAY;
453+
delta_byte_array_decoder.InitializeDeltaLengthByteArray();
484454
break;
485455
}
486456
case Encoding::DELTA_BYTE_ARRAY: {
487-
PrepareDeltaByteArray(*block);
457+
encoding = ColumnEncoding::DELTA_BYTE_ARRAY;
458+
delta_byte_array_decoder.InitializeDeltaByteArray();
488459
break;
489460
}
490461
case Encoding::BYTE_STREAM_SPLIT: {
491-
// Subtract 1 from length as the block is allocated with 1 extra byte,
492-
// but the byte stream split encoder needs to know the correct data size.
493-
bss_decoder = make_uniq<BssDecoder>(block->ptr, block->len - 1);
494-
block->inc(block->len);
462+
encoding = ColumnEncoding::BYTE_STREAM_SPLIT;
463+
byte_stream_split_decoder.InitializePage();
495464
break;
496465
}
497466
case Encoding::PLAIN:
498467
// nothing to do here, will be read directly below
468+
encoding = ColumnEncoding::PLAIN;
499469
break;
500470

501471
default:
502472
throw std::runtime_error("Unsupported page encoding");
503473
}
504474
}
505475

506-
void ColumnReader::ConvertDictToSelVec(uint32_t *offsets, uint8_t *defines, parquet_filter_t &filter, idx_t read_now,
507-
idx_t result_offset) {
508-
D_ASSERT(read_now <= STANDARD_VECTOR_SIZE);
509-
idx_t offset_idx = 0;
510-
for (idx_t row_idx = 0; row_idx < read_now; row_idx++) {
511-
if (HasDefines() && defines[row_idx + result_offset] != max_define) {
512-
dictionary_selection_vector.set_index(row_idx, 0); // dictionary entry 0 is NULL
513-
continue; // we don't have a dict entry for NULLs
514-
}
515-
if (filter.test(row_idx + result_offset)) {
516-
auto offset = offsets[offset_idx++];
517-
if (offset >= dictionary_size) {
518-
throw std::runtime_error("Parquet file is likely corrupted, dictionary offset out of range");
519-
}
520-
dictionary_selection_vector.set_index(row_idx, offset + 1);
521-
} else {
522-
dictionary_selection_vector.set_index(row_idx, 0); // just set NULL if the filter excludes this row
523-
offset_idx++;
524-
}
525-
}
526-
#ifdef DEBUG
527-
dictionary_selection_vector.Verify(read_now, dictionary_size + 1);
528-
#endif
529-
}
530-
531476
idx_t ColumnReader::Read(uint64_t num_values, parquet_filter_t &filter, data_ptr_t define_out, data_ptr_t repeat_out,
532477
Vector &result) {
533478
// we need to reset the location because multiple column readers share the same protocol
@@ -563,84 +508,24 @@ idx_t ColumnReader::Read(uint64_t num_values, parquet_filter_t &filter, data_ptr
563508
defined_decoder->GetBatch<uint8_t>(define_out + result_offset, read_now);
564509
}
565510

566-
idx_t null_count = 0;
567-
568-
if ((dict_decoder || dbp_decoder || rle_decoder || bss_decoder) && HasDefines()) {
569-
// we need the null count because the dictionary offsets have no entries for nulls
570-
for (idx_t i = result_offset; i < result_offset + read_now; i++) {
571-
null_count += (define_out[i] != max_define);
572-
}
573-
}
574-
575511
if (result_offset != 0 && result.GetVectorType() != VectorType::FLAT_VECTOR) {
576512
result.Flatten(result_offset);
577513
result.Resize(result_offset, STANDARD_VECTOR_SIZE);
578514
}
579515

580-
if (dict_decoder) {
581-
if ((!dictionary || dictionary_size == 0) && null_count < read_now) {
582-
throw std::runtime_error("Parquet file is likely corrupted, missing dictionary");
583-
}
584-
offset_buffer.resize(reader.allocator, sizeof(uint32_t) * (read_now - null_count));
585-
dict_decoder->GetBatch<uint32_t>(offset_buffer.ptr, read_now - null_count);
586-
ConvertDictToSelVec(reinterpret_cast<uint32_t *>(offset_buffer.ptr),
587-
reinterpret_cast<uint8_t *>(define_out), filter, read_now, result_offset);
588-
if (result_offset == 0) {
589-
result.Dictionary(*dictionary, dictionary_size + 1, dictionary_selection_vector, read_now);
590-
DictionaryVector::SetDictionaryId(result, dictionary_id);
591-
D_ASSERT(result.GetVectorType() == VectorType::DICTIONARY_VECTOR);
592-
} else {
593-
D_ASSERT(result.GetVectorType() == VectorType::FLAT_VECTOR);
594-
VectorOperations::Copy(*dictionary, result, dictionary_selection_vector, read_now, 0, result_offset);
595-
}
596-
} else if (dbp_decoder) {
597-
// TODO keep this in the state
598-
auto read_buf = make_shared_ptr<ResizeableBuffer>();
599-
600-
switch (schema.type) {
601-
case duckdb_parquet::Type::INT32:
602-
read_buf->resize(reader.allocator, sizeof(int32_t) * (read_now - null_count));
603-
dbp_decoder->GetBatch<int32_t>(read_buf->ptr, read_now - null_count);
604-
605-
break;
606-
case duckdb_parquet::Type::INT64:
607-
read_buf->resize(reader.allocator, sizeof(int64_t) * (read_now - null_count));
608-
dbp_decoder->GetBatch<int64_t>(read_buf->ptr, read_now - null_count);
609-
break;
610-
611-
default:
612-
throw std::runtime_error("DELTA_BINARY_PACKED should only be INT32 or INT64");
613-
}
614-
// Plain() will put NULLs in the right place
615-
Plain(read_buf, define_out, read_now, &filter, result_offset, result);
616-
} else if (rle_decoder) {
617-
// RLE encoding for boolean
618-
D_ASSERT(type.id() == LogicalTypeId::BOOLEAN);
619-
auto read_buf = make_shared_ptr<ResizeableBuffer>();
620-
read_buf->resize(reader.allocator, sizeof(bool) * (read_now - null_count));
621-
rle_decoder->GetBatch<uint8_t>(read_buf->ptr, read_now - null_count);
622-
PlainTemplated<bool, TemplatedParquetValueConversion<bool>>(read_buf, define_out, read_now, &filter,
623-
result_offset, result);
624-
} else if (byte_array_data) {
516+
auto define_ptr = HasDefines() ? static_cast<uint8_t *>(define_out) : nullptr;
517+
if (encoding == ColumnEncoding::DICTIONARY) {
518+
dictionary_decoder.Read(define_ptr, read_now, result, result_offset);
519+
} else if (encoding == ColumnEncoding::DELTA_BINARY_PACKED) {
520+
delta_binary_packed_decoder.Read(define_ptr, read_now, result, result_offset);
521+
} else if (encoding == ColumnEncoding::RLE) {
522+
rle_decoder.Read(define_ptr, read_now, result, result_offset);
523+
} else if (encoding == ColumnEncoding::DELTA_LENGTH_BYTE_ARRAY ||
524+
encoding == ColumnEncoding::DELTA_BYTE_ARRAY) {
625525
// DELTA_BYTE_ARRAY or DELTA_LENGTH_BYTE_ARRAY
626-
DeltaByteArray(define_out, read_now, filter, result_offset, result);
627-
} else if (bss_decoder) {
628-
auto read_buf = make_shared_ptr<ResizeableBuffer>();
629-
630-
switch (schema.type) {
631-
case duckdb_parquet::Type::FLOAT:
632-
read_buf->resize(reader.allocator, sizeof(float) * (read_now - null_count));
633-
bss_decoder->GetBatch<float>(read_buf->ptr, read_now - null_count);
634-
break;
635-
case duckdb_parquet::Type::DOUBLE:
636-
read_buf->resize(reader.allocator, sizeof(double) * (read_now - null_count));
637-
bss_decoder->GetBatch<double>(read_buf->ptr, read_now - null_count);
638-
break;
639-
default:
640-
throw std::runtime_error("BYTE_STREAM_SPLIT encoding is only supported for FLOAT or DOUBLE data");
641-
}
642-
643-
Plain(read_buf, define_out, read_now, &filter, result_offset, result);
526+
delta_byte_array_decoder.Read(define_ptr, read_now, result, result_offset);
527+
} else if (encoding == ColumnEncoding::BYTE_STREAM_SPLIT) {
528+
byte_stream_split_decoder.Read(define_ptr, read_now, result, result_offset);
644529
} else {
645530
PlainReference(block, result);
646531
Plain(block, define_out, read_now, &filter, result_offset, result);
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#include "decoder/byte_stream_split_decoder.hpp"
2+
#include "column_reader.hpp"
3+
#include "parquet_reader.hpp"
4+
5+
namespace duckdb {
6+
7+
ByteStreamSplitDecoder::ByteStreamSplitDecoder(ColumnReader &reader) : reader(reader) {
8+
}
9+
10+
void ByteStreamSplitDecoder::InitializePage() {
11+
auto &block = reader.block;
12+
// Subtract 1 from length as the block is allocated with 1 extra byte,
13+
// but the byte stream split encoder needs to know the correct data size.
14+
bss_decoder = make_uniq<BssDecoder>(block->ptr, block->len - 1);
15+
block->inc(block->len);
16+
}
17+
18+
void ByteStreamSplitDecoder::Read(uint8_t *defines, idx_t read_count, Vector &result, idx_t result_offset) {
19+
idx_t null_count = 0;
20+
21+
if (defines) {
22+
// we need the null count because the dictionary offsets have no entries for nulls
23+
for (idx_t i = result_offset; i < result_offset + read_count; i++) {
24+
null_count += defines[i] != reader.max_define;
25+
}
26+
}
27+
idx_t valid_count = read_count - null_count;
28+
29+
auto read_buf = make_shared_ptr<ResizeableBuffer>();
30+
auto &allocator = reader.reader.allocator;
31+
switch (reader.schema.type) {
32+
case duckdb_parquet::Type::FLOAT:
33+
read_buf->resize(allocator, sizeof(float) * valid_count);
34+
bss_decoder->GetBatch<float>(read_buf->ptr, valid_count);
35+
break;
36+
case duckdb_parquet::Type::DOUBLE:
37+
read_buf->resize(allocator, sizeof(double) * valid_count);
38+
bss_decoder->GetBatch<double>(read_buf->ptr, valid_count);
39+
break;
40+
default:
41+
throw std::runtime_error("BYTE_STREAM_SPLIT encoding is only supported for FLOAT or DOUBLE data");
42+
}
43+
44+
reader.Plain(read_buf, defines, read_count, nullptr, result_offset, result);
45+
}
46+
47+
} // namespace duckdb
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#include "decoder/delta_binary_packed_decoder.hpp"
2+
#include "column_reader.hpp"
3+
#include "parquet_reader.hpp"
4+
5+
namespace duckdb {
6+
7+
DeltaBinaryPackedDecoder::DeltaBinaryPackedDecoder(ColumnReader &reader) : reader(reader) {
8+
}
9+
10+
void DeltaBinaryPackedDecoder::InitializePage() {
11+
auto &block = reader.block;
12+
dbp_decoder = make_uniq<DbpDecoder>(block->ptr, block->len);
13+
block->inc(block->len);
14+
}
15+
16+
void DeltaBinaryPackedDecoder::Read(uint8_t *defines, idx_t read_count, Vector &result, idx_t result_offset) {
17+
// TODO keep this in the state
18+
auto read_buf = make_shared_ptr<ResizeableBuffer>();
19+
20+
idx_t null_count = 0;
21+
if (defines) {
22+
// we need the null count because the dictionary offsets have no entries for nulls
23+
for (idx_t i = result_offset; i < result_offset + read_count; i++) {
24+
null_count += (defines[i] != reader.max_define);
25+
}
26+
}
27+
idx_t valid_count = read_count - null_count;
28+
29+
auto &allocator = reader.reader.allocator;
30+
switch (reader.schema.type) {
31+
case duckdb_parquet::Type::INT32:
32+
read_buf->resize(allocator, sizeof(int32_t) * (valid_count));
33+
dbp_decoder->GetBatch<int32_t>(read_buf->ptr, valid_count);
34+
35+
break;
36+
case duckdb_parquet::Type::INT64:
37+
read_buf->resize(allocator, sizeof(int64_t) * (valid_count));
38+
dbp_decoder->GetBatch<int64_t>(read_buf->ptr, valid_count);
39+
break;
40+
41+
default:
42+
throw std::runtime_error("DELTA_BINARY_PACKED should only be INT32 or INT64");
43+
}
44+
// Plain() will put NULLs in the right place
45+
reader.Plain(read_buf, defines, read_count, nullptr, result_offset, result);
46+
}
47+
48+
} // namespace duckdb

0 commit comments

Comments
 (0)