Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bidirectional provider implementation #149

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

rafaeling
Copy link
Contributor

@rafaeling rafaeling commented Mar 26, 2025

Introduced the FilterManager structure to manage filter subscriptions based on the lowest time interval.

Added new fields to the Subscriptions struct to manage ChangeSubscription and Providers by their UUID.

Modified sender: broadcast::Sender in ChangeSubscription to sender: broadcast::Sender<Option> to allow receiving None when the sender is gone. This enables returning an error (VISS case) or None to the consumer.

Implemented logic to propagate time-based filters to providers, notify subscriptions according to the time interval, and claim signals by providers during startup.

Added VISS subscription integration tests.

@rafaeling rafaeling requested a review from lukasmittag March 26, 2025 12:56
Copy link

codecov bot commented Mar 28, 2025

Codecov Report

Attention: Patch coverage is 41.89189% with 473 lines in your changes missing coverage. Please review.

Project coverage is 62.59%. Comparing base (0985941) to head (6e922f5).

Files with missing lines Patch % Lines
databroker/src/broker.rs 22.29% 230 Missing ⚠️
databroker/src/grpc/kuksa_val_v2/val.rs 35.78% 183 Missing ⚠️
databroker/src/viss/v2/server.rs 0.00% 23 Missing ⚠️
databroker/src/grpc/kuksa_val_v1/val.rs 0.00% 17 Missing ⚠️
databroker/src/grpc/kuksa_val_v2/conversions.rs 0.00% 8 Missing ⚠️
databroker/src/viss/v2/types.rs 0.00% 5 Missing ⚠️
databroker/src/filter/filter_manager.rs 97.61% 4 Missing ⚠️
databroker/src/types.rs 75.00% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #149      +/-   ##
==========================================
- Coverage   63.77%   62.59%   -1.18%     
==========================================
  Files          36       37       +1     
  Lines       17637    18293     +656     
==========================================
+ Hits        11248    11451     +203     
- Misses       6389     6842     +453     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@rafaeling rafaeling force-pushed the bidirectional-provider-implementation branch 3 times, most recently from 4117bb6 to e06d186 Compare March 31, 2025 07:22
@rafaeling rafaeling marked this pull request as ready for review March 31, 2025 07:23
@rafaeling rafaeling force-pushed the bidirectional-provider-implementation branch from a9ab8a7 to 26d129c Compare March 31, 2025 07:47
@rafaeling rafaeling requested a review from wba2hi March 31, 2025 09:05
@@ -124,7 +136,8 @@ pub struct Database {
pub struct Subscriptions {
actuation_subscriptions: Vec<ActuationSubscription>,
query_subscriptions: Vec<QuerySubscription>,
change_subscriptions: Vec<ChangeSubscription>,
change_subscriptions: HashMap<Uuid, ChangeSubscription>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why store uuid two times and not make another field id instead?

@@ -202,8 +239,10 @@ pub struct QuerySubscription {

pub struct ChangeSubscription {
entries: HashMap<i32, HashSet<Field>>,
sender: broadcast::Sender<EntryUpdates>,
sender: broadcast::Sender<Option<EntryUpdates>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a rather big concept change right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is. I thought a lot about also introducing a new channel field to control when and how a subscription should get notified when provider goes away (for the 3 API versions). Adding the Option field was the least breaking change that would still work with all APIs.
In my opinion, we should start at some point removing old APIs and code, it is getting really messy.

/// the new update filter map containing ONLY signals_ids along with
/// their new lowest interval_ms.
///
pub fn add_new_update_filter(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

align naming e.g. add_new_update_filter and remove_filter_by_subscription_uuid make it clear that this is the same filter we're talking about

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please do not check in generated files

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For sdv API were already there for the integration test, if I dont add them integration test would not work

@@ -17,5 +17,6 @@ bdd_features_base_dir = "features"
viss_ws_base_url = ws://localhost:8090
viss_http_base_url = http://localhost:8090
viss_mqtt_base_url = mqtt://localhost:1883
viss_grpc_base_url = localhost:55555
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

grpc:// would be better

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TimebasedFilter {
pub period: u32,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This filter seems to be incorrect. The parser rejects subscriptions formatted according to the specification:

{"action": "subscribe", "path": "abc", "requestId": "1", "filter": {"type": "timebased", "parameter": {"period": 500}}}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants