Skip to content
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
e5e3b4e
quickstart
azkrishpy Feb 25, 2026
b070017
lint
azkrishpy Feb 25, 2026
8431439
example
azkrishpy Feb 26, 2026
26661ab
some bug fixes
azkrishpy Feb 27, 2026
ec85f76
separate the tests
azkrishpy Feb 27, 2026
ef57a49
fix some docs
azkrishpy Feb 27, 2026
b3e2af9
lint
azkrishpy Feb 28, 2026
af32dd9
allow write_chunk also to be routed via write_data
azkrishpy Feb 28, 2026
1f7c463
address some comments
azkrishpy Mar 3, 2026
732fcfe
fix test
azkrishpy Mar 3, 2026
b08bcaf
allow use_manual_data_writes for h2
azkrishpy Mar 3, 2026
af0bb7a
add option to elasticurl
azkrishpy Mar 3, 2026
b265ed4
lint
azkrishpy Mar 3, 2026
a78369c
fix docs
azkrishpy Mar 12, 2026
99b7cb5
fix behavior
azkrishpy Mar 12, 2026
89b28d5
more fixes
azkrishpy Mar 12, 2026
7242285
bugfix
azkrishpy Mar 12, 2026
f590520
more bugfix
azkrishpy Mar 12, 2026
486817c
more behavior fix
azkrishpy Mar 13, 2026
55efd1b
minor fix
azkrishpy Mar 13, 2026
2f69171
i am so bad
azkrishpy Mar 13, 2026
2d7a9f0
my first deadlock
azkrishpy Mar 13, 2026
dd6f393
fix the pending write null definition
azkrishpy Mar 13, 2026
f463ee3
fix encoder
azkrishpy Mar 13, 2026
5829245
fix encoder loop
azkrishpy Mar 13, 2026
9aef1ba
delete to prove it is needed
azkrishpy Mar 16, 2026
5a5d314
it was indeed not needed + lint
azkrishpy Mar 16, 2026
f68f849
revert to i64
azkrishpy Mar 16, 2026
e846d2b
more comment addressing
azkrishpy Mar 18, 2026
43ae56f
fix the tests
azkrishpy Mar 19, 2026
b7febda
lint
azkrishpy Mar 19, 2026
20d8235
minor comment addressing
azkrishpy Mar 20, 2026
ad027eb
deprecation notice of http2_write_data API
azkrishpy Mar 20, 2026
d01c000
fix the tests
azkrishpy Mar 20, 2026
ead0539
add different tests
azkrishpy Mar 20, 2026
a9315c1
wip
azkrishpy Mar 24, 2026
fc43b59
still wip
azkrishpy Mar 25, 2026
83aaf18
poor wip
azkrishpy Mar 25, 2026
e5350ed
real wip
azkrishpy Mar 26, 2026
ed627b7
read stream only if stream exists
azkrishpy Mar 26, 2026
925893a
don't redefine error code
azkrishpy Mar 26, 2026
5e1f774
add null data with nonzero contentlength test
azkrishpy Mar 27, 2026
944d1e5
mimic write_chunk for cleanup behavior
azkrishpy Mar 27, 2026
fd05f03
Merge branch 'main' into unified-write-data-api
azkrishpy Mar 27, 2026
03a69d6
add more docs
azkrishpy Mar 27, 2026
6db38be
linting
azkrishpy Mar 27, 2026
089f8e9
test that callbacks fire correctly
azkrishpy Mar 27, 2026
f572900
fix behavior
azkrishpy Mar 27, 2026
28d3999
add noop test to ensure callback firing
azkrishpy Mar 27, 2026
cb6cd15
fix to ensure noop fires callback
azkrishpy Mar 27, 2026
f98dd3d
move manual data writes out
azkrishpy Mar 27, 2026
ccb0d1b
goto unlock and check null
azkrishpy Mar 28, 2026
b233ea6
linting
azkrishpy Mar 28, 2026
6552bb4
revert null data with end stream true
azkrishpy Mar 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 160 additions & 2 deletions bin/elasticurl/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ struct elasticurl_ctx {
enum aws_log_level log_level;
enum aws_http_version required_http_version;
bool exchange_completed;
bool manual_write;
bool manual_write_chunked;
int64_t manual_write_content_length;
Comment thread
azkrishpy marked this conversation as resolved.
struct aws_http_stream *stream;
bool stream_ready;
};

static void s_usage(int exit_code) {
Expand Down Expand Up @@ -96,6 +101,7 @@ static void s_usage(int exit_code) {
fprintf(stderr, " --version: print the version of elasticurl.\n");
fprintf(stderr, " --http2: HTTP/2 connection required\n");
fprintf(stderr, " --http1_1: HTTP/1.1 connection required\n");
fprintf(stderr, " --manual-write: interactively write request body via stdin\n");
fprintf(stderr, " -h, --help\n");
fprintf(stderr, " Display this message and quit.\n");
exit(exit_code);
Expand Down Expand Up @@ -125,6 +131,7 @@ static struct aws_cli_option s_long_options[] = {
{"version", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'V'},
{"http2", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'w'},
{"http1_1", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'W'},
{"manual-write", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'n'},
{"help", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'h'},
/* Per getopt(3) the last element of the array has to be filled with all zeros */
{NULL, AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 0},
Expand Down Expand Up @@ -162,7 +169,7 @@ static void s_parse_options(int argc, char **argv, struct elasticurl_ctx *ctx) {
while (true) {
int option_index = 0;
int c =
aws_cli_getopt_long(argc, argv, "a:b:c:e:f:H:d:g:j:l:m:M:GPHiko:t:v:VwWh", s_long_options, &option_index);
aws_cli_getopt_long(argc, argv, "a:b:c:e:f:H:d:g:j:l:m:M:GPHiko:t:v:VwWnh", s_long_options, &option_index);
if (c == -1) {
break;
}
Expand Down Expand Up @@ -276,6 +283,9 @@ static void s_parse_options(int argc, char **argv, struct elasticurl_ctx *ctx) {
ctx->alpn = "http/1.1";
ctx->required_http_version = AWS_HTTP_VERSION_1_1;
break;
case 'n':
ctx->manual_write = true;
break;
case 'h':
s_usage(0);
break;
Expand Down Expand Up @@ -432,7 +442,26 @@ static struct aws_http_message *s_build_http_request(
};
aws_http_message_add_header(request, user_agent_header);

if (app_ctx->input_body) {
if (app_ctx->manual_write) {
/* Manual write mode: set headers but no body stream.
* H2 doesn't use Transfer-Encoding — just send DATA frames. */
if (app_ctx->manual_write_chunked && protocol_version != AWS_HTTP_VERSION_2) {
struct aws_http_header te_header = {
.name = aws_byte_cursor_from_c_str("transfer-encoding"),
.value = aws_byte_cursor_from_c_str("chunked"),
};
aws_http_message_add_header(request, te_header);
} else if (!app_ctx->manual_write_chunked) {
char content_length[64];
AWS_ZERO_ARRAY(content_length);
snprintf(content_length, sizeof(content_length), "%" PRIi64, app_ctx->manual_write_content_length);
struct aws_http_header cl_header = {
.name = aws_byte_cursor_from_c_str("content-length"),
.value = aws_byte_cursor_from_c_str(content_length),
};
aws_http_message_add_header(request, cl_header);
}
} else if (app_ctx->input_body) {
int64_t data_len = 0;
if (aws_input_stream_get_length(app_ctx->input_body, &data_len)) {
fprintf(stderr, "failed to get length of input stream.\n");
Expand Down Expand Up @@ -522,6 +551,7 @@ static void s_on_signing_complete(struct aws_http_message *request, int error_co
.on_response_header_block_done = s_on_incoming_header_block_done_fn,
.on_response_body = s_on_incoming_body_fn,
.on_complete = s_on_stream_complete_fn,
.use_manual_data_writes = app_ctx->manual_write,
};

app_ctx->response_code_written = false;
Expand All @@ -533,6 +563,15 @@ static void s_on_signing_complete(struct aws_http_message *request, int error_co
}
aws_http_stream_activate(stream);

if (app_ctx->manual_write) {
/* Store stream and signal main thread to begin interactive writes */
app_ctx->stream = stream;
aws_mutex_lock(&app_ctx->mutex);
app_ctx->stream_ready = true;
aws_mutex_unlock(&app_ctx->mutex);
aws_condition_variable_notify_all(&app_ctx->c_var);
}

/* Connection will stay alive until stream completes */
aws_http_connection_release(app_ctx->connection);
app_ctx->connection = NULL;
Expand All @@ -554,6 +593,103 @@ static bool s_completion_predicate(void *arg) {
return app_ctx->exchange_completed;
}

static bool s_stream_ready_predicate(void *arg) {
struct elasticurl_ctx *app_ctx = arg;
return app_ctx->stream_ready || app_ctx->exchange_completed;
}

struct manual_write_ctx {
struct aws_allocator *allocator;
uint8_t *data;
struct aws_input_stream *stream;
};

static void s_on_manual_write_complete(struct aws_http_stream *stream, int error_code, void *user_data) {
(void)stream;
(void)error_code;
struct manual_write_ctx *ctx = user_data;
aws_input_stream_release(ctx->stream);
aws_mem_release(ctx->allocator, ctx->data);
aws_mem_release(ctx->allocator, ctx);
}

static void s_manual_write_loop(struct elasticurl_ctx *app_ctx) {
/* Wait for stream to be activated */
aws_mutex_lock(&app_ctx->mutex);
aws_condition_variable_wait_pred(&app_ctx->c_var, &app_ctx->mutex, s_stream_ready_predicate, app_ctx);
aws_mutex_unlock(&app_ctx->mutex);

if (app_ctx->exchange_completed) {
return;
}

int64_t bytes_sent = 0;
char line_buf[4096];

fprintf(stderr, "Enter data (empty line to finish):\n");
while (fgets(line_buf, sizeof(line_buf), stdin)) {
/* Strip trailing newline */
size_t len = strlen(line_buf);
if (len > 0 && line_buf[len - 1] == '\n') {
line_buf[--len] = '\0';
}

/* Empty line = done */
if (len == 0) {
break;
}

/* Heap-allocate data so it outlives this stack frame */
uint8_t *heap_data = aws_mem_calloc(app_ctx->allocator, 1, len);
memcpy(heap_data, line_buf, len);

struct aws_byte_cursor data_cursor = aws_byte_cursor_from_array(heap_data, len);
struct aws_input_stream *data_stream =
aws_input_stream_new_from_cursor(app_ctx->allocator, &data_cursor);

struct manual_write_ctx *write_ctx = aws_mem_calloc(app_ctx->allocator, 1, sizeof(struct manual_write_ctx));
write_ctx->allocator = app_ctx->allocator;
write_ctx->data = heap_data;
write_ctx->stream = data_stream;

struct aws_http_stream_write_data_options write_opts = {
.data = data_stream,
.end_stream = false,
.on_complete = s_on_manual_write_complete,
.user_data = write_ctx,
};

if (aws_http_stream_write_data(app_ctx->stream, &write_opts)) {
fprintf(stderr, "write_data failed: %s\n", aws_error_debug_str(aws_last_error()));
aws_input_stream_release(data_stream);
aws_mem_release(app_ctx->allocator, heap_data);
aws_mem_release(app_ctx->allocator, write_ctx);
break;
}

bytes_sent += (int64_t)len;
fprintf(stderr, "Sent %zu bytes (total: %" PRIi64 ")\n", len, bytes_sent);
}

/* Send final write */
struct aws_byte_cursor empty_cursor = aws_byte_cursor_from_c_str("");
struct aws_input_stream *empty_stream =
aws_input_stream_new_from_cursor(app_ctx->allocator, &empty_cursor);

struct aws_http_stream_write_data_options final_opts = {
.data = empty_stream,
.end_stream = true,
};

if (aws_http_stream_write_data(app_ctx->stream, &final_opts)) {
fprintf(stderr, "final write_data failed: %s\n", aws_error_debug_str(aws_last_error()));
} else {
fprintf(stderr, "Stream complete. Sent %" PRIi64 " bytes.\n", bytes_sent);
}

aws_input_stream_release(empty_stream);
}

int main(int argc, char **argv) {
struct aws_allocator *allocator = aws_default_allocator();

Expand All @@ -579,6 +715,25 @@ int main(int argc, char **argv) {

s_parse_options(argc, argv, &app_ctx);

/* Interactive prompt for manual-write mode */
if (app_ctx.manual_write) {
if (!strcmp(app_ctx.verb, "POST")) {
fprintf(stderr, "Only POST requests allowed for manual_writes. Exiting... \n");
return 1;
}
fprintf(stderr, "Manual write mode enabled.\n");
fprintf(stderr, "Content-Length (leave empty for chunked transfer encoding): ");
char cl_buf[64];
if (fgets(cl_buf, sizeof(cl_buf), stdin) && cl_buf[0] != '\n') {
app_ctx.manual_write_content_length = (int64_t)atoll(cl_buf);
app_ctx.manual_write_chunked = false;
fprintf(stderr, "Using Content-Length: %" PRIi64 "\n", app_ctx.manual_write_content_length);
} else {
app_ctx.manual_write_chunked = true;
fprintf(stderr, "Using chunked transfer encoding.\n");
}
}

struct aws_logger logger;
AWS_ZERO_STRUCT(logger);

Expand Down Expand Up @@ -728,6 +883,9 @@ int main(int argc, char **argv) {
http_client_options.prior_knowledge_http2 = true;
}
aws_http_client_connect(&http_client_options);
if (app_ctx.manual_write) {
s_manual_write_loop(&app_ctx);
}
aws_mutex_lock(&app_ctx.mutex);
aws_condition_variable_wait_pred(&app_ctx.c_var, &app_ctx.mutex, s_completion_predicate, &app_ctx);
aws_mutex_unlock(&app_ctx.mutex);
Expand Down
21 changes: 20 additions & 1 deletion include/aws/http/private/h1_encoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <aws/http/private/http_impl.h>
#include <aws/http/private/request_response_impl.h>

struct aws_h1_data_write;

struct aws_h1_chunk {
struct aws_allocator *allocator;
struct aws_input_stream *data;
Expand Down Expand Up @@ -44,10 +46,18 @@ struct aws_h1_encoder_message {
/* Pointer to chunked_trailer, used for chunked_trailer. */
struct aws_h1_trailer *trailer;

/* Pointer to list of `struct aws_h1_data_write`, used for manual data writes with Content-Length.
* List is owned by aws_h1_stream. */
struct aws_linked_list *pending_data_write_list;

/* Current data write being processed (for manual data writes with Content-Length) */
struct aws_h1_data_write *current_data_write;

/* If non-zero, length of unchunked body to send */
uint64_t content_length;
bool has_connection_close_header;
bool has_chunked_encoding_header;
bool has_manual_data_writes;
};

enum aws_h1_encoder_state {
Expand All @@ -64,6 +74,9 @@ enum aws_h1_encoder_state {
AWS_H1_ENCODER_STATE_CHUNK_BODY,
AWS_H1_ENCODER_STATE_CHUNK_END,
AWS_H1_ENCODER_STATE_CHUNK_TRAILER,
/* The _DATA_WRITE_ states support the write_data() API (manual data writes with Content-Length) */
AWS_H1_ENCODER_STATE_DATA_WRITE_NEXT,
AWS_H1_ENCODER_STATE_DATA_WRITE_BODY,
AWS_H1_ENCODER_STATE_DONE,
};

Expand Down Expand Up @@ -104,7 +117,9 @@ int aws_h1_encoder_message_init_from_request(
struct aws_h1_encoder_message *message,
struct aws_allocator *allocator,
const struct aws_http_message *request,
struct aws_linked_list *pending_chunk_list);
struct aws_linked_list *pending_chunk_list,
struct aws_linked_list *pending_data_write_list,
bool use_manual_data_writes);

int aws_h1_encoder_message_init_from_response(
struct aws_h1_encoder_message *message,
Expand Down Expand Up @@ -138,6 +153,10 @@ bool aws_h1_encoder_is_message_in_progress(const struct aws_h1_encoder *encoder)
AWS_HTTP_API
bool aws_h1_encoder_is_waiting_for_chunks(const struct aws_h1_encoder *encoder);

/* Return true if the encoder is stuck waiting for more data writes to be added to the current message */
AWS_HTTP_API
bool aws_h1_encoder_is_waiting_for_data_writes(const struct aws_h1_encoder *encoder);

AWS_EXTERN_C_END

#endif /* AWS_HTTP_H1_ENCODER_H */
39 changes: 39 additions & 0 deletions include/aws/http/private/h1_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ enum aws_h1_stream_api_state {
AWS_H1_STREAM_API_STATE_COMPLETE,
};

struct aws_h1_data_write {
struct aws_allocator *allocator;
struct aws_input_stream *data;
aws_http_stream_write_complete_fn *on_complete;
void *user_data;
struct aws_linked_list_node node;
bool is_end_stream;
};

struct aws_h1_stream {
struct aws_http_stream base;

Expand Down Expand Up @@ -66,6 +75,15 @@ struct aws_h1_stream {
* Only body data (not headers, etc) counts against the stream's flow-control window. */
uint64_t stream_window;

/* List of `struct aws_h1_data_write` which have been moved from synced_data for processing */
struct aws_linked_list pending_data_write_list;

/* Whether the stream is using manual data writes instead of input_stream */
bool using_manual_data_writes : 1;

/* Whether the final data write (with is_end_stream=true) has been received */
bool has_final_data_write : 1;

/* Whether a "request handler" stream has a response to send.
* Has mirror variable in synced_data */
bool has_outgoing_response : 1;
Expand All @@ -83,6 +101,10 @@ struct aws_h1_stream {
* but haven't yet moved to thread_data.encoder_message.pending_chunk_list where the encoder will find them. */
struct aws_linked_list pending_chunk_list;

/* List of `struct aws_h1_data_write` which have been submitted by user,
* but haven't yet moved to thread_data.pending_data_write_list where the encoder will find them. */
struct aws_linked_list pending_data_write_list;

/* trailing headers which have been submitted by user,
* but haven't yet moved to thread_data.encoder_message where the encoder will find them. */
struct aws_h1_trailer *pending_trailer;
Expand All @@ -102,9 +124,15 @@ struct aws_h1_stream {
/* Whether the outgoing message is using chunked encoding */
bool using_chunked_encoding : 1;

/* Whether the stream is using manual data writes instead of input_stream */
bool using_manual_data_writes : 1;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need using_manual_data_writes to be protected by the lock, it should only be set once from the make-request-option.
So that we can simplify a bit to not have this boolean in couple different places.

BTW, the reason of using_chunked_encoding being part of the sync_data is for the aws_h1_stream_send_response for the server side implementation.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's move this out


/* Whether the final 0 length chunk has already been sent */
bool has_final_chunk : 1;

/* Whether the final data write (with is_end_stream=true) has been received */
bool has_final_data_write : 1;

/* Whether the chunked trailer has already been sent */
bool has_added_trailer : 1;
} synced_data;
Expand All @@ -123,4 +151,15 @@ void aws_h1_stream_cancel(struct aws_http_stream *stream, int error_code);

int aws_h1_stream_send_response(struct aws_h1_stream *stream, struct aws_http_message *response);

struct aws_h1_data_write *aws_h1_data_write_new(
struct aws_allocator *allocator,
const struct aws_http_stream_write_data_options *options);

void aws_h1_data_write_destroy(struct aws_h1_data_write *data_write);

void aws_h1_data_write_complete_and_destroy(
struct aws_h1_data_write *data_write,
struct aws_http_stream *http_stream,
int error_code);

#endif /* AWS_HTTP_H1_STREAM_H */
4 changes: 1 addition & 3 deletions include/aws/http/private/request_response_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ struct aws_http_stream_vtable {
int (*http2_reset_stream)(struct aws_http_stream *http2_stream, uint32_t http2_error);
int (*http2_get_received_error_code)(struct aws_http_stream *http2_stream, uint32_t *http2_error);
int (*http2_get_sent_error_code)(struct aws_http_stream *http2_stream, uint32_t *http2_error);
int (*http2_write_data)(
struct aws_http_stream *http2_stream,
const struct aws_http2_stream_write_data_options *options);
int (*write_data)(struct aws_http_stream *stream, const struct aws_http_stream_write_data_options *options);
};

/**
Expand Down
Loading
Loading