Commit 7f6f44c

Return correct timestamp in the VersionedItem returned by write when recursive normalizers are used (#2209)
#### Reference Issues/PRs

The issue was reported by users using recursive normalizers. The write method

```python
v2lib._nvs.write(..., recursive_normalizers=True)
```

returns a VersionedItem whose timestamp is 0.

#### What does this implement or fix?

This PR is split into two commits.

1. The first commit moves all recursive normalizers unit tests from `python/tests/integration/arcticdb/version_store/test_basic_version_store.py` to `python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py`.
2. The second commit implements the actual fix:
   * Python code change: set the timestamp from the VersionedItem returned by the C++ layer. The C++ layer uses the MULTI_KEY to create a VersionedItem.
   * This also changes the read side when recursive normalizers are used. Prior to this change, a read with recursive normalizers used one of the TABLE_INDEX keys to create the versioned item; there was no particular reasoning behind which TABLE_INDEX key was selected, and since the index keys are created at different times the result did not reflect the write timestamp well. Now the MULTI_KEY is used to create the VersionedItem, so read and write return the same VersionedItem (except that write does not return data) and share a single deterministic timestamp for all symbols.
   * Add minor improvements such as reserving space for vectors with known sizes.
   * Remove `test_simple_recursive_normalizer` as it is repetitive.

#### Any other comments?

An interesting remark related to creating versioned items: when doing a regular read or write we create a versioned item based on the INDEX_DATA key and not on the VERSION key. I find this a bit misleading, mainly due to the name `VersionedItem`.

#### Checklist

<details>
  <summary>Checklist for code changes...</summary>

- [ ] Have you updated the relevant docstrings, documentation and copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error messages](https://docs.arcticdb.io/error_messages/)?
- [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes?

</details>
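To make the reported behaviour concrete, below is a minimal sketch of the failure mode and the post-fix invariant. The LMDB URI, library, symbol, and nested payload are illustrative assumptions; only the `recursive_normalizers=True` call is from the report.

```python
import numpy as np
from arcticdb import Arctic

# Assumed local setup; any storage backend would do.
ac = Arctic("lmdb://tmp/recursive_ts_example")
lib = ac.get_library("lib", create_if_missing=True)

# A nested structure that requires recursive normalizers to write.
nested = {"a": np.arange(5), "b": {"c": np.arange(3)}}

write_vit = lib._nvs.write("sym", nested, recursive_normalizers=True)
read_vit = lib._nvs.read("sym")

# Before this fix: write_vit.timestamp was 0.
# After it: both VersionedItems are built from the same MULTI_KEY, so they
# share one deterministic, non-zero timestamp.
assert write_vit.timestamp != 0
assert write_vit.timestamp == read_vit.timestamp
```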
1 parent 2b7d252 commit 7f6f44c

File tree

10 files changed: +310 -230 lines changed

cpp/arcticdb/storage/s3/s3_client_wrapper.cpp

+1 -2

```diff
@@ -136,8 +136,7 @@ folly::Future<S3Result<std::monostate>> S3ClientTestWrapper::delete_object(
     return actual_client_->delete_object(s3_object_name, bucket_name);
 }
 
-// Using a fixed page size since it's only being used for simple tests.
-// If we ever need to configure it we should move it to the s3 proto config instead.
+
 S3Result<ListObjectsOutput> S3ClientTestWrapper::list_objects(
     const std::string& name_prefix,
     const std::string& bucket_name,
```

cpp/arcticdb/version/version_core.cpp

+9 -5

```diff
@@ -500,21 +500,25 @@ VersionedItem update_impl(
 folly::Future<ReadVersionOutput> read_multi_key(
         const std::shared_ptr<Store>& store,
         const SegmentInMemory& index_key_seg,
-        std::any& handler_data) {
+        std::any& handler_data,
+        AtomKey&& key
+) {
     std::vector<AtomKey> keys;
+    keys.reserve(index_key_seg.row_count());
     for (size_t idx = 0; idx < index_key_seg.row_count(); idx++) {
-        keys.emplace_back(stream::read_key_row(index_key_seg, static_cast<ssize_t>(idx)));
+        keys.emplace_back(read_key_row(index_key_seg, static_cast<ssize_t>(idx)));
     }
 
     AtomKey dup{keys[0]};
     VersionedItem versioned_item{std::move(dup)};
     TimeseriesDescriptor multi_key_desc{index_key_seg.index_descriptor()};
 
     return read_frame_for_version(store, versioned_item, std::make_shared<ReadQuery>(), ReadOptions{}, handler_data)
-    .thenValue([multi_key_desc=std::move(multi_key_desc), keys=std::move(keys)](ReadVersionOutput&& read_version_output) mutable {
+    .thenValue([multi_key_desc=std::move(multi_key_desc), keys=std::move(keys), key=std::move(key)](ReadVersionOutput&& read_version_output) mutable {
         multi_key_desc.mutable_proto().mutable_normalization()->CopyFrom(read_version_output.frame_and_descriptor_.desc_.proto().normalization());
         read_version_output.frame_and_descriptor_.desc_ = std::move(multi_key_desc);
         read_version_output.frame_and_descriptor_.keys_ = std::move(keys);
+        read_version_output.versioned_item_ = VersionedItem(std::move(key));
         return std::move(read_version_output);
     });
 }
@@ -2046,7 +2050,7 @@ folly::Future<ReadVersionOutput> read_frame_for_version(
 
     if(pipeline_context->multi_key_) {
         check_multi_key_is_not_index_only(*pipeline_context, *read_query);
-        return read_multi_key(store, *pipeline_context->multi_key_, handler_data);
+        return read_multi_key(store, *pipeline_context->multi_key_, handler_data, std::move(res_versioned_item.key_));
     }
 
     if(opt_false(read_options.incompletes())) {
@@ -2067,7 +2071,7 @@
     ARCTICDB_DEBUG(log::version(), "Fetching data to frame");
 
     DecodePathData shared_data;
-    return version_store::do_direct_read_or_process(store, read_query, read_options, pipeline_context, shared_data, handler_data)
+    return do_direct_read_or_process(store, read_query, read_options, pipeline_context, shared_data, handler_data)
     .thenValue([res_versioned_item, pipeline_context, read_options, &handler_data, read_query, shared_data](auto&& frame) mutable {
         ARCTICDB_DEBUG(log::version(), "Reduce and fix columns");
         return reduce_and_fix_columns(pipeline_context, frame, read_options, handler_data)
```
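For intuition, the effect of threading the MULTI_KEY's AtomKey into `read_multi_key` can be modelled in a few lines. This is a conceptual sketch, not ArcticDB's actual types: the `Key` and `Item` classes and the timestamp values are invented for illustration.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Key:                # stand-in for an AtomKey
    key_type: str
    creation_ts: int      # set when the key is written to storage

@dataclass(frozen=True)
class Item:               # stand-in for a VersionedItem
    timestamp: int

# A recursive write produces one TABLE_INDEX key per sub-object, written at
# slightly different times, plus a single parent MULTI_KEY.
index_keys = [Key("TABLE_INDEX", 1001), Key("TABLE_INDEX", 1004)]
multi_key = Key("MULTI_KEY", 1007)

# Old read path: an arbitrary TABLE_INDEX key, so the timestamp depended on
# which one happened to be picked (here 1001, but it could have been 1004).
old_item = Item(timestamp=index_keys[0].creation_ts)

# New path: read and write both build the item from the MULTI_KEY.
new_item = Item(timestamp=multi_key.creation_ts)
assert new_item.timestamp == 1007  # one deterministic value for all symbols
```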

cpp/arcticdb/version/version_store_api.cpp

+6 -1

```diff
@@ -571,11 +571,16 @@ VersionedItem PythonVersionStore::write_versioned_composite_data(
     ARCTICDB_DEBUG(log::version(), "write_versioned_composite_data for stream_id: {} , version_id = {}", stream_id, version_id);
     // TODO: Assuming each sub key is always going to have the same version attached to it.
     std::vector<VersionId> version_ids;
+    version_ids.reserve(sub_keys.size());
+
     std::vector<py::object> user_metas;
+    user_metas.reserve(sub_keys.size());
+
+    std::vector<std::shared_ptr<DeDupMap>> de_dup_maps;
+    de_dup_maps.reserve(sub_keys.size());
 
     auto write_options = get_write_options();
     auto de_dup_map = get_de_dup_map(stream_id, maybe_prev, write_options);
-    std::vector<std::shared_ptr<DeDupMap>> de_dup_maps;
     for (auto i = 0u; i < sub_keys.size(); ++i) {
         version_ids.emplace_back(version_id);
         user_metas.emplace_back(py::none());
```

cpp/arcticdb/version/version_utils.cpp

+41

```diff
@@ -20,6 +20,47 @@ using namespace arcticdb::entity;
 using namespace arcticdb::stream;
 
 
+VariantKey write_multi_index_entry(
+        std::shared_ptr<StreamSink> store,
+        std::vector<AtomKey> &keys,
+        const StreamId &stream_id,
+        const py::object &metastruct,
+        const py::object &user_meta,
+        VersionId version_id
+) {
+    ARCTICDB_SAMPLE(WriteJournalEntry, 0)
+    ARCTICDB_DEBUG(log::version(), "Version map writing multi key");
+    folly::Future<VariantKey> multi_key_fut = folly::Future<VariantKey>::makeEmpty();
+
+    IndexAggregator<RowCountIndex> multi_index_agg(stream_id, [&multi_key_fut, &store, version_id, stream_id](auto &&segment) {
+        multi_key_fut = store->write(KeyType::MULTI_KEY,
+                                     version_id, // version_id
+                                     stream_id,
+                                     NumericIndex{0}, // start_index
+                                     NumericIndex{0}, // end_index
+                                     std::forward<decltype(segment)>(segment)).wait();
+    });
+
+    for (auto &key : keys) {
+        multi_index_agg.add_key(to_atom(key));
+    }
+    TimeseriesDescriptor timeseries_descriptor;
+
+    if (!metastruct.is_none()) {
+        arcticdb::proto::descriptors::UserDefinedMetadata multi_key_proto;
+        python_util::pb_from_python(metastruct, multi_key_proto);
+        timeseries_descriptor.set_multi_key_metadata(std::move(multi_key_proto));
+    }
+    if (!user_meta.is_none()) {
+        arcticdb::proto::descriptors::UserDefinedMetadata user_meta_proto;
+        python_util::pb_from_python(user_meta, user_meta_proto);
+        timeseries_descriptor.set_user_metadata(std::move(user_meta_proto));
+    }
+    multi_index_agg.set_timeseries_descriptor(timeseries_descriptor);
+    multi_index_agg.commit();
+    return multi_key_fut.wait().value();
+}
+
 std::unordered_map<StreamId, size_t> get_num_version_entries(const std::shared_ptr<Store>& store, size_t batch_size) {
     std::unordered_map<StreamId, size_t> output;
     size_t max_blocks = ConfigsMap::instance()->get_int("VersionMap.MaxVersionBlocks", 5);
```
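In outline, the function moved above aggregates all child keys into one row-count-indexed segment and journals it as a single MULTI_KEY, whose creation timestamp is what the VersionedItem now reports. A rough Python analogue of that control flow follows; every name in it is illustrative, not ArcticDB API.

```python
def write_multi_index_entry_sketch(store, keys, stream_id, metastruct, user_meta, version_id):
    """Rough analogue of write_multi_index_entry; `store` is a hypothetical
    object with a write() method, not ArcticDB's StreamSink."""
    rows = list(keys)                 # add_key(): one segment row per child key
    metadata = {}
    if metastruct is not None:        # normalization structure, needed to denormalize on read
        metadata["multi_key_meta"] = metastruct
    if user_meta is not None:         # user metadata travels on the same entry
        metadata["user_meta"] = user_meta
    # commit(): a single storage write under KeyType::MULTI_KEY with start/end
    # index 0; the key it returns carries the creation timestamp.
    return store.write("MULTI_KEY", version_id, stream_id, rows, metadata)
```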

cpp/arcticdb/version/version_utils.hpp

+2 -34

```diff
@@ -22,46 +22,14 @@
 
 namespace arcticdb {
 
-inline entity::VariantKey write_multi_index_entry(
+VariantKey write_multi_index_entry(
     std::shared_ptr<StreamSink> store,
     std::vector<AtomKey> &keys,
     const StreamId &stream_id,
     const py::object &metastruct,
     const py::object &user_meta,
     VersionId version_id
-) {
-    ARCTICDB_SAMPLE(WriteJournalEntry, 0)
-    ARCTICDB_DEBUG(log::version(), "Version map writing multi key");
-    folly::Future<VariantKey> multi_key_fut = folly::Future<VariantKey>::makeEmpty();
-
-    IndexAggregator<RowCountIndex> multi_index_agg(stream_id, [&multi_key_fut, &store, version_id, stream_id](auto &&segment) {
-        multi_key_fut = store->write(KeyType::MULTI_KEY,
-                                     version_id, // version_id
-                                     stream_id,
-                                     NumericIndex{0}, // start_index
-                                     NumericIndex{0}, // end_index
-                                     std::forward<decltype(segment)>(segment)).wait();
-    });
-
-    for (auto &key : keys) {
-        multi_index_agg.add_key(to_atom(key));
-    }
-    TimeseriesDescriptor timeseries_descriptor;
-
-    if (!metastruct.is_none()) {
-        arcticdb::proto::descriptors::UserDefinedMetadata multi_key_proto;
-        python_util::pb_from_python(metastruct, multi_key_proto);
-        timeseries_descriptor.set_multi_key_metadata(std::move(multi_key_proto));
-    }
-    if (!user_meta.is_none()) {
-        arcticdb::proto::descriptors::UserDefinedMetadata user_meta_proto;
-        python_util::pb_from_python(user_meta, user_meta_proto);
-        timeseries_descriptor.set_user_metadata(std::move(user_meta_proto));
-    }
-    multi_index_agg.set_timeseries_descriptor(timeseries_descriptor);
-    multi_index_agg.commit();
-    return multi_key_fut.wait().value();
-}
+);
 
 inline std::optional<AtomKey> read_segment_with_keys(
     const SegmentInMemory &seg,
```

python/arcticdb/util/test.py

+16

```diff
@@ -901,3 +901,19 @@ def generic_resample_test(
         assert_dfs_approximate(expected, received)
     else:
         assert_frame_equal(expected, received, check_dtype=False)
+
+def equals(x, y):
+    if isinstance(x, tuple) or isinstance(x, list):
+        assert len(x) == len(y)
+        for vx, vy in zip(x, y):
+            equals(vx, vy)
+    elif isinstance(x, dict):
+        assert isinstance(y, dict)
+        assert set(x.keys()) == set(y.keys())
+        for k in x.keys():
+            equals(x[k], y[k])
+    elif isinstance(x, np.ndarray):
+        assert isinstance(y, np.ndarray)
+        assert np.allclose(x, y)
+    else:
+        assert x == y
```
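The `equals` helper added above recursively compares nested containers that may hold numpy arrays, which a plain `==` assertion cannot handle directly. A small usage sketch with made-up values:

```python
import numpy as np
from arcticdb.util.test import equals

left = {"a": np.array([1.0, 2.0]), "b": [np.arange(3), {"c": 7}]}
right = {"a": np.array([1.0, 2.0]), "b": [np.arange(3), {"c": 7}]}

# Recurses through dicts/lists/tuples, uses np.allclose for arrays and
# plain == for scalars; raises AssertionError on any mismatch.
equals(left, right)
```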

python/arcticdb/version_store/_store.py

+1 -1

```diff
@@ -433,6 +433,7 @@ def _try_flatten_and_write_composite_object(
             metadata=metadata,
             data=None,
             host=self.env,
+            timestamp=vit_composite.timestamp
         )
 
     def _write_options(self):
@@ -2153,7 +2154,6 @@ def _adapt_read_res(self, read_result: ReadResult) -> VersionedItem:
         data = self._normalizer.denormalize(frame_data, read_result.norm)
         if read_result.norm.HasField("custom"):
             data = self._custom_normalizer.denormalize(data, read_result.norm.custom)
-
         return VersionedItem(
             symbol=read_result.version.symbol,
             library=self._library.library_path,
```

python/tests/compat/arcticdb/test_compatibility.py

+1 -1

```diff
@@ -217,4 +217,4 @@ def test_compat_read_incomplete(old_venv_and_arctic_uri, lib_name):
     assert_frame_equal(read_df, df[["float_col", "str_col"]])
 
     read_df = curr.lib._nvs.read(sym, date_range=(pd.Timestamp(2024, 1, 5), pd.Timestamp(2024, 1, 9)), incomplete=True, columns=["float_col", "str_col"]).data
-    assert_frame_equal(read_df, df[["float_col", "str_col"]].iloc[4:9])
\ No newline at end of file
+    assert_frame_equal(read_df, df[["float_col", "str_col"]].iloc[4:9])
```
