Skip to content

Commit f3e33bb

Browse files
authored
c/c++ runner looping buffer (#112)
1 parent 1ab3fe0 commit f3e33bb

3 files changed

Lines changed: 173 additions & 43 deletions

File tree

runners/s3-benchrunner-c/BenchmarkRunner.cpp

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -144,39 +144,26 @@ BenchmarkRunner::BenchmarkRunner(const BenchmarkConfig &config) : config(config)
144144
{
145145
// If we're uploading, and not using files on disk,
146146
// then generate an in-memory buffer of random data to upload.
147-
// All uploads will use this same buffer, so make it big enough for the largest file.
147+
// We use a small 8 MiB buffer (matching the Java runner) that the upload stream
148+
// loops over repeatedly, rather than allocating a buffer sized to the full upload file.
149+
// This keeps the working set small and cache-friendly, even for large uploads.
148150
if (!config.filesOnDisk)
149151
{
150-
size_t maxUploadSize = 0;
152+
bool hasUpload = false;
151153
for (auto &&task : config.tasks)
152154
if (task.action == "upload")
153-
maxUploadSize = std::max(maxUploadSize, (size_t)task.size);
154-
155-
// Generating randomness is slower than copying memory. Therefore, only fill SOME
156-
// of the buffer with randomness, and fill the rest with copies of that randomness.
157-
158-
// We don't want any parts to be identical.
159-
// Use something that won't fall on a part boundary as we copy it.
160-
const size_t randomBlockSize = std::min((size_t)31415926, maxUploadSize); // approx 30MiB, digits of pi
161-
std::vector<uint8_t> randomBlock(randomBlockSize);
162-
independent_bits_engine<default_random_engine, CHAR_BIT, unsigned char> randEngine;
163-
generate(randomBlock.begin(), randomBlock.end(), randEngine);
164-
165-
// Resize the buffer to the maximum upload size
166-
randomDataForUpload.resize(maxUploadSize);
155+
{
156+
hasUpload = true;
157+
break;
158+
}
167159

168-
// Fill the buffer by repeating the random block
169-
size_t bytesWritten = 0;
170-
while (bytesWritten < maxUploadSize)
160+
if (hasUpload)
171161
{
172-
// Calculate how many bytes to copy in this iteration
173-
size_t bytesToCopy = std::min(randomBlockSize, maxUploadSize - bytesWritten);
174-
175-
// Copy the bytes from the random block to the target buffer
176-
std::copy(
177-
randomBlock.begin(), randomBlock.begin() + bytesToCopy, randomDataForUpload.begin() + bytesWritten);
178-
179-
bytesWritten += bytesToCopy;
162+
// Use 8 MiB to match the Java runner's buffer size (Util.generateRandomData()).
163+
const size_t randomBlockSize = bytesFromMiB(8);
164+
randomDataForUpload.resize(randomBlockSize);
165+
independent_bits_engine<default_random_engine, CHAR_BIT, unsigned char> randEngine;
166+
generate(randomDataForUpload.begin(), randomDataForUpload.end(), randEngine);
180167
}
181168
}
182169
}

runners/s3-benchrunner-c/CRunner.cpp

Lines changed: 96 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
#include <aws/io/tls_channel_handler.h>
1313
#include <aws/s3/s3_client.h>
1414

15+
#include <algorithm>
16+
#include <cstring>
1517
#include <future>
1618
#include <iomanip>
1719
#include <list>
@@ -284,6 +286,95 @@ void addHeader(aws_http_message *request, string_view name, string_view value)
284286
aws_http_message_add_header(request, header);
285287
}
286288

289+
// A custom aws_input_stream that loops a small buffer to produce totalSize bytes.
290+
// This mirrors the Java runner's UploadFromRamStream: instead of allocating a buffer
291+
// equal to the full upload size, we reuse a small cache-friendly buffer repeatedly.
292+
struct LoopingUploadStream
293+
{
294+
// Allocator used to create this state; kept so the release callback can free with the same one.
aws_allocator *alloc;
295+
// Start of the source bytes that the read callback copies from (not freed by the release callback).
const uint8_t *data;
296+
// Number of bytes available at `data`; read offsets wrap modulo this length.
size_t dataLen;
297+
// Total number of bytes the stream reports as its length and produces before end-of-stream.
uint64_t totalSize;
298+
// Logical position in the stream: bytes produced so far, or the last position set by seek.
uint64_t bytesWritten;
299+
};
300+
301+
// Seek callback for the looping upload stream.
//
// Moves the logical stream position (`bytesWritten`) to `offset`, interpreted
// relative to `basis`:
//   - AWS_SSB_BEGIN: `offset` counts forward from the start and must lie in [0, totalSize].
//   - AWS_SSB_END:   `offset` counts back from the end and must lie in [-totalSize, 0].
//
// Returns AWS_OP_SUCCESS on success. An out-of-range offset raises
// AWS_IO_STREAM_INVALID_SEEK_POSITION and an unrecognised basis raises
// AWS_ERROR_INVALID_ARGUMENT; in both cases the position is left untouched.
static int s_looping_stream_seek(aws_input_stream *stream, int64_t offset, enum aws_stream_seek_basis basis)
{
    auto *s = static_cast<LoopingUploadStream *>(stream->impl);

    switch (basis)
    {
        case AWS_SSB_BEGIN:
            // Reject negative offsets before the unsigned cast turns them into huge positions.
            if (offset < 0 || static_cast<uint64_t>(offset) > s->totalSize)
                return aws_raise_error(AWS_IO_STREAM_INVALID_SEEK_POSITION);
            s->bytesWritten = static_cast<uint64_t>(offset);
            return AWS_OP_SUCCESS;

        case AWS_SSB_END:
        {
            // INT64_MIN is rejected explicitly because negating it overflows.
            if (offset > 0 || offset == INT64_MIN)
                return aws_raise_error(AWS_IO_STREAM_INVALID_SEEK_POSITION);
            const uint64_t distanceFromEnd = static_cast<uint64_t>(-offset);
            if (distanceFromEnd > s->totalSize)
                return aws_raise_error(AWS_IO_STREAM_INVALID_SEEK_POSITION);
            s->bytesWritten = s->totalSize - distanceFromEnd;
            return AWS_OP_SUCCESS;
        }

        default:
            // Previously an unknown basis fell through and reported success without seeking.
            return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
    }
}
310+
311+
// Read callback for the looping upload stream.
//
// Appends bytes to `dest` until either `dest` is full or `totalSize` bytes have
// been produced overall. Bytes are taken from the source buffer starting at
// `bytesWritten % dataLen`, wrapping back to the start of the buffer each time
// its end is reached.
//
// Returns AWS_OP_SUCCESS, including when nothing is copied because `dest` is
// already full or the stream is exhausted. Raises AWS_ERROR_INVALID_STATE if
// bytes are still owed but the source buffer is empty.
static int s_looping_stream_read(aws_input_stream *stream, aws_byte_buf *dest)
{
    auto *s = static_cast<LoopingUploadStream *>(stream->impl);

    // With an empty source buffer the wrap computation below would be `% 0`;
    // fail explicitly instead of invoking undefined behaviour.
    const bool wantsBytes = s->bytesWritten < s->totalSize && dest->len < dest->capacity;
    if (wantsBytes && s->dataLen == 0)
        return aws_raise_error(AWS_ERROR_INVALID_STATE);

    while (s->bytesWritten < s->totalSize && dest->len < dest->capacity)
    {
        const uint64_t remaining = s->totalSize - s->bytesWritten;
        const size_t space = dest->capacity - dest->len;
        const size_t offset = static_cast<size_t>(s->bytesWritten % s->dataLen);

        // Copy the largest run allowed by all three limits: bytes still owed,
        // room left in dest, and bytes left before the source buffer wraps.
        const size_t chunk = static_cast<size_t>(
            std::min({remaining, static_cast<uint64_t>(space), static_cast<uint64_t>(s->dataLen - offset)}));

        memcpy(dest->buffer + dest->len, s->data + offset, chunk);
        dest->len += chunk;
        s->bytesWritten += chunk;
    }
    return AWS_OP_SUCCESS;
}
326+
327+
// Status callback: the stream is always reported as valid, and as finished
// once the logical position has reached (or passed) totalSize.
static int s_looping_stream_get_status(aws_input_stream *stream, aws_stream_status *status)
{
    const auto *state = static_cast<const LoopingUploadStream *>(stream->impl);
    const bool exhausted = state->bytesWritten >= state->totalSize;

    status->is_end_of_stream = exhausted;
    status->is_valid = true;
    return AWS_OP_SUCCESS;
}
334+
335+
// Length callback: reports the logical stream length (totalSize), not the
// size of the source buffer being looped over.
static int s_looping_stream_get_length(aws_input_stream *stream, int64_t *out_length)
{
    const auto *state = static_cast<const LoopingUploadStream *>(stream->impl);
    const uint64_t logicalLength = state->totalSize;

    *out_length = static_cast<int64_t>(logicalLength);
    return AWS_OP_SUCCESS;
}
341+
342+
// Callback table that routes the generic aws_input_stream operations to the
// looping-stream implementations in this file. A pointer to this table is
// installed on each stream created by aws_input_stream_new_looping().
static aws_input_stream_vtable s_looping_stream_vtable = {
343+
// Repositions the logical stream offset.
.seek = s_looping_stream_seek,
344+
// Copies bytes from the looped source buffer into the caller's buffer.
.read = s_looping_stream_read,
345+
// Reports validity and end-of-stream.
.get_status = s_looping_stream_get_status,
346+
// Reports the logical length (totalSize).
.get_length = s_looping_stream_get_length,
347+
};
348+
349+
static aws_input_stream *aws_input_stream_new_looping(
350+
aws_allocator *alloc,
351+
const uint8_t *data,
352+
size_t dataLen,
353+
uint64_t totalSize)
354+
{
355+
auto *stream = reinterpret_cast<aws_input_stream *>(aws_mem_calloc(alloc, 1, sizeof(aws_input_stream)));
356+
auto *impl = reinterpret_cast<LoopingUploadStream *>(aws_mem_calloc(alloc, 1, sizeof(LoopingUploadStream)));
357+
impl->alloc = alloc; // store allocator so the destructor can use the same one
358+
impl->data = data;
359+
impl->dataLen = dataLen;
360+
impl->totalSize = totalSize;
361+
impl->bytesWritten = 0;
362+
stream->impl = impl;
363+
stream->vtable = &s_looping_stream_vtable;
364+
aws_ref_count_init(
365+
&stream->ref_count,
366+
stream,
367+
[](void *user_data)
368+
{
369+
auto *st = reinterpret_cast<aws_input_stream *>(user_data);
370+
auto *impl = reinterpret_cast<LoopingUploadStream *>(st->impl);
371+
aws_allocator *alloc = impl->alloc; // retrieve the allocator before freeing impl
372+
aws_mem_release(alloc, impl);
373+
aws_mem_release(alloc, st);
374+
});
375+
return stream;
376+
}
377+
287378
Task::Task(CRunner &runner, size_t taskI, FILE *telemetryFile)
288379
: runner(runner), taskI(taskI), config(runner.config.tasks[taskI]), donePromise(),
289380
doneFuture(donePromise.get_future())
@@ -318,10 +409,11 @@ Task::Task(CRunner &runner, size_t taskI, FILE *telemetryFile)
318409
options.send_filepath = toCursor(config.key);
319410
else
320411
{
321-
// set up input-stream that uploads random data from a buffer
322-
auto randomDataCursor =
323-
aws_byte_cursor_from_array(runner.randomDataForUpload.data(), runner.randomDataForUpload.size());
324-
auto inMemoryStreamForUpload = aws_input_stream_new_from_cursor(runner.alloc, &randomDataCursor);
412+
// Set up a looping input-stream that repeatedly reads from a small buffer
413+
// to produce config.size bytes total. This is more cache-friendly than
414+
// allocating a buffer equal to the full upload size.
415+
inMemoryStreamForUpload = aws_input_stream_new_looping(
416+
runner.alloc, runner.randomDataForUpload.data(), runner.randomDataForUpload.size(), config.size);
325417
aws_http_message_set_body_stream(request, inMemoryStreamForUpload);
326418
aws_input_stream_release(inMemoryStreamForUpload);
327419
}

runners/s3-benchrunner-cpp/SdkClient.cpp

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,33 +33,80 @@ class DownloadToRamNullBuf : public streambuf
3333
}
3434
};
3535

36-
// streambuf used in upload-from-ram tests
37-
// it reads from a pre-existing vector of bytes
36+
// streambuf used in upload-from-ram tests.
37+
// Loops a small buffer to produce totalSize bytes total.
3838
class UploadFromRamBuf : public streambuf
3939
{
40+
char *bufBegin;
41+
char *bufEnd;
42+
uint64_t totalSize;
43+
uint64_t bytesRead;
44+
4045
public:
41-
UploadFromRamBuf(vector<uint8_t> &src) : streambuf()
46+
UploadFromRamBuf(vector<uint8_t> &src, uint64_t totalSize) : streambuf(), totalSize(totalSize), bytesRead(0)
4247
{
43-
char *begin = reinterpret_cast<char *>(src.data());
44-
char *end = begin + src.size();
45-
setg(begin, begin /*next*/, end);
48+
bufBegin = reinterpret_cast<char *>(src.data());
49+
bufEnd = bufBegin + src.size();
50+
setg(bufBegin, bufBegin, bufEnd);
4651
}
4752

4853
protected:
54+
// Called when the get-area is exhausted. Loop back to the start of the buffer
55+
// if we haven't yet produced totalSize bytes.
56+
// Returns eof once bytesRead has reached totalSize; otherwise rewinds the
// get-area to the full source buffer and returns its first byte without
// consuming it.
// NOTE(review): bytesRead is not advanced on this path (within the visible code
// only xsgetn and seekoff change it), so bytes consumed one at a time through
// the get-area do not appear to count toward totalSize, and the get-area is
// not trimmed to the bytes still owed — confirm callers only use bulk reads.
int_type underflow() override
57+
{
58+
// Stop once the logical upload size has been reached.
if (bytesRead >= totalSize)
59+
return traits_type::eof();
60+
61+
// Reset get-area to start of buffer
62+
setg(bufBegin, bufBegin, bufEnd);
63+
// Peek semantics: hand back the current byte without moving gptr().
return traits_type::to_int_type(*bufBegin);
64+
}
65+
66+
// Called for bulk reads. Loop the buffer and respect totalSize.
67+
streamsize xsgetn(char *dest, streamsize count) override
68+
{
69+
streamsize totalRead = 0;
70+
while (totalRead < count && bytesRead < totalSize)
71+
{
72+
// If get-area is exhausted, loop back
73+
if (gptr() == egptr())
74+
setg(bufBegin, bufBegin, bufEnd);
75+
76+
uint64_t remaining = totalSize - bytesRead;
77+
streamsize available = egptr() - gptr();
78+
streamsize toRead = (streamsize)std::min({(uint64_t)(count - totalRead), remaining, (uint64_t)available});
79+
memcpy(dest + totalRead, gptr(), toRead);
80+
gbump((int)toRead);
81+
bytesRead += toRead;
82+
totalRead += toRead;
83+
}
84+
return totalRead;
85+
}
86+
87+
// Called for seeks (e.g. part retries). Reset bytesRead to match the new position.
4988
streampos seekoff(streamoff off, ios_base::seekdir way, ios_base::openmode which) override
5089
{
5190
// Only handle input mode
5291
if (which != ios_base::in)
53-
return pos_type(off_type(-1)); // Seeking not supported for output mode
92+
return pos_type(off_type(-1));
5493

94+
uint64_t newPos = 0;
5595
if (way == ios_base::beg)
56-
setg(eback(), eback() + off, egptr());
96+
newPos = (uint64_t)off;
5797
else if (way == ios_base::cur)
58-
setg(eback(), gptr() + off, egptr());
98+
newPos = bytesRead + (uint64_t)off;
5999
else if (way == ios_base::end)
60-
setg(eback(), egptr() + off, egptr());
100+
newPos = totalSize + (uint64_t)off;
101+
102+
bytesRead = newPos;
103+
104+
// Position the get-area at the correct offset within the looping buffer
105+
size_t bufSize = bufEnd - bufBegin;
106+
size_t offsetInBuf = (size_t)(newPos % bufSize);
107+
setg(bufBegin, bufBegin + offsetInBuf, bufEnd);
61108

62-
return gptr() - eback(); // Return the new position
109+
return streampos(newPos);
63110
}
64111

65112
streampos seekpos(streampos sp, ios_base::openmode which) override
@@ -181,9 +228,13 @@ class SdkClientRunner : public BenchmarkRunner
181228
}
182229
else
183230
{
184-
this->uploadFromRamBuf = make_unique<UploadFromRamBuf>(runner.randomDataForUpload);
231+
// Loop the small random buffer to produce exactly taskConfig.size bytes.
232+
// SetContentLength tells the SDK the true upload size so it can make
233+
// correct multipart decisions, independent of the buffer size.
234+
this->uploadFromRamBuf = make_unique<UploadFromRamBuf>(runner.randomDataForUpload, taskConfig.size);
185235
auto streamForUpload = make_shared<Aws::IOStream>(this->uploadFromRamBuf.get());
186236
request.SetBody(streamForUpload);
237+
request.SetContentLength((long long)taskConfig.size);
187238
}
188239

189240
auto onPutObjectFinished = [this](

0 commit comments

Comments
 (0)