Skip to content

Fix paths that check.yaml watches #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ on:
- main
pull_request:
paths:
- "src/**"
- "up-subscription/**"
- "up-subscription-cli/**"
- "Cargo.*"
- "deny.toml"
workflow_call:
Expand Down
Empty file modified tools/fmt_clippy_doc.sh
100644 → 100755
Empty file.
49 changes: 15 additions & 34 deletions up-subscription/src/subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use up_rust::{
RESOURCE_ID_SUBSCRIBE, RESOURCE_ID_UNSUBSCRIBE, USUBSCRIPTION_TYPE_ID,
USUBSCRIPTION_VERSION_MAJOR,
},
UCode, UPriority, UStatus, UUri, UUID,
UCode, UPriority, UStatus, UUri,
};

use crate::{helpers, usubscription::UP_REMOTE_TTL};
Expand Down Expand Up @@ -148,22 +148,21 @@ pub(crate) async fn handle_message(
respond_to,
} => {
// Add new subscriber to topic subscription tracker (create new entries as necessary)
topic_subscribers
let is_new = topic_subscribers
.entry(topic.clone())
.or_default()
.insert(subscriber);

// This really should unwrap() ok, as we just inserted an entry above
let subscribers_count =
topic_subscribers.get(&topic).map(|e| e.len()).unwrap_or(0);

let mut state = TopicState::SUBSCRIBED; // everything in topic_subscribers is considered SUBSCRIBED by default

if topic.is_remote_authority(&own_uri) {
state = TopicState::SUBSCRIBE_PENDING; // for remote_topics, we explicitly track state due to the _PENDING scenarios
remote_topics.entry(topic.clone()).or_insert(state);
// for remote_topics, we explicitly track state due to the _PENDING scenarios
state = *remote_topics
.get(&topic)
.unwrap_or(&TopicState::SUBSCRIBE_PENDING);

if subscribers_count == 1 {
remote_topics.entry(topic.clone()).or_insert(state);
if is_new {
// this is the first subscriber to this (remote) topic, so perform remote subscription
let own_uri_clone = own_uri.clone();
let up_client_clone = up_client.clone();
Expand Down Expand Up @@ -454,12 +453,7 @@ async fn remote_subscribe(
let subscription_response: SubscriptionResponse = up_client
.invoke_proto_method(
make_remote_subscribe_uuri(&subscription_request.topic),
CallOptions::for_rpc_request(
UP_REMOTE_TTL,
Some(UUID::new()),
None,
Some(UPriority::UPRIORITY_CS2),
),
CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4)),
subscription_request,
)
.await
Expand Down Expand Up @@ -507,12 +501,7 @@ async fn remote_unsubscribe(
let unsubscribe_response: UStatus = up_client
.invoke_proto_method(
make_remote_unsubscribe_uuri(&unsubscribe_request.topic),
CallOptions::for_rpc_request(
UP_REMOTE_TTL,
Some(UUID::new()),
None,
Some(UPriority::UPRIORITY_CS2),
),
CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4)),
unsubscribe_request,
)
.await
Expand Down Expand Up @@ -612,13 +601,8 @@ mod tests {
let expected_topic = test_lib::helpers::remote_topic1_uri();
let expected_method = make_remote_subscribe_uuri(&expected_topic);
let expected_subscriber = test_lib::helpers::local_usubscription_service_uri();

let expected_options = CallOptions::for_rpc_request(
UP_REMOTE_TTL,
Some(UUID::new()),
None,
Some(UPriority::UPRIORITY_CS2),
);
let expected_options =
CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4));
let expected_request = SubscriptionRequest {
topic: Some(expected_topic.clone()).into(),
subscriber: Some(SubscriberInfo {
Expand Down Expand Up @@ -675,12 +659,8 @@ mod tests {
let expected_method = make_remote_unsubscribe_uuri(&expected_topic);
let expected_subscriber = test_lib::helpers::local_usubscription_service_uri();

let expected_options = CallOptions::for_rpc_request(
UP_REMOTE_TTL,
Some(UUID::new()),
None,
Some(UPriority::UPRIORITY_CS2),
);
let expected_options =
CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4));
let expected_request = UnsubscribeRequest {
topic: Some(expected_topic.clone()).into(),
subscriber: Some(SubscriberInfo {
Expand Down Expand Up @@ -732,6 +712,7 @@ mod tests {
resource_id: RESOURCE_ID_SUBSCRIBE as u32,
..Default::default()
};

let remote_method = make_remote_subscribe_uuri(&test_lib::helpers::remote_topic1_uri());

assert_eq!(expected_uri, remote_method);
Expand Down
26 changes: 7 additions & 19 deletions up-subscription/src/tests/subscription_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ mod tests {
};
use up_rust::{
communication::{CallOptions, UPayload},
UCode, UPriority, UStatus, UUri, UUID,
UCode, UPriority, UStatus, UUri,
};

use crate::configuration::DEFAULT_COMMAND_BUFFER_SIZE;
Expand Down Expand Up @@ -281,12 +281,8 @@ mod tests {
.into(),
..Default::default()
};
let remote_call_options = CallOptions::for_rpc_request(
UP_REMOTE_TTL,
Some(UUID::new()),
None,
Some(UPriority::UPRIORITY_CS2),
);
let remote_call_options =
CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4));
let command_sender =
CommandSender::new_with_client_options::<SubscriptionRequest, SubscriptionResponse>(
remote_method,
Expand Down Expand Up @@ -349,12 +345,8 @@ mod tests {
.into(),
..Default::default()
};
let remote_call_options = CallOptions::for_rpc_request(
UP_REMOTE_TTL,
Some(UUID::new()),
None,
Some(UPriority::UPRIORITY_CS2),
);
let remote_call_options =
CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4));
let command_sender =
CommandSender::new_with_client_options::<SubscriptionRequest, SubscriptionResponse>(
remote_method,
Expand Down Expand Up @@ -509,12 +501,8 @@ mod tests {
code: UCode::OK.into(),
..Default::default()
};
let remote_call_options = CallOptions::for_rpc_request(
UP_REMOTE_TTL,
Some(UUID::new()),
None,
Some(UPriority::UPRIORITY_CS2),
);
let remote_call_options =
CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4));
let command_sender = CommandSender::new_with_client_options::<UnsubscribeRequest, UStatus>(
remote_method,
remote_call_options,
Expand Down
10 changes: 5 additions & 5 deletions up-subscription/src/tests/usubscription_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ mod tests {
UnsubscribeRequest, UnsubscribeResponse, RESOURCE_ID_SUBSCRIBE,
RESOURCE_ID_UNSUBSCRIBE, USUBSCRIPTION_TYPE_ID, USUBSCRIPTION_VERSION_MAJOR,
},
UCode, UPriority, UUri, UUID,
UCode, UPriority, UUri,
};

use crate::{
Expand Down Expand Up @@ -79,9 +79,9 @@ mod tests {
};
let expected_call_options = CallOptions::for_rpc_request(
crate::UP_REMOTE_TTL,
Some(UUID::new()),
None,
Some(UPriority::UPRIORITY_CS2),
None,
Some(UPriority::UPRIORITY_CS4),
);
let remote_subscription_request = SubscriptionRequest {
topic: Some(topic.clone()).into(),
Expand Down Expand Up @@ -199,9 +199,9 @@ mod tests {
};
let expected_call_options = CallOptions::for_rpc_request(
crate::UP_REMOTE_TTL,
Some(UUID::new()),
None,
Some(UPriority::UPRIORITY_CS2),
None,
Some(UPriority::UPRIORITY_CS4),
);
let remote_unsubscribe_request = UnsubscribeRequest {
topic: Some(topic.clone()).into(),
Expand Down