
Commit a2e840d

Merge remote-tracking branch 'upstream/master'
Committed by Maxim Morozov
2 parents: 981072d + b3b0273

File tree: 14 files changed (+198, -66 lines)


cpp/arcticdb/CMakeLists.txt (+1)

@@ -969,6 +969,7 @@ if(${TEST})
         util/test/test_cursor.cpp
         util/test/test_decimal.cpp
         util/test/test_exponential_backoff.cpp
+        util/test/test_folly.cpp
         util/test/test_format_date.cpp
         util/test/test_hash.cpp
         util/test/test_id_transformation.cpp

cpp/arcticdb/async/async_store.hpp (+3, -3)

@@ -340,14 +340,14 @@ std::vector<RemoveKeyResultType> remove_keys_sync(std::vector<entity::VariantKey
     RemoveBatchTask{std::move(keys), library_, opts}();
 }
 
-folly::Future<std::vector<VariantKey>> batch_read_compressed(
+std::vector<folly::Future<VariantKey>> batch_read_compressed(
     std::vector<std::pair<entity::VariantKey, ReadContinuation>> &&keys_and_continuations,
     const BatchReadArgs &args) override {
     util::check(!keys_and_continuations.empty(), "Unexpected empty keys/continuation vector in batch_read_compressed");
-    return folly::collect(folly::window(std::move(keys_and_continuations), [this] (auto&& key_and_continuation) {
+    return folly::window(std::move(keys_and_continuations), [this] (auto&& key_and_continuation) {
         auto [key, continuation] = std::forward<decltype(key_and_continuation)>(key_and_continuation);
         return read_and_continue(key, library_, storage::ReadKeyOpts{}, std::move(continuation));
-    }, args.batch_size_)).via(&async::io_executor());
+    }, args.batch_size_);
 }
 
 std::vector<folly::Future<pipelines::SegmentAndSlice>> batch_read_uncompressed(
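
This signature change is the core of the merge: batch_read_compressed now returns the per-key futures instead of collecting them itself, so each call site decides how to aggregate results and which executor runs the continuation. A minimal sketch of the two caller-side styles this enables, in plain folly; read_one, the key loop, and the executor choice are illustrative assumptions, not ArcticDB code:

#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/futures/Future.h>
#include <vector>

// Stand-in for a per-key read; produces its result on the given executor.
folly::Future<int> read_one(folly::Executor* ex, int key) {
    return folly::via(ex, [key] { return key * 2; });
}

int main() {
    folly::CPUThreadPoolExecutor ex{4};
    std::vector<folly::Future<int>> futs;
    for (int k = 0; k < 8; ++k)
        futs.push_back(read_one(&ex, k));

    // Style A (read_frame.cpp, storage_mover.hpp below): collect everything
    // and fail fast on the first exception.
    auto all = folly::collect(std::move(futs)).via(&ex).get();

    // Style B (local_versioned_engine.cpp below): attach .thenError(...) to
    // each future *before* collecting, so selected failures can be swallowed
    // per key instead of aborting the whole batch.
    return all.size() == 8 ? 0 : 1;
}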

cpp/arcticdb/async/batch_read_args.hpp (+3, -19)

@@ -11,27 +11,11 @@
 
 namespace arcticdb {
 struct BatchReadArgs {
-    // The below enum controls where (IO or CPU thread pool) decoding and data processing tasks are executed.
-    // If IO is used, all work will be done in the IO thread pool which may prevent context switches - this is therefore
-    // suitable for work that is CPU light.
-    enum Scheduler {
-        IO,
-        CPU
-    };
-
-    BatchReadArgs() :
-        batch_size_(ConfigsMap::instance()->get_int("BatchRead.BatchSize", 200)),
-        scheduler_(Scheduler::CPU) {}
+    BatchReadArgs() = default;
 
     explicit BatchReadArgs(size_t batch_size) :
-        batch_size_(batch_size),
-        scheduler_(Scheduler::CPU) {}
-
-    explicit BatchReadArgs(Scheduler scheduler) :
-        batch_size_(ConfigsMap::instance()->get_int("BatchRead.BatchSize", 200)),
-        scheduler_(scheduler) { }
+        batch_size_(batch_size) {}
 
-    size_t batch_size_;
-    Scheduler scheduler_;
+    size_t batch_size_ = ConfigsMap::instance()->get_int("BatchRead.BatchSize", 200);
 };
 }
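
The replacement struct leans on a non-static data member initializer: the default value, including the ConfigsMap lookup, now lives on the member itself, so the default constructor can be `= default` and the remaining constructor only overrides it. A toy sketch of that pattern under the same assumption, with a constant standing in for the config call:

#include <cstddef>

struct Args {
    Args() = default;                                 // batch_size_ falls back to the initializer below
    explicit Args(std::size_t n) : batch_size_(n) {}  // override the default
    std::size_t batch_size_ = 200;                    // stand-in for ConfigsMap::instance()->get_int(...)
};

int main() {
    return Args{}.batch_size_ == 200 && Args{16}.batch_size_ == 16 ? 0 : 1;
}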

cpp/arcticdb/pipeline/read_frame.cpp (+2, -1)

@@ -900,7 +900,8 @@ folly::Future<SegmentInMemory> fetch_data(
         }
     }
     ARCTICDB_SUBSAMPLE_DEFAULT(DoBatchReadCompressed)
-    return ssource->batch_read_compressed(std::move(keys_and_continuations), BatchReadArgs{})
+    return folly::collect(ssource->batch_read_compressed(std::move(keys_and_continuations), BatchReadArgs{}))
+        .via(&async::io_executor())
         .thenValue([frame](auto&&){ return frame; });
 }

cpp/arcticdb/pipeline/write_frame.cpp (+8, -2)

@@ -95,8 +95,14 @@ std::tuple<stream::StreamSink::PartialKey, SegmentInMemory, FrameSlice> WriteToS
     auto& tensor = frame_->field_tensors[slice_.absolute_field_col(col)];
     auto opt_error = aggregator_set_data(
         fd.type(),
-        tensor, agg, abs_col, rows_to_write, offset_in_frame, slice_num_for_column_,
-        regular_slice_size, sparsify_floats_);
+        tensor,
+        agg,
+        abs_col,
+        rows_to_write,
+        offset_in_frame,
+        slice_num_for_column_,
+        regular_slice_size,
+        sparsify_floats_);
     if (opt_error.has_value()) {
         opt_error->raise(fd.name(), offset_in_frame);
     }

cpp/arcticdb/python/python_to_tensor_frame.cpp (+11, -18)

@@ -145,14 +145,10 @@ NativeTensor obj_to_tensor(PyObject *ptr, bool empty_types) {
         // wide type always is 64bits
         val_bytes = 8;
 
-        // If Numpy has type 'O' then get_value_type above will return type 'BYTES'
-        // If there is no value, and we can't deduce a type then leave it that way,
-        // otherwise try to work out whether it was a bytes (string) type or unicode
         if (!is_fixed_string_type(val_type) && element_count > 0) {
             auto none = py::none{};
             auto obj = reinterpret_cast<PyObject **>(arr->data);
-            bool empty = false;
-            bool all_nans = false;
+            bool empty_string_placeholder = false;
             PyObject *sample = *obj;
             PyObject** current_object = obj;
             // Arctic allows both None and NaN to represent a string with no value. We have 3 options:

@@ -163,31 +159,28 @@ NativeTensor obj_to_tensor(PyObject *ptr, bool empty_types) {
             // * In case there is at least one actual string we can sample it and decide the type of the column segment
             //   based on it
             // Note: ValueType::ASCII_DYNAMIC was used when Python 2 was supported. It is no longer supported, and
-            // we're not expected to enter that branch.
+            // we're not expected to enter that branch.
             if (sample == none.ptr() || is_py_nan(sample)) {
-                empty = true;
-                all_nans = true;
+                empty_string_placeholder = true;
                 util::check(c_style, "Non contiguous columns with first element as None not supported yet.");
                 const auto* end = obj + size;
                 while(current_object < end) {
-
-                    if(*current_object == none.ptr()) {
-                        all_nans = false;
-                    } else if(is_py_nan(*current_object)) {
-                        empty = false;
-                    } else {
-                        all_nans = false;
-                        empty = false;
+                    if(!(is_py_nan(*current_object) || *current_object == none.ptr())) {
+                        empty_string_placeholder = false;
                         break;
                     }
                     ++current_object;
                 }
                 if(current_object != end)
                     sample = *current_object;
             }
-            if (empty && kind == 'O') {
+            // Column full of NaN values is interpreted differently based on the kind. If kind is object "O" the column
+            // is assigned a string type; if kind is float "f" the column is assigned a float type. This is done in
+            // order to preserve a legacy behavior of ArcticDB allowing to use both NaN and None as a placeholder for
+            // missing string values.
+            if (empty_string_placeholder && kind == 'O') {
                 val_type = empty_types ? ValueType::EMPTY : ValueType::UTF_DYNAMIC;
-            } else if(all_nans || is_unicode(sample)){
+            } else if(is_unicode(sample)) {
                 val_type = ValueType::UTF_DYNAMIC;
             } else if (PYBIND11_BYTES_CHECK(sample)) {
                 val_type = ValueType::ASCII_DYNAMIC;
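
The rewritten loop implements a simple rule: an object-dtype column counts as all-placeholder until a value that is neither None nor NaN appears, and that first real value becomes the sample that types the column. A self-contained sketch of the same scan, with a std::variant stand-in for the PyObject handling; Cell and first_real_sample are hypothetical names:

#include <limits>
#include <optional>
#include <string>
#include <variant>
#include <vector>

// std::monostate stands in for None; a NaN double stands in for np.nan.
using Cell = std::variant<std::monostate, double, std::string>;

// Mirrors the sampling logic above: return the first non-placeholder value.
// nullopt means "all None/NaN", where the caller falls back to a
// kind-dependent default type.
std::optional<std::string> first_real_sample(const std::vector<Cell>& column) {
    for (const auto& cell : column) {
        if (std::holds_alternative<std::string>(cell))
            return std::get<std::string>(cell);
    }
    return std::nullopt;
}

int main() {
    const double nan = std::numeric_limits<double>::quiet_NaN();
    std::vector<Cell> column{std::monostate{}, nan, std::string{"hello"}};
    return first_real_sample(column).value_or("") == "hello" ? 0 : 1;
}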

cpp/arcticdb/storage/test/in_memory_store.hpp (+1, -1)

@@ -53,7 +53,7 @@ class InMemoryStore : public Store {
         throw std::runtime_error("Not implemented for tests");
     }
 
-    folly::Future<std::vector<VariantKey>> batch_read_compressed(
+    std::vector<folly::Future<VariantKey>> batch_read_compressed(
         std::vector<std::pair<entity::VariantKey, ReadContinuation>>&&,
         const BatchReadArgs&) override {
         throw std::runtime_error("Not implemented for tests");

cpp/arcticdb/stream/stream_source.hpp (+2, -4)

@@ -55,15 +55,13 @@ struct StreamSource {
 
     using ReadContinuation = folly::Function<VariantKey(storage::KeySegmentPair &&)>;
 
-    virtual folly::Future<std::vector<VariantKey>> batch_read_compressed(
+    virtual std::vector<folly::Future<VariantKey>> batch_read_compressed(
         std::vector<std::pair<entity::VariantKey, ReadContinuation>> &&ks,
-        const BatchReadArgs &args) = 0;
+        const BatchReadArgs& args) = 0;
 
     [[nodiscard]] virtual std::vector<folly::Future<bool>> batch_key_exists(
         const std::vector<entity::VariantKey> &keys) = 0;
 
-    using DecodeContinuation = folly::Function<folly::Unit(SegmentInMemory &&)>;
-
     virtual std::vector<folly::Future<pipelines::SegmentAndSlice>> batch_read_uncompressed(
         std::vector<pipelines::RangesAndKey>&& ranges_and_keys,
         std::shared_ptr<std::unordered_set<std::string>> columns_to_decode) = 0;

cpp/arcticdb/toolbox/storage_mover.hpp (+15, -6)

@@ -122,14 +122,18 @@ struct BatchCopier {
             ++skipped_;
         }
     }
-    auto collected_kvs = source_store_->batch_read_compressed(std::move(keys_to_copy), BatchReadArgs{}).get();
-    if (!collected_kvs.empty()) {
+
+    size_t n_keys = keys_to_copy.size();
+    auto collected_kvs = folly::collect(source_store_->batch_read_compressed(std::move(keys_to_copy), BatchReadArgs{}))
+        .via(&async::io_executor())
+        .get();
+    if (n_keys > 0) {
         const size_t bytes_being_copied = std::accumulate(segments_ptr->begin(), segments_ptr->end(), size_t{0}, [] (size_t a, const storage::KeySegmentPair& ks) {
             return a + ks.segment().size();
         });
         target_store_->batch_write_compressed(*segments_ptr.release()).get();
         bytes_moved_.fetch_add(bytes_being_copied, std::memory_order_relaxed);
-        objects_moved_.fetch_add(collected_kvs.size(), std::memory_order_relaxed);
+        objects_moved_.fetch_add(n_keys, std::memory_order_relaxed);
     }
     ++count_;
     if (count_.compare_exchange_strong(logging_frequency, 0)) {

@@ -177,8 +181,11 @@ struct BatchCopier {
         }
     );
     keys_.clear();
-    auto collected_kvs = source_store_->batch_read_compressed(std::move(keys_to_copy), BatchReadArgs{}).get();
-    if (!collected_kvs.empty()) {
+    size_t n_keys = keys_to_copy.size();
+    auto collected_kvs = folly::collect(source_store_->batch_read_compressed(std::move(keys_to_copy), BatchReadArgs{}))
+        .via(&async::io_executor())
+        .get();
+    if (n_keys > 0) {
         bytes_moved_ += std::accumulate(segments.begin(), segments.end(), size_t{0}, [] (size_t a, const storage::KeySegmentPair& ks) {
             return a + ks.segment().size();
         });

@@ -523,7 +530,9 @@ class ARCTICDB_VISIBILITY_HIDDEN StorageMover {
         }
 
         total_copied += copied;
-        auto keys = source_store_->batch_read_compressed(std::move(keys_to_copy), BatchReadArgs{}).get();
+        [[maybe_unused]] auto keys = folly::collect(source_store_->batch_read_compressed(std::move(keys_to_copy), BatchReadArgs{}))
+            .via(&async::io_executor())
+            .get();
         std::erase_if(segments, [](const auto& segment) { return variant_key_type(segment.variant_key()) == KeyType::UNDEFINED; });
         util::check(keys.size() == segments.size(), "Keys and segments size mismatch, maybe due to parallel deletes");
         write_futs.push_back(target_store_->batch_write_compressed(std::move(segments)));
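
A subtlety in these hunks: keys_to_copy is consumed by std::move, so its size is captured in n_keys first, replacing the old emptiness check on the collected result. A minimal illustration of that ordering rule (all names here are hypothetical):

#include <cstddef>
#include <utility>
#include <vector>

int main() {
    std::vector<int> keys_to_copy{1, 2, 3};
    const std::size_t n_keys = keys_to_copy.size();  // read before the move
    auto consume = [](std::vector<int>&&) {};
    consume(std::move(keys_to_copy));                // contents handed off; keys_to_copy
                                                     // is now valid but unspecified
    return n_keys == 3 ? 0 : 1;
}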

cpp/arcticdb/util/test/test_folly.cpp (+49)

@@ -0,0 +1,49 @@
+/* Copyright 2025 Man Group Operations Limited
+ *
+ * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
+ */
+
+#include <folly/futures/Future.h>
+#include <gtest/gtest.h>
+
+/**
+ * We rely on folly::window continuing to process the collection even after one of the futures throws.
+ *
+ * Since this is not clearly tested in Folly itself, we have this regression test in case they break it in future.
+ *
+ * At time of writing, the symbol size calculation APIs tested in `test_symbol_sizes.py` rely on this.
+ */
+TEST(Window, ContinuesOnException) {
+    using namespace folly;
+    std::vector<int> ints;
+    for (int i = 0; i < 1000; i++) {
+        ints.push_back(i);
+    }
+
+    std::vector<Promise<int>> ps(1000);
+
+    auto res = reduce(
+        window(
+            ints,
+            [&ps](int i) {
+                if (i % 4 == 0) {
+                    throw std::runtime_error("exception should not kill process");
+                }
+                return ps[i].getFuture();
+            },
+            2),
+        0,
+        [](int sum, const Try<int>& b) {
+            sum += b.hasException<std::exception>() ? 0 : 1;
+            return sum;
+        });
+
+    for (auto& p : ps) {
+        p.setValue(0);
+    }
+
+    // 1000 / 4 = 250 of the futures threw, 750 completed successfully
+    EXPECT_EQ(750, std::move(res).get());
+}

cpp/arcticdb/version/local_versioned_engine.cpp (+27, -12)

@@ -1702,15 +1702,35 @@ timestamp LocalVersionedEngine::latest_timestamp(const std::string& symbol) {
 }
 
 struct AtomicKeySizesInfo {
-    size_t count;
+    size_t count{0};
     std::atomic<size_t> compressed_size;
     std::atomic<size_t> uncompressed_size;
 };
 
+using KeySizeCalculators = std::vector<std::pair<VariantKey, stream::StreamSource::ReadContinuation>>;
+
+static void read_ignoring_key_not_found(Store& store, KeySizeCalculators&& calculators) {
+    if (calculators.empty()) {
+        return;
+    }
+
+    auto read_futures = store.batch_read_compressed(std::move(calculators), BatchReadArgs{});
+    std::vector<folly::Future<folly::Unit>> res;
+    res.reserve(read_futures.size());
+    for (auto&& fut : std::move(read_futures)) {
+        // Ignore some exceptions, someone might be deleting while we scan
+        res.push_back(std::move(fut)
+            .thenValue([](auto&&) {return folly::Unit{};})
+            .thenError(folly::tag_t<storage::KeyNotFoundException>{}, [](auto&&) { return folly::Unit{}; }));
+    }
+
+    folly::collect(res).get();
+}
+
 std::unordered_map<KeyType, KeySizesInfo> LocalVersionedEngine::scan_object_sizes() {
     std::unordered_map<KeyType, AtomicKeySizesInfo> sizes;
     foreach_key_type([&store=store(), &sizes=sizes](KeyType key_type) {
-        std::vector<std::pair<VariantKey, stream::StreamSource::ReadContinuation>> key_size_calculators;
+        KeySizeCalculators key_size_calculators;
         store->iterate_type(key_type, [&key_size_calculators, &sizes, &key_type](const VariantKey&& k) {
             auto& sizes_info = sizes[key_type];
             ++sizes_info.count;

@@ -1723,10 +1743,7 @@ std::unordered_map<KeyType, KeySizesInfo> LocalVersionedEngine::scan_object_size
            });
        });
 
-        if(!key_size_calculators.empty()) {
-            store->batch_read_compressed(std::move(key_size_calculators), BatchReadArgs{}).get();
-        }
-
+        read_ignoring_key_not_found(*store, std::move(key_size_calculators));
    });
 
    std::unordered_map<KeyType, KeySizesInfo> result;

@@ -1742,10 +1759,10 @@ std::unordered_map<StreamId, std::unordered_map<KeyType, KeySizesInfo>> LocalVer
    auto streams = symbol_list().get_symbols(store());
 
    foreach_key_type([&store=store(), &sizes, &mutex](KeyType key_type) {
-        std::vector<std::pair<VariantKey, stream::StreamSource::ReadContinuation>> keys;
+        KeySizeCalculators key_size_calculators;
 
-        store->iterate_type(key_type, [&keys, &mutex, &sizes, key_type](const VariantKey&& k){
-            keys.emplace_back(std::forward<const VariantKey>(k), [key_type, &sizes, &mutex] (auto&& ks) {
+        store->iterate_type(key_type, [&key_size_calculators, &mutex, &sizes, key_type](const VariantKey&& k){
+            key_size_calculators.emplace_back(std::forward<const VariantKey>(k), [key_type, &sizes, &mutex] (auto&& ks) {
                auto key_seg = std::forward<decltype(ks)>(ks);
                auto variant_key = key_seg.variant_key();
                auto stream_id = variant_key_id(variant_key);

@@ -1766,9 +1783,7 @@ std::unordered_map<StreamId, std::unordered_map<KeyType, KeySizesInfo>> LocalVer
 
        });
 
-        if (!keys.empty()) {
-            store->batch_read_compressed(std::move(keys), BatchReadArgs{}).get();
-        }
+        read_ignoring_key_not_found(*store, std::move(key_size_calculators));
    });
 
    return sizes;
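
read_ignoring_key_not_found is the payoff of the new batch_read_compressed contract: errors can be handled per future before collecting, rather than failing the whole batch. A self-contained sketch of the same folly pattern, with a hypothetical KeyNotFound type standing in for storage::KeyNotFoundException:

#include <folly/futures/Future.h>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical stand-in for storage::KeyNotFoundException.
struct KeyNotFound : std::runtime_error {
    using std::runtime_error::runtime_error;
};

int main() {
    std::vector<folly::Future<folly::Unit>> res;
    // A future that has already failed, as if the key vanished mid-scan.
    auto fut = folly::makeFuture<int>(KeyNotFound{"deleted concurrently"});
    res.push_back(std::move(fut)
        .thenValue([](auto&&) { return folly::Unit{}; })
        // Swallow only KeyNotFound; any other exception type would still
        // propagate out of the final collect().get().
        .thenError(folly::tag_t<KeyNotFound>{}, [](auto&&) { return folly::Unit{}; }));
    folly::collect(std::move(res)).get();  // completes despite the failure
    return 0;
}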

python/tests/hypothesis/arcticdb/test_sort_merge.py (+4)

@@ -127,6 +127,7 @@ def get_append_keys(lib, sym):
 @given(df_list=generate_dataframes(COLUMN_DESCRIPTIONS))
 def test_sort_merge_static_schema_write(lmdb_library, df_list):
     lib = lmdb_library
+    lib._nvs.version_store.clear()
     sym = "test_sort_merge_static_schema_write"
     for df in df_list:
         lib.write(sym, df, staged=True, validate_index=False)

@@ -150,6 +151,7 @@ def test_sort_merge_static_schema_write(lmdb_library, df_list):
 @given(df_list=generate_dataframes(COLUMN_DESCRIPTIONS), initial_df=generate_single_dataframe(COLUMN_DESCRIPTIONS, min_size=1, allow_nat_in_index=False))
 def test_sort_merge_static_schema_append(lmdb_library, df_list, initial_df):
     lib = lmdb_library
+    lib._nvs.version_store.clear()
     sym = "test_sort_merge_static_schema_append"
     initial_df.sort_index(inplace=True)
     lib.write(sym, initial_df)

@@ -179,6 +181,7 @@ def test_sort_merge_static_schema_append(lmdb_library, df_list, initial_df):
 @given(df_list=generate_dataframes(COLUMN_DESCRIPTIONS))
 def test_sort_merge_dynamic_schema_write(lmdb_library_dynamic_schema, df_list):
     lib = lmdb_library_dynamic_schema
+    lib._nvs.version_store.clear()
     sym = "test_sort_merge_dynamic_schema_write"
     for df in df_list:
         lib.write(sym, df, staged=True, validate_index=False)

@@ -203,6 +206,7 @@ def test_sort_merge_dynamic_schema_write(lmdb_library_dynamic_schema, df_list):
 @given(df_list=generate_dataframes(COLUMN_DESCRIPTIONS), initial_df=generate_single_dataframe(COLUMN_DESCRIPTIONS, min_size=1, allow_nat_in_index=False))
 def test_sort_merge_dynamic_schema_append(lmdb_library_dynamic_schema, df_list, initial_df):
     lib = lmdb_library_dynamic_schema
+    lib._nvs.version_store.clear()
     sym = "test_sort_merge_dynamic_schema_append"
     initial_df.sort_index(inplace=True)
     lib.write(sym, initial_df)
