Skip to content

Commit 766ee06

Browse files
committed
Read chunks of data in C++
1 parent 3521df0 commit 766ee06

24 files changed

+1572
-543
lines changed

cpp/arcticdb/CMakeLists.txt

+2-2
Original file line numberDiff line numberDiff line change
@@ -760,8 +760,8 @@ if (SSL_LINK)
760760
find_package(OpenSSL REQUIRED)
761761
list(APPEND arcticdb_core_libraries OpenSSL::SSL)
762762
if (NOT WIN32)
763-
list(APPEND arcticdb_core_libraries ${KERBEROS_LIBRARY})
764-
list(APPEND arcticdb_core_includes ${KERBEROS_INCLUDE_DIR})
763+
#list(APPEND arcticdb_core_libraries ${KERBEROS_LIBRARY})
764+
#list(APPEND arcticdb_core_includes ${KERBEROS_INCLUDE_DIR})
765765
endif()
766766
endif ()
767767
target_link_libraries(arcticdb_core_object PUBLIC ${arcticdb_core_libraries})

cpp/arcticdb/pipeline/chunking.cpp

+100
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/* Copyright 2023 Man Group Operations Limited
2+
*
3+
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
4+
*
5+
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
6+
*/
7+
8+
#include <optional>
9+
#include <arcticdb/pipeline/chunking.hpp>
10+
#include <arcticdb/pipeline/read_frame.hpp>
11+
12+
namespace arcticdb {
13+
14+
ChunkIterator::ChunkIterator(
15+
pipelines::index::IndexSegmentReader&& index_segment_reader,
16+
std::shared_ptr<pipelines::PipelineContext> pipeline_context,
17+
AtomKey index_key,
18+
std::shared_ptr<Store> store,
19+
ReadQuery& read_query,
20+
const ReadOptions& read_options,
21+
std::any& handler_data,
22+
DecodePathData shared_data) :
23+
index_segment_reader_(std::move(index_segment_reader)),
24+
pipeline_context_(std::move(pipeline_context)),
25+
index_key_(index_key),
26+
store_(std::move(store)),
27+
handler_data_(handler_data),
28+
shared_data_(std::move(shared_data)),
29+
read_query_(std::make_shared<ReadQuery>(read_query)),
30+
read_options_(read_options),
31+
read_ahead_(ConfigsMap::instance()->get_int("Chunk.ReadaheadRows", 1)),
32+
row_ranges_to_read_(pipeline_context_->fetch_index_.count()) {
33+
do_read_ahead();
34+
}
35+
36+
std::optional<ReadResult> ChunkIterator::next() {
37+
auto release_gil = std::make_unique<py::gil_scoped_release>();
38+
if (row_pos_ == row_ranges_to_read_)
39+
return std::nullopt;
40+
41+
auto required_row_pos = row_pos_;
42+
util::check(required_row_pos < results_.size(),
43+
"Request for row pos that has not been scheduled: {} >= {}",
44+
required_row_pos,
45+
results_.size());
46+
++row_pos_;
47+
do_read_ahead();
48+
auto read_version = std::move(results_[required_row_pos]).get();
49+
return create_python_read_result(read_version.versioned_item_, std::move(read_version.frame_and_descriptor_));
50+
}
51+
52+
void ChunkIterator::do_read_ahead() {
53+
while (scheduled_row_pos_ < row_pos_ + read_ahead_ && scheduled_row_pos_ < row_ranges_to_read_)
54+
schedule_row_range();
55+
}
56+
57+
void ChunkIterator::schedule_row_range() {
58+
auto local_context = std::make_shared<PipelineContext>();
59+
local_context->set_descriptor(pipeline_context_->descriptor());
60+
const auto previous_row_range = pipeline_context_->slice_and_keys_[slice_and_key_pos_].slice_.row_range;
61+
auto current_row_range = previous_row_range;
62+
const auto start_pos = slice_and_key_pos_;
63+
while (current_row_range == previous_row_range && slice_and_key_pos_ < pipeline_context_->slice_and_keys_.size()) {
64+
local_context->slice_and_keys_.emplace_back(pipeline_context_->slice_and_keys_[slice_and_key_pos_]);
65+
local_context->fetch_index_.set_bit(row_pos_, pipeline_context_->fetch_index_[slice_and_key_pos_]);
66+
++slice_and_key_pos_;
67+
if (slice_and_key_pos_ == pipeline_context_->slice_and_keys_.size())
68+
break;
69+
70+
current_row_range = pipeline_context_->slice_and_keys_[slice_and_key_pos_].slice_.row_range;
71+
}
72+
73+
const auto row_range_size = slice_and_key_pos_ - start_pos;
74+
util::check(row_range_size == local_context->slice_and_keys_.size(),
75+
"Expected equality of range size and slice and keys {} != {},",
76+
row_range_size,
77+
local_context->slice_and_keys_.size());
78+
79+
pipeline_context_->fetch_index_.copy_range(local_context->fetch_index_, start_pos, slice_and_key_pos_);
80+
local_context->fetch_index_.resize(row_range_size);
81+
local_context->norm_meta_ = pipeline_context_->norm_meta_;
82+
83+
auto frame = allocate_frame(local_context);
84+
auto output = do_direct_read_or_process(store_,
85+
read_query_,
86+
read_options_,
87+
local_context,
88+
shared_data_,
89+
handler_data_).thenValue(
90+
[this, frame, local_context](auto&&) mutable {
91+
return ReadVersionOutput{
92+
VersionedItem{to_atom(index_key_)},
93+
FrameAndDescriptor{frame, TimeseriesDescriptor{index_segment_reader_.tsd()}, {}, {}}};
94+
});
95+
96+
results_.emplace_back(std::move(output));
97+
++scheduled_row_pos_;
98+
}
99+
100+
} // namespace arcticdb

cpp/arcticdb/pipeline/chunking.hpp

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/* Copyright 2023 Man Group Operations Limited
2+
*
3+
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
4+
*
5+
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
6+
*/
7+
8+
#pragma once
9+
10+
#include <optional>
11+
#include <arcticdb/pipeline/index_segment_reader.hpp>
12+
#include <arcticdb/pipeline/pipeline_context.hpp>
13+
#include <arcticdb/pipeline/read_frame.hpp>
14+
#include <arcticdb/version/read_version_output.hpp>
15+
16+
#include <arcticdb/version/read_version_output.hpp>
17+
#include <arcticdb/entity/read_result.hpp>
18+
#include <arcticdb/pipeline/read_options.hpp>
19+
#include <arcticdb/pipeline/query.hpp>
20+
21+
namespace arcticdb {
22+
23+
class ChunkIterator {
24+
pipelines::index::IndexSegmentReader index_segment_reader_;
25+
std::shared_ptr<pipelines::PipelineContext> pipeline_context_;
26+
AtomKey index_key_;
27+
std::shared_ptr<Store> store_;
28+
std::any handler_data_;
29+
DecodePathData shared_data_;
30+
std::shared_ptr<pipelines::ReadQuery> read_query_;
31+
ReadOptions read_options_;
32+
size_t row_pos_ = 0;
33+
size_t scheduled_row_pos_ = 0;
34+
size_t read_ahead_;
35+
const size_t row_ranges_to_read_;
36+
size_t slice_and_key_pos_ = 0;
37+
std::vector<folly::Future<ReadVersionOutput>> results_;
38+
39+
public:
40+
ChunkIterator(
41+
pipelines::index::IndexSegmentReader&& index_segment_reader,
42+
std::shared_ptr<pipelines::PipelineContext> pipeline_context,
43+
AtomKey index_key,
44+
std::shared_ptr<Store> store,
45+
pipelines::ReadQuery& read_query,
46+
const ReadOptions& read_options,
47+
std::any& handler_data,
48+
DecodePathData shared_data);
49+
50+
std::optional<ReadResult> next();
51+
private:
52+
void do_read_ahead();
53+
54+
void schedule_row_range();
55+
56+
};
57+
58+
} // namespace arcticdb

cpp/arcticdb/pipeline/error.hpp

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/* Copyright 2023 Man Group Operations Limited
2+
*
3+
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
4+
*
5+
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
6+
*/
7+
#pragma once
8+
9+
#include <vector>
10+
#include <string>
11+
#include <folly/Function.h>
12+
13+
namespace arcticdb {
14+
struct Error {
15+
16+
explicit Error(folly::Function<void(std::string)> raiser, std::string msg);
17+
void throw_error();
18+
19+
folly::Function<void(std::string)> raiser_;
20+
std::string msg_;
21+
};
22+
}

cpp/arcticdb/pipeline/pipeline_utils.hpp

+6
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,10 @@ inline ReadResult read_result_from_single_frame(FrameAndDescriptor& frame_and_de
5454
return create_python_read_result(VersionedItem{key}, output_format, std::move(frame_and_desc));
5555
}
5656

57+
inline void sort_by_row_range(std::vector<SliceAndKey>& slices_and_keys) {
58+
std::sort(std::begin(slices_and_keys), std::end(slices_and_keys), [] (const SliceAndKey& left, const SliceAndKey& right) {
59+
return std::tie(left.slice_.row_range.first, left.slice_.col_range.first) < std::tie(right.slice_.row_range.first, right.slice_.col_range.first);
60+
});
61+
}
62+
5763
}

0 commit comments

Comments
 (0)