@@ -492,9 +492,6 @@ void DiscoveryDataBase::process_pdp_data_queue()
492
492
// Lock(exclusive mode) mutex locally
493
493
std::lock_guard<std::recursive_mutex> guard (mutex_);
494
494
495
- // Swap DATA queues
496
- pdp_data_queue_.Swap ();
497
-
498
495
// Process all messages in the queque
499
496
while (!pdp_data_queue_.Empty ())
500
497
{
@@ -532,9 +529,6 @@ bool DiscoveryDataBase::process_edp_data_queue()
532
529
// Lock(exclusive mode) mutex locally
533
530
std::lock_guard<std::recursive_mutex> guard (mutex_);
534
531
535
- // Swap DATA queues
536
- edp_data_queue_.Swap ();
537
-
538
532
eprosima::fastdds::rtps::CacheChange_t* change;
539
533
std::string topic_name;
540
534
@@ -658,7 +652,7 @@ void DiscoveryDataBase::match_new_server_(
658
652
}
659
653
}
660
654
// The resources needed for TCP new connections are created during the matching process when the
661
- // DATA(p) is receieved by each server.
655
+ // DATA(p) is received by each server.
662
656
663
657
// Create virtual endpoints
664
658
create_virtual_endpoints_ (participant_prefix);
@@ -786,32 +780,71 @@ void DiscoveryDataBase::update_participant_from_change_(
786
780
{
787
781
fastdds::rtps::GUID_t change_guid = guid_from_change (ch);
788
782
783
+ assert (ch->kind == eprosima::fastdds::rtps::ALIVE);
784
+
785
+ // If the change corresponds to a previously removed participant (which hasn't yet been removed from the map since
786
+ // the DATA(Up) is still unacked), update map with new data and behave as if it was a new participant.
787
+ // Remove also the old change from the disposals collection, if it was added just before
788
+ if (participant_info.change ()->kind != eprosima::fastdds::rtps::ALIVE)
789
+ {
790
+ // Update the change data
791
+ participant_info.participant_change_data (change_data);
792
+
793
+ // Remove old change from disposals if it was added just before to avoid sending data UP
794
+ auto it = std::find (disposals_.begin (), disposals_.end (), participant_info.change ());
795
+ if (it != disposals_.end ())
796
+ {
797
+ disposals_.erase (it);
798
+ }
799
+
800
+ // Update change. This should add the UNALIVE change to changes_to_release_, which should later both remove the
801
+ // change from the writer's history and release the change
802
+ update_change_and_unmatch_ (ch, participant_info);
803
+
804
+ // If it is local and server we have to create virtual endpoints, except for our own server
805
+ if (change_guid.guidPrefix != server_guid_prefix_ && !change_data.is_client () && change_data.is_local ())
806
+ {
807
+ // Match new server and create virtual endpoints
808
+ // NOTE: match after having updated the change, so virtual endpoints are not discarded for having
809
+ // an associated unalive participant
810
+ match_new_server_ (change_guid.guidPrefix , change_data.is_superclient ());
811
+ }
812
+
813
+ // Treat as a new participant found
814
+ new_updates_++;
815
+ if (change_guid.guidPrefix != server_guid_prefix_)
816
+ {
817
+ server_acked_by_all (false );
818
+ }
819
+ }
820
+
789
821
// Specific case when a Data(P) from an entity A known as remote comes from the very entity A (we have
790
822
// the Data(P) because of other server B, but now it arrives from A itself)
791
823
// The entity A changes to local
792
824
// Must be local data, or else it is a remote endpoint and should not be changed
793
- if (change_guid.guidPrefix != server_guid_prefix_ && change_data.is_local () &&
825
+ else if (change_guid.guidPrefix != server_guid_prefix_ && change_data.is_local () &&
794
826
DiscoveryDataBase::participant_data_has_changed_ (participant_info, change_data))
795
827
{
828
+ // Update the change data
829
+ participant_info.participant_change_data (change_data);
830
+
831
+ // Update change
832
+ update_change_and_unmatch_ (ch, participant_info);
833
+
796
834
// If the participant changes to server local, virtual endpoints must be added
797
835
// If it is local and server the only possibility is it was a remote server and it must be converted to local
798
836
if (!change_data.is_client ())
799
837
{
838
+ // NOTE: match after having updated the change in order to send the new Data(P)
800
839
match_new_server_ (change_guid.guidPrefix , change_data.is_superclient ());
801
840
}
802
841
803
- // Update the change data
804
- participant_info.participant_change_data (change_data);
805
-
806
- // Update change
807
- update_change_and_unmatch_ (ch, participant_info);
808
-
809
842
// Treat as a new participant found
810
843
new_updates_++;
811
844
server_acked_by_all (false );
812
845
813
846
// It is possible that this Data(P) is in our history if it has not been acked by all
814
- // In this case we have to resent it with the new update
847
+ // In this case we have to resend it with the new update
815
848
if (!participant_info.is_acked_by_all ())
816
849
{
817
850
add_pdp_to_send_ (ch);
@@ -914,6 +947,29 @@ void DiscoveryDataBase::create_writers_from_change_(
914
947
// The writer was NOT known by the database
915
948
else
916
949
{
950
+ // Check if corresponding participant is known, abort otherwise
951
+ // NOTE: Processing a DATA(w) should always be preceded by the reception and processing of its corresponding
952
+ // participant. However, one may receive a DATA(w) just after the participant has been removed, case in which the
953
+ // former should no longer be processed.
954
+ std::map<eprosima::fastdds::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator writer_part_it =
955
+ participants_.find (writer_guid.guidPrefix );
956
+ if (writer_part_it == participants_.end ())
957
+ {
958
+ EPROSIMA_LOG_ERROR (DISCOVERY_DATABASE,
959
+ " Writer " << writer_guid << " has no associated participant. Skipping" );
960
+ assert (topic_name != virtual_topic_);
961
+ changes_to_release_.push_back (ch); // Release change so it can be reused
962
+ return ;
963
+ }
964
+ else if (writer_part_it->second .change ()->kind != fastdds::rtps::ChangeKind_t::ALIVE)
965
+ {
966
+ EPROSIMA_LOG_WARNING (DISCOVERY_DATABASE,
967
+ " Writer " << writer_guid << " is associated to a removed participant. Skipping" );
968
+ assert (topic_name != virtual_topic_);
969
+ changes_to_release_.push_back (ch); // Release change so it can be reused
970
+ return ;
971
+ }
972
+
917
973
// Add entry to writers_
918
974
DiscoveryEndpointInfo tmp_writer (
919
975
ch,
@@ -934,18 +990,7 @@ void DiscoveryDataBase::create_writers_from_change_(
934
990
new_updates_++;
935
991
936
992
// Add entry to participants_[guid_prefix]::writers
937
- std::map<eprosima::fastdds::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator writer_part_it =
938
- participants_.find (writer_guid.guidPrefix );
939
- if (writer_part_it != participants_.end ())
940
- {
941
- writer_part_it->second .add_writer (writer_guid);
942
- }
943
- else
944
- {
945
- EPROSIMA_LOG_ERROR (DISCOVERY_DATABASE,
946
- " Writer " << writer_guid << " has no associated participant. Skipping" );
947
- return ;
948
- }
993
+ writer_part_it->second .add_writer (writer_guid);
949
994
950
995
// Add writer to writers_by_topic_[topic_name]
951
996
add_writer_to_topic_ (writer_guid, topic_name);
@@ -1032,6 +1077,29 @@ void DiscoveryDataBase::create_readers_from_change_(
1032
1077
// The reader was NOT known by the database
1033
1078
else
1034
1079
{
1080
+ // Check if corresponding participant is known, abort otherwise
1081
+ // NOTE: Processing a DATA(r) should always be preceded by the reception and processing of its corresponding
1082
+ // participant. However, one may receive a DATA(r) just after the participant has been removed, case in which the
1083
+ // former should no longer be processed.
1084
+ std::map<eprosima::fastdds::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator reader_part_it =
1085
+ participants_.find (reader_guid.guidPrefix );
1086
+ if (reader_part_it == participants_.end ())
1087
+ {
1088
+ EPROSIMA_LOG_ERROR (DISCOVERY_DATABASE,
1089
+ " Reader " << reader_guid << " has no associated participant. Skipping" );
1090
+ assert (topic_name != virtual_topic_);
1091
+ changes_to_release_.push_back (ch); // Release change so it can be reused
1092
+ return ;
1093
+ }
1094
+ else if (reader_part_it->second .change ()->kind != fastdds::rtps::ChangeKind_t::ALIVE)
1095
+ {
1096
+ EPROSIMA_LOG_WARNING (DISCOVERY_DATABASE,
1097
+ " Reader " << reader_guid << " is associated to a removed participant. Skipping" );
1098
+ assert (topic_name != virtual_topic_);
1099
+ changes_to_release_.push_back (ch); // Release change so it can be reused
1100
+ return ;
1101
+ }
1102
+
1035
1103
// Add entry to readers_
1036
1104
DiscoveryEndpointInfo tmp_reader (
1037
1105
ch,
@@ -1052,18 +1120,7 @@ void DiscoveryDataBase::create_readers_from_change_(
1052
1120
new_updates_++;
1053
1121
1054
1122
// Add entry to participants_[guid_prefix]::readers
1055
- std::map<eprosima::fastdds::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator reader_part_it =
1056
- participants_.find (reader_guid.guidPrefix );
1057
- if (reader_part_it != participants_.end ())
1058
- {
1059
- reader_part_it->second .add_reader (reader_guid);
1060
- }
1061
- else
1062
- {
1063
- EPROSIMA_LOG_ERROR (DISCOVERY_DATABASE,
1064
- " Reader " << reader_guid << " has no associated participant. Skipping" );
1065
- return ;
1066
- }
1123
+ reader_part_it->second .add_reader (reader_guid);
1067
1124
1068
1125
// Add reader to readers_by_topic_[topic_name]
1069
1126
add_reader_to_topic_ (reader_guid, topic_name);
@@ -1345,7 +1402,7 @@ void DiscoveryDataBase::process_dispose_participant_(
1345
1402
delete_reader_entity_ (reader_guid);
1346
1403
}
1347
1404
1348
- // All participant endoints must be already unmatched in others endopoints relevant_ack maps
1405
+ // All participant endpoints must be already unmatched in others endpoints relevant_ack maps
1349
1406
1350
1407
// Unmatch own participant
1351
1408
unmatch_participant_ (participant_guid.guidPrefix );
@@ -1602,6 +1659,14 @@ bool DiscoveryDataBase::data_queue_empty()
1602
1659
return (pdp_data_queue_.BothEmpty () && edp_data_queue_.BothEmpty ());
1603
1660
}
1604
1661
1662
+ void DiscoveryDataBase::swap_data_queues ()
1663
+ {
1664
+ // Swap EDP before PDP to avoid race condition in which both data P and w/r are received at the same time,
1665
+ // just after having swapped the PDP queue
1666
+ edp_data_queue_.Swap ();
1667
+ pdp_data_queue_.Swap ();
1668
+ }
1669
+
1605
1670
bool DiscoveryDataBase::is_participant (
1606
1671
const eprosima::fastdds::rtps::GUID_t& guid)
1607
1672
{
0 commit comments