Skip to content

Commit 905fddb

Browse files
authored
Fix remote subscribe (#8)
* Fix UUID and message priority (RPC must at least use CS4) * Changed other rpc calls also to CS4 and a correct UUID * Return connection state if a subscriber is already there and handle same client connecting twice * Fix new remote subscriptions always returning SUBSCRIBE_PENDING, regardless of actual state
1 parent 886846a commit 905fddb

File tree

4 files changed

+27
-58
lines changed

4 files changed

+27
-58
lines changed

tools/fmt_clippy_doc.sh

100644100755
File mode changed.

up-subscription/src/subscription_manager.rs

+15-34
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use up_rust::{
2727
RESOURCE_ID_SUBSCRIBE, RESOURCE_ID_UNSUBSCRIBE, USUBSCRIPTION_TYPE_ID,
2828
USUBSCRIPTION_VERSION_MAJOR,
2929
},
30-
UCode, UPriority, UStatus, UUri, UUID,
30+
UCode, UPriority, UStatus, UUri,
3131
};
3232

3333
use crate::{helpers, usubscription::UP_REMOTE_TTL};
@@ -148,22 +148,21 @@ pub(crate) async fn handle_message(
148148
respond_to,
149149
} => {
150150
// Add new subscriber to topic subscription tracker (create new entries as necessary)
151-
topic_subscribers
151+
let is_new = topic_subscribers
152152
.entry(topic.clone())
153153
.or_default()
154154
.insert(subscriber);
155155

156-
// This really should unwrap() ok, as we just inserted an entry above
157-
let subscribers_count =
158-
topic_subscribers.get(&topic).map(|e| e.len()).unwrap_or(0);
159-
160156
let mut state = TopicState::SUBSCRIBED; // everything in topic_subscribers is considered SUBSCRIBED by default
161157

162158
if topic.is_remote_authority(&own_uri) {
163-
state = TopicState::SUBSCRIBE_PENDING; // for remote_topics, we explicitly track state due to the _PENDING scenarios
164-
remote_topics.entry(topic.clone()).or_insert(state);
159+
// for remote_topics, we explicitly track state due to the _PENDING scenarios
160+
state = *remote_topics
161+
.get(&topic)
162+
.unwrap_or(&TopicState::SUBSCRIBE_PENDING);
165163

166-
if subscribers_count == 1 {
164+
remote_topics.entry(topic.clone()).or_insert(state);
165+
if is_new {
167166
// this is the first subscriber to this (remote) topic, so perform remote subscription
168167
let own_uri_clone = own_uri.clone();
169168
let up_client_clone = up_client.clone();
@@ -454,12 +453,7 @@ async fn remote_subscribe(
454453
let subscription_response: SubscriptionResponse = up_client
455454
.invoke_proto_method(
456455
make_remote_subscribe_uuri(&subscription_request.topic),
457-
CallOptions::for_rpc_request(
458-
UP_REMOTE_TTL,
459-
Some(UUID::new()),
460-
None,
461-
Some(UPriority::UPRIORITY_CS2),
462-
),
456+
CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4)),
463457
subscription_request,
464458
)
465459
.await
@@ -507,12 +501,7 @@ async fn remote_unsubscribe(
507501
let unsubscribe_response: UStatus = up_client
508502
.invoke_proto_method(
509503
make_remote_unsubscribe_uuri(&unsubscribe_request.topic),
510-
CallOptions::for_rpc_request(
511-
UP_REMOTE_TTL,
512-
Some(UUID::new()),
513-
None,
514-
Some(UPriority::UPRIORITY_CS2),
515-
),
504+
CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4)),
516505
unsubscribe_request,
517506
)
518507
.await
@@ -612,13 +601,8 @@ mod tests {
612601
let expected_topic = test_lib::helpers::remote_topic1_uri();
613602
let expected_method = make_remote_subscribe_uuri(&expected_topic);
614603
let expected_subscriber = test_lib::helpers::local_usubscription_service_uri();
615-
616-
let expected_options = CallOptions::for_rpc_request(
617-
UP_REMOTE_TTL,
618-
Some(UUID::new()),
619-
None,
620-
Some(UPriority::UPRIORITY_CS2),
621-
);
604+
let expected_options =
605+
CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4));
622606
let expected_request = SubscriptionRequest {
623607
topic: Some(expected_topic.clone()).into(),
624608
subscriber: Some(SubscriberInfo {
@@ -675,12 +659,8 @@ mod tests {
675659
let expected_method = make_remote_unsubscribe_uuri(&expected_topic);
676660
let expected_subscriber = test_lib::helpers::local_usubscription_service_uri();
677661

678-
let expected_options = CallOptions::for_rpc_request(
679-
UP_REMOTE_TTL,
680-
Some(UUID::new()),
681-
None,
682-
Some(UPriority::UPRIORITY_CS2),
683-
);
662+
let expected_options =
663+
CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4));
684664
let expected_request = UnsubscribeRequest {
685665
topic: Some(expected_topic.clone()).into(),
686666
subscriber: Some(SubscriberInfo {
@@ -732,6 +712,7 @@ mod tests {
732712
resource_id: RESOURCE_ID_SUBSCRIBE as u32,
733713
..Default::default()
734714
};
715+
735716
let remote_method = make_remote_subscribe_uuri(&test_lib::helpers::remote_topic1_uri());
736717

737718
assert_eq!(expected_uri, remote_method);

up-subscription/src/tests/subscription_manager_tests.rs

+7-19
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ mod tests {
2727
};
2828
use up_rust::{
2929
communication::{CallOptions, UPayload},
30-
UCode, UPriority, UStatus, UUri, UUID,
30+
UCode, UPriority, UStatus, UUri,
3131
};
3232

3333
use crate::configuration::DEFAULT_COMMAND_BUFFER_SIZE;
@@ -281,12 +281,8 @@ mod tests {
281281
.into(),
282282
..Default::default()
283283
};
284-
let remote_call_options = CallOptions::for_rpc_request(
285-
UP_REMOTE_TTL,
286-
Some(UUID::new()),
287-
None,
288-
Some(UPriority::UPRIORITY_CS2),
289-
);
284+
let remote_call_options =
285+
CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4));
290286
let command_sender =
291287
CommandSender::new_with_client_options::<SubscriptionRequest, SubscriptionResponse>(
292288
remote_method,
@@ -349,12 +345,8 @@ mod tests {
349345
.into(),
350346
..Default::default()
351347
};
352-
let remote_call_options = CallOptions::for_rpc_request(
353-
UP_REMOTE_TTL,
354-
Some(UUID::new()),
355-
None,
356-
Some(UPriority::UPRIORITY_CS2),
357-
);
348+
let remote_call_options =
349+
CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4));
358350
let command_sender =
359351
CommandSender::new_with_client_options::<SubscriptionRequest, SubscriptionResponse>(
360352
remote_method,
@@ -509,12 +501,8 @@ mod tests {
509501
code: UCode::OK.into(),
510502
..Default::default()
511503
};
512-
let remote_call_options = CallOptions::for_rpc_request(
513-
UP_REMOTE_TTL,
514-
Some(UUID::new()),
515-
None,
516-
Some(UPriority::UPRIORITY_CS2),
517-
);
504+
let remote_call_options =
505+
CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4));
518506
let command_sender = CommandSender::new_with_client_options::<UnsubscribeRequest, UStatus>(
519507
remote_method,
520508
remote_call_options,

up-subscription/src/tests/usubscription_tests.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ mod tests {
2525
UnsubscribeRequest, UnsubscribeResponse, RESOURCE_ID_SUBSCRIBE,
2626
RESOURCE_ID_UNSUBSCRIBE, USUBSCRIPTION_TYPE_ID, USUBSCRIPTION_VERSION_MAJOR,
2727
},
28-
UCode, UPriority, UUri, UUID,
28+
UCode, UPriority, UUri,
2929
};
3030

3131
use crate::{
@@ -79,9 +79,9 @@ mod tests {
7979
};
8080
let expected_call_options = CallOptions::for_rpc_request(
8181
crate::UP_REMOTE_TTL,
82-
Some(UUID::new()),
8382
None,
84-
Some(UPriority::UPRIORITY_CS2),
83+
None,
84+
Some(UPriority::UPRIORITY_CS4),
8585
);
8686
let remote_subscription_request = SubscriptionRequest {
8787
topic: Some(topic.clone()).into(),
@@ -199,9 +199,9 @@ mod tests {
199199
};
200200
let expected_call_options = CallOptions::for_rpc_request(
201201
crate::UP_REMOTE_TTL,
202-
Some(UUID::new()),
203202
None,
204-
Some(UPriority::UPRIORITY_CS2),
203+
None,
204+
Some(UPriority::UPRIORITY_CS4),
205205
);
206206
let remote_unsubscribe_request = UnsubscribeRequest {
207207
topic: Some(topic.clone()).into(),

0 commit comments

Comments
 (0)