Skip to content

Commit 8b27a52

Browse files
committed
Add helper aux functions
1 parent 628701c commit 8b27a52

1 file changed

Lines changed: 46 additions & 37 deletions

File tree

source/request-response/request_response_subscription_set.c

Lines changed: 46 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -135,15 +135,17 @@ static struct aws_rr_operation_list_topic_filter_entry *s_aws_rr_operation_list_
135135
return entry;
136136
}
137137

138+
static bool s_is_topic_filter_with_wildcard(const struct aws_byte_cursor *topic_filter) {
139+
bool res = (memchr(topic_filter->ptr, '+', topic_filter->len) || memchr(topic_filter->ptr, '#', topic_filter->len));
140+
return res;
141+
}
142+
138143
struct aws_rr_operation_list_topic_filter_entry *aws_mqtt_request_response_client_subscriptions_add_stream_subscription(
139144
struct aws_request_response_subscriptions *subscriptions,
140145
const struct aws_byte_cursor *topic_filter) {
141146
AWS_FATAL_ASSERT(subscriptions);
142147

143-
bool is_topic_with_wildcard =
144-
(memchr(topic_filter->ptr, '+', topic_filter->len) || memchr(topic_filter->ptr, '#', topic_filter->len));
145-
146-
struct aws_hash_table *subscription_lists = is_topic_with_wildcard
148+
struct aws_hash_table *subscription_lists = s_is_topic_filter_with_wildcard(topic_filter)
147149
? &subscriptions->streaming_operation_wildcards_subscription_lists
148150
: &subscriptions->streaming_operation_subscription_lists;
149151

@@ -228,6 +230,43 @@ static void s_match_stream_subscriptions(
228230
}
229231
}
230232

233+
static bool s_is_topic_matched_to_subscription(
234+
struct aws_byte_cursor topic,
235+
struct aws_byte_cursor topic_filter_cursor) {
236+
bool is_matched = true;
237+
bool multi_level_wildcard = false;
238+
239+
struct aws_byte_cursor subscription_topic_filter_segment;
240+
AWS_ZERO_STRUCT(subscription_topic_filter_segment);
241+
242+
struct aws_byte_cursor topic_segment;
243+
AWS_ZERO_STRUCT(topic_segment);
244+
245+
while (aws_byte_cursor_next_split(&topic_filter_cursor, '/', &subscription_topic_filter_segment)) {
246+
if (!aws_byte_cursor_next_split(&topic, '/', &topic_segment)) {
247+
is_matched = false;
248+
break;
249+
}
250+
251+
if (aws_byte_cursor_eq_c_str(&subscription_topic_filter_segment, "#")) {
252+
multi_level_wildcard = true;
253+
is_matched = true;
254+
break;
255+
}
256+
257+
if (!aws_byte_cursor_eq_c_str(&subscription_topic_filter_segment, "+") &&
258+
!aws_byte_cursor_eq(&topic_segment, &subscription_topic_filter_segment)) {
259+
is_matched = false;
260+
break;
261+
}
262+
}
263+
if (!multi_level_wildcard && aws_byte_cursor_next_split(&topic, '/', &topic_segment)) {
264+
is_matched = false;
265+
}
266+
267+
return is_matched;
268+
}
269+
231270
static void s_match_wildcard_stream_subscriptions(
232271
const struct aws_hash_table *subscriptions,
233272
const struct aws_mqtt_request_response_publish_event *publish_event,
@@ -242,40 +281,10 @@ static void s_match_wildcard_stream_subscriptions(
242281
for (struct aws_hash_iter iter = aws_hash_iter_begin(subscriptions); !aws_hash_iter_done(&iter);
243282
aws_hash_iter_next(&iter)) {
244283
struct aws_rr_operation_list_topic_filter_entry *entry = iter.element.value;
284+
struct aws_byte_cursor topic_filter_cursor = entry->topic_filter_cursor;
245285

246-
struct aws_byte_cursor subscription_topic_filter_segment;
247-
AWS_ZERO_STRUCT(subscription_topic_filter_segment);
248-
249-
struct aws_byte_cursor topic_segment;
250-
AWS_ZERO_STRUCT(topic_segment);
251-
252-
bool match = true;
253-
bool multi_level_wildcard = false;
254-
255-
while (aws_byte_cursor_next_split(&entry->topic_filter_cursor, '/', &subscription_topic_filter_segment)) {
256-
if (!aws_byte_cursor_next_split(&publish_event->topic, '/', &topic_segment)) {
257-
match = false;
258-
break;
259-
}
260-
261-
if (aws_byte_cursor_eq_c_str(&subscription_topic_filter_segment, "#")) {
262-
multi_level_wildcard = true;
263-
match = true;
264-
break;
265-
}
266-
267-
if (!aws_byte_cursor_eq_c_str(&subscription_topic_filter_segment, "+") &&
268-
!aws_byte_cursor_eq(&topic_segment, &subscription_topic_filter_segment)) {
269-
match = false;
270-
break;
271-
}
272-
}
273-
274-
if (!multi_level_wildcard && aws_byte_cursor_next_split(&publish_event->topic, '/', &topic_segment)) {
275-
match = false;
276-
}
277-
278-
if (match) {
286+
bool is_matched = s_is_topic_matched_to_subscription(publish_event->topic, topic_filter_cursor);
287+
if (is_matched) {
279288
on_stream_operation_subscription_match(
280289
&entry->operations, &entry->topic_filter_cursor, publish_event, user_data);
281290
}

0 commit comments

Comments
 (0)