Skip to content

Implement register_listener and unregister_listener functionality#75

Closed
arakabCL wants to merge 50 commits into
eclipse-uprotocol:mainfrom
uProtocol-cmu-team-Luna:listener-wildcard-removal-pr
Closed

Implement register_listener and unregister_listener functionality#75
arakabCL wants to merge 50 commits into
eclipse-uprotocol:mainfrom
uProtocol-cmu-team-Luna:listener-wildcard-removal-pr

Conversation

@arakabCL
Copy link
Copy Markdown
Contributor

  • Implemented register_listener and unregister_listener methods.
  • Removed wildcard support for exact URI matching.
  • Added background task for message processing.
  • Included compute_service_name function for URI conversion.

kwarraich and others added 30 commits June 12, 2025 13:58
Merging all of the edits into one shared repo
tests run and there are not issues anymore which prevent them from ru…
…rately - using availbale helper methods defined in uri.rs
This commit introduces the  and  functionalities for the .

Key changes include:
- Implemented the  method to allow clients to subscribe to uProtocol messages.
- Implemented the  method to allow clients to unsubscribe.
- Added tests for both registration and unregistration.
…g- will fix this but i want the functional code to be commited so in case i can go back to it
catappreciationhours2 and others added 15 commits July 10, 2025 08:44
…DataGeneric

Making transmission data generic
Removed all wildcard support from compute listener matching logic and implemented exact URI matching via HashMap lookups. Enhanced register and unregister listener functionality with proper error handling, subscriber creation, and cleanup. Added comprehensive test coverage including race condition fixes and edge cases. All 16 tests passing with robust exact matching behavior for production readiness.
- Added working register_listener and unregister_listener methods
- Removed wildcard support for exact URI matching
- Added background task for message processing
Copy link
Copy Markdown
Contributor

@PLeVasseur PLeVasseur left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recording a few things we chatted about in the meeting, no action needed on this PR regarding these comments.

Comment thread src/custom_header.rs
#[repr(C)]
pub struct CustomHeader {
pub version: i32,
pub timestamp: u64,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to @catappreciationhours2 -- the serialized UAttributes is planned for inclusion in the send PR

Comment thread src/custom_header.rs
fn from(header: &CustomHeader) -> UAttributes {
let mut attrs = UAttributes::default();

// Map CustomHeader fields back to UAttributes using available fields
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of this mapping is planned the upcoming Send PR according to @catappreciationhours2

Comment thread src/lib.rs
vidishac2004 and others added 2 commits July 30, 2025 03:26
Trying to resolve the build error
Comment thread .DS_Store Outdated
run: cargo build --verbose
- name: Run tests
run: cargo test --verbose No newline at end of file
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing EOL

Comment thread src/bin/listener.rs
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please move this file to the examples folder

Comment thread src/bin/sender.rs
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please move this file to the examples folder

Comment thread README.md
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you having the same issues with line endings as @catappreciationhours2 ?

Comment thread src/lib.rs Outdated
Comment thread src/lib.rs
enum TransportCommand {
Send {
message: UMessage,
response: std::sync::mpsc::Sender<Result<(), UStatus>>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given that we are depending on tokio anyway, you might want to consider using a one-shot channel for the response instead ...

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with this feedback. It's a nice approach and signals intent.

Comment thread src/lib.rs

impl Iceoryx2Transport {
pub fn new() -> Result<Self, UStatus> {
let (tx, rx) = std::sync::mpsc::channel();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you might want to consider using the async tokio channel pendant instead

Comment thread src/lib.rs
Comment on lines +165 to +180
let publisher =
publishers.entry(service_name.clone()).or_insert_with(|| {
let service_name_res: Result<ServiceName, _> =
service_name.as_str().try_into();
let service = node
.service_builder(&service_name_res.unwrap())
.publish_subscribe::<RawBytes>()
.user_header::<CustomHeader>()
.open_or_create()
.expect("Failed to create service");

service
.publisher_builder()
.create()
.expect("Failed to create publisher")
});
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it feels a little odd that the lookup and potential creation of the publisher is done here instead of handle_send (as for the register/unregister listener commands) ...

Comment thread src/lib.rs

// Integrate dispatch: In polling/receive, extract attributes and reconstruct UMessage
// Only process subscribers that have active listeners
let active_services: Vec<(String, Vec<Arc<dyn UListener>>)> = listeners
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like we never poll for incoming messages unless there a no more commands to process, or am I mistaken? Meaning: if we continuously send messages, we never get a response ...
You might want to replace the two separate while loops (for processing commands and for polling receivers) with a single loop around tokio's select! macro

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with this feedback.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump for response

Co-authored-by: Kai Hudalla <sophokles.kh@gmail.com>
Copy link
Copy Markdown
Contributor

@PLeVasseur PLeVasseur left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't able to get to the tests yet, but submitting a partial review for your consideration.

Comment thread .DS_Store Outdated
Comment thread Cargo.toml
log = "0.4.27"
env_logger = "0.11.8"
once_cell = "1.18.0"
uuid = { version = "1", features = ["v4"] }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the uuid crate being used, could you point me to it?
I think up-rust should have abstracted this away.

Comment thread Cargo.toml
protobuf = "3.7.2"
bytes = "1.10.1"
tokio = { version = "1", features = ["full"] }
test-case = "3"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any deps which are used only for tests should be moved under:

[dev-dependencies]

instead.

reference

Comment thread src/receiver.rs
@@ -0,0 +1,229 @@
use super::*;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can place tests either at the bottom of the file which implements some functionality and then wrap it in correctly in a module for testing.

It's also possible if you'd like you can have a tests/ folder at the root of the project in which to place them (this is often done for integration tests).

I suppose the easiest change in this case is to move this file to tests/receiver.rs.

reference to read more

Comment thread Cargo.toml
tokio = { version = "1", features = ["full"] }
test-case = "3"
log = "0.4.27"
env_logger = "0.11.8"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any deps which are used only for tests should be moved under:

[dev-dependencies]

instead.

reference

Comment thread src/lib.rs Outdated
if let Some(subscriber) = subscribers.get(&service_name) {
while let Some(sample) = subscriber.receive().ok().flatten() {
for listener in &listeners_to_notify {
// Extract payload bytes
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove these line-by-line comments. Should be understandable what's going on.
Reserve comments for where something non-obvious and/or complicated is happening.

Comment thread src/lib.rs

fn handle_register_listener(
node: &Node<ipc::Service>,
subscribers: &mut HashMap<
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another good reason to use a type alias. You can keep this tidy here too.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump for response

Comment thread src/lib.rs
};

// Create subscriber if it doesn't exist for this service_name
if !subscribers.contains_key(&service_name) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you'll need to do a bit more error handling around here to e.g. return a UStatus which indicates said set of source and sink filters are already registered.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump for response

Comment thread src/lib.rs Outdated
};

// Remove from hashmap: listeners.get_mut(&service_name).and_then(|vec| vec.remove(&listener));
if let Some(listener_vec) = listeners.get_mut(&service_name) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you'll need to do a bit more error handling around here to return a UStatus indicating that said set of source and sink filters are not registered.

Comment thread src/lib.rs
Copy link
Copy Markdown
Contributor

@PLeVasseur PLeVasseur left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @arakabCL -- thanks for making some updates.
I recently found out how to correctly use a byte slice [u8] for the underlying buffer.

As I wrote on Slack I'm going to get support from Kenny to implement this in time for the Eclipse SDV Hackathon in ~4 weeks.

Could you commit to getting this done in a week or two?
Otherwise I think I'll have to ask Kenny to pick up where you left off here a bit and revise based on the suggestions I left.

Comment thread .DS_Store Outdated
Comment thread .DS_Store Outdated
Comment thread src/lib.rs
mod raw_bytes;
use raw_bytes::RawBytes;

use std::collections::HashMap;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump to resolve

Comment thread src/lib.rs
Comment thread src/lib.rs
}
};

let mut publishers: HashMap<
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump to resolve

Comment thread src/lib.rs

// Integrate dispatch: In polling/receive, extract attributes and reconstruct UMessage
// Only process subscribers that have active listeners
let active_services: Vec<(String, Vec<Arc<dyn UListener>>)> = listeners
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump for response

Comment thread src/lib.rs

fn handle_register_listener(
node: &Node<ipc::Service>,
subscribers: &mut HashMap<
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump for response

Comment thread src/lib.rs
};

// Create subscriber if it doesn't exist for this service_name
if !subscribers.contains_key(&service_name) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump for response

Comment thread src/lib.rs
Comment thread src/lib.rs
}

fn compute_service_name(message: &UMessage) -> Result<String, UStatus> {
fn compute_service_name_from_message(message: &UMessage) -> Result<String, UStatus> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd recommend removing this and simply having compute_service_name_from_uris() from below, since it will be a larger maintenance burden to have two different ways of doing the same thing, when it's possible to use compute_service_name_from_uris() even with a UMessage's source and sink UUris.

@PLeVasseur PLeVasseur mentioned this pull request Aug 23, 2025
@PLeVasseur
Copy link
Copy Markdown
Contributor

Functionality was implemented in #78

@PLeVasseur PLeVasseur closed this Oct 29, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants