Skip to content

Commit e99aa26

Browse files
committed
track use of correlation tokens from a RR operation submission through decref/destruction
1 parent d1ac9ac commit e99aa26

4 files changed

Lines changed: 47 additions & 23 deletions

File tree

include/aws/mqtt/mqtt.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ enum aws_mqtt_error {
9393
AWS_ERROR_MQTT_REQUEST_RESPONSE_MODELED_SERVICE_ERROR,
9494
AWS_ERROR_MQTT_REQUEST_RESPONSE_PAYLOAD_PARSE_ERROR,
9595
AWS_ERROR_MQTT_REQUEST_RESPONSE_INVALID_RESPONSE_PATH,
96-
AWS_ERROR_MQTT_REQUEST_RESPONSE_DUPLICATE_CORRELATION_TOKEN,
9796

9897
AWS_ERROR_END_MQTT_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_MQTT_PACKAGE_ID),
9998
};

source/mqtt.c

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,9 +269,6 @@ bool aws_mqtt_is_valid_topic_filter(const struct aws_byte_cursor *topic_filter)
269269
AWS_DEFINE_ERROR_INFO_MQTT(
270270
AWS_ERROR_MQTT_REQUEST_RESPONSE_INVALID_RESPONSE_PATH,
271271
"Request-response operation failed due to arrival on an unknown response path"),
272-
AWS_DEFINE_ERROR_INFO_MQTT(
273-
AWS_ERROR_MQTT_REQUEST_RESPONSE_DUPLICATE_CORRELATION_TOKEN,
274-
"Request-response operation failed due to a duplicate correlation token."),
275272
};
276273
/* clang-format on */
277274
#undef AWS_DEFINE_ERROR_INFO_MQTT

source/request-response/request_response_client.c

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,9 @@ struct aws_mqtt_request_response_client {
273273
* Map from cursor (correlation token) -> request operation
274274
*/
275275
struct aws_hash_table operations_by_correlation_tokens;
276+
277+
/* Track correlation tokens currently in use by operations in queued or active states */
278+
struct aws_hash_table correlation_tokens_in_use;
276279
};
277280

278281
struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire_internal(
@@ -318,6 +321,7 @@ static void s_mqtt_request_response_client_final_destroy(struct aws_mqtt_request
318321

319322
aws_mqtt_request_response_client_subscriptions_clean_up(&client->subscriptions);
320323
aws_hash_table_clean_up(&client->operations_by_correlation_tokens);
324+
aws_hash_table_clean_up(&client->correlation_tokens_in_use);
321325

322326
aws_event_loop_group_release_from_event_loop(client->loop);
323327
aws_mem_release(client->allocator, client);
@@ -1102,6 +1106,15 @@ static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_clie
11021106
NULL,
11031107
NULL);
11041108

1109+
aws_hash_table_init(
1110+
&rr_client->correlation_tokens_in_use,
1111+
allocator,
1112+
MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE,
1113+
aws_hash_byte_cursor_ptr,
1114+
aws_mqtt_byte_cursor_hash_equality,
1115+
NULL,
1116+
NULL);
1117+
11051118
aws_linked_list_init(&rr_client->operation_queue);
11061119

11071120
aws_task_init(
@@ -1234,25 +1247,17 @@ static int s_add_request_operation_to_response_path_table(
12341247
}
12351248

12361249
static bool s_is_correlation_token_in_use(
1237-
struct aws_mqtt_request_response_client *client,
1238-
struct aws_mqtt_rr_client_operation *operation) {
1250+
const struct aws_mqtt_request_response_client *client,
1251+
const struct aws_byte_cursor *correlation_token) {
12391252
struct aws_hash_element *elem = NULL;
1240-
aws_hash_table_find(
1241-
&client->operations_by_correlation_tokens,
1242-
&operation->storage.request_storage.options.correlation_token,
1243-
&elem);
1253+
aws_hash_table_find(&client->correlation_tokens_in_use, correlation_token, &elem);
12441254
return elem != NULL;
12451255
}
12461256

12471257
static int s_add_request_operation_to_correlation_token_table(
12481258
struct aws_mqtt_request_response_client *client,
12491259
struct aws_mqtt_rr_client_operation *operation) {
12501260

1251-
if (s_is_correlation_token_in_use(client, operation)) {
1252-
aws_raise_error(AWS_ERROR_MQTT_REQUEST_RESPONSE_DUPLICATE_CORRELATION_TOKEN);
1253-
return AWS_OP_ERR;
1254-
}
1255-
12561261
return aws_hash_table_put(
12571262
&client->operations_by_correlation_tokens,
12581263
&operation->storage.request_storage.options.correlation_token,
@@ -1632,6 +1637,17 @@ static bool s_are_request_operation_options_valid(
16321637
return false;
16331638
}
16341639

1640+
if (request_options->correlation_token.len > 0) {
1641+
// do a correlation token check
1642+
if (s_is_correlation_token_in_use(client, &request_options->correlation_token)) {
1643+
AWS_LOGF_ERROR(
1644+
AWS_LS_MQTT_REQUEST_RESPONSE,
1645+
"(%p) rr client request options - correlation token is already in use.",
1646+
(void *)client);
1647+
return false;
1648+
}
1649+
}
1650+
16351651
return true;
16361652
}
16371653

@@ -1712,6 +1728,15 @@ static void s_mqtt_rr_client_submit_operation(struct aws_task *task, void *arg,
17121728
aws_mqtt_rr_client_operation_release(operation);
17131729
}
17141730

1731+
static void s_remove_correlation_token_from_in_use(struct aws_mqtt_rr_client_operation *operation) {
1732+
if (operation == NULL || operation->type != AWS_MRROT_REQUEST) {
1733+
return;
1734+
}
1735+
struct aws_mqtt_request_response_client *client = operation->client_internal_ref;
1736+
aws_hash_table_remove(
1737+
&client->correlation_tokens_in_use, &operation->storage.request_storage.options.correlation_token, NULL, NULL);
1738+
}
1739+
17151740
static void s_aws_mqtt_streaming_operation_storage_clean_up(struct aws_mqtt_streaming_operation_storage *storage) {
17161741
aws_byte_buf_clean_up(&storage->operation_data);
17171742
}
@@ -1993,6 +2018,11 @@ int aws_mqtt_request_response_client_submit_request(
19932018
return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
19942019
}
19952020

2021+
// Store the correlation token to prevent allowing a duplicate correlation token to be used.
2022+
if (request_options->correlation_token.len > 0) {
2023+
aws_hash_table_put(&client->correlation_tokens_in_use, &request_options->correlation_token, NULL, NULL);
2024+
}
2025+
19962026
uint64_t now = 0;
19972027
if (aws_high_res_clock_get_ticks(&now)) {
19982028
return aws_raise_error(AWS_ERROR_CLOCK_FAILURE);
@@ -2144,6 +2174,7 @@ struct aws_mqtt_rr_client_operation *aws_mqtt_rr_client_operation_acquire(
21442174
struct aws_mqtt_rr_client_operation *aws_mqtt_rr_client_operation_release(
21452175
struct aws_mqtt_rr_client_operation *operation) {
21462176
if (operation != NULL) {
2177+
s_remove_correlation_token_from_in_use(operation);
21472178
aws_ref_count_release(&operation->ref_count);
21482179
}
21492180

tests/request-response/request_response_client_tests.c

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3337,20 +3337,17 @@ static int s_rrc_request_response_failure_duplicate_correlation_token_fn(struct
33373337
false));
33383338

33393339
struct aws_byte_cursor record_key_2 = aws_byte_cursor_from_c_str("testkey2");
3340-
ASSERT_SUCCESS(s_rrc_test_submit_test_request(
3340+
s_rrc_test_submit_test_request(
33413341
&fixture,
33423342
RRC_PHDT_FAILURE_DUPLICATE_CORRELATION_TOKEN,
3343-
"test",
3343+
"test2",
33443344
record_key_2,
3345-
"test/accepted",
3345+
"test2/accepted",
33463346
"token1",
33473347
NULL,
3348-
false));
3349-
3350-
s_rrc_wait_on_request_completion(&fixture, record_key_2);
3348+
false);
33513349

3352-
ASSERT_SUCCESS(s_rrc_verify_request_completion(
3353-
&fixture, record_key_2, AWS_ERROR_MQTT_REQUEST_RESPONSE_DUPLICATE_CORRELATION_TOKEN, NULL, NULL));
3350+
ASSERT_INT_EQUALS(aws_last_error(), AWS_ERROR_INVALID_ARGUMENT);
33543351

33553352
s_aws_rr_client_test_fixture_clean_up(&fixture, false);
33563353

0 commit comments

Comments
 (0)