Skip to content

Commit f6b8933

Browse files
authored
Enhance MockS3Client to support real client delegation and configurable failures (#2158)
#### Reference Issues/PRs ref monday ticket: 7971351691 #### What does this implement or fix? This change introduces several improvements to the MockS3Client: - Add support for wrapping and delegating to a real S3 client - Implement a new `check_failure` method to enable configurable failure simulation for specific buckets through env variables - Update S3 storage initialization to use the new MockS3Client with real client delegation - Modify methods to first check for simulated failures before performing operations The changes allow for more flexible testing scenarios and improved mock storage behavior. #### Any other comments? The intended use is something like: ``` python # Enable failure with config_context("S3Mock.EnableFailures", 1): with config_context_string("S3Mock.FailureBucket", target_bucket_names[0]): time.sleep(5) with config_context_string("S3Mock.FailureBucket", target_bucket_names[1]): time.sleep(5) # In these 5 seconds, all of the targets should have failed time.sleep(5) with config_context_string("S3Mock.FailureBucket", target_bucket_names[0]): time.sleep(5) # continue as usual ``` #### Checklist <details> <summary> Checklist for code changes... </summary> - [ ] Have you updated the relevant docstrings, documentation and copyright notice? - [ ] Is this contribution tested against [all ArcticDB's features](../docs/mkdocs/docs/technical/contributing.md)? - [ ] Do all exceptions introduced raise appropriate [error messages](https://docs.arcticdb.io/error_messages/)? - [ ] Are API changes highlighted in the PR description? - [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes? </details> <!-- Thanks for contributing a Pull Request to ArcticDB! Please ensure you have taken a look at: - ArcticDB's Code of Conduct: https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md - ArcticDB's Contribution Licensing: https://github.com/man-group/ArcticDB/blob/master/docs/mkdocs/docs/technical/contributing.md#contribution-licensing -->
1 parent 1478b04 commit f6b8933

File tree

14 files changed

+646
-248
lines changed

14 files changed

+646
-248
lines changed

cpp/arcticdb/CMakeLists.txt

+2
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,8 @@ set(arcticdb_srcs
495495
storage/mock/s3_mock_client.cpp
496496
storage/s3/s3_storage.cpp
497497
storage/s3/s3_storage_tool.cpp
498+
storage/s3/s3_client_wrapper.cpp
499+
storage/s3/s3_client_wrapper.hpp
498500
storage/storage_factory.cpp
499501
storage/storage_utils.cpp
500502
stream/aggregator.cpp

cpp/arcticdb/storage/python_bindings.cpp

+19-9
Original file line numberDiff line numberDiff line change
@@ -105,23 +105,30 @@ void register_bindings(py::module& storage, py::exception<arcticdb::ArcticExcept
105105

106106
enum class S3SettingsPickleOrder : uint32_t {
107107
AWS_AUTH = 0,
108-
AWS_PROFILE = 1
108+
AWS_PROFILE = 1,
109+
USE_INTERNAL_CLIENT_WRAPPER_FOR_TESTING = 2
109110
};
110111

111112
py::class_<s3::S3Settings>(storage, "S3Settings")
112-
.def(py::init<s3::AWSAuthMethod, const std::string&>())
113+
.def(py::init<s3::AWSAuthMethod, const std::string&, bool>())
113114
.def(py::pickle(
114115
[](const s3::S3Settings &settings) {
115-
return py::make_tuple(settings.aws_auth(), settings.aws_profile());
116+
return py::make_tuple(settings.aws_auth(), settings.aws_profile(), settings.use_internal_client_wrapper_for_testing());
116117
},
117118
[](py::tuple t) {
118-
util::check(t.size() == 2, "Invalid S3Settings pickle objects");
119-
s3::S3Settings settings(t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_AUTH)].cast<s3::AWSAuthMethod>(), t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_PROFILE)].cast<std::string>());
119+
util::check(t.size() == 3, "Invalid S3Settings pickle objects");
120+
s3::S3Settings settings(t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_AUTH)].cast<s3::AWSAuthMethod>(),
121+
t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_PROFILE)].cast<std::string>(),
122+
t[static_cast<uint32_t>(S3SettingsPickleOrder::USE_INTERNAL_CLIENT_WRAPPER_FOR_TESTING)].cast<bool>()
123+
);
120124
return settings;
121125
}
122126
))
123127
.def_property_readonly("aws_profile", [](const s3::S3Settings &settings) { return settings.aws_profile(); })
124-
.def_property_readonly("aws_auth", [](const s3::S3Settings &settings) { return settings.aws_auth(); });
128+
.def_property_readonly("aws_auth", [](const s3::S3Settings &settings) { return settings.aws_auth(); })
129+
.def_property_readonly("use_internal_client_wrapper_for_testing", [](const s3::S3Settings &settings) {
130+
return settings.use_internal_client_wrapper_for_testing();
131+
});
125132

126133
py::class_<NativeVariantStorage>(storage, "NativeVariantStorage")
127134
.def(py::init<>())
@@ -130,7 +137,7 @@ void register_bindings(py::module& storage, py::exception<arcticdb::ArcticExcept
130137
[](const NativeVariantStorage &settings) {
131138
return util::variant_match(settings.variant(),
132139
[] (const s3::S3Settings& settings) {
133-
return py::make_tuple(settings.aws_auth(), settings.aws_profile());
140+
return py::make_tuple(settings.aws_auth(), settings.aws_profile(), settings.use_internal_client_wrapper_for_testing());
134141
},
135142
[](const auto &) {
136143
util::raise_rte("Invalid native storage setting type");
@@ -139,8 +146,11 @@ void register_bindings(py::module& storage, py::exception<arcticdb::ArcticExcept
139146
);
140147
},
141148
[](py::tuple t) {
142-
util::check(t.size() == 2, "Invalid S3Settings pickle objects");
143-
s3::S3Settings settings(t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_AUTH)].cast<s3::AWSAuthMethod>(), t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_PROFILE)].cast<std::string>());
149+
util::check(t.size() == 3, "Invalid S3Settings pickle objects");
150+
s3::S3Settings settings(t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_AUTH)].cast<s3::AWSAuthMethod>(),
151+
t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_PROFILE)].cast<std::string>(),
152+
t[static_cast<uint32_t>(S3SettingsPickleOrder::USE_INTERNAL_CLIENT_WRAPPER_FOR_TESTING)].cast<bool>()
153+
);
144154
return NativeVariantStorage(std::move(settings));
145155
}
146156
))

cpp/arcticdb/storage/s3/nfs_backed_storage.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
#include <arcticdb/storage/s3/s3_client_impl.hpp>
1212
#include <arcticdb/storage/s3/s3_client_interface.hpp>
1313
#include <arcticdb/util/simple_string_hash.hpp>
14-
14+
#include <arcticdb/storage/s3/s3_client_wrapper.hpp>
1515
namespace arcticdb::storage::nfs_backed {
1616

1717
std::string add_suffix_char(const std::string& str) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/* Copyright 2023 Man Group Operations Limited
2+
*
3+
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
4+
*
5+
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
6+
*/
7+
8+
#include <arcticdb/storage/s3/s3_client_interface.hpp>
9+
#include <arcticdb/log/log.hpp>
10+
#include <arcticdb/util/buffer_pool.hpp>
11+
#include <arcticdb/storage/storage_utils.hpp>
12+
#include <arcticdb/storage/s3/s3_client_wrapper.hpp>
13+
14+
#include <aws/s3/S3Errors.h>
15+
16+
namespace arcticdb::storage{
17+
18+
using namespace object_store_utils;
19+
20+
namespace s3 {
21+
22+
std::optional<Aws::S3::S3Error> S3ClientTestWrapper::has_failure_trigger(const std::string& bucket_name) const {
23+
bool static_failures_enabled = ConfigsMap::instance()->get_int("S3ClientTestWrapper.EnableFailures", 0) == 1;
24+
// Check if mock failures are enabled
25+
if (!static_failures_enabled) {
26+
return std::nullopt;
27+
}
28+
29+
// Get target buckets (if not set or "all", affects all buckets)
30+
auto failure_buckets_str = ConfigsMap::instance()->get_string("S3ClientTestWrapper.FailureBucket", "all");
31+
32+
if (failure_buckets_str != "all") {
33+
// Split the comma-separated bucket names and check if current bucket is in the list
34+
std::istringstream bucket_stream(failure_buckets_str);
35+
std::string target_bucket;
36+
bool bucket_found = false;
37+
38+
while (std::getline(bucket_stream, target_bucket, ',')) {
39+
// Trim whitespace
40+
target_bucket.erase(0, target_bucket.find_first_not_of(" \t"));
41+
target_bucket.erase(target_bucket.find_last_not_of(" \t") + 1);
42+
43+
if (target_bucket == bucket_name) {
44+
bucket_found = true;
45+
break;
46+
}
47+
}
48+
49+
if (!bucket_found) {
50+
return std::nullopt;
51+
}
52+
}
53+
54+
// Get error configuration
55+
auto error_code = ConfigsMap::instance()->get_int("S3ClientTestWrapper.ErrorCode", static_cast<int>(Aws::S3::S3Errors::NETWORK_CONNECTION));
56+
auto retryable = ConfigsMap::instance()->get_int("S3ClientTestWrapper.ErrorRetryable", 0) == 1;
57+
58+
auto failure_error_ = Aws::S3::S3Error(Aws::Client::AWSError<Aws::S3::S3Errors>(
59+
static_cast<Aws::S3::S3Errors>(error_code),
60+
"SimulatedFailure",
61+
"Simulated failure from environment variables",
62+
retryable
63+
));
64+
65+
66+
return failure_error_;
67+
}
68+
69+
S3Result<std::monostate> S3ClientTestWrapper::head_object(
70+
const std::string& s3_object_name,
71+
const std::string &bucket_name) const {
72+
auto maybe_error = has_failure_trigger(bucket_name);
73+
if (maybe_error.has_value()) {
74+
return {*maybe_error};
75+
}
76+
77+
78+
return actual_client_->head_object(s3_object_name, bucket_name);
79+
}
80+
81+
S3Result<Segment> S3ClientTestWrapper::get_object(
82+
const std::string &s3_object_name,
83+
const std::string &bucket_name) const {
84+
auto maybe_error = has_failure_trigger(bucket_name);
85+
if (maybe_error.has_value()) {
86+
return {*maybe_error};
87+
}
88+
89+
return actual_client_->get_object(s3_object_name, bucket_name);
90+
}
91+
92+
folly::Future<S3Result<Segment>> S3ClientTestWrapper::get_object_async(
93+
const std::string &s3_object_name,
94+
const std::string &bucket_name) const {
95+
auto maybe_error = has_failure_trigger(bucket_name);
96+
if (maybe_error.has_value()) {
97+
return folly::makeFuture<S3Result<Segment>>({*maybe_error});
98+
}
99+
100+
return actual_client_->get_object_async(s3_object_name, bucket_name);
101+
}
102+
103+
S3Result<std::monostate> S3ClientTestWrapper::put_object(
104+
const std::string &s3_object_name,
105+
Segment &segment,
106+
const std::string &bucket_name,
107+
PutHeader header) {
108+
auto maybe_error = has_failure_trigger(bucket_name);
109+
if (maybe_error.has_value()) {
110+
return {*maybe_error};
111+
}
112+
113+
return actual_client_->put_object(s3_object_name, segment, bucket_name, header);
114+
}
115+
116+
S3Result<DeleteOutput> S3ClientTestWrapper::delete_objects(
117+
const std::vector<std::string>& s3_object_names,
118+
const std::string& bucket_name) {
119+
auto maybe_error = has_failure_trigger(bucket_name);
120+
if (maybe_error.has_value()) {
121+
return {*maybe_error};
122+
}
123+
124+
125+
return actual_client_->delete_objects(s3_object_names, bucket_name);
126+
}
127+
128+
// Using a fixed page size since it's only being used for simple tests.
129+
// If we ever need to configure it we should move it to the s3 proto config instead.
130+
constexpr auto page_size = 10;
131+
S3Result<ListObjectsOutput> S3ClientTestWrapper::list_objects(
132+
const std::string& name_prefix,
133+
const std::string& bucket_name,
134+
const std::optional<std::string>& continuation_token) const {
135+
auto maybe_error = has_failure_trigger(bucket_name);
136+
if (maybe_error.has_value()) {
137+
return {*maybe_error};
138+
}
139+
140+
return actual_client_->list_objects(name_prefix, bucket_name, continuation_token);
141+
}
142+
143+
}
144+
145+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/* Copyright 2023 Man Group Operations Limited
2+
*
3+
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
4+
*
5+
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
6+
*/
7+
8+
#pragma once
9+
10+
#include <aws/s3/S3Client.h>
11+
12+
#include <arcticdb/storage/s3/s3_client_interface.hpp>
13+
#include <arcticdb/storage/s3/s3_client_impl.hpp>
14+
15+
#include <arcticdb/util/preconditions.hpp>
16+
#include <arcticdb/util/pb_util.hpp>
17+
#include <arcticdb/log/log.hpp>
18+
#include <arcticdb/util/buffer_pool.hpp>
19+
20+
#include <arcticdb/storage/object_store_utils.hpp>
21+
#include <arcticdb/storage/storage_utils.hpp>
22+
#include <arcticdb/entity/serialized_key.hpp>
23+
#include <arcticdb/util/configs_map.hpp>
24+
#include <arcticdb/util/composite.hpp>
25+
26+
namespace arcticdb::storage::s3 {
27+
28+
// A wrapper around the actual S3 client which can simulate failures based on the configuration.
29+
// The S3ClientTestWrapper delegates to the real client by default, but can intercept operations
30+
// to simulate failures or track operations for testing purposes.
31+
class S3ClientTestWrapper : public S3ClientInterface {
32+
public:
33+
explicit S3ClientTestWrapper(std::unique_ptr<S3ClientInterface> actual_client) :
34+
actual_client_(std::move(actual_client)) {
35+
}
36+
37+
~S3ClientTestWrapper() override = default;
38+
39+
[[nodiscard]] S3Result<std::monostate> head_object(
40+
const std::string& s3_object_name,
41+
const std::string& bucket_name) const override;
42+
43+
[[nodiscard]] S3Result<Segment> get_object(
44+
const std::string& s3_object_name,
45+
const std::string& bucket_name) const override;
46+
47+
[[nodiscard]] folly::Future<S3Result<Segment>> get_object_async(
48+
const std::string& s3_object_name,
49+
const std::string& bucket_name) const override;
50+
51+
S3Result<std::monostate> put_object(
52+
const std::string& s3_object_name,
53+
Segment& segment,
54+
const std::string& bucket_name,
55+
PutHeader header = PutHeader::NONE) override;
56+
57+
S3Result<DeleteOutput> delete_objects(
58+
const std::vector<std::string>& s3_object_names,
59+
const std::string& bucket_name) override;
60+
61+
S3Result<ListObjectsOutput> list_objects(
62+
const std::string& prefix,
63+
const std::string& bucket_name,
64+
const std::optional<std::string>& continuation_token) const override;
65+
66+
private:
67+
// Returns error if failures are enabled for the given bucket
68+
std::optional<Aws::S3::S3Error> has_failure_trigger(const std::string& bucket_name) const;
69+
70+
std::unique_ptr<S3ClientInterface> actual_client_;
71+
};
72+
73+
}

cpp/arcticdb/storage/s3/s3_settings.hpp

+11-3
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,19 @@ class S3Settings {
3838
bool use_raw_prefix_;
3939
AWSAuthMethod aws_auth_;
4040
std::string aws_profile_;
41+
bool use_internal_client_wrapper_for_testing_;
4142

4243
public:
43-
explicit S3Settings(AWSAuthMethod aws_auth, const std::string& aws_profile) :
44+
explicit S3Settings(AWSAuthMethod aws_auth,
45+
const std::string& aws_profile,
46+
bool use_internal_client_wrapper_for_testing) :
4447
aws_auth_(aws_auth),
45-
aws_profile_(aws_profile) {
48+
aws_profile_(aws_profile),
49+
use_internal_client_wrapper_for_testing_(use_internal_client_wrapper_for_testing) {
4650
}
4751

4852
explicit S3Settings(const arcticc::pb2::s3_storage_pb2::Config& config) :
49-
S3Settings(AWSAuthMethod::DISABLED, "")
53+
S3Settings(AWSAuthMethod::DISABLED, "", false)
5054
{
5155
update(config);
5256
}
@@ -135,6 +139,10 @@ class S3Settings {
135139
return aws_auth_;
136140
}
137141

142+
bool use_internal_client_wrapper_for_testing() const {
143+
return use_internal_client_wrapper_for_testing_;
144+
}
145+
138146
std::string aws_profile() const {
139147
return aws_profile_;
140148
}

cpp/arcticdb/storage/s3/s3_storage.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#include <arcticdb/log/log.hpp>
1616
#include <arcticdb/storage/s3/s3_api.hpp>
17+
#include <arcticdb/storage/s3/s3_client_wrapper.hpp>
1718
#include <arcticdb/util/buffer_pool.hpp>
1819
#include <arcticdb/storage/object_store_utils.hpp>
1920
#include <arcticdb/storage/storage_options.hpp>
@@ -135,6 +136,11 @@ void S3Storage::create_s3_client(const S3Settings &conf, const Aws::Auth::AWSCre
135136
ARCTICDB_RUNTIME_DEBUG(log::storage(), "Using provided auth credentials");
136137
s3_client_ = std::make_unique<S3ClientImpl>(creds, get_s3_config(conf), Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, conf.use_virtual_addressing());
137138
}
139+
140+
if (conf.use_internal_client_wrapper_for_testing()){
141+
ARCTICDB_RUNTIME_DEBUG(log::storage(), "Using internal client wrapper for testing");
142+
s3_client_ = std::make_unique<S3ClientTestWrapper>(std::move(s3_client_));
143+
}
138144
}
139145

140146
S3Storage::S3Storage(const LibraryPath &library_path, OpenMode mode, const S3Settings &conf) :

0 commit comments

Comments
 (0)