Skip to content

Commit 8fe15e4

Browse files
committed
Fix publisher shutdown and latching behavior with change to broadcast
1 parent e54eb9d commit 8fe15e4

File tree

3 files changed

+214
-148
lines changed

3 files changed

+214
-148
lines changed

roslibrust/src/ros1/node/actor.rs

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ pub enum NodeMsg {
4444
// This results in the node's task ending and the node being dropped.
4545
Shutdown,
4646
RegisterPublisher {
47-
reply: oneshot::Sender<Result<broadcast::Sender<Vec<u8>>, String>>,
47+
reply: oneshot::Sender<Result<(broadcast::Sender<Vec<u8>>, mpsc::Sender<()>), String>>,
4848
topic: String,
4949
topic_type: String,
5050
queue_size: usize,
@@ -166,7 +166,7 @@ impl NodeServerHandle {
166166
topic: &str,
167167
queue_size: usize,
168168
latching: bool,
169-
) -> Result<broadcast::Sender<Vec<u8>>, NodeError> {
169+
) -> Result<(broadcast::Sender<Vec<u8>>, mpsc::Sender<()>), NodeError> {
170170
let (sender, receiver) = oneshot::channel();
171171
self.node_server_sender.send(NodeMsg::RegisterPublisher {
172172
reply: sender,
@@ -192,7 +192,7 @@ impl NodeServerHandle {
192192
msg_definition: &str,
193193
queue_size: usize,
194194
latching: bool,
195-
) -> Result<broadcast::Sender<Vec<u8>>, NodeError> {
195+
) -> Result<(broadcast::Sender<Vec<u8>>, mpsc::Sender<()>), NodeError> {
196196
let (sender, receiver) = oneshot::channel();
197197

198198
let md5sum;
@@ -693,33 +693,43 @@ impl Node {
693693
msg_definition: String,
694694
md5sum: String,
695695
latching: bool,
696-
) -> Result<broadcast::Sender<Vec<u8>>, NodeError> {
696+
) -> Result<(broadcast::Sender<Vec<u8>>, mpsc::Sender<()>), NodeError> {
697697
// Return handle to existing Publication if it exists
698698
let existing_entry = {
699699
self.publishers.iter().find_map(|(key, value)| {
700-
if key.as_str() == &topic {
701-
if value.topic_type() == topic_type {
702-
let sender = value.get_sender();
703-
return Some(Ok(sender));
704-
} else {
705-
warn!("Attempted to register publisher with different topic type than existing publisher: existing_type={}, new_type={}", value.topic_type(), topic_type);
700+
if key.as_str() != &topic {
701+
return None;
702+
}
703+
if value.topic_type() != topic_type {
704+
warn!("Attempted to register publisher with different topic type than existing publisher: existing_type={}, new_type={}", value.topic_type(), topic_type);
705+
// TODO MAJOR: this is a terrible error type to return...
706+
return Some(Err(NodeError::IoError(std::io::Error::from(
707+
std::io::ErrorKind::AddrInUse,
708+
))));
709+
}
710+
let (sender, shutdown) = value.get_senders();
711+
match shutdown.upgrade() {
712+
Some(shutdown) => {
713+
Some(Ok((sender, shutdown)))
714+
}
715+
None => {
716+
error!("We still have an entry for a publication, but it has been shutdown");
706717
// TODO MAJOR: this is a terrible error type to return...
707718
Some(Err(NodeError::IoError(std::io::Error::from(
708719
std::io::ErrorKind::AddrInUse,
709720
))))
710721
}
711-
} else {
712-
None
713722
}
714723
})
715724
};
716725
// If we found an existing publication return the handle to it
717726
if let Some(handle) = existing_entry {
718-
return Ok(handle?);
727+
let (sender, shutdown) = handle?;
728+
return Ok((sender, shutdown));
719729
}
720730

721731
// Otherwise create a new Publication and advertise
722-
let (channel, sender) = Publication::new(
732+
let (channel, sender, shutdown) = Publication::new(
723733
&self.node_name,
724734
latching,
725735
&topic,
@@ -737,7 +747,7 @@ impl Node {
737747
})?;
738748
self.publishers.insert(topic.clone(), channel);
739749
let _ = self.client.register_publisher(&topic, topic_type).await?;
740-
Ok(sender)
750+
Ok((sender, shutdown))
741751
}
742752

743753
async fn unregister_publisher(&mut self, topic: &str) -> Result<(), NodeError> {

roslibrust/src/ros1/node/handle.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,11 @@ impl NodeHandle {
8383
queue_size: usize,
8484
latching: bool,
8585
) -> Result<PublisherAny, NodeError> {
86-
let sender = self
86+
let (sender, shutdown) = self
8787
.inner
8888
.register_publisher_any(topic_name, topic_type, msg_definition, queue_size, latching)
8989
.await?;
90-
Ok(PublisherAny::new(topic_name, sender))
90+
Ok(PublisherAny::new(topic_name, sender, shutdown))
9191
}
9292

9393
/// Create a new publisher for the given type.
@@ -103,11 +103,11 @@ impl NodeHandle {
103103
queue_size: usize,
104104
latching: bool,
105105
) -> Result<Publisher<T>, NodeError> {
106-
let sender = self
106+
let (sender, shutdown) = self
107107
.inner
108108
.register_publisher::<T>(topic_name, queue_size, latching)
109109
.await?;
110-
Ok(Publisher::new(topic_name, sender))
110+
Ok(Publisher::new(topic_name, sender, shutdown))
111111
}
112112

113113
pub async fn subscribe_any(

0 commit comments

Comments
 (0)