@@ -494,9 +494,6 @@ void DiscoveryDataBase::process_pdp_data_queue()
494
494
// Lock(exclusive mode) mutex locally
495
495
std::lock_guard<std::recursive_mutex> guard (mutex_);
496
496
497
- // Swap DATA queues
498
- pdp_data_queue_.Swap ();
499
-
500
497
// Process all messages in the queque
501
498
while (!pdp_data_queue_.Empty ())
502
499
{
@@ -534,9 +531,6 @@ bool DiscoveryDataBase::process_edp_data_queue()
534
531
// Lock(exclusive mode) mutex locally
535
532
std::lock_guard<std::recursive_mutex> guard (mutex_);
536
533
537
- // Swap DATA queues
538
- edp_data_queue_.Swap ();
539
-
540
534
eprosima::fastdds::rtps::CacheChange_t* change;
541
535
std::string topic_name;
542
536
@@ -660,7 +654,7 @@ void DiscoveryDataBase::match_new_server_(
660
654
}
661
655
}
662
656
// The resources needed for TCP new connections are created during the matching process when the
663
- // DATA(p) is receieved by each server.
657
+ // DATA(p) is received by each server.
664
658
665
659
// Create virtual endpoints
666
660
create_virtual_endpoints_ (participant_prefix);
@@ -788,32 +782,71 @@ void DiscoveryDataBase::update_participant_from_change_(
788
782
{
789
783
fastdds::rtps::GUID_t change_guid = guid_from_change (ch);
790
784
785
+ assert (ch->kind == eprosima::fastdds::rtps::ALIVE);
786
+
787
+ // If the change corresponds to a previously removed participant (which hasn't yet been removed from the map since
788
+ // the DATA(Up) is still unacked), update map with new data and behave as if it was a new participant.
789
+ // Remove also the old change from the disposals collection, if it was added just before
790
+ if (participant_info.change ()->kind != eprosima::fastdds::rtps::ALIVE)
791
+ {
792
+ // Update the change data
793
+ participant_info.participant_change_data (change_data);
794
+
795
+ // Remove old change from disposals if it was added just before to avoid sending data UP
796
+ auto it = std::find (disposals_.begin (), disposals_.end (), participant_info.change ());
797
+ if (it != disposals_.end ())
798
+ {
799
+ disposals_.erase (it);
800
+ }
801
+
802
+ // Update change. This should add the UNALIVE change to changes_to_release_, which should later both remove the
803
+ // change from the writer's history and release the change
804
+ update_change_and_unmatch_ (ch, participant_info);
805
+
806
+ // If it is local and server we have to create virtual endpoints, except for our own server
807
+ if (change_guid.guidPrefix != server_guid_prefix_ && !change_data.is_client () && change_data.is_local ())
808
+ {
809
+ // Match new server and create virtual endpoints
810
+ // NOTE: match after having updated the change, so virtual endpoints are not discarded for having
811
+ // an associated unalive participant
812
+ match_new_server_ (change_guid.guidPrefix , change_data.is_superclient ());
813
+ }
814
+
815
+ // Treat as a new participant found
816
+ new_updates_++;
817
+ if (change_guid.guidPrefix != server_guid_prefix_)
818
+ {
819
+ server_acked_by_all (false );
820
+ }
821
+ }
822
+
791
823
// Specific case when a Data(P) from an entity A known as remote comes from the very entity A (we have
792
824
// the Data(P) because of other server B, but now it arrives from A itself)
793
825
// The entity A changes to local
794
826
// Must be local data, or else it is a remote endpoint and should not be changed
795
- if (change_guid.guidPrefix != server_guid_prefix_ && change_data.is_local () &&
827
+ else if (change_guid.guidPrefix != server_guid_prefix_ && change_data.is_local () &&
796
828
DiscoveryDataBase::participant_data_has_changed_ (participant_info, change_data))
797
829
{
830
+ // Update the change data
831
+ participant_info.participant_change_data (change_data);
832
+
833
+ // Update change
834
+ update_change_and_unmatch_ (ch, participant_info);
835
+
798
836
// If the participant changes to server local, virtual endpoints must be added
799
837
// If it is local and server the only possibility is it was a remote server and it must be converted to local
800
838
if (!change_data.is_client ())
801
839
{
840
+ // NOTE: match after having updated the change in order to send the new Data(P)
802
841
match_new_server_ (change_guid.guidPrefix , change_data.is_superclient ());
803
842
}
804
843
805
- // Update the change data
806
- participant_info.participant_change_data (change_data);
807
-
808
- // Update change
809
- update_change_and_unmatch_ (ch, participant_info);
810
-
811
844
// Treat as a new participant found
812
845
new_updates_++;
813
846
server_acked_by_all (false );
814
847
815
848
// It is possible that this Data(P) is in our history if it has not been acked by all
816
- // In this case we have to resent it with the new update
849
+ // In this case we have to resend it with the new update
817
850
if (!participant_info.is_acked_by_all ())
818
851
{
819
852
add_pdp_to_send_ (ch);
@@ -916,6 +949,29 @@ void DiscoveryDataBase::create_writers_from_change_(
916
949
// The writer was NOT known by the database
917
950
else
918
951
{
952
+ // Check if corresponding participant is known, abort otherwise
953
+ // NOTE: Processing a DATA(w) should always be preceded by the reception and processing of its corresponding
954
+ // participant. However, one may receive a DATA(w) just after the participant has been removed, case in which the
955
+ // former should no longer be processed.
956
+ std::map<eprosima::fastdds::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator writer_part_it =
957
+ participants_.find (writer_guid.guidPrefix );
958
+ if (writer_part_it == participants_.end ())
959
+ {
960
+ EPROSIMA_LOG_ERROR (DISCOVERY_DATABASE,
961
+ " Writer " << writer_guid << " has no associated participant. Skipping" );
962
+ assert (topic_name != virtual_topic_);
963
+ changes_to_release_.push_back (ch); // Release change so it can be reused
964
+ return ;
965
+ }
966
+ else if (writer_part_it->second .change ()->kind != fastdds::rtps::ChangeKind_t::ALIVE)
967
+ {
968
+ EPROSIMA_LOG_WARNING (DISCOVERY_DATABASE,
969
+ " Writer " << writer_guid << " is associated to a removed participant. Skipping" );
970
+ assert (topic_name != virtual_topic_);
971
+ changes_to_release_.push_back (ch); // Release change so it can be reused
972
+ return ;
973
+ }
974
+
919
975
// Add entry to writers_
920
976
DiscoveryEndpointInfo tmp_writer (
921
977
ch,
@@ -936,18 +992,7 @@ void DiscoveryDataBase::create_writers_from_change_(
936
992
new_updates_++;
937
993
938
994
// Add entry to participants_[guid_prefix]::writers
939
- std::map<eprosima::fastdds::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator writer_part_it =
940
- participants_.find (writer_guid.guidPrefix );
941
- if (writer_part_it != participants_.end ())
942
- {
943
- writer_part_it->second .add_writer (writer_guid);
944
- }
945
- else
946
- {
947
- EPROSIMA_LOG_ERROR (DISCOVERY_DATABASE,
948
- " Writer " << writer_guid << " has no associated participant. Skipping" );
949
- return ;
950
- }
995
+ writer_part_it->second .add_writer (writer_guid);
951
996
952
997
// Add writer to writers_by_topic_[topic_name]
953
998
add_writer_to_topic_ (writer_guid, topic_name);
@@ -1034,6 +1079,29 @@ void DiscoveryDataBase::create_readers_from_change_(
1034
1079
// The reader was NOT known by the database
1035
1080
else
1036
1081
{
1082
+ // Check if corresponding participant is known, abort otherwise
1083
+ // NOTE: Processing a DATA(r) should always be preceded by the reception and processing of its corresponding
1084
+ // participant. However, one may receive a DATA(r) just after the participant has been removed, case in which the
1085
+ // former should no longer be processed.
1086
+ std::map<eprosima::fastdds::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator reader_part_it =
1087
+ participants_.find (reader_guid.guidPrefix );
1088
+ if (reader_part_it == participants_.end ())
1089
+ {
1090
+ EPROSIMA_LOG_ERROR (DISCOVERY_DATABASE,
1091
+ " Reader " << reader_guid << " has no associated participant. Skipping" );
1092
+ assert (topic_name != virtual_topic_);
1093
+ changes_to_release_.push_back (ch); // Release change so it can be reused
1094
+ return ;
1095
+ }
1096
+ else if (reader_part_it->second .change ()->kind != fastdds::rtps::ChangeKind_t::ALIVE)
1097
+ {
1098
+ EPROSIMA_LOG_WARNING (DISCOVERY_DATABASE,
1099
+ " Reader " << reader_guid << " is associated to a removed participant. Skipping" );
1100
+ assert (topic_name != virtual_topic_);
1101
+ changes_to_release_.push_back (ch); // Release change so it can be reused
1102
+ return ;
1103
+ }
1104
+
1037
1105
// Add entry to readers_
1038
1106
DiscoveryEndpointInfo tmp_reader (
1039
1107
ch,
@@ -1054,18 +1122,7 @@ void DiscoveryDataBase::create_readers_from_change_(
1054
1122
new_updates_++;
1055
1123
1056
1124
// Add entry to participants_[guid_prefix]::readers
1057
- std::map<eprosima::fastdds::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator reader_part_it =
1058
- participants_.find (reader_guid.guidPrefix );
1059
- if (reader_part_it != participants_.end ())
1060
- {
1061
- reader_part_it->second .add_reader (reader_guid);
1062
- }
1063
- else
1064
- {
1065
- EPROSIMA_LOG_ERROR (DISCOVERY_DATABASE,
1066
- " Reader " << reader_guid << " has no associated participant. Skipping" );
1067
- return ;
1068
- }
1125
+ reader_part_it->second .add_reader (reader_guid);
1069
1126
1070
1127
// Add reader to readers_by_topic_[topic_name]
1071
1128
add_reader_to_topic_ (reader_guid, topic_name);
@@ -1347,7 +1404,7 @@ void DiscoveryDataBase::process_dispose_participant_(
1347
1404
delete_reader_entity_ (reader_guid);
1348
1405
}
1349
1406
1350
- // All participant endoints must be already unmatched in others endopoints relevant_ack maps
1407
+ // All participant endpoints must be already unmatched in others endpoints relevant_ack maps
1351
1408
1352
1409
// Unmatch own participant
1353
1410
unmatch_participant_ (participant_guid.guidPrefix );
@@ -1618,6 +1675,14 @@ bool DiscoveryDataBase::data_queue_empty()
1618
1675
return (pdp_data_queue_.BothEmpty () && edp_data_queue_.BothEmpty ());
1619
1676
}
1620
1677
1678
+ void DiscoveryDataBase::swap_data_queues ()
1679
+ {
1680
+ // Swap EDP before PDP to avoid race condition in which both data P and w/r are received at the same time,
1681
+ // just after having swapped the PDP queue
1682
+ edp_data_queue_.Swap ();
1683
+ pdp_data_queue_.Swap ();
1684
+ }
1685
+
1621
1686
bool DiscoveryDataBase::is_participant (
1622
1687
const eprosima::fastdds::rtps::GUID_t& guid)
1623
1688
{
0 commit comments