@@ -1237,6 +1237,18 @@ static int s_add_request_operation_to_correlation_token_table(
12371237 struct aws_mqtt_request_response_client * client ,
12381238 struct aws_mqtt_rr_client_operation * operation ) {
12391239
1240+ // First inspect whether the correlation token is already in use
1241+ struct aws_hash_element * elem = NULL ;
1242+ aws_hash_table_find (
1243+ & client -> operations_by_correlation_tokens ,
1244+ & operation -> storage .request_storage .options .correlation_token ,
1245+ & elem );
1246+ if (elem != NULL ) {
1247+ // correlation token is already in use. Return appropriate error.
1248+ aws_raise_error (AWS_ERROR_MQTT_REQUEST_RESPONSE_DUPLICATE_CORRELATION_TOKEN );
1249+ return AWS_OP_ERR ;
1250+ }
1251+
12401252 return aws_hash_table_put (
12411253 & client -> operations_by_correlation_tokens ,
12421254 & operation -> storage .request_storage .options .correlation_token ,
@@ -1684,8 +1696,9 @@ static void s_mqtt_rr_client_submit_operation(struct aws_task *task, void *arg,
16841696done :
16851697
16861698 /*
1687- * We hold a second reference to the operation during submission. This ensures that even if a streaming operation
1688- * is immediately dec-refed by the creator (before submission completes), the operation will not get destroyed.
1699+ * We hold a second reference to the operation during submission. This ensures that even if a streaming
1700+ * operation is immediately dec-refed by the creator (before submission completes), the operation will not get
1701+ * destroyed.
16891702 *
16901703 * It is now safe and correct to release that reference.
16911704 *
@@ -2004,8 +2017,9 @@ int aws_mqtt_request_response_client_submit_request(
20042017 s_log_request_response_operation (operation , client );
20052018
20062019 /*
2007- * We hold a second reference to the operation during submission. This ensures that even if a streaming operation
2008- * is immediately dec-refed by the creator (before submission runs), the operation will not get destroyed.
2020+ * We hold a second reference to the operation during submission. This ensures that even if a streaming
2021+ * operation is immediately dec-refed by the creator (before submission runs), the operation will not get
2022+ * destroyed.
20092023 */
20102024 aws_mqtt_rr_client_operation_acquire (operation );
20112025
@@ -2103,8 +2117,9 @@ int aws_mqtt_rr_client_operation_activate(struct aws_mqtt_rr_client_operation *o
21032117 operation -> id );
21042118
21052119 /*
2106- * We hold a second reference to the operation during submission. This ensures that even if a streaming operation
2107- * is immediately dec-refed by the creator (before submission runs), the operation will not get destroyed.
2120+ * We hold a second reference to the operation during submission. This ensures that even if a streaming
2121+ * operation is immediately dec-refed by the creator (before submission runs), the operation will not get
2122+ * destroyed.
21082123 */
21092124 aws_mqtt_rr_client_operation_acquire (operation );
21102125
0 commit comments