diff --git a/bin/elasticurl/main.c b/bin/elasticurl/main.c index bedcb7ba0..886f657a6 100644 --- a/bin/elasticurl/main.c +++ b/bin/elasticurl/main.c @@ -64,6 +64,11 @@ struct elasticurl_ctx { enum aws_log_level log_level; enum aws_http_version required_http_version; bool exchange_completed; + bool manual_write; + bool manual_write_chunked; + int64_t manual_write_content_length; + struct aws_http_stream *stream; + bool stream_ready; }; static void s_usage(int exit_code) { @@ -96,6 +101,7 @@ static void s_usage(int exit_code) { fprintf(stderr, " --version: print the version of elasticurl.\n"); fprintf(stderr, " --http2: HTTP/2 connection required\n"); fprintf(stderr, " --http1_1: HTTP/1.1 connection required\n"); + fprintf(stderr, " --manual-write: interactively write request body via stdin\n"); fprintf(stderr, " -h, --help\n"); fprintf(stderr, " Display this message and quit.\n"); exit(exit_code); @@ -125,6 +131,7 @@ static struct aws_cli_option s_long_options[] = { {"version", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'V'}, {"http2", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'w'}, {"http1_1", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'W'}, + {"manual-write", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'n'}, {"help", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'h'}, /* Per getopt(3) the last element of the array has to be filled with all zeros */ {NULL, AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 0}, @@ -162,7 +169,7 @@ static void s_parse_options(int argc, char **argv, struct elasticurl_ctx *ctx) { while (true) { int option_index = 0; int c = - aws_cli_getopt_long(argc, argv, "a:b:c:e:f:H:d:g:j:l:m:M:GPHiko:t:v:VwWh", s_long_options, &option_index); + aws_cli_getopt_long(argc, argv, "a:b:c:e:f:H:d:g:j:l:m:M:GPHiko:t:v:VwWnh", s_long_options, &option_index); if (c == -1) { break; } @@ -276,6 +283,9 @@ static void s_parse_options(int argc, char **argv, struct elasticurl_ctx *ctx) { ctx->alpn = "http/1.1"; ctx->required_http_version = AWS_HTTP_VERSION_1_1; break; + case 'n': + ctx->manual_write = true; + break; case 'h': s_usage(0); break; @@ -432,7 +442,26 @@ static struct aws_http_message *s_build_http_request( }; aws_http_message_add_header(request, user_agent_header); - if (app_ctx->input_body) { + if (app_ctx->manual_write) { + /* Manual write mode: set headers but no body stream. + * H2 doesn't use Transfer-Encoding — just send DATA frames. */ + if (app_ctx->manual_write_chunked && protocol_version != AWS_HTTP_VERSION_2) { + struct aws_http_header te_header = { + .name = aws_byte_cursor_from_c_str("transfer-encoding"), + .value = aws_byte_cursor_from_c_str("chunked"), + }; + aws_http_message_add_header(request, te_header); + } else if (!app_ctx->manual_write_chunked) { + char content_length[64]; + AWS_ZERO_ARRAY(content_length); + snprintf(content_length, sizeof(content_length), "%" PRIi64, app_ctx->manual_write_content_length); + struct aws_http_header cl_header = { + .name = aws_byte_cursor_from_c_str("content-length"), + .value = aws_byte_cursor_from_c_str(content_length), + }; + aws_http_message_add_header(request, cl_header); + } + } else if (app_ctx->input_body) { int64_t data_len = 0; if (aws_input_stream_get_length(app_ctx->input_body, &data_len)) { fprintf(stderr, "failed to get length of input stream.\n"); @@ -522,6 +551,7 @@ static void s_on_signing_complete(struct aws_http_message *request, int error_co .on_response_header_block_done = s_on_incoming_header_block_done_fn, .on_response_body = s_on_incoming_body_fn, .on_complete = s_on_stream_complete_fn, + .use_manual_data_writes = app_ctx->manual_write, }; app_ctx->response_code_written = false; @@ -533,6 +563,15 @@ static void s_on_signing_complete(struct aws_http_message *request, int error_co } aws_http_stream_activate(stream); + if (app_ctx->manual_write) { + /* Store stream and signal main thread to begin interactive writes */ + app_ctx->stream = stream; + aws_mutex_lock(&app_ctx->mutex); + app_ctx->stream_ready = true; + aws_mutex_unlock(&app_ctx->mutex); + aws_condition_variable_notify_all(&app_ctx->c_var); + } + /* Connection will stay alive until stream completes */ aws_http_connection_release(app_ctx->connection); app_ctx->connection = NULL; @@ -554,6 +593,103 @@ static bool s_completion_predicate(void *arg) { return app_ctx->exchange_completed; } +static bool s_stream_ready_predicate(void *arg) { + struct elasticurl_ctx *app_ctx = arg; + return app_ctx->stream_ready || app_ctx->exchange_completed; +} + +struct manual_write_ctx { + struct aws_allocator *allocator; + uint8_t *data; + struct aws_input_stream *stream; +}; + +static void s_on_manual_write_complete(struct aws_http_stream *stream, int error_code, void *user_data) { + (void)stream; + (void)error_code; + struct manual_write_ctx *ctx = user_data; + aws_input_stream_release(ctx->stream); + aws_mem_release(ctx->allocator, ctx->data); + aws_mem_release(ctx->allocator, ctx); +} + +static void s_manual_write_loop(struct elasticurl_ctx *app_ctx) { + /* Wait for stream to be activated */ + aws_mutex_lock(&app_ctx->mutex); + aws_condition_variable_wait_pred(&app_ctx->c_var, &app_ctx->mutex, s_stream_ready_predicate, app_ctx); + aws_mutex_unlock(&app_ctx->mutex); + + if (app_ctx->exchange_completed) { + return; + } + + int64_t bytes_sent = 0; + char line_buf[4096]; + + fprintf(stderr, "Enter data (empty line to finish):\n"); + while (fgets(line_buf, sizeof(line_buf), stdin)) { + /* Strip trailing newline */ + size_t len = strlen(line_buf); + if (len > 0 && line_buf[len - 1] == '\n') { + line_buf[--len] = '\0'; + } + + /* Empty line = done */ + if (len == 0) { + break; + } + + /* Heap-allocate data so it outlives this stack frame */ + uint8_t *heap_data = aws_mem_calloc(app_ctx->allocator, 1, len); + memcpy(heap_data, line_buf, len); + + struct aws_byte_cursor data_cursor = aws_byte_cursor_from_array(heap_data, len); + struct aws_input_stream *data_stream = + aws_input_stream_new_from_cursor(app_ctx->allocator, &data_cursor); + + struct manual_write_ctx *write_ctx = aws_mem_calloc(app_ctx->allocator, 1, sizeof(struct manual_write_ctx)); + write_ctx->allocator = app_ctx->allocator; + write_ctx->data = heap_data; + write_ctx->stream = data_stream; + + struct aws_http_stream_write_data_options write_opts = { + .data = data_stream, + .end_stream = false, + .on_complete = s_on_manual_write_complete, + .user_data = write_ctx, + }; + + if (aws_http_stream_write_data(app_ctx->stream, &write_opts)) { + fprintf(stderr, "write_data failed: %s\n", aws_error_debug_str(aws_last_error())); + aws_input_stream_release(data_stream); + aws_mem_release(app_ctx->allocator, heap_data); + aws_mem_release(app_ctx->allocator, write_ctx); + break; + } + + bytes_sent += (int64_t)len; + fprintf(stderr, "Sent %zu bytes (total: %" PRIi64 ")\n", len, bytes_sent); + } + + /* Send final write */ + struct aws_byte_cursor empty_cursor = aws_byte_cursor_from_c_str(""); + struct aws_input_stream *empty_stream = + aws_input_stream_new_from_cursor(app_ctx->allocator, &empty_cursor); + + struct aws_http_stream_write_data_options final_opts = { + .data = empty_stream, + .end_stream = true, + }; + + if (aws_http_stream_write_data(app_ctx->stream, &final_opts)) { + fprintf(stderr, "final write_data failed: %s\n", aws_error_debug_str(aws_last_error())); + } else { + fprintf(stderr, "Stream complete. Sent %" PRIi64 " bytes.\n", bytes_sent); + } + + aws_input_stream_release(empty_stream); +} + int main(int argc, char **argv) { struct aws_allocator *allocator = aws_default_allocator(); @@ -579,6 +715,25 @@ int main(int argc, char **argv) { s_parse_options(argc, argv, &app_ctx); + /* Interactive prompt for manual-write mode */ + if (app_ctx.manual_write) { + if (!strcmp(app_ctx.verb, "POST")) { + fprintf(stderr, "Only POST requests allowed for manual_writes. Exiting... \n"); + return 1; + } + fprintf(stderr, "Manual write mode enabled.\n"); + fprintf(stderr, "Content-Length (leave empty for chunked transfer encoding): "); + char cl_buf[64]; + if (fgets(cl_buf, sizeof(cl_buf), stdin) && cl_buf[0] != '\n') { + app_ctx.manual_write_content_length = (int64_t)atoll(cl_buf); + app_ctx.manual_write_chunked = false; + fprintf(stderr, "Using Content-Length: %" PRIi64 "\n", app_ctx.manual_write_content_length); + } else { + app_ctx.manual_write_chunked = true; + fprintf(stderr, "Using chunked transfer encoding.\n"); + } + } + struct aws_logger logger; AWS_ZERO_STRUCT(logger); @@ -728,6 +883,9 @@ int main(int argc, char **argv) { http_client_options.prior_knowledge_http2 = true; } aws_http_client_connect(&http_client_options); + if (app_ctx.manual_write) { + s_manual_write_loop(&app_ctx); + } aws_mutex_lock(&app_ctx.mutex); aws_condition_variable_wait_pred(&app_ctx.c_var, &app_ctx.mutex, s_completion_predicate, &app_ctx); aws_mutex_unlock(&app_ctx.mutex); diff --git a/include/aws/http/private/h1_encoder.h b/include/aws/http/private/h1_encoder.h index 8f82b23d2..e09b970e7 100644 --- a/include/aws/http/private/h1_encoder.h +++ b/include/aws/http/private/h1_encoder.h @@ -8,6 +8,8 @@ #include #include +struct aws_h1_data_write; + struct aws_h1_chunk { struct aws_allocator *allocator; struct aws_input_stream *data; @@ -44,10 +46,18 @@ struct aws_h1_encoder_message { /* Pointer to chunked_trailer, used for chunked_trailer. */ struct aws_h1_trailer *trailer; + /* Pointer to list of `struct aws_h1_data_write`, used for manual data writes with Content-Length. + * List is owned by aws_h1_stream. */ + struct aws_linked_list *pending_data_write_list; + + /* Current data write being processed (for manual data writes with Content-Length) */ + struct aws_h1_data_write *current_data_write; + /* If non-zero, length of unchunked body to send */ uint64_t content_length; bool has_connection_close_header; bool has_chunked_encoding_header; + bool has_manual_data_writes; }; enum aws_h1_encoder_state { @@ -64,6 +74,9 @@ enum aws_h1_encoder_state { AWS_H1_ENCODER_STATE_CHUNK_BODY, AWS_H1_ENCODER_STATE_CHUNK_END, AWS_H1_ENCODER_STATE_CHUNK_TRAILER, + /* The _DATA_WRITE_ states support the write_data() API (manual data writes with Content-Length) */ + AWS_H1_ENCODER_STATE_DATA_WRITE_NEXT, + AWS_H1_ENCODER_STATE_DATA_WRITE_BODY, AWS_H1_ENCODER_STATE_DONE, }; @@ -104,7 +117,9 @@ int aws_h1_encoder_message_init_from_request( struct aws_h1_encoder_message *message, struct aws_allocator *allocator, const struct aws_http_message *request, - struct aws_linked_list *pending_chunk_list); + struct aws_linked_list *pending_chunk_list, + struct aws_linked_list *pending_data_write_list, + bool use_manual_data_writes); int aws_h1_encoder_message_init_from_response( struct aws_h1_encoder_message *message, @@ -138,6 +153,10 @@ bool aws_h1_encoder_is_message_in_progress(const struct aws_h1_encoder *encoder) AWS_HTTP_API bool aws_h1_encoder_is_waiting_for_chunks(const struct aws_h1_encoder *encoder); +/* Return true if the encoder is stuck waiting for more data writes to be added to the current message */ +AWS_HTTP_API +bool aws_h1_encoder_is_waiting_for_data_writes(const struct aws_h1_encoder *encoder); + AWS_EXTERN_C_END #endif /* AWS_HTTP_H1_ENCODER_H */ diff --git a/include/aws/http/private/h1_stream.h b/include/aws/http/private/h1_stream.h index 2efa2c379..449df7e6a 100644 --- a/include/aws/http/private/h1_stream.h +++ b/include/aws/http/private/h1_stream.h @@ -22,6 +22,15 @@ enum aws_h1_stream_api_state { AWS_H1_STREAM_API_STATE_COMPLETE, }; +struct aws_h1_data_write { + struct aws_allocator *allocator; + struct aws_input_stream *data; + aws_http_stream_write_complete_fn *on_complete; + void *user_data; + struct aws_linked_list_node node; + bool is_end_stream; +}; + struct aws_h1_stream { struct aws_http_stream base; @@ -40,6 +49,9 @@ struct aws_h1_stream { */ struct aws_channel_task cross_thread_work_task; + /* Whether the stream is using manual data writes instead of input_stream */ + bool using_manual_data_writes : 1; + struct { /* Message (derived from outgoing request or response) to be submitted to encoder */ struct aws_h1_encoder_message encoder_message; @@ -66,6 +78,12 @@ struct aws_h1_stream { * Only body data (not headers, etc) counts against the stream's flow-control window. */ uint64_t stream_window; + /* List of `struct aws_h1_data_write` which have been moved from synced_data for processing */ + struct aws_linked_list pending_data_write_list; + + /* Whether the final data write (with is_end_stream=true) has been received */ + bool has_final_data_write : 1; + /* Whether a "request handler" stream has a response to send. * Has mirror variable in synced_data */ bool has_outgoing_response : 1; @@ -83,6 +101,10 @@ struct aws_h1_stream { * but haven't yet moved to thread_data.encoder_message.pending_chunk_list where the encoder will find them. */ struct aws_linked_list pending_chunk_list; + /* List of `struct aws_h1_data_write` which have been submitted by user, + * but haven't yet moved to thread_data.pending_data_write_list where the encoder will find them. */ + struct aws_linked_list pending_data_write_list; + /* trailing headers which have been submitted by user, * but haven't yet moved to thread_data.encoder_message where the encoder will find them. */ struct aws_h1_trailer *pending_trailer; @@ -105,6 +127,9 @@ struct aws_h1_stream { /* Whether the final 0 length chunk has already been sent */ bool has_final_chunk : 1; + /* Whether the final data write (with is_end_stream=true) has been received */ + bool has_final_data_write : 1; + /* Whether the chunked trailer has already been sent */ bool has_added_trailer : 1; } synced_data; @@ -123,4 +148,9 @@ void aws_h1_stream_cancel(struct aws_http_stream *stream, int error_code); int aws_h1_stream_send_response(struct aws_h1_stream *stream, struct aws_http_message *response); +void aws_h1_data_write_complete_and_destroy( + struct aws_h1_data_write *data_write, + struct aws_http_stream *http_stream, + int error_code); + #endif /* AWS_HTTP_H1_STREAM_H */ diff --git a/include/aws/http/private/request_response_impl.h b/include/aws/http/private/request_response_impl.h index acc5d9dd7..5e57aaaf5 100644 --- a/include/aws/http/private/request_response_impl.h +++ b/include/aws/http/private/request_response_impl.h @@ -25,9 +25,7 @@ struct aws_http_stream_vtable { int (*http2_reset_stream)(struct aws_http_stream *http2_stream, uint32_t http2_error); int (*http2_get_received_error_code)(struct aws_http_stream *http2_stream, uint32_t *http2_error); int (*http2_get_sent_error_code)(struct aws_http_stream *http2_stream, uint32_t *http2_error); - int (*http2_write_data)( - struct aws_http_stream *http2_stream, - const struct aws_http2_stream_write_data_options *options); + int (*write_data)(struct aws_http_stream *stream, const struct aws_http_stream_write_data_options *options); }; /** diff --git a/include/aws/http/request_response.h b/include/aws/http/request_response.h index acdbab0d6..c323db428 100644 --- a/include/aws/http/request_response.h +++ b/include/aws/http/request_response.h @@ -329,8 +329,30 @@ struct aws_http_make_request_options { aws_http_on_stream_destroy_fn *on_destroy; /** + * When true, request body data will be provided over time via `aws_http_stream_write_data()`. + * The stream will only be polled for writing when data has been supplied. + * When false (default), the entire request body is read from the input stream immediately. + * + * HTTP/1.1 requirements: + * - SHOULD have either `Content-Length` OR `Transfer-Encoding: chunked` header (but not both). + * Fails with AWS_ERROR_HTTP_INVALID_HEADER_FIELD if both are set. + * Transfer-Encoding: chunked header will be automatically added if neither header is set. + * - MUST NOT have a body stream set. Fails with AWS_ERROR_HTTP_INVALID_HEADER_FIELD otherwise. + * - With `Content-Length`: total bytes written must exactly match the declared length. + * Fails with AWS_ERROR_HTTP_OUTGOING_STREAM_LENGTH_INCORRECT if data exceeds Content-Length, + * or if `end_stream` is set before enough data is written. + * - With `Transfer-Encoding: chunked`: no length validation, data sent as chunks. + * + * HTTP/2: No `Content-Length` or `Transfer-Encoding` header required. Data sent via DATA frames. + * Note: When this variable is set, we expect request to be ended with a data write with end_stream=true. + */ + bool use_manual_data_writes; + + /** + * This field will be DEPRECATED and removed in a future release. + * Use `use_manual_data_writes` instead, which works for both HTTP/1.1 and HTTP/2. * When using HTTP/2, request body data will be provided over time. The stream will only be polled for writing - * when data has been supplied via `aws_http2_stream_write_data` + * when data has been supplied via `aws_http2_stream_write_data`. */ bool http2_use_manual_data_writes; @@ -498,34 +520,54 @@ struct aws_http1_chunk_options { */ typedef aws_http_stream_write_complete_fn aws_http2_stream_write_data_complete_fn; +/** + * Common fields for write_data options structures. + * This macro allows protocol-specific options to share the same base fields. + */ +#define AWS_HTTP_STREAM_WRITE_DATA_OPTIONS_FIELDS \ + /** \ + * The data to be sent. \ + * Optional. May be NULL to write zero bytes. \ + * If NULL and end_stream is false, the write is a no-op. \ + * If NULL and end_stream is true, the stream is completed with zero bytes written. \ + * With Content-Length, total bytes across all writes must match the declared length \ + * or the stream fails with AWS_ERROR_HTTP_OUTGOING_STREAM_LENGTH_INCORRECT. \ + */ \ + struct aws_input_stream *data; \ + /** \ + * Set true when it's the last data to be sent. \ + * After a write with end_stream, no more data writes will be accepted \ + * (AWS_ERROR_HTTP_MANUAL_WRITE_HAS_COMPLETED). \ + */ \ + bool end_stream; \ + /** \ + * Invoked when the data is no longer in use, whether or not it was successfully sent. \ + * Optional. \ + * See `aws_http_stream_write_complete_fn`. \ + * Called with AWS_ERROR_SUCCESS if data was sent successfully. \ + * Called with AWS_ERROR_HTTP_STREAM_HAS_COMPLETED if the stream was torn down \ + * before this write could be processed. \ + * Called with another error code if this write's data caused a problem. \ + */ \ + aws_http_stream_write_complete_fn *on_complete; \ + /** \ + * User provided data passed to the on_complete callback on its invocation. \ + */ \ + void *user_data; + +/** + * Unified options for writing data to an HTTP stream. + * Works with both HTTP/1.1 and HTTP/2. + */ +struct aws_http_stream_write_data_options { + AWS_HTTP_STREAM_WRITE_DATA_OPTIONS_FIELDS +}; + /** * Encoding options for manual H2 data frame writes */ struct aws_http2_stream_write_data_options { - /** - * The data to be sent. - * Optional. - * If not set, input stream with length 0 will be used. - */ - struct aws_input_stream *data; - - /** - * Set true when it's the last chunk to be sent. - * After a write with end_stream, no more data write will be accepted. - */ - bool end_stream; - - /** - * Invoked when the data stream is no longer in use, whether or not it was successfully sent. - * Optional. - * See `aws_http2_stream_write_data_complete_fn`. - */ - aws_http2_stream_write_data_complete_fn *on_complete; - - /** - * User provided data passed to the on_complete callback on its invocation. - */ - void *user_data; + AWS_HTTP_STREAM_WRITE_DATA_OPTIONS_FIELDS }; #define AWS_HTTP_REQUEST_HANDLER_OPTIONS_INIT \ @@ -931,6 +973,34 @@ AWS_HTTP_API int aws_http1_stream_write_chunk( const struct aws_http1_chunk_options *options); /** + * Write data to an HTTP stream in a protocol-agnostic way. + * Works with both HTTP/1.1 and HTTP/2. + * + * The stream must have specified `use_manual_data_writes` during request creation. + * For HTTP/1.1: The request must have either a Content-Length OR Transfer-Encoding: chunked header, + * but not both. Transfer-Encoding: chunked is automatically added if neither is set. + * The request must NOT have a body stream set. + * + * For client streams, activate() must be called before any data is written. + * For server streams, the response must be submitted before any data is written. + * + * If data is NULL and end_stream is false, the call is a no-op. + * If data is NULL and end_stream is true, the stream is completed with zero bytes written. + * A write with end_stream set to true will prevent any further writes. + * + * @return AWS_OP_SUCCESS if the write was queued successfully. + * AWS_OP_ERR indicating the attempt raised an error code: + * AWS_ERROR_HTTP_MANUAL_WRITE_NOT_ENABLED if use_manual_data_writes was not set. + * AWS_ERROR_HTTP_MANUAL_WRITE_HAS_COMPLETED if a previous write already set end_stream. + * AWS_ERROR_HTTP_STREAM_NOT_ACTIVATED if the stream has not been activated. + * AWS_ERROR_HTTP_STREAM_HAS_COMPLETED if the stream ended before this call. + */ +AWS_HTTP_API int aws_http_stream_write_data( + struct aws_http_stream *stream, + const struct aws_http_stream_write_data_options *options); + +/** + * This API will be DEPRECATED in favor of protocol agnostic `aws_http_stream_write_data` API. * The stream must have specified `http2_use_manual_data_writes` during request creation. * For client streams, activate() must be called before any frames are submitted. * For server streams, the response headers must be submitted before any frames. diff --git a/source/h1_connection.c b/source/h1_connection.c index 40839d23b..106b04ad3 100644 --- a/source/h1_connection.c +++ b/source/h1_connection.c @@ -710,8 +710,10 @@ static void s_stream_complete(struct aws_h1_stream *stream, int error_code) { /* Mark stream complete */ stream->synced_data.api_state = AWS_H1_STREAM_API_STATE_COMPLETE; - /* Move chunks out of synced data */ + /* Move chunks and data writes out of synced data */ aws_linked_list_move_all_back(&stream->thread_data.pending_chunk_list, &stream->synced_data.pending_chunk_list); + aws_linked_list_move_all_back( + &stream->thread_data.pending_data_write_list, &stream->synced_data.pending_data_write_list); aws_h1_connection_unlock_synced_data(connection); } /* END CRITICAL SECTION */ @@ -723,6 +725,13 @@ static void s_stream_complete(struct aws_h1_stream *stream, int error_code) { aws_h1_chunk_complete_and_destroy(chunk, &stream->base, AWS_ERROR_HTTP_STREAM_HAS_COMPLETED); } + /* Complete any leftover data writes */ + while (!aws_linked_list_empty(&stream->thread_data.pending_data_write_list)) { + struct aws_linked_list_node *node = aws_linked_list_pop_front(&stream->thread_data.pending_data_write_list); + struct aws_h1_data_write *data_write = AWS_CONTAINER_OF(node, struct aws_h1_data_write, node); + aws_h1_data_write_complete_and_destroy(data_write, &stream->base, AWS_ERROR_HTTP_STREAM_HAS_COMPLETED); + } + if (stream->base.on_metrics) { stream->base.on_metrics(&stream->base, &stream->base.metrics, stream->base.user_data); } @@ -1072,14 +1081,17 @@ static void s_write_outgoing_stream(struct aws_h1_connection *connection, bool f * The outgoing stream task will be kicked off again when user adds more data (new stream, new chunk, etc) */ struct aws_h1_stream *outgoing_stream = s_update_outgoing_stream_ptr(connection); bool waiting_for_chunks = aws_h1_encoder_is_waiting_for_chunks(&connection->thread_data.encoder); - if (!outgoing_stream || waiting_for_chunks) { + bool waiting_for_data_writes = aws_h1_encoder_is_waiting_for_data_writes(&connection->thread_data.encoder); + if (!outgoing_stream || waiting_for_chunks || waiting_for_data_writes) { if (!first_try) { AWS_LOGF_TRACE( AWS_LS_HTTP_CONNECTION, - "id=%p: Outgoing stream task stopped. outgoing_stream=%p waiting_for_chunks:%d", + "id=%p: Outgoing stream task stopped. outgoing_stream=%p waiting_for_chunks:%d " + "waiting_for_data_writes:%d", (void *)&connection->base, outgoing_stream ? (void *)&outgoing_stream->base : NULL, - waiting_for_chunks); + waiting_for_chunks, + waiting_for_data_writes); } connection->thread_data.is_outgoing_stream_task_active = false; return; diff --git a/source/h1_encoder.c b/source/h1_encoder.c index aecc01977..a441c34dd 100644 --- a/source/h1_encoder.c +++ b/source/h1_encoder.c @@ -3,6 +3,7 @@ * SPDX-License-Identifier: Apache-2.0. */ #include +#include #include #include #include @@ -25,7 +26,8 @@ static int s_scan_outgoing_headers( const struct aws_http_message *message, size_t *out_header_lines_len, bool body_headers_ignored, - bool body_headers_forbidden) { + bool body_headers_forbidden, + bool use_manual_data_writes) { size_t total = 0; bool has_body_stream = aws_http_message_get_body_stream(message); @@ -140,7 +142,13 @@ static int s_scan_outgoing_headers( encoder_message->has_chunked_encoding_header = false; } - if (encoder_message->content_length > 0 && !has_body_stream) { + if (use_manual_data_writes && has_body_stream) { + AWS_LOGF_ERROR( + AWS_LS_HTTP_STREAM, "id=static: Body stream must not be set when manual data writes are enabled"); + return aws_raise_error(AWS_ERROR_HTTP_INVALID_HEADER_FIELD); + } + + if (encoder_message->content_length > 0 && !has_body_stream && !use_manual_data_writes) { return aws_raise_error(AWS_ERROR_HTTP_MISSING_BODY_STREAM); } @@ -242,7 +250,9 @@ int aws_h1_encoder_message_init_from_request( struct aws_h1_encoder_message *message, struct aws_allocator *allocator, const struct aws_http_message *request, - struct aws_linked_list *pending_chunk_list) { + struct aws_linked_list *pending_chunk_list, + struct aws_linked_list *pending_data_write_list, + bool use_manual_data_writes) { AWS_PRECONDITION(aws_linked_list_is_valid(pending_chunk_list)); @@ -250,6 +260,8 @@ int aws_h1_encoder_message_init_from_request( message->body = aws_input_stream_acquire(aws_http_message_get_body_stream(request)); message->pending_chunk_list = pending_chunk_list; + message->pending_data_write_list = pending_data_write_list; + message->has_manual_data_writes = use_manual_data_writes; struct aws_byte_cursor method; int err = aws_http_message_get_request_method(request, &method); @@ -278,6 +290,19 @@ int aws_h1_encoder_message_init_from_request( goto error; } + /* For manual data writes, auto-add Transfer-Encoding: chunked if neither + * Content-Length nor Transfer-Encoding is set */ + if (use_manual_data_writes) { + struct aws_http_headers *headers = aws_http_message_get_headers(request); + if (!aws_http_headers_has(headers, aws_byte_cursor_from_c_str("Content-Length")) && + !aws_http_headers_has(headers, aws_byte_cursor_from_c_str("Transfer-Encoding"))) { + if (aws_http_headers_add( + headers, aws_byte_cursor_from_c_str("Transfer-Encoding"), aws_byte_cursor_from_c_str("chunked"))) { + goto error; + } + } + } + struct aws_byte_cursor version = aws_http_version_to_str(AWS_HTTP_VERSION_1_1); /** @@ -286,7 +311,12 @@ int aws_h1_encoder_message_init_from_request( size_t header_lines_len; err = s_scan_outgoing_headers( - message, request, &header_lines_len, false /*body_headers_ignored*/, false /*body_headers_forbidden*/); + message, + request, + &header_lines_len, + false /*body_headers_ignored*/, + false /*body_headers_forbidden*/, + use_manual_data_writes); if (err) { goto error; } @@ -374,7 +404,13 @@ int aws_h1_encoder_message_init_from_response( */ body_headers_ignored |= status_int == AWS_HTTP_STATUS_CODE_304_NOT_MODIFIED; bool body_headers_forbidden = status_int == AWS_HTTP_STATUS_CODE_204_NO_CONTENT || status_int / 100 == 1; - err = s_scan_outgoing_headers(message, response, &header_lines_len, body_headers_ignored, body_headers_forbidden); + err = s_scan_outgoing_headers( + message, + response, + &header_lines_len, + body_headers_ignored, + body_headers_forbidden, + false /*use_manual_data_writes*/); if (err) { goto error; } @@ -688,8 +724,17 @@ typedef int encoder_state_fn(struct aws_h1_encoder *encoder, struct aws_byte_buf /* Switch state. * The only reason this returns a value is so it can be called with `return` to conclude a state function */ static int s_switch_state(struct aws_h1_encoder *encoder, enum aws_h1_encoder_state state) { + /* Don't reset progress_bytes when transitioning between DATA_WRITE states, + * as we need to track cumulative progress across multiple writes */ + bool preserve_progress = + (encoder->state == AWS_H1_ENCODER_STATE_DATA_WRITE_NEXT || + encoder->state == AWS_H1_ENCODER_STATE_DATA_WRITE_BODY) && + (state == AWS_H1_ENCODER_STATE_DATA_WRITE_NEXT || state == AWS_H1_ENCODER_STATE_DATA_WRITE_BODY); + encoder->state = state; - encoder->progress_bytes = 0; + if (!preserve_progress) { + encoder->progress_bytes = 0; + } return AWS_OP_SUCCESS; } @@ -720,7 +765,15 @@ static int s_state_fn_head(struct aws_h1_encoder *encoder, struct aws_byte_buf * aws_byte_buf_clean_up(&encoder->message->outgoing_head_buf); /* Pick next state */ - if (encoder->message->body && encoder->message->content_length) { + if (encoder->message->has_manual_data_writes && encoder->message->has_chunked_encoding_header) { + return s_switch_state(encoder, AWS_H1_ENCODER_STATE_CHUNK_NEXT); + + } else if (encoder->message->has_manual_data_writes) { + /* if chunked encoding header is not present we automatically assume the content-length header was provided + * because content length provided can also be zero. */ + return s_switch_state(encoder, AWS_H1_ENCODER_STATE_DATA_WRITE_NEXT); + + } else if (encoder->message->body && encoder->message->content_length) { return s_switch_state(encoder, AWS_H1_ENCODER_STATE_UNCHUNKED_BODY_STREAM); } else if (encoder->message->body && encoder->message->has_chunked_encoding_header) { @@ -728,7 +781,6 @@ static int s_state_fn_head(struct aws_h1_encoder *encoder, struct aws_byte_buf * } else if (encoder->message->has_chunked_encoding_header) { return s_switch_state(encoder, AWS_H1_ENCODER_STATE_CHUNK_NEXT); - } else { return s_switch_state(encoder, AWS_H1_ENCODER_STATE_DONE); } @@ -976,6 +1028,123 @@ static int s_state_fn_chunk_trailer(struct aws_h1_encoder *encoder, struct aws_b return s_switch_state(encoder, AWS_H1_ENCODER_STATE_DONE); } +/* Select next data write to work on for manual data writes with Content-Length. + * Encoder is essentially "paused" here if no data writes are available. */ +static int s_encoder_state_data_write_next(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) { + (void)dst; + + if (aws_linked_list_empty(encoder->message->pending_data_write_list)) { + ENCODER_LOG(TRACE, encoder, "No data writes ready, waiting..."); + return AWS_OP_SUCCESS; + } + + struct aws_linked_list_node *node = aws_linked_list_front(encoder->message->pending_data_write_list); + encoder->message->current_data_write = AWS_CONTAINER_OF(node, struct aws_h1_data_write, node); + + ENCODER_LOG(TRACE, encoder, "Begin sending manual data write"); + return s_switch_state(encoder, AWS_H1_ENCODER_STATE_DATA_WRITE_BODY); +} + +/* Write out data from current manual data write */ +static int s_encoder_state_data_write_body(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) { + struct aws_h1_data_write *data_write = encoder->message->current_data_write; + + if (dst->capacity == dst->len) { + return AWS_OP_SUCCESS; + } + + /* Read from stream */ + ENCODER_LOG(TRACE, encoder, "Reading from manual data write stream"); + const size_t prev_len = dst->len; + size_t amount_read = dst->len - prev_len; + int error_code = AWS_OP_ERR; + + if (data_write->data) { + int err = aws_input_stream_read(data_write->data, dst); + amount_read = dst->len - prev_len; + + if (err) { + ENCODER_LOGF( + ERROR, + encoder, + "Failed to read data write stream, error %d (%s)", + aws_last_error(), + aws_error_name(aws_last_error())); + error_code = aws_last_error(); + goto error; + } + } + + /* Increment progress_bytes and check we haven't exceeded Content-Length */ + if (aws_add_u64_checked(encoder->progress_bytes, amount_read, &encoder->progress_bytes) || + encoder->progress_bytes > encoder->message->content_length) { + ENCODER_LOGF( + ERROR, encoder, "Manual data writes exceeded Content-Length: %" PRIu64, encoder->message->content_length); + error_code = AWS_ERROR_HTTP_OUTGOING_STREAM_LENGTH_INCORRECT; + goto error; + } + + ENCODER_LOGF( + TRACE, + encoder, + "Sent %zu bytes from manual data write, total progress: %" PRIu64 "/%" PRIu64, + amount_read, + encoder->progress_bytes, + encoder->message->content_length); + + if (data_write->data) { + /* If we read something or reached end of stream, check if stream is complete */ + struct aws_stream_status status; + int err = aws_input_stream_get_status(data_write->data, &status); + if (err) { + ENCODER_LOGF( + ERROR, + encoder, + "Failed to query data write stream status, error %d (%s)", + aws_last_error(), + aws_error_name(aws_last_error())); + error_code = aws_last_error(); + goto error; + } + + if (!status.is_end_of_stream) { + /* Stream not done yet, remain in state */ + return AWS_OP_SUCCESS; + } + } + + /* This data write is complete */ + ENCODER_LOG(TRACE, encoder, "Manual data write complete"); + bool is_end = data_write->is_end_stream; + aws_linked_list_remove(&data_write->node); + aws_h1_data_write_complete_and_destroy(data_write, encoder->current_stream, AWS_ERROR_SUCCESS); + encoder->message->current_data_write = NULL; + + if (is_end) { + /* This was the final write, validate total matches Content-Length */ + if (encoder->progress_bytes != encoder->message->content_length) { + ENCODER_LOGF( + ERROR, + encoder, + "Manual data writes sent %" PRIu64 " bytes but Content-Length is %" PRIu64, + encoder->progress_bytes, + encoder->message->content_length); + error_code = AWS_ERROR_HTTP_OUTGOING_STREAM_LENGTH_INCORRECT; + return aws_raise_error(error_code); + } + return s_switch_state(encoder, AWS_H1_ENCODER_STATE_DONE); + } + + /* More writes expected, go back to waiting for next write */ + return s_switch_state(encoder, AWS_H1_ENCODER_STATE_DATA_WRITE_NEXT); + +error: + aws_linked_list_remove(&data_write->node); + aws_h1_data_write_complete_and_destroy(data_write, encoder->current_stream, error_code); + encoder->message->current_data_write = NULL; + return aws_raise_error(error_code); +} + /* Message is done, loop back to start of state machine */ static int s_state_fn_done(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) { (void)dst; @@ -1002,6 +1171,8 @@ static struct encoder_state_def s_encoder_states[] = { [AWS_H1_ENCODER_STATE_CHUNK_BODY] = {.fn = s_state_fn_chunk_body, .name = "CHUNK_BODY"}, [AWS_H1_ENCODER_STATE_CHUNK_END] = {.fn = s_state_fn_chunk_end, .name = "CHUNK_END"}, [AWS_H1_ENCODER_STATE_CHUNK_TRAILER] = {.fn = s_state_fn_chunk_trailer, .name = "CHUNK_TRAILER"}, + [AWS_H1_ENCODER_STATE_DATA_WRITE_NEXT] = {.fn = s_encoder_state_data_write_next, .name = "DATA_WRITE_NEXT"}, + [AWS_H1_ENCODER_STATE_DATA_WRITE_BODY] = {.fn = s_encoder_state_data_write_body, .name = "DATA_WRITE_BODY"}, [AWS_H1_ENCODER_STATE_DONE] = {.fn = s_state_fn_done, .name = "DONE"}, }; @@ -1035,3 +1206,8 @@ bool aws_h1_encoder_is_waiting_for_chunks(const struct aws_h1_encoder *encoder) return encoder->state == AWS_H1_ENCODER_STATE_CHUNK_NEXT && aws_linked_list_empty(encoder->message->pending_chunk_list); } + +bool aws_h1_encoder_is_waiting_for_data_writes(const struct aws_h1_encoder *encoder) { + return encoder->state == AWS_H1_ENCODER_STATE_DATA_WRITE_NEXT && + aws_linked_list_empty(encoder->message->pending_data_write_list); +} diff --git a/source/h1_stream.c b/source/h1_stream.c index 2cf43def1..78e5ad531 100644 --- a/source/h1_stream.c +++ b/source/h1_stream.c @@ -13,6 +13,36 @@ #include +static struct aws_h1_data_write *s_data_write_new( + struct aws_allocator *allocator, + const struct aws_http_stream_write_data_options *options) { + struct aws_h1_data_write *data_write = aws_mem_calloc(allocator, 1, sizeof(struct aws_h1_data_write)); + data_write->allocator = allocator; + data_write->data = aws_input_stream_acquire(options->data); + data_write->on_complete = options->on_complete; + data_write->user_data = options->user_data; + data_write->is_end_stream = options->end_stream; + return data_write; +} + +static void s_data_write_destroy(struct aws_h1_data_write *data_write) { + aws_input_stream_release(data_write->data); + aws_mem_release(data_write->allocator, data_write); +} + +void aws_h1_data_write_complete_and_destroy( + struct aws_h1_data_write *data_write, + struct aws_http_stream *http_stream, + int error_code) { + AWS_PRECONDITION(data_write); + aws_http_stream_write_complete_fn *on_complete = data_write->on_complete; + void *user_data = data_write->user_data; + s_data_write_destroy(data_write); + if (on_complete) { + on_complete(http_stream, error_code, user_data); + } +} + static void s_stream_destroy(struct aws_http_stream *stream_base) { struct aws_h1_stream *stream = AWS_CONTAINER_OF(stream_base, struct aws_h1_stream, base); AWS_ASSERT( @@ -22,6 +52,10 @@ static void s_stream_destroy(struct aws_http_stream *stream_base) { aws_linked_list_empty(&stream->thread_data.pending_chunk_list) && aws_linked_list_empty(&stream->synced_data.pending_chunk_list) && "Chunks should be marked complete before stream destroyed"); + AWS_ASSERT( + aws_linked_list_empty(&stream->thread_data.pending_data_write_list) && + aws_linked_list_empty(&stream->synced_data.pending_data_write_list) && + "Data writes should be marked complete before stream destroyed"); aws_h1_encoder_message_clean_up(&stream->thread_data.encoder_message); aws_h1_encoder_message_clean_up(&stream->synced_data.pending_outgoing_response); @@ -60,8 +94,11 @@ static void s_stream_cross_thread_work_task(struct aws_channel_task *task, void int api_state = stream->synced_data.api_state; /* If we have any new outgoing data, prompt the connection to try and send it. */ - bool new_outgoing_data = !aws_linked_list_empty(&stream->synced_data.pending_chunk_list); + bool new_outgoing_data = !aws_linked_list_empty(&stream->synced_data.pending_chunk_list) || + !aws_linked_list_empty(&stream->synced_data.pending_data_write_list); aws_linked_list_move_all_back(&stream->thread_data.pending_chunk_list, &stream->synced_data.pending_chunk_list); + aws_linked_list_move_all_back( + &stream->thread_data.pending_data_write_list, &stream->synced_data.pending_data_write_list); /* If we JUST learned about having an outgoing response, that's a reason to try sending data */ if (stream->synced_data.has_outgoing_response && !stream->thread_data.has_outgoing_response) { @@ -330,6 +367,136 @@ static int s_stream_add_trailer(struct aws_http_stream *stream_base, const struc return AWS_OP_SUCCESS; } +static int s_stream_write_data( + struct aws_http_stream *stream_base, + const struct aws_http_stream_write_data_options *options) { + AWS_PRECONDITION(stream_base); + AWS_PRECONDITION(options); + struct aws_h1_stream *stream = AWS_CONTAINER_OF(stream_base, struct aws_h1_stream, base); + + /* NULL data without end_stream is a no-op, but still fire the callback */ + if (!options->data && !options->end_stream) { + if (options->on_complete) { + options->on_complete(stream_base, AWS_ERROR_SUCCESS, options->user_data); + } + return AWS_OP_SUCCESS; + } + + if (!stream->using_manual_data_writes) { + AWS_LOGF_ERROR( + AWS_LS_HTTP_STREAM, + "id=%p: Manual writes not enabled. Set 'use_manual_data_writes' in aws_http_make_request_options.", + (void *)stream_base); + return aws_raise_error(AWS_ERROR_HTTP_MANUAL_WRITE_NOT_ENABLED); + } + + bool should_schedule_task = false; + bool is_chunked = false; + int error_code = AWS_ERROR_SUCCESS; + + { /* BEGIN CRITICAL SECTION */ + s_stream_lock_synced_data(stream); + + if (stream->synced_data.api_state != AWS_H1_STREAM_API_STATE_ACTIVE) { + error_code = (stream->synced_data.api_state == AWS_H1_STREAM_API_STATE_INIT) + ? AWS_ERROR_HTTP_STREAM_NOT_ACTIVATED + : AWS_ERROR_HTTP_STREAM_HAS_COMPLETED; + goto unlock; + } + + if (stream->synced_data.has_final_data_write) { + AWS_LOGF_ERROR( + AWS_LS_HTTP_STREAM, + "id=%p: Cannot write data after final write (end_stream=true).", + (void *)stream_base); + error_code = AWS_ERROR_HTTP_MANUAL_WRITE_HAS_COMPLETED; + goto unlock; + } + + is_chunked = stream->synced_data.using_chunked_encoding; + + if (!is_chunked) { + struct aws_h1_data_write *data_write = s_data_write_new(stream_base->alloc, options); + + aws_linked_list_push_back(&stream->synced_data.pending_data_write_list, &data_write->node); + should_schedule_task = !stream->synced_data.is_cross_thread_work_task_scheduled; + stream->synced_data.is_cross_thread_work_task_scheduled = true; + } + + stream->synced_data.has_final_data_write = options->end_stream; + + } /* END CRITICAL SECTION */ + +unlock: + s_stream_unlock_synced_data(stream); + + if (error_code != AWS_ERROR_SUCCESS) { + if (error_code == AWS_ERROR_HTTP_OUTGOING_STREAM_LENGTH_INCORRECT) { + aws_h1_stream_cancel(stream_base, AWS_ERROR_HTTP_OUTGOING_STREAM_LENGTH_INCORRECT); + return AWS_OP_SUCCESS; + } + AWS_LOGF_ERROR(AWS_LS_HTTP_STREAM, "id=%p: Could not complete write data successfully.", (void *)stream_base); + return aws_raise_error(error_code); + } + + if (is_chunked) { + /* Send user's data as a chunk if present */ + if (options->data) { + int64_t data_len = 0; + if (aws_input_stream_get_length(options->data, &data_len)) { + AWS_LOGF_ERROR( + AWS_LS_HTTP_STREAM, + "id=%p: Failed to get data stream length for chunked conversion", + (void *)stream_base); + return AWS_OP_ERR; + } + + struct aws_http1_chunk_options chunk_opts = { + .chunk_data = options->data, + .chunk_data_size = (uint64_t)data_len, + .on_complete = options->end_stream ? NULL : options->on_complete, + .user_data = options->end_stream ? NULL : options->user_data, + }; + + if (aws_http1_stream_write_chunk(stream_base, &chunk_opts)) { + AWS_LOGF_ERROR( + AWS_LS_HTTP_STREAM, + "id=%p: Failed to write chunk to stream, error %d (%s).", + (void *)stream_base, + aws_last_error(), + aws_error_name(aws_last_error())); + return AWS_OP_ERR; + } + } + + /* end_stream on chunked requires a 0-length termination chunk */ + if (options->end_stream) { + struct aws_http1_chunk_options termination_opts; + AWS_ZERO_STRUCT(termination_opts); + termination_opts.on_complete = options->on_complete; + termination_opts.user_data = options->user_data; + if (aws_http1_stream_write_chunk(stream_base, &termination_opts)) { + AWS_LOGF_ERROR( + AWS_LS_HTTP_STREAM, + "id=%p: Failed to write termination chunk, error %d (%s).", + (void *)stream_base, + aws_last_error(), + aws_error_name(aws_last_error())); + return AWS_OP_ERR; + } + } + } + + if (should_schedule_task) { + aws_atomic_fetch_add(&stream->base.refcount, 1); + AWS_LOGF_TRACE(AWS_LS_HTTP_STREAM, "id=%p: Scheduling stream cross-thread work task.", (void *)stream_base); + aws_channel_schedule_task_now( + stream->base.owning_connection->channel_slot->channel, &stream->cross_thread_work_task); + } + + return AWS_OP_SUCCESS; +} + static const struct aws_http_stream_vtable s_stream_vtable = { .destroy = s_stream_destroy, .update_window = s_stream_update_window, @@ -340,6 +507,7 @@ static const struct aws_http_stream_vtable s_stream_vtable = { .http2_reset_stream = NULL, .http2_get_received_error_code = NULL, .http2_get_sent_error_code = NULL, + .write_data = s_stream_write_data, }; static struct aws_h1_stream *s_stream_new_common( @@ -379,6 +547,8 @@ static struct aws_h1_stream *s_stream_new_common( aws_linked_list_init(&stream->thread_data.pending_chunk_list); aws_linked_list_init(&stream->synced_data.pending_chunk_list); + aws_linked_list_init(&stream->thread_data.pending_data_write_list); + aws_linked_list_init(&stream->synced_data.pending_data_write_list); stream->thread_data.stream_window = connection->initial_stream_window_size; @@ -416,12 +586,17 @@ struct aws_h1_stream *aws_h1_stream_new_request( stream->base.client_data->response_first_byte_timeout_ms = options->response_first_byte_timeout_ms; stream->base.on_metrics = options->on_metrics; + /* Set manual data writes flag from options */ + stream->using_manual_data_writes = options->use_manual_data_writes; + /* Validate request and cache info that the encoder will eventually need */ if (aws_h1_encoder_message_init_from_request( &stream->thread_data.encoder_message, client_connection->alloc, options->request, - &stream->thread_data.pending_chunk_list)) { + &stream->thread_data.pending_chunk_list, + &stream->thread_data.pending_data_write_list, + stream->using_manual_data_writes)) { goto error; } diff --git a/source/h2_stream.c b/source/h2_stream.c index eefbcba3a..7d7d78d61 100644 --- a/source/h2_stream.c +++ b/source/h2_stream.c @@ -23,7 +23,7 @@ static int s_stream_get_received_error_code(struct aws_http_stream *stream_base, static int s_stream_get_sent_error_code(struct aws_http_stream *stream_base, uint32_t *out_http2_error); static int s_stream_write_data( struct aws_http_stream *stream_base, - const struct aws_http2_stream_write_data_options *options); + const struct aws_http_stream_write_data_options *options); static void s_stream_cross_thread_work_task(struct aws_channel_task *task, void *arg, enum aws_task_status status); static struct aws_h2err s_send_rst_and_close_stream(struct aws_h2_stream *stream, struct aws_h2err stream_error); @@ -42,7 +42,7 @@ struct aws_http_stream_vtable s_h2_stream_vtable = { .http2_reset_stream = s_stream_reset_stream, .http2_get_received_error_code = s_stream_get_received_error_code, .http2_get_sent_error_code = s_stream_get_sent_error_code, - .http2_write_data = s_stream_write_data, + .write_data = s_stream_write_data, }; const char *aws_h2_stream_state_to_str(enum aws_h2_stream_state state) { @@ -334,8 +334,9 @@ struct aws_h2_stream *aws_h2_stream_new_request( /* Init H2 specific stuff */ stream->thread_data.state = AWS_H2_STREAM_STATE_IDLE; /* stream end is implicit if the request isn't using manual data writes */ - stream->synced_data.manual_write_ended = !options->http2_use_manual_data_writes; - stream->manual_write = options->http2_use_manual_data_writes; + bool manual_write = options->use_manual_data_writes || options->http2_use_manual_data_writes; + stream->synced_data.manual_write_ended = !manual_write; + stream->manual_write = manual_write; /* if there's a request body to write, add it as the first outgoing write */ struct aws_input_stream *body_stream = aws_http_message_get_body_stream(options->request); @@ -1350,7 +1351,7 @@ struct aws_h2err aws_h2_stream_on_decoder_rst_stream(struct aws_h2_stream *strea static int s_stream_write_data( struct aws_http_stream *stream_base, - const struct aws_http2_stream_write_data_options *options) { + const struct aws_http_stream_write_data_options *options) { struct aws_h2_stream *stream = AWS_CONTAINER_OF(stream_base, struct aws_h2_stream, base); if (!stream->manual_write) { AWS_H2_STREAM_LOG( diff --git a/source/request_response.c b/source/request_response.c index 6f290b0e1..e2d21ba5b 100644 --- a/source/request_response.c +++ b/source/request_response.c @@ -796,15 +796,26 @@ int aws_http1_stream_write_chunk(struct aws_http_stream *http1_stream, const str return http1_stream->vtable->http1_write_chunk(http1_stream, options); } +int aws_http_stream_write_data( + struct aws_http_stream *stream, + const struct aws_http_stream_write_data_options *options) { + AWS_PRECONDITION(stream); + AWS_PRECONDITION(stream->vtable); + AWS_PRECONDITION(stream->vtable->write_data); + AWS_PRECONDITION(options); + + return stream->vtable->write_data(stream, options); +} + int aws_http2_stream_write_data( struct aws_http_stream *http2_stream, const struct aws_http2_stream_write_data_options *options) { AWS_PRECONDITION(http2_stream); AWS_PRECONDITION(http2_stream->vtable); - AWS_PRECONDITION(http2_stream->vtable->http2_write_data); + AWS_PRECONDITION(http2_stream->vtable->write_data); AWS_PRECONDITION(options); - return http2_stream->vtable->http2_write_data(http2_stream, options); + return http2_stream->vtable->write_data(http2_stream, (const struct aws_http_stream_write_data_options *)options); } int aws_http1_stream_add_chunked_trailer( diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index b898e64f0..ee5e260e6 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -157,6 +157,19 @@ add_test_case(h1_client_connection_close_before_request_finishes_with_buffer) add_test_case(h1_client_connection_close_before_request_finishes_with_buffer_incomplete_response) add_test_case(h1_client_connection_close_before_request_finishes_with_buffer_force_shutdown) add_test_case(h1_client_connection_close_before_request_finishes_with_buffer_stream_cancel) +add_test_case(h1_client_write_data_single_chunk) +add_test_case(h1_client_write_data_multiple_chunks) +add_test_case(h1_client_write_data_not_enabled) +add_test_case(h1_client_write_data_after_final) +add_test_case(h1_client_write_data_exceeds_content_length) +add_test_case(h1_client_write_data_less_than_content_length) +add_test_case(h1_client_write_data_chunked_single) +add_test_case(h1_client_write_data_chunked_multiple) +add_test_case(h1_client_write_data_null_data_content_length) +add_test_case(h1_client_write_data_null_data_chunked) +add_test_case(h1_client_write_data_auto_chunked) +add_test_case(h1_client_write_data_body_stream_conflict) +add_test_case(h1_client_write_data_null_data_nonzero_content_length) add_test_case(strutil_trim_http_whitespace) add_test_case(strutil_is_http_token) @@ -514,6 +527,8 @@ add_test_case(h2_client_manual_data_write_connection_close) add_test_case(h2_client_batch_auto_window_update) add_test_case(h2_client_batch_manual_window_update) add_test_case(h2_client_cap_manual_window_update) +add_test_case(h2_client_unified_write_data_api) +add_test_case(h2_client_unified_write_data_api_new_field) # on_h2_remote_end_stream callback tests add_test_case(h2_client_on_h2_remote_end_stream_fires) diff --git a/tests/stream_test_helper.c b/tests/stream_test_helper.c index 9290b009b..39d57b298 100644 --- a/tests/stream_test_helper.c +++ b/tests/stream_test_helper.c @@ -208,6 +208,7 @@ int client_stream_tester_init( .on_complete = s_on_complete, .on_destroy = s_on_destroy, .http2_use_manual_data_writes = options->http2_manual_write, + .use_manual_data_writes = options->use_manual_data_writes, .on_h2_remote_end_stream = options->on_h2_remote_end_stream ? s_on_h2_remote_end_stream : NULL, }; tester->stream = aws_http_connection_make_request(options->connection, &request_options); diff --git a/tests/stream_test_helper.h b/tests/stream_test_helper.h index d242db06d..1042ef57b 100644 --- a/tests/stream_test_helper.h +++ b/tests/stream_test_helper.h @@ -55,6 +55,7 @@ struct client_stream_tester_options { struct aws_http_message *request; struct aws_http_connection *connection; bool http2_manual_write; + bool use_manual_data_writes; /* Optional: pointer to bool to track if on_h2_remote_end_stream fires */ bool *on_h2_remote_end_stream; }; diff --git a/tests/test_h1_client.c b/tests/test_h1_client.c index f645f8d0b..d962d401d 100644 --- a/tests/test_h1_client.c +++ b/tests/test_h1_client.c @@ -104,7 +104,7 @@ static int s_tester_init_ex(struct tester *tester, struct aws_allocator *alloc, tester->alloc = alloc; struct aws_logger_standard_options logger_options = { - .level = AWS_LOG_LEVEL_TRACE, + .level = AWS_LOG_LEVEL_DEBUG, .file = stderr, }; ASSERT_SUCCESS(aws_logger_init_standard(&tester->logger, tester->alloc, &logger_options)); @@ -5127,3 +5127,597 @@ H1_CLIENT_TEST_CASE(h1_client_connection_close_before_request_finishes_with_buff (void)ctx; return s_h1_client_connection_close_before_request_finishes_with_buffer_force_shutdown_helper(allocator, false); } + +/* Test helper for write_data callback tracking */ +struct write_data_callback_tester { + int num_callbacks; + int last_error_code; +}; + +static void s_on_write_data_complete(struct aws_http_stream *stream, int error_code, void *user_data) { + (void)stream; + struct write_data_callback_tester *callback_tester = user_data; + callback_tester->num_callbacks++; + callback_tester->last_error_code = error_code; +} + +struct write_data_test_fixture { + struct tester tester; + struct client_stream_tester stream_tester; +}; + +static int s_write_data_test_setup( + struct write_data_test_fixture *fixture, + struct aws_allocator *allocator, + const struct aws_http_header *headers, + size_t num_headers, + bool use_manual_data_writes) { + + ASSERT_SUCCESS(s_tester_init(&fixture->tester, allocator)); + + struct aws_http_message *request = aws_http_message_new_request(allocator); + ASSERT_NOT_NULL(request); + ASSERT_SUCCESS(aws_http_message_set_request_method(request, aws_byte_cursor_from_c_str("POST"))); + ASSERT_SUCCESS(aws_http_message_set_request_path(request, aws_byte_cursor_from_c_str("/upload"))); + ASSERT_SUCCESS(aws_http_message_add_header_array(request, headers, num_headers)); + + ASSERT_SUCCESS(client_stream_tester_init( + &fixture->stream_tester, + allocator, + &(struct client_stream_tester_options){ + .request = request, + .connection = fixture->tester.connection, + .use_manual_data_writes = use_manual_data_writes, + })); + + testing_channel_drain_queued_tasks(&fixture->tester.testing_channel); + aws_http_message_destroy(request); + return AWS_OP_SUCCESS; +} + +static int s_write_data_test_teardown(struct write_data_test_fixture *fixture) { + client_stream_tester_clean_up(&fixture->stream_tester); + ASSERT_SUCCESS(s_tester_clean_up(&fixture->tester)); + return AWS_OP_SUCCESS; +} + +/* Test: Single chunk write with Content-Length */ +H1_CLIENT_TEST_CASE(h1_client_write_data_single_chunk) { + (void)ctx; + struct write_data_test_fixture fixture; + struct aws_http_header headers[] = { + {.name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Content-Length"), + .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("5")}, + }; + ASSERT_SUCCESS(s_write_data_test_setup(&fixture, allocator, headers, AWS_ARRAY_SIZE(headers), true)); + + struct aws_byte_cursor data = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("hello"); + struct aws_input_stream *input_stream = aws_input_stream_new_from_cursor(allocator, &data); + ASSERT_NOT_NULL(input_stream); + + struct write_data_callback_tester callback_tester = {0}; + struct aws_http_stream_write_data_options write_options = { + .data = input_stream, + .end_stream = true, + .on_complete = s_on_write_data_complete, + .user_data = &callback_tester, + }; + + ASSERT_SUCCESS(aws_http_stream_write_data(fixture.stream_tester.stream, &write_options)); + testing_channel_drain_queued_tasks(&fixture.tester.testing_channel); + + ASSERT_SUCCESS(testing_channel_check_written_messages_str( + &fixture.tester.testing_channel, allocator, "POST /upload HTTP/1.1\r\nContent-Length: 5\r\n\r\nhello")); + + ASSERT_INT_EQUALS(1, callback_tester.num_callbacks); + ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, callback_tester.last_error_code); + + ASSERT_SUCCESS(testing_channel_push_read_str(&fixture.tester.testing_channel, "HTTP/1.1 200 OK\r\n\r\n")); + testing_channel_drain_queued_tasks(&fixture.tester.testing_channel); + + ASSERT_TRUE(fixture.stream_tester.complete); + ASSERT_INT_EQUALS(200, fixture.stream_tester.response_status); + + aws_input_stream_release(input_stream); + ASSERT_SUCCESS(s_write_data_test_teardown(&fixture)); + return AWS_OP_SUCCESS; +} + +/* Test: Multiple chunk writes with Content-Length */ +H1_CLIENT_TEST_CASE(h1_client_write_data_multiple_chunks) { + (void)ctx; + struct write_data_test_fixture fixture; + struct aws_http_header headers[] = { + {.name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Content-Length"), + .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("10")}, + }; + ASSERT_SUCCESS(s_write_data_test_setup(&fixture, allocator, headers, AWS_ARRAY_SIZE(headers), true)); + + /* Write first chunk */ + struct aws_byte_cursor data1 = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("hello"); + struct aws_input_stream *input_stream1 = aws_input_stream_new_from_cursor(allocator, &data1); + ASSERT_NOT_NULL(input_stream1); + + struct write_data_callback_tester callback_tester1 = {0}; + struct aws_http_stream_write_data_options write_options1 = { + .data = input_stream1, + .end_stream = false, + .on_complete = s_on_write_data_complete, + .user_data = &callback_tester1, + }; + ASSERT_SUCCESS(aws_http_stream_write_data(fixture.stream_tester.stream, &write_options1)); + testing_channel_drain_queued_tasks(&fixture.tester.testing_channel); + + /* Write second chunk */ + struct aws_byte_cursor data2 = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("world"); + struct aws_input_stream *input_stream2 = aws_input_stream_new_from_cursor(allocator, &data2); + ASSERT_NOT_NULL(input_stream2); + + struct write_data_callback_tester callback_tester2 = {0}; + struct aws_http_stream_write_data_options write_options2 = { + .data = input_stream2, + .end_stream = true, + .on_complete = s_on_write_data_complete, + .user_data = &callback_tester2, + }; + ASSERT_SUCCESS(aws_http_stream_write_data(fixture.stream_tester.stream, &write_options2)); + testing_channel_drain_queued_tasks(&fixture.tester.testing_channel); + + ASSERT_SUCCESS(testing_channel_check_written_messages_str( + &fixture.tester.testing_channel, allocator, "POST /upload HTTP/1.1\r\nContent-Length: 10\r\n\r\nhelloworld")); + + ASSERT_INT_EQUALS(1, callback_tester1.num_callbacks); + ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, callback_tester1.last_error_code); + ASSERT_INT_EQUALS(1, callback_tester2.num_callbacks); + ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, callback_tester2.last_error_code); + + ASSERT_SUCCESS(testing_channel_push_read_str(&fixture.tester.testing_channel, "HTTP/1.1 200 OK\r\n\r\n")); + testing_channel_drain_queued_tasks(&fixture.tester.testing_channel); + + ASSERT_TRUE(fixture.stream_tester.complete); + ASSERT_INT_EQUALS(200, fixture.stream_tester.response_status); + + aws_input_stream_release(input_stream1); + aws_input_stream_release(input_stream2); + ASSERT_SUCCESS(s_write_data_test_teardown(&fixture)); + return AWS_OP_SUCCESS; +} + +/* Test: Validation error - manual writes not enabled */ +H1_CLIENT_TEST_CASE(h1_client_write_data_not_enabled) { + (void)ctx; + struct write_data_test_fixture fixture; + ASSERT_SUCCESS(s_write_data_test_setup(&fixture, allocator, NULL, 0, false)); + + struct aws_byte_cursor data = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("hello"); + struct aws_input_stream *input_stream = aws_input_stream_new_from_cursor(allocator, &data); + ASSERT_NOT_NULL(input_stream); + struct aws_http_stream_write_data_options write_options = { + .data = input_stream, + .end_stream = true, + }; + + ASSERT_FAILS(aws_http_stream_write_data(fixture.stream_tester.stream, &write_options)); + ASSERT_INT_EQUALS(AWS_ERROR_HTTP_MANUAL_WRITE_NOT_ENABLED, aws_last_error()); + + aws_input_stream_release(input_stream); + ASSERT_SUCCESS(s_write_data_test_teardown(&fixture)); + return AWS_OP_SUCCESS; +} + +/* Test: Validation error - write after final write */ +H1_CLIENT_TEST_CASE(h1_client_write_data_after_final) { + (void)ctx; + struct write_data_test_fixture fixture; + struct aws_http_header headers[] = { + {.name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Content-Length"), + .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("10")}, + }; + ASSERT_SUCCESS(s_write_data_test_setup(&fixture, allocator, headers, AWS_ARRAY_SIZE(headers), true)); + + struct aws_byte_cursor data1 = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("hello"); + struct aws_input_stream *input_stream1 = aws_input_stream_new_from_cursor(allocator, &data1); + ASSERT_NOT_NULL(input_stream1); + struct aws_http_stream_write_data_options write_options1 = { + .data = input_stream1, + .end_stream = true, + }; + ASSERT_SUCCESS(aws_http_stream_write_data(fixture.stream_tester.stream, &write_options1)); + + struct aws_byte_cursor data2 = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("world"); + struct aws_input_stream *input_stream2 = aws_input_stream_new_from_cursor(allocator, &data2); + ASSERT_NOT_NULL(input_stream2); + struct aws_http_stream_write_data_options write_options2 = { + .data = input_stream2, + .end_stream = false, + }; + ASSERT_FAILS(aws_http_stream_write_data(fixture.stream_tester.stream, &write_options2)); + ASSERT_INT_EQUALS(AWS_ERROR_HTTP_MANUAL_WRITE_HAS_COMPLETED, aws_last_error()); + + aws_input_stream_release(input_stream1); + aws_input_stream_release(input_stream2); + ASSERT_SUCCESS(s_write_data_test_teardown(&fixture)); + return AWS_OP_SUCCESS; +} + +/* Test: Content-Length mismatch - too much data */ +H1_CLIENT_TEST_CASE(h1_client_write_data_exceeds_content_length) { + (void)ctx; + struct write_data_test_fixture fixture; + struct aws_http_header headers[] = { + {.name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Content-Length"), + .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("3")}, + }; + ASSERT_SUCCESS(s_write_data_test_setup(&fixture, allocator, headers, AWS_ARRAY_SIZE(headers), true)); + + struct aws_byte_cursor data = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("hello"); + struct aws_input_stream *input_stream = aws_input_stream_new_from_cursor(allocator, &data); + ASSERT_NOT_NULL(input_stream); + + struct write_data_callback_tester callback_tester = {0}; + struct aws_http_stream_write_data_options write_options = { + .data = input_stream, + .end_stream = true, + .on_complete = s_on_write_data_complete, + .user_data = &callback_tester, + }; + + ASSERT_SUCCESS(aws_http_stream_write_data(fixture.stream_tester.stream, &write_options)); + testing_channel_drain_queued_tasks(&fixture.tester.testing_channel); + + ASSERT_INT_EQUALS(1, callback_tester.num_callbacks); + ASSERT_INT_EQUALS(AWS_ERROR_HTTP_OUTGOING_STREAM_LENGTH_INCORRECT, callback_tester.last_error_code); + + aws_input_stream_release(input_stream); + ASSERT_SUCCESS(s_write_data_test_teardown(&fixture)); + return AWS_OP_SUCCESS; +} + +/* Test: Content-Length mismatch - too little data */ +H1_CLIENT_TEST_CASE(h1_client_write_data_less_than_content_length) { + (void)ctx; + struct write_data_test_fixture fixture; + struct aws_http_header headers[] = { + {.name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Content-Length"), + .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("10")}, + }; + ASSERT_SUCCESS(s_write_data_test_setup(&fixture, allocator, headers, AWS_ARRAY_SIZE(headers), true)); + + struct aws_byte_cursor data = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("hello"); + struct aws_input_stream *input_stream = aws_input_stream_new_from_cursor(allocator, &data); + ASSERT_NOT_NULL(input_stream); + + struct aws_http_stream_write_data_options write_options = { + .data = input_stream, + .end_stream = true, + }; + + ASSERT_SUCCESS(aws_http_stream_write_data(fixture.stream_tester.stream, &write_options)); + testing_channel_drain_queued_tasks(&fixture.tester.testing_channel); + + ASSERT_TRUE(fixture.stream_tester.complete); + ASSERT_INT_EQUALS(AWS_ERROR_HTTP_OUTGOING_STREAM_LENGTH_INCORRECT, fixture.stream_tester.on_complete_error_code); + + aws_input_stream_release(input_stream); + ASSERT_SUCCESS(s_write_data_test_teardown(&fixture)); + return AWS_OP_SUCCESS; +} + +/* Test write_data with chunked encoding - single write with end_stream=true + * should automatically send the termination chunk (0\r\n\r\n) */ +H1_CLIENT_TEST_CASE(h1_client_write_data_chunked_single) { + (void)ctx; + struct write_data_test_fixture fixture; + struct aws_http_header headers[] = { + {.name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Transfer-Encoding"), + .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("chunked")}, + }; + ASSERT_SUCCESS(s_write_data_test_setup(&fixture, allocator, headers, AWS_ARRAY_SIZE(headers), true)); + + struct aws_byte_cursor data = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("write more tests"); + struct aws_input_stream *input_stream = aws_input_stream_new_from_cursor(allocator, &data); + + struct write_data_callback_tester callback_tester = {0}; + struct aws_http_stream_write_data_options write_options = { + .data = input_stream, + .end_stream = true, + .on_complete = s_on_write_data_complete, + .user_data = &callback_tester, + }; + ASSERT_SUCCESS(aws_http_stream_write_data(fixture.stream_tester.stream, &write_options)); + + testing_channel_drain_queued_tasks(&fixture.tester.testing_channel); + + const char *expected = "POST /upload HTTP/1.1\r\n" + "Transfer-Encoding: chunked\r\n" + "\r\n" + "10\r\n" + "write more tests" + "\r\n" + "0\r\n" + "\r\n"; + ASSERT_SUCCESS(testing_channel_check_written_messages_str(&fixture.tester.testing_channel, allocator, expected)); + + ASSERT_INT_EQUALS(1, callback_tester.num_callbacks); + ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, callback_tester.last_error_code); + + ASSERT_SUCCESS(testing_channel_push_read_str(&fixture.tester.testing_channel, "HTTP/1.1 200 OK\r\n\r\n")); + testing_channel_drain_queued_tasks(&fixture.tester.testing_channel); + + ASSERT_TRUE(fixture.stream_tester.complete); + ASSERT_INT_EQUALS(200, fixture.stream_tester.response_status); + + aws_input_stream_release(input_stream); + ASSERT_SUCCESS(s_write_data_test_teardown(&fixture)); + return AWS_OP_SUCCESS; +} + +/* Test write_data with chunked encoding - multiple writes */ +H1_CLIENT_TEST_CASE(h1_client_write_data_chunked_multiple) { + (void)ctx; + struct write_data_test_fixture fixture; + struct aws_http_header headers[] = { + {.name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Transfer-Encoding"), + .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("chunked")}, + }; + ASSERT_SUCCESS(s_write_data_test_setup(&fixture, allocator, headers, AWS_ARRAY_SIZE(headers), true)); + + /* First write */ + struct aws_byte_cursor data1 = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("write "); + struct aws_input_stream *stream1 = aws_input_stream_new_from_cursor(allocator, &data1); + struct write_data_callback_tester callback_tester1 = {0}; + struct aws_http_stream_write_data_options opts1 = { + .data = stream1, + .end_stream = false, + .on_complete = s_on_write_data_complete, + .user_data = &callback_tester1, + }; + ASSERT_SUCCESS(aws_http_stream_write_data(fixture.stream_tester.stream, &opts1)); + + /* No-op write: NULL data, end_stream=false. Callback should still fire. */ + struct write_data_callback_tester callback_tester_noop = {0}; + struct aws_http_stream_write_data_options opts_noop = { + .data = NULL, + .end_stream = false, + .on_complete = s_on_write_data_complete, + .user_data = &callback_tester_noop, + }; + ASSERT_SUCCESS(aws_http_stream_write_data(fixture.stream_tester.stream, &opts_noop)); + + /* Second write with end_stream=true, termination chunk should be sent automatically */ + struct aws_byte_cursor data2 = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("more tests"); + struct aws_input_stream *stream2 = aws_input_stream_new_from_cursor(allocator, &data2); + struct write_data_callback_tester callback_tester2 = {0}; + struct aws_http_stream_write_data_options opts2 = { + .data = stream2, + .end_stream = true, + .on_complete = s_on_write_data_complete, + .user_data = &callback_tester2, + }; + ASSERT_SUCCESS(aws_http_stream_write_data(fixture.stream_tester.stream, &opts2)); + + testing_channel_drain_queued_tasks(&fixture.tester.testing_channel); + + const char *expected = "POST /upload HTTP/1.1\r\n" + "Transfer-Encoding: chunked\r\n" + "\r\n" + "6\r\n" + "write " + "\r\n" + "A\r\n" + "more tests" + "\r\n" + "0\r\n" + "\r\n"; + ASSERT_SUCCESS(testing_channel_check_written_messages_str(&fixture.tester.testing_channel, allocator, expected)); + + ASSERT_INT_EQUALS(1, callback_tester1.num_callbacks); + ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, callback_tester1.last_error_code); + ASSERT_INT_EQUALS(1, callback_tester_noop.num_callbacks); + ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, callback_tester_noop.last_error_code); + ASSERT_INT_EQUALS(1, callback_tester2.num_callbacks); + ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, callback_tester2.last_error_code); + + aws_input_stream_release(stream1); + aws_input_stream_release(stream2); + ASSERT_SUCCESS(s_write_data_test_teardown(&fixture)); + return AWS_OP_SUCCESS; +} + +/* Test: write_data with NULL data and end_stream=true on Content-Length stream. + * Also verifies NULL data + end_stream=false is a no-op. */ +H1_CLIENT_TEST_CASE(h1_client_write_data_null_data_content_length) { + (void)ctx; + struct write_data_test_fixture fixture; + struct aws_http_header headers[] = { + {.name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Content-Length"), + .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("5")}, + }; + ASSERT_SUCCESS(s_write_data_test_setup(&fixture, allocator, headers, AWS_ARRAY_SIZE(headers), true)); + + /* First write with end_stream=false */ + struct aws_byte_cursor data = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("hello"); + struct aws_input_stream *stream = aws_input_stream_new_from_cursor(allocator, &data); + struct write_data_callback_tester callback_tester = {0}; + struct aws_http_stream_write_data_options write_opts = { + .data = stream, + .end_stream = false, + .on_complete = s_on_write_data_complete, + .user_data = &callback_tester, + }; + ASSERT_SUCCESS(aws_http_stream_write_data(fixture.stream_tester.stream, &write_opts)); + + /* Second write NULL data without end_stream should be a no-op */ + struct aws_http_stream_write_data_options noop_options = { + .data = NULL, + .end_stream = false, + .on_complete = s_on_write_data_complete, + .user_data = &callback_tester, + }; + ASSERT_SUCCESS(aws_http_stream_write_data(fixture.stream_tester.stream, &noop_options)); + + /* Third write NULL data with end_stream should complete the stream */ + struct aws_http_stream_write_data_options write_options = { + .data = NULL, + .end_stream = true, + .on_complete = s_on_write_data_complete, + .user_data = &callback_tester, + }; + ASSERT_SUCCESS(aws_http_stream_write_data(fixture.stream_tester.stream, &write_options)); + + testing_channel_drain_queued_tasks(&fixture.tester.testing_channel); + + ASSERT_SUCCESS(testing_channel_check_written_messages_str( + &fixture.tester.testing_channel, allocator, "POST /upload HTTP/1.1\r\nContent-Length: 5\r\n\r\nhello")); + + ASSERT_INT_EQUALS(3, callback_tester.num_callbacks); + ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, callback_tester.last_error_code); + + ASSERT_SUCCESS(testing_channel_push_read_str(&fixture.tester.testing_channel, "HTTP/1.1 200 OK\r\n\r\n")); + testing_channel_drain_queued_tasks(&fixture.tester.testing_channel); + + ASSERT_TRUE(fixture.stream_tester.complete); + ASSERT_INT_EQUALS(200, fixture.stream_tester.response_status); + + aws_input_stream_release(stream); + ASSERT_SUCCESS(s_write_data_test_teardown(&fixture)); + return AWS_OP_SUCCESS; +} + +/* Test: write_data with NULL data and end_stream=true on chunked stream */ +H1_CLIENT_TEST_CASE(h1_client_write_data_null_data_chunked) { + (void)ctx; + struct write_data_test_fixture fixture; + struct aws_http_header headers[] = { + {.name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Transfer-Encoding"), + .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("chunked")}, + }; + ASSERT_SUCCESS(s_write_data_test_setup(&fixture, allocator, headers, AWS_ARRAY_SIZE(headers), true)); + + struct write_data_callback_tester callback_tester = {0}; + struct aws_http_stream_write_data_options write_options = { + .data = NULL, + .end_stream = true, + .on_complete = s_on_write_data_complete, + .user_data = &callback_tester, + }; + ASSERT_SUCCESS(aws_http_stream_write_data(fixture.stream_tester.stream, &write_options)); + + testing_channel_drain_queued_tasks(&fixture.tester.testing_channel); + + const char *expected = "POST /upload HTTP/1.1\r\n" + "Transfer-Encoding: chunked\r\n" + "\r\n" + "0\r\n" + "\r\n"; + ASSERT_SUCCESS(testing_channel_check_written_messages_str(&fixture.tester.testing_channel, allocator, expected)); + + ASSERT_INT_EQUALS(1, callback_tester.num_callbacks); + ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, callback_tester.last_error_code); + + ASSERT_SUCCESS(testing_channel_push_read_str(&fixture.tester.testing_channel, "HTTP/1.1 200 OK\r\n\r\n")); + testing_channel_drain_queued_tasks(&fixture.tester.testing_channel); + + ASSERT_TRUE(fixture.stream_tester.complete); + ASSERT_INT_EQUALS(200, fixture.stream_tester.response_status); + + ASSERT_SUCCESS(s_write_data_test_teardown(&fixture)); + return AWS_OP_SUCCESS; +} + +/* Test: auto-add Transfer-Encoding: chunked when no Content-Length or Transfer-Encoding is set */ +H1_CLIENT_TEST_CASE(h1_client_write_data_auto_chunked) { + (void)ctx; + struct write_data_test_fixture fixture; + /* No Content-Length or Transfer-Encoding header */ + ASSERT_SUCCESS(s_write_data_test_setup(&fixture, allocator, NULL, 0, true)); + + struct aws_byte_cursor data = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("hello"); + struct aws_input_stream *input_stream = aws_input_stream_new_from_cursor(allocator, &data); + + struct aws_http_stream_write_data_options write_options = { + .data = input_stream, + .end_stream = true, + }; + ASSERT_SUCCESS(aws_http_stream_write_data(fixture.stream_tester.stream, &write_options)); + testing_channel_drain_queued_tasks(&fixture.tester.testing_channel); + + /* Transfer-Encoding: chunked should have been auto-added */ + const char *expected = "POST /upload HTTP/1.1\r\n" + "Transfer-Encoding: chunked\r\n" + "\r\n" + "5\r\n" + "hello" + "\r\n" + "0\r\n" + "\r\n"; + ASSERT_SUCCESS(testing_channel_check_written_messages_str(&fixture.tester.testing_channel, allocator, expected)); + + aws_input_stream_release(input_stream); + ASSERT_SUCCESS(s_write_data_test_teardown(&fixture)); + return AWS_OP_SUCCESS; +} + +/* Test: body stream + use_manual_data_writes is rejected */ +H1_CLIENT_TEST_CASE(h1_client_write_data_body_stream_conflict) { + (void)ctx; + struct tester tester; + ASSERT_SUCCESS(s_tester_init(&tester, allocator)); + + struct aws_http_message *request = aws_http_message_new_request(allocator); + ASSERT_NOT_NULL(request); + ASSERT_SUCCESS(aws_http_message_set_request_method(request, aws_byte_cursor_from_c_str("POST"))); + ASSERT_SUCCESS(aws_http_message_set_request_path(request, aws_byte_cursor_from_c_str("/upload"))); + + struct aws_http_header headers[] = { + {.name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Content-Length"), + .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("5")}, + }; + ASSERT_SUCCESS(aws_http_message_add_header_array(request, headers, AWS_ARRAY_SIZE(headers))); + + /* Set a body stream AND use_manual_data_writes — should conflict */ + struct aws_byte_cursor body = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("hello"); + struct aws_input_stream *body_stream = aws_input_stream_new_from_cursor(allocator, &body); + aws_http_message_set_body_stream(request, body_stream); + + struct aws_http_make_request_options request_options = { + .self_size = sizeof(request_options), + .request = request, + .use_manual_data_writes = true, + }; + struct aws_http_stream *stream = aws_http_connection_make_request(tester.connection, &request_options); + ASSERT_NULL(stream); + ASSERT_INT_EQUALS(AWS_ERROR_HTTP_INVALID_HEADER_FIELD, aws_last_error()); + + aws_input_stream_release(body_stream); + aws_http_message_destroy(request); + ASSERT_SUCCESS(s_tester_clean_up(&tester)); + return AWS_OP_SUCCESS; +} + +/* Test: NULL data with non-zero Content-Length should fail with length mismatch */ +H1_CLIENT_TEST_CASE(h1_client_write_data_null_data_nonzero_content_length) { + (void)ctx; + struct write_data_test_fixture fixture; + struct aws_http_header headers[] = { + {.name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Content-Length"), + .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("5")}, + }; + ASSERT_SUCCESS(s_write_data_test_setup(&fixture, allocator, headers, AWS_ARRAY_SIZE(headers), true)); + + struct write_data_callback_tester callback_tester = {0}; + struct aws_http_stream_write_data_options write_options = { + .data = NULL, + .end_stream = true, + .on_complete = s_on_write_data_complete, + .user_data = &callback_tester, + }; + ASSERT_SUCCESS(aws_http_stream_write_data(fixture.stream_tester.stream, &write_options)); + testing_channel_drain_queued_tasks(&fixture.tester.testing_channel); + + ASSERT_TRUE(fixture.stream_tester.complete); + ASSERT_INT_EQUALS(AWS_ERROR_HTTP_OUTGOING_STREAM_LENGTH_INCORRECT, fixture.stream_tester.on_complete_error_code); + + ASSERT_SUCCESS(s_write_data_test_teardown(&fixture)); + return AWS_OP_SUCCESS; +} diff --git a/tests/test_h1_encoder.c b/tests/test_h1_encoder.c index 30ef0f3bc..07767d426 100644 --- a/tests/test_h1_encoder.c +++ b/tests/test_h1_encoder.c @@ -82,7 +82,8 @@ H1_ENCODER_TEST_CASE(h1_encoder_content_length_header) { aws_linked_list_init(&chunk_list); struct aws_h1_encoder_message encoder_message; - ASSERT_SUCCESS(aws_h1_encoder_message_init_from_request(&encoder_message, allocator, request, &chunk_list)); + ASSERT_SUCCESS( + aws_h1_encoder_message_init_from_request(&encoder_message, allocator, request, &chunk_list, NULL, false)); ASSERT_FALSE(encoder_message.has_chunked_encoding_header); ASSERT_FALSE(encoder_message.has_connection_close_header); @@ -124,7 +125,8 @@ H1_ENCODER_TEST_CASE(h1_encoder_transfer_encoding_chunked_header) { aws_linked_list_init(&chunk_list); struct aws_h1_encoder_message encoder_message; - ASSERT_SUCCESS(aws_h1_encoder_message_init_from_request(&encoder_message, allocator, request, &chunk_list)); + ASSERT_SUCCESS( + aws_h1_encoder_message_init_from_request(&encoder_message, allocator, request, &chunk_list, NULL, false)); ASSERT_TRUE(encoder_message.has_chunked_encoding_header); ASSERT_FALSE(encoder_message.has_connection_close_header); @@ -171,7 +173,8 @@ H1_ENCODER_TEST_CASE(h1_encoder_transfer_encoding_chunked_header_with_multiple_e aws_linked_list_init(&chunk_list); struct aws_h1_encoder_message encoder_message; - ASSERT_SUCCESS(aws_h1_encoder_message_init_from_request(&encoder_message, allocator, request, &chunk_list)); + ASSERT_SUCCESS( + aws_h1_encoder_message_init_from_request(&encoder_message, allocator, request, &chunk_list, NULL, false)); ASSERT_TRUE(encoder_message.has_chunked_encoding_header); ASSERT_FALSE(encoder_message.has_connection_close_header); @@ -212,7 +215,8 @@ H1_ENCODER_TEST_CASE(h1_encoder_case_insensitive_header_names) { aws_linked_list_init(&chunk_list); struct aws_h1_encoder_message encoder_message; - ASSERT_SUCCESS(aws_h1_encoder_message_init_from_request(&encoder_message, allocator, request, &chunk_list)); + ASSERT_SUCCESS( + aws_h1_encoder_message_init_from_request(&encoder_message, allocator, request, &chunk_list, NULL, false)); ASSERT_TRUE(encoder_message.has_chunked_encoding_header); ASSERT_FALSE(encoder_message.has_connection_close_header); @@ -299,7 +303,8 @@ H1_ENCODER_TEST_CASE(h1_encoder_transfer_encoding_chunked_across_multiple_header aws_linked_list_init(&chunk_list); struct aws_h1_encoder_message encoder_message; - ASSERT_SUCCESS(aws_h1_encoder_message_init_from_request(&encoder_message, allocator, request, &chunk_list)); + ASSERT_SUCCESS( + aws_h1_encoder_message_init_from_request(&encoder_message, allocator, request, &chunk_list, NULL, false)); ASSERT_TRUE(encoder_message.has_chunked_encoding_header); ASSERT_FALSE(encoder_message.has_connection_close_header); @@ -386,7 +391,8 @@ static int s_test_bad_request( struct aws_h1_encoder_message encoder_message; ASSERT_ERROR( - expected_error, aws_h1_encoder_message_init_from_request(&encoder_message, allocator, request, &chunk_list)); + expected_error, + aws_h1_encoder_message_init_from_request(&encoder_message, allocator, request, &chunk_list, NULL, false)); aws_http_message_destroy(request); aws_h1_encoder_message_clean_up(&encoder_message); diff --git a/tests/test_h2_client.c b/tests/test_h2_client.c index 2514a6c46..a5a90af6e 100644 --- a/tests/test_h2_client.c +++ b/tests/test_h2_client.c @@ -6398,3 +6398,111 @@ TEST_CASE(h2_client_cap_manual_window_update) { client_stream_tester_clean_up(&stream_tester); return s_tester_clean_up(); } + +/* Test unified API works for H2 */ +TEST_CASE(h2_client_unified_write_data_api) { + ASSERT_SUCCESS(s_tester_init(allocator, ctx)); + ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer)); + ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer)); + + struct aws_http_message *request = aws_http2_message_new_request(allocator); + ASSERT_NOT_NULL(request); + + struct aws_http_header request_headers_src[] = { + DEFINE_HEADER(":method", "POST"), + DEFINE_HEADER(":scheme", "https"), + DEFINE_HEADER(":path", "/upload"), + }; + aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src)); + + struct aws_http_make_request_options request_options = { + .self_size = sizeof(request_options), + .request = request, + .http2_use_manual_data_writes = true, + }; + struct aws_http_stream *stream = aws_http_connection_make_request(s_tester.connection, &request_options); + ASSERT_NOT_NULL(stream); + + aws_http_stream_activate(stream); + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + + struct aws_byte_cursor data = aws_byte_cursor_from_c_str("hello"); + struct aws_input_stream *input_stream = aws_input_stream_new_from_cursor(allocator, &data); + ASSERT_NOT_NULL(input_stream); + + struct aws_http_stream_write_data_options write_options = { + .data = input_stream, + .end_stream = true, + }; + + ASSERT_SUCCESS(aws_http_stream_write_data(stream, &write_options)); + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + + uint32_t stream_id = aws_http_stream_get_id(stream); + ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer)); + ASSERT_NOT_NULL( + h2_decode_tester_find_frame(&s_tester.peer.decode, AWS_H2_FRAME_T_HEADERS, 0 /*search_start_idx*/, NULL)); + ASSERT_SUCCESS( + h2_decode_tester_check_data_str_across_frames(&s_tester.peer.decode, stream_id, "hello", true /*end_stream*/)); + + aws_input_stream_release(input_stream); + aws_http_message_release(request); + aws_http_stream_release(stream); + aws_http_connection_close(s_tester.connection); + + return s_tester_clean_up(); +} + +/* Test: H2 write_data using the new use_manual_data_writes field (not the deprecated http2_use_manual_data_writes) */ +TEST_CASE(h2_client_unified_write_data_api_new_field) { + ASSERT_SUCCESS(s_tester_init(allocator, ctx)); + ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer)); + ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer)); + + struct aws_http_message *request = aws_http2_message_new_request(allocator); + ASSERT_NOT_NULL(request); + + struct aws_http_header request_headers_src[] = { + DEFINE_HEADER(":method", "POST"), + DEFINE_HEADER(":scheme", "https"), + DEFINE_HEADER(":path", "/upload"), + }; + aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src)); + + struct aws_http_make_request_options request_options = { + .self_size = sizeof(request_options), + .request = request, + .use_manual_data_writes = true, + }; + struct aws_http_stream *stream = aws_http_connection_make_request(s_tester.connection, &request_options); + ASSERT_NOT_NULL(stream); + + aws_http_stream_activate(stream); + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + + struct aws_byte_cursor data = aws_byte_cursor_from_c_str("hello"); + struct aws_input_stream *input_stream = aws_input_stream_new_from_cursor(allocator, &data); + ASSERT_NOT_NULL(input_stream); + + struct aws_http_stream_write_data_options write_options = { + .data = input_stream, + .end_stream = true, + }; + + ASSERT_SUCCESS(aws_http_stream_write_data(stream, &write_options)); + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + + uint32_t stream_id = aws_http_stream_get_id(stream); + ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer)); + ASSERT_NOT_NULL( + h2_decode_tester_find_frame(&s_tester.peer.decode, AWS_H2_FRAME_T_HEADERS, 0 /*search_start_idx*/, NULL)); + ASSERT_SUCCESS( + h2_decode_tester_check_data_str_across_frames(&s_tester.peer.decode, stream_id, "hello", true /*end_stream*/)); + + aws_input_stream_release(input_stream); + aws_http_message_release(request); + aws_http_stream_release(stream); + aws_http_connection_close(s_tester.connection); + + return s_tester_clean_up(); +}