Skip to content

Commit d261361

Browse files
authored
Implement batch_update (#2100)
#### Reference Issues/PRs <!--Example: Fixes #1234. See also #3456.--> #### What does this implement or fix? * Add a method updating several symbols in parallel using the previously added `update_async` method * Expose the update batch functionality to the python layer. * Add unit tests * Add documentation #### Any other comments? #### Checklist <details> <summary> Checklist for code changes... </summary> - [ ] Have you updated the relevant docstrings, documentation and copyright notice? - [ ] Is this contribution tested against [all ArcticDB's features](../docs/mkdocs/docs/technical/contributing.md)? - [ ] Do all exceptions introduced raise appropriate [error messages](https://docs.arcticdb.io/error_messages/)? - [ ] Are API changes highlighted in the PR description? - [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes? </details> <!-- Thanks for contributing a Pull Request to ArcticDB! Please ensure you have taken a look at: - ArcticDB's Code of Conduct: https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md - ArcticDB's Contribution Licensing: https://github.com/man-group/ArcticDB/blob/master/docs/mkdocs/docs/technical/contributing.md#contribution-licensing -->
1 parent aa258b5 commit d261361

File tree

11 files changed

+620
-237
lines changed

11 files changed

+620
-237
lines changed

cpp/arcticdb/stream/test/stream_test_common.hpp

-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
#include <folly/gen/Base.h>
2323
#include <folly/futures/Future.h>
24-
2524
#include <filesystem>
2625
#include <span>
2726
#include <string>

cpp/arcticdb/version/local_versioned_engine.cpp

+140-124
Large diffs are not rendered by default.

cpp/arcticdb/version/local_versioned_engine.hpp

+7-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
#include <arcticdb/entity/descriptor_item.hpp>
2323
#include <arcticdb/entity/data_error.hpp>
2424

25-
#include <sstream>
2625
namespace arcticdb::version_store {
2726

2827
/**
@@ -277,6 +276,13 @@ class LocalVersionedEngine : public VersionedEngine {
277276
bool upsert,
278277
bool throw_on_error);
279278

279+
std::vector<std::variant<VersionedItem, DataError>> batch_update_internal(
280+
const std::vector<StreamId>& stream_ids,
281+
std::vector<std::shared_ptr<InputTensorFrame>>&& frames,
282+
const std::vector<UpdateQuery>& update_queries,
283+
bool prune_previous_versions,
284+
bool upsert);
285+
280286
std::vector<ReadVersionOutput> batch_read_keys(const std::vector<AtomKey> &keys);
281287

282288
std::vector<std::variant<ReadVersionOutput, DataError>> batch_read_internal(

cpp/arcticdb/version/python_bindings.cpp

+5-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
#include <arcticdb/python/adapt_read_dataframe.hpp>
2323
#include <arcticdb/version/schema_checks.hpp>
2424
#include <arcticdb/util/pybind_mutex.hpp>
25-
#include <boost/date_time/posix_time/posix_time.hpp>
2625

2726

2827
namespace arcticdb::version_store {
@@ -300,7 +299,8 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
300299
.def_property_readonly("error_code", &DataError::error_code)
301300
.def_property_readonly("error_category", &DataError::error_category)
302301
.def_property_readonly("exception_string", &DataError::exception_string)
303-
.def("__str__", &DataError::to_string);
302+
.def("__str__", &DataError::to_string)
303+
.def("__repr__", &DataError::to_string);
304304

305305
// TODO: add repr.
306306
py::class_<VersionedItem>(version, "VersionedItem")
@@ -784,6 +784,9 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
784784
.def("batch_append",
785785
&PythonVersionStore::batch_append,
786786
py::call_guard<SingleThreadMutexHolder>(), "Batch append to a list of symbols")
787+
.def("batch_update",
788+
&PythonVersionStore::batch_update,
789+
py::call_guard<SingleThreadMutexHolder>(), "Batch update a list of symbols")
787790
.def("batch_restore_version",
788791
[&](PythonVersionStore& v, const std::vector<StreamId>& ids, const std::vector<VersionQuery>& version_queries){
789792
auto results = v.batch_restore_version(ids, version_queries);

cpp/arcticdb/version/version_core.cpp

+32-25
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ std::vector<SliceAndKey> filter_existing_slices(std::vector<std::optional<SliceA
190190
using IntersectingSegments = std::tuple<std::vector<SliceAndKey>, std::vector<SliceAndKey>>;
191191

192192
[[nodiscard]] folly::Future<IntersectingSegments> async_intersecting_segments(
193-
const std::vector<SliceAndKey>& affected_keys,
193+
std::shared_ptr<std::vector<SliceAndKey>> affected_keys,
194194
const IndexRange& front_range,
195195
const IndexRange& back_range,
196196
VersionId version_id,
@@ -205,7 +205,7 @@ using IntersectingSegments = std::tuple<std::vector<SliceAndKey>, std::vector<Sl
205205
std::vector<folly::Future<std::optional<SliceAndKey>>> maybe_intersect_before_fut;
206206
std::vector<folly::Future<std::optional<SliceAndKey>>> maybe_intersect_after_fut;
207207

208-
for (const auto& affected_slice_and_key : affected_keys) {
208+
for (const auto& affected_slice_and_key : *affected_keys) {
209209
const auto& affected_range = affected_slice_and_key.key().index_range();
210210
if (intersects(affected_range, front_range) && !overlaps(affected_range, front_range) &&
211211
is_before(affected_range, front_range)) {
@@ -267,7 +267,7 @@ VersionedItem delete_range_impl(
267267
std::end(affected_keys),
268268
std::back_inserter(unaffected_keys));
269269

270-
auto [intersect_before, intersect_after] = async_intersecting_segments(affected_keys, index_range, index_range, update_info.next_version_id_, store).get();
270+
auto [intersect_before, intersect_after] = async_intersecting_segments(std::make_shared<std::vector<SliceAndKey>>(affected_keys), index_range, index_range, update_info.next_version_id_, store).get();
271271

272272
auto orig_filter_range = std::holds_alternative<std::monostate>(query.row_filter) ? get_query_index_range(index, index_range) : query.row_filter;
273273

@@ -314,7 +314,12 @@ struct UpdateRanges {
314314
IndexRange original_index_range;
315315
};
316316

317-
static UpdateRanges compute_update_ranges(const FilterRange& row_filter, const InputTensorFrame& update_frame, std::span<SliceAndKey> update_slice_and_keys) {
317+
318+
static UpdateRanges compute_update_ranges(
319+
const FilterRange& row_filter,
320+
const InputTensorFrame& update_frame,
321+
std::span<SliceAndKey> update_slice_and_keys
322+
) {
318323
return util::variant_match(row_filter,
319324
[&](std::monostate) -> UpdateRanges {
320325
util::check(std::holds_alternative<TimeseriesIndex>(update_frame.index), "Update with row count index is not permitted");
@@ -356,16 +361,18 @@ static void check_can_update(
356361
}
357362

358363
static std::shared_ptr<std::vector<SliceAndKey>> get_keys_affected_by_update(
359-
const index::IndexSegmentReader& index_segment_reader,
360-
const InputTensorFrame& frame,
361-
const UpdateQuery& query,
362-
bool dynamic_schema) {
364+
const index::IndexSegmentReader& index_segment_reader,
365+
const InputTensorFrame& frame,
366+
const UpdateQuery& query,
367+
bool dynamic_schema
368+
) {
363369
std::vector<FilterQuery<index::IndexSegmentReader>> queries = build_update_query_filters<index::IndexSegmentReader>(
364-
query.row_filter,
365-
frame.index,
366-
frame.index_range,
367-
dynamic_schema,
368-
index_segment_reader.bucketize_dynamic());
370+
query.row_filter,
371+
frame.index,
372+
frame.index_range,
373+
dynamic_schema,
374+
index_segment_reader.bucketize_dynamic()
375+
);
369376
return std::make_shared<std::vector<SliceAndKey>>(filter_index(index_segment_reader, combine_filter_functions(queries)));
370377
}
371378

@@ -383,11 +390,12 @@ static std::vector<SliceAndKey> get_keys_not_affected_by_update(
383390
}
384391

385392
static std::pair<std::vector<SliceAndKey>, size_t> get_slice_and_keys_for_update(
386-
const UpdateRanges& update_ranges,
387-
std::span<const SliceAndKey> unaffected_keys,
388-
std::span<const SliceAndKey> affected_keys,
389-
IntersectingSegments&& segments_intersecting_with_update_range,
390-
std::vector<SliceAndKey>&& new_slice_and_keys) {
393+
const UpdateRanges& update_ranges,
394+
std::span<const SliceAndKey> unaffected_keys,
395+
std::span<const SliceAndKey> affected_keys,
396+
const IntersectingSegments& segments_intersecting_with_update_range,
397+
std::vector<SliceAndKey>&& new_slice_and_keys
398+
) {
391399
const size_t new_keys_size = new_slice_and_keys.size();
392400
size_t row_count = 0;
393401
const std::array<std::vector<SliceAndKey>, 5> groups{
@@ -409,15 +417,15 @@ folly::Future<AtomKey> async_update_impl(
409417
const UpdateInfo& update_info,
410418
const UpdateQuery& query,
411419
const std::shared_ptr<InputTensorFrame>& frame,
412-
const WriteOptions& options,
420+
WriteOptions&& options,
413421
bool dynamic_schema,
414422
bool empty_types) {
415423
return index::async_get_index_reader(*(update_info.previous_index_key_), store).thenValue([
416424
store,
417425
update_info,
418426
query,
419427
frame,
420-
options=options,
428+
options=std::move(options),
421429
dynamic_schema,
422430
empty_types
423431
](index::IndexSegmentReader&& index_segment_reader) {
@@ -441,16 +449,15 @@ folly::Future<AtomKey> async_update_impl(
441449
"The sum of affected keys and unaffected keys must be equal to the total number of keys {} + {} != {}",
442450
affected_keys->size(), unaffected_keys.size(), index_segment_reader.size());
443451
const UpdateRanges update_ranges = compute_update_ranges(query.row_filter, *frame, new_slice_and_keys);
444-
445452
return async_intersecting_segments(
446-
*affected_keys,
453+
affected_keys,
447454
update_ranges.front,
448455
update_ranges.back,
449456
update_info.next_version_id_,
450457
store).thenValue([new_slice_and_keys=std::move(new_slice_and_keys),
451458
update_ranges=update_ranges,
452459
unaffected_keys=std::move(unaffected_keys),
453-
affected_keys=affected_keys,
460+
affected_keys=std::move(affected_keys),
454461
index_segment_reader=std::move(index_segment_reader),
455462
frame,
456463
dynamic_schema,
@@ -465,7 +472,7 @@ folly::Future<AtomKey> async_update_impl(
465472
auto tsd = index::get_merged_tsd(row_count, dynamic_schema, index_segment_reader.tsd(), frame);
466473
return index::write_index(
467474
index_type_from_descriptor(tsd.as_stream_descriptor()),
468-
tsd,
475+
std::move(tsd),
469476
std::move(flattened_slice_and_keys),
470477
IndexPartialKey{frame->desc.id(), update_info.next_version_id_},
471478
store
@@ -483,7 +490,7 @@ VersionedItem update_impl(
483490
WriteOptions&& options,
484491
bool dynamic_schema,
485492
bool empty_types) {
486-
auto version_key = async_update_impl(store, update_info, query, frame, options, dynamic_schema, empty_types).get();
493+
auto version_key = async_update_impl(store, update_info, query, frame, std::move(options), dynamic_schema, empty_types).get();
487494
auto versioned_item = VersionedItem(to_atom(std::move(version_key)));
488495
ARCTICDB_DEBUG(log::version(), "updated stream_id: {} , version_id: {}", frame->desc.id(), update_info.next_version_id_);
489496
return versioned_item;

cpp/arcticdb/version/version_core.hpp

+9-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
#include <arcticdb/pipeline/write_options.hpp>
1414
#include <arcticdb/async/task_scheduler.hpp>
1515
#include <arcticdb/stream/incompletes.hpp>
16-
#include <arcticdb/pipeline/read_pipeline.hpp>
1716
#include <arcticdb/pipeline/pipeline_context.hpp>
1817
#include <arcticdb/pipeline/read_options.hpp>
1918
#include <arcticdb/entity/atom_key.hpp>
@@ -98,6 +97,15 @@ VersionedItem update_impl(
9897
bool dynamic_schema,
9998
bool empty_types);
10099

100+
folly::Future<AtomKey> async_update_impl(
101+
const std::shared_ptr<Store>& store,
102+
const UpdateInfo& update_info,
103+
const UpdateQuery& query,
104+
const std::shared_ptr<InputTensorFrame>& frame,
105+
WriteOptions&& options,
106+
bool dynamic_schema,
107+
bool empty_types);
108+
101109
VersionedItem delete_range_impl(
102110
const std::shared_ptr<Store>& store,
103111
const StreamId& stream_id,

cpp/arcticdb/version/version_store_api.cpp

+13
Original file line numberDiff line numberDiff line change
@@ -787,6 +787,19 @@ std::vector<std::variant<ReadResult, DataError>> PythonVersionStore::batch_read(
787787
return res;
788788
}
789789

790+
std::vector<std::variant<VersionedItem, DataError>> PythonVersionStore::batch_update(
791+
const std::vector<StreamId>& stream_ids,
792+
const std::vector<py::tuple>& items,
793+
const std::vector<py::object>& norms,
794+
const std::vector<py::object>& user_metas,
795+
const std::vector<UpdateQuery>& update_qeries,
796+
bool prune_previous_versions,
797+
bool upsert
798+
) {
799+
auto frames = create_input_tensor_frames(stream_ids, items, norms, user_metas, cfg().write_options().empty_types());
800+
return batch_update_internal(stream_ids, std::move(frames), update_qeries, prune_previous_versions, upsert);
801+
}
802+
790803
void PythonVersionStore::delete_snapshot(const SnapshotId& snap_name) {
791804
ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: delete_snapshot");
792805
auto opt_snapshot = get_snapshot(store(), snap_name);

cpp/arcticdb/version/version_store_api.hpp

+9-10
Original file line numberDiff line numberDiff line change
@@ -10,31 +10,21 @@
1010
#include <arcticdb/entity/data_error.hpp>
1111
#include <arcticdb/entity/types.hpp>
1212
#include <arcticdb/stream/index.hpp>
13-
#include <arcticdb/util/timeouts.hpp>
14-
#include <arcticdb/util/variant.hpp>
1513
#include <pybind11/pybind11.h>
1614
#include <arcticdb/python/python_utils.hpp>
1715
#include <arcticdb/async/async_store.hpp>
1816
#include <arcticdb/version/version_map.hpp>
1917
#include <arcticdb/version/snapshot.hpp>
2018
#include <arcticdb/version/symbol_list.hpp>
21-
#include <arcticdb/entity/protobufs.hpp>
2219
#include <arcticdb/pipeline/column_stats.hpp>
23-
#include <arcticdb/pipeline/write_options.hpp>
2420
#include <arcticdb/entity/versioned_item.hpp>
2521
#include <arcticdb/pipeline/query.hpp>
26-
#include <arcticdb/pipeline/slicing.hpp>
2722
#include <arcticdb/pipeline/read_pipeline.hpp>
28-
#include <arcticdb/pipeline/query.hpp>
2923
#include <arcticdb/pipeline/read_options.hpp>
3024
#include <arcticdb/stream/incompletes.hpp>
3125
#include <arcticdb/version/version_core.hpp>
3226
#include <arcticdb/version/local_versioned_engine.hpp>
3327
#include <arcticdb/entity/read_result.hpp>
34-
#include <arcticdb/version/version_log.hpp>
35-
36-
#include <type_traits>
37-
#include <iostream>
3828

3929
namespace arcticdb::version_store {
4030

@@ -299,6 +289,15 @@ class PythonVersionStore : public LocalVersionedEngine {
299289
const ReadOptions& read_options,
300290
std::any& handler_data);
301291

292+
std::vector<std::variant<VersionedItem, DataError>> batch_update(
293+
const std::vector<StreamId>& stream_ids,
294+
const std::vector<py::tuple>& items,
295+
const std::vector<py::object>& norms,
296+
const std::vector<py::object>& user_metas,
297+
const std::vector<UpdateQuery>& update_qeries,
298+
bool prune_previous_versions,
299+
bool upsert);
300+
302301
std::vector<std::variant<std::pair<VersionedItem, py::object>, DataError>> batch_read_metadata(
303302
const std::vector<StreamId>& stream_ids,
304303
const std::vector<VersionQuery>& version_queries,

0 commit comments

Comments
 (0)