143 changes: 143 additions & 0 deletions source/s3_client.c
@@ -45,6 +45,66 @@
#include <inttypes.h>
#include <math.h>

/*
Contributor
We have a design doc folder that we originally intended as a place for detailed, descriptive docs like this. Does this belong there?

* The s3 client splits user operations (meta requests) into concurrent HTTP requests
* and schedules them across pooled connections.
*
* A meta request is a user-initiated operation (GetObject, PutObject, etc). Each type
* has a state machine that produces individual HTTP requests on demand (range-GETs,
* UploadParts, etc). See s3_auto_ranged_get.c, s3_auto_ranged_put.c, s3_copy_object.c,
* s3_default_meta_request.c.
*
* A request goes through: buffer acquire -> prepare (build message + read body) -> sign -> queue -> send.
*
* An endpoint represents an S3 hostname and owns an HTTP connection manager that pools
* connections to that host.
*
* Threading model:
*
* All scheduling decisions run on a single event loop thread (process_work_event_loop),
Contributor
Important point here: when the scheduler kicks off, it moves all the pending work into a separate area, i.e. it captures the state of the world, and no one else is supposed to touch that state. Then it does all the scheduling work and updates global state afterward.
It's totally possible that more work will arrive while the scheduler is running, but it will not be touched until the next iteration of the scheduler.

* pinned at client init from the bootstrap ELG. This means threaded_data needs no lock.
*
* Other threads (user threads, prepare callbacks, connection callbacks) push work into
* synced_data (protected by a mutex) and call schedule_process_work_synced() to wake
* the work loop. The lock is only held long enough to move list pointers.
*
* Client data falls into three categories:
*
* synced_data - mutex-protected, any thread. Mailbox for pending_meta_request_work
* and prepared_requests. External threads deposit here, work loop drains.
*
* threaded_data - no lock, only the process_work_event_loop thread touches it.
* Contains request_queue, active meta_requests list, and prep counters.
* Functions with "_threaded" suffix operate on this data.
*
* stats - atomic counters (num_requests_in_flight, etc). Any thread, no lock.
*
* Event loop groups:
*
* Bootstrap ELG (client_bootstrap->event_loop_group) drives networking. HTTP connections
* are pinned to loops in this group. One loop from this group is designated as the
Contributor
"Pinned" is a bit confusing in this case. You might want to expand on exactly what you mean.

* process_work_event_loop at init; it still handles normal networking work too.
*
* Body Streaming ELG (body_streaming_elg) is a separate group for delivering response
* bodies to user callbacks and for async request preparation. Keeps slow user callbacks
* from blocking networking or scheduling.
*
* The process_work_event_loop clock (aws_event_loop_current_clock_time) is used to
* schedule delayed housekeeping via schedule_task_future():
* - Buffer pool trim after 5s idle.
* - Endpoint cleanup every 5s (removes endpoints with zero references).
*
* Scheduling flow (see comments above s_s3_client_schedule_process_work_synced):
*
* 1. External thread deposits work into synced_data, calls schedule_process_work_synced()
* which posts a task to the event loop.
* 2. The event loop runs s_s3_client_process_work_default() which drains synced_data into
* threaded_data, updates meta requests to produce new requests, and assigns queued
* requests to HTTP connections.
* 3. Requests complete on networking threads, deposit results back into synced_data, and
* re-trigger the work loop.
*/
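The mailbox pattern described in the comment above (external threads deposit into synced_data, the work loop drains into threaded_data) can be sketched roughly as follows. This is a toy model, not the aws-c-s3 implementation: `toy_client`, `work_item`, `toy_client_push` and `toy_client_process_work` are hypothetical names, a plain singly linked list stands in for aws_linked_list, and a counter stands in for posting an event loop task. It also illustrates the reviewer's point that each run works on a snapshot and that work arriving later waits for the next run.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins; these are not the real aws_s3_client types. */
struct work_item {
    int id;
    struct work_item *next;
};

struct toy_client {
    pthread_mutex_t lock;
    /* "synced_data": any thread may touch these, but only under the lock. */
    struct work_item *pending_head;
    struct work_item *pending_tail;
    bool task_scheduled;
    int wakeups_posted; /* stands in for posting a task to the event loop */
    /* "threaded_data": only the work-loop thread touches this, so no lock. */
    int processed_count;
};

static void toy_client_init(struct toy_client *c) {
    pthread_mutex_init(&c->lock, NULL);
    c->pending_head = NULL;
    c->pending_tail = NULL;
    c->task_scheduled = false;
    c->wakeups_posted = 0;
    c->processed_count = 0;
}

/* Callable from any thread: deposit work and request at most one wakeup. */
static void toy_client_push(struct toy_client *c, struct work_item *item) {
    item->next = NULL;
    pthread_mutex_lock(&c->lock);
    if (c->pending_tail != NULL) {
        c->pending_tail->next = item;
    } else {
        c->pending_head = item;
    }
    c->pending_tail = item;
    if (!c->task_scheduled) { /* a wakeup is already pending otherwise: no-op */
        c->task_scheduled = true;
        c->wakeups_posted++;
    }
    pthread_mutex_unlock(&c->lock);
}

/* Runs on the work-loop thread only. Returns the number of items drained. */
static int toy_client_process_work(struct toy_client *c) {
    /* Hold the lock only long enough to move list pointers. */
    pthread_mutex_lock(&c->lock);
    struct work_item *snapshot = c->pending_head;
    c->pending_head = NULL;
    c->pending_tail = NULL;
    c->task_scheduled = false;
    pthread_mutex_unlock(&c->lock);

    /* Everything below works on the snapshot; later arrivals wait for the next run. */
    int drained = 0;
    for (struct work_item *it = snapshot; it != NULL; it = it->next) {
        drained++;
    }
    c->processed_count += drained;
    return drained;
}
```

In this sketch two pushes before a run produce a single wakeup, and an item pushed after the swap is picked up only by the following run.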

#ifdef _MSC_VER
# pragma warning(disable : 4232) /* function pointer to dll symbol */
#endif /* _MSC_VER */
@@ -1524,6 +1584,76 @@ static void s_s3_client_push_meta_request_synced(
    aws_linked_list_push_back(&client->synced_data.pending_meta_request_work, &meta_request_work->node);
}

/*
* The scheduling system is a single-threaded work loop driven by event loop tasks.
*
* Any thread can call schedule_process_work_synced() (under lock) to post a task to
* the process_work_event_loop. If a task is already pending, this is a no-op.
* The task runs s_s3_client_process_work_task(), which calls the vtable's process_work,
* defaulting to s_s3_client_process_work_default(). All scheduling decisions happen there.
*
* schedule_process_work_synced() is called from:
Contributor
I feel conflicted about mentioning specific function names in the doc. It will be really hard to keep in sync with the actual code.

* - aws_s3_client_make_meta_request() when the user creates a new operation.
* - s_s3_client_prepare_callback_queue_request() when a request finishes preparing.
* - s_s3_client_meta_request_finished_request() when a request completes on the network.
* - aws_s3_client_schedule_process_work() when a meta request's internal state changes.
* - s_s3_client_endpoint_ref_count_zero() when an endpoint is cleaned up.
*
* s_s3_client_process_work_default() runs in five steps:
* 1. Lock, swap pending_meta_request_work and prepared_requests into thread-local storage,
* adjust num_requests_being_prepared, set process_work_task_scheduled=false, unlock.
* 2. Add newly arrived meta requests to threaded_data.meta_requests.
* 3. Call update_meta_requests_threaded() to ask each meta request for its next request,
* then update_connections_threaded() to assign queued requests to HTTP connections.
* 4. Log stats.
* 5. If the client is shutting down and all work is drained, call finish_destroy().
*
* A request moves through the pipeline as follows:
* meta_request->update() produces a request, which goes to s_acquire_mem_and_prepare_request()
* to reserve a buffer from the pool (may block if memory is full). Then vtable->prepare_request()
Contributor
Not block. It will just wait asynchronously until memory frees up.

* builds the HTTP message (for PUT, this reads the body from the input stream). Then
* s_s3_meta_request_sign_request() signs it with SigV4. The signed request is pushed to
* synced_data.prepared_requests and the work loop is re-triggered. On the next run,
* process_work_default() drains it into threaded_data.request_queue, and
* update_connections_threaded() assigns it to an HTTP connection via the retry strategy
* and connection manager.
*
* Several counters track requests through the pipeline and enforce flow control:
*
* num_requests_in_flight (client, atomic) tracks requests from the moment they enter
* prepare until they complete on the network. Incremented in update_meta_requests_threaded(),
* decremented in s_s3_client_meta_request_finished_request(). Checked against
* max_requests_in_flight (= ideal_connection_count * 4) in s_s3_client_should_update_meta_request().
*
* num_requests_being_prepared (client, threaded_data) tracks requests currently in the
* prepare+sign stage. Incremented in update_meta_requests_threaded(), decremented in
* process_work_default() step 1 by subtracting newly queued and failed counts. Checked
* against max_requests_prepare (= ideal_connection_count) in s_s3_client_should_update_meta_request().
*
* num_request_being_prepared (per meta_request, atomic) is the same window as above but
* scoped to a single meta request. Incremented in update_meta_requests_threaded(), decremented
* in s_s3_client_prepare_callback_queue_request(). Checked against the per-meta-request
* connection cap in s_s3_client_should_update_meta_request().
*
* num_requests_network (per meta_request, atomic) and num_requests_network_io (client, atomic)
* track requests from connection assignment until network completion. Incremented in
* s_s3_client_create_connection_for_request_default(), decremented in
* s_s3_client_connection_finished(). Checked against max_active_connections in
* update_connections_threaded().
*
* num_parts_pending_read (PUT only, per meta_request, synced_data) tracks UploadPart requests
* from the moment update() produces them until the body read completes. Incremented in
* s_s3_auto_ranged_put_update() (s3_auto_ranged_put.c), decremented in
* s_s3_new_upload_part_info_after_body() (s3_auto_ranged_put.c). Checked by
* s_should_skip_scheduling_more_parts_based_on_flags() to limit read-ahead to 5 parts,
* preventing wasted buffer memory when reads are sequential. This counter is internal
* to the PUT meta request; the client does not see it.
*
* request_queue_size (client, threaded_data) counts prepared requests waiting for connection
* assignment. Updated by aws_s3_client_queue_requests_threaded() and
* aws_s3_client_dequeue_request_threaded(). Included in the prepare pipeline check in
* s_s3_client_should_update_meta_request().
*/
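The client-level limits listed in the comment above can be read as a simple admission check. The sketch below is only one interpretation of that description: `toy_counters` and `toy_should_update` are hypothetical names, the `>=` comparisons and the way request_queue_size is added to the prepare count are assumptions, and the per-meta-request caps are left out entirely.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical snapshot of the client-level counters named in the comment. */
struct toy_counters {
    uint32_t ideal_connection_count;
    uint32_t num_requests_in_flight;      /* from entering prepare to network completion */
    uint32_t num_requests_being_prepared; /* currently in prepare + sign */
    uint32_t request_queue_size;          /* prepared, waiting for a connection */
};

/* Returns true if the client may ask a meta request for one more request. */
static bool toy_should_update(const struct toy_counters *c) {
    /* Limits as stated in the comment: 4x and 1x the ideal connection count. */
    const uint32_t max_requests_in_flight = c->ideal_connection_count * 4;
    const uint32_t max_requests_prepare = c->ideal_connection_count;

    if (c->num_requests_in_flight >= max_requests_in_flight) {
        return false;
    }
    /* Assumption: queued-but-unassigned requests count against the prepare limit. */
    if (c->num_requests_being_prepared + c->request_queue_size >= max_requests_prepare) {
        return false;
    }
    return true;
}
```

With an ideal connection count of 10, this toy check stops producing work once 40 requests are in flight, or once the prepare stage plus the queue hold 10 requests.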
static void s_s3_client_schedule_process_work_synced(struct aws_s3_client *client) {
    AWS_PRECONDITION(client);
    AWS_PRECONDITION(client->vtable);
@@ -2109,6 +2239,9 @@ void s_acquire_mem_and_prepare_request(
    aws_s3_meta_request_prepare_request(request->meta_request, request, callback, user_data);
}

/* Iterate all active meta requests and call update() on each to get the next request to send.
* If update() returns a request, hand it off to async preparation. If update() returns
* work_remaining=false, the meta request is done and gets removed from threaded_data. */
void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client) {
    AWS_PRECONDITION(client);

@@ -2121,6 +2254,16 @@ void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client) {
    uint32_t num_requests_in_flight = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_in_flight);
    uint32_t num_requests_streaming = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_streaming_request_body);

/* Two passes over the meta request list. The first pass is conservative: each meta request
* type self-limits how many requests it produces (GET caps at 8 in-flight, PUT caps at
* 1 pending read, COPY caps at 1 in-flight). This keeps any single meta request from
* flooding the prepare pipeline.
* The second pass has no per-type restriction, letting meta requests fill remaining
* capacity on a first-come-first-served basis. In this pass the only limits are:
* - GET: no internal limit (only client-level caps apply).
* - PUT: s_max_parts_pending_read (5) limits concurrent body reads.
* - COPY: no internal limit (only client-level caps apply).
* - Default: produces at most one request total (single-request operation). */
    const uint32_t pass_flags[] = {
        AWS_S3_META_REQUEST_UPDATE_FLAG_CONSERVATIVE,
        0,
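The two-pass idea from the comment in this hunk can be illustrated with a small model. Everything here is hypothetical: `toy_meta_request`, `toy_update` and `toy_update_all` are made-up names, a single `capacity` number stands in for all client-level caps, the cap is applied to requests started rather than in flight, and the round-robin order within a pass is an assumption rather than something the diff states.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

enum { TOY_FLAG_CONSERVATIVE = 1u };

/* Hypothetical model of one meta request. */
struct toy_meta_request {
    uint32_t remaining;        /* requests this operation still wants to send */
    uint32_t started;          /* requests produced so far */
    uint32_t conservative_cap; /* self-imposed limit during the conservative pass */
};

/* Produce at most one request. Returns 1 if a request was produced, else 0. */
static int toy_update(struct toy_meta_request *mr, uint32_t flags) {
    if (mr->remaining == 0) {
        return 0;
    }
    if ((flags & TOY_FLAG_CONSERVATIVE) != 0 && mr->started >= mr->conservative_cap) {
        return 0; /* holding back so one meta request cannot flood the pipeline */
    }
    mr->remaining--;
    mr->started++;
    return 1;
}

/* Pass 1 is conservative; pass 2 fills whatever capacity is left. */
static uint32_t toy_update_all(struct toy_meta_request *mrs, size_t count, uint32_t capacity) {
    const uint32_t pass_flags[] = {TOY_FLAG_CONSERVATIVE, 0};
    uint32_t produced = 0;

    for (size_t pass = 0; pass < 2; ++pass) {
        int progress = 1;
        while (progress && produced < capacity) {
            progress = 0;
            for (size_t i = 0; i < count && produced < capacity; ++i) {
                if (toy_update(&mrs[i], pass_flags[pass])) {
                    produced++;
                    progress = 1;
                }
            }
        }
    }
    return produced;
}
```

For two toy meta requests with conservative caps of 8 and 1 and a capacity of 12, the first pass produces 9 requests and the second pass hands the remaining 3 slots out in list order.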
42 changes: 42 additions & 0 deletions source/s3_meta_request.c
@@ -3,6 +3,48 @@
* SPDX-License-Identifier: Apache-2.0.
*/

/*
* A meta request represents a single user-initiated S3 operation (GetObject, PutObject,
* CopyObject, or a pass-through default request). It is a state machine that produces
* individual HTTP requests on demand, not a pre-planned list. The client's work loop
* calls vtable->update() repeatedly, and the meta request decides what to produce next
* based on its current state and the results of previous requests.
*
* For PutObject, the state machine starts by producing a CreateMultipartUpload request.
* Once the response arrives with an upload_id, subsequent update() calls produce UploadPart
* requests until flow-control limits are hit or all parts are started. After all parts
* complete, it produces a CompleteMultipartUpload. When that finishes, update() returns
* work_remaining=false and the client removes the meta request.
*
* For GetObject, the state machine first sends a HeadObject or a ranged GET for part 1 to
Contributor
It's a lot more complicated than that. There is a doc on that in the design folder.

* discover the object size. Once the size is known, subsequent update() calls produce
* ranged GETs (Range: bytes=X-Y) for each part until all parts are requested.
*
* Every aws_s3_request carries a backlink (request->meta_request) and a request_tag so
* completions route back to the right meta request and it knows which sub-operation
* finished. The client does not understand PUT/GET/COPY logic; it just calls update()
* on each meta request and routes completions back via the backlink by calling
* aws_s3_meta_request_finished_request().
*
* Each meta request type implements three vtable functions:
* update() - produce the next request based on current state.
* prepare_request() - build the HTTP message (headers, body).
* finished_request() - handle completion, advance the state machine.
* See s3_auto_ranged_get.c, s3_auto_ranged_put.c, s3_copy_object.c,
* s3_default_meta_request.c.
*
* update() is called from the client's process_work_event_loop thread.
* prepare_request() runs async on the body_streaming_elg or the meta request's
* io_event_loop. finished_request() can be called from any networking thread.
* The meta request has its own synced_data/lock for state shared across threads,
* following the same pattern as the client.
*
* This file manages the preparation pipeline: vtable->prepare_request() builds the
* HTTP message and reads the body, then s_s3_meta_request_sign_request() signs it
* with SigV4, then the callback notifies the client that the request is ready for
* connection assignment.
*/
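As a rough illustration of the "state machine that produces requests on demand" idea, here is a toy version of the PutObject sequence described in the comment above. The names `toy_put`, `toy_put_update`, `toy_put_finished_request` and the `TOY_TAG_*` values are hypothetical and do not match the real vtable signatures. Flow-control limits, error handling and locking are deliberately omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical request tags; the real code uses per-type request_tag values. */
enum toy_request_tag {
    TOY_TAG_NONE,
    TOY_TAG_CREATE_MPU,
    TOY_TAG_UPLOAD_PART,
    TOY_TAG_COMPLETE_MPU
};

struct toy_put {
    uint32_t total_parts;
    uint32_t parts_started;
    uint32_t parts_completed;
    bool create_sent;
    bool create_done;
    bool complete_sent;
    bool complete_done;
};

/* Returns work_remaining. *out_tag is the request to send now, or TOY_TAG_NONE
 * when the state machine is waiting on earlier requests. */
static bool toy_put_update(struct toy_put *p, enum toy_request_tag *out_tag) {
    *out_tag = TOY_TAG_NONE;
    if (!p->create_sent) {
        p->create_sent = true;
        *out_tag = TOY_TAG_CREATE_MPU;
        return true;
    }
    if (!p->create_done) {
        return true; /* waiting for the upload_id */
    }
    if (p->parts_started < p->total_parts) {
        p->parts_started++;
        *out_tag = TOY_TAG_UPLOAD_PART;
        return true;
    }
    if (p->parts_completed < p->total_parts) {
        return true; /* all parts started, some still in flight */
    }
    if (!p->complete_sent) {
        p->complete_sent = true;
        *out_tag = TOY_TAG_COMPLETE_MPU;
        return true;
    }
    return !p->complete_done; /* false once CompleteMultipartUpload finished */
}

/* Completion handler: the tag says which sub-operation finished. */
static void toy_put_finished_request(struct toy_put *p, enum toy_request_tag tag) {
    switch (tag) {
        case TOY_TAG_CREATE_MPU:
            p->create_done = true;
            break;
        case TOY_TAG_UPLOAD_PART:
            p->parts_completed++;
            break;
        case TOY_TAG_COMPLETE_MPU:
            p->complete_done = true;
            break;
        default:
            break;
    }
}
```

The caller never needs to know the PUT logic: it keeps calling the update function, sends whatever comes back, and reports completions by tag until the update function returns false.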

#include "aws/s3/private/s3_auto_ranged_get.h"
#include "aws/s3/private/s3_auto_ranged_put.h"
#include "aws/s3/private/s3_checksums.h"