Skip to content

Commit 06527f1

Browse files
committed
Merge branch 'manual-puback' into manual-puback-tests
2 parents fdc2d98 + 19648ef commit 06527f1

4 files changed

Lines changed: 58 additions & 38 deletions

File tree

include/aws/mqtt/private/v5/mqtt5_options_storage.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -281,11 +281,6 @@ AWS_MQTT_API void aws_mqtt5_packet_puback_view_log(
281281
const struct aws_mqtt5_packet_puback_view *puback_view,
282282
enum aws_log_level level);
283283

284-
AWS_MQTT_API struct aws_mqtt5_manual_puback_entry *s_aws_mqtt_manual_puback_entry_new(
285-
struct aws_allocator *allocator,
286-
uint16_t packet_id,
287-
uint64_t puback_control_id);
288-
289284
/* Subscribe */
290285

291286
AWS_MQTT_API struct aws_mqtt5_operation_subscribe *aws_mqtt5_operation_subscribe_new(

include/aws/mqtt/v5/mqtt5_client.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -300,8 +300,9 @@ typedef void(aws_mqtt5_publish_completion_fn)(
300300
* @param puback_result result of the PUBACK operation
301301
* @param complete_ctx user data passed in with the completion options
302302
*/
303-
typedef void(
304-
aws_mqtt5_manual_puback_completion_fn)(enum aws_mqtt5_manual_puback_result puback_result, void *complete_ctx);
303+
typedef void(aws_mqtt5_manual_puback_completion_fn)(
304+
enum aws_mqtt5_manual_puback_result puback_result,
305+
void *completion_user_data);
305306

306307
/**
307308
* Signature of callback to invoke on Subscribe success/failure.
@@ -773,7 +774,7 @@ AWS_MQTT_API uint64_t aws_mqtt5_client_acquire_puback(
773774
*
774775
* @param client mqtt5 client to queue a puback for
775776
* @param puback_control_id Control ID of aws_mqtt5_manual_puback_entry to send to broker/server
776-
* @return success/failure of the manual PUBACK operation.
777+
* @return success/failure of starting the manual PUBACK operation.
777778
*/
778779
AWS_MQTT_API
779780
int aws_mqtt5_client_invoke_puback(

source/v5/mqtt5_client.c

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,7 @@ static int s_manual_puback_transfer(void *context, struct aws_hash_element *elem
608608
}
609609

610610
/* This is called when the manual puback entry is removed from a hashset to properly decref on removal */
611-
static void aws_mqtt5_manual_puback_entry_decref(void *value) {
611+
static void s_aws_mqtt5_manual_puback_entry_decref(void *value) {
612612
struct aws_mqtt5_manual_puback_entry *manual_puback_entry = value;
613613
if (manual_puback_entry != NULL) {
614614
aws_ref_count_release(&manual_puback_entry->ref_count);
@@ -622,9 +622,10 @@ static void s_aws_mqtt5_reset_manual_puback_tables(
622622
struct aws_mqtt5_client_operational_state *client_operational_state) {
623623
size_t count = aws_hash_table_get_entry_count(&client_operational_state->manual_puback_control_id_table);
624624
if (count > 0) {
625-
AWS_LOGF_INFO(
625+
AWS_LOGF_DEBUG(
626626
AWS_LS_MQTT5_CLIENT,
627-
"id=%p: Clearing %zu manual PUBACK control ids.",
627+
"id=%p: Clearing %zu PUBACKs under user control. Previously controlled PUBACKs are no longer valid and "
628+
"have been cancelled.",
628629
(void *)client_operational_state->client,
629630
count);
630631
aws_hash_table_clear(&client_operational_state->manual_puback_packet_id_table);
@@ -2606,6 +2607,30 @@ int aws_mqtt5_client_invoke_puback(
26062607
return AWS_OP_SUCCESS;
26072608
}
26082609

2610+
static void s_aws_mqtt5_manual_puback_entry_destroy(void *object) {
2611+
if (object == NULL) {
2612+
return;
2613+
}
2614+
struct aws_mqtt5_manual_puback_entry *manual_puback_entry = object;
2615+
aws_mem_release(manual_puback_entry->allocator, manual_puback_entry);
2616+
}
2617+
2618+
static struct aws_mqtt5_manual_puback_entry *s_aws_mqtt_manual_puback_entry_new(
2619+
struct aws_allocator *allocator,
2620+
uint16_t packet_id,
2621+
uint64_t puback_control_id) {
2622+
2623+
struct aws_mqtt5_manual_puback_entry *manual_puback_entry =
2624+
aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt5_manual_puback_entry));
2625+
2626+
manual_puback_entry->allocator = allocator;
2627+
aws_ref_count_init(&manual_puback_entry->ref_count, manual_puback_entry, s_aws_mqtt5_manual_puback_entry_destroy);
2628+
manual_puback_entry->packet_id = packet_id;
2629+
manual_puback_entry->puback_control_id = puback_control_id;
2630+
2631+
return manual_puback_entry;
2632+
}
2633+
26092634
uint64_t aws_mqtt5_client_acquire_puback(
26102635
struct aws_mqtt5_client *client,
26112636
const struct aws_mqtt5_packet_publish_view *publish_view) {
@@ -2629,6 +2654,10 @@ uint64_t aws_mqtt5_client_acquire_puback(
26292654
/* In this case we simply provide the same control_id that was already sent before. We do not want to create a
26302655
* second control_id with the same packet_id. It is the user's responsibility to know that they have two PUBACKs
26312656
* for the same PUBLISH. */
2657+
AWS_LOGF_WARN(
2658+
AWS_LS_MQTT5_CLIENT,
2659+
"id=%p: PUBACK acquire called on a PUBLISH that is already under user control.",
2660+
(void *)client);
26322661
struct aws_mqtt5_manual_puback_entry *entry = elem->value;
26332662
return entry->puback_control_id;
26342663
}
@@ -2674,6 +2703,25 @@ uint64_t aws_mqtt5_client_acquire_puback(
26742703

26752704
/* Increment next_mqtt5_puback_control_id for next use */
26762705
client->operational_state.next_mqtt5_puback_control_id = current_control_packet_id + 1;
2706+
2707+
size_t in_flight_unacked_publishes =
2708+
aws_hash_table_get_entry_count(&client->operational_state.manual_puback_control_id_table);
2709+
if (in_flight_unacked_publishes >= 100) {
2710+
AWS_LOGF_WARN(
2711+
AWS_LS_MQTT5_CLIENT,
2712+
"id=%p: 100 or more PUBACKs under user control: %zu. AWS IoT Core limits of 100 in-flight has been met or "
2713+
"exceeded.",
2714+
(void *)client,
2715+
in_flight_unacked_publishes);
2716+
} else {
2717+
AWS_LOGF_DEBUG(
2718+
AWS_LS_MQTT5_CLIENT,
2719+
"id=%p: Manual PUBACK control taken for a PUBLISH packet. Current in-flight PUBACKs under user control: "
2720+
"%zu",
2721+
(void *)client,
2722+
in_flight_unacked_publishes);
2723+
}
2724+
26772725
return manual_puback->puback_control_id;
26782726

26792727
cleanup:
@@ -2781,7 +2829,7 @@ int aws_mqtt5_client_operational_state_init(
27812829
aws_mqtt_hash_uint16_t,
27822830
aws_mqtt_compare_uint16_t_eq,
27832831
NULL,
2784-
aws_mqtt5_manual_puback_entry_decref)) {
2832+
s_aws_mqtt5_manual_puback_entry_decref)) {
27852833
return AWS_OP_ERR;
27862834
}
27872835

@@ -2792,7 +2840,7 @@ int aws_mqtt5_client_operational_state_init(
27922840
aws_mqtt_hash_uint64_t,
27932841
aws_mqtt_compare_uint64_t_eq,
27942842
NULL,
2795-
aws_mqtt5_manual_puback_entry_decref)) {
2843+
s_aws_mqtt5_manual_puback_entry_decref)) {
27962844
return AWS_OP_ERR;
27972845
}
27982846

@@ -2803,7 +2851,7 @@ int aws_mqtt5_client_operational_state_init(
28032851
aws_mqtt_hash_uint64_t,
28042852
aws_mqtt_compare_uint64_t_eq,
28052853
NULL,
2806-
aws_mqtt5_manual_puback_entry_decref)) {
2854+
s_aws_mqtt5_manual_puback_entry_decref)) {
28072855
return AWS_OP_ERR;
28082856
}
28092857

source/v5/mqtt5_options_storage.c

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2370,30 +2370,6 @@ struct aws_mqtt5_operation_puback *aws_mqtt5_operation_puback_new(
23702370
return NULL;
23712371
}
23722372

2373-
static void s_aws_mqtt5_manual_puback_entry_destroy(void *object) {
2374-
if (object == NULL) {
2375-
return;
2376-
}
2377-
struct aws_mqtt5_manual_puback_entry *manual_puback_entry = object;
2378-
aws_mem_release(manual_puback_entry->allocator, manual_puback_entry);
2379-
}
2380-
2381-
struct aws_mqtt5_manual_puback_entry *s_aws_mqtt_manual_puback_entry_new(
2382-
struct aws_allocator *allocator,
2383-
uint16_t packet_id,
2384-
uint64_t puback_control_id) {
2385-
2386-
struct aws_mqtt5_manual_puback_entry *manual_puback_entry =
2387-
aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt5_manual_puback_entry));
2388-
2389-
manual_puback_entry->allocator = allocator;
2390-
aws_ref_count_init(&manual_puback_entry->ref_count, manual_puback_entry, s_aws_mqtt5_manual_puback_entry_destroy);
2391-
manual_puback_entry->packet_id = packet_id;
2392-
manual_puback_entry->puback_control_id = puback_control_id;
2393-
2394-
return manual_puback_entry;
2395-
}
2396-
23972373
/*********************************************************************************************************************
23982374
* Unsubscribe
23992375
********************************************************************************************************************/

0 commit comments

Comments
 (0)