Skip to content

Commit 9ad7477

Browse files
committed
Async writing works
1 parent 76883c4 commit 9ad7477

File tree

2 files changed

+41
-13
lines changed

2 files changed

+41
-13
lines changed

include/openPMD/toolkit/Aws.hpp

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
namespace openPMD::internal
1010
{
11-
struct AwsAsyncHandler
11+
struct AwsAsyncCounter
1212
{
1313
std::mutex mutex;
1414
std::condition_variable event;
@@ -21,7 +21,18 @@ struct AwsAsyncHandler
2121
void add_task();
2222
void add_and_notify_result();
2323

24-
~AwsAsyncHandler();
24+
~AwsAsyncCounter();
25+
};
26+
27+
struct AwsAsyncHandler
28+
{
29+
// We can defer std::unique_ptr operations longer than std::shared_ptr
30+
// operations, since no one else has the memory, so use two counters. TODO:
31+
// Add some form of restriction on how long the std::unique_ptr queue may
32+
// become. Currently it can theoretically be spammed ad libitum. Either
33+
// restrict the queue to a configurable length, or add a syncEverything()
34+
// call.
35+
AwsAsyncCounter shared_ptr_operations, unique_ptr_operations;
2536
};
2637

2738
struct ExternalBlockStorageAws : ExternalBlockStorageBackend

src/toolkit/Aws.cpp

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
#include "openPMD/toolkit/Aws.hpp"
2+
#include "openPMD/auxiliary/Memory.hpp"
3+
#include "openPMD/auxiliary/Memory_internal.hpp"
4+
#include "openPMD/auxiliary/Variant.hpp"
25

36
#include <aws/s3/S3Client.h>
47
#include <aws/s3/model/CreateBucketRequest.h>
@@ -33,7 +36,7 @@ struct imemstream : std::iostream
3336

3437
namespace openPMD::internal
3538
{
36-
void AwsAsyncHandler::wait()
39+
void AwsAsyncCounter::wait()
3740
{
3841
std::cerr << "Waiting for remaining tasks. Have " << completion_counter
3942
<< " of " << request_counter << std::endl;
@@ -44,20 +47,20 @@ void AwsAsyncHandler::wait()
4447
std::cerr << "Finished waiting for remaining tasks" << std::endl;
4548
}
4649

47-
void AwsAsyncHandler::add_task()
50+
void AwsAsyncCounter::add_task()
4851
{
4952
this->request_counter++;
5053
}
5154

52-
void AwsAsyncHandler::add_and_notify_result()
55+
void AwsAsyncCounter::add_and_notify_result()
5356
{
5457
std::unique_lock lk(this->mutex);
5558
this->completion_counter++;
5659
lk.unlock();
5760
this->event.notify_all();
5861
}
5962

60-
AwsAsyncHandler::~AwsAsyncHandler()
63+
AwsAsyncCounter::~AwsAsyncCounter()
6164
{
6265
this->wait();
6366
}
@@ -124,14 +127,30 @@ auto ExternalBlockStorageAws::put(
124127
}
125128
else
126129
{
127-
auto &async_handler = *m_async;
130+
auto &async_counter = *std::visit(
131+
auxiliary::overloaded{
132+
[this](auxiliary::WriteBuffer::CopyableUniquePtr const &) {
133+
return &this->m_async->unique_ptr_operations;
134+
},
135+
[this](auxiliary::WriteBuffer::SharedPtr const &) {
136+
return &this->m_async->shared_ptr_operations;
137+
}},
138+
data.as_variant<auxiliary::WriteBufferTypes>());
128139
auto responseReceivedHandler =
129-
[&async_handler](
140+
[&async_counter,
141+
/*
142+
* Need to keep buffers alive until they have been asynchronously
143+
* read. Use the closure captures for this. Wrap the WriteBuffer
144+
* inside a shared_ptr to make the std::function copyable.
145+
*/
146+
keepalive =
147+
std::make_shared<auxiliary::WriteBuffer>(std::move(data))](
130148
const Aws::S3::S3Client *,
131149
const Aws::S3::Model::PutObjectRequest &,
132150
const Aws::S3::Model::PutObjectOutcome &put_outcome,
133151
const std::shared_ptr<const Aws::Client::AsyncCallerContext>
134152
&) {
153+
(void)keepalive;
135154
if (put_outcome.IsSuccess())
136155
{
137156
std::cout
@@ -144,12 +163,10 @@ auto ExternalBlockStorageAws::put(
144163
<< put_outcome.GetError().GetMessage()
145164
<< std::endl;
146165
}
147-
async_handler.add_and_notify_result();
166+
async_counter.add_and_notify_result();
148167
};
149-
async_handler.add_task();
168+
async_counter.add_task();
150169
m_client.PutObjectAsync(put_request, responseReceivedHandler);
151-
// todo replace this
152-
async_handler.wait();
153170
}
154171
return sanitized;
155172
}
@@ -192,7 +209,7 @@ void ExternalBlockStorageAws::sync()
192209
{
193210
return;
194211
}
195-
this->m_async->wait();
212+
this->m_async->shared_ptr_operations.wait();
196213
}
197214

198215
[[nodiscard]] auto ExternalBlockStorageAws::externalStorageLocation() const

0 commit comments

Comments
 (0)