Skip to content

Commit 982f1ac

Browse files
committed
use async stream
1 parent acf3139 commit 982f1ac

5 files changed

Lines changed: 131 additions & 2 deletions

File tree

include/aws/http/private/h1_encoder.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ struct aws_h1_encoder_message {
3333
struct aws_byte_buf outgoing_head_buf;
3434
/* Single stream used for unchunked body */
3535
struct aws_input_stream *body;
36+
/* Single async stream used for unchunked body */
37+
struct aws_async_input_stream *async_body;
3638

3739
/* Pointer to list of `struct aws_h1_chunk`, used for chunked encoding.
3840
* List is owned by aws_h1_stream.
@@ -55,6 +57,7 @@ enum aws_h1_encoder_state {
5557
AWS_H1_ENCODER_STATE_HEAD,
5658
/* Write streaming body, without chunked encoding, because Content-Length is known */
5759
AWS_H1_ENCODER_STATE_UNCHUNKED_BODY_STREAM,
60+
AWS_H1_ENCODER_STATE_ASYNC_WAITING,
5861
/* Write streaming body, with chunked encoding, because Content-Length is unknown */
5962
AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM,
6063
AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM_LAST_CHUNK,
@@ -81,6 +84,12 @@ struct aws_h1_encoder {
8184
uint64_t chunk_count;
8285
/* Encoder logs with this stream ptr as the ID, and passes this ptr to the chunk_complete callback */
8386
struct aws_http_stream *current_stream;
87+
/* Future to record pending future of async body read */
88+
struct aws_future_bool *pending_async_future;
89+
/* Connection to schedule again while async reading. */
90+
struct aws_h1_connection *connection;
91+
/* async error recorded */
92+
int async_error;
8493
};
8594

8695
struct aws_h1_chunk *aws_h1_chunk_new(struct aws_allocator *allocator, const struct aws_http1_chunk_options *options);

include/aws/http/request_response.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -865,6 +865,13 @@ int aws_http_message_set_response_status(struct aws_http_message *response_messa
865865
AWS_HTTP_API
866866
struct aws_input_stream *aws_http_message_get_body_stream(const struct aws_http_message *message);
867867

868+
/**
869+
* Get the async body stream.
870+
* Returns NULL if no async body stream is set.
871+
*/
872+
AWS_HTTP_API
873+
struct aws_async_input_stream *aws_http_message_get_async_body_stream(const struct aws_http_message *message);
874+
868875
/**
869876
* Set the body stream.
870877
* NULL is an acceptable value for messages with no body.
@@ -874,6 +881,15 @@ struct aws_input_stream *aws_http_message_get_body_stream(const struct aws_http_
874881
AWS_HTTP_API
875882
void aws_http_message_set_body_stream(struct aws_http_message *message, struct aws_input_stream *body_stream);
876883

884+
/**
885+
* Set the async body stream.
886+
* NULL is an acceptable value for messages with no body.
887+
* Note: The message does NOT take ownership of the body stream.
888+
* The stream must not be destroyed until the message is complete.
889+
*/
890+
AWS_HTTP_API
891+
void aws_http_message_set_async_body_stream(struct aws_http_message *message, struct aws_async_input_stream *async_body_stream);
892+
877893
/**
878894
* aws_future<aws_http_message*>
879895
*/

source/h1_connection.c

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1113,6 +1113,15 @@ static void s_write_outgoing_stream(struct aws_h1_connection *connection, bool f
11131113
goto error;
11141114
}
11151115

1116+
if (connection->thread_data.encoder.async_error) {
1117+
/* Error receiving data asynchronously. Need to note when this is happening, but for now, abandon ship */
1118+
if (msg) {
1119+
aws_mem_release(msg->allocator, msg);
1120+
}
1121+
s_shutdown_due_to_error(connection, connection->thread_data.encoder.async_error);
1122+
return;
1123+
}
1124+
11161125
if (msg->message_data.len > 0) {
11171126
AWS_LOGF_TRACE(
11181127
AWS_LS_HTTP_CONNECTION,
@@ -1130,7 +1139,13 @@ static void s_write_outgoing_stream(struct aws_h1_connection *connection, bool f
11301139

11311140
goto error;
11321141
}
1133-
1142+
} else if(connection->thread_data.encoder.message->async_body) {
1143+
AWS_LOGF_TRACE(
1144+
AWS_LS_HTTP_CONNECTION,
1145+
"id=%p: Outgoing async stream task is either complete or waiting on future. Never reschedule task.",
1146+
(void *)&connection->base);
1147+
aws_mem_release(msg->allocator, msg);
1148+
connection->thread_data.is_outgoing_stream_task_active = false;
11341149
} else {
11351150
/* If message is empty, warn that no work is being done
11361151
* and reschedule the task to try again next tick.
@@ -1552,6 +1567,9 @@ static struct aws_h1_connection *s_connection_new(
15521567

15531568
aws_h1_encoder_init(&connection->thread_data.encoder, alloc);
15541569

1570+
/* hacking around with adding connection to encoder. there should be a better way to do it */
1571+
connection->thread_data.encoder.connection = connection;
1572+
15551573
aws_channel_task_init(
15561574
&connection->outgoing_stream_task, s_outgoing_stream_task, connection, "http1_connection_outgoing_stream");
15571575
aws_channel_task_init(

source/h1_encoder.c

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,15 @@
22
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
33
* SPDX-License-Identifier: Apache-2.0.
44
*/
5+
#include <aws/common/error.h>
6+
#include <aws/io/future.h>
57
#include <aws/http/private/h1_encoder.h>
8+
#include <aws/http/private/h1_connection.h>
69
#include <aws/http/private/strutil.h>
710
#include <aws/http/status_code.h>
811
#include <aws/io/logging.h>
912
#include <aws/io/stream.h>
13+
#include <aws/io/async_stream.h>
1014

1115
#include <inttypes.h>
1216

@@ -249,6 +253,7 @@ int aws_h1_encoder_message_init_from_request(
249253
AWS_ZERO_STRUCT(*message);
250254

251255
message->body = aws_input_stream_acquire(aws_http_message_get_body_stream(request));
256+
message->async_body = aws_async_input_stream_acquire(aws_http_message_get_async_body_stream(request));
252257
message->pending_chunk_list = pending_chunk_list;
253258

254259
struct aws_byte_cursor method;
@@ -693,6 +698,57 @@ static int s_switch_state(struct aws_h1_encoder *encoder, enum aws_h1_encoder_st
693698
return AWS_OP_SUCCESS;
694699
}
695700

701+
static void s_on_async_body_read_complete(void *user_data) {
702+
struct aws_h1_encoder *encoder = user_data;
703+
704+
struct aws_h1_connection *connection = encoder->connection;
705+
706+
int error = aws_future_bool_get_error(encoder->pending_async_future);
707+
if (error) {
708+
ENCODER_LOG(ERROR, encoder, "Encountered error after future was complete. Setting async_error, should be caught in the connection event loop.");
709+
encoder->async_error = error;
710+
}
711+
712+
bool eof = !error && aws_future_bool_get_result(encoder->pending_async_future);
713+
714+
aws_future_bool_release(encoder->pending_async_future);
715+
encoder->pending_async_future = NULL;
716+
717+
if (eof) {
718+
s_switch_state(encoder, AWS_H1_ENCODER_STATE_DONE);
719+
} else {
720+
ENCODER_LOG(DEBUG, encoder, "Error occurred or buffer was full but eof not reached. We have to initiate a new encode request with a new buffer.");
721+
s_switch_state(encoder, AWS_H1_ENCODER_STATE_UNCHUNKED_BODY_STREAM);
722+
}
723+
724+
aws_h1_connection_try_write_outgoing_stream(connection);
725+
}
726+
727+
static int s_encode_stream_async(
728+
struct aws_h1_encoder *encoder,
729+
struct aws_byte_buf *dst,
730+
struct aws_async_input_stream *stream) {
731+
732+
if (dst->capacity == dst->len) {
733+
return AWS_OP_ERR;
734+
}
735+
736+
ENCODER_LOG(TRACE, encoder, "Reading from async body stream.");
737+
738+
encoder->pending_async_future = aws_async_input_stream_read_to_fill(stream, dst);
739+
740+
if (aws_future_bool_is_done(encoder->pending_async_future)) {
741+
s_on_async_body_read_complete(encoder);
742+
return encoder->async_error;
743+
}
744+
745+
aws_future_bool_register_callback(encoder->pending_async_future, s_on_async_body_read_complete, encoder);
746+
747+
s_switch_state(encoder, AWS_H1_ENCODER_STATE_ASYNC_WAITING);
748+
749+
return AWS_OP_SUCCESS;
750+
}
751+
696752
/* Initial state. Waits until a new message is set */
697753
static int s_state_fn_init(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) {
698754
(void)dst;
@@ -720,7 +776,8 @@ static int s_state_fn_head(struct aws_h1_encoder *encoder, struct aws_byte_buf *
720776
aws_byte_buf_clean_up(&encoder->message->outgoing_head_buf);
721777

722778
/* Pick next state */
723-
if (encoder->message->body && encoder->message->content_length) {
779+
/* Experimentally supporting async streams for unchunked requests.*/
780+
if ((encoder->message->body || encoder->message->async_body) && encoder->message->content_length) {
724781
return s_switch_state(encoder, AWS_H1_ENCODER_STATE_UNCHUNKED_BODY_STREAM);
725782

726783
} else if (encoder->message->body && encoder->message->has_chunked_encoding_header) {
@@ -736,6 +793,9 @@ static int s_state_fn_head(struct aws_h1_encoder *encoder, struct aws_byte_buf *
736793

737794
/* Write out body with known Content-Length (not using chunked encoding). */
738795
static int s_state_fn_unchunked_body_stream(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) {
796+
if (encoder->message->async_body) {
797+
return s_encode_stream_async(encoder, dst, encoder->message->async_body);
798+
}
739799
bool done;
740800
if (s_encode_stream(encoder, dst, encoder->message->body, encoder->message->content_length, &done)) {
741801
return AWS_OP_ERR;
@@ -750,6 +810,13 @@ static int s_state_fn_unchunked_body_stream(struct aws_h1_encoder *encoder, stru
750810
return s_switch_state(encoder, AWS_H1_ENCODER_STATE_DONE);
751811
}
752812

813+
static int s_state_fn_async_waiting(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) {
814+
(void) dst;
815+
ENCODER_LOG(ERROR, encoder, "This point should never be reached. We should come back to the encoder only after the state has changed from ASYNC WAITING");
816+
817+
return AWS_OP_ERR;
818+
}
819+
753820
/* Write out body (of unknown Content-Length) using chunked encoding.
754821
* Each pass through this state writes out 1 chunk of body data (or nothing at all). */
755822
static int s_state_fn_chunked_body_stream(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) {
@@ -994,6 +1061,7 @@ static struct encoder_state_def s_encoder_states[] = {
9941061
[AWS_H1_ENCODER_STATE_INIT] = {.fn = s_state_fn_init, .name = "INIT"},
9951062
[AWS_H1_ENCODER_STATE_HEAD] = {.fn = s_state_fn_head, .name = "HEAD"},
9961063
[AWS_H1_ENCODER_STATE_UNCHUNKED_BODY_STREAM] = {.fn = s_state_fn_unchunked_body_stream, .name = "BODY"},
1064+
[AWS_H1_ENCODER_STATE_ASYNC_WAITING] = {.fn = s_state_fn_async_waiting, .name = "WAITING"},
9971065
[AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM] = {.fn = s_state_fn_chunked_body_stream, .name = "CHUNKED_BODY_STREAM"},
9981066
[AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM_LAST_CHUNK] =
9991067
{.fn = s_state_fn_chunked_body_stream_last_chunk, .name = "LAST_CHUNK"},

source/request_response.c

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <aws/http/status_code.h>
1414
#include <aws/io/logging.h>
1515
#include <aws/io/stream.h>
16+
#include <aws/io/async_stream.h>
1617

1718
#ifdef _MSC_VER
1819
# pragma warning(disable : 4204) /* non-constant aggregate initializer */
@@ -453,6 +454,7 @@ struct aws_http_message {
453454
struct aws_allocator *allocator;
454455
struct aws_http_headers *headers;
455456
struct aws_input_stream *body_stream;
457+
struct aws_async_input_stream *async_body_stream;
456458
struct aws_atomic_var refcount;
457459
enum aws_http_version http_version;
458460

@@ -781,6 +783,17 @@ void aws_http_message_set_body_stream(struct aws_http_message *message, struct a
781783
}
782784
}
783785

786+
void aws_http_message_set_async_body_stream(struct aws_http_message *message, struct aws_async_input_stream *async_body_stream) {
787+
AWS_PRECONDITION(message);
788+
/* release previous stream, if any */
789+
aws_async_input_stream_release(message->async_body_stream);
790+
791+
message->async_body_stream = async_body_stream;
792+
if (message->async_body_stream) {
793+
aws_async_input_stream_acquire(message->async_body_stream);
794+
}
795+
}
796+
784797
int aws_http1_stream_write_chunk(struct aws_http_stream *http1_stream, const struct aws_http1_chunk_options *options) {
785798
AWS_PRECONDITION(http1_stream);
786799
AWS_PRECONDITION(http1_stream->vtable);
@@ -829,6 +842,11 @@ struct aws_input_stream *aws_http_message_get_body_stream(const struct aws_http_
829842
return message->body_stream;
830843
}
831844

845+
struct aws_async_input_stream *aws_http_message_get_async_body_stream(const struct aws_http_message *message) {
846+
AWS_PRECONDITION(message);
847+
return message->async_body_stream;
848+
}
849+
832850
struct aws_http_headers *aws_http_message_get_headers(const struct aws_http_message *message) {
833851
AWS_PRECONDITION(message);
834852
return message->headers;

0 commit comments

Comments
 (0)