Skip to content

Commit c03608c

Browse files
committed
Add read-write lock to protect MQTT connection callbacks
1 parent 7d5d6a8 commit c03608c

2 files changed

Lines changed: 66 additions & 0 deletions

File tree

include/aws/mqtt/private/client_impl.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include <aws/common/hash_table.h>
1818
#include <aws/common/mutex.h>
19+
#include <aws/common/rw_lock.h>
1920
#include <aws/common/task_scheduler.h>
2021

2122
#include <aws/io/channel.h>
@@ -28,15 +29,19 @@ struct aws_mqtt_client_connection_311_impl;
2829

2930
#define MQTT_CLIENT_CALL_CALLBACK(client_ptr, callback) \
3031
do { \
32+
aws_rw_lock_rlock(&(client_ptr)->callback_lock); \
3133
if ((client_ptr)->callback) { \
3234
(client_ptr)->callback((&client_ptr->base), (client_ptr)->callback##_ud); \
3335
} \
36+
aws_rw_lock_runlock(&(client_ptr)->callback_lock); \
3437
} while (false)
3538
#define MQTT_CLIENT_CALL_CALLBACK_ARGS(client_ptr, callback, ...) \
3639
do { \
40+
aws_rw_lock_rlock(&(client_ptr)->callback_lock); \
3741
if ((client_ptr)->callback) { \
3842
(client_ptr)->callback((&client_ptr->base), __VA_ARGS__, (client_ptr)->callback##_ud); \
3943
} \
44+
aws_rw_lock_runlock(&(client_ptr)->callback_lock); \
4045
} while (false)
4146

4247
#if ASSERT_LOCK_HELD
@@ -278,6 +283,9 @@ struct aws_mqtt_client_connection_311_impl {
278283
aws_mqtt_on_operation_statistics_fn *on_any_operation_statistics;
279284
void *on_any_operation_statistics_ud;
280285

286+
/* Read-write lock to protect callbacks from being modified during callback execution */
287+
struct aws_rw_lock callback_lock;
288+
281289
/* listener callbacks */
282290
struct aws_mqtt311_callback_set_manager callback_manager;
283291

source/client.c

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -844,6 +844,7 @@ static void s_mqtt_client_connection_destroy_final(struct aws_mqtt_client_connec
844844
aws_memory_pool_clean_up(&connection->synced_data.requests_pool);
845845

846846
aws_mutex_clean_up(&connection->synced_data.lock);
847+
aws_rw_lock_clean_up(&connection->callback_lock);
847848

848849
aws_tls_connection_options_clean_up(&connection->tls_options);
849850

@@ -1144,12 +1145,21 @@ static int s_aws_mqtt_client_connection_311_set_connection_result_handlers(
11441145
goto done;
11451146
}
11461147

1148+
if (aws_rw_lock_try_wlock(&connection->callback_lock)) {
1149+
AWS_LOGF_ERROR(
1150+
AWS_LS_MQTT_CLIENT, "id=%p: Failed to set callback handlers, callbacks are in use.", (void *)connection);
1151+
result = aws_raise_error(AWS_ERROR_INVALID_STATE);
1152+
goto done;
1153+
}
1154+
11471155
connection->on_connection_success = on_connection_success;
11481156
connection->on_connection_success_ud = on_connection_success_ud;
11491157
connection->on_connection_failure = on_connection_failure;
11501158
connection->on_connection_failure_ud = on_connection_failure_ud;
11511159
result = AWS_OP_SUCCESS;
11521160

1161+
aws_rw_lock_wunlock(&connection->callback_lock);
1162+
11531163
done:
11541164
mqtt_connection_unlock_synced_data(connection);
11551165
/* END CRITICAL SECTION */
@@ -1187,12 +1197,21 @@ static int s_aws_mqtt_client_connection_311_set_connection_interruption_handlers
11871197
goto done;
11881198
}
11891199

1200+
if (aws_rw_lock_try_wlock(&connection->callback_lock)) {
1201+
AWS_LOGF_ERROR(
1202+
AWS_LS_MQTT_CLIENT, "id=%p: Failed to set callback handlers, callbacks are in use.", (void *)connection);
1203+
result = aws_raise_error(AWS_ERROR_INVALID_STATE);
1204+
goto done;
1205+
}
1206+
11901207
connection->on_interrupted = on_interrupted;
11911208
connection->on_interrupted_ud = on_interrupted_ud;
11921209
connection->on_resumed = on_resumed;
11931210
connection->on_resumed_ud = on_resumed_ud;
11941211
result = AWS_OP_SUCCESS;
11951212

1213+
aws_rw_lock_wunlock(&connection->callback_lock);
1214+
11961215
done:
11971216
mqtt_connection_unlock_synced_data(connection);
11981217
/* END CRITICAL SECTION */
@@ -1212,6 +1231,7 @@ static int s_aws_mqtt_client_connection_311_set_connection_closed_handler(
12121231
AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Setting connection closed handler", (void *)connection);
12131232

12141233
int result = AWS_OP_ERR;
1234+
12151235
/* BEGIN CRITICAL SECTION */
12161236
mqtt_connection_lock_synced_data(connection);
12171237

@@ -1226,10 +1246,19 @@ static int s_aws_mqtt_client_connection_311_set_connection_closed_handler(
12261246
goto done;
12271247
}
12281248

1249+
if (aws_rw_lock_try_wlock(&connection->callback_lock)) {
1250+
AWS_LOGF_ERROR(
1251+
AWS_LS_MQTT_CLIENT, "id=%p: Failed to set callback handlers, callbacks are in use.", (void *)connection);
1252+
result = aws_raise_error(AWS_ERROR_INVALID_STATE);
1253+
goto done;
1254+
}
1255+
12291256
connection->on_closed = on_closed;
12301257
connection->on_closed_ud = on_closed_ud;
12311258
result = AWS_OP_SUCCESS;
12321259

1260+
aws_rw_lock_wunlock(&connection->callback_lock);
1261+
12331262
done:
12341263
/* END CRITICAL SECTION */
12351264
mqtt_connection_unlock_synced_data(connection);
@@ -1261,9 +1290,16 @@ static int s_aws_mqtt_client_connection_311_set_on_any_publish_handler(
12611290

12621291
AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Setting on_any_publish handler", (void *)connection);
12631292

1293+
if (aws_rw_lock_try_wlock(&connection->callback_lock)) {
1294+
AWS_LOGF_ERROR(
1295+
AWS_LS_MQTT_CLIENT, "id=%p: Failed to set callback handlers, callbacks are in use.", (void *)connection);
1296+
return aws_raise_error(AWS_ERROR_INVALID_STATE);
1297+
}
1298+
12641299
connection->on_any_publish = on_any_publish;
12651300
connection->on_any_publish_ud = on_any_publish_ud;
12661301

1302+
aws_rw_lock_wunlock(&connection->callback_lock);
12671303
return AWS_OP_SUCCESS;
12681304
}
12691305

@@ -1293,10 +1329,19 @@ static int s_aws_mqtt_client_connection_311_set_connection_termination_handler(
12931329
goto done;
12941330
}
12951331

1332+
if (aws_rw_lock_try_wlock(&connection->callback_lock)) {
1333+
AWS_LOGF_ERROR(
1334+
AWS_LS_MQTT_CLIENT, "id=%p: Failed to set callback handlers, callbacks are in use.", (void *)connection);
1335+
result = aws_raise_error(AWS_ERROR_INVALID_STATE);
1336+
goto done;
1337+
}
1338+
12961339
connection->on_termination = on_termination;
12971340
connection->on_termination_ud = on_termination_ud;
12981341
result = AWS_OP_SUCCESS;
12991342

1343+
aws_rw_lock_wunlock(&connection->callback_lock);
1344+
13001345
done:
13011346
mqtt_connection_unlock_synced_data(connection);
13021347
/* END CRITICAL SECTION */
@@ -3546,6 +3591,16 @@ struct aws_mqtt_client_connection *aws_mqtt_client_connection_new(struct aws_mqt
35463591
goto failed_init_mutex;
35473592
}
35483593

3594+
if (aws_rw_lock_init(&connection->callback_lock)) {
3595+
AWS_LOGF_ERROR(
3596+
AWS_LS_MQTT_CLIENT,
3597+
"id=%p: Failed to initialize callback lock, error %d (%s)",
3598+
(void *)connection,
3599+
aws_last_error(),
3600+
aws_error_name(aws_last_error()));
3601+
goto failed_init_callback_lock;
3602+
}
3603+
35493604
struct aws_mqtt311_decoder_options config = {
35503605
.packet_handlers = aws_mqtt311_get_default_packet_handlers(),
35513606
.handler_user_data = connection,
@@ -3615,6 +3670,9 @@ struct aws_mqtt_client_connection *aws_mqtt_client_connection_new(struct aws_mqt
36153670
aws_mqtt_topic_tree_clean_up(&connection->thread_data.subscriptions);
36163671

36173672
failed_init_subscriptions:
3673+
aws_rw_lock_clean_up(&connection->callback_lock);
3674+
3675+
failed_init_callback_lock:
36183676
aws_mutex_clean_up(&connection->synced_data.lock);
36193677

36203678
failed_init_mutex:

0 commit comments

Comments
 (0)