Commit 8c8d4cf

collect metrics

1 parent 3146a6e
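
This change instruments the mmap-backed parallel input stream used for multipart uploads: every read records its byte offset, size, and high-resolution start/end timestamps into a read_metrics_list owned by the meta request, and that list is flushed to a per-request CSV file under /tmp the next time an upload part is prepared for a retry.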

5 files changed: +86 additions, -3 deletions

include/aws/s3/private/s3_meta_request_impl.h
Lines changed: 8 additions & 0 deletions

@@ -124,6 +124,13 @@ struct aws_s3_meta_request_vtable {
     int (*pause)(struct aws_s3_meta_request *meta_request, struct aws_s3_meta_request_resume_token **resume_token);
 };
 
+struct s3_data_read_metrics {
+    uint64_t offset;          /* byte offset of the read within the upload body */
+    uint64_t size;            /* bytes read */
+    uint64_t start_timestamp; /* high-res clock ticks (nanoseconds) */
+    uint64_t end_timestamp;
+};
+
 /**
  * This represents one meta request, ie, one accelerated file transfer. One S3 meta request can represent multiple S3
  * requests.

@@ -139,6 +146,7 @@ struct aws_s3_meta_request {
 
     /* Initial HTTP Message that this meta request is based on. */
     struct aws_http_message *initial_request_message;
+    struct aws_array_list read_metrics_list; /* of struct s3_data_read_metrics, guarded by the synced_data lock */
 
     /* The meta request's outgoing body comes from one of these:
      * 1) request_body_async_stream: if set, then async stream 1 part at a time

include/aws/s3/private/s3_parallel_input_stream.h
Lines changed: 1 addition & 0 deletions

@@ -124,6 +124,7 @@ struct aws_parallel_input_stream *aws_parallel_input_stream_new_from_file(
 struct aws_input_stream *aws_input_stream_new_from_parallel_stream(
     struct aws_allocator *allocator,
     struct aws_parallel_input_stream *stream,
+    struct aws_s3_meta_request *meta_request,
     uint64_t offset,
     size_t request_body_size);
source/s3_auto_ranged_put.c
Lines changed: 59 additions & 2 deletions

@@ -982,7 +982,7 @@ struct aws_future_http_message *s_s3_prepare_upload_part(struct aws_s3_request *
     uint64_t offset = 0;
     size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number, &offset);
     request->request_stream = aws_input_stream_new_from_parallel_stream(
-        allocator, meta_request->request_body_parallel_stream, offset, request_body_size);
+        allocator, meta_request->request_body_parallel_stream, meta_request, offset, request_body_size);
     request->content_length = request_body_size;
     struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl;
 

@@ -1020,14 +1020,71 @@
             s_s3_prepare_upload_part_finish(part_prep, AWS_ERROR_SUCCESS);
         } else {
            printf("PARALLEL retry 8MB read\n");
+           /* Create a unique filename from the timestamp and the meta_request pointer. */
+           char filename[256];
+           struct timespec ts;
+           clock_gettime(CLOCK_REALTIME, &ts);
+           snprintf(
+               filename,
+               sizeof(filename),
+               "/tmp/s3_read_metrics_%p_%ld_%ld.csv",
+               (void *)meta_request,
+               ts.tv_sec,
+               ts.tv_nsec);
+
+           FILE *metrics_file = fopen(filename, "w");
+           /* BEGIN CRITICAL SECTION */
+           aws_s3_meta_request_lock_synced_data(meta_request);
+           size_t metric_length = aws_array_list_length(&meta_request->read_metrics_list);
+           /* Write every read metric to the file. */
+           if (metrics_file) {
+               /* Write the CSV header. */
+               fprintf(metrics_file, "index,offset,size,start_timestamp,end_timestamp,duration_ns,throughput_mbps\n");
+               /* Write all metrics. */
+               for (size_t j = 0; j < metric_length; j++) {
+                   struct s3_data_read_metrics m;
+                   aws_array_list_get_at(&meta_request->read_metrics_list, &m, j);
+
+                   uint64_t duration = m.end_timestamp - m.start_timestamp;
+                   /* Timestamps are nanoseconds; bits per microsecond is numerically Mbit/s. */
+                   double throughput_mbps =
+                       duration > 0 ? (double)(m.size * 8) / ((double)duration / 1000.0) : 0.0;
+
+                   fprintf(
+                       metrics_file,
+                       "%zu,%llu,%llu,%llu,%llu,%llu,%.2f\n",
+                       j,
+                       (unsigned long long)m.offset,
+                       (unsigned long long)m.size,
+                       (unsigned long long)m.start_timestamp,
+                       (unsigned long long)m.end_timestamp,
+                       (unsigned long long)duration,
+                       throughput_mbps);
+               }
+               /* Clear (not clean up) the list: the stream created below keeps appending to it. */
+               aws_array_list_clear(&meta_request->read_metrics_list);
+               fclose(metrics_file);
+
+               AWS_LOGF_INFO(
+                   AWS_LS_S3_META_REQUEST,
+                   "id=%p Wrote %zu read metrics to %s",
+                   (void *)meta_request,
+                   metric_length,
+                   filename);
+           } else {
+               AWS_LOGF_ERROR(
+                   AWS_LS_S3_META_REQUEST, "id=%p Failed to open metrics file %s", (void *)meta_request, filename);
+           }
+           aws_s3_meta_request_unlock_synced_data(meta_request);
+           /* END CRITICAL SECTION */
+
            /* Not the first time preparing request (e.g. retry).
             * We can skip over the async steps that read the body stream */
            /* Seek back to beginning of the stream. */
            aws_input_stream_release(request->request_stream);
            uint64_t offset = 0;
            size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number, &offset);
            request->request_stream = aws_input_stream_new_from_parallel_stream(
-               allocator, meta_request->request_body_parallel_stream, offset, request_body_size);
+               allocator, meta_request->request_body_parallel_stream, meta_request, offset, request_body_size);
            s_s3_prepare_upload_part_finish(part_prep, AWS_ERROR_SUCCESS);
        }
    } else if (request->num_times_prepared == 0) {
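
A note on units in the throughput column: aws_high_res_clock_get_ticks() reports nanoseconds, so dividing bits by (duration / 1000.0) yields bits per microsecond, which is numerically equal to megabits per second. As a quick sanity check, a minimal standalone sketch (not part of the commit; the example values are made up):

#include <stdint.h>
#include <stdio.h>

int main(void) {
    uint64_t size = 8ULL * 1024 * 1024;     /* one 8 MiB read... */
    uint64_t duration = 4ULL * 1000 * 1000; /* ...that took 4 ms, expressed in nanoseconds */

    /* Same expression as the CSV writer: bits / microseconds == Mbit/s. */
    double throughput_mbps = (double)(size * 8) / ((double)duration / 1000.0);
    printf("%.2f Mbps\n", throughput_mbps); /* prints 16777.22, i.e. roughly 16.8 Gbit/s */
    return 0;
}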

source/s3_meta_request.c
Lines changed: 1 addition & 1 deletion

@@ -319,7 +319,7 @@ int aws_s3_meta_request_init_base(
     } else if (options->send_using_async_writes == true) {
         meta_request->request_body_using_async_writes = true;
     }
-
+    aws_array_list_init_dynamic(&meta_request->read_metrics_list, allocator, 2048, sizeof(struct s3_data_read_metrics));
     meta_request->synced_data.next_streaming_part = 1;
 
     meta_request->meta_request_level_running_response_sum = NULL;
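
Two details worth noting here: the list pre-allocates 2048 entries up front (at 32 bytes per struct s3_data_read_metrics, about 64 KiB per meta request), and the commit shows no matching release on the meta request's teardown path; the list is only emptied on the retry branch in s3_auto_ranged_put.c. Assuming teardown lives next to the release of the other members initialized in aws_s3_meta_request_init_base (a placement this commit does not show), the counterpart would be:

/* Hypothetical placement: the meta request's destroy path in s3_meta_request.c. */
aws_array_list_clean_up(&meta_request->read_metrics_list);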

source/s3_parallel_input_stream.c
Lines changed: 17 additions & 0 deletions

@@ -3,6 +3,7 @@
  * SPDX-License-Identifier: Apache-2.0.
  */
 
+#include <aws/s3/private/s3_meta_request_impl.h>
 #include <aws/s3/private/s3_parallel_input_stream.h>
 
 #include <aws/common/atomics.h>

@@ -259,6 +260,10 @@ struct aws_s3_mmap_part_streaming_input_stream_impl {
 
     bool eos_loaded;
     bool eos_reached;
+
+    struct s3_data_read_metrics metrics;
+
+    struct aws_s3_meta_request *meta_request;
 };
 
 static int s_aws_s3_mmap_part_streaming_input_stream_seek(

@@ -276,6 +281,7 @@ static int s_aws_s3_mmap_part_streaming_input_stream_read(struct aws_input_strea
         AWS_CONTAINER_OF(stream, struct aws_s3_mmap_part_streaming_input_stream_impl, base);
     /* Map the content */
     size_t read_length = aws_min_size(dest->capacity - dest->len, impl->total_length - impl->total_length_read);
+    aws_high_res_clock_get_ticks(&impl->metrics.start_timestamp);
 
     if (impl->in_chunk_offset == SIZE_MAX) {
         /* The reading buf is invalid. Block until the loading buf is available. */

@@ -308,13 +314,22 @@
         impl->in_chunk_offset = 0;
     }
     read_length = aws_min_size(read_length, impl->reading_chunk_buf->len - impl->in_chunk_offset);
+    impl->metrics.offset = impl->offset + impl->total_length_read;
+    impl->metrics.size = read_length;
+
     struct aws_byte_cursor chunk_cursor = aws_byte_cursor_from_buf(impl->reading_chunk_buf);
     aws_byte_cursor_advance(&chunk_cursor, impl->in_chunk_offset);
     chunk_cursor.len = read_length;
     aws_byte_buf_append(dest, &chunk_cursor);
     impl->in_chunk_offset += read_length;
     impl->total_length_read += read_length;
+    aws_high_res_clock_get_ticks(&impl->metrics.end_timestamp);
 
+    /* BEGIN CRITICAL SECTION */
+    aws_s3_meta_request_lock_synced_data(impl->meta_request);
+    aws_array_list_push_back(&impl->meta_request->read_metrics_list, &impl->metrics);
+    aws_s3_meta_request_unlock_synced_data(impl->meta_request);
+    /* END CRITICAL SECTION */
    if (impl->in_chunk_offset == impl->reading_chunk_buf->len) {
         /* We finished reading the reading buffer, reset it. */
         aws_byte_buf_reset(impl->reading_chunk_buf, false);

@@ -391,6 +406,7 @@ void aws_streaming_input_stream_reset(struct aws_input_stream *stream) {
 struct aws_input_stream *aws_input_stream_new_from_parallel_stream(
     struct aws_allocator *allocator,
     struct aws_parallel_input_stream *stream,
+    struct aws_s3_meta_request *meta_request,
     uint64_t offset,
     size_t request_body_size) {

@@ -412,6 +428,7 @@
     aws_byte_buf_init(&impl->chunk_buf_2, allocator, impl->chunk_load_size);
     impl->loading_chunk_buf = &impl->chunk_buf_1;
     impl->reading_chunk_buf = &impl->chunk_buf_2;
+    impl->meta_request = meta_request;
 
     /* Reset the input stream to start */
     aws_streaming_input_stream_reset(&impl->base);