
Commit 2cb7ff2

[18102873455] Handle sparse string columns on arrow read (#2700)
#### Reference Issues/PRs

Monday ref: 18102873455

#### What does this implement or fix?

Change consists of two commits (which can be reviewed separately).

First one (tests and benchmarks):
- Adds a Python test (via the ternary operator) which reads sparse data
- Adds a benchmark to verify type handler performance
- Renames `test_arrow.py` to `test_arrow_read.py`

Second one (actual fix):
- Correctly handles sparse input string columns while preserving the speed of dense handling (by filling an inverted bitset)

#### Any other comments?

This change slightly slows down (`503us` -> `594us`) string processing for arrow (most noticeably when there are very few unique strings). Experimentation shows that 100% of the slowdown can be attributed to the failure to inline [this lambda function](https://github.com/man-group/ArcticDB/pull/2700/files#diff-4e399a492a48cda55aa054cc5f6f30a73f35d8ff3aa11dbd32a1f8f2b17598b4R77-R94) passed as an argument to `for_each_enumerated`. If we unroll `for_each_enumerated`, performance goes back to the original level. This slowdown will be fixed in a follow-up PR (which might positively impact other parts of the processing pipeline affected by a similar issue).

Benchmark results:

```
benchmarks --benchmark_filter="BM_arrow_string.*" --benchmark_min_time=2000x --benchmark_repetitions=5 --benchmark_time_unit=us

After first commit:
-----------------------------------------------------------------------------------------
Benchmark                                            Time             CPU   Iterations
-----------------------------------------------------------------------------------------
BM_arrow_string_handler/10000/1/0_mean            59.7 us         65.1 us            5
BM_arrow_string_handler/10000/1/0_stddev          1.19 us         1.29 us            5
BM_arrow_string_handler/100000/1/0_mean            503 us          544 us            5
BM_arrow_string_handler/100000/1/0_stddev         8.52 us         3.18 us            5
BM_arrow_string_handler/10000/10000/0_mean         202 us          210 us            5
BM_arrow_string_handler/10000/10000/0_stddev      12.6 us         2.61 us            5
BM_arrow_string_handler/100000/100000/0_mean      2707 us         2906 us            5
BM_arrow_string_handler/100000/100000/0_stddev    52.1 us         13.0 us            5

After second commit:
-----------------------------------------------------------------------------------------
Benchmark                                            Time             CPU   Iterations
-----------------------------------------------------------------------------------------
BM_arrow_string_handler/10000/1/0_mean            65.6 us         71.6 us            5
BM_arrow_string_handler/10000/1/0_stddev          1.01 us         1.10 us            5
BM_arrow_string_handler/100000/1/0_mean            594 us          645 us            5
BM_arrow_string_handler/100000/1/0_stddev         8.67 us         6.29 us            5
BM_arrow_string_handler/10000/10000/0_mean         203 us          215 us            5
BM_arrow_string_handler/10000/10000/0_stddev     0.838 us        0.894 us            5
BM_arrow_string_handler/100000/100000/0_mean      2850 us         3046 us            5
BM_arrow_string_handler/100000/100000/0_stddev     118 us         57.9 us            5
BM_arrow_string_handler/10000/1/5000_mean         69.2 us         75.5 us            5
BM_arrow_string_handler/10000/1/5000_stddev      0.815 us        0.892 us            5
BM_arrow_string_handler/100000/1/50000_mean        614 us          670 us            5
BM_arrow_string_handler/100000/1/50000_stddev     4.92 us         5.36 us            5
BM_arrow_string_handler/10000/1/10000_mean        6.91 us         7.53 us            5
BM_arrow_string_handler/10000/1/10000_stddev     0.161 us        0.179 us            5
BM_arrow_string_handler/100000/1/100000_mean      7.99 us         8.39 us            5
BM_arrow_string_handler/100000/1/100000_stddev   0.259 us        0.121 us            5
```

#### Checklist

<details>
<summary> Checklist for code changes... </summary>

- [ ] Have you updated the relevant docstrings, documentation and copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error messages](https://docs.arcticdb.io/error_messages/)?
- [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes?

</details>
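The core idea of the fix is easiest to see outside ArcticDB. The sketch below is illustrative only: `std::vector<bool>` stands in for `util::BitSet`, optional values stand in for string offsets that may be missing, and the row data is invented; it is not the handler's actual code.

```cpp
// Minimal sketch of the dense-vs-sparse bitmap strategy described above.
// std::vector<bool> is a stand-in for util::BitSet.
#include <cstddef>
#include <cstdio>
#include <optional>
#include <vector>

std::vector<bool> build_validity_bitmap(
        const std::vector<std::optional<std::size_t>>& rows, bool source_is_dense
) {
    std::vector<bool> bits(rows.size(), false);
    if (source_is_dense) {
        // Dense input: values are almost always present, so record only the rare
        // missing rows and invert once at the end. Per-row bitmap writes stay rare.
        for (std::size_t i = 0; i < rows.size(); ++i) {
            if (!rows[i].has_value())
                bits[i] = true;
        }
        bits.flip(); // now 1 == value present
    } else {
        // Sparse input: record the present rows directly.
        for (std::size_t i = 0; i < rows.size(); ++i) {
            if (rows[i].has_value())
                bits[i] = true;
        }
    }
    return bits;
}

int main() {
    std::vector<std::optional<std::size_t>> sparse_rows{1, std::nullopt, 2, std::nullopt};
    auto bits = build_validity_bitmap(sparse_rows, /*source_is_dense=*/false);
    for (std::size_t i = 0; i < bits.size(); ++i)
        std::printf("row %zu present: %d\n", i, static_cast<int>(bits[i]));
}
```

For a fully dense column the first branch touches the bitmap only on missing rows (i.e. never) and pays a single flip at the end, which is the effect the 20% figure in the handler's new comments refers to.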
1 parent 87fb70f commit 2cb7ff2

7 files changed: +200 −81 lines changed

cpp/arcticdb/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
```diff
@@ -1105,6 +1105,7 @@ if(${TEST})
     set(benchmark_srcs
             stream/test/stream_test_common.cpp
             arrow/test/arrow_test_utils.hpp
+            arrow/test/benchmark_arrow_reads.cpp
             arrow/test/benchmark_arrow_writes.cpp
             column_store/test/benchmark_column.cpp
             column_store/test/benchmark_memory_segment.cpp
```

cpp/arcticdb/arrow/arrow_handlers.cpp

Lines changed: 33 additions & 33 deletions
```diff
@@ -11,6 +11,7 @@
 #include <arcticdb/util/decode_path_data.hpp>
 #include <arcticdb/pipeline/column_mapping.hpp>
 #include <arcticdb/column_store/string_pool.hpp>
+#include <arcticdb/column_store/column_algorithms.hpp>
 
 namespace arcticdb {
 
@@ -51,7 +52,6 @@ void ArrowStringHandler::convert_type(
         std::any&, const std::shared_ptr<StringPool>& string_pool
 ) const {
     using ArcticStringColumnTag = ScalarTagType<DataTypeTag<DataType::UTF_DYNAMIC64>>;
-    auto input_data = source_column.data();
     struct DictEntry {
         int32_t offset_buffer_pos_;
         int64_t string_buffer_pos_;
@@ -65,49 +65,49 @@ void ArrowStringHandler::convert_type(
     unique_offsets.reserve(source_column.row_count());
     int64_t bytes = 0;
     int32_t unique_offset_count = 0;
-    auto dest_ptr = reinterpret_cast<int32_t*>(
-            dest_column.bytes_at(mapping.offset_bytes_, source_column.row_count() * sizeof(int32_t))
-    );
+    auto dest_ptr = reinterpret_cast<int32_t*>(dest_column.bytes_at(mapping.offset_bytes_, mapping.dest_bytes_));
 
-    util::BitSet bitset;
-    util::BitSet::bulk_insert_iterator inserter(bitset);
-    const auto end = input_data.cend<ArcticStringColumnTag, IteratorType::ENUMERATED>();
-    // First go through the source column once to compute the size of offset and string buffers.
-    // TODO: This can't be right if the column was sparse as it has only been decoded, not expanded
-    for (auto en = input_data.cbegin<ArcticStringColumnTag, IteratorType::ENUMERATED>(); en != end; ++en) {
-        if (is_a_string(en->value())) {
+    util::BitSet dest_bitset;
+    util::BitSet::bulk_insert_iterator inserter(dest_bitset);
+    // For dense columns we populate an inverted bitmap (i.e. set the bits where values are missing)
+    // This makes processing fully dense columns faster because it doesn't need to insert often into the bitmap.
+    // Benchmarks in benchmark_arrow_reads.cpp show a 20% speedup with this approach for a dense string column with few
+    // unique strings.
+    bool populate_inverted_bitset = !source_column.opt_sparse_map().has_value();
+    for_each_enumerated<ArcticStringColumnTag>(source_column, [&](const auto& en) {
+        if (is_a_string(en.value())) {
             auto [entry, is_emplaced] = unique_offsets.try_emplace(
-                    en->value(),
-                    DictEntry{
-                            static_cast<int32_t>(unique_offset_count), bytes, string_pool->get_const_view(en->value())
-                    }
+                    en.value(), DictEntry{unique_offset_count, bytes, string_pool->get_const_view(en.value())}
             );
             if (is_emplaced) {
                 bytes += entry->second.strv.size();
-                unique_offsets_in_order.push_back(en->value());
+                unique_offsets_in_order.push_back(en.value());
                 ++unique_offset_count;
             }
-            *dest_ptr = entry->second.offset_buffer_pos_;
-        } else {
-            inserter = en->idx();
+            dest_ptr[en.idx()] = entry->second.offset_buffer_pos_;
+            if (!populate_inverted_bitset) {
+                inserter = en.idx();
+            }
+        } else if (populate_inverted_bitset) {
+            inserter = en.idx();
         }
-        ++dest_ptr;
-    }
+    });
     inserter.flush();
-    // At this point bitset has ones where the source column contained None or NaN
-    // Inverting and shrinking to the source column size it then makes a sparse map for the input data
-    bitset.invert();
-    // TODO: row_count() here won't be right when the original data was sparse, but we don't support sparse
-    // string columns yet anyway
-    bitset.resize(source_column.row_count());
-    if (bitset.count() != bitset.size()) {
-        handle_truncation(bitset, mapping.truncate_);
-        create_dense_bitmap(mapping.offset_bytes_, bitset, dest_column, AllocationType::DETACHABLE);
-    } // else there weren't any Nones or NaNs
-    // bitset.count() == 0 is the special case where all of the rows contained None or NaN. In this case, do not create
+    if (populate_inverted_bitset) {
+        // For dense columns at this point bitset has ones where the source column is missing values.
+        // We invert it back, so we can use it for the sparse map on the output column.
+        dest_bitset.invert();
+    }
+    dest_bitset.resize(mapping.num_rows_);
+
+    if (dest_bitset.count() != dest_bitset.size()) {
+        handle_truncation(dest_bitset, mapping.truncate_);
+        create_dense_bitmap(mapping.offset_bytes_, dest_bitset, dest_column, AllocationType::DETACHABLE);
+    } // else there weren't any missing values
+    // bitset.count() == 0 is the special case where all the rows were missing. In this case, do not create
     // the extra string and offset buffers. string_dict_from_block will then do the right thing and call
     // minimal_strings_dict
-    if (bitset.count() > 0) {
+    if (dest_bitset.count() > 0) {
         auto& string_buffer = dest_column.create_extra_buffer(
                 mapping.offset_bytes_, ExtraBufferType::STRING, bytes, AllocationType::DETACHABLE
         );
```
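The PR description attributes the remaining slowdown to the compiler failing to inline the lambda passed to `for_each_enumerated` above. The toy comparison below is not ArcticDB code — the `for_each_enumerated_sketch` helper and `Enumerated` struct are invented for illustration — it only shows what "unrolling" the helper means: writing the enumeration loop out directly so there is no callback boundary left for the optimiser to inline.

```cpp
// Illustrative sketch: callback-based enumeration vs. a hand-unrolled loop.
#include <cstddef>
#include <cstdint>
#include <vector>

struct Enumerated {
    std::size_t idx;
    std::int64_t value;
};

// Hypothetical stand-in for an enumerate-with-callback helper.
template<typename F>
void for_each_enumerated_sketch(const std::vector<std::int64_t>& column, F&& f) {
    for (std::size_t i = 0; i < column.size(); ++i)
        f(Enumerated{i, column[i]});
}

std::int64_t sum_via_callback(const std::vector<std::int64_t>& column) {
    std::int64_t total = 0;
    // Fast only if the compiler inlines this lambda into the helper's loop.
    for_each_enumerated_sketch(column, [&](const Enumerated& en) { total += en.value; });
    return total;
}

std::int64_t sum_unrolled(const std::vector<std::int64_t>& column) {
    // "Unrolled" in the sense used in the PR description: the enumeration loop is
    // written out in place, so the loop body is always visible to the optimiser.
    std::int64_t total = 0;
    for (std::size_t i = 0; i < column.size(); ++i)
        total += column[i];
    return total;
}

int main() {
    std::vector<std::int64_t> column{1, 2, 3, 4};
    return static_cast<int>(sum_via_callback(column) - sum_unrolled(column)); // 0 when both agree
}
```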

cpp/arcticdb/arrow/test/arrow_test_utils.hpp

Lines changed: 26 additions & 1 deletion
```diff
@@ -5,10 +5,12 @@
  * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software
  * will be governed by the Apache License, version 2.0.
  */
+#pragma once
 
 #include <sparrow/record_batch.hpp>
 
 #include <arcticdb/util/allocator.hpp>
+#include <arcticdb/column_store/column.hpp>
 
 using namespace arcticdb;
 
@@ -30,10 +32,33 @@ sparrow::array create_array(const std::vector<T>& data) {
     }
 }
 
-sparrow::record_batch create_record_batch(const std::vector<std::pair<std::string, sparrow::array>>& columns) {
+inline sparrow::record_batch create_record_batch(const std::vector<std::pair<std::string, sparrow::array>>& columns) {
     sparrow::record_batch record_batch{};
     for (const auto& column : columns) {
         record_batch.add_column(column.first, column.second);
     }
     return record_batch;
 }
+
+template<typename RawType>
+void allocate_and_fill_chunked_column(
+        Column& column, size_t num_rows, size_t chunk_size, std::optional<std::span<RawType>> values = std::nullopt
+) {
+    // Allocate column in chunks
+    for (size_t row = 0; row < num_rows; row += chunk_size) {
+        auto data_size = data_type_size(column.type(), OutputFormat::ARROW, DataTypeMode::EXTERNAL);
+        auto current_block_size = std::min(chunk_size, num_rows - row);
+        auto bytes = current_block_size * data_size;
+        column.allocate_data(bytes);
+        column.advance_data(bytes);
+    }
+
+    // Actually fill the data
+    for (size_t row = 0; row < num_rows; ++row) {
+        if (values.has_value()) {
+            column.reference_at<RawType>(row) = values.value()[row];
+        } else {
+            column.reference_at<RawType>(row) = static_cast<RawType>(row);
+        }
+    }
+}
```
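A possible way to call the relocated helper, modelled on how the new benchmark below constructs its destination column; the 1'000-row chunk size and the explicit values span are invented here to show the optional parameter, not taken from the change.

```cpp
// Sketch only: the Column construction mirrors benchmark_arrow_reads.cpp, while
// the chunk size and the explicit values span are illustrative.
#include <cstddef>
#include <span>
#include <vector>

inline void fill_example_column(std::size_t num_rows) {
    auto type_desc = TypeDescriptor{DataType::UTF_DYNAMIC32, Dimension::Dim0};
    auto column = Column(type_desc, 0, AllocationType::DETACHABLE, Sparsity::NOT_PERMITTED);

    // Fill with explicit values instead of the default 0..num_rows-1 sequence.
    std::vector<int32_t> values(num_rows, 42);
    allocate_and_fill_chunked_column<int32_t>(
            column, num_rows, /*chunk_size=*/1'000, std::span<int32_t>{values}
    );
}
```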
cpp/arcticdb/arrow/test/benchmark_arrow_reads.cpp

Lines changed: 78 additions & 0 deletions

```diff
@@ -0,0 +1,78 @@
+/* Copyright 2025 Man Group Operations Limited
+ *
+ * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software
+ * will be governed by the Apache License, version 2.0.
+ */
+
+#include <benchmark/benchmark.h>
+#include <fmt/format.h>
+
+#include <arcticdb/arrow/test/arrow_test_utils.hpp>
+#include <arcticdb/arrow/arrow_handlers.hpp>
+#include <arcticdb/column_store/string_pool.hpp>
+#include <arcticdb/pipeline/column_mapping.hpp>
+
+using namespace arcticdb;
+
+// run like: --benchmark_time_unit=ms --benchmark_filter=.* --benchmark_min_time=5x
+
+static void BM_arrow_string_handler(benchmark::State& state) {
+    const auto num_rows = state.range(0);
+    const auto num_different_strings = state.range(1);
+    const auto num_sparse = state.range(2);
+
+    const double sparsity_ratio = num_sparse / num_rows;
+    auto source_type_desc = TypeDescriptor{DataType::UTF_DYNAMIC64, Dimension::Dim0};
+    auto dest_type_desc = TypeDescriptor{DataType::UTF_DYNAMIC32, Dimension::Dim0};
+    auto dest_size = data_type_size(dest_type_desc, OutputFormat::ARROW, DataTypeMode::EXTERNAL);
+    auto sparsity = num_sparse == 0 ? Sparsity::NOT_PERMITTED : Sparsity::PERMITTED;
+
+    // Fill up source column
+    auto source_column = Column(source_type_desc, num_rows, AllocationType::DYNAMIC, sparsity);
+    auto string_pool = std::make_shared<StringPool>();
+    for (auto i = 0u, num_set = 0u; i < num_rows; i++) {
+        auto offset = string_pool->get(fmt::format("str_{}", i % num_different_strings)).offset();
+        auto expected_set = std::round((i + 1) * (1 - sparsity_ratio));
+        if (num_set < expected_set) {
+            source_column.set_scalar(i, offset);
+            ++num_set;
+        }
+    }
+    auto mapping = ColumnMapping(
+            source_type_desc,
+            dest_type_desc,
+            FieldWrapper(dest_type_desc, "col").field(),
+            dest_size,
+            num_rows,
+            0,
+            0,
+            dest_size * num_different_strings,
+            0
+    );
+    auto handler = ArrowStringHandler();
+    auto handler_data = std::any{};
+
+    for (auto _ : state) {
+        state.PauseTiming();
+        auto dest_column = Column(dest_type_desc, 0, AllocationType::DETACHABLE, sparsity);
+        allocate_and_fill_chunked_column<int32_t>(dest_column, num_rows, num_rows);
+        state.ResumeTiming();
+        handler.convert_type(source_column, dest_column, mapping, DecodePathData{}, handler_data, string_pool);
+    }
+}
+
+BENCHMARK(BM_arrow_string_handler)
+        // Not sparse, small string and offset buffers
+        ->Args({10'000, 1, 0})
+        ->Args({100'000, 1, 0})
+        // Not sparse, large string and offset buffers
+        ->Args({10'000, 10'000, 0})
+        ->Args({100'000, 100'000, 0})
+        // Half sparse
+        ->Args({10'000, 1, 5'000})
+        ->Args({100'000, 1, 50'000})
+        // Fully sparse
+        ->Args({10'000, 1, 10'000})
+        ->Args({100'000, 1, 100'000});
```

cpp/arcticdb/arrow/test/test_arrow_read.cpp

Lines changed: 1 addition & 23 deletions
```diff
@@ -12,34 +12,12 @@
 
 #include <arcticdb/pipeline/column_mapping.hpp>
 #include <arcticdb/stream/test/stream_test_common.hpp>
+#include <arcticdb/arrow/test/arrow_test_utils.hpp>
 #include <arcticdb/arrow/arrow_utils.hpp>
 #include <arcticdb/arrow/arrow_handlers.hpp>
 
 using namespace arcticdb;
 
-template<typename RawType>
-void allocate_and_fill_chunked_column(
-        Column& column, size_t num_rows, size_t chunk_size, std::optional<std::span<RawType>> values = std::nullopt
-) {
-    // Allocate column in chunks
-    for (size_t row = 0; row < num_rows; row += chunk_size) {
-        auto data_size = data_type_size(column.type(), OutputFormat::ARROW, DataTypeMode::EXTERNAL);
-        auto current_block_size = std::min(chunk_size, num_rows - row);
-        auto bytes = current_block_size * data_size;
-        column.allocate_data(bytes);
-        column.advance_data(bytes);
-    }
-
-    // Actually fill the data
-    for (size_t row = 0; row < num_rows; ++row) {
-        if (values.has_value()) {
-            column.reference_at<RawType>(row) = values.value()[row];
-        } else {
-            column.reference_at<RawType>(row) = static_cast<RawType>(row);
-        }
-    }
-}
-
 SegmentInMemory get_detachable_segment(
         StreamId symbol, std::span<const FieldRef> fields, size_t num_rows, size_t chunk_size
 ) {
```

python/tests/conftest.py

Lines changed: 8 additions & 0 deletions
```diff
@@ -1123,6 +1123,14 @@ def lmdb_version_store_dynamic_schema(
         raise ValueError(f"Unexpected encoding version: {encoding_version}")
 
 
+@pytest.fixture
+def lmdb_version_store_dynamic_schema_arrow(lmdb_version_store_dynamic_schema_v1) -> NativeVersionStore:
+    store = lmdb_version_store_dynamic_schema_v1
+    store.set_output_format(OutputFormat.EXPERIMENTAL_ARROW)
+    store._set_allow_arrow_input()
+    return store
+
+
 @pytest.fixture
 def lmdb_version_store_empty_types_v1(version_store_factory, lib_name) -> NativeVersionStore:
     library_name = lib_name + "_v1"
```
