Skip to content

Commit 50e766e

Browse files
authored
8066268338 Key segment pair refactor - do not move the segment out of the KeySegmentPair as it is a shared resource (#2166)
This allows us to re-enable some tests in `arcticdb-enterprise` https://github.com/man-group/arcticdb-enterprise/pull/new/aseaton/keysegmentpair-refactor-enterprise . The changes are: - Remove the mutable `Segment& segment()` in the `KeySegmentPair` - this was often used to move out of the `Segment` owned by the shared pointer in `KeySegmentPair` - Pass `KeySegmentPair` by lvalue rather than rvalue reference where possible. The `Task` layer still needs rvalue references as that is what Folly expects (rightly, the Folly executors need to have ownership). - Refactor `lookup_match_in_dedup_map` to return a variant - Unrelated tidy up to `DecodeMetadataTask` - `clone()` segments when adding them to the in-memory and mock storages. In particular, this lets `CopyCompressedInterstore` safely copy the same `KeySegmentPair` to several targets, whereas previously the `Segment` could be left in a moved-from state during the copy to each target.
1 parent 054c6f7 commit 50e766e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+207
-220
lines changed

cpp/arcticdb/async/async_store.cpp

+4-6
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,24 @@
77

88
#include <arcticdb/async/async_store.hpp>
99
#include <arcticdb/entity/variant_key.hpp>
10-
#include <arcticdb/codec/segment.hpp>
1110
#include <arcticdb/storage/key_segment_pair.hpp>
1211
#include <arcticdb/version/de_dup_map.hpp>
1312

1413
namespace arcticdb::async {
15-
std::pair<entity::VariantKey, std::optional<Segment>> lookup_match_in_dedup_map(
14+
DeDupLookupResult lookup_match_in_dedup_map(
1615
const std::shared_ptr<DeDupMap> &de_dup_map,
17-
storage::KeySegmentPair&& key_seg) {
16+
storage::KeySegmentPair& key_seg) {
1817
std::optional<AtomKey> de_dup_key;
1918
if (!de_dup_map || !(de_dup_key = de_dup_map->get_key_if_present(key_seg.atom_key()))) {
2019
ARCTICDB_DEBUG(log::version(),
2120
"No existing key with same contents: writing new object {}",
2221
key_seg.atom_key());
23-
return std::make_pair(std::move(key_seg.atom_key()), std::make_optional(std::move(key_seg.segment())));
24-
22+
return key_seg;
2523
} else {
2624
ARCTICDB_DEBUG(log::version(),
2725
"Found existing key with same contents: using existing object {}",
2826
*de_dup_key);
29-
return std::make_pair(*de_dup_key, std::nullopt);
27+
return *de_dup_key;
3028
}
3129
}
3230
}

cpp/arcticdb/async/async_store.hpp

+19-13
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,13 @@ namespace arcticdb::toolbox::apy{
2424

2525
namespace arcticdb::async {
2626

27-
std::pair<VariantKey, std::optional<Segment>> lookup_match_in_dedup_map(
27+
using ExistingObject = entity::VariantKey;
28+
using NewObject = storage::KeySegmentPair;
29+
using DeDupLookupResult = std::variant<ExistingObject, NewObject>;
30+
31+
DeDupLookupResult lookup_match_in_dedup_map(
2832
const std::shared_ptr<DeDupMap> &de_dup_map,
29-
storage::KeySegmentPair&& key_seg);
33+
storage::KeySegmentPair& key_seg);
3034

3135
template <typename Callable>
3236
auto read_and_continue(const VariantKey& key, std::shared_ptr<storage::Library> library, const storage::ReadKeyOpts& opts, Callable&& c) {
@@ -173,7 +177,7 @@ folly::Future<folly::Unit> write_compressed(storage::KeySegmentPair ks) override
173177
}
174178

175179
void write_compressed_sync(storage::KeySegmentPair ks) override {
176-
library_->write(std::move(ks));
180+
library_->write(ks);
177181
}
178182

179183
folly::Future<entity::VariantKey> update(const entity::VariantKey &key,
@@ -375,7 +379,6 @@ std::vector<folly::Future<bool>> batch_key_exists(
375379
folly::Future<SliceAndKey> async_write(
376380
folly::Future<std::tuple<PartialKey, SegmentInMemory, pipelines::FrameSlice>> &&input_fut,
377381
const std::shared_ptr<DeDupMap> &de_dup_map) override {
378-
using KeyOptSegment = std::pair<VariantKey, std::optional<Segment>>;
379382
return std::move(input_fut).thenValue([this] (auto&& input) {
380383
auto [key, seg, slice] = std::forward<decltype(input)>(input);
381384
auto key_seg = EncodeAtomTask{
@@ -386,17 +389,20 @@ folly::Future<SliceAndKey> async_write(
386389
encoding_version_}();
387390
return std::pair<storage::KeySegmentPair, FrameSlice>(std::move(key_seg), std::move(slice));
388391
})
389-
.thenValue([de_dup_map](auto &&ks) -> std::pair<KeyOptSegment, pipelines::FrameSlice> {
390-
auto [key_seg, slice] = std::forward<decltype(ks)>(ks);
391-
return std::make_pair(lookup_match_in_dedup_map(de_dup_map, std::move(key_seg)), std::move(slice));
392+
.thenValue([de_dup_map](auto &&ks) -> std::pair<DeDupLookupResult, pipelines::FrameSlice> {
393+
auto& [key_seg, slice] = ks;
394+
return std::make_pair<>(lookup_match_in_dedup_map(de_dup_map, key_seg), std::move(slice));
392395
})
393396
.via(&async::io_executor())
394-
.thenValue([lib=library_](auto &&item) {
395-
auto [key_opt_segment, slice] = std::forward<decltype(item)>(item);
396-
if (key_opt_segment.second)
397-
lib->write({VariantKey{key_opt_segment.first}, std::move(*key_opt_segment.second)});
398-
399-
return SliceAndKey{slice, to_atom(key_opt_segment.first)};
397+
.thenValue([lib=library_](auto&& item) {
398+
auto& [dedup_lookup, slice] = item;
399+
return util::variant_match(dedup_lookup,
400+
[&](NewObject& obj) {
401+
lib->write(obj);
402+
return SliceAndKey{slice, obj.atom_key()};
403+
}, [&](ExistingObject& obj) {
404+
return SliceAndKey{slice, to_atom(std::move(obj))};
405+
});
400406
});
401407
}
402408

cpp/arcticdb/async/tasks.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ namespace arcticdb::async {
3737

3838
pipelines::SegmentAndSlice DecodeSliceTask::decode_into_slice(storage::KeySegmentPair&& key_segment_pair) {
3939
auto key = key_segment_pair.atom_key();
40-
auto& seg = key_segment_pair.segment();
40+
auto& seg = *key_segment_pair.segment_ptr();
4141
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment of size {} with key {}",
4242
seg.size(),
4343
key);

cpp/arcticdb/async/tasks.hpp

+16-25
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ struct WriteSegmentTask : BaseTask {
183183
VariantKey operator()(storage::KeySegmentPair &&key_seg) const {
184184
ARCTICDB_SAMPLE(WriteSegmentTask, 0)
185185
auto k = key_seg.variant_key();
186-
lib_->write(std::move(key_seg));
186+
lib_->write(key_seg);
187187
return k;
188188
}
189189
};
@@ -200,7 +200,7 @@ struct WriteIfNoneTask : BaseTask {
200200
VariantKey operator()(storage::KeySegmentPair &&key_seg) const {
201201
ARCTICDB_SAMPLE(WriteSegmentTask, 0)
202202
auto k = key_seg.variant_key();
203-
lib_->write_if_none(std::move(key_seg));
203+
lib_->write_if_none(key_seg);
204204
return k;
205205
}
206206
};
@@ -219,7 +219,7 @@ struct UpdateSegmentTask : BaseTask {
219219
VariantKey operator()(storage::KeySegmentPair &&key_seg) const {
220220
ARCTICDB_SAMPLE(UpdateSegmentTask, 0)
221221
auto k = key_seg.variant_key();
222-
lib_->update(std::move(key_seg), opts_);
222+
lib_->update(key_seg, opts_);
223223
return k;
224224
}
225225
};
@@ -305,9 +305,9 @@ struct CopyCompressedTask : BaseTask {
305305
VariantKey copy() {
306306
return std::visit([this](const auto &source_key) {
307307
auto key_seg = lib_->read_sync(source_key);
308-
auto target_key_seg = stream::make_target_key<ClockType>(key_type_, stream_id_, version_id_, source_key, std::move(key_seg.segment()));
308+
auto target_key_seg = stream::make_target_key<ClockType>(key_type_, stream_id_, version_id_, source_key, std::move(*key_seg.segment_ptr()));
309309
auto return_key = target_key_seg.variant_key();
310-
lib_->write(std::move(target_key_seg));
310+
lib_->write(target_key_seg);
311311
return return_key;
312312
}, source_key_);
313313
}
@@ -440,7 +440,7 @@ struct DecodeSegmentTask : BaseTask {
440440
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment with key {}",
441441
variant_key_view(key_seg.variant_key()));
442442

443-
return {key_seg.variant_key(), decode_segment(std::move(key_seg.segment()))};
443+
return {key_seg.variant_key(), decode_segment(*key_seg.segment_ptr())};
444444
}
445445
};
446446

@@ -523,17 +523,10 @@ struct DecodeMetadataTask : BaseTask {
523523

524524
DecodeMetadataTask() = default;
525525

526-
std::pair<std::optional<VariantKey>, std::optional<google::protobuf::Any>> operator()(storage::KeySegmentPair &&ks) const {
526+
std::pair<std::optional<VariantKey>, std::optional<google::protobuf::Any>> operator()(storage::KeySegmentPair &&key_seg) const {
527527
ARCTICDB_SAMPLE(ReadMetadataTask, 0)
528-
auto key_seg = std::move(ks);
529528
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeMetadataTask decoding segment with key {}", variant_key_view(key_seg.variant_key()));
530-
531-
auto meta = decode_metadata_from_segment(key_seg.segment());
532-
std::pair<VariantKey, std::optional<google::protobuf::Any>> output;
533-
output.first = key_seg.variant_key();
534-
output.second = std::move(meta);
535-
536-
return output;
529+
return std::make_pair<>(key_seg.variant_key(), decode_metadata_from_segment(key_seg.segment()));
537530
}
538531
};
539532

@@ -542,16 +535,15 @@ struct DecodeTimeseriesDescriptorTask : BaseTask {
542535

543536
DecodeTimeseriesDescriptorTask() = default;
544537

545-
std::pair<VariantKey, TimeseriesDescriptor> operator()(storage::KeySegmentPair &&ks) const {
538+
std::pair<VariantKey, TimeseriesDescriptor> operator()(storage::KeySegmentPair &&key_seg) const {
546539
ARCTICDB_SAMPLE(DecodeTimeseriesDescriptorTask, 0)
547-
auto key_seg = std::move(ks);
548540
ARCTICDB_DEBUG(log::storage(), "DecodeTimeseriesDescriptorTask decoding segment with key {}", variant_key_view(key_seg.variant_key()));
549541

550-
auto maybe_desc = decode_timeseries_descriptor(key_seg.segment());
542+
auto maybe_desc = decode_timeseries_descriptor(*key_seg.segment_ptr());
551543

552544
util::check(static_cast<bool>(maybe_desc), "Failed to decode timeseries descriptor");
553545
return std::make_pair(
554-
std::move(key_seg.variant_key()),
546+
key_seg.variant_key(),
555547
std::move(*maybe_desc));
556548

557549
}
@@ -562,15 +554,14 @@ struct DecodeMetadataAndDescriptorTask : BaseTask {
562554

563555
DecodeMetadataAndDescriptorTask() = default;
564556

565-
std::tuple<VariantKey, std::optional<google::protobuf::Any>, StreamDescriptor> operator()(storage::KeySegmentPair &&ks) const {
557+
std::tuple<VariantKey, std::optional<google::protobuf::Any>, StreamDescriptor> operator()(storage::KeySegmentPair &&key_seg) const {
566558
ARCTICDB_SAMPLE(ReadMetadataAndDescriptorTask, 0)
567559
ARCTICDB_DEBUG_THROW(5)
568-
auto key_seg = std::move(ks);
569560
ARCTICDB_DEBUG(log::storage(), "DecodeMetadataAndDescriptorTask decoding segment with key {}", variant_key_view(key_seg.variant_key()));
570561

571-
auto [any, descriptor] = decode_metadata_and_descriptor_fields(key_seg.segment());
562+
auto [any, descriptor] = decode_metadata_and_descriptor_fields(*key_seg.segment_ptr());
572563
return std::make_tuple(
573-
std::move(key_seg.variant_key()),
564+
key_seg.variant_key(),
574565
std::move(any),
575566
std::move(descriptor)
576567
);
@@ -603,7 +594,7 @@ struct WriteCompressedTask : BaseTask {
603594
ARCTICDB_MOVE_ONLY_DEFAULT(WriteCompressedTask)
604595

605596
folly::Future<folly::Unit> write() {
606-
lib_->write(std::move(kv_));
597+
lib_->write(kv_);
607598
return folly::makeFuture();
608599
}
609600

@@ -628,7 +619,7 @@ struct WriteCompressedBatchTask : BaseTask {
628619

629620
folly::Future<folly::Unit> write() {
630621
for(auto&& kv : kvs_)
631-
lib_->write(std::move(kv));
622+
lib_->write(kv);
632623

633624
return folly::makeFuture();
634625
}

cpp/arcticdb/codec/codec.cpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -560,8 +560,7 @@ void decode_into_memory_segment(
560560
decode_v1(segment, hdr, res, desc);
561561
}
562562

563-
SegmentInMemory decode_segment(Segment&& s) {
564-
auto segment = std::move(s);
563+
SegmentInMemory decode_segment(Segment& segment) {
565564
auto &hdr = segment.header();
566565
ARCTICDB_TRACE(log::codec(), "Decoding descriptor: {}", segment.descriptor());
567566
auto descriptor = segment.descriptor();

cpp/arcticdb/codec/codec.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ EncodedFieldCollection decode_encoded_fields(
5757
const uint8_t* data,
5858
const uint8_t* begin ARCTICDB_UNUSED);
5959

60-
SegmentInMemory decode_segment(Segment&& segment);
60+
SegmentInMemory decode_segment(Segment& segment);
6161

6262
void decode_into_memory_segment(
6363
const Segment& segment,

cpp/arcticdb/codec/python_bindings.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ Segment encode_segment(SegmentInMemory segment_in_memory, const py::object &opts
103103
}
104104

105105
SegmentInMemory decode_python_segment(Segment& segment) {
106-
return decode_segment(std::move(segment));
106+
return decode_segment(segment);
107107
}
108108

109109
class BufferPairDataSink {

cpp/arcticdb/codec/test/test_codec.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ TYPED_TEST(SegmentStringEncodingTest, EncodeSingleString) {
319319
constexpr EncodingVersion encoding_version = TypeParam::value;
320320
Segment seg = encode_dispatch(s.clone(), opt, encoding_version);
321321

322-
SegmentInMemory res = decode_segment(std::move(seg));
322+
SegmentInMemory res = decode_segment(seg);
323323
ASSERT_EQ(copy.string_at(0, 1), res.string_at(0, 1));
324324
ASSERT_EQ(std::string("happy"), res.string_at(0, 1));
325325
}
@@ -346,7 +346,7 @@ TYPED_TEST(SegmentStringEncodingTest, EncodeStringsBasic) {
346346
constexpr EncodingVersion encoding_version = TypeParam::value;
347347
Segment seg = encode_dispatch(SegmentInMemory{s}, opt, encoding_version);
348348

349-
SegmentInMemory res = decode_segment(std::move(seg));
349+
SegmentInMemory res = decode_segment(seg);
350350
ASSERT_EQ(copy.string_at(0, 1), res.string_at(0, 1));
351351
ASSERT_EQ(std::string("happy"), res.string_at(0, 1));
352352
ASSERT_EQ(copy.string_at(1, 3), res.string_at(1, 3));

cpp/arcticdb/pipeline/read_frame.cpp

+4-6
Original file line numberDiff line numberDiff line change
@@ -488,12 +488,11 @@ void check_data_left_for_subsequent_fields(
488488
void decode_into_frame_static(
489489
SegmentInMemory &frame,
490490
PipelineContextRow &context,
491-
Segment &&s,
491+
const Segment& seg,
492492
const DecodePathData& shared_data,
493493
std::any& handler_data,
494494
const ReadQuery& read_query,
495495
const ReadOptions& read_options) {
496-
auto seg = std::move(s);
497496
ARCTICDB_SAMPLE_DEFAULT(DecodeIntoFrame)
498497
const uint8_t *data = seg.buffer().data();
499498
const uint8_t *begin = data;
@@ -626,14 +625,13 @@ void handle_type_promotion(
626625
void decode_into_frame_dynamic(
627626
SegmentInMemory& frame,
628627
PipelineContextRow& context,
629-
Segment&& s,
628+
const Segment& seg,
630629
const DecodePathData& shared_data,
631630
std::any& handler_data,
632631
const ReadQuery& read_query,
633632
const ReadOptions& read_options
634633
) {
635634
ARCTICDB_SAMPLE_DEFAULT(DecodeIntoFrame)
636-
auto seg = std::move(s);
637635
const uint8_t *data = seg.buffer().data();
638636
const uint8_t *begin = data;
639637
const uint8_t *end = begin + seg.buffer().bytes();
@@ -887,9 +885,9 @@ folly::Future<SegmentInMemory> fetch_data(
887885
[row=row, frame=frame, dynamic_schema=dynamic_schema, shared_data, &handler_data, read_query, read_options](auto &&ks) mutable {
888886
auto key_seg = std::forward<storage::KeySegmentPair>(ks);
889887
if(dynamic_schema) {
890-
decode_into_frame_dynamic(frame, row, std::move(key_seg.segment()), shared_data, handler_data, read_query, read_options);
888+
decode_into_frame_dynamic(frame, row, key_seg.segment(), shared_data, handler_data, read_query, read_options);
891889
} else {
892-
decode_into_frame_static(frame, row, std::move(key_seg.segment()), shared_data, handler_data, read_query, read_options);
890+
decode_into_frame_static(frame, row, key_seg.segment(), shared_data, handler_data, read_query, read_options);
893891
}
894892

895893
return key_seg.variant_key();

cpp/arcticdb/pipeline/read_frame.hpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -85,16 +85,16 @@ folly::Future<SegmentInMemory> fetch_data(
8585
void decode_into_frame_static(
8686
SegmentInMemory &frame,
8787
PipelineContextRow &context,
88-
Segment &&seg,
88+
const Segment& seg,
8989
const DecodePathData& shared_data,
9090
std::any& handler_data,
9191
const ReadQuery& read_query,
9292
const ReadOptions& read_options);
9393

9494
void decode_into_frame_dynamic(
95-
SegmentInMemory &frame,
95+
const SegmentInMemory &frame,
9696
PipelineContextRow &context,
97-
Segment &&seg,
97+
const Segment& seg,
9898
const DecodePathData& shared_data,
9999
std::any& handler_data,
100100
const ReadQuery& read_query,

cpp/arcticdb/storage/azure/azure_client_impl.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ Azure::Storage::Blobs::BlobClientOptions RealAzureClient::get_client_options(con
4747

4848
void RealAzureClient::write_blob(
4949
const std::string& blob_name,
50-
Segment&& segment,
50+
Segment& segment,
5151
const Azure::Storage::Blobs::UploadBlockBlobFromOptions& upload_option,
5252
unsigned int request_timeout) {
5353

cpp/arcticdb/storage/azure/azure_client_impl.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class RealAzureClient : public AzureClientWrapper {
2626

2727
void write_blob(
2828
const std::string& blob_name,
29-
Segment&& segment,
29+
Segment& segment,
3030
const Azure::Storage::Blobs::UploadBlockBlobFromOptions& upload_option,
3131
unsigned int request_timeout) override;
3232

cpp/arcticdb/storage/azure/azure_client_interface.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class AzureClientWrapper {
5050
using Config = arcticdb::proto::azure_storage::Config;
5151
virtual void write_blob(
5252
const std::string& blob_name,
53-
Segment&& segment,
53+
Segment& segment,
5454
const Azure::Storage::Blobs::UploadBlockBlobFromOptions& upload_option,
5555
unsigned int request_timeout) = 0;
5656

0 commit comments

Comments
 (0)