Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/aws/crt/http/HttpRequestResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ namespace Aws
*/
bool SetBody(const std::shared_ptr<Aws::Crt::Io::InputStream> &body) noexcept;

/**
* Sets an async input stream as the message body
* @param body the async input stream representing the message body
* @return future<bool> indicating success/failure
*/
std::future<bool> SetBody(const std::shared_ptr<Aws::Crt::Io::AsyncInputStream> &body) noexcept;

/**
* Gets the number of headers contained in this request
* @return the number of headers contained in this request
Expand Down Expand Up @@ -101,6 +108,7 @@ namespace Aws
Allocator *m_allocator;
struct aws_http_message *m_message;
std::shared_ptr<Aws::Crt::Io::InputStream> m_bodyStream;
std::shared_ptr<Aws::Crt::Io::AsyncInputStream> m_asyncBodyStream;
};

/**
Expand Down
45 changes: 45 additions & 0 deletions include/aws/crt/io/Stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
#include <aws/crt/Exports.h>
#include <aws/crt/RefCounted.h>
#include <aws/crt/Types.h>
#include <aws/io/async_stream.h>
#include <aws/io/stream.h>

#include <future>

namespace Aws
{
namespace Crt
Expand Down Expand Up @@ -192,6 +195,48 @@ namespace Aws
private:
std::shared_ptr<Aws::Crt::Io::IStream> m_stream;
};

/**
* Interface for asynchronous input streams.
* Used for async HTTP request bodies.
*/
class AWS_CRT_CPP_API AsyncInputStream : public std::enable_shared_from_this<AsyncInputStream>,
public RefCounted<AsyncInputStream>
{
public:
virtual ~AsyncInputStream();

AsyncInputStream(const AsyncInputStream &) = delete;
AsyncInputStream &operator=(const AsyncInputStream &) = delete;
AsyncInputStream(AsyncInputStream &&) = delete;
AsyncInputStream &operator=(AsyncInputStream &&) = delete;

explicit operator bool() const noexcept { return IsValid(); }

virtual bool IsValid() const noexcept = 0;

/// @private
aws_async_input_stream *GetUnderlyingStream() noexcept { return &m_underlying_stream; }

protected:
Allocator *m_allocator;
aws_async_input_stream m_underlying_stream;

AsyncInputStream(Aws::Crt::Allocator *allocator = ApiAllocator());

/**
* Asynchronously read into buffer.
* @return future<bool> - true on success (including EOF/no data available), false on error
*/
virtual std::future<bool> ReadImpl(ByteBuf &buffer) noexcept = 0;

private:
static void s_Destroy(aws_async_input_stream *stream);
static aws_future_bool *s_Read(aws_async_input_stream *stream, aws_byte_buf *dest);

static aws_async_input_stream_vtable s_vtable;
};

} // namespace Io
} // namespace Crt
} // namespace Aws
12 changes: 12 additions & 0 deletions source/http/HttpRequestResponse.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,24 @@ namespace Aws
bool HttpMessage::SetBody(const std::shared_ptr<Aws::Crt::Io::InputStream> &body) noexcept
{
m_bodyStream = body;
m_asyncBodyStream = nullptr;
aws_http_message_set_body_stream(
m_message, m_bodyStream && *m_bodyStream ? m_bodyStream->GetUnderlyingStream() : nullptr);

return true;
}

std::future<bool> HttpMessage::SetBody(const std::shared_ptr<Aws::Crt::Io::AsyncInputStream> &body) noexcept
{
m_asyncBodyStream = body;
m_bodyStream = nullptr;
aws_http_message_set_async_body_stream(m_message, nullptr);

std::promise<bool> promise;
promise.set_value(body == nullptr || body->IsValid());
return promise.get_future();
}

size_t HttpMessage::GetHeaderCount() const noexcept
{
return aws_http_message_get_header_count(m_message);
Expand Down
43 changes: 43 additions & 0 deletions source/io/Stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
#include <aws/crt/StlAllocator.h>
#include <aws/crt/io/Stream.h>
#include <iostream>
#include <thread>

#include <aws/io/future.h>
#include <aws/io/stream.h>

namespace Aws
Expand Down Expand Up @@ -232,6 +234,47 @@ namespace Aws
{
return m_stream->peek();
}

/*** AsyncInputStream implementation ***/

AsyncInputStream::~AsyncInputStream() {}

void AsyncInputStream::s_Destroy(aws_async_input_stream *stream)
{
auto impl = static_cast<AsyncInputStream *>(stream->impl);
impl->ReleaseRef();
}

aws_future_bool *AsyncInputStream::s_Read(aws_async_input_stream *stream, aws_byte_buf *dest)
{
auto impl = static_cast<AsyncInputStream *>(stream->impl);
auto future = aws_future_bool_new(impl->m_allocator);

std::shared_future<bool> cppFuture = impl->ReadImpl(*dest).share();

aws_future_bool_acquire(future);
std::thread(
[future, cppFuture]()
{
bool result = cppFuture.get();
aws_future_bool_set_result(future, result);
aws_future_bool_release(future);
})
.detach();

return future;
}

aws_async_input_stream_vtable AsyncInputStream::s_vtable = {
AsyncInputStream::s_Destroy,
AsyncInputStream::s_Read,
};

AsyncInputStream::AsyncInputStream(Aws::Crt::Allocator *allocator) : m_allocator(allocator)
{
aws_async_input_stream_init_base(&m_underlying_stream, allocator, &s_vtable, this);
}

} // namespace Io
} // namespace Crt
} // namespace Aws
Loading