Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/aws/http/private/h2_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ struct aws_h2_stream {
* to keep flow continues.
*/
int32_t window_size_threshold_to_send_update;
aws_http2_on_remote_end_stream_fn *on_h2_remote_end_stream;

/* Only the event-loop thread may touch this data */
struct {
Expand Down
28 changes: 28 additions & 0 deletions include/aws/http/request_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,24 @@ typedef int(
*/
typedef int(aws_http_on_incoming_request_done_fn)(struct aws_http_stream *stream, void *user_data);

/**
* Invoked when the remote peer sends END_STREAM on an HTTP/2 stream.
* This is always invoked on the HTTP connection's event-loop thread.
*
* **HTTP/2 ONLY** - This callback is only supported for HTTP/2 request.
*
* This callback is invoked when the remote peer finishes sending by setting the END_STREAM
* flag on the final HEADERS or DATA frame. This indicates that no more data will be received from the
* remote peer for this stream.
*
* Note: If the server sends RST_STREAM instead of END_STREAM, `on_remote_end_stream` will NOT fire,
* but `on_complete` will fire with an error code.
*
* @param stream The HTTP/2 stream
* @param user_data User data provided in aws_http_make_request_options
*/
typedef void(aws_http2_on_remote_end_stream_fn)(struct aws_http_stream *stream, void *user_data);

/**
* Invoked when a request/response stream is complete, whether successful or unsuccessful
* This is always invoked on the HTTP connection's event-loop thread.
Expand Down Expand Up @@ -290,6 +308,16 @@ struct aws_http_make_request_options {
*/
aws_http_on_stream_metrics_fn *on_metrics;

/**
* Invoked when the remote peer sends END_STREAM on an HTTP/2 stream (HTTP/2 ONLY).
* Optional.
* See `aws_http2_on_remote_end_stream_fn`.
*
* This callback fires when the remote peer sends END_STREAM, which happens BEFORE `on_complete`.
* Ignored for HTTP/1.x connections.
*/
aws_http2_on_remote_end_stream_fn *on_h2_remote_end_stream;

/**
* Invoked when request/response stream is complete, whether successful or unsuccessful
* Optional.
Expand Down
5 changes: 5 additions & 0 deletions source/h2_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ struct aws_h2_stream *aws_h2_stream_new_request(
AWS_PRECONDITION(options);

struct aws_h2_stream *stream = aws_mem_calloc(client_connection->alloc, 1, sizeof(struct aws_h2_stream));
stream->on_h2_remote_end_stream = options->on_h2_remote_end_stream;

/* Initialize base stream */
stream->base.vtable = &s_h2_stream_vtable;
Expand Down Expand Up @@ -1273,6 +1274,10 @@ struct aws_h2err aws_h2_stream_on_decoder_end_stream(struct aws_h2_stream *strea
}
}

if (stream->on_h2_remote_end_stream) {
stream->on_h2_remote_end_stream(&stream->base, stream->base.user_data);
}

if (stream->thread_data.state == AWS_H2_STREAM_STATE_HALF_CLOSED_LOCAL) {
/* Both sides have sent END_STREAM */
stream->thread_data.state = AWS_H2_STREAM_STATE_CLOSED;
Expand Down
6 changes: 6 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,12 @@ add_test_case(h2_client_batch_auto_window_update)
add_test_case(h2_client_batch_manual_window_update)
add_test_case(h2_client_cap_manual_window_update)

# on_h2_remote_end_stream callback tests
add_test_case(h2_client_on_h2_remote_end_stream_fires)
add_test_case(h2_client_on_h2_remote_end_stream_not_fired_on_rst_stream)
add_test_case(h2_client_on_h2_remote_end_stream_early_server_response)
add_test_case(h2_client_on_h2_remote_end_stream_with_body_data)

add_test_case(server_new_destroy)
add_test_case(server_new_destroy_tcp)
add_test_case(connection_setup_shutdown)
Expand Down
7 changes: 7 additions & 0 deletions tests/stream_test_helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ static void s_on_complete(struct aws_http_stream *stream, int error_code, void *
aws_http_stream_get_incoming_response_status(stream, &tester->response_status);
}

static void s_on_h2_remote_end_stream(struct aws_http_stream *stream, void *user_data) {
(void)stream;
struct client_stream_tester *tester = user_data;
tester->on_h2_remote_end_stream_invoked = true;
}

static void s_on_destroy(void *user_data) {
struct client_stream_tester *tester = user_data;

Expand Down Expand Up @@ -202,6 +208,7 @@ int client_stream_tester_init(
.on_complete = s_on_complete,
.on_destroy = s_on_destroy,
.http2_use_manual_data_writes = options->http2_manual_write,
.on_h2_remote_end_stream = options->on_h2_remote_end_stream ? s_on_h2_remote_end_stream : NULL,
};
tester->stream = aws_http_connection_make_request(options->connection, &request_options);
ASSERT_NOT_NULL(tester->stream);
Expand Down
5 changes: 5 additions & 0 deletions tests/stream_test_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,17 @@ struct client_stream_tester {
struct aws_http_stream_metrics metrics;

bool destroyed;

/* HTTP/2 only: Whether on_h2_remote_end_stream fired */
bool on_h2_remote_end_stream_invoked;
};

struct client_stream_tester_options {
struct aws_http_message *request;
struct aws_http_connection *connection;
bool http2_manual_write;
/* Optional: pointer to bool to track if on_h2_remote_end_stream fires */
bool *on_h2_remote_end_stream;
};

int client_stream_tester_init(
Expand Down
238 changes: 238 additions & 0 deletions tests/test_h2_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -6025,6 +6025,244 @@ TEST_CASE(h2_client_batch_manual_window_update) {
return s_tester_clean_up();
}

/* Test that on_h2_remote_end_stream fires when remote peer sends END_STREAM */
TEST_CASE(h2_client_on_h2_remote_end_stream_fires) {
ASSERT_SUCCESS(s_tester_init(allocator, ctx));

/* get connection preface and acks out of the way */
ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer));
testing_channel_drain_queued_tasks(&s_tester.testing_channel);

/* send request */
struct aws_http_message *request = aws_http2_message_new_request(allocator);
ASSERT_NOT_NULL(request);

struct aws_http_header request_headers_src[] = {
DEFINE_HEADER(":method", "GET"),
DEFINE_HEADER(":scheme", "https"),
DEFINE_HEADER(":path", "/"),
};
aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src));

bool track_on_h2_remote_end_stream = true;
struct client_stream_tester_options tester_options = {
.request = request,
.connection = s_tester.connection,
.on_h2_remote_end_stream = &track_on_h2_remote_end_stream,
};
struct client_stream_tester stream_tester;
ASSERT_SUCCESS(client_stream_tester_init(&stream_tester, allocator, &tester_options));

testing_channel_drain_queued_tasks(&s_tester.testing_channel);
uint32_t stream_id = aws_http_stream_get_id(stream_tester.stream);

/* fake peer sends response with END_STREAM */
struct aws_http_header response_headers_src[] = {
DEFINE_HEADER(":status", "200"),
};

struct aws_http_headers *response_headers = aws_http_headers_new(allocator);
aws_http_headers_add_array(response_headers, response_headers_src, AWS_ARRAY_SIZE(response_headers_src));

struct aws_h2_frame *response_frame =
aws_h2_frame_new_headers(allocator, stream_id, response_headers, true /*end_stream*/, 0, NULL);
ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, response_frame));

/* validate that both callbacks fired */
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
ASSERT_TRUE(stream_tester.on_h2_remote_end_stream_invoked);
ASSERT_TRUE(stream_tester.complete);
ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, stream_tester.on_complete_error_code);

/* clean up */
aws_http_headers_release(response_headers);
aws_http_message_release(request);
client_stream_tester_clean_up(&stream_tester);
return s_tester_clean_up();
}

/* Test that on_h2_remote_end_stream does NOT fire when RST_STREAM is received */
TEST_CASE(h2_client_on_h2_remote_end_stream_not_fired_on_rst_stream) {
ASSERT_SUCCESS(s_tester_init(allocator, ctx));

/* get connection preface and acks out of the way */
ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer));
testing_channel_drain_queued_tasks(&s_tester.testing_channel);

/* send request */
struct aws_http_message *request = aws_http2_message_new_request(allocator);
ASSERT_NOT_NULL(request);

struct aws_http_header request_headers_src[] = {
DEFINE_HEADER(":method", "GET"),
DEFINE_HEADER(":scheme", "https"),
DEFINE_HEADER(":path", "/"),
};
aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src));

bool track_on_h2_remote_end_stream = true;
struct client_stream_tester_options tester_options = {
.request = request,
.connection = s_tester.connection,
.on_h2_remote_end_stream = &track_on_h2_remote_end_stream,
};
struct client_stream_tester stream_tester;
ASSERT_SUCCESS(client_stream_tester_init(&stream_tester, allocator, &tester_options));

testing_channel_drain_queued_tasks(&s_tester.testing_channel);
uint32_t stream_id = aws_http_stream_get_id(stream_tester.stream);

/* fake peer sends RST_STREAM instead of END_STREAM */
struct aws_h2_frame *rst_frame = aws_h2_frame_new_rst_stream(allocator, stream_id, AWS_HTTP2_ERR_INTERNAL_ERROR);
ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, rst_frame));

/* validate that on_h2_remote_end_stream did NOT fire, but on_complete did */
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
ASSERT_FALSE(stream_tester.on_h2_remote_end_stream_invoked);
ASSERT_TRUE(stream_tester.complete);
ASSERT_INT_EQUALS(AWS_ERROR_HTTP_RST_STREAM_RECEIVED, stream_tester.on_complete_error_code);

/* clean up */
aws_http_message_release(request);
client_stream_tester_clean_up(&stream_tester);
return s_tester_clean_up();
}

/* Test early server response (server sends END_STREAM before client finishes) */
TEST_CASE(h2_client_on_h2_remote_end_stream_early_server_response) {
ASSERT_SUCCESS(s_tester_init(allocator, ctx));

/* get connection preface and acks out of the way */
ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer));
testing_channel_drain_queued_tasks(&s_tester.testing_channel);

/* create request with body that will stall */
struct aws_http_message *request = aws_http2_message_new_request(allocator);
ASSERT_NOT_NULL(request);

struct aws_http_header request_headers_src[] = {
DEFINE_HEADER(":method", "POST"),
DEFINE_HEADER(":scheme", "https"),
DEFINE_HEADER(":path", "/upload"),
};
aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src));

const char *body = "request body data";
struct aws_byte_cursor body_cursor = aws_byte_cursor_from_c_str(body);
struct aws_input_stream *body_stream = aws_input_stream_new_tester(allocator, body_cursor);
aws_input_stream_tester_set_max_bytes_per_read(body_stream, 1); /* Stall the body */
aws_http_message_set_body_stream(request, body_stream);

bool track_on_h2_remote_end_stream = true;
struct client_stream_tester_options tester_options = {
.request = request,
.connection = s_tester.connection,
.on_h2_remote_end_stream = &track_on_h2_remote_end_stream,
};
struct client_stream_tester stream_tester;
ASSERT_SUCCESS(client_stream_tester_init(&stream_tester, allocator, &tester_options));

/* Execute 1 event-loop tick, HEADERS should be sent but body stalled */
testing_channel_run_currently_queued_tasks(&s_tester.testing_channel);
uint32_t stream_id = aws_http_stream_get_id(stream_tester.stream);

/* fake peer sends complete response while client is still sending */
struct aws_http_header response_headers_src[] = {
DEFINE_HEADER(":status", "200"),
};

struct aws_http_headers *response_headers = aws_http_headers_new(allocator);
aws_http_headers_add_array(response_headers, response_headers_src, AWS_ARRAY_SIZE(response_headers_src));

struct aws_h2_frame *response_frame =
aws_h2_frame_new_headers(allocator, stream_id, response_headers, true /*end_stream*/, 0, NULL);
ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, response_frame));

testing_channel_run_currently_queued_tasks(&s_tester.testing_channel);

/* At this point, on_h2_remote_end_stream should have fired, but on_complete should NOT (client still sending) */
ASSERT_TRUE(stream_tester.on_h2_remote_end_stream_invoked);
ASSERT_FALSE(stream_tester.complete);

/* Now let client finish sending */
aws_input_stream_tester_set_max_bytes_per_read(body_stream, SIZE_MAX);
testing_channel_drain_queued_tasks(&s_tester.testing_channel);

/* Now on_complete should have fired */
ASSERT_TRUE(stream_tester.complete);
ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, stream_tester.on_complete_error_code);

/* clean up */
aws_http_headers_release(response_headers);
aws_http_message_release(request);
aws_input_stream_release(body_stream);
client_stream_tester_clean_up(&stream_tester);
return s_tester_clean_up();
}

/* Test that on_h2_remote_end_stream works with body data */
TEST_CASE(h2_client_on_h2_remote_end_stream_with_body_data) {
ASSERT_SUCCESS(s_tester_init(allocator, ctx));

/* get connection preface and acks out of the way */
ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer));
testing_channel_drain_queued_tasks(&s_tester.testing_channel);

/* send request */
struct aws_http_message *request = aws_http2_message_new_request(allocator);
ASSERT_NOT_NULL(request);

struct aws_http_header request_headers_src[] = {
DEFINE_HEADER(":method", "GET"),
DEFINE_HEADER(":scheme", "https"),
DEFINE_HEADER(":path", "/"),
};
aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src));

bool track_on_h2_remote_end_stream = true;
struct client_stream_tester_options tester_options = {
.request = request,
.connection = s_tester.connection,
.on_h2_remote_end_stream = &track_on_h2_remote_end_stream,
};
struct client_stream_tester stream_tester;
ASSERT_SUCCESS(client_stream_tester_init(&stream_tester, allocator, &tester_options));

testing_channel_drain_queued_tasks(&s_tester.testing_channel);
uint32_t stream_id = aws_http_stream_get_id(stream_tester.stream);

/* fake peer sends response headers */
struct aws_http_header response_headers_src[] = {
DEFINE_HEADER(":status", "200"),
};

struct aws_http_headers *response_headers = aws_http_headers_new(allocator);
aws_http_headers_add_array(response_headers, response_headers_src, AWS_ARRAY_SIZE(response_headers_src));

struct aws_h2_frame *headers_frame =
aws_h2_frame_new_headers(allocator, stream_id, response_headers, false /*end_stream*/, 0, NULL);
ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, headers_frame));

/* fake peer sends body data */
const char *body = "response body";
ASSERT_SUCCESS(h2_fake_peer_send_data_frame_str(&s_tester.peer, stream_id, body, false /*end_stream*/));

/* fake peer sends final data frame with END_STREAM */
ASSERT_SUCCESS(h2_fake_peer_send_data_frame_str(&s_tester.peer, stream_id, " more data", true /*end_stream*/));

/* validate that both callbacks fired */
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
ASSERT_TRUE(stream_tester.on_h2_remote_end_stream_invoked);
ASSERT_TRUE(stream_tester.complete);
ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, stream_tester.on_complete_error_code);

/* clean up */
aws_http_headers_release(response_headers);
aws_http_message_release(request);
client_stream_tester_clean_up(&stream_tester);
return s_tester_clean_up();
}

/* The overflow window update will be capped to the allowed max to be sent. */
TEST_CASE(h2_client_cap_manual_window_update) {
/* Automated and default threshold */
Expand Down
Loading