diff --git a/include/aws/http/private/h2_stream.h b/include/aws/http/private/h2_stream.h index 463a9da77..a6e3a8f07 100644 --- a/include/aws/http/private/h2_stream.h +++ b/include/aws/http/private/h2_stream.h @@ -80,6 +80,7 @@ struct aws_h2_stream { * to keep flow continues. */ int32_t window_size_threshold_to_send_update; + aws_http2_on_remote_end_stream_fn *on_h2_remote_end_stream; /* Only the event-loop thread may touch this data */ struct { diff --git a/include/aws/http/request_response.h b/include/aws/http/request_response.h index 98fb71da3..acdbab0d6 100644 --- a/include/aws/http/request_response.h +++ b/include/aws/http/request_response.h @@ -189,6 +189,24 @@ typedef int( */ typedef int(aws_http_on_incoming_request_done_fn)(struct aws_http_stream *stream, void *user_data); +/** + * Invoked when the remote peer sends END_STREAM on an HTTP/2 stream. + * This is always invoked on the HTTP connection's event-loop thread. + * + * **HTTP/2 ONLY** - This callback is only supported for HTTP/2 request. + * + * This callback is invoked when the remote peer finishes sending by setting the END_STREAM + * flag on the final HEADERS or DATA frame. This indicates that no more data will be received from the + * remote peer for this stream. + * + * Note: If the server sends RST_STREAM instead of END_STREAM, `on_remote_end_stream` will NOT fire, + * but `on_complete` will fire with an error code. + * + * @param stream The HTTP/2 stream + * @param user_data User data provided in aws_http_make_request_options + */ +typedef void(aws_http2_on_remote_end_stream_fn)(struct aws_http_stream *stream, void *user_data); + /** * Invoked when a request/response stream is complete, whether successful or unsuccessful * This is always invoked on the HTTP connection's event-loop thread. @@ -290,6 +308,16 @@ struct aws_http_make_request_options { */ aws_http_on_stream_metrics_fn *on_metrics; + /** + * Invoked when the remote peer sends END_STREAM on an HTTP/2 stream (HTTP/2 ONLY). + * Optional. + * See `aws_http2_on_remote_end_stream_fn`. + * + * This callback fires when the remote peer sends END_STREAM, which happens BEFORE `on_complete`. + * Ignored for HTTP/1.x connections. + */ + aws_http2_on_remote_end_stream_fn *on_h2_remote_end_stream; + /** * Invoked when request/response stream is complete, whether successful or unsuccessful * Optional. diff --git a/source/h2_stream.c b/source/h2_stream.c index 667c8a8a6..eefbcba3a 100644 --- a/source/h2_stream.c +++ b/source/h2_stream.c @@ -277,6 +277,7 @@ struct aws_h2_stream *aws_h2_stream_new_request( AWS_PRECONDITION(options); struct aws_h2_stream *stream = aws_mem_calloc(client_connection->alloc, 1, sizeof(struct aws_h2_stream)); + stream->on_h2_remote_end_stream = options->on_h2_remote_end_stream; /* Initialize base stream */ stream->base.vtable = &s_h2_stream_vtable; @@ -1273,6 +1274,10 @@ struct aws_h2err aws_h2_stream_on_decoder_end_stream(struct aws_h2_stream *strea } } + if (stream->on_h2_remote_end_stream) { + stream->on_h2_remote_end_stream(&stream->base, stream->base.user_data); + } + if (stream->thread_data.state == AWS_H2_STREAM_STATE_HALF_CLOSED_LOCAL) { /* Both sides have sent END_STREAM */ stream->thread_data.state = AWS_H2_STREAM_STATE_CLOSED; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index ed14545cd..b898e64f0 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -515,6 +515,12 @@ add_test_case(h2_client_batch_auto_window_update) add_test_case(h2_client_batch_manual_window_update) add_test_case(h2_client_cap_manual_window_update) +# on_h2_remote_end_stream callback tests +add_test_case(h2_client_on_h2_remote_end_stream_fires) +add_test_case(h2_client_on_h2_remote_end_stream_not_fired_on_rst_stream) +add_test_case(h2_client_on_h2_remote_end_stream_early_server_response) +add_test_case(h2_client_on_h2_remote_end_stream_with_body_data) + add_test_case(server_new_destroy) add_test_case(server_new_destroy_tcp) add_test_case(connection_setup_shutdown) diff --git a/tests/stream_test_helper.c b/tests/stream_test_helper.c index c57073ff4..9290b009b 100644 --- a/tests/stream_test_helper.c +++ b/tests/stream_test_helper.c @@ -160,6 +160,12 @@ static void s_on_complete(struct aws_http_stream *stream, int error_code, void * aws_http_stream_get_incoming_response_status(stream, &tester->response_status); } +static void s_on_h2_remote_end_stream(struct aws_http_stream *stream, void *user_data) { + (void)stream; + struct client_stream_tester *tester = user_data; + tester->on_h2_remote_end_stream_invoked = true; +} + static void s_on_destroy(void *user_data) { struct client_stream_tester *tester = user_data; @@ -202,6 +208,7 @@ int client_stream_tester_init( .on_complete = s_on_complete, .on_destroy = s_on_destroy, .http2_use_manual_data_writes = options->http2_manual_write, + .on_h2_remote_end_stream = options->on_h2_remote_end_stream ? s_on_h2_remote_end_stream : NULL, }; tester->stream = aws_http_connection_make_request(options->connection, &request_options); ASSERT_NOT_NULL(tester->stream); diff --git a/tests/stream_test_helper.h b/tests/stream_test_helper.h index 3b2ff0263..d242db06d 100644 --- a/tests/stream_test_helper.h +++ b/tests/stream_test_helper.h @@ -46,12 +46,17 @@ struct client_stream_tester { struct aws_http_stream_metrics metrics; bool destroyed; + + /* HTTP/2 only: Whether on_h2_remote_end_stream fired */ + bool on_h2_remote_end_stream_invoked; }; struct client_stream_tester_options { struct aws_http_message *request; struct aws_http_connection *connection; bool http2_manual_write; + /* Optional: pointer to bool to track if on_h2_remote_end_stream fires */ + bool *on_h2_remote_end_stream; }; int client_stream_tester_init( diff --git a/tests/test_h2_client.c b/tests/test_h2_client.c index 5bf944c57..2514a6c46 100644 --- a/tests/test_h2_client.c +++ b/tests/test_h2_client.c @@ -6025,6 +6025,244 @@ TEST_CASE(h2_client_batch_manual_window_update) { return s_tester_clean_up(); } +/* Test that on_h2_remote_end_stream fires when remote peer sends END_STREAM */ +TEST_CASE(h2_client_on_h2_remote_end_stream_fires) { + ASSERT_SUCCESS(s_tester_init(allocator, ctx)); + + /* get connection preface and acks out of the way */ + ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer)); + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + + /* send request */ + struct aws_http_message *request = aws_http2_message_new_request(allocator); + ASSERT_NOT_NULL(request); + + struct aws_http_header request_headers_src[] = { + DEFINE_HEADER(":method", "GET"), + DEFINE_HEADER(":scheme", "https"), + DEFINE_HEADER(":path", "/"), + }; + aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src)); + + bool track_on_h2_remote_end_stream = true; + struct client_stream_tester_options tester_options = { + .request = request, + .connection = s_tester.connection, + .on_h2_remote_end_stream = &track_on_h2_remote_end_stream, + }; + struct client_stream_tester stream_tester; + ASSERT_SUCCESS(client_stream_tester_init(&stream_tester, allocator, &tester_options)); + + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + uint32_t stream_id = aws_http_stream_get_id(stream_tester.stream); + + /* fake peer sends response with END_STREAM */ + struct aws_http_header response_headers_src[] = { + DEFINE_HEADER(":status", "200"), + }; + + struct aws_http_headers *response_headers = aws_http_headers_new(allocator); + aws_http_headers_add_array(response_headers, response_headers_src, AWS_ARRAY_SIZE(response_headers_src)); + + struct aws_h2_frame *response_frame = + aws_h2_frame_new_headers(allocator, stream_id, response_headers, true /*end_stream*/, 0, NULL); + ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, response_frame)); + + /* validate that both callbacks fired */ + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + ASSERT_TRUE(stream_tester.on_h2_remote_end_stream_invoked); + ASSERT_TRUE(stream_tester.complete); + ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, stream_tester.on_complete_error_code); + + /* clean up */ + aws_http_headers_release(response_headers); + aws_http_message_release(request); + client_stream_tester_clean_up(&stream_tester); + return s_tester_clean_up(); +} + +/* Test that on_h2_remote_end_stream does NOT fire when RST_STREAM is received */ +TEST_CASE(h2_client_on_h2_remote_end_stream_not_fired_on_rst_stream) { + ASSERT_SUCCESS(s_tester_init(allocator, ctx)); + + /* get connection preface and acks out of the way */ + ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer)); + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + + /* send request */ + struct aws_http_message *request = aws_http2_message_new_request(allocator); + ASSERT_NOT_NULL(request); + + struct aws_http_header request_headers_src[] = { + DEFINE_HEADER(":method", "GET"), + DEFINE_HEADER(":scheme", "https"), + DEFINE_HEADER(":path", "/"), + }; + aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src)); + + bool track_on_h2_remote_end_stream = true; + struct client_stream_tester_options tester_options = { + .request = request, + .connection = s_tester.connection, + .on_h2_remote_end_stream = &track_on_h2_remote_end_stream, + }; + struct client_stream_tester stream_tester; + ASSERT_SUCCESS(client_stream_tester_init(&stream_tester, allocator, &tester_options)); + + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + uint32_t stream_id = aws_http_stream_get_id(stream_tester.stream); + + /* fake peer sends RST_STREAM instead of END_STREAM */ + struct aws_h2_frame *rst_frame = aws_h2_frame_new_rst_stream(allocator, stream_id, AWS_HTTP2_ERR_INTERNAL_ERROR); + ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, rst_frame)); + + /* validate that on_h2_remote_end_stream did NOT fire, but on_complete did */ + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + ASSERT_FALSE(stream_tester.on_h2_remote_end_stream_invoked); + ASSERT_TRUE(stream_tester.complete); + ASSERT_INT_EQUALS(AWS_ERROR_HTTP_RST_STREAM_RECEIVED, stream_tester.on_complete_error_code); + + /* clean up */ + aws_http_message_release(request); + client_stream_tester_clean_up(&stream_tester); + return s_tester_clean_up(); +} + +/* Test early server response (server sends END_STREAM before client finishes) */ +TEST_CASE(h2_client_on_h2_remote_end_stream_early_server_response) { + ASSERT_SUCCESS(s_tester_init(allocator, ctx)); + + /* get connection preface and acks out of the way */ + ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer)); + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + + /* create request with body that will stall */ + struct aws_http_message *request = aws_http2_message_new_request(allocator); + ASSERT_NOT_NULL(request); + + struct aws_http_header request_headers_src[] = { + DEFINE_HEADER(":method", "POST"), + DEFINE_HEADER(":scheme", "https"), + DEFINE_HEADER(":path", "/upload"), + }; + aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src)); + + const char *body = "request body data"; + struct aws_byte_cursor body_cursor = aws_byte_cursor_from_c_str(body); + struct aws_input_stream *body_stream = aws_input_stream_new_tester(allocator, body_cursor); + aws_input_stream_tester_set_max_bytes_per_read(body_stream, 1); /* Stall the body */ + aws_http_message_set_body_stream(request, body_stream); + + bool track_on_h2_remote_end_stream = true; + struct client_stream_tester_options tester_options = { + .request = request, + .connection = s_tester.connection, + .on_h2_remote_end_stream = &track_on_h2_remote_end_stream, + }; + struct client_stream_tester stream_tester; + ASSERT_SUCCESS(client_stream_tester_init(&stream_tester, allocator, &tester_options)); + + /* Execute 1 event-loop tick, HEADERS should be sent but body stalled */ + testing_channel_run_currently_queued_tasks(&s_tester.testing_channel); + uint32_t stream_id = aws_http_stream_get_id(stream_tester.stream); + + /* fake peer sends complete response while client is still sending */ + struct aws_http_header response_headers_src[] = { + DEFINE_HEADER(":status", "200"), + }; + + struct aws_http_headers *response_headers = aws_http_headers_new(allocator); + aws_http_headers_add_array(response_headers, response_headers_src, AWS_ARRAY_SIZE(response_headers_src)); + + struct aws_h2_frame *response_frame = + aws_h2_frame_new_headers(allocator, stream_id, response_headers, true /*end_stream*/, 0, NULL); + ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, response_frame)); + + testing_channel_run_currently_queued_tasks(&s_tester.testing_channel); + + /* At this point, on_h2_remote_end_stream should have fired, but on_complete should NOT (client still sending) */ + ASSERT_TRUE(stream_tester.on_h2_remote_end_stream_invoked); + ASSERT_FALSE(stream_tester.complete); + + /* Now let client finish sending */ + aws_input_stream_tester_set_max_bytes_per_read(body_stream, SIZE_MAX); + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + + /* Now on_complete should have fired */ + ASSERT_TRUE(stream_tester.complete); + ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, stream_tester.on_complete_error_code); + + /* clean up */ + aws_http_headers_release(response_headers); + aws_http_message_release(request); + aws_input_stream_release(body_stream); + client_stream_tester_clean_up(&stream_tester); + return s_tester_clean_up(); +} + +/* Test that on_h2_remote_end_stream works with body data */ +TEST_CASE(h2_client_on_h2_remote_end_stream_with_body_data) { + ASSERT_SUCCESS(s_tester_init(allocator, ctx)); + + /* get connection preface and acks out of the way */ + ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer)); + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + + /* send request */ + struct aws_http_message *request = aws_http2_message_new_request(allocator); + ASSERT_NOT_NULL(request); + + struct aws_http_header request_headers_src[] = { + DEFINE_HEADER(":method", "GET"), + DEFINE_HEADER(":scheme", "https"), + DEFINE_HEADER(":path", "/"), + }; + aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src)); + + bool track_on_h2_remote_end_stream = true; + struct client_stream_tester_options tester_options = { + .request = request, + .connection = s_tester.connection, + .on_h2_remote_end_stream = &track_on_h2_remote_end_stream, + }; + struct client_stream_tester stream_tester; + ASSERT_SUCCESS(client_stream_tester_init(&stream_tester, allocator, &tester_options)); + + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + uint32_t stream_id = aws_http_stream_get_id(stream_tester.stream); + + /* fake peer sends response headers */ + struct aws_http_header response_headers_src[] = { + DEFINE_HEADER(":status", "200"), + }; + + struct aws_http_headers *response_headers = aws_http_headers_new(allocator); + aws_http_headers_add_array(response_headers, response_headers_src, AWS_ARRAY_SIZE(response_headers_src)); + + struct aws_h2_frame *headers_frame = + aws_h2_frame_new_headers(allocator, stream_id, response_headers, false /*end_stream*/, 0, NULL); + ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, headers_frame)); + + /* fake peer sends body data */ + const char *body = "response body"; + ASSERT_SUCCESS(h2_fake_peer_send_data_frame_str(&s_tester.peer, stream_id, body, false /*end_stream*/)); + + /* fake peer sends final data frame with END_STREAM */ + ASSERT_SUCCESS(h2_fake_peer_send_data_frame_str(&s_tester.peer, stream_id, " more data", true /*end_stream*/)); + + /* validate that both callbacks fired */ + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + ASSERT_TRUE(stream_tester.on_h2_remote_end_stream_invoked); + ASSERT_TRUE(stream_tester.complete); + ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, stream_tester.on_complete_error_code); + + /* clean up */ + aws_http_headers_release(response_headers); + aws_http_message_release(request); + client_stream_tester_clean_up(&stream_tester); + return s_tester_clean_up(); +} + /* The overflow window update will be capped to the allowed max to be sent. */ TEST_CASE(h2_client_cap_manual_window_update) { /* Automated and default threshold */