Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions cpp/src/arrow/array/builder_binary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,19 @@ Status BinaryViewBuilder::ReserveData(int64_t length) {
return data_heap_builder_.Reserve(length);
}

Result<int32_t> internal::StringHeapBuilder::AppendHeapBuffer(
std::shared_ptr<ResizableBuffer> buffer) {
if (!blocks_.empty() && current_remaining_bytes_ > 0) {
ARROW_RETURN_NOT_OK(FinishLastBlock());
}
current_remaining_bytes_ = 0;
current_out_buffer_ = nullptr;
current_offset_ = 0;
int32_t index = static_cast<int32_t>(blocks_.size());
blocks_.emplace_back(std::move(buffer));
return index;
}

void BinaryViewBuilder::Reset() {
ArrayBuilder::Reset();
data_builder_.Reset();
Expand Down
28 changes: 28 additions & 0 deletions cpp/src/arrow/array/builder_binary.h
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,13 @@ class ARROW_EXPORT StringHeapBuilder {
blocks_.clear();
}

/// \brief Register a pre-populated buffer as a new heap block.
///
/// Returns the buffer_index that out-of-line BinaryView headers should use
/// to reference data in this buffer. The caller is responsible for ensuring
/// that offsets used in BinaryView headers are valid within this buffer.
Result<int32_t> AppendHeapBuffer(std::shared_ptr<ResizableBuffer> buffer);

int64_t current_remaining_bytes() const { return current_remaining_bytes_; }

Result<std::vector<std::shared_ptr<ResizableBuffer>>> Finish() {
Expand Down Expand Up @@ -645,6 +652,27 @@ class ARROW_EXPORT BinaryViewBuilder : public ArrayBuilder {
UnsafeAppend(value.data(), static_cast<int64_t>(value.size()));
}

/// \brief Append a pre-constructed BinaryView header without copying data.
void UnsafeAppendView(BinaryViewType::c_type view) {
UnsafeAppendToBitmap(true);
data_builder_.UnsafeAppend(view);
}

/// \brief Append a value and return the BinaryView header that was created.
Result<BinaryViewType::c_type> AppendAndGetView(const uint8_t* value, int64_t length) {
ARROW_RETURN_NOT_OK(Reserve(1));
UnsafeAppendToBitmap(true);
ARROW_ASSIGN_OR_RAISE(auto v,
data_heap_builder_.Append</*Safe=*/true>(value, length));
data_builder_.UnsafeAppend(v);
return v;
}

/// \brief Register an external buffer as a data buffer for out-of-line views.
Result<int32_t> AppendBuffer(std::shared_ptr<ResizableBuffer> buffer) {
return data_heap_builder_.AppendHeapBuffer(std::move(buffer));
}

/// \brief Ensures there is enough allocated available capacity in the
/// out-of-line data heap to append the indicated number of bytes without
/// additional allocations
Expand Down
179 changes: 179 additions & 0 deletions cpp/src/parquet/decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "arrow/array/builder_dict.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/type_traits.h"
#include "arrow/util/binary_view_util.h"
#include "arrow/util/bit_block_counter.h"
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/bit_stream_utils_internal.h"
Expand Down Expand Up @@ -870,6 +871,7 @@ class DictDecoderImpl : public TypedDecoderImpl<Type>, public DictDecoder<Type>
explicit DictDecoderImpl(const ColumnDescriptor* descr,
MemoryPool* pool = ::arrow::default_memory_pool())
: TypedDecoderImpl<Type>(descr, Encoding::RLE_DICTIONARY),
pool_(pool),
dictionary_(AllocateBuffer(pool, 0)),
dictionary_length_(0),
byte_array_data_(AllocateBuffer(pool, 0)),
Expand Down Expand Up @@ -1007,6 +1009,8 @@ class DictDecoderImpl : public TypedDecoderImpl<Type>, public DictDecoder<Type>
dictionary->Decode(dictionary_->mutable_data_as<T>(), dictionary_length_);
}

MemoryPool* pool_;

// Only one is set.
std::shared_ptr<ResizableBuffer> dictionary_;

Expand Down Expand Up @@ -1261,6 +1265,13 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType> {
using BASE = DictDecoderImpl<ByteArrayType>;
using BASE::DictDecoderImpl;

void SetDict(TypedDecoder<ByteArrayType>* dictionary) override {
BASE::SetDict(dictionary);
binary_view_dict_buffer_.reset();
binary_view_cache_.clear();
cached_builder_ = nullptr;
}

int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
::arrow::BinaryDictionary32Builder* builder) override {
Expand Down Expand Up @@ -1294,6 +1305,12 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType> {
int64_t valid_bits_offset,
typename EncodingTraits<ByteArrayType>::Accumulator* out,
int* out_num_values) {
auto type_id = out->builder->type()->id();
if (type_id == ::arrow::Type::BINARY_VIEW || type_id == ::arrow::Type::STRING_VIEW) {
return DecodeArrowDenseBinaryView(num_values, null_count, valid_bits,
valid_bits_offset, out, out_num_values);
}

constexpr int32_t kBufferSize = 1024;
int32_t indices[kBufferSize];

Expand Down Expand Up @@ -1428,6 +1445,109 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType> {
*out_num_values = values_decoded;
return Status::OK();
}

Status EnsureBinaryViewCache(::arrow::BinaryViewBuilder* builder) {
// Rebuild cache if dictionary or builder changed.
if (cached_builder_ == builder && !binary_view_cache_.empty() &&
builder->length() > 0) {
return Status::OK();
}

const auto* dict_values = dictionary_->data_as<ByteArray>();
const auto* offsets = byte_array_offsets_->data_as<int32_t>();
binary_view_cache_.resize(dictionary_length_);

bool has_out_of_line = false;
for (int32_t i = 0; i < dictionary_length_; ++i) {
if (dict_values[i].len > ::arrow::BinaryViewType::kInlineSize) {
has_out_of_line = true;
break;
}
}

int32_t buffer_index = -1;
if (has_out_of_line) {
// Copy dictionary data if there are out-of-line values.
if (!binary_view_dict_buffer_) {
binary_view_dict_buffer_ = AllocateBuffer(pool_, byte_array_data_->size());
if (byte_array_data_->size() > 0) {
memcpy(binary_view_dict_buffer_->mutable_data(), byte_array_data_->data(),
byte_array_data_->size());
}
}
ARROW_ASSIGN_OR_RAISE(buffer_index,
builder->AppendBuffer(binary_view_dict_buffer_));
}

for (int32_t i = 0; i < dictionary_length_; ++i) {
if (dict_values[i].len <= ::arrow::BinaryViewType::kInlineSize) {
binary_view_cache_[i] = ::arrow::util::ToInlineBinaryView(
dict_values[i].ptr, static_cast<int32_t>(dict_values[i].len));
} else {
binary_view_cache_[i] = ::arrow::util::ToNonInlineBinaryView(
dict_values[i].ptr, static_cast<int32_t>(dict_values[i].len), buffer_index,
offsets[i]);
}
}

cached_builder_ = builder;
return Status::OK();
}

Status DecodeArrowDenseBinaryView(
int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset, typename EncodingTraits<ByteArrayType>::Accumulator* out,
int* out_num_values) {
constexpr int32_t kBufferSize = 1024;
int32_t indices[kBufferSize];

auto* builder =
::arrow::internal::checked_cast<::arrow::BinaryViewBuilder*>(out->builder.get());
RETURN_NOT_OK(EnsureBinaryViewCache(builder));
RETURN_NOT_OK(builder->Reserve(num_values));

const int values_to_decode = num_values - null_count;
int values_decoded = 0;
int num_indices = 0;
int pos_indices = 0;

auto visit_bit_run = [&](int64_t position, int64_t length, bool valid) {
if (valid) {
while (length > 0) {
if (num_indices == pos_indices) {
const auto max_batch_size =
std::min<int32_t>(kBufferSize, values_to_decode - values_decoded);
num_indices = idx_decoder_.GetBatch(indices, max_batch_size);
if (ARROW_PREDICT_FALSE(num_indices < 1)) {
return Status::Invalid("Invalid number of indices: ", num_indices);
}
pos_indices = 0;
}
const auto batch_size = std::min<int64_t>(num_indices - pos_indices, length);
for (int64_t j = 0; j < batch_size; ++j) {
const auto index = indices[pos_indices++];
RETURN_NOT_OK(IndexInBounds(index));
builder->UnsafeAppendView(binary_view_cache_[index]);
}
values_decoded += static_cast<int32_t>(batch_size);
length -= static_cast<int32_t>(batch_size);
}
} else {
for (int64_t i = 0; i < length; ++i) {
builder->UnsafeAppendNull();
}
}
return Status::OK();
};

RETURN_NOT_OK(VisitBitRuns(valid_bits, valid_bits_offset, num_values, visit_bit_run));
*out_num_values = values_decoded;
return Status::OK();
}

std::shared_ptr<ResizableBuffer> binary_view_dict_buffer_;
std::vector<::arrow::BinaryViewType::c_type> binary_view_cache_;
::arrow::ArrayBuilder* cached_builder_ = nullptr;
};

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -2122,6 +2242,15 @@ class DeltaByteArrayDecoderImpl : public TypedDecoderImpl<DType> {
int64_t valid_bits_offset,
typename EncodingTraits<DType>::Accumulator* out,
int* out_num_values) {
if constexpr (std::is_same_v<DType, ByteArrayType>) {
auto type_id = out->builder->type()->id();
if (type_id == ::arrow::Type::BINARY_VIEW ||
type_id == ::arrow::Type::STRING_VIEW) {
return DecodeArrowDenseBinaryView(num_values, null_count, valid_bits,
valid_bits_offset, out, out_num_values);
}
}

std::vector<ByteArray> values(num_values - null_count);
const int num_valid_values = GetInternal(values.data(), num_values - null_count);
if (ARROW_PREDICT_FALSE(num_values - null_count != num_valid_values)) {
Expand Down Expand Up @@ -2156,6 +2285,56 @@ class DeltaByteArrayDecoderImpl : public TypedDecoderImpl<DType> {
visit_binary_helper);
}

Status DecodeArrowDenseBinaryView(int num_values, int null_count,
const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<DType>::Accumulator* out,
int* out_num_values) {
static_assert(std::is_same_v<DType, ByteArrayType>);

std::vector<ByteArray> values(num_values - null_count);
const int num_valid_values = GetInternal(values.data(), num_values - null_count);
if (ARROW_PREDICT_FALSE(num_values - null_count != num_valid_values)) {
throw ParquetException("Expected to decode ", num_values - null_count,
" values, but decoded ", num_valid_values, " values.");
}

auto* builder =
::arrow::internal::checked_cast<::arrow::BinaryViewBuilder*>(out->builder.get());
RETURN_NOT_OK(builder->Reserve(num_values));

const ByteArray* values_ptr = values.data();
int value_idx = 0;
::arrow::BinaryViewType::c_type last_view{};
const uint8_t* last_ptr = nullptr;
uint32_t last_len = 0;

RETURN_NOT_OK(VisitBitRuns(valid_bits, valid_bits_offset, num_values,
[&](int64_t position, int64_t run_length, bool is_valid) {
if (is_valid) {
for (int64_t i = 0; i < run_length; ++i) {
const auto& val = values_ptr[value_idx];
if (val.ptr == last_ptr && val.len == last_len) {
builder->UnsafeAppendView(last_view);
} else {
ARROW_ASSIGN_OR_RAISE(
last_view,
builder->AppendAndGetView(
val.ptr, static_cast<int64_t>(val.len)));
last_ptr = val.ptr;
last_len = val.len;
}
++value_idx;
}
return Status::OK();
} else {
return builder->AppendNulls(run_length);
}
}));

*out_num_values = num_valid_values;
return Status::OK();
}

MemoryPool* pool_;

private:
Expand Down
Loading