Skip to content

Rework C++ truncation for arrow #2387

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1095,6 +1095,7 @@ if(${TEST})

set(benchmark_srcs
stream/test/stream_test_common.cpp
column_store/test/benchmark_column.cpp
column_store/test/benchmark_memory_segment.cpp
processing/test/benchmark_clause.cpp
processing/test/benchmark_ternary.cpp
59 changes: 48 additions & 11 deletions cpp/arcticdb/arrow/arrow_utils.cpp
Original file line number Diff line number Diff line change
@@ -14,16 +14,55 @@

namespace arcticdb {

// Builds a zero-row sparrow::array whose Arrow layout matches the given ArcticDB TypeDescriptor.
// Used so that a column with no data blocks still yields exactly one (empty) array for callers.
// The returned array carries `name` via set_name.
sparrow::array empty_arrow_array_from_type(const TypeDescriptor& type, std::string_view name) {
    auto res = type.visit_tag([](auto &&impl) {
        using TagType = std::decay_t<decltype(impl)>;
        using DataTagType = typename TagType::DataTypeTag;
        using RawType = typename DataTagType::raw_type;
        // Left empty: a zero-row array has no validity information to record.
        std::optional<sparrow::validity_bitmap> validity_bitmap;
        if constexpr (is_sequence_type(TagType::DataTypeTag::data_type)) {
            // TODO: This is super hacky. Our string column return type is dictionary encoded, and this should be
            // consistent when there are zero rows. But sparrow (or possibly Arrow) requires at least one key-value
            // pair in the dictionary, even if there are zero rows
            // Offsets [0, 1] describe a single one-byte string value ('a') in the dictionary.
            std::unique_ptr<int64_t[]> offset_ptr(new int64_t[2]);
            offset_ptr[0] = 0;
            offset_ptr[1] = 1;
            std::unique_ptr<char[]> strings_ptr(new char[1]);
            strings_ptr[0] = 'a';
            // The sparrow buffers take ownership of the released raw pointers
            // (presumably freeing them on destruction — see sparrow::u8_buffer docs).
            sparrow::u8_buffer<int64_t> offset_buffer(offset_ptr.release(), 2);
            sparrow::u8_buffer<char> strings_buffer(strings_ptr.release(), 1);
            // Zero keys: the dictionary holds one dummy value, but no row references it.
            sparrow::u8_buffer<int32_t> dict_keys_buffer{nullptr, 0};
            sparrow::big_string_array dict_values_array(
                std::move(strings_buffer),
                std::move(offset_buffer)
            );
            return sparrow::array{
                create_dict_array<int32_t>(
                    sparrow::array{std::move(dict_values_array)},
                    std::move(dict_keys_buffer),
                    validity_bitmap
                )};
        } else {
            // Primitive types: a (nullptr, length 0) primitive array is a valid empty array.
            return sparrow::array{create_primitive_array<RawType>(nullptr, 0, validity_bitmap)};
        }
    });
    res.set_name(name);
    return res;
}

std::vector<sparrow::array> arrow_arrays_from_column(const Column& column, std::string_view name) {
std::vector<sparrow::array> vec;
auto column_data = column.data();
vec.reserve(column.num_blocks());

column.type().visit_tag([&vec, &column_data, &column, name](auto &&impl) {
using TagType = std::decay_t<decltype(impl)>;
if (column_data.num_blocks() == 0) {
// For empty columns we want to return one empty array instead of no arrays.
vec.emplace_back(empty_arrow_array_from_type(column.type(), name));
}
while (auto block = column_data.next<TagType>()) {
auto bitmap = create_validity_bitmap(block->offset(), column);
if constexpr(is_sequence_type(TagType::DataTypeTag::data_type)) {
if constexpr (is_sequence_type(TagType::DataTypeTag::data_type)) {
vec.emplace_back(string_dict_from_block<TagType>(*block, column, name, bitmap));
} else {
vec.emplace_back(arrow_array_from_block<TagType>(*block, name, bitmap));
@@ -39,24 +78,22 @@ std::shared_ptr<std::vector<sparrow::record_batch>> segment_to_arrow_data(Segmen
const auto column_blocks = segment.column(0).num_blocks();
util::check(total_blocks == column_blocks * num_columns, "Expected regular block size");

auto output = std::make_shared<std::vector<sparrow::record_batch>>();
output->reserve(total_blocks);

for(auto i = 0UL; i < column_blocks; ++i)
output->emplace_back(sparrow::record_batch{});

// column_blocks == 0 is a special case where we are returning a zero-row structure (e.g. if date_range is
// provided outside of the time range covered by the symbol)
auto output = std::make_shared<std::vector<sparrow::record_batch>>(column_blocks == 0 ? 1 : column_blocks, sparrow::record_batch{});
for (auto i = 0UL; i < num_columns; ++i) {
auto& column = segment.column(static_cast<position_t>(i));
util::check(column.num_blocks() == column_blocks, "Non-standard column block number: {} != {}", column.num_blocks(), column_blocks);

auto column_arrays = arrow_arrays_from_column(column, segment.field(i).name());
util::check(column_arrays.size() == output->size(), "Unexpected number of arrow arrays returned: {} != {}", column_arrays.size(), output->size());

for(auto block_idx = 0UL; block_idx < column_blocks; ++block_idx) {
for (auto block_idx = 0UL; block_idx < column_arrays.size(); ++block_idx) {
util::check(block_idx < output->size(), "Block index overflow {} > {}", block_idx, output->size());
(*output)[block_idx].add_column(static_cast<std::string>(segment.field(i).name()), std::move(column_arrays[block_idx]));
(*output)[block_idx].add_column(static_cast<std::string>(segment.field(i).name()),
std::move(column_arrays[block_idx]));
}
}

return output;
}

7 changes: 4 additions & 3 deletions cpp/arcticdb/column_store/block.hpp
Original file line number Diff line number Diff line change
@@ -57,12 +57,13 @@ struct MemBlock {
}

~MemBlock() {
magic_.check();
magic_.check(true);
if(owns_external_data_) {
util::check(is_external(), "Cannot free inline allocated block");
if(external_data_ != nullptr) {
if (is_external()) {
log::version().warn("Unexpected release of detachable block memory");
delete[] external_data_;
} else {
log::version().warn("Cannot free inline allocated block");
}
}
}
29 changes: 20 additions & 9 deletions cpp/arcticdb/column_store/chunked_buffer.hpp
Original file line number Diff line number Diff line change
@@ -462,34 +462,45 @@ class ChunkedBufferImpl {
util::check(bytes <= bytes_, "Expected allocation size {} smaller than actual allocation {}", bytes, bytes_);
}

// Note that with all the truncate_*_block methods, the bytes_ and offsets are no longer accurate after the methods
// are called, but downstream logic uses these values to match up blocks with record batches, so this is deliberate
void truncate_single_block(size_t start_offset, size_t end_offset) {
auto [block, offset, ts] = block_and_offset(start_offset);
// Inclusive of start_offset, exclusive of end_offset
util::check(end_offset >= start_offset, "Truncate single block expects end ({}) >= start ({})", end_offset, start_offset);
util::check(blocks_.size() == 1, "Truncate single block expects buffer with only one block");
const auto removed_bytes = start_offset + (block->bytes() - end_offset);
util::check(removed_bytes < block->bytes(), "Can't truncate {} bytes from a {} byte block", removed_bytes, block->bytes());
auto [block, offset, ts] = block_and_offset(start_offset);
const auto removed_bytes = block->bytes() - (end_offset - start_offset);
util::check(removed_bytes <= block->bytes(), "Can't truncate {} bytes from a {} byte block", removed_bytes, block->bytes());
auto remaining_bytes = block->bytes() - removed_bytes;
auto new_block = create_block(remaining_bytes, 0);
new_block->copy_from(block->data() + start_offset, remaining_bytes, 0);
blocks_[0] = new_block;
if (remaining_bytes > 0) {
auto new_block = create_block(remaining_bytes, 0);
new_block->copy_from(block->data() + start_offset, remaining_bytes, 0);
blocks_[0] = new_block;
} else {
blocks_.clear();
block_offsets_.clear();
}
block->abandon();
delete block;
}

void truncate_first_block(size_t bytes) {
// bytes is the number of bytes to remove, and is asserted to be in the first block of the buffer
auto [block, offset, ts] = block_and_offset(bytes);
util::check(block == *blocks_.begin(), "Truncate first block position {} not within initial block", bytes);
util::check(bytes < block->bytes(), "Can't truncate {} bytes from a {} byte block", bytes, block->bytes());
auto remaining_bytes = block->bytes() - bytes;
auto new_block = create_block(bytes, 0);
auto new_block = create_block(remaining_bytes, block->offset_);
new_block->copy_from(block->data() + bytes, remaining_bytes, 0);
blocks_[0] = new_block;
block->abandon();
delete block;
}

void truncate_last_block(size_t bytes) {
auto [block, offset, ts] = block_and_offset(bytes);
util::check(block == *blocks_.rbegin(), "Truncate first block position {} not within initial block", bytes);
// bytes is the number of bytes to remove, and is asserted to be in the last block of the buffer
auto [block, offset, ts] = block_and_offset(bytes_ - bytes);
util::check(block == *blocks_.rbegin(), "Truncate last block position {} not within last block", bytes);
util::check(bytes < block->bytes(), "Can't truncate {} bytes from a {} byte block", bytes, block->bytes());
auto remaining_bytes = block->bytes() - bytes;
auto new_block = create_block(remaining_bytes, block->offset_);
13 changes: 6 additions & 7 deletions cpp/arcticdb/column_store/column.cpp
Original file line number Diff line number Diff line change
@@ -590,20 +590,19 @@ std::vector<std::shared_ptr<Column>> Column::split(const std::shared_ptr<Column>
return output;
}

void Column::truncate_first_block(size_t row) {
void Column::truncate_first_block(size_t start_row) {
if(!is_sparse()) {
auto bytes = data_type_size(type_, OutputFormat::NATIVE, DataTypeMode::INTERNAL) * row;
auto bytes = start_row * data_type_size(type_, OutputFormat::NATIVE, DataTypeMode::INTERNAL);
data_.buffer().truncate_first_block(bytes);
}
}

void Column::truncate_last_block(size_t row) {
void Column::truncate_last_block(size_t end_row) {
if(!is_sparse()) {
const auto column_row_count = row_count();
if(row < static_cast<size_t>(column_row_count))
return;

auto bytes = data_type_size(type_, OutputFormat::NATIVE, DataTypeMode::INTERNAL) * (column_row_count - row);
util::check(column_row_count >= static_cast<int64_t>(end_row),
"Cannot truncate column of length {} to row {}", column_row_count, end_row);
auto bytes = (column_row_count - end_row) * data_type_size(type_, OutputFormat::NATIVE, DataTypeMode::INTERNAL);
data_.buffer().truncate_last_block(bytes);
}
}
65 changes: 32 additions & 33 deletions cpp/arcticdb/column_store/column.hpp
Original file line number Diff line number Diff line change
@@ -728,59 +728,58 @@ class Column {
// Only works if column is of numeric type and is monotonically increasing
// Returns the index such that if val were inserted before that index, the order would be preserved
// By default returns the lowest index satisfying this property. If from_right=true, returns the highest such index
template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
// from (inclusive) and to (exclusive) can optionally be provided to search a subset of the rows in the column
template<class T>
requires std::integral<T> || std::floating_point<T>
size_t search_sorted(T val, bool from_right=false, std::optional<int64_t> from = std::nullopt, std::optional<int64_t> to = std::nullopt) const {
// There will not necessarily be a unique answer for sparse columns
internal::check<ErrorCode::E_ASSERTION_FAILURE>(!is_sparse(),
"Column::search_sorted not supported with sparse columns");
std::optional<size_t> res;
auto column_data = data();
details::visit_type(type().data_type(), [this, &res, &column_data, val, from_right, &from, &to](auto type_desc_tag) {
return details::visit_type(type().data_type(), [this, &column_data, val, from_right, &from, &to](auto type_desc_tag) -> int64_t {
using type_info = ScalarTypeInfo<decltype(type_desc_tag)>;
auto accessor = random_accessor<typename type_info::TDT>(&column_data);
if constexpr(std::is_same_v<T, typename type_info::RawType>) {
int64_t low = from.value_or(0);
int64_t high = to.value_or(row_count() - 1);
while (!res.has_value()) {
auto mid{low + (high - low) / 2};
auto mid_value = accessor.at(mid);
if (val == mid_value) {
// At least one value in the column exactly matches the input val
// Search to the right/left for the last/first such value
if (from_right) {
while (++mid <= high && val == accessor.at(mid)) {}
res = mid;
int64_t first = from.value_or(0);
const int64_t last = to.value_or(row_count());
internal::check<ErrorCode::E_ASSERTION_FAILURE>(last >= first,
"Invalid input range for Column::search_sorted. First: {}, Last: {}", first, last);
int64_t step;
int64_t count{last - first};
int64_t idx;
if (from_right) {
while (count > 0) {
idx = first;
step = count / 2;
idx = std::min(idx + step, last);
if (accessor.at(idx) <= val) {
first = ++idx;
count -= step + 1;
} else {
while (--mid >= low && val == accessor.at(mid)) {}
res = mid + 1;
count = step;
}
} else if (val > mid_value) {
if (mid + 1 <= high && val >= accessor.at(mid + 1)) {
// Narrow the search interval
low = mid + 1;
} else {
// val is less than the next value, so we have found the right interval
res = mid + 1;
}
} else { // val < mid_value
if (mid - 1 >= low && val <= accessor.at(mid + 1)) {
// Narrow the search interval
high = mid - 1;
}
} else {
while (count > 0) {
idx = first;
step = count / 2;
idx = std::min(idx + step, last);
if (accessor.at(idx) < val) {
first = ++idx;
count -= step + 1;
} else {
// val is greater than the previous value, so we have found the right interval
res = mid;
count = step;
}
}
}
return first;
} else {
// TODO: Could relax this requirement using something like has_valid_common_type
internal::raise<ErrorCode::E_ASSERTION_FAILURE>(
"Column::search_sorted requires input value to be of same type as column");
return {};
}
});
internal::check<ErrorCode::E_ASSERTION_FAILURE>(
res.has_value(), "Column::search_sorted should always find an index");
return *res;
}

[[nodiscard]] static std::vector<std::shared_ptr<Column>> split(
2 changes: 1 addition & 1 deletion cpp/arcticdb/column_store/memory_segment.hpp
Original file line number Diff line number Diff line change
@@ -449,7 +449,7 @@ class SegmentInMemory {
impl_->compact_blocks();
}

std::shared_ptr<Column> column_ptr(position_t idx) {
std::shared_ptr<Column> column_ptr(position_t idx) const {
return impl_->column_ptr(idx);
}

2 changes: 1 addition & 1 deletion cpp/arcticdb/column_store/memory_segment_impl.hpp
Original file line number Diff line number Diff line change
@@ -562,7 +562,7 @@ class SegmentInMemoryImpl {
return *columns_[idx];
}

std::shared_ptr<Column> column_ptr(position_t idx) {
std::shared_ptr<Column> column_ptr(position_t idx) const {
return columns_[idx];
}

57 changes: 57 additions & 0 deletions cpp/arcticdb/column_store/test/benchmark_column.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/* Copyright 2025 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/

#include <algorithm>
#include <random>

#include <benchmark/benchmark.h>

#include <arcticdb/column_store/column.hpp>

using namespace arcticdb;

// run like: --benchmark_time_unit=ms --benchmark_filter=.* --benchmark_min_time=5x

static std::random_device rd;
static std::mt19937 gen(rd());

// Benchmarks Column::search_sorted on a column of `state.range(0)` sorted random
// timestamps, probing with a fresh random value each iteration. RNG cost is excluded
// from the measurement via PauseTiming/ResumeTiming.
static void BM_search_sorted_random(benchmark::State& state) {
    auto num_rows = state.range(0);
    std::vector<timestamp> data;
    data.reserve(num_rows);
    std::uniform_int_distribution<timestamp> dis(std::numeric_limits<timestamp>::min(), std::numeric_limits<timestamp>::max());
    for (auto idx = 0; idx < num_rows; ++idx) {
        data.emplace_back(dis(gen));
    }
    // search_sorted requires monotonically increasing values
    std::ranges::sort(data);
    Column col(make_scalar_type(DataType::NANOSECONDS_UTC64), num_rows, AllocationType::PRESIZED, Sparsity::NOT_PERMITTED);
    memcpy(col.ptr(), data.data(), num_rows * sizeof(timestamp));
    col.set_row_data(num_rows - 1);
    for (auto _ : state) {
        state.PauseTiming();
        auto value = dis(gen);
        state.ResumeTiming();
        // DoNotOptimize stops the compiler from dead-code-eliminating the otherwise
        // unused result, which would make the benchmark measure nothing.
        benchmark::DoNotOptimize(col.search_sorted(value));
    }
}

// Benchmarks the worst case for search_sorted's duplicate handling: every row holds
// the same value, so the from_right/from_left scan for the first/last match covers
// the whole column. state.range(1) selects from_right.
static void BM_search_sorted_single_value(benchmark::State& state) {
    auto num_rows = state.range(0);
    auto from_right = state.range(1);
    std::uniform_int_distribution<timestamp> dis(std::numeric_limits<timestamp>::min(), std::numeric_limits<timestamp>::max());
    auto value = dis(gen);
    std::vector<timestamp> data(num_rows, value);
    Column col(make_scalar_type(DataType::NANOSECONDS_UTC64), num_rows, AllocationType::PRESIZED, Sparsity::NOT_PERMITTED);
    memcpy(col.ptr(), data.data(), num_rows * sizeof(timestamp));
    col.set_row_data(num_rows - 1);
    for (auto _ : state) {
        // DoNotOptimize stops the compiler from dead-code-eliminating the otherwise
        // unused result, which would make the benchmark measure nothing.
        benchmark::DoNotOptimize(col.search_sorted(value, from_right));
    }
}

BENCHMARK(BM_search_sorted_random)->Args({100'000});
BENCHMARK(BM_search_sorted_single_value)->Args({100'000, true})->Args({100'000, false});
Loading

Unchanged files with check annotations Beta