Skip to content

Commit 1f115a3

Browse files
committed
renaming and finialize
1 parent b47dcb6 commit 1f115a3

7 files changed

Lines changed: 288 additions & 8 deletions

File tree

include/aws/http/private/h2_stream.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ struct aws_h2_stream {
8080
* to keep flow continues.
8181
*/
8282
int32_t window_size_threshold_to_send_update;
83-
aws_http2_on_remote_stream_complete_fn *on_remote_complete;
83+
aws_http2_on_remote_end_stream_fn *on_h2_remote_end_stream;
8484

8585
/* Only the event-loop thread may touch this data */
8686
struct {

include/aws/http/request_response.h

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,15 +189,31 @@ typedef int(
189189
*/
190190
typedef int(aws_http_on_incoming_request_done_fn)(struct aws_http_stream *stream, void *user_data);
191191

192+
/**
193+
* Invoked when the remote peer sends END_STREAM on an HTTP/2 stream.
194+
* This is always invoked on the HTTP connection's event-loop thread.
195+
*
196+
* **HTTP/2 ONLY** - This callback is only supported for HTTP/2 request.
197+
*
198+
* This callback is invoked when the remote peer finishes sending by setting the END_STREAM
199+
* flag on the final HEADERS or DATA frame. This indicates that no more data will be received from the
200+
* remote peer for this stream.
201+
*
202+
* Note: If the server sends RST_STREAM instead of END_STREAM, `on_remote_end_stream` will NOT fire,
203+
* but `on_complete` will fire with an error code.
204+
*
205+
* @param stream The HTTP/2 stream
206+
* @param user_data User data provided in aws_http_make_request_options
207+
*/
208+
typedef void(aws_http2_on_remote_end_stream_fn)(struct aws_http_stream *stream, void *user_data);
209+
192210
/**
193211
* Invoked when a request/response stream is complete, whether successful or unsuccessful
194212
* This is always invoked on the HTTP connection's event-loop thread.
195213
* This will not be invoked if the stream is never activated.
196214
*/
197215
typedef void(aws_http_on_stream_complete_fn)(struct aws_http_stream *stream, int error_code, void *user_data);
198216

199-
typedef void(aws_http2_on_remote_stream_complete_fn)(struct aws_http_stream *stream, void *user_data);
200-
201217
/**
202218
* Invoked when request/response stream destroy completely.
203219
* This can be invoked within the same thead who release the refcount on http stream.
@@ -292,15 +308,23 @@ struct aws_http_make_request_options {
292308
*/
293309
aws_http_on_stream_metrics_fn *on_metrics;
294310

311+
/**
312+
* Invoked when the remote peer sends END_STREAM on an HTTP/2 stream (HTTP/2 ONLY).
313+
* Optional.
314+
* See `aws_http2_on_remote_end_stream_fn`.
315+
*
316+
* This callback fires when the remote peer sends END_STREAM, which happens BEFORE `on_complete`.
317+
* Ignored for HTTP/1.x connections.
318+
*/
319+
aws_http2_on_remote_end_stream_fn *on_h2_remote_end_stream;
320+
295321
/**
296322
* Invoked when request/response stream is complete, whether successful or unsuccessful
297323
* Optional.
298324
* See `aws_http_on_stream_complete_fn`.
299325
*/
300326
aws_http_on_stream_complete_fn *on_complete;
301327

302-
aws_http2_on_remote_stream_complete_fn *on_remote_complete;
303-
304328
/* Callback for when the request/response stream is completely destroyed. */
305329
aws_http_on_stream_destroy_fn *on_destroy;
306330

source/h2_stream.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ struct aws_h2_stream *aws_h2_stream_new_request(
277277
AWS_PRECONDITION(options);
278278

279279
struct aws_h2_stream *stream = aws_mem_calloc(client_connection->alloc, 1, sizeof(struct aws_h2_stream));
280-
stream->on_remote_complete = options->on_remote_complete;
280+
stream->on_h2_remote_end_stream = options->on_h2_remote_end_stream;
281281

282282
/* Initialize base stream */
283283
stream->base.vtable = &s_h2_stream_vtable;
@@ -1274,8 +1274,8 @@ struct aws_h2err aws_h2_stream_on_decoder_end_stream(struct aws_h2_stream *strea
12741274
}
12751275
}
12761276

1277-
if (stream->on_remote_complete) {
1278-
stream->on_remote_complete(stream, stream->base.user_data);
1277+
if (stream->on_h2_remote_end_stream) {
1278+
stream->on_h2_remote_end_stream(&stream->base, stream->base.user_data);
12791279
}
12801280

12811281
if (stream->thread_data.state == AWS_H2_STREAM_STATE_HALF_CLOSED_LOCAL) {

tests/CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,12 @@ add_test_case(h2_client_batch_auto_window_update)
515515
add_test_case(h2_client_batch_manual_window_update)
516516
add_test_case(h2_client_cap_manual_window_update)
517517

518+
# on_h2_remote_end_stream callback tests
519+
add_test_case(h2_client_on_h2_remote_end_stream_fires)
520+
add_test_case(h2_client_on_h2_remote_end_stream_not_fired_on_rst_stream)
521+
add_test_case(h2_client_on_h2_remote_end_stream_early_server_response)
522+
add_test_case(h2_client_on_h2_remote_end_stream_with_body_data)
523+
518524
add_test_case(server_new_destroy)
519525
add_test_case(server_new_destroy_tcp)
520526
add_test_case(connection_setup_shutdown)

tests/stream_test_helper.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,12 @@ static void s_on_complete(struct aws_http_stream *stream, int error_code, void *
160160
aws_http_stream_get_incoming_response_status(stream, &tester->response_status);
161161
}
162162

163+
static void s_on_h2_remote_end_stream(struct aws_http_stream *stream, void *user_data) {
164+
(void)stream;
165+
struct client_stream_tester *tester = user_data;
166+
tester->on_h2_remote_end_stream_invoked = true;
167+
}
168+
163169
static void s_on_destroy(void *user_data) {
164170
struct client_stream_tester *tester = user_data;
165171

@@ -202,6 +208,7 @@ int client_stream_tester_init(
202208
.on_complete = s_on_complete,
203209
.on_destroy = s_on_destroy,
204210
.http2_use_manual_data_writes = options->http2_manual_write,
211+
.on_h2_remote_end_stream = options->on_h2_remote_end_stream ? s_on_h2_remote_end_stream : NULL,
205212
};
206213
tester->stream = aws_http_connection_make_request(options->connection, &request_options);
207214
ASSERT_NOT_NULL(tester->stream);

tests/stream_test_helper.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,17 @@ struct client_stream_tester {
4646
struct aws_http_stream_metrics metrics;
4747

4848
bool destroyed;
49+
50+
/* HTTP/2 only: Whether on_h2_remote_end_stream fired */
51+
bool on_h2_remote_end_stream_invoked;
4952
};
5053

5154
struct client_stream_tester_options {
5255
struct aws_http_message *request;
5356
struct aws_http_connection *connection;
5457
bool http2_manual_write;
58+
/* Optional: pointer to bool to track if on_h2_remote_end_stream fires */
59+
bool *on_h2_remote_end_stream;
5560
};
5661

5762
int client_stream_tester_init(

tests/test_h2_client.c

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6025,6 +6025,244 @@ TEST_CASE(h2_client_batch_manual_window_update) {
60256025
return s_tester_clean_up();
60266026
}
60276027

6028+
/* Test that on_h2_remote_end_stream fires when remote peer sends END_STREAM */
6029+
TEST_CASE(h2_client_on_h2_remote_end_stream_fires) {
6030+
ASSERT_SUCCESS(s_tester_init(allocator, ctx));
6031+
6032+
/* get connection preface and acks out of the way */
6033+
ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer));
6034+
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
6035+
6036+
/* send request */
6037+
struct aws_http_message *request = aws_http2_message_new_request(allocator);
6038+
ASSERT_NOT_NULL(request);
6039+
6040+
struct aws_http_header request_headers_src[] = {
6041+
DEFINE_HEADER(":method", "GET"),
6042+
DEFINE_HEADER(":scheme", "https"),
6043+
DEFINE_HEADER(":path", "/"),
6044+
};
6045+
aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src));
6046+
6047+
bool track_on_h2_remote_end_stream = true;
6048+
struct client_stream_tester_options tester_options = {
6049+
.request = request,
6050+
.connection = s_tester.connection,
6051+
.on_h2_remote_end_stream = &track_on_h2_remote_end_stream,
6052+
};
6053+
struct client_stream_tester stream_tester;
6054+
ASSERT_SUCCESS(client_stream_tester_init(&stream_tester, allocator, &tester_options));
6055+
6056+
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
6057+
uint32_t stream_id = aws_http_stream_get_id(stream_tester.stream);
6058+
6059+
/* fake peer sends response with END_STREAM */
6060+
struct aws_http_header response_headers_src[] = {
6061+
DEFINE_HEADER(":status", "200"),
6062+
};
6063+
6064+
struct aws_http_headers *response_headers = aws_http_headers_new(allocator);
6065+
aws_http_headers_add_array(response_headers, response_headers_src, AWS_ARRAY_SIZE(response_headers_src));
6066+
6067+
struct aws_h2_frame *response_frame =
6068+
aws_h2_frame_new_headers(allocator, stream_id, response_headers, true /*end_stream*/, 0, NULL);
6069+
ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, response_frame));
6070+
6071+
/* validate that both callbacks fired */
6072+
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
6073+
ASSERT_TRUE(stream_tester.on_h2_remote_end_stream_invoked);
6074+
ASSERT_TRUE(stream_tester.complete);
6075+
ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, stream_tester.on_complete_error_code);
6076+
6077+
/* clean up */
6078+
aws_http_headers_release(response_headers);
6079+
aws_http_message_release(request);
6080+
client_stream_tester_clean_up(&stream_tester);
6081+
return s_tester_clean_up();
6082+
}
6083+
6084+
/* Test that on_h2_remote_end_stream does NOT fire when RST_STREAM is received */
6085+
TEST_CASE(h2_client_on_h2_remote_end_stream_not_fired_on_rst_stream) {
6086+
ASSERT_SUCCESS(s_tester_init(allocator, ctx));
6087+
6088+
/* get connection preface and acks out of the way */
6089+
ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer));
6090+
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
6091+
6092+
/* send request */
6093+
struct aws_http_message *request = aws_http2_message_new_request(allocator);
6094+
ASSERT_NOT_NULL(request);
6095+
6096+
struct aws_http_header request_headers_src[] = {
6097+
DEFINE_HEADER(":method", "GET"),
6098+
DEFINE_HEADER(":scheme", "https"),
6099+
DEFINE_HEADER(":path", "/"),
6100+
};
6101+
aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src));
6102+
6103+
bool track_on_h2_remote_end_stream = true;
6104+
struct client_stream_tester_options tester_options = {
6105+
.request = request,
6106+
.connection = s_tester.connection,
6107+
.on_h2_remote_end_stream = &track_on_h2_remote_end_stream,
6108+
};
6109+
struct client_stream_tester stream_tester;
6110+
ASSERT_SUCCESS(client_stream_tester_init(&stream_tester, allocator, &tester_options));
6111+
6112+
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
6113+
uint32_t stream_id = aws_http_stream_get_id(stream_tester.stream);
6114+
6115+
/* fake peer sends RST_STREAM instead of END_STREAM */
6116+
struct aws_h2_frame *rst_frame = aws_h2_frame_new_rst_stream(allocator, stream_id, AWS_HTTP2_ERR_INTERNAL_ERROR);
6117+
ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, rst_frame));
6118+
6119+
/* validate that on_h2_remote_end_stream did NOT fire, but on_complete did */
6120+
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
6121+
ASSERT_FALSE(stream_tester.on_h2_remote_end_stream_invoked);
6122+
ASSERT_TRUE(stream_tester.complete);
6123+
ASSERT_INT_EQUALS(AWS_ERROR_HTTP_RST_STREAM_RECEIVED, stream_tester.on_complete_error_code);
6124+
6125+
/* clean up */
6126+
aws_http_message_release(request);
6127+
client_stream_tester_clean_up(&stream_tester);
6128+
return s_tester_clean_up();
6129+
}
6130+
6131+
/* Test early server response (server sends END_STREAM before client finishes) */
6132+
TEST_CASE(h2_client_on_h2_remote_end_stream_early_server_response) {
6133+
ASSERT_SUCCESS(s_tester_init(allocator, ctx));
6134+
6135+
/* get connection preface and acks out of the way */
6136+
ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer));
6137+
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
6138+
6139+
/* create request with body that will stall */
6140+
struct aws_http_message *request = aws_http2_message_new_request(allocator);
6141+
ASSERT_NOT_NULL(request);
6142+
6143+
struct aws_http_header request_headers_src[] = {
6144+
DEFINE_HEADER(":method", "POST"),
6145+
DEFINE_HEADER(":scheme", "https"),
6146+
DEFINE_HEADER(":path", "/upload"),
6147+
};
6148+
aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src));
6149+
6150+
const char *body = "request body data";
6151+
struct aws_byte_cursor body_cursor = aws_byte_cursor_from_c_str(body);
6152+
struct aws_input_stream *body_stream = aws_input_stream_new_tester(allocator, body_cursor);
6153+
aws_input_stream_tester_set_max_bytes_per_read(body_stream, 1); /* Stall the body */
6154+
aws_http_message_set_body_stream(request, body_stream);
6155+
6156+
bool track_on_h2_remote_end_stream = true;
6157+
struct client_stream_tester_options tester_options = {
6158+
.request = request,
6159+
.connection = s_tester.connection,
6160+
.on_h2_remote_end_stream = &track_on_h2_remote_end_stream,
6161+
};
6162+
struct client_stream_tester stream_tester;
6163+
ASSERT_SUCCESS(client_stream_tester_init(&stream_tester, allocator, &tester_options));
6164+
6165+
/* Execute 1 event-loop tick, HEADERS should be sent but body stalled */
6166+
testing_channel_run_currently_queued_tasks(&s_tester.testing_channel);
6167+
uint32_t stream_id = aws_http_stream_get_id(stream_tester.stream);
6168+
6169+
/* fake peer sends complete response while client is still sending */
6170+
struct aws_http_header response_headers_src[] = {
6171+
DEFINE_HEADER(":status", "200"),
6172+
};
6173+
6174+
struct aws_http_headers *response_headers = aws_http_headers_new(allocator);
6175+
aws_http_headers_add_array(response_headers, response_headers_src, AWS_ARRAY_SIZE(response_headers_src));
6176+
6177+
struct aws_h2_frame *response_frame =
6178+
aws_h2_frame_new_headers(allocator, stream_id, response_headers, true /*end_stream*/, 0, NULL);
6179+
ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, response_frame));
6180+
6181+
testing_channel_run_currently_queued_tasks(&s_tester.testing_channel);
6182+
6183+
/* At this point, on_h2_remote_end_stream should have fired, but on_complete should NOT (client still sending) */
6184+
ASSERT_TRUE(stream_tester.on_h2_remote_end_stream_invoked);
6185+
ASSERT_FALSE(stream_tester.complete);
6186+
6187+
/* Now let client finish sending */
6188+
aws_input_stream_tester_set_max_bytes_per_read(body_stream, SIZE_MAX);
6189+
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
6190+
6191+
/* Now on_complete should have fired */
6192+
ASSERT_TRUE(stream_tester.complete);
6193+
ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, stream_tester.on_complete_error_code);
6194+
6195+
/* clean up */
6196+
aws_http_headers_release(response_headers);
6197+
aws_http_message_release(request);
6198+
aws_input_stream_release(body_stream);
6199+
client_stream_tester_clean_up(&stream_tester);
6200+
return s_tester_clean_up();
6201+
}
6202+
6203+
/* Test that on_h2_remote_end_stream works with body data */
6204+
TEST_CASE(h2_client_on_h2_remote_end_stream_with_body_data) {
6205+
ASSERT_SUCCESS(s_tester_init(allocator, ctx));
6206+
6207+
/* get connection preface and acks out of the way */
6208+
ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer));
6209+
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
6210+
6211+
/* send request */
6212+
struct aws_http_message *request = aws_http2_message_new_request(allocator);
6213+
ASSERT_NOT_NULL(request);
6214+
6215+
struct aws_http_header request_headers_src[] = {
6216+
DEFINE_HEADER(":method", "GET"),
6217+
DEFINE_HEADER(":scheme", "https"),
6218+
DEFINE_HEADER(":path", "/"),
6219+
};
6220+
aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src));
6221+
6222+
bool track_on_h2_remote_end_stream = true;
6223+
struct client_stream_tester_options tester_options = {
6224+
.request = request,
6225+
.connection = s_tester.connection,
6226+
.on_h2_remote_end_stream = &track_on_h2_remote_end_stream,
6227+
};
6228+
struct client_stream_tester stream_tester;
6229+
ASSERT_SUCCESS(client_stream_tester_init(&stream_tester, allocator, &tester_options));
6230+
6231+
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
6232+
uint32_t stream_id = aws_http_stream_get_id(stream_tester.stream);
6233+
6234+
/* fake peer sends response headers */
6235+
struct aws_http_header response_headers_src[] = {
6236+
DEFINE_HEADER(":status", "200"),
6237+
};
6238+
6239+
struct aws_http_headers *response_headers = aws_http_headers_new(allocator);
6240+
aws_http_headers_add_array(response_headers, response_headers_src, AWS_ARRAY_SIZE(response_headers_src));
6241+
6242+
struct aws_h2_frame *headers_frame =
6243+
aws_h2_frame_new_headers(allocator, stream_id, response_headers, false /*end_stream*/, 0, NULL);
6244+
ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, headers_frame));
6245+
6246+
/* fake peer sends body data */
6247+
const char *body = "response body";
6248+
ASSERT_SUCCESS(h2_fake_peer_send_data_frame_str(&s_tester.peer, stream_id, body, false /*end_stream*/));
6249+
6250+
/* fake peer sends final data frame with END_STREAM */
6251+
ASSERT_SUCCESS(h2_fake_peer_send_data_frame_str(&s_tester.peer, stream_id, " more data", true /*end_stream*/));
6252+
6253+
/* validate that both callbacks fired */
6254+
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
6255+
ASSERT_TRUE(stream_tester.on_h2_remote_end_stream_invoked);
6256+
ASSERT_TRUE(stream_tester.complete);
6257+
ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, stream_tester.on_complete_error_code);
6258+
6259+
/* clean up */
6260+
aws_http_headers_release(response_headers);
6261+
aws_http_message_release(request);
6262+
client_stream_tester_clean_up(&stream_tester);
6263+
return s_tester_clean_up();
6264+
}
6265+
60286266
/* The overflow window update will be capped to the allowed max to be sent. */
60296267
TEST_CASE(h2_client_cap_manual_window_update) {
60306268
/* Automated and default threshold */

0 commit comments

Comments
 (0)