Skip to content

Commit f19e8e2

Browse files
authored
Only "data" frames affect websocket's read window now (#407)
Fix it so that a websocket with manual_window_management turned off will disable backpressure on its whole channel (previously, we kept backpressure on and just automatically maintained a constant window size). Change it so that the websocket's read window is only affected by "data" frames (TEXT, BINARY, CONTINUATION) and not "control" frames (PING, PONG, CLOSE). The assumption here being that only "data" frames are generating the kind of traffic that the user is forwarding along and needs tight control over. This is similar to how HTTP/2 only counts "data" frames as affecting the window. Add copious documentation, because this stuff is confusing
1 parent 642d877 commit f19e8e2

File tree

7 files changed

+123
-45
lines changed

7 files changed

+123
-45
lines changed

include/aws/http/private/websocket_impl.h

+1
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ struct aws_websocket_client_bootstrap_system_vtable {
7878
int (*aws_http_stream_activate)(struct aws_http_stream *stream);
7979
void (*aws_http_stream_release)(struct aws_http_stream *stream);
8080
struct aws_http_connection *(*aws_http_stream_get_connection)(const struct aws_http_stream *stream);
81+
void (*aws_http_stream_update_window)(struct aws_http_stream *stream, size_t increment_size);
8182
int (*aws_http_stream_get_incoming_response_status)(const struct aws_http_stream *stream, int *out_status);
8283
struct aws_websocket *(*aws_websocket_handler_new)(const struct aws_websocket_handler_options *options);
8384
};

include/aws/http/websocket.h

+38-10
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,17 @@ typedef bool(aws_websocket_on_incoming_frame_begin_fn)(
9797
* Payload data will not be valid after this call, so copy if necessary.
9898
* The payload data is always unmasked at this point.
9999
*
100+
* NOTE: If you created the websocket with `manual_window_management` set true, you must maintain the read window.
101+
* Whenever the read window reaches 0, you will stop receiving anything.
102+
* The websocket's `initial_window_size` determines the starting size of the read window.
103+
* The read window shrinks as you receive the payload from "data" frames (TEXT, BINARY, and CONTINUATION).
104+
* Use aws_websocket_increment_read_window() to increment the window again and keep frames flowing.
105+
* Maintain a larger window to keep up high throughput.
106+
* You only need to worry about the payload from "data" frames.
107+
* The websocket automatically increments the window to account for any
108+
* other incoming bytes, including other parts of a frame (opcode, payload-length, etc)
109+
* and the payload of other frame types (PING, PONG, CLOSE).
110+
*
100111
* Return true to proceed normally. If false is returned, the websocket will read no further data,
101112
* the frame will complete with an error-code, and the connection will close.
102113
*/
@@ -186,8 +197,8 @@ struct aws_websocket_client_connection_options {
186197
struct aws_http_message *handshake_request;
187198

188199
/**
189-
* Initial window size for websocket.
190-
* Required.
200+
* Initial size of the websocket's read window.
201+
* Ignored unless `manual_window_management` is true.
191202
* Set to 0 to prevent any incoming websocket frames until aws_websocket_increment_read_window() is called.
192203
*/
193204
size_t initial_window_size;
@@ -241,11 +252,17 @@ struct aws_websocket_client_connection_options {
241252
/**
242253
* Set to true to manually manage the read window size.
243254
*
244-
* If this is false, the connection will maintain a constant window size.
255+
* If this is false, no backpressure is applied and frames will arrive as fast as possible.
245256
*
246-
* If this is true, the caller must manually increment the window size using aws_websocket_increment_read_window().
247-
* If the window is not incremented, it will shrink by the amount of payload data received. If the window size
248-
* reaches 0, no further data will be received.
257+
* If this is true, then whenever the read window reaches 0 you will stop receiving anything.
258+
* The websocket's `initial_window_size` determines the starting size of the read window.
259+
* The read window shrinks as you receive the payload from "data" frames (TEXT, BINARY, and CONTINUATION).
260+
* Use aws_websocket_increment_read_window() to increment the window again and keep frames flowing.
261+
* Maintain a larger window to keep up high throughput.
262+
* You only need to worry about the payload from "data" frames.
263+
* The websocket automatically increments the window to account for any
264+
* other incoming bytes, including other parts of a frame (opcode, payload-length, etc)
265+
* and the payload of other frame types (PING, PONG, CLOSE).
249266
*/
250267
bool manual_window_management;
251268

@@ -390,10 +407,21 @@ AWS_HTTP_API
390407
int aws_websocket_send_frame(struct aws_websocket *websocket, const struct aws_websocket_send_frame_options *options);
391408

392409
/**
393-
* Manually increment the read window.
394-
* The read window shrinks as payload data is received, and reading stops when its size reaches 0.
395-
* Note that the read window can also be controlled from the aws_websocket_on_incoming_frame_payload_fn(),
396-
* callback, by manipulating the `out_increment_window` argument.
410+
* Manually increment the read window to keep frames flowing.
411+
*
412+
* If the websocket was created with `manual_window_management` set true,
413+
* then whenever the read window reaches 0 you will stop receiving data.
414+
* The websocket's `initial_window_size` determines the starting size of the read window.
415+
* The read window shrinks as you receive the payload from "data" frames (TEXT, BINARY, and CONTINUATION).
416+
* Use aws_websocket_increment_read_window() to increment the window again and keep frames flowing.
417+
* Maintain a larger window to keep up high throughput.
418+
* You only need to worry about the payload from "data" frames.
419+
* The websocket automatically increments the window to account for any
420+
* other incoming bytes, including other parts of a frame (opcode, payload-length, etc)
421+
* and the payload of other frame types (PING, PONG, CLOSE).
422+
*
423+
* If the websocket was created with `manual_window_management` set false, this function does nothing.
424+
*
397425
* This function may be called from any thread.
398426
*/
399427
AWS_HTTP_API

source/websocket.c

+17-5
Original file line numberDiff line numberDiff line change
@@ -1238,6 +1238,9 @@ static int s_handler_process_read_message(
12381238
struct aws_byte_cursor cursor = aws_byte_cursor_from_buf(&message->message_data);
12391239
int err;
12401240

1241+
/* At the end of this function we'll bump the window back up by this amount.
1242+
* We start off assuming we'll re-open the window by the whole amount,
1243+
* but this number will go down if we process any payload data that ought to shrink the window */
12411244
websocket->thread_data.incoming_message_window_update = message->message_data.len;
12421245

12431246
AWS_LOGF_TRACE(
@@ -1382,15 +1385,16 @@ static int s_decoder_on_user_payload(struct aws_websocket *websocket, struct aws
13821385
return aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE);
13831386
}
13841387

1385-
/* If user reduced window_update_size, reduce how much the websocket will update its window */
1386-
if (websocket->manual_window_update) {
1387-
size_t reduce = data.len;
1388+
/* If this is a "data" frame's payload, let the window shrink */
1389+
if (aws_websocket_is_data_frame(websocket->thread_data.current_incoming_frame->opcode) &&
1390+
websocket->manual_window_update) {
1391+
13881392
websocket->thread_data.incoming_message_window_update -= data.len;
13891393
AWS_LOGF_DEBUG(
13901394
AWS_LS_HTTP_WEBSOCKET,
1391-
"id=%p: Incoming payload callback changed window update size, window will shrink by %zu.",
1395+
"id=%p: The read window is shrinking by %zu due to incoming payload from 'data' frame.",
13921396
(void *)websocket,
1393-
reduce);
1397+
data.len);
13941398
}
13951399

13961400
return AWS_OP_SUCCESS;
@@ -1607,6 +1611,14 @@ void aws_websocket_increment_read_window(struct aws_websocket *websocket, size_t
16071611
return;
16081612
}
16091613

1614+
if (!websocket->manual_window_update) {
1615+
AWS_LOGF_DEBUG(
1616+
AWS_LS_HTTP_WEBSOCKET,
1617+
"id=%p: Ignoring window increment. Manual window management (aka read backpressure) is not enabled.",
1618+
(void *)websocket);
1619+
return;
1620+
}
1621+
16101622
/* Schedule a task to do the increment.
16111623
* If task is already scheduled, just increase size to be incremented */
16121624
bool is_midchannel_handler = false;

source/websocket_bootstrap.c

+38-3
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ static const struct aws_websocket_client_bootstrap_system_vtable s_default_syste
2929
.aws_http_stream_activate = aws_http_stream_activate,
3030
.aws_http_stream_release = aws_http_stream_release,
3131
.aws_http_stream_get_connection = aws_http_stream_get_connection,
32+
.aws_http_stream_update_window = aws_http_stream_update_window,
3233
.aws_http_stream_get_incoming_response_status = aws_http_stream_get_incoming_response_status,
3334
.aws_websocket_handler_new = aws_websocket_handler_new,
3435
};
@@ -94,6 +95,10 @@ static int s_ws_bootstrap_on_handshake_response_header_block_done(
9495
struct aws_http_stream *stream,
9596
enum aws_http_header_block header_block,
9697
void *user_data);
98+
static int s_ws_bootstrap_on_handshake_response_body(
99+
struct aws_http_stream *stream,
100+
const struct aws_byte_cursor *data,
101+
void *user_data);
97102
static void s_ws_bootstrap_on_stream_complete(struct aws_http_stream *stream, int error_code, void *user_data);
98103

99104
int aws_websocket_client_connect(const struct aws_websocket_client_connection_options *options) {
@@ -188,10 +193,19 @@ int aws_websocket_client_connect(const struct aws_websocket_client_connection_op
188193
http_options.socket_options = options->socket_options;
189194
http_options.tls_options = options->tls_options;
190195
http_options.proxy_options = options->proxy_options;
191-
http_options.initial_window_size = 1024; /* Adequate space for response data to trickle in */
192196

193-
/* TODO: websockets has issues if back-pressure is disabled on the whole channel. This should be fixed. */
194-
http_options.manual_window_management = true;
197+
if (options->manual_window_management) {
198+
http_options.manual_window_management = true;
199+
200+
/* Give HTTP handler enough window to comfortably receive the handshake response.
201+
*
202+
* If the upgrade is unsuccessful, the HTTP window will shrink as the response body is received.
203+
* In this case, we'll keep incrementing the window back to its original size so data keeps arriving.
204+
*
205+
* If the upgrade is successful, then the websocket handler is installed, and
206+
* the HTTP handler will take over its own window management. */
207+
http_options.initial_window_size = 1024;
208+
}
195209

196210
http_options.user_data = ws_bootstrap;
197211
http_options.on_setup = s_ws_bootstrap_on_http_setup;
@@ -309,6 +323,7 @@ static void s_ws_bootstrap_on_http_setup(struct aws_http_connection *http_connec
309323
.user_data = ws_bootstrap,
310324
.on_response_headers = s_ws_bootstrap_on_handshake_response_headers,
311325
.on_response_header_block_done = s_ws_bootstrap_on_handshake_response_header_block_done,
326+
.on_response_body = s_ws_bootstrap_on_handshake_response_body,
312327
.on_complete = s_ws_bootstrap_on_stream_complete,
313328
};
314329

@@ -544,6 +559,26 @@ static int s_ws_bootstrap_on_handshake_response_header_block_done(
544559
return AWS_OP_ERR;
545560
}
546561

562+
/**
563+
* Invoked as we receive the body of a failed response.
564+
* This is never invoked if the handshake succeeds.
565+
*/
566+
static int s_ws_bootstrap_on_handshake_response_body(
567+
struct aws_http_stream *stream,
568+
const struct aws_byte_cursor *data,
569+
void *user_data) {
570+
571+
struct aws_websocket_client_bootstrap *ws_bootstrap = user_data;
572+
573+
/* If we're managing the read window...
574+
* bump the HTTP window back to its starting size, so that we keep receiving the whole response. */
575+
if (ws_bootstrap->manual_window_update) {
576+
s_system_vtable->aws_http_stream_update_window(stream, data->len);
577+
}
578+
579+
return AWS_OP_SUCCESS;
580+
}
581+
547582
static void s_ws_bootstrap_on_stream_complete(struct aws_http_stream *stream, int error_code, void *user_data) {
548583
/* Not checking error_code because a stream error ends the HTTP connection.
549584
* We'll deal with finishing setup and/or shutdown from the http-shutdown callback */

tests/CMakeLists.txt

-1
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,6 @@ add_test_case(websocket_handler_read_frames_complete_on_shutdown)
206206
add_test_case(websocket_handler_read_halts_if_begin_fn_returns_false)
207207
add_test_case(websocket_handler_read_halts_if_payload_fn_returns_false)
208208
add_test_case(websocket_handler_read_halts_if_complete_fn_returns_false)
209-
add_test_case(websocket_handler_window_reopens_by_default)
210209
add_test_case(websocket_handler_window_manual_increment)
211210
add_test_case(websocket_handler_window_manual_increment_off_thread)
212211
add_test_case(websocket_midchannel_sanity_check)

tests/test_websocket_bootstrap.c

+12
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ static struct aws_http_stream *s_mock_http_connection_make_request(
3030
static int s_mock_http_stream_activate(struct aws_http_stream *stream);
3131
static void s_mock_http_stream_release(struct aws_http_stream *stream);
3232
static struct aws_http_connection *s_mock_http_stream_get_connection(const struct aws_http_stream *stream);
33+
static void s_mock_http_stream_update_window(struct aws_http_stream *stream, size_t increment_size);
3334
static int s_mock_http_stream_get_incoming_response_status(const struct aws_http_stream *stream, int *out_status);
3435
static struct aws_websocket *s_mock_websocket_handler_new(const struct aws_websocket_handler_options *options);
3536

@@ -42,6 +43,7 @@ static const struct aws_websocket_client_bootstrap_system_vtable s_mock_system_v
4243
.aws_http_stream_activate = s_mock_http_stream_activate,
4344
.aws_http_stream_release = s_mock_http_stream_release,
4445
.aws_http_stream_get_connection = s_mock_http_stream_get_connection,
46+
.aws_http_stream_update_window = s_mock_http_stream_update_window,
4547
.aws_http_stream_get_incoming_response_status = s_mock_http_stream_get_incoming_response_status,
4648
.aws_websocket_handler_new = s_mock_websocket_handler_new,
4749
};
@@ -112,6 +114,9 @@ static struct tester {
112114

113115
bool websocket_shutdown_invoked;
114116
int websocket_shutdown_error_code;
117+
118+
/* Track the sum of all calls to aws_http_stream_update_window() */
119+
size_t window_increment_total;
115120
} s_tester;
116121

117122
static int s_tester_init(struct aws_allocator *alloc) {
@@ -308,6 +313,13 @@ static struct aws_http_connection *s_mock_http_stream_get_connection(const struc
308313
return s_mock_http_connection;
309314
}
310315

316+
static void s_mock_http_stream_update_window(struct aws_http_stream *stream, size_t increment_size) {
317+
AWS_FATAL_ASSERT(stream == s_mock_stream);
318+
AWS_FATAL_ASSERT(!s_tester.http_connection_release_called);
319+
AWS_FATAL_ASSERT(!s_tester.http_stream_release_called);
320+
s_tester.window_increment_total += increment_size;
321+
}
322+
311323
static int s_mock_http_stream_get_incoming_response_status(const struct aws_http_stream *stream, int *out_status) {
312324
AWS_FATAL_ASSERT(stream == s_mock_stream);
313325
AWS_FATAL_ASSERT(!s_tester.http_connection_release_called);

tests/test_websocket_handler.c

+17-26
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,7 @@ static bool s_on_incoming_frame_complete(
271271
static void s_set_readpush_frames(struct tester *tester, struct readpush_frame *frames, size_t num_frames) {
272272
tester->readpush_frames = frames;
273273
tester->num_readpush_frames = num_frames;
274+
tester->readpush_frame_index = 0;
274275
for (size_t i = 0; i < num_frames; ++i) {
275276
struct readpush_frame *frame = &frames[i];
276277
frame->cursor = frame->payload;
@@ -1719,37 +1720,12 @@ TEST_CASE(websocket_handler_read_halts_if_complete_fn_returns_false) {
17191720
return AWS_OP_SUCCESS;
17201721
}
17211722

1722-
TEST_CASE(websocket_handler_window_reopens_by_default) {
1723-
(void)ctx;
1724-
struct tester tester;
1725-
ASSERT_SUCCESS(s_tester_init(&tester, allocator));
1726-
1727-
struct readpush_frame pushing = {
1728-
.payload = aws_byte_cursor_from_c_str("Tore open the shutters and threw up the sash."),
1729-
.def =
1730-
{
1731-
.opcode = AWS_WEBSOCKET_OPCODE_TEXT,
1732-
.fin = true,
1733-
},
1734-
};
1735-
1736-
s_set_readpush_frames(&tester, &pushing, 1);
1737-
ASSERT_SUCCESS(s_do_readpush_all(&tester));
1738-
1739-
testing_channel_drain_queued_tasks(&tester.testing_channel);
1740-
1741-
uint64_t total_frame_size = aws_websocket_frame_encoded_size(&pushing.def);
1742-
ASSERT_UINT_EQUALS(total_frame_size, testing_channel_last_window_update(&tester.testing_channel));
1743-
1744-
ASSERT_SUCCESS(s_tester_clean_up(&tester));
1745-
return AWS_OP_SUCCESS;
1746-
}
1747-
17481723
static int s_window_manual_increment_common(struct aws_allocator *allocator, bool on_thread) {
17491724
struct tester tester;
17501725
s_tester_options.manual_window_update = true;
17511726
ASSERT_SUCCESS(s_tester_init(&tester, allocator));
17521727

1728+
/* Push "data" frame to websocket */
17531729
struct readpush_frame pushing = {
17541730
.payload = aws_byte_cursor_from_c_str("Shrink, then open"),
17551731
.def =
@@ -1777,6 +1753,21 @@ static int s_window_manual_increment_common(struct aws_allocator *allocator, boo
17771753
testing_channel_drain_queued_tasks(&tester.testing_channel);
17781754
ASSERT_UINT_EQUALS(pushing.def.payload_length, testing_channel_last_window_update(&tester.testing_channel));
17791755

1756+
/* Now push a control frame, and ensure the window automatically re-opens by the whole amount */
1757+
struct readpush_frame pushing_control_frame = {
1758+
.payload = aws_byte_cursor_from_c_str("free data"),
1759+
.def = {.opcode = AWS_WEBSOCKET_OPCODE_PONG, .fin = true},
1760+
};
1761+
1762+
s_set_readpush_frames(&tester, &pushing_control_frame, 1);
1763+
ASSERT_SUCCESS(s_do_readpush_all(&tester));
1764+
testing_channel_drain_queued_tasks(&tester.testing_channel);
1765+
1766+
ASSERT_UINT_EQUALS(
1767+
aws_websocket_frame_encoded_size(&pushing_control_frame.def),
1768+
testing_channel_last_window_update(&tester.testing_channel));
1769+
1770+
/* Done */
17801771
ASSERT_SUCCESS(s_tester_clean_up(&tester));
17811772
return AWS_OP_SUCCESS;
17821773
}

0 commit comments

Comments
 (0)