
Conversation

@MrD4rkne commented Dec 20, 2025

In this PR we introduce a new rpc_streaming job with STREAM_UNIDIRECTIONAL and STREAM_BIDIRECTIONAL verbs.

Overview

The streaming job exercises Seastar's RPC streaming API: workers continuously send payloads to the server and optionally receive responses on a return stream. It is intended for measuring end-to-end throughput.

It is similar to the rpc job's rpc_verb::write behavior, but it replaces per-payload RPC calls with the streaming API and doesn't wait for a response from the server between writes.

Metrics

  • messages - total number of payload_t items written by workers.
  • throughput - estimated number of bytes sent per second, computed as _total_messages * _payload_size_bytes / _total_duration.count() and reported in kB/s (see the sketch below).
  • messages per second - messages / _total_duration.count()
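
For clarity, a minimal sketch of how these numbers are derived (the field names follow the formulas above; the reporting helper itself is hypothetical, not part of the job):

    #include <chrono>
    #include <cstdint>
    #include <cstdio>

    // Hypothetical helper illustrating the metric formulas above.
    void report(uint64_t total_messages, uint64_t payload_size_bytes,
                std::chrono::duration<double> total_duration) {
        double secs = total_duration.count();
        double bytes_per_sec = double(total_messages) * payload_size_bytes / secs;
        std::printf("messages: %llu\n", (unsigned long long)total_messages);
        std::printf("throughput: %.1f kB/s\n", bytes_per_sec / 1000.0);
        std::printf("messages/s: %.1f\n", total_messages / secs);
    }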

Benchmark

Below is a simple benchmark that was run on the potwor2 machine for 30s with:

  • client: cpuset=0-7
  • server: cpuset=8-15
  • parallelism: 10
  • shares: 100
  • payload: 64kB

All configs: suite.yaml

rpc_streaming::bidirectional

[benchmark graphs]

rpc_streaming::unidirectional

[benchmark graphs]

rpc::write

[benchmark graph]

(rpc::write doesn't produce a throughput metric.)

Client

In client mode, the app:

  1. Connects to the server.
  2. Spawns parallelism workers, each starting with its own delay if sleep_time was specified.
  3. Each worker:
    • Creates an rpc::sink<payload_t>, sends it to the server, and receives a corresponding rpc::source<uint64_t>.
    • Performs the streaming phase:
      • Streams payload bytes in a loop through the sink until duration elapses.
      • For bidirectional streaming, simultaneously reads responses from the received source.
    • Waits for the server to close its sink so the client can shut down the connection safely (sketched below).
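
For orientation, a minimal sketch of the worker flow under Seastar's RPC streaming API (the verb, serializer, and helper names are illustrative assumptions, not the PR's actual code; includes and config wiring are elided):

    // Sketch of one bidirectional worker. stream_call is assumed to be created via
    // rpc_proto.make_client<rpc::source<uint64_t> (rpc::sink<payload_t>)>(STREAM_BIDIRECTIONAL).
    // Needs <seastar/core/coroutine.hh> and <seastar/core/when_all.hh>.
    future<> run_worker(rpc::protocol<serializer>::client& client,
                        const payload_t& payload,
                        std::chrono::steady_clock::time_point stop) {
        // The sink created here appears on the server as an rpc::source<payload_t>.
        auto sink = co_await client.make_stream_sink<serializer, payload_t>();
        // Hand the sink to the server; the call returns the server's stream as a source.
        auto source = co_await stream_call(client, sink);

        auto write = [&] () -> future<> {
            while (std::chrono::steady_clock::now() <= stop) {
                co_await sink(payload);   // stream; no per-payload wait for a reply
            }
            co_await sink.flush();
            co_await sink.close();
        };
        auto read = [&] () -> future<> {
            // Drain until the server closes its sink (EOS), so the connection
            // can be shut down safely afterwards.
            while (co_await source()) {}
        };
        co_await when_all_succeed(write(), read());
    }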

Server

  • For each STREAM_UNIDIRECTIONAL or STREAM_BIDIRECTIONAL call, the server:

    1. Creates an rpc::sink<uint64_t>.
    2. Schedules background processing for the rpc::source<payload_t> received from the client.
    3. Returns the rpc::source<uint64_t> associated with the created rpc::sink<uint64_t>, used for streaming back the number of bytes received.
  • For the bidirectional verb, the server sends back, on its stream, the number of bytes received for each chunk of data.

  • After the client closes its rpc::sink<payload_t>, the server writes the final total number of bytes into the corresponding rpc::sink<uint64_t> and then closes it (see the sketch below).
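
A corresponding sketch of the server side (illustrative names again; in Seastar's streaming API the handler returns its sink, which the client then receives as the paired rpc::source<uint64_t>):

    // Background consumer of the client's stream (sketch).
    future<> process(rpc::source<payload_t> source, rpc::sink<uint64_t> sink) {
        uint64_t total_bytes = 0;
        while (auto chunk = co_await source()) {
            auto& [payload] = *chunk;
            total_bytes += payload.size();
            co_await sink(payload.size());   // bidirectional: per-chunk byte count
        }
        // The client closed its sink: send the final total, then close our side.
        co_await sink(total_bytes);
        co_await sink.flush();
        co_await sink.close();
    }

    // Handler registration for the streaming verb (sketch).
    rpc_proto.register_handler(STREAM_BIDIRECTIONAL, [] (rpc::source<payload_t> source) {
        auto sink = source.make_sink<serializer, uint64_t>();
        // Process the incoming stream in the background.
        (void)process(source, sink).handle_exception([] (std::exception_ptr) {});
        return sink;
    });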

@MrD4rkne marked this pull request as ready for review December 20, 2025 20:04
@Deexie requested review from Deexie, avikivity and xemul December 20, 2025 20:10
@Deexie assigned MrD4rkne and unassigned MrD4rkne Dec 22, 2025
@witek-formanski force-pushed the test/extend-rpc-tester-with-streaming branch from a938035 to f557f81 on January 8, 2026 13:43
@witek-formanski commented Jan 8, 2026

Changelog

Requested changes

  • coroutines instead of continuations (for readability)
  • throughput in bytes instead of kilobytes
  • the bidirectional server responds with the data itself, not the number of bytes received so far

Additional changes

  • payload in constructor
  • refactored run() method (extracted run_worker_with_delay())
  • slightly modified comments and descriptions
  • the unidirectional server doesn't send anything to the client (sending EOS when ending the stream is sufficient to avoid the bug noticed before)

Benchmark after changes

rpc_streaming::bidirectional
[benchmark graphs]

rpc_streaming::unidirectional
[benchmark graphs]

rpc::write
[benchmark graph]

@MrD4rkne requested review from avikivity and xemul January 8, 2026 13:56
    , _ccfg(ccfg)
    , _rpc(rpc)
    , _stop(std::chrono::steady_clock::now() + _cfg.duration)
    , _payload_size_bytes(_cfg.payload),
Contributor:
Nit: comma in the next line

    , _payload_size_bytes(_cfg.payload),
    _payload(_cfg.payload / sizeof(payload_t::value_type), 0) {

    if (_cfg.verb == "bidirectional") {
    _call = [this] (unsigned worker_id, const payload_t& payload) {
Contributor:
Nit: We usually do the following indentation:

name()
        : _a()
        , _b()
{
    do_sth();
}

Contributor:
(Twice before params initialization, once before instructions)


    private:
        future<> stream_data(rpc::sink<payload_t> sink, const payload_t& payload) {
            return do_with(std::move(sink), [this, &payload] (rpc::sink<payload_t>& sink) {
Contributor:
Why not a coroutine here?

Author:
To flush and close the sink even on error. Can it be done more easily?

Contributor:
you can use a try-catch clause

Author:
what about:

    template<typename T>
    static future<> flush_and_close(rpc::sink<T>& sink) {
        try {
            co_await sink.flush();
        } catch (...) {
        }
        try {
            co_await sink.close();
        } catch (...) {
        }
    }

    future<> stream_data(rpc::sink<payload_t> sink, const payload_t& payload) {
        auto d = defer([&sink] () noexcept {
            seastar::async([s = std::move(sink)] () mutable {
                flush_and_close(s).get();
            }).handle_exception([](auto) {});
        });
        
        while (std::chrono::steady_clock::now() <= _stop) {
            ++_total_messages;
            co_await sink(payload);
            if (_cfg.sleep_time) {
                co_await seastar::sleep(std::chrono::duration_cast<std::chrono::nanoseconds>(*_cfg.sleep_time));
            }
        }
    }

is it viable?

Author:
oh, we're ignoring the future here (::async)...

Member:
.close() shouldn't cause exceptions if you flush() before, so the second try is unnecessary.

Creating a thread should be reserved for top-level operations so their numbers can be kept in check (threads are heavyweight).

You can keep it simple - a try/catch around the outer loop + flush, capturing the exception, then a close, then returning the exception if any.

See utils/closeable.hh, with_closeable.
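
For illustration, a minimal sketch of the shape being suggested (reusing the fields from the snippet above; this is not the PR's final code):

    future<> stream_data(rpc::sink<payload_t> sink, const payload_t& payload) {
        std::exception_ptr error;
        try {
            while (std::chrono::steady_clock::now() <= _stop) {
                ++_total_messages;
                co_await sink(payload);
            }
            co_await sink.flush();
        } catch (...) {
            error = std::current_exception();
        }
        // Per the comment above, close() shouldn't throw once flush() has run.
        co_await sink.close();
        if (error) {
            std::rethrow_exception(error);
        }
    }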

Author:
Thanks, we will check it out

    static future<> process_bi_source(rpc::source<payload_t> source, rpc::sink<payload_t> sink) {
        uint64_t total_messages = 0, total_payload = 0;

        co_await repeat([&source, &sink, &total_messages, &total_payload] {
Contributor:
This should be changed to a while loop with co_awaits.

Author:
What about #3173 (comment)?
We tried to keep continuations in the loop for streaming/consuming the data, and coroutines for everything else.

Contributor:
it may be a continuation chain, but then you don't need to co_await repeat, as it doesn't give much

Contributor:
@xemul @avikivity I'm not sure about this one. Which approach do you suggest here (as it's in the main path):

  1. The current one: coroutine in the outer scope + repeat
  2. Only continuation chain in the whole process_bi_source
  3. Only coroutines?

Member:
For an rpc tester, there's no reason not to use coroutines everywhere; it's the cleanest option and the small performance hit won't matter.
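
For reference, a sketch of the repeat-based loop above rewritten with coroutines only (final accounting and sink shutdown elided):

    static future<> process_bi_source(rpc::source<payload_t> source, rpc::sink<payload_t> sink) {
        uint64_t total_messages = 0, total_payload = 0;
        while (auto data = co_await source()) {
            auto& [payload] = *data;
            ++total_messages;
            total_payload += payload.size();
            co_await sink(payload);   // bidirectional: echo the received data back
        }
        // counters reported and sink flushed/closed after the loop (elided)
    }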

    static future<> process_uni_source(rpc::source<payload_t> source, rpc::sink<uint64_t> sink) {
        uint64_t total_messages = 0, total_payload = 0;

        co_await repeat([&source, &sink, &total_messages, &total_payload] {
Contributor:
Same here: if you are using coroutines, use their full potential.

@avikivity (Member):

Are those results repeatable? The dependency on shard count looks random.

@MrD4rkne (Author):

> Are those results repeatable? The dependency on shard count looks random.

they tend to have some deviations, but we suspect it's because of the server's load (we're not the only ones using it). We can run the benchmarks and plot them with the standard deviation. Would you want to see such charts?

@avikivity (Member):

> Are those results repeatable? The dependency on shard count looks random.
>
> they tend to have some deviations, but we suspect it's because of the server's load (we're not the only ones using it). We can run the benchmarks and plot them with the standard deviation. Would you want to see such charts?

Prefer to see known stable results.

@witek-formanski:
> The dependency on shard count looks random.

What do you mean by "dependency on shard count"? Do you mean that shards with different ids (in the graphs above, from 0 to 7) differ from one another within the same run of rpc_tester? Or something else?

@witek-formanski:
> Are those results repeatable? The dependency on shard count looks random.
>
> they tend to have some deviations, but we suspect it's because of the server's load (we're not the only ones using it). We can run the benchmarks and plot them with the standard deviation. Would you want to see such charts?
>
> Prefer to see known stable results.

We run the tests on a server that is shared with other employees, so the most we can do for stability of the results is to run the tests a couple of times and calculate the standard deviation (to minimize the chance of the results being impacted by other users of the server).

This commit extends the rpc_tester with an rpc_streaming job that uses rpc::sink<> and rpc::source<> to stream data between the client and the server.

We introduce the verb STREAM_BIDIRECTIONAL:

- Streaming is bidirectional - the client streams payload_t to the server, and the server streams the received data back to the client for each portion of data it reads.

- The client sends the configured number of bytes to the server through the rpc::sink, while simultaneously reading from the rpc::source received from the server.

The server, for each stream, starts a background job to read data from it.

We introduce unidirectional streaming to the rpc_streaming job.

The client streams the configured number of bytes, while the server sums the total data length and keeps its stream to the client open without sending anything. After the client closes its stream to the server, the server closes the one to the client. The client waits for EOF on the stream from the server, then finishes the job.
@witek-formanski force-pushed the test/extend-rpc-tester-with-streaming branch from f557f81 to cca65a3 on January 19, 2026 22:53
@witek-formanski commented Jan 19, 2026

Changelog

Changes in code

  • coroutines everywhere instead of continuations
  • style

Different graphs

We changed the way we generate the graphs - now we run each test three times and calculate the standard deviation (plotted in black).

Benchmark after changes

rpc_streaming::bidirectional
[benchmark graphs]

rpc_streaming::unidirectional
[benchmark graphs]

rpc::write
[benchmark graph]

    }

    try {
        co_await sink.flush();
Contributor:
Those try-catches don't look great. Try this instead:

    auto flush_future = co_await coroutine::as_future(sink.flush());
    if (flush_future.failed() && !error) [[unlikely]] {
        error = flush_future.get_exception();
    }

Contributor:
Note: flush_future.failed() is checked first on purpose. The future would complain if it failed and wasn't checked.

    }

    try {
        co_await sink.flush();
Contributor:
Ditto

    } catch (...) {
    }
    try {
        co_await sink.close();
Contributor:
And here
