Skip to content

Commit e8078b6

Browse files
authored
WIP websocket handler: reads incoming messages (#44)
1 parent 5e65065 commit e8078b6

File tree

4 files changed

+590
-18
lines changed

4 files changed

+590
-18
lines changed

include/aws/http/websocket.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
struct aws_channel_handler;
2121
struct aws_http_header;
2222

23+
/* TODO: Document lifetime stuff */
24+
/* TODO: Document CLOSE frame behavior (when auto-sent during close, when auto-closed) */
25+
/* TODO: Document auto-pong behavior */
26+
2327
/**
2428
* A websocket connection.
2529
*/

source/websocket.c

Lines changed: 199 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@
3838
/* TODO: Delayed payload works by sending 0-size io_msgs down pipe and trying again when they're compele.
3939
* Do something more efficient? */
4040

41+
/* TODO: don't fire send completion until data written to socket */
42+
43+
/* TODO: stop using the HTTP_PARSE error, give websocket its own error */
44+
4145
enum {
4246
MESSAGE_SIZE_HINT = 16 * 1024,
4347
};
@@ -69,6 +73,11 @@ struct aws_websocket {
6973
struct outgoing_frame *current_outgoing_frame;
7074

7175
struct aws_websocket_decoder decoder;
76+
struct aws_websocket_incoming_frame *current_incoming_frame;
77+
struct aws_websocket_incoming_frame incoming_frame_storage;
78+
79+
/* Amount to increment window after a channel message has been processed. */
80+
size_t incoming_message_window_update;
7281

7382
/* True when no more frames will be read, due to:
7483
* - a CLOSE frame was received
@@ -136,6 +145,7 @@ static int s_decoder_on_frame(const struct aws_websocket_frame *frame, void *use
136145
static int s_decoder_on_payload(struct aws_byte_cursor data, void *user_data);
137146

138147
static void s_destroy_outgoing_frame(struct aws_websocket *websocket, struct outgoing_frame *frame, int error_code);
148+
static void s_complete_incoming_frame(struct aws_websocket *websocket, int error_code);
139149
static void s_finish_shutdown(struct aws_websocket *websocket);
140150
static void s_io_message_write_completed(
141151
struct aws_channel *channel,
@@ -144,6 +154,7 @@ static void s_io_message_write_completed(
144154
void *user_data);
145155
static void s_move_synced_data_to_thread_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
146156
static void s_shutdown_due_to_write_err(struct aws_websocket *websocket, int error_code);
157+
static void s_shutdown_due_to_read_err(struct aws_websocket *websocket, int error_code);
147158
static void s_stop_writing(struct aws_websocket *websocket, int send_frame_error_code);
148159
static void s_try_write_outgoing_frames(struct aws_websocket *websocket);
149160

@@ -239,6 +250,9 @@ struct aws_channel_handler *aws_websocket_handler_new(const struct aws_websocket
239250

240251
static void s_handler_destroy(struct aws_channel_handler *handler) {
241252
struct aws_websocket *websocket = handler->impl;
253+
assert(!websocket->thread_data.current_outgoing_frame);
254+
assert(!websocket->thread_data.current_incoming_frame);
255+
242256
AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Destroying websocket.", (void *)websocket);
243257

244258
aws_mutex_clean_up(&websocket->synced_data.lock);
@@ -310,13 +324,13 @@ int aws_websocket_send_frame(struct aws_websocket *websocket, const struct aws_w
310324

311325
/* Check for bad input. Log about non-obvious errors. */
312326
if (options->high_priority && aws_websocket_is_data_frame(options->opcode)) {
313-
AWS_LOGF_ERROR(AWS_LS_HTTP_WEBSOCKET, "%p: Data frames cannot be sent as high-priority.", (void *)websocket);
327+
AWS_LOGF_ERROR(AWS_LS_HTTP_WEBSOCKET, "id=%p: Data frames cannot be sent as high-priority.", (void *)websocket);
314328
return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
315329
}
316330
if (options->payload_length > 0 && !options->stream_outgoing_payload) {
317331
AWS_LOGF_ERROR(
318332
AWS_LS_HTTP_WEBSOCKET,
319-
"%p: Invalid frame options, payload streaming function required when payload length is non-zero.",
333+
"id=%p: Invalid frame options, payload streaming function required when payload length is non-zero.",
320334
(void *)websocket);
321335
return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
322336
}
@@ -753,6 +767,27 @@ static void s_shutdown_due_to_write_err(struct aws_websocket *websocket, int err
753767
}
754768
}
755769

770+
static void s_shutdown_due_to_read_err(struct aws_websocket *websocket, int error_code) {
771+
assert(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
772+
773+
AWS_LOGF_ERROR(
774+
AWS_LS_HTTP_WEBSOCKET,
775+
"id=%p: Closing websocket due to failure during read, error %d (%s).",
776+
(void *)websocket,
777+
error_code,
778+
aws_error_name(error_code));
779+
780+
websocket->thread_data.is_reading_stopped = true;
781+
782+
/* If there's a current incoming frame, complete it with the specific error code. */
783+
if (websocket->thread_data.current_incoming_frame) {
784+
s_complete_incoming_frame(websocket, error_code);
785+
}
786+
787+
/* Tell channel to shutdown (it's ok to call this redundantly) */
788+
aws_channel_shutdown(websocket->channel_slot->channel, error_code);
789+
}
790+
756791
static int s_handler_shutdown(
757792
struct aws_channel_handler *handler,
758793
struct aws_channel_slot *slot,
@@ -762,6 +797,7 @@ static int s_handler_shutdown(
762797

763798
assert(aws_channel_thread_is_callers_thread(slot->channel));
764799
struct aws_websocket *websocket = handler->impl;
800+
int err;
765801

766802
AWS_LOGF_DEBUG(
767803
AWS_LS_HTTP_WEBSOCKET,
@@ -772,6 +808,8 @@ static int s_handler_shutdown(
772808
free_scarce_resources_immediately);
773809

774810
if (dir == AWS_CHANNEL_DIR_READ) {
811+
/* Shutdown in the read direction is immediate and simple. */
812+
websocket->thread_data.is_reading_stopped = true;
775813
aws_channel_slot_on_handler_shutdown_complete(slot, dir, error_code, free_scarce_resources_immediately);
776814

777815
} else {
@@ -795,7 +833,7 @@ static int s_handler_shutdown(
795833
.opcode = AWS_WEBSOCKET_OPCODE_CLOSE,
796834
.fin = true,
797835
};
798-
int err = aws_websocket_send_frame(websocket, &close_frame);
836+
err = aws_websocket_send_frame(websocket, &close_frame);
799837
if (err) {
800838
AWS_LOGF_WARN(
801839
AWS_LS_HTTP_WEBSOCKET,
@@ -827,7 +865,11 @@ static void s_finish_shutdown(struct aws_websocket *websocket) {
827865

828866
websocket->thread_data.is_shutting_down_and_waiting_for_close_frame_to_be_written = false;
829867

830-
/* Cancel all incomplete outgoing frames */
868+
/* Cancel all incomplete frames */
869+
if (websocket->thread_data.current_incoming_frame) {
870+
s_complete_incoming_frame(websocket, AWS_ERROR_HTTP_CONNECTION_CLOSED);
871+
}
872+
831873
if (websocket->thread_data.current_outgoing_frame) {
832874
s_destroy_outgoing_frame(
833875
websocket, websocket->thread_data.current_outgoing_frame, AWS_ERROR_HTTP_CONNECTION_CLOSED);
@@ -856,8 +898,6 @@ static void s_finish_shutdown(struct aws_websocket *websocket) {
856898
s_destroy_outgoing_frame(websocket, frame, AWS_ERROR_HTTP_CONNECTION_CLOSED);
857899
}
858900

859-
/* TODO: cancel incomplete incoming_frame */
860-
861901
if (websocket->on_connection_shutdown) {
862902
AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Invoking user's shutdown callback.", (void *)websocket);
863903
websocket->on_connection_shutdown(
@@ -876,22 +916,165 @@ static int s_handler_process_read_message(
876916
struct aws_channel_slot *slot,
877917
struct aws_io_message *message) {
878918

879-
(void)handler;
880-
(void)slot;
881-
(void)message;
882-
return aws_raise_error(AWS_ERROR_UNIMPLEMENTED);
919+
assert(message);
920+
assert(aws_channel_thread_is_callers_thread(slot->channel));
921+
struct aws_websocket *websocket = handler->impl;
922+
struct aws_byte_cursor cursor = aws_byte_cursor_from_buf(&message->message_data);
923+
int err;
924+
925+
websocket->thread_data.incoming_message_window_update = message->message_data.len;
926+
927+
AWS_LOGF_TRACE(
928+
AWS_LS_HTTP_WEBSOCKET,
929+
"id=%p: Begin processing message of size %zu.",
930+
(void *)websocket,
931+
message->message_data.len);
932+
933+
while (cursor.len) {
934+
if (websocket->thread_data.is_reading_stopped) {
935+
goto clean_up;
936+
}
937+
938+
bool frame_complete;
939+
err = aws_websocket_decoder_process(&websocket->thread_data.decoder, &cursor, &frame_complete);
940+
if (err) {
941+
AWS_LOGF_ERROR(
942+
AWS_LS_HTTP_WEBSOCKET,
943+
"id=%p: Message processing failed, error %d (%s). Closing connection.",
944+
(void *)websocket,
945+
aws_last_error(),
946+
aws_error_name(aws_last_error()));
947+
948+
goto error;
949+
}
950+
951+
if (frame_complete) {
952+
s_complete_incoming_frame(websocket, AWS_ERROR_SUCCESS);
953+
}
954+
}
955+
956+
if (websocket->thread_data.incoming_message_window_update > 0) {
957+
err = aws_channel_slot_increment_read_window(slot, websocket->thread_data.incoming_message_window_update);
958+
if (err) {
959+
AWS_LOGF_ERROR(
960+
AWS_LS_HTTP_WEBSOCKET,
961+
"id=%p: Failed to increment read window after message processing, error %d (%s). Closing connection.",
962+
(void *)websocket,
963+
aws_last_error(),
964+
aws_error_name(aws_last_error()));
965+
goto error;
966+
}
967+
}
968+
969+
goto clean_up;
970+
971+
error:
972+
s_shutdown_due_to_read_err(websocket, aws_last_error());
973+
974+
clean_up:
975+
if (cursor.len > 0) {
976+
AWS_LOGF_TRACE(
977+
AWS_LS_HTTP_WEBSOCKET,
978+
"id=%p: Done processing message, final %zu bytes ignored.",
979+
(void *)websocket,
980+
cursor.len);
981+
} else {
982+
AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Done processing message.", (void *)websocket);
983+
}
984+
aws_mem_release(message->allocator, message);
985+
return AWS_OP_SUCCESS;
883986
}
884987

885988
static int s_decoder_on_frame(const struct aws_websocket_frame *frame, void *user_data) {
886-
(void)frame;
887-
(void)user_data;
888-
return aws_raise_error(AWS_ERROR_UNIMPLEMENTED);
989+
struct aws_websocket *websocket = user_data;
990+
assert(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
991+
assert(!websocket->thread_data.current_incoming_frame);
992+
assert(!websocket->thread_data.is_reading_stopped);
993+
994+
websocket->thread_data.current_incoming_frame = &websocket->thread_data.incoming_frame_storage;
995+
996+
websocket->thread_data.current_incoming_frame->payload_length = frame->payload_length;
997+
websocket->thread_data.current_incoming_frame->opcode = frame->opcode;
998+
websocket->thread_data.current_incoming_frame->fin = frame->fin;
999+
websocket->thread_data.current_incoming_frame->rsv[0] = frame->rsv[0];
1000+
websocket->thread_data.current_incoming_frame->rsv[1] = frame->rsv[1];
1001+
websocket->thread_data.current_incoming_frame->rsv[2] = frame->rsv[2];
1002+
1003+
/* Invoke user cb */
1004+
if (websocket->on_incoming_frame_begin) {
1005+
websocket->on_incoming_frame_begin(
1006+
websocket, websocket->thread_data.current_incoming_frame, websocket->user_data);
1007+
}
1008+
1009+
/* Stop decoding if user callback shut down the connection. */
1010+
if (websocket->thread_data.is_reading_stopped) {
1011+
return aws_raise_error(AWS_ERROR_HTTP_CONNECTION_CLOSED);
1012+
}
1013+
1014+
return AWS_OP_SUCCESS;
8891015
}
8901016

8911017
static int s_decoder_on_payload(struct aws_byte_cursor data, void *user_data) {
892-
(void)data;
893-
(void)user_data;
894-
return aws_raise_error(AWS_ERROR_UNIMPLEMENTED);
1018+
struct aws_websocket *websocket = user_data;
1019+
assert(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
1020+
assert(websocket->thread_data.current_incoming_frame);
1021+
assert(!websocket->thread_data.is_reading_stopped);
1022+
1023+
/* Invoke user cb */
1024+
if (websocket->on_incoming_frame_payload) {
1025+
size_t window_update_size = data.len;
1026+
1027+
websocket->on_incoming_frame_payload(
1028+
websocket, websocket->thread_data.current_incoming_frame, data, &window_update_size, websocket->user_data);
1029+
1030+
/* If user reduced window_udpate_size, reduce how much the websocket will update its window */
1031+
size_t reduce = data.len - window_update_size;
1032+
assert(reduce <= websocket->thread_data.incoming_message_window_update);
1033+
websocket->thread_data.incoming_message_window_update -= reduce;
1034+
1035+
AWS_LOGF_DEBUG(
1036+
AWS_LS_HTTP_WEBSOCKET,
1037+
"id=%p: Incoming payload callback changed window update size, window will shrink by %zu.",
1038+
(void *)websocket,
1039+
reduce);
1040+
}
1041+
1042+
/* TODO: pass data to channel handler on right */
1043+
1044+
/* Stop decoding if user callback shut down the connection. */
1045+
if (websocket->thread_data.is_reading_stopped) {
1046+
return aws_raise_error(AWS_ERROR_HTTP_CONNECTION_CLOSED);
1047+
}
1048+
1049+
return AWS_OP_SUCCESS;
1050+
}
1051+
1052+
static void s_complete_incoming_frame(struct aws_websocket *websocket, int error_code) {
1053+
assert(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
1054+
assert(websocket->thread_data.current_incoming_frame);
1055+
1056+
if (error_code == AWS_OP_SUCCESS) {
1057+
/* If this was a CLOSE frame, don't read any more data. */
1058+
if (websocket->thread_data.current_incoming_frame->opcode == AWS_WEBSOCKET_OPCODE_CLOSE) {
1059+
AWS_LOGF_DEBUG(
1060+
AWS_LS_HTTP_WEBSOCKET,
1061+
"id=%p: Close frame received, any further data received will be ignored.",
1062+
(void *)websocket);
1063+
websocket->thread_data.is_reading_stopped = true;
1064+
1065+
/* TODO: auto-close if there's a channel-handler to the right */
1066+
}
1067+
1068+
/* TODO: auto-respond to PING with PONG */
1069+
}
1070+
1071+
/* Invoke user cb */
1072+
if (websocket->on_incoming_frame_complete) {
1073+
websocket->on_incoming_frame_complete(
1074+
websocket, websocket->thread_data.current_incoming_frame, error_code, websocket->user_data);
1075+
}
1076+
1077+
websocket->thread_data.current_incoming_frame = NULL;
8951078
}
8961079

8971080
static size_t s_handler_initial_window_size(struct aws_channel_handler *handler) {

tests/CMakeLists.txt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,10 @@ add_test_case(websocket_handler_shutdown_on_zero_refcount)
105105
#add_test_case(websocket_handler_close_from_incoming_payload_cb_stops_processing)
106106
#add_test_case(websocket_handler_close_from_incoming_complete_cb_stops_processing)
107107
#add_test_case(websocket_handler_force_write_error_somehow)
108-
109-
108+
add_test_case(websocket_handler_read_frame)
109+
add_test_case(websocket_handler_read_multiple_frames)
110+
add_test_case(websocket_handler_read_frames_split_across_io_messages)
111+
add_test_case(websocket_handler_read_frames_complete_on_shutdown)
110112

111113
set(TEST_BINARY_NAME ${CMAKE_PROJECT_NAME}-tests)
112114
generate_test_driver(${TEST_BINARY_NAME})

0 commit comments

Comments
 (0)