Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 14 additions & 27 deletions runners/s3-benchrunner-c/BenchmarkRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,39 +144,26 @@ BenchmarkRunner::BenchmarkRunner(const BenchmarkConfig &config) : config(config)
{
// If we're uploading, and not using files on disk,
// then generate an in-memory buffer of random data to upload.
// All uploads will use this same buffer, so make it big enough for the largest file.
// We use a small 8 MiB buffer (matching the Java runner) that the upload stream
// loops over repeatedly, rather than allocating a buffer sized to the full upload file.
// This keeps the working set small and cache-friendly, even for large uploads.
if (!config.filesOnDisk)
{
size_t maxUploadSize = 0;
bool hasUpload = false;
for (auto &&task : config.tasks)
if (task.action == "upload")
maxUploadSize = std::max(maxUploadSize, (size_t)task.size);

// Generating randomness is slower than copying memory. Therefore, only fill SOME
// of the buffer with randomness, and fill the rest with copies of that randomness.

// We don't want any parts to be identical.
// Use something that won't fall on a part boundary as we copy it.
const size_t randomBlockSize = std::min((size_t)31415926, maxUploadSize); // approx 30MiB, digits of pi
std::vector<uint8_t> randomBlock(randomBlockSize);
independent_bits_engine<default_random_engine, CHAR_BIT, unsigned char> randEngine;
generate(randomBlock.begin(), randomBlock.end(), randEngine);

// Resize the buffer to the maximum upload size
randomDataForUpload.resize(maxUploadSize);
{
hasUpload = true;
break;
}

// Fill the buffer by repeating the random block
size_t bytesWritten = 0;
while (bytesWritten < maxUploadSize)
if (hasUpload)
{
// Calculate how many bytes to copy in this iteration
size_t bytesToCopy = std::min(randomBlockSize, maxUploadSize - bytesWritten);

// Copy the bytes from the random block to the target buffer
std::copy(
randomBlock.begin(), randomBlock.begin() + bytesToCopy, randomDataForUpload.begin() + bytesWritten);

bytesWritten += bytesToCopy;
// Use 8 MiB to match the Java runner's buffer size (Util.generateRandomData()).
const size_t randomBlockSize = bytesFromMiB(8);
randomDataForUpload.resize(randomBlockSize);
independent_bits_engine<default_random_engine, CHAR_BIT, unsigned char> randEngine;
generate(randomDataForUpload.begin(), randomDataForUpload.end(), randEngine);
}
}
}
Expand Down
100 changes: 96 additions & 4 deletions runners/s3-benchrunner-c/CRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include <aws/io/tls_channel_handler.h>
#include <aws/s3/s3_client.h>

#include <algorithm>
#include <cstring>
#include <future>
#include <iomanip>
#include <list>
Expand Down Expand Up @@ -284,6 +286,95 @@ void addHeader(aws_http_message *request, string_view name, string_view value)
aws_http_message_add_header(request, header);
}

// State for a custom aws_input_stream that loops a small buffer to produce totalSize bytes.
// This mirrors the Java runner's UploadFromRamStream: instead of allocating a buffer
// equal to the full upload size, we reuse a small cache-friendly buffer repeatedly.
// An instance is attached to the stream through aws_input_stream::impl and is read
// back by each of the s_looping_stream_* callbacks.
struct LoopingUploadStream
{
// Allocator that created this state; kept so the destroy callback can release
// the memory with the same allocator.
aws_allocator *alloc;
// First byte of the source buffer that is replayed. The stream's destroy callback
// does not free this pointer.
const uint8_t *data;
// Number of bytes available at `data`; the read position wraps modulo this value.
size_t dataLen;
// Total number of bytes the stream produces before reporting end-of-stream.
uint64_t totalSize;
// Logical position within the totalSize-byte stream: advanced by read, reset by seek.
uint64_t bytesWritten;
};

// Seek callback for the looping upload stream.
//
// Moves the logical read position (LoopingUploadStream::bytesWritten) within the
// virtual stream of totalSize bytes. The position is only committed when the
// request is valid, so a rejected seek leaves the stream untouched.
//
// stream: stream whose `impl` points at a LoopingUploadStream.
// offset: displacement relative to `basis`. Must be in [0, totalSize] for
//         AWS_SSB_BEGIN and in [-totalSize, 0] for AWS_SSB_END.
// basis:  AWS_SSB_BEGIN or AWS_SSB_END.
//
// Returns AWS_OP_SUCCESS on success. On failure returns the result of
// aws_raise_error() with AWS_IO_STREAM_INVALID_SEEK_POSITION (offset out of
// range) or AWS_ERROR_INVALID_ARGUMENT (any other basis value).
static int s_looping_stream_seek(aws_input_stream *stream, int64_t offset, enum aws_stream_seek_basis basis)
{
    auto *s = static_cast<LoopingUploadStream *>(stream->impl);

    uint64_t newPos = 0;
    switch (basis)
    {
        case AWS_SSB_BEGIN:
            // A negative offset would wrap to a huge unsigned value; reject it,
            // along with anything past the end of the virtual stream.
            if (offset < 0 || static_cast<uint64_t>(offset) > s->totalSize)
                return aws_raise_error(AWS_IO_STREAM_INVALID_SEEK_POSITION);
            newPos = static_cast<uint64_t>(offset);
            break;

        case AWS_SSB_END:
        {
            // Seeking relative to the end only makes sense going backwards.
            if (offset > 0)
                return aws_raise_error(AWS_IO_STREAM_INVALID_SEEK_POSITION);

            // Negate in unsigned arithmetic so INT64_MIN cannot overflow.
            const uint64_t distanceFromEnd = uint64_t{0} - static_cast<uint64_t>(offset);
            if (distanceFromEnd > s->totalSize)
                return aws_raise_error(AWS_IO_STREAM_INVALID_SEEK_POSITION);
            newPos = s->totalSize - distanceFromEnd;
            break;
        }

        default:
            // Previously an unknown basis was reported as success without seeking.
            return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
    }

    s->bytesWritten = newPos;
    return AWS_OP_SUCCESS;
}

// Read callback for the looping upload stream.
//
// Appends bytes to `dest` until either `dest` is full or the stream has produced
// totalSize bytes in total. Bytes are taken from the source buffer starting at
// (bytesWritten % dataLen), wrapping back to the start of the buffer whenever its
// end is reached.
//
// stream: stream whose `impl` points at a LoopingUploadStream.
// dest:   buffer to append to; `len` is advanced by the number of bytes copied.
//
// Returns AWS_OP_SUCCESS when zero or more bytes were appended. Returns the result
// of aws_raise_error(AWS_IO_STREAM_READ_FAILED) when bytes are still owed but the
// source buffer is null or empty.
static int s_looping_stream_read(aws_input_stream *stream, aws_byte_buf *dest)
{
    auto *s = static_cast<LoopingUploadStream *>(stream->impl);

    // Nothing left to produce, or nowhere to put it: a successful zero-byte read.
    if (s->bytesWritten >= s->totalSize || dest->len >= dest->capacity)
        return AWS_OP_SUCCESS;

    // An empty source cannot supply data, and the modulo below would divide by zero.
    if (s->data == nullptr || s->dataLen == 0)
        return aws_raise_error(AWS_IO_STREAM_READ_FAILED);

    while (s->bytesWritten < s->totalSize && dest->len < dest->capacity)
    {
        const uint64_t remaining = s->totalSize - s->bytesWritten;
        const size_t space = dest->capacity - dest->len;
        const size_t offset = static_cast<size_t>(s->bytesWritten % s->dataLen);
        const size_t untilWrap = s->dataLen - offset;

        // Copy no more than: what is still owed, what fits in dest, and what is
        // left before the source buffer wraps around.
        const size_t chunk = static_cast<size_t>(std::min<uint64_t>(remaining, std::min(space, untilWrap)));

        std::memcpy(dest->buffer + dest->len, s->data + offset, chunk);
        dest->len += chunk;
        s->bytesWritten += chunk;
    }
    return AWS_OP_SUCCESS;
}

// Status callback for the looping upload stream.
//
// Fills `status`: the stream is always reported as valid, and end-of-stream is
// reported once the logical position has reached (or passed) totalSize.
// Always returns AWS_OP_SUCCESS.
static int s_looping_stream_get_status(aws_input_stream *stream, aws_stream_status *status)
{
    const auto *looping = static_cast<const LoopingUploadStream *>(stream->impl);
    const bool exhausted = looping->bytesWritten >= looping->totalSize;

    status->is_end_of_stream = exhausted;
    status->is_valid = true;
    return AWS_OP_SUCCESS;
}

// Length callback for the looping upload stream.
//
// Writes the total number of bytes the stream produces (totalSize, not the size
// of the small source buffer) to `out_length`. Always returns AWS_OP_SUCCESS.
static int s_looping_stream_get_length(aws_input_stream *stream, int64_t *out_length)
{
    const auto *looping = static_cast<const LoopingUploadStream *>(stream->impl);
    const uint64_t length = looping->totalSize;

    *out_length = (int64_t)length;
    return AWS_OP_SUCCESS;
}

// Function table wiring the looping-stream callbacks into aws_input_stream.
// Declared const because it is shared by every looping stream and is never
// modified after initialization. Members not named here are value-initialized.
static const aws_input_stream_vtable s_looping_stream_vtable = {
    .seek = s_looping_stream_seek,
    .read = s_looping_stream_read,
    .get_status = s_looping_stream_get_status,
    .get_length = s_looping_stream_get_length,
};

static aws_input_stream *aws_input_stream_new_looping(
aws_allocator *alloc,
const uint8_t *data,
size_t dataLen,
uint64_t totalSize)
{
auto *stream = reinterpret_cast<aws_input_stream *>(aws_mem_calloc(alloc, 1, sizeof(aws_input_stream)));
auto *impl = reinterpret_cast<LoopingUploadStream *>(aws_mem_calloc(alloc, 1, sizeof(LoopingUploadStream)));
impl->alloc = alloc; // store allocator so the destructor can use the same one
impl->data = data;
impl->dataLen = dataLen;
impl->totalSize = totalSize;
impl->bytesWritten = 0;
stream->impl = impl;
stream->vtable = &s_looping_stream_vtable;
aws_ref_count_init(
&stream->ref_count,
stream,
[](void *user_data)
{
auto *st = reinterpret_cast<aws_input_stream *>(user_data);
auto *impl = reinterpret_cast<LoopingUploadStream *>(st->impl);
aws_allocator *alloc = impl->alloc; // retrieve the allocator before freeing impl
aws_mem_release(alloc, impl);
aws_mem_release(alloc, st);
});
return stream;
}

Task::Task(CRunner &runner, size_t taskI, FILE *telemetryFile)
: runner(runner), taskI(taskI), config(runner.config.tasks[taskI]), donePromise(),
doneFuture(donePromise.get_future())
Expand Down Expand Up @@ -318,10 +409,11 @@ Task::Task(CRunner &runner, size_t taskI, FILE *telemetryFile)
options.send_filepath = toCursor(config.key);
else
{
// set up input-stream that uploads random data from a buffer
auto randomDataCursor =
aws_byte_cursor_from_array(runner.randomDataForUpload.data(), runner.randomDataForUpload.size());
auto inMemoryStreamForUpload = aws_input_stream_new_from_cursor(runner.alloc, &randomDataCursor);
// Set up a looping input-stream that repeatedly reads from a small buffer
// to produce config.size bytes total. This is more cache-friendly than
// allocating a buffer equal to the full upload size.
inMemoryStreamForUpload = aws_input_stream_new_looping(
Comment thread
sbSteveK marked this conversation as resolved.
runner.alloc, runner.randomDataForUpload.data(), runner.randomDataForUpload.size(), config.size);
aws_http_message_set_body_stream(request, inMemoryStreamForUpload);
aws_input_stream_release(inMemoryStreamForUpload);
}
Expand Down
75 changes: 63 additions & 12 deletions runners/s3-benchrunner-cpp/SdkClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,33 +33,80 @@ class DownloadToRamNullBuf : public streambuf
}
};

// streambuf used in upload-from-ram tests
// it reads from a pre-existing vector of bytes
// streambuf used in upload-from-ram tests.
// Loops a small buffer to produce totalSize bytes total.
class UploadFromRamBuf : public streambuf
{
char *bufBegin;
char *bufEnd;
uint64_t totalSize;
uint64_t bytesRead;

public:
UploadFromRamBuf(vector<uint8_t> &src) : streambuf()
UploadFromRamBuf(vector<uint8_t> &src, uint64_t totalSize) : streambuf(), totalSize(totalSize), bytesRead(0)
{
char *begin = reinterpret_cast<char *>(src.data());
char *end = begin + src.size();
setg(begin, begin /*next*/, end);
bufBegin = reinterpret_cast<char *>(src.data());
bufEnd = bufBegin + src.size();
setg(bufBegin, bufBegin, bufEnd);
}

protected:
// Called when the get-area is exhausted. Loop back to the start of the buffer
// if we haven't yet produced totalSize bytes.
int_type underflow() override
{
if (bytesRead >= totalSize)
return traits_type::eof();

// Reset get-area to start of buffer
setg(bufBegin, bufBegin, bufEnd);
return traits_type::to_int_type(*bufBegin);
}

// Called for bulk reads. Loop the buffer and respect totalSize.
streamsize xsgetn(char *dest, streamsize count) override
{
streamsize totalRead = 0;
while (totalRead < count && bytesRead < totalSize)
{
// If get-area is exhausted, loop back
if (gptr() == egptr())
setg(bufBegin, bufBegin, bufEnd);

uint64_t remaining = totalSize - bytesRead;
streamsize available = egptr() - gptr();
streamsize toRead = (streamsize)std::min({(uint64_t)(count - totalRead), remaining, (uint64_t)available});
memcpy(dest + totalRead, gptr(), toRead);
gbump((int)toRead);
bytesRead += toRead;
totalRead += toRead;
}
return totalRead;
}

// Called for seeks (e.g. part retries). Reset bytesRead to match the new position.
streampos seekoff(streamoff off, ios_base::seekdir way, ios_base::openmode which) override
{
// Only handle input mode
if (which != ios_base::in)
return pos_type(off_type(-1)); // Seeking not supported for output mode
return pos_type(off_type(-1));

uint64_t newPos = 0;
if (way == ios_base::beg)
setg(eback(), eback() + off, egptr());
newPos = (uint64_t)off;
else if (way == ios_base::cur)
setg(eback(), gptr() + off, egptr());
newPos = bytesRead + (uint64_t)off;
else if (way == ios_base::end)
setg(eback(), egptr() + off, egptr());
newPos = totalSize + (uint64_t)off;

bytesRead = newPos;

// Position the get-area at the correct offset within the looping buffer
size_t bufSize = bufEnd - bufBegin;
size_t offsetInBuf = (size_t)(newPos % bufSize);
setg(bufBegin, bufBegin + offsetInBuf, bufEnd);

return gptr() - eback(); // Return the new position
return streampos(newPos);
}

streampos seekpos(streampos sp, ios_base::openmode which) override
Expand Down Expand Up @@ -181,9 +228,13 @@ class SdkClientRunner : public BenchmarkRunner
}
else
{
this->uploadFromRamBuf = make_unique<UploadFromRamBuf>(runner.randomDataForUpload);
// Loop the small random buffer to produce exactly taskConfig.size bytes.
// SetContentLength tells the SDK the true upload size so it can make
// correct multipart decisions, independent of the buffer size.
this->uploadFromRamBuf = make_unique<UploadFromRamBuf>(runner.randomDataForUpload, taskConfig.size);
auto streamForUpload = make_shared<Aws::IOStream>(this->uploadFromRamBuf.get());
request.SetBody(streamForUpload);
request.SetContentLength((long long)taskConfig.size);
}

auto onPutObjectFinished = [this](
Expand Down
Loading