
Commit 7cdcebc

Improve the performance of update by parallelising reads. Implement internal async update method. (#2087)
#### Reference Issues/PRs

Implement an `async_update_impl` function which returns a future. The synchronous version of update just calls it and waits for the future, exactly as append does. This keeps most of the update code the same; however, instead of calling `.get()` on futures it chains them and returns a future. In the process, the reads needed by update were made parallel, so the regular update also gets improved performance.

Slight refactor of the C++ unit tests: using std::array instead of std::vector for fixed-size collections and placing const and constexpr specifiers. No functional changes.

#### What does this implement or fix?

#### Any other comments?

#### Checklist

<details>
  <summary>
   Checklist for code changes...
  </summary>

 - [ ] Have you updated the relevant docstrings, documentation and copyright notice?
 - [ ] Is this contribution tested against [all ArcticDB's features](../docs/mkdocs/docs/technical/contributing.md)?
 - [ ] Do all exceptions introduced raise appropriate [error messages](https://docs.arcticdb.io/error_messages/)?
 - [ ] Are API changes highlighted in the PR description?
 - [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes?

</details>
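To illustrate the shape described above, here is a minimal sketch of a synchronous update delegating to the future-returning implementation, the same way append does. Only the name `async_update_impl` comes from this PR; the argument and result types below are placeholders, not the actual ArcticDB signatures:

```cpp
#include <folly/futures/Future.h>

// Placeholder types standing in for the real version-store arguments and result.
struct UpdateArgs {};
struct VersionedItem {};

// Future-returning implementation: intermediate reads are chained with
// thenValue/thenValueInline (and can run in parallel) instead of blocking
// on .get() at each step.
folly::Future<VersionedItem> async_update_impl(const UpdateArgs& args);

// The synchronous update simply waits on that future, just as append does.
inline VersionedItem update_impl(const UpdateArgs& args) {
    return async_update_impl(args).get();
}
```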
1 parent a28bf7f commit 7cdcebc

15 files changed, with 376 additions and 292 deletions.

cpp/arcticdb/pipeline/index_segment_reader.cpp

+6
@@ -24,6 +24,12 @@ IndexSegmentReader get_index_reader(const AtomKey &prev_index, const std::shared_ptr<Store> &store) {
     return index::IndexSegmentReader{std::move(seg)};
 }
 
+folly::Future<IndexSegmentReader> async_get_index_reader(const AtomKey &prev_index, const std::shared_ptr<Store> &store) {
+    return store->read(prev_index).thenValueInline([](std::pair<VariantKey, SegmentInMemory>&& key_seg) {
+        return IndexSegmentReader{std::move(key_seg.second)};
+    });
+}
+
 IndexSegmentReader::IndexSegmentReader(SegmentInMemory&& s) :
     seg_(std::move(s)) {
 }
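A hedged usage sketch for the helper added above: a caller can keep chaining on the returned future rather than blocking on it. The wrapper name `index_row_count` and the callback body are illustrative only, not part of this change:

```cpp
// Illustrative only: chain on the future instead of calling .get() immediately.
folly::Future<std::size_t> index_row_count(const AtomKey& prev_index, const std::shared_ptr<Store>& store) {
    return async_get_index_reader(prev_index, store)
        .thenValue([](auto&& index_segment_reader) {
            // Inspect the index here before deciding which data keys to read.
            return static_cast<std::size_t>(index_segment_reader.size());
        });
}
```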

cpp/arcticdb/pipeline/index_segment_reader.hpp

+5-5
@@ -8,13 +8,9 @@
 #pragma once
 
 #include <arcticdb/column_store/memory_segment.hpp>
-#include <arcticdb/entity/protobufs.hpp>
 #include <arcticdb/pipeline/frame_slice.hpp>
 #include <arcticdb/pipeline/index_fields.hpp>
-
-#include <boost/noncopyable.hpp>
-
-#include <cstdint>
+#include <folly/futures/Future.h>
 
 namespace arcticdb {
 class Store;
@@ -135,6 +131,10 @@ index::IndexSegmentReader get_index_reader(
     const AtomKey &prev_index,
     const std::shared_ptr<Store> &store);
 
+folly::Future<IndexSegmentReader> async_get_index_reader(
+    const AtomKey &prev_index,
+    const std::shared_ptr<Store> &store);
+
 IndexRange get_index_segment_range(
     const AtomKey &prev_index,
     const std::shared_ptr<Store> &store);

cpp/arcticdb/pipeline/index_utils.cpp

+4
@@ -129,4 +129,8 @@ TimeseriesDescriptor get_merged_tsd(
     );
 }
 
+bool is_timeseries_index(const IndexDescriptorImpl& index_desc) {
+    return index_desc.type() == IndexDescriptor::Type::TIMESTAMP || index_desc.type() == IndexDescriptor::Type::EMPTY;
+}
+
 } //namespace arcticdb::pipelines::index

cpp/arcticdb/pipeline/index_utils.hpp

+2
@@ -138,4 +138,6 @@ TimeseriesDescriptor get_merged_tsd(
     const TimeseriesDescriptor& existing_tsd,
     const std::shared_ptr<pipelines::InputTensorFrame>& new_frame);
 
+[[nodiscard]] bool is_timeseries_index(const IndexDescriptorImpl& index_desc);
+
 } //namespace arcticdb::pipelines::index

cpp/arcticdb/pipeline/query.hpp

+18-23
@@ -10,15 +10,10 @@
 #include <arcticdb/util/bitset.hpp>
 #include <arcticdb/entity/index_range.hpp>
 #include <arcticdb/processing/expression_context.hpp>
-#include <arcticdb/entity/versioned_item.hpp>
-#include <arcticdb/pipeline/python_output_frame.hpp>
 #include <arcticdb/pipeline/write_frame.hpp>
 #include <arcticdb/pipeline/frame_slice.hpp>
-#include <arcticdb/util/constructors.hpp>
 #include <arcticdb/util/variant.hpp>
 #include <arcticdb/pipeline/index_segment_reader.hpp>
-#include <arcticdb/pipeline/input_tensor_frame.hpp>
-#include <arcticdb/pipeline/read_options.hpp>
 #include <arcticdb/stream/stream_utils.hpp>
 #include <arcticdb/processing/clause.hpp>
 #include <arcticdb/util/simple_string_hash.hpp>
@@ -28,9 +23,13 @@
 #include <vector>
 #include <string>
 #include <variant>
+#include <ranges>
+#include <span>
 
 namespace arcticdb::pipelines {
 
+namespace ranges = std::ranges;
+
 using FilterRange = std::variant<std::monostate, IndexRange, RowRange>;
 
 /*
@@ -405,41 +404,37 @@ inline FilterRange get_query_index_range(
         return RowRange{std::get<NumericIndex>(index_range.start_), std::get<NumericIndex>(index_range.end_)};
 }
 
-inline std::vector<SliceAndKey> strictly_before(const FilterRange &range, const std::vector<SliceAndKey> &input) {
+inline std::vector<SliceAndKey> strictly_before(const FilterRange &range, std::span<const SliceAndKey> input) {
     std::vector<SliceAndKey> output;
     util::variant_match(range,
         [&](const RowRange &row_range) {
-            std::copy_if(std::begin(input), std::end(input), std::back_inserter(output),
-                [&](const auto &sk) {
-                    return sk.slice_.row_range.second < row_range.first;
-                });
+            ranges::copy_if(input, std::back_inserter(output), [&](const auto &sk) {
+                return sk.slice_.row_range.second < row_range.first;
+            });
         },
         [&](const IndexRange &index_range) {
-            std::copy_if(std::begin(input), std::end(input), std::back_inserter(output),
-                [&](const auto &sk) {
-                    return sk.key().index_range().end_ < index_range.start_;
-                });
+            ranges::copy_if(input, std::back_inserter(output), [&](const auto &sk) {
+                return sk.key().index_range().end_ < index_range.start_;
+            });
         },
         [&](const auto &) {
             util::raise_rte("Expected specified range ");
         });
     return output;
 }
 
-inline std::vector<SliceAndKey> strictly_after(const FilterRange &range, const std::vector<SliceAndKey> &input) {
+inline std::vector<SliceAndKey> strictly_after(const FilterRange &range, std::span<const SliceAndKey> input) {
     std::vector<SliceAndKey> output;
     util::variant_match(range,
         [&input, &output](const RowRange &row_range) {
-            std::copy_if(std::begin(input), std::end(input), std::back_inserter(output),
-                [&](const auto &sk) {
-                    return sk.slice_.row_range.first > row_range.second;
-                });
+            ranges::copy_if(input, std::back_inserter(output), [&](const auto &sk) {
+                return sk.slice_.row_range.first > row_range.second;
+            });
         },
         [&input, &output](const IndexRange &index_range) {
-            std::copy_if(std::begin(input), std::end(input), std::back_inserter(output),
-                [&](const auto &sk) {
-                    return sk.key().index_range().start_ > index_range.end_;
-                });
+            ranges::copy_if(input, std::back_inserter(output), [&](const auto &sk) {
+                return sk.key().index_range().start_ > index_range.end_;
+            });
         },
         [](const auto &) {
             util::raise_rte("Expected specified range ");
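Design note on the signature change above: `std::span<const SliceAndKey>` is a non-owning view, so existing `std::vector` call sites keep working without copies and fixed-size containers are accepted too. A small self-contained sketch of the same idea; the `Slice` stand-in and the `before_row` helper are illustrative, not ArcticDB code:

```cpp
#include <algorithm>
#include <array>
#include <iterator>
#include <span>
#include <vector>

// Illustrative stand-in for SliceAndKey, just enough to show the span-based API.
struct Slice { int first_row; int last_row; };

// Mirrors the strictly_before style: take a read-only span, return a filtered copy.
std::vector<Slice> before_row(std::span<const Slice> input, int row) {
    std::vector<Slice> output;
    std::ranges::copy_if(input, std::back_inserter(output),
                         [row](const Slice& s) { return s.last_row < row; });
    return output;
}

int main() {
    const std::vector<Slice> from_vector{{0, 4}, {5, 9}, {10, 14}};
    const std::array<Slice, 2> from_array{{{0, 1}, {2, 3}}};
    // Both containers convert implicitly to std::span<const Slice>.
    const auto a = before_row(from_vector, 10);  // keeps {0,4} and {5,9}
    const auto b = before_row(from_array, 3);    // keeps {0,1}
    return static_cast<int>(a.size() + b.size());
}
```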

cpp/arcticdb/pipeline/read_pipeline.hpp

+10-23
@@ -8,24 +8,9 @@
 #pragma once
 
 #include <variant>
-
-#include <folly/futures/Future.h>
-#include <boost/noncopyable.hpp>
-
 #include <arcticdb/entity/types.hpp>
-#include <arcticdb/stream/index.hpp>
-#include <arcticdb/entity/protobufs.hpp>
-#include <pybind11/pybind11.h>
-
-#include <arcticdb/stream/stream_sink.hpp>
-#include <arcticdb/stream/stream_source.hpp>
-#include <arcticdb/entity/native_tensor.hpp>
 #include <arcticdb/entity/performance_tracing.hpp>
-#include <arcticdb/entity/atom_key.hpp>
 #include <arcticdb/util/bitset.hpp>
-#include <arcticdb/util/constructors.hpp>
-#include <folly/executors/FutureExecutor.h>
-#include <folly/executors/CPUThreadPoolExecutor.h>
 #include <arcticdb/pipeline/frame_slice.hpp>
 #include <arcticdb/pipeline/python_output_frame.hpp>
 #include <arcticdb/pipeline/query.hpp>
@@ -61,22 +46,24 @@ void foreach_active_bit(const util::BitSet &bs, C &&visitor) {
     }
 }
 
-template<typename ContainerType>
-std::vector<SliceAndKey> filter_index(const ContainerType &container, std::optional<CombinedQuery<ContainerType>> &&query) {
+inline std::vector<SliceAndKey> filter_index(
+    const index::IndexSegmentReader& index_segment_reader,
+    std::optional<CombinedQuery<index::IndexSegmentReader>> &&query
+) {
     ARCTICDB_SAMPLE_DEFAULT(FilterIndex)
     std::vector<SliceAndKey> output{};
-    if (container.size()> 0) {
+    if (!index_segment_reader.empty()) {
         if(query) {
-            auto row_bitset = (*query)(container);
+            auto row_bitset = (*query)(index_segment_reader);
             ARCTICDB_DEBUG(log::version(), "Row bitset has {} bits set of {}", row_bitset->count(), row_bitset->size());
             output.reserve(row_bitset->count());
             foreach_active_bit(*row_bitset, [&](auto r) {
-                output.emplace_back(get_row(container, r));
+                output.emplace_back(get_row(index_segment_reader, r));
             });
         } else {
-            output.reserve(container.size());
-            for(auto i = 0u; i < container.size(); ++i) {
-                output.emplace_back(get_row(container, i));
+            output.reserve(index_segment_reader.size());
+            for(auto i = 0u; i < index_segment_reader.size(); ++i) {
+                output.emplace_back(get_row(index_segment_reader, i));
             }
         }
     }
cpp/arcticdb/pipeline/write_frame.cpp

+38-32
@@ -16,20 +16,21 @@
 #include <arcticdb/stream/aggregator.hpp>
 #include <arcticdb/entity/protobufs.hpp>
 #include <arcticdb/util/variant.hpp>
-#include <arcticdb/python/python_types.hpp>
 #include <arcticdb/pipeline/frame_utils.hpp>
 #include <arcticdb/pipeline/write_frame.hpp>
 #include <arcticdb/stream/append_map.hpp>
 #include <arcticdb/async/task_scheduler.hpp>
 #include <arcticdb/util/format_date.hpp>
 #include <vector>
 #include <array>
+#include <ranges>
 
 
 namespace arcticdb::pipelines {
 
 using namespace arcticdb::entity;
 using namespace arcticdb::stream;
+namespace ranges = std::ranges;
 
 WriteToSegmentTask::WriteToSegmentTask(
     std::shared_ptr<InputTensorFrame> frame,
@@ -252,40 +253,46 @@ static RowRange partial_rewrite_row_range(
     }
 }
 
-std::optional<SliceAndKey> rewrite_partial_segment(
+folly::Future<std::optional<SliceAndKey>> async_rewrite_partial_segment(
     const SliceAndKey& existing,
     const IndexRange& index_range,
     VersionId version_id,
     AffectedSegmentPart affected_part,
     const std::shared_ptr<Store>& store) {
-    const auto& key = existing.key();
-    auto kv = store->read(key).get();
-    const SegmentInMemory& segment = kv.second;
-    const RowRange affected_row_range = partial_rewrite_row_range(segment, index_range, affected_part);
-    const int64_t num_rows = affected_row_range.end() - affected_row_range.start();
-    if (num_rows <= 0) {
-        return std::nullopt;
-    }
-    SegmentInMemory output = segment.truncate(affected_row_range.start(), affected_row_range.end(), true);
-    const IndexValue start_ts = TimeseriesIndex::start_value_for_segment(output);
-    // +1 as in the key we store one nanosecond greater than the last index value in the segment
-    const IndexValue end_ts = std::get<NumericIndex>(TimeseriesIndex::end_value_for_segment(output)) + 1;
-    FrameSlice new_slice{
-        std::make_shared<StreamDescriptor>(output.descriptor()),
-        existing.slice_.col_range,
-        RowRange{0, num_rows},
-        existing.slice_.hash_bucket(),
-        existing.slice_.num_buckets()};
-
-    auto fut_key = store->write(
-        key.type(),
+    return store->read(existing.key()).thenValueInline([
+        existing,
+        index_range,
         version_id,
-        key.id(),
-        start_ts,
-        end_ts,
-        std::move(output)
-    );
-    return SliceAndKey{std::move(new_slice), std::get<AtomKey>(std::move(fut_key).get())};
+        affected_part,
+        store](std::pair<VariantKey, SegmentInMemory>&& key_segment) -> folly::Future<std::optional<SliceAndKey>> {
+        const auto& key = existing.key();
+        const SegmentInMemory& segment = key_segment.second;
+        const RowRange affected_row_range = partial_rewrite_row_range(segment, index_range, affected_part);
+        const int64_t num_rows = affected_row_range.end() - affected_row_range.start();
+        if (num_rows <= 0) {
+            return std::nullopt;
+        }
+        SegmentInMemory output = segment.truncate(affected_row_range.start(), affected_row_range.end(), true);
+        const IndexValue start_ts = TimeseriesIndex::start_value_for_segment(output);
+        // +1 as in the key we store one nanosecond greater than the last index value in the segment
+        const IndexValue end_ts = std::get<NumericIndex>(TimeseriesIndex::end_value_for_segment(output)) + 1;
+        FrameSlice new_slice{
+            std::make_shared<StreamDescriptor>(output.descriptor()),
+            existing.slice_.col_range,
+            RowRange{0, num_rows},
+            existing.slice_.hash_bucket(),
+            existing.slice_.num_buckets()};
+        return store->write(
+            key.type(),
+            version_id,
+            key.id(),
+            start_ts,
+            end_ts,
+            std::move(output)
+        ).thenValueInline([new_slice=std::move(new_slice)](VariantKey&& k) {
+            return std::make_optional<SliceAndKey>(std::move(new_slice), std::get<AtomKey>(std::move(k)));
+        });
+    });
 }
 
 std::vector<SliceAndKey> flatten_and_fix_rows(const std::array<std::vector<SliceAndKey>, 5>& groups, size_t& global_count) {
@@ -301,10 +308,9 @@ std::vector<SliceAndKey> flatten_and_fix_rows(const std::array<std::vector<SliceAndKey>, 5>& groups, size_t& global_count) {
             return std::max(a, sk.slice_.row_range.second);
         });
 
-        std::transform(std::begin(group), std::end(group), std::back_inserter(output), [&](SliceAndKey sk) {
+        ranges::transform(group, std::back_inserter(output), [&](SliceAndKey sk) {
            auto range_start = global_count + (sk.slice_.row_range.first - group_start);
-            auto new_range = RowRange{range_start, range_start + (sk.slice_.row_range.diff())};
-            sk.slice_.row_range = new_range;
+            sk.slice_.row_range = RowRange{range_start, range_start + sk.slice_.row_range.diff()};
            return sk;
        });
 
cpp/arcticdb/pipeline/write_frame.hpp

+1-1
@@ -88,7 +88,7 @@ enum class AffectedSegmentPart {
     END
 };
 
-std::optional<SliceAndKey> rewrite_partial_segment(
+folly::Future<std::optional<SliceAndKey>> async_rewrite_partial_segment(
     const SliceAndKey& existing,
     const IndexRange& index_range,
     VersionId version_id,
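Because both the START and END partial rewrites now return futures (see the write_frame.cpp change above), the two segment reads can be issued together and collected, which is where update's read parallelism comes from. A hedged sketch of that call pattern; `first_affected` and `last_affected` are illustrative names assumed to be in scope, not the actual update code:

```cpp
// Illustrative: start both partial rewrites, then wait for both together,
// so the two segment reads overlap instead of running back to back.
auto front = async_rewrite_partial_segment(
    first_affected, index_range, version_id, AffectedSegmentPart::START, store);
auto back = async_rewrite_partial_segment(
    last_affected, index_range, version_id, AffectedSegmentPart::END, store);

auto [front_slice, back_slice] = folly::collect(std::move(front), std::move(back)).get();
if (front_slice) { /* keep the rewritten leading slice */ }
if (back_slice)  { /* keep the rewritten trailing slice */ }
```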

cpp/arcticdb/processing/test/benchmark_clause.cpp

+1-3
@@ -21,9 +21,7 @@ using namespace arcticdb;
 
 SegmentInMemory get_segment_for_merge(const StreamId &id, size_t num_rows, size_t start, size_t step){
     auto segment = SegmentInMemory{
-        get_test_descriptor<stream::TimeseriesIndex>(id, {
-            scalar_field(DataType::UINT8, "column")
-        }),
+        get_test_descriptor<stream::TimeseriesIndex>(id, std::array{scalar_field(DataType::UINT8, "column")}),
         num_rows
     };
     auto& index_col = segment.column(0);

cpp/arcticdb/stream/stream_source.hpp

-1
@@ -8,7 +8,6 @@
 #pragma once
 
 #include <arcticdb/column_store/memory_segment.hpp>
-#include <arcticdb/codec/segment.hpp>
 #include <arcticdb/storage/storage.hpp>
 #include <arcticdb/storage/storage_options.hpp>
 #include <arcticdb/async/batch_read_args.hpp>

0 commit comments
