Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 43 additions & 54 deletions src/dds/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use byteorder::LittleEndian;
use log::{debug, error, info, trace, warn};

use crate::{
create_error_dropped, create_error_internal, create_error_poisoned,
create_error_dropped, create_error_poisoned,
dds::{
adapters,
key::Keyed,
Expand Down Expand Up @@ -50,7 +50,7 @@ use super::{
};
#[cfg(feature = "security")]
use crate::{
create_error_not_allowed_by_security,
create_error_not_allowed_by_security, create_error_internal,
security::{security_plugins::SecurityPluginsHandle, EndpointSecurityInfo},
};
#[cfg(not(feature = "security"))]
Expand Down Expand Up @@ -531,43 +531,19 @@ impl InnerPublisher {
}
}

let new_writer = WriterIngredients {
guid,
writer_command_receiver: hccc_download,
writer_command_receiver_waker: Arc::clone(&writer_waker),
topic_name: topic.name(),
like_stateless: writer_like_stateless,
qos_policies: writer_qos.clone(),
status_sender,
security_plugins: self.security_plugins_handle.clone(),
};

// Send writer ingredients to DP event loop, where the actual writer will be
// constructed
self
.add_writer_sender
.send(new_writer)
.or_else(|e| create_error_poisoned!("Adding a new writer failed: {}", e))?;

// Construct the data writer
let data_writer = WithKeyDataWriter::<D, SA>::new(
outer.clone(),
topic.clone(),
writer_qos,
writer_qos.clone(),
guid,
dwcc_upload,
writer_waker,
Arc::clone(&writer_waker),
self.discovery_command.clone(),
status_receiver,
)?;

// notify Discovery DB
let mut db = self
.discovery_db
.write()
.map_err(|e| CreateError::Poisoned {
reason: format!("Discovery DB: {e}"),
})?;

// Construct security info if needed
#[cfg(not(feature = "security"))]
let security_info = None;
#[cfg(feature = "security")]
Expand Down Expand Up @@ -595,11 +571,19 @@ impl InnerPublisher {
None
};

// Update topic to DiscoveryDB & inform Discovery about it
// Add the topic & writer to Discovery DB
let mut db = self
.discovery_db
.write()
.map_err(|e| CreateError::Poisoned {
reason: format!("Discovery DB: {e}"),
})?;

let dwd = DiscoveredWriterData::new(&data_writer, topic, &dp, security_info);
db.update_local_topic_writer(dwd);
db.update_topic_data_p(topic);

// Inform Discovery about the topic
if let Err(e) = self.discovery_command.try_send(DiscoveryCommand::AddTopic {
topic_name: topic.name(),
}) {
Expand All @@ -612,17 +596,26 @@ impl InnerPublisher {
);
}

// Inform Discovery about the new writer
let writer_guid = self.domain_participant.guid().from_prefix(entity_id);
// Note: notifying Discovery about the new writer is no longer done here.
// Instead, it's done by the DP event loop once it has actually created the new writer.
// This is done to avoid data races.

// Send writer ingredients to DP event loop, where the actual writer will be constructed
let new_writer = WriterIngredients {
guid,
writer_command_receiver: hccc_download,
writer_command_receiver_waker: writer_waker,
topic_name: topic.name(),
like_stateless: writer_like_stateless,
qos_policies: writer_qos,
status_sender,
security_plugins: self.security_plugins_handle.clone(),
};

self
.discovery_command
.try_send(DiscoveryCommand::AddLocalWriter { guid: writer_guid })
.or_else(|e| {
create_error_internal!(
"Cannot inform Discovery about the new writer {writer_guid:?}. Error: {}",
e
)
})?;
.add_writer_sender
.send(new_writer)
.or_else(|e| create_error_poisoned!("Adding a new writer failed: {}", e))?;

// Return the DataWriter to user
Ok(data_writer)
Expand Down Expand Up @@ -1109,6 +1102,7 @@ impl InnerSubscriber {
}
}

// Construct the ReaderIngredients
let data_reader_waker = Arc::new(Mutex::new(None));

let (poll_event_source, poll_event_sender) = mio_source::make_poll_channel()?;
Expand All @@ -1127,6 +1121,7 @@ impl InnerSubscriber {
security_plugins: self.security_plugins_handle.clone(),
};

// Construct security info if needed
#[cfg(not(feature = "security"))]
let security_info: Option<EndpointSecurityInfo> = None;
#[cfg(feature = "security")]
Expand Down Expand Up @@ -1154,7 +1149,7 @@ impl InnerSubscriber {
None
};

// Update topic to DiscoveryDB & inform Discovery about it
// Add the topic & reader to Discovery DB
{
let mut db = self
.discovery_db
Expand All @@ -1163,6 +1158,7 @@ impl InnerSubscriber {
db.update_local_topic_reader(&dp, topic, &new_reader, security_info);
db.update_topic_data_p(topic);

// Inform Discovery about the topic
if let Err(e) = self.discovery_command.try_send(DiscoveryCommand::AddTopic {
topic_name: topic.name(),
}) {
Expand All @@ -1176,6 +1172,11 @@ impl InnerSubscriber {
}
}

// Note: notifying Discovery about the new reader is no longer done here.
// Instead, it's done by the DP event loop once it has actually created the new reader.
// This is done to avoid data races.

// Construct the data reader
let datareader = with_key::SimpleDataReader::<D, SA>::new(
outer.clone(),
entity_id,
Expand All @@ -1197,18 +1198,6 @@ impl InnerSubscriber {
.try_send(new_reader)
.or_else(|e| create_error_poisoned!("Cannot add DataReader. Error: {}", e))?;

// Inform Discovery about the new reader
let reader_guid = self.domain_participant.guid().from_prefix(entity_id);
self
.discovery_command
.try_send(DiscoveryCommand::AddLocalReader { guid: reader_guid })
.or_else(|e| {
create_error_internal!(
"Cannot inform Discovery about the new reader {reader_guid:?}. Error: {}",
e
)
})?;

// Return the DataReader to user
Ok(datareader)
}
Expand Down
32 changes: 25 additions & 7 deletions src/rtps/dp_event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ pub struct DPEventLoop {
participant_status_sender: StatusChannelSender<DomainParticipantStatusEvent>,

discovery_update_notification_receiver: mio_channel::Receiver<DiscoveryNotificationType>,
#[cfg(feature = "security")]
discovery_command_sender: mio_channel::SyncSender<DiscoveryCommand>,
}

Expand All @@ -111,14 +110,11 @@ impl DPEventLoop {
remove_writer_receiver: TokenReceiverPair<GUID>,
stop_poll_receiver: mio_channel::Receiver<EventLoopCommand>,
discovery_update_notification_receiver: mio_channel::Receiver<DiscoveryNotificationType>,
_discovery_command_sender: mio_channel::SyncSender<DiscoveryCommand>,
discovery_command_sender: mio_channel::SyncSender<DiscoveryCommand>,
spdp_liveness_sender: mio_channel::SyncSender<GuidPrefix>,
participant_status_sender: StatusChannelSender<DomainParticipantStatusEvent>,
security_plugins_opt: Option<SecurityPluginsHandle>,
) -> Self {
#[cfg(not(feature = "security"))]
let _dummy = _discovery_command_sender;

let poll = Poll::new().expect("Unable to create new poll.");
let (acknack_sender, acknack_receiver) =
mio_channel::sync_channel::<(GuidPrefix, AckSubmessage)>(100);
Expand Down Expand Up @@ -226,8 +222,7 @@ impl DPEventLoop {
ack_nack_receiver: acknack_receiver,
discovery_update_notification_receiver,
participant_status_sender,
#[cfg(feature = "security")]
discovery_command_sender: _discovery_command_sender,
discovery_command_sender,
}
}

Expand Down Expand Up @@ -467,7 +462,11 @@ impl DPEventLoop {
ADD_READER_TOKEN => {
trace!("add reader(s)");
while let Ok(new_reader_ing) = self.add_reader_receiver.receiver.try_recv() {
// Add the reader locally
let guid = new_reader_ing.guid;
self.add_local_reader(new_reader_ing);
// Inform Discovery about it
self.inform_discovery_about_new_local_endpoint(guid);
}
}
REMOVE_READER_TOKEN => {
Expand All @@ -483,7 +482,11 @@ impl DPEventLoop {
match event.token() {
ADD_WRITER_TOKEN => {
while let Ok(new_writer_ingredients) = self.add_writer_receiver.receiver.try_recv() {
// Add the writer locally
let guid = new_writer_ingredients.guid;
self.add_local_writer(new_writer_ingredients);
// Inform Discovery about it
self.inform_discovery_about_new_local_endpoint(guid);
}
}
REMOVE_WRITER_TOKEN => {
Expand Down Expand Up @@ -1037,6 +1040,21 @@ impl DPEventLoop {
}
}
}

fn inform_discovery_about_new_local_endpoint(&self, guid: GUID) {
let discovery_command = if guid.entity_id.kind().is_writer() {
DiscoveryCommand::AddLocalWriter { guid }
} else {
DiscoveryCommand::AddLocalReader { guid }
};

if let Err(e) = self.discovery_command_sender.try_send(discovery_command) {
log::error!(
"Failed to inform Discovery about the new endpoint: {e}. Endpoint guid: {guid:?}"
);
// Improvement TODO: that's it, just an error log entry on failing to inform discovery?
}
}
}

#[cfg(feature = "security")]
Expand Down
2 changes: 1 addition & 1 deletion src/rtps/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,7 @@ impl Writer {
// But apparently some RTPS implementations send ACKNACK with
// reader_sn_state.base = 0 to indicate they have matched the writer,
// so seeing these once per new writer should be ok.
info!(
debug!(
"ACKNACK SequenceNumberSet minimum must be >= 1, got {:?} from {:?} topic {:?}",
an.reader_sn_state.base(),
reader_guid,
Expand Down
Loading