
Commit 9322077

Get object sizes based on S3's ListObjects output - implement PR review comments
1 parent 7739abc commit 9322077

File tree

7 files changed: +128 -45 lines


cpp/arcticdb/storage/s3/detail-inl.hpp

+1 -1

@@ -525,7 +525,7 @@ ObjectSizes do_calculate_sizes_for_type_impl(
     do {
         auto list_objects_result = s3_client.list_objects(path_info.key_prefix_, bucket_name, continuation_token);
         if (list_objects_result.is_success()) {
-            auto& output = list_objects_result.get_output();
+            const auto& output = list_objects_result.get_output();

            ARCTICDB_RUNTIME_DEBUG(log::storage(), "Received object list");

cpp/arcticdb/storage/s3/s3_client_impl.cpp

+1 -2

@@ -305,8 +305,7 @@ S3Result<ListObjectsOutput> S3ClientImpl::list_objects(
             s3_object_sizes.emplace_back(s3_object.GetSize());
         }

-        ListObjectsOutput output = {s3_object_names, s3_object_sizes, next_continuation_token};
-        return {output};
+        return {ListObjectsOutput{std::move(s3_object_names), std::move(s3_object_sizes), next_continuation_token}};
    }

}
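The reworked return above builds the result in place and moves the name and size vectors into it instead of copying them through a named local. A small sketch of the same pattern; ListingOutput is an assumed stand-in rather than the real ListObjectsOutput:

#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Illustrative stand-in for a ListObjects-style result; not the real ListObjectsOutput.
struct ListingOutput {
    std::vector<std::string> names;
    std::vector<uint64_t> sizes;
    std::string continuation_token;
};

ListingOutput make_listing() {
    std::vector<std::string> names{"key/a", "key/b"};
    std::vector<uint64_t> sizes{128, 256};
    std::string token = "next";
    // Moving the vectors hands their buffers to the returned struct; copying them
    // into a named local first (the pre-review version) duplicates every element.
    return ListingOutput{std::move(names), std::move(sizes), token};
}

int main() {
    auto out = make_listing();
    return out.names.size() == out.sizes.size() ? 0 : 1;
}

For large listings this avoids one full copy of every object name and size per page.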

cpp/arcticdb/storage/storage.hpp

+3 -3

@@ -137,7 +137,7 @@ class Storage {
             visitor(std::move(k));
             return false; // keep applying the visitor no matter what
         };
-        do_iterate_type_until_match(key_type, predicate_visitor, prefix);
+        do_iterate_type_until_match(key_type, predicate_visitor, prefix);
    }

    [[nodiscard]] virtual bool supports_object_size_calculation() const {

@@ -259,9 +259,9 @@ template<> struct formatter<ObjectSizes> {
     constexpr auto parse(ParseContext &ctx) { return ctx.begin(); }

     template<typename FormatContext>
-    auto format(const ObjectSizes &srv, FormatContext &ctx) const {
+    auto format(const ObjectSizes &sizes, FormatContext &ctx) const {
         return fmt::format_to(ctx.out(), "ObjectSizes key_type[{}] count[{}] compressed_size_bytes[{}]",
-            srv.key_type_, srv.count_, srv.compressed_size_bytes_);
+            sizes.key_type_, sizes.count_, sizes.compressed_size_bytes_);
    }
};
}

cpp/arcticdb/storage/storages.hpp

+6 -1

@@ -188,7 +188,12 @@ class Storages {
         }
    }

-    ObjectSizes get_object_sizes(KeyType key_type, const std::string& prefix) {
+    ObjectSizes get_object_sizes(KeyType key_type, const std::string& prefix, bool primary_only = true) {
+        if (primary_only) {
+            auto storage_sizes = primary().get_object_sizes(key_type, prefix);
+            return {key_type, storage_sizes.count_, storage_sizes.compressed_size_bytes_};
+        }
+
         ObjectSizes res{key_type, 0, 0};
         for (const auto& storage : storages_) {
             auto storage_sizes = storage->get_object_sizes(key_type, prefix);
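The new primary_only flag (defaulting to true) short-circuits to the primary storage and only falls through to summing sizes across every backing storage when explicitly asked. A stripped-down sketch of that control flow; Sources, SizeSource and Sizes are hypothetical stand-ins for Storages, Storage and ObjectSizes:

#include <cstdint>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-ins for ArcticDB's ObjectSizes and Storage.
struct Sizes { uint64_t count = 0; uint64_t bytes = 0; };

struct SizeSource {
    Sizes stored;
    Sizes get_object_sizes(const std::string& /*prefix*/) const { return stored; }
};

struct Sources {
    std::vector<std::shared_ptr<SizeSource>> sources;

    SizeSource& primary() { return *sources.front(); }

    Sizes get_object_sizes(const std::string& prefix, bool primary_only = true) {
        if (primary_only) {
            // Fast path: only the primary storage is consulted.
            return primary().get_object_sizes(prefix);
        }
        // Otherwise accumulate counts and byte totals across every storage.
        Sizes res;
        for (const auto& source : sources) {
            auto s = source->get_object_sizes(prefix);
            res.count += s.count;
            res.bytes += s.bytes;
        }
        return res;
    }
};

int main() {
    Sources all{{std::make_shared<SizeSource>(SizeSource{{2, 100}}),
                 std::make_shared<SizeSource>(SizeSource{{3, 900}})}};
    auto primary_only = all.get_object_sizes("prefix");       // sizes from the primary only
    auto everything = all.get_object_sizes("prefix", false);  // sizes summed across all sources
    return (primary_only.count == 2 && everything.bytes == 1000) ? 0 : 1;
}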

cpp/arcticdb/version/local_versioned_engine.cpp

+1 -2

@@ -1708,8 +1708,7 @@ std::vector<storage::ObjectSizes> LocalVersionedEngine::scan_object_sizes() {
         sizes.push_back(store->get_object_sizes(key_type, ""));
    });

-    folly::QueuedImmediateExecutor inline_executor;
-    return folly::collect(sizes_futs).via(&inline_executor).get();
+    return folly::collect(sizes_futs).via(&async::cpu_executor()).get();
}

std::unordered_map<StreamId, std::unordered_map<KeyType, KeySizesInfo>> LocalVersionedEngine::scan_object_sizes_by_stream() {
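Here the per-call folly::QueuedImmediateExecutor, which runs continuations inline on the calling thread, is replaced by the library's shared CPU executor. A rough folly sketch of collecting futures via a pool executor; folly::CPUThreadPoolExecutor stands in for the ArcticDB-internal async::cpu_executor():

#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/futures/Future.h>

#include <utility>
#include <vector>

int main() {
    // Stand-in for ArcticDB's shared CPU pool (async::cpu_executor() in the real code).
    folly::CPUThreadPoolExecutor cpu_pool(4);

    // One future per unit of work, mirroring the per-key-type size futures.
    std::vector<folly::Future<int>> futs;
    for (int i = 0; i < 8; ++i) {
        futs.push_back(folly::via(&cpu_pool, [i] { return i * i; }));
    }

    // Collect on the pool instead of a QueuedImmediateExecutor so the combined
    // continuation does not run inline on the thread that calls get().
    auto results = folly::collect(std::move(futs)).via(&cpu_pool).get();
    return results.size() == 8 ? 0 : 1;
}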

python/tests/integration/arcticdb/version_store/test_symbol_sizes.py

+78 -2

@@ -1,8 +1,11 @@
 from multiprocessing import Queue, Process

 import pytest
-from arcticdb.util.test import sample_dataframe
+from arcticdb import LibraryOptions
+from arcticdb.encoding_version import EncodingVersion
+from arcticdb.util.test import sample_dataframe, config_context_multi
 from arcticdb_ext.storage import KeyType
+import arcticdb_ext.cpp_async as adb_async


 def test_symbol_sizes(basic_store):

@@ -105,6 +108,79 @@ def test_scan_object_sizes(arctic_client, lib_name):
     assert 500 < res[KeyType.VERSION_REF][1] < 1500


+@pytest.mark.parametrize("storage, encoding_version_, num_io_threads, num_cpu_threads", [
+    ("s3", EncodingVersion.V1, 1, 1),
+    ("s3", EncodingVersion.V1, 10, 1),
+    ("s3", EncodingVersion.V1, 1, 10),
+])
+def test_scan_object_sizes_threading(request, storage, encoding_version_, lib_name, num_io_threads, num_cpu_threads):
+    """Some stress testing for scan_object_sizes, particularly against deadlocks. Use a small segment size so that
+    there is some work to be done in parallel."""
+    storage_fixture = request.getfixturevalue(storage + "_storage")
+    arctic_client = storage_fixture.create_arctic(encoding_version=encoding_version_)
+    try:
+        with config_context_multi({"VersionStore.NumIOThreads": num_io_threads, "VersionStore.NumCPUThreads": num_cpu_threads}):
+            adb_async.reinit_task_scheduler()
+            if num_io_threads:
+                assert adb_async.io_thread_count() == num_io_threads
+            if num_cpu_threads:
+                assert adb_async.cpu_thread_count() == num_cpu_threads
+
+            lib = arctic_client.create_library(lib_name, library_options=LibraryOptions(rows_per_segment=5))
+            basic_store = lib._nvs
+
+            df = sample_dataframe(100)
+            basic_store.write("sym", df)
+            basic_store.write("sym", df)
+
+            sizes = basic_store.version_store.scan_object_sizes()
+
+            res = dict()
+            for s in sizes:
+                res[s.key_type] = (s.count, s.compressed_size_bytes)
+
+            assert KeyType.VERSION in res
+            assert KeyType.TABLE_INDEX in res
+            assert KeyType.TABLE_DATA in res
+            assert KeyType.VERSION_REF in res
+    finally:
+        adb_async.reinit_task_scheduler()
+
+
+@pytest.mark.parametrize("storage, encoding_version_, num_io_threads, num_cpu_threads", [
+    ("s3", EncodingVersion.V1, 1, 1),
+    ("s3", EncodingVersion.V1, 10, 1),
+    ("s3", EncodingVersion.V1, 1, 10),
+])
+def test_scan_object_sizes_by_stream_threading(request, storage, encoding_version_, lib_name, num_io_threads, num_cpu_threads):
+    """Some stress testing for scan_object_sizes, particularly against deadlocks. Use a small segment size so that
+    there is some work to be done in parallel."""
+    storage_fixture = request.getfixturevalue(storage + "_storage")
+    arctic_client = storage_fixture.create_arctic(encoding_version=encoding_version_)
+    try:
+        with config_context_multi({"VersionStore.NumIOThreads": num_io_threads, "VersionStore.NumCPUThreads": num_cpu_threads}):
+            adb_async.reinit_task_scheduler()
+            if num_io_threads:
+                assert adb_async.io_thread_count() == num_io_threads
+            if num_cpu_threads:
+                assert adb_async.cpu_thread_count() == num_cpu_threads
+
+            lib = arctic_client.create_library(lib_name, library_options=LibraryOptions(rows_per_segment=5))
+            basic_store = lib._nvs
+
+            df = sample_dataframe(100)
+            basic_store.write("sym", df)
+            basic_store.write("sym", df)
+
+            sizes = basic_store.version_store.scan_object_sizes_by_stream()
+
+            assert sizes["sym"][KeyType.VERSION].compressed_size < 2000
+            assert sizes["sym"][KeyType.TABLE_INDEX].compressed_size < 5000
+            assert sizes["sym"][KeyType.TABLE_DATA].compressed_size < 50_000
+    finally:
+        adb_async.reinit_task_scheduler()
+
+
 @pytest.fixture
 def reader_store(basic_store):
     return basic_store

@@ -141,7 +217,7 @@ def test_symbol_sizes_concurrent(reader_store, writer_store):
     try:
         reader.start()
         writer.start()
-        reader.join(1)
+        reader.join(2)
         writer.join(0.001)
     finally:
         writer.terminate()

python/tests/unit/arcticdb/version_store/test_parallel.py

+38 -34

@@ -113,44 +113,48 @@ def test_remove_incomplete(basic_store):
 @pytest.mark.parametrize("num_segments_live_during_compaction, num_io_threads, num_cpu_threads", [
     (1, 1, 1),
     (10, 1, 1),
+    (1, 10, 1),
     (None, None, None)
 ])
 def test_parallel_write(basic_store_tiny_segment, num_segments_live_during_compaction, num_io_threads, num_cpu_threads):
-    with config_context_multi({"VersionStore.NumSegmentsLiveDuringCompaction": num_segments_live_during_compaction,
-                               "VersionStore.NumIOThreads": num_io_threads,
-                               "VersionStore.NumCPUThreads": num_cpu_threads}):
+    try:
+        with config_context_multi({"VersionStore.NumSegmentsLiveDuringCompaction": num_segments_live_during_compaction,
+                                   "VersionStore.NumIOThreads": num_io_threads,
+                                   "VersionStore.NumCPUThreads": num_cpu_threads}):
+            adb_async.reinit_task_scheduler()
+            if num_io_threads:
+                assert adb_async.io_thread_count() == num_io_threads
+            if num_cpu_threads:
+                assert adb_async.cpu_thread_count() == num_cpu_threads
+
+            store = basic_store_tiny_segment
+            sym = "parallel"
+            store.remove_incomplete(sym)
+
+            num_rows = 1111
+            dtidx = pd.date_range("1970-01-01", periods=num_rows)
+            test = pd.DataFrame(
+                {
+                    "uint8": random_integers(num_rows, np.uint8),
+                    "uint32": random_integers(num_rows, np.uint32),
+                },
+                index=dtidx,
+            )
+            chunk_size = 100
+            list_df = [test[i : i + chunk_size] for i in range(0, test.shape[0], chunk_size)]
+            random.shuffle(list_df)
+
+            for df in list_df:
+                store.write(sym, df, parallel=True)
+
+            user_meta = {"thing": 7}
+            store.compact_incomplete(sym, False, False, metadata=user_meta)
+            vit = store.read(sym)
+            assert_frame_equal(test, vit.data)
+            assert vit.metadata["thing"] == 7
+            assert len(get_append_keys(store, sym)) == 0
+    finally:
         adb_async.reinit_task_scheduler()
-        if num_io_threads:
-            assert adb_async.io_thread_count() == num_io_threads
-        if num_cpu_threads:
-            assert adb_async.cpu_thread_count() == num_cpu_threads
-
-        store = basic_store_tiny_segment
-        sym = "parallel"
-        store.remove_incomplete(sym)
-
-        num_rows = 1111
-        dtidx = pd.date_range("1970-01-01", periods=num_rows)
-        test = pd.DataFrame(
-            {
-                "uint8": random_integers(num_rows, np.uint8),
-                "uint32": random_integers(num_rows, np.uint32),
-            },
-            index=dtidx,
-        )
-        chunk_size = 100
-        list_df = [test[i : i + chunk_size] for i in range(0, test.shape[0], chunk_size)]
-        random.shuffle(list_df)
-
-        for df in list_df:
-            store.write(sym, df, parallel=True)
-
-        user_meta = {"thing": 7}
-        store.compact_incomplete(sym, False, False, metadata=user_meta)
-        vit = store.read(sym)
-        assert_frame_equal(test, vit.data)
-        assert vit.metadata["thing"] == 7
-        assert len(get_append_keys(store, sym)) == 0


 @pytest.mark.parametrize("index, expect_ordered", [
