Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions include/aws/s3/private/s3_checksums.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,20 @@ struct aws_input_stream *aws_checksum_stream_new(
* @param existing_stream The data to be chunkified prepended by information on the stream length followed by a final
* chunk and a trailing chunk containing a checksum of the existing stream. Destroying the
* chunk stream will destroy the existing stream.
* @param checksum_output Optional argument, if provided the buffer will be initialized to the appropriate size and
* @param checksum_buffer Optional.
* - If the checksum_buffer is NULL, the checksum will still be calculated to append as
* trailer.
* - Empty buffer, the buffer will be initialized to the appropriate size and
* filled with the checksum result when calculated. Callers responsibility to cleanup.
* - Otherwise, the buffer will be used directly.
* Caller takes the ownership of the buffer, error or not.
*/
AWS_S3_API
struct aws_input_stream *aws_chunk_stream_new(
struct aws_allocator *allocator,
struct aws_input_stream *existing_stream,
enum aws_s3_checksum_algorithm algorithm,
struct aws_byte_buf *checksum_output);
struct aws_byte_buf *checksum_buffer);

/**
* Get the size of the checksum output corresponding to the aws_s3_checksum_algorithm enum value.
Expand Down
2 changes: 2 additions & 0 deletions include/aws/s3/private/s3_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ struct aws_s3_client_vtable {
struct aws_http_stream *(*http_connection_make_request)(
struct aws_http_connection *client_connection,
const struct aws_http_make_request_options *options);

void (*after_prepare_upload_part_finish)(struct aws_s3_request *request);
};

struct aws_s3_upload_part_timeout_stats {
Expand Down
4 changes: 2 additions & 2 deletions include/aws/s3/private/s3_request_messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ struct aws_input_stream *aws_s3_message_util_assign_body(
struct aws_byte_buf *byte_buf,
struct aws_http_message *out_message,
const struct checksum_config_storage *checksum_config,
struct aws_byte_buf *out_checksum);
struct aws_byte_buf *checksum_buf);

/* Create an HTTP request for an S3 Ranged Get Object Request, using the given request as a basis */
AWS_S3_API
Expand Down Expand Up @@ -90,7 +90,7 @@ struct aws_http_message *aws_s3_upload_part_message_new(
const struct aws_string *upload_id,
bool should_compute_content_md5,
const struct checksum_config_storage *checksum_config,
struct aws_byte_buf *encoded_checksum_output);
struct aws_byte_buf *encoded_checksum);

/* Create an HTTP request for an S3 UploadPartCopy request, using the original request as a basis.
* If multipart is not needed, part number and upload_id can be 0 and NULL,
Expand Down
11 changes: 9 additions & 2 deletions source/s3_auto_ranged_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -1115,6 +1115,7 @@ static void s_s3_prepare_upload_part_on_read_done(void *user_data) {
static void s_s3_prepare_upload_part_finish(struct aws_s3_prepare_upload_part_job *part_prep, int error_code) {
struct aws_s3_request *request = part_prep->request;
struct aws_s3_meta_request *meta_request = request->meta_request;
struct aws_s3_client *client = meta_request->client;
struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl;

if (error_code != AWS_ERROR_SUCCESS) {
Expand All @@ -1141,8 +1142,10 @@ static void s_s3_prepare_upload_part_finish(struct aws_s3_prepare_upload_part_jo
aws_array_list_get_at(&auto_ranged_put->synced_data.part_list, &part, request->part_number - 1);
AWS_ASSERT(part != NULL);
checksum_buf = &part->checksum_base64;
/* Clean up the buffer in case of it's initialized before and retry happens. */
aws_byte_buf_clean_up(checksum_buf);
/* If checksum buf is not empty, it means either the part being retried or the part resumed from list parts.
* Keep reusing the old checksum in case of the request body in memory mangled */
AWS_ASSERT(
checksum_buf->len == 0 || request->num_times_prepared > 0 || auto_ranged_put->resume_token != NULL);
aws_s3_meta_request_unlock_synced_data(meta_request);
}
/* END CRITICAL SECTION */
Expand Down Expand Up @@ -1173,6 +1176,10 @@ static void s_s3_prepare_upload_part_finish(struct aws_s3_prepare_upload_part_jo
aws_future_http_message_set_result_by_move(part_prep->on_complete, &message);

on_done:
if (client->vtable->after_prepare_upload_part_finish) {
/* TEST ONLY, allow test to stub here. */
client->vtable->after_prepare_upload_part_finish(request);
}
AWS_FATAL_ASSERT(aws_future_http_message_is_done(part_prep->on_complete));
aws_future_bool_release(part_prep->asyncstep_read_part);
aws_future_http_message_release(part_prep->on_complete);
Expand Down
94 changes: 60 additions & 34 deletions source/s3_chunk_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@ typedef int(set_stream_fn)(struct aws_chunk_stream *parent_stream);
struct aws_chunk_stream {
struct aws_input_stream base;
struct aws_allocator *allocator;

/* aws_input_stream_byte_cursor provides our actual functionality */
/* Pointing to the stream we read from */
struct aws_input_stream *current_stream;

struct aws_input_stream *checksum_stream;
struct aws_byte_buf checksum_result;
struct aws_byte_buf *checksum_result_output;
/* The passed-in buffer, owned by the caller. If passed-in buffer is empty
* it will be created by the chunk stream, but still caller owns its lifetime. Error or not. */
struct aws_byte_buf *passed_in_checksum_buffer;
/* If there is no passed in checksum buffer, we still calculate the checksum. This stores the checksum. */
struct aws_byte_buf checksum_buffer;

struct aws_input_stream *chunk_body_stream;
struct aws_byte_buf pre_chunk_buffer;
struct aws_byte_buf post_chunk_buffer;
struct aws_byte_cursor checksum_header_name;
Expand Down Expand Up @@ -62,17 +65,23 @@ static int s_set_post_chunk_stream(struct aws_chunk_stream *parent_stream) {
}
struct aws_byte_cursor post_trailer_cursor = aws_byte_cursor_from_string(s_post_trailer);
struct aws_byte_cursor colon_cursor = aws_byte_cursor_from_string(s_colon);

if (parent_stream->checksum_result.len == 0) {
AWS_LOGF_ERROR(AWS_LS_S3_META_REQUEST, "Failed to extract base64 encoded checksum of stream");
return aws_raise_error(AWS_ERROR_S3_CHECKSUM_CALCULATION_FAILED);
}
struct aws_byte_cursor checksum_result_cursor = aws_byte_cursor_from_buf(&parent_stream->checksum_result);
if (parent_stream->checksum_result_output &&
aws_byte_buf_init_copy_from_cursor(
parent_stream->checksum_result_output, parent_stream->allocator, checksum_result_cursor)) {
return AWS_OP_ERR;
struct aws_byte_cursor checksum_result_cursor;
if (!parent_stream->passed_in_checksum_buffer || parent_stream->passed_in_checksum_buffer->len == 0) {
if (parent_stream->checksum_buffer.len == 0) {
AWS_LOGF_ERROR(AWS_LS_S3_META_REQUEST, "Failed to extract base64 encoded checksum of stream");
return aws_raise_error(AWS_ERROR_S3_CHECKSUM_CALCULATION_FAILED);
}
checksum_result_cursor = aws_byte_cursor_from_buf(&parent_stream->checksum_buffer);
if (parent_stream->passed_in_checksum_buffer) {
/* the passed in checksum buffer is empty, initialize it with the calculated checksum */
aws_byte_buf_init_copy(
parent_stream->passed_in_checksum_buffer, parent_stream->allocator, &parent_stream->checksum_buffer);
}
} else {
/* The passed-in checksum buffer already have the checksum. */
checksum_result_cursor = aws_byte_cursor_from_buf(parent_stream->passed_in_checksum_buffer);
}

if (aws_byte_buf_init(
&parent_stream->post_chunk_buffer,
parent_stream->allocator,
Expand All @@ -92,16 +101,15 @@ static int s_set_post_chunk_stream(struct aws_chunk_stream *parent_stream) {
parent_stream->set_current_stream_fn = s_set_null_stream;
return AWS_OP_SUCCESS;
error:
aws_byte_buf_clean_up(parent_stream->checksum_result_output);
aws_byte_buf_clean_up(&parent_stream->post_chunk_buffer);
return AWS_OP_ERR;
}

static int s_set_chunk_stream(struct aws_chunk_stream *parent_stream) {
aws_input_stream_release(parent_stream->current_stream);
parent_stream->current_stream = parent_stream->checksum_stream;
parent_stream->current_stream = parent_stream->chunk_body_stream;
aws_byte_buf_clean_up(&parent_stream->pre_chunk_buffer);
parent_stream->checksum_stream = NULL;
parent_stream->chunk_body_stream = NULL;
parent_stream->set_current_stream_fn = s_set_post_chunk_stream;
return AWS_OP_SUCCESS;
}
Expand Down Expand Up @@ -169,12 +177,12 @@ static void s_aws_input_chunk_stream_destroy(struct aws_chunk_stream *impl) {
if (impl->current_stream) {
aws_input_stream_release(impl->current_stream);
}
if (impl->checksum_stream) {
aws_input_stream_release(impl->checksum_stream);
if (impl->chunk_body_stream) {
aws_input_stream_release(impl->chunk_body_stream);
}
aws_byte_buf_clean_up(&impl->pre_chunk_buffer);
aws_byte_buf_clean_up(&impl->checksum_result);
aws_byte_buf_clean_up(&impl->post_chunk_buffer);
aws_byte_buf_clean_up(&impl->checksum_buffer);
aws_mem_release(impl->allocator, impl);
}
}
Expand All @@ -190,13 +198,16 @@ struct aws_input_stream *aws_chunk_stream_new(
struct aws_allocator *allocator,
struct aws_input_stream *existing_stream,
enum aws_s3_checksum_algorithm algorithm,
struct aws_byte_buf *checksum_output) {
struct aws_byte_buf *checksum_buffer) {
AWS_PRECONDITION(allocator);
AWS_PRECONDITION(existing_stream);

struct aws_chunk_stream *impl = aws_mem_calloc(allocator, 1, sizeof(struct aws_chunk_stream));

impl->allocator = allocator;
impl->base.vtable = &s_aws_input_chunk_stream_vtable;
impl->checksum_result_output = checksum_output;
impl->passed_in_checksum_buffer = checksum_buffer;

int64_t stream_length = 0;
int64_t final_chunk_len = 0;
if (aws_input_stream_get_length(existing_stream, &stream_length)) {
Expand Down Expand Up @@ -225,15 +236,30 @@ struct aws_input_stream *aws_chunk_stream_new(
if (aws_base64_compute_encoded_len(checksum_len, &encoded_checksum_len)) {
goto error;
}
if (aws_byte_buf_init(&impl->checksum_result, allocator, encoded_checksum_len)) {
goto error;
}

impl->checksum_stream = aws_checksum_stream_new(allocator, existing_stream, algorithm, &impl->checksum_result);
if (impl->checksum_stream == NULL) {
goto error;
if (!checksum_buffer || checksum_buffer->len == 0) {
/* Empty passed-in buffer or no passed-in, calculate the checksum during reading from the stream. */
if (aws_byte_buf_init(&impl->checksum_buffer, allocator, encoded_checksum_len)) {
goto error;
}
/* Wrap the existing stream with checksum stream to calculate the checksum when reading from it. */
impl->chunk_body_stream =
aws_checksum_stream_new(allocator, existing_stream, algorithm, &impl->checksum_buffer);
if (impl->chunk_body_stream == NULL) {
goto error;
}
} else {
if (checksum_buffer->len != encoded_checksum_len) {
AWS_LOGF_ERROR(
AWS_LS_S3_GENERAL,
"Mismatched checksum buffer and algorithm. checksum_buf->len is %zu, but encoded_checksum_len is %zu",
checksum_buffer->len,
encoded_checksum_len);
aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
goto error;
}
/* No need to calculate the checksum during read, use the existing stream directly. */
impl->chunk_body_stream = aws_input_stream_acquire(existing_stream);
}

int64_t prechunk_stream_len = 0;
int64_t colon_len = s_colon->len;
int64_t post_trailer_len = s_post_trailer->len;
Expand All @@ -248,9 +274,9 @@ struct aws_input_stream *aws_chunk_stream_new(
}
impl->set_current_stream_fn = s_set_chunk_stream;
} else {
impl->current_stream = impl->checksum_stream;
impl->current_stream = impl->chunk_body_stream;
final_chunk_len = s_empty_chunk->len;
impl->checksum_stream = NULL;
impl->chunk_body_stream = NULL;
impl->set_current_stream_fn = s_set_post_chunk_stream;
}

Expand All @@ -268,10 +294,10 @@ struct aws_input_stream *aws_chunk_stream_new(
return &impl->base;

error:
aws_input_stream_release(impl->checksum_stream);
aws_input_stream_release(impl->chunk_body_stream);
aws_input_stream_release(impl->current_stream);
aws_byte_buf_clean_up(&impl->pre_chunk_buffer);
aws_byte_buf_clean_up(&impl->checksum_result);
aws_byte_buf_clean_up(&impl->checksum_buffer);
aws_mem_release(impl->allocator, impl);
return NULL;
}
Loading