Skip to content

Commit ecb8906

Browse files
committed
Revert "Merge branch 'main' of github.com:awslabs/aws-c-mqtt into get_eventloop_group"
This reverts commit e28e58b, reversing changes made to 9fc2f57.
1 parent 0aa224b commit ecb8906

6 files changed

Lines changed: 36 additions & 30 deletions

File tree

include/aws/mqtt/private/request-response/protocol_adapter.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ struct aws_allocator;
1515
struct aws_event_loop;
1616
struct aws_mqtt_client_connection;
1717
struct aws_mqtt5_client;
18-
struct aws_mqtt_request_response_publish_event;
18+
struct aws_mqtt_rr_incoming_publish_event;
1919

2020
/*
2121
* The request-response protocol adapter is a translation layer that sits between the request-response native client
@@ -99,7 +99,7 @@ typedef void(aws_protocol_adapter_subscription_event_fn)(
9999
void *user_data);
100100

101101
typedef void(aws_protocol_adapter_incoming_publish_fn)(
102-
const struct aws_mqtt_request_response_publish_event *publish,
102+
const struct aws_mqtt_rr_incoming_publish_event *publish,
103103
void *user_data);
104104

105105
typedef void(aws_protocol_adapter_terminate_callback_fn)(void *user_data);

include/aws/mqtt/request-response/request_response_client.h

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,22 +31,6 @@ struct aws_mqtt_request_operation_response_path {
3131
struct aws_byte_cursor correlation_token_json_path;
3232
};
3333

34-
/*
35-
* An event emitted by a streaming operation's subscription.
36-
*/
37-
struct aws_mqtt_request_response_publish_event {
38-
struct aws_byte_cursor payload;
39-
struct aws_byte_cursor topic;
40-
/* Below are MQTT optional fields. For MQTT3, they will always be empty, as MQTT3 does not support them. For MQTT5,
41-
* they will be set if they are present in a packet. */
42-
const struct aws_byte_cursor *content_type;
43-
size_t user_property_count;
44-
const struct aws_mqtt5_user_property *user_properties;
45-
/* Even though this field is supposed to be used by MQTT broker to determine if a message-to-be-sent is expired,
46-
* certain services use this field to specify client-side timeouts. */
47-
const uint32_t *message_expiry_interval_seconds;
48-
};
49-
5034
/*
5135
* Callback signature for request-response completion.
5236
*

source/request-response/protocol_adapter.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ static void s_protocol_adapter_mqtt311_listener_publish_received(
347347

348348
struct aws_mqtt_protocol_adapter_311_impl *adapter = userdata;
349349

350-
struct aws_mqtt_request_response_publish_event publish_event = {
350+
struct aws_mqtt_rr_incoming_publish_event publish_event = {
351351
.topic = *topic,
352352
.payload = *payload,
353353
};
@@ -803,7 +803,7 @@ static bool s_protocol_adapter_mqtt5_listener_publish_received(
803803
void *user_data) {
804804
struct aws_mqtt_protocol_adapter_5_impl *adapter = user_data;
805805

806-
struct aws_mqtt_request_response_publish_event publish_event = {
806+
struct aws_mqtt_rr_incoming_publish_event publish_event = {
807807
.topic = publish->topic,
808808
.payload = publish->payload,
809809
.content_type = publish->content_type,

source/request-response/request_response_client.c

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -873,9 +873,22 @@ static void s_aws_rr_client_protocol_adapter_subscription_event_callback(
873873
}
874874

875875
static void s_apply_publish_to_streaming_operation_list(
876-
struct aws_rr_operation_list_topic_filter_entry *entry,
877-
const struct aws_mqtt_request_response_publish_event *publish_event) {
878-
AWS_FATAL_ASSERT(entry != NULL);
876+
const struct aws_linked_list *operations,
877+
const struct aws_byte_cursor *topic_filter,
878+
const struct aws_mqtt_rr_incoming_publish_event *publish_event,
879+
void *user_data) {
880+
881+
AWS_FATAL_ASSERT(operations != NULL);
882+
883+
struct aws_mqtt_request_response_client *rr_client = user_data;
884+
885+
AWS_LOGF_DEBUG(
886+
AWS_LS_MQTT_REQUEST_RESPONSE,
887+
"id=%p: request-response client incoming publish on topic '" PRInSTR
888+
"' matches streaming subscription on topic filter '" PRInSTR "'",
889+
(void *)rr_client,
890+
AWS_BYTE_CURSOR_PRI(publish_event->topic),
891+
AWS_BYTE_CURSOR_PRI(*topic_filter));
879892

880893
struct aws_linked_list_node *node = aws_linked_list_begin(&entry->operations);
881894
while (node != aws_linked_list_end(&entry->operations)) {
@@ -897,8 +910,8 @@ static void s_apply_publish_to_streaming_operation_list(
897910
continue;
898911
}
899912

900-
void *user_data = operation->storage.streaming_storage.options.user_data;
901-
(*incoming_publish_callback)(publish_event, user_data);
913+
void *operation_user_data = operation->storage.streaming_storage.options.user_data;
914+
(*incoming_publish_callback)(publish_event, operation_user_data);
902915

903916
AWS_LOGF_DEBUG(
904917
AWS_LS_MQTT_REQUEST_RESPONSE,
@@ -913,7 +926,7 @@ static void s_apply_publish_to_streaming_operation_list(
913926
static void s_complete_operation_with_correlation_token(
914927
struct aws_mqtt_request_response_client *rr_client,
915928
struct aws_byte_cursor correlation_token,
916-
const struct aws_mqtt_request_response_publish_event *publish_event) {
929+
const struct aws_mqtt_rr_incoming_publish_event *publish_event) {
917930
struct aws_hash_element *hash_element = NULL;
918931

919932
if (aws_hash_table_find(&rr_client->operations_by_correlation_tokens, &correlation_token, &hash_element)) {
@@ -965,7 +978,16 @@ static void s_complete_operation_with_correlation_token(
965978
static void s_apply_publish_to_response_path_entry(
966979
struct aws_mqtt_request_response_client *rr_client,
967980
struct aws_rr_response_path_entry *entry,
968-
const struct aws_mqtt_request_response_publish_event *publish_event) {
981+
const struct aws_mqtt_rr_incoming_publish_event *publish_event,
982+
void *user_data) {
983+
984+
struct aws_mqtt_request_response_client *rr_client = user_data;
985+
986+
AWS_LOGF_DEBUG(
987+
AWS_LS_MQTT_REQUEST_RESPONSE,
988+
"id=%p: request-response client incoming publish on topic '" PRInSTR "' matches response path",
989+
(void *)rr_client,
990+
AWS_BYTE_CURSOR_PRI(publish_event->topic));
969991

970992
struct aws_json_value *json_payload = NULL;
971993

@@ -1044,7 +1066,7 @@ static void s_apply_publish_to_response_path_entry(
10441066
}
10451067

10461068
static void s_aws_rr_client_protocol_adapter_incoming_publish_callback(
1047-
const struct aws_mqtt_request_response_publish_event *publish_event,
1069+
const struct aws_mqtt_rr_incoming_publish_event *publish_event,
10481070
void *user_data) {
10491071

10501072
struct aws_mqtt_request_response_client *rr_client = user_data;

tests/request-response/protocol_adapter_tests.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ static void s_rr_mqtt_protocol_adapter_test_on_subscription_event(
111111
}
112112

113113
static void s_rr_mqtt_protocol_adapter_test_on_incoming_publish(
114-
const struct aws_mqtt_request_response_publish_event *publish,
114+
const struct aws_mqtt_rr_incoming_publish_event *publish,
115115
void *user_data) {
116116
struct aws_request_response_protocol_adapter_test_fixture *fixture = user_data;
117117

tests/request-response/request_response_client_tests.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ static void s_rrc_fixture_streaming_operation_subscription_status_callback(
310310
}
311311

312312
static void s_rrc_fixture_streaming_operation_incoming_publish_callback(
313-
const struct aws_mqtt_request_response_publish_event *publish_event,
313+
const struct aws_mqtt_rr_incoming_publish_event *publish_event,
314314
void *user_data) {
315315
struct aws_rr_client_fixture_streaming_record *record = user_data;
316316
struct aws_rr_client_test_fixture *fixture = record->fixture;

0 commit comments

Comments
 (0)