Skip to content

Commit 7bd64b2

Browse files
committed
websocket: Fix websocket data being modelled as a stream internally
Currently WebSocket implementation has the following input data path: input_stream -> websocket parser -> queue -> data_sink -> input_stream wrapper On the output side, the data path is as follows: output_stream wrapper -> data_source -> queue -> websocket serializer -> output_stream The input_stream and output_stream wrappers are what is exposed to the user. This is problematic, because WebSocket is a message-based protocol and streams do not have the concept of messages, they model data as a stream of bytes. More specifically, the current code results in problems on high load, when the stream wrappers start to coalesce and split messages in the write path due to more than one message being available at a time. The solution is to expose data_sink and data_source that are backed by message queues directly to the user.
1 parent 384661a commit 7bd64b2

File tree

4 files changed

+17
-25
lines changed

4 files changed

+17
-25
lines changed

demos/websocket_server_demo.cc

+3-4
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,14 @@ int main(int argc, char** argv) {
4444

4545
return async([port] {
4646
websocket::server ws;
47-
ws.register_handler("echo", [] (input_stream<char>& in,
48-
output_stream<char>& out) {
47+
ws.register_handler("echo", [](data_source& in, data_sink& out) {
4948
return repeat([&in, &out]() {
50-
return in.read().then([&out](temporary_buffer<char> f) {
49+
return in.get().then([&out](temporary_buffer<char> f) {
5150
std::cerr << "f.size(): " << f.size() << "\n";
5251
if (f.empty()) {
5352
return make_ready_future<stop_iteration>(stop_iteration::yes);
5453
} else {
55-
return out.write(std::move(f)).then([&out]() {
54+
return out.put(std::move(f)).then([&out]() {
5655
return out.flush().then([] {
5756
return make_ready_future<stop_iteration>(stop_iteration::no);
5857
});

doc/websocket.md

+3-6
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,16 @@ Here's an example of how to register a simple echo protocol:
2020
```cpp
2121
using namespace seastar;
2222
static experimental::websocket::server ws;
23-
ws.register_handler("echo", [] (input_stream<char>& in, output_stream<char>& out) -> future<> {
23+
ws.register_handler("echo", [] (data_source& in, data_sink& out) -> future<> {
2424
while (true) {
25-
auto buf = co_await in.read();
26-
if (buf.empty()) {
27-
co_return;
28-
}
25+
auto buf = co_await in.get();
2926
co_await out.write(std::move(buf));
3027
co_await out.flush();
3128
}
3229
});
3330
```
3431

35-
Note: the developers should assume that the input stream provides decoded and unmasked data - so the stream should be treated as if it was backed by a TCP socket. Similarly, responses should be sent to the output stream as is, and the WebSocket server implementation will handle its proper serialization, masking and so on.
32+
Note: the developers should assume that the data source provides decoded and unmasked data. Similarly, responses should be sent to the output stream as is, and the WebSocket server implementation will handle its proper serialization, masking and so on.
3633

3734
## Error handling
3835

include/seastar/websocket/common.hh

+5-7
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ namespace seastar::experimental::websocket {
3232

3333
extern sstring magic_key_suffix;
3434

35-
using handler_t = std::function<future<>(input_stream<char>&, output_stream<char>&)>;
35+
using handler_t = std::function<future<>(data_source&, data_sink&)>;
3636

3737
class server;
3838

@@ -126,9 +126,9 @@ protected:
126126

127127
websocket_parser _websocket_parser;
128128
queue <temporary_buffer<char>> _input_buffer;
129-
input_stream<char> _input;
129+
data_source _input;
130130
queue <temporary_buffer<char>> _output_buffer;
131-
output_stream<char> _output;
131+
data_sink _output;
132132

133133
sstring _subprotocol;
134134
handler_t _handler;
@@ -143,10 +143,8 @@ public:
143143
, _input_buffer{PIPE_SIZE}
144144
, _output_buffer{PIPE_SIZE}
145145
{
146-
_input = input_stream<char>{data_source{
147-
std::make_unique<connection_source_impl>(&_input_buffer)}};
148-
_output = output_stream<char>{data_sink{
149-
std::make_unique<connection_sink_impl>(&_output_buffer)}};
146+
_input = data_source{std::make_unique<connection_source_impl>(&_input_buffer)};
147+
_output = data_sink{std::make_unique<connection_sink_impl>(&_output_buffer)};
150148
}
151149

152150
/*!

tests/unit/websocket_test.cc

+6-8
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,14 @@ future<> test_websocket_handshake_common(std::string subprotocol) {
4545
auto output = sock.output();
4646

4747
websocket::server dummy;
48-
dummy.register_handler(subprotocol, [] (input_stream<char>& in,
49-
output_stream<char>& out) {
48+
dummy.register_handler(subprotocol, [] (data_source& in, data_sink& out) {
5049
return repeat([&in, &out]() {
51-
return in.read().then([&out](temporary_buffer<char> f) {
50+
return in.get().then([&out](temporary_buffer<char> f) {
5251
std::cerr << "f.size(): " << f.size() << "\n";
5352
if (f.empty()) {
5453
return make_ready_future<stop_iteration>(stop_iteration::yes);
5554
} else {
56-
return out.write(std::move(f)).then([&out]() {
55+
return out.put(std::move(f)).then([&out]() {
5756
return out.flush().then([] {
5857
return make_ready_future<stop_iteration>(stop_iteration::no);
5958
});
@@ -115,15 +114,14 @@ future<> test_websocket_handler_registration_common(std::string subprotocol) {
115114

116115
// Setup server
117116
websocket::server ws;
118-
ws.register_handler(subprotocol, [] (input_stream<char>& in,
119-
output_stream<char>& out) {
117+
ws.register_handler(subprotocol, [] (data_source& in, data_sink& out) {
120118
return repeat([&in, &out]() {
121-
return in.read().then([&out](temporary_buffer<char> f) {
119+
return in.get().then([&out](temporary_buffer<char> f) {
122120
std::cerr << "f.size(): " << f.size() << "\n";
123121
if (f.empty()) {
124122
return make_ready_future<stop_iteration>(stop_iteration::yes);
125123
} else {
126-
return out.write(std::move(f)).then([&out]() {
124+
return out.put(std::move(f)).then([&out]() {
127125
return out.flush().then([] {
128126
return make_ready_future<stop_iteration>(stop_iteration::no);
129127
});

0 commit comments

Comments
 (0)