@@ -36,6 +36,8 @@ namespace fastdds {
3636namespace rtps {
3737namespace ddb {
3838
39+ using ParticipantState = DiscoveryParticipantsAckStatus::ParticipantState;
40+
3941DiscoveryDataBase::DiscoveryDataBase (
4042 fastrtps::rtps::GuidPrefix_t server_guid_prefix,
4143 std::set<fastrtps::rtps::GuidPrefix_t> servers)
@@ -267,8 +269,8 @@ void DiscoveryDataBase::update_change_and_unmatch_(
267269 changes_to_release_.push_back (entity.update_and_unmatch (new_change));
268270 // Manually set relevant participants ACK status of this server, and of the participant that sent the
269271 // change, to 1. This way, we avoid backprogation of the data.
270- entity.add_or_update_ack_participant (server_guid_prefix_, true );
271- entity.add_or_update_ack_participant (new_change->writerGUID .guidPrefix , true );
272+ entity.add_or_update_ack_participant (server_guid_prefix_, ParticipantState::ACKED );
273+ entity.add_or_update_ack_participant (new_change->writerGUID .guidPrefix , ParticipantState::ACKED );
272274}
273275
274276void DiscoveryDataBase::add_ack_ (
@@ -292,7 +294,7 @@ void DiscoveryDataBase::add_ack_(
292294 // database has been updated, so this ACK is not relevant anymore
293295 if (it->second .change ()->write_params .sample_identity () == change->write_params .sample_identity ())
294296 {
295- it->second .add_or_update_ack_participant (acked_entity, true );
297+ it->second .add_or_update_ack_participant (acked_entity, ParticipantState::ACKED );
296298 }
297299 }
298300 }
@@ -307,7 +309,7 @@ void DiscoveryDataBase::add_ack_(
307309 // database has been updated, so this ACK is not relevant anymore
308310 if (it->second .change ()->write_params .sample_identity () == change->write_params .sample_identity ())
309311 {
310- it->second .add_or_update_ack_participant (acked_entity, true );
312+ it->second .add_or_update_ack_participant (acked_entity, ParticipantState::ACKED );
311313 }
312314 }
313315 }
@@ -322,7 +324,7 @@ void DiscoveryDataBase::add_ack_(
322324 // database has been updated, so this ACK is not relevant anymore
323325 if (it->second .change ()->write_params .sample_identity () == change->write_params .sample_identity ())
324326 {
325- it->second .add_or_update_ack_participant (acked_entity, true );
327+ it->second .add_or_update_ack_participant (acked_entity, ParticipantState::ACKED );
326328 }
327329 }
328330 }
@@ -694,7 +696,7 @@ void DiscoveryDataBase::create_new_participant_from_change_(
694696
695697 // Manually set to 1 the relevant participants ACK status of the participant that sent the change. This way,
696698 // we avoid backprogation of the data.
697- ret.first ->second .add_or_update_ack_participant (ch->writerGUID .guidPrefix , true );
699+ ret.first ->second .add_or_update_ack_participant (ch->writerGUID .guidPrefix , ParticipantState::ACKED );
698700
699701 // If the DATA(p) it's from this server, it is already in history and we do nothing here
700702 if (change_guid.guidPrefix != server_guid_prefix_)
@@ -796,7 +798,7 @@ void DiscoveryDataBase::update_participant_from_change_(
796798 if (ch->write_params .sample_identity ().sequence_number () ==
797799 participant_info.change ()->write_params .sample_identity ().sequence_number ())
798800 {
799- participant_info.add_or_update_ack_participant (ch->writerGUID .guidPrefix , true );
801+ participant_info.add_or_update_ack_participant (ch->writerGUID .guidPrefix , ParticipantState::ACKED );
800802 }
801803
802804 // we release it if it's the same or if it is lower
@@ -846,7 +848,7 @@ void DiscoveryDataBase::create_writers_from_change_(
846848 if (ch->write_params .sample_identity ().sequence_number () ==
847849 writer_it->second .change ()->write_params .sample_identity ().sequence_number ())
848850 {
849- writer_it->second .add_or_update_ack_participant (ch->writerGUID .guidPrefix , true );
851+ writer_it->second .add_or_update_ack_participant (ch->writerGUID .guidPrefix , ParticipantState::ACKED );
850852 }
851853
852854 // we release it if it's the same or if it is lower
@@ -894,7 +896,7 @@ void DiscoveryDataBase::create_writers_from_change_(
894896
895897 // Manually set to 1 the relevant participants ACK status of the participant that sent the change. This way,
896898 // we avoid backprogation of the data.
897- writer_it->second .add_or_update_ack_participant (ch->writerGUID .guidPrefix , true );
899+ writer_it->second .add_or_update_ack_participant (ch->writerGUID .guidPrefix , ParticipantState::ACKED );
898900
899901 // if topic is virtual, it must iterate over all readers
900902 if (topic_name == virtual_topic_)
@@ -964,7 +966,7 @@ void DiscoveryDataBase::create_readers_from_change_(
964966 if (ch->write_params .sample_identity ().sequence_number () ==
965967 reader_it->second .change ()->write_params .sample_identity ().sequence_number ())
966968 {
967- reader_it->second .add_or_update_ack_participant (ch->writerGUID .guidPrefix , true );
969+ reader_it->second .add_or_update_ack_participant (ch->writerGUID .guidPrefix , ParticipantState::ACKED );
968970 }
969971
970972 // we release it if it's the same or if it is lower
@@ -1012,7 +1014,7 @@ void DiscoveryDataBase::create_readers_from_change_(
10121014
10131015 // Manually set to 1 the relevant participants ACK status of the participant that sent the change. This way,
10141016 // we avoid backprogation of the data.
1015- reader_it->second .add_or_update_ack_participant (ch->writerGUID .guidPrefix , true );
1017+ reader_it->second .add_or_update_ack_participant (ch->writerGUID .guidPrefix , ParticipantState::ACKED );
10161018
10171019 // if topic is virtual, it must iterate over all readers
10181020 if (topic_name == virtual_topic_)
@@ -1407,37 +1409,42 @@ bool DiscoveryDataBase::process_dirty_topics()
14071409 // Find participants with writer info and participant with reader info in participants_
14081410 parts_reader_it = participants_.find (reader.guidPrefix );
14091411 parts_writer_it = participants_.find (writer.guidPrefix );
1410- // Find reader info in readers_
1411- readers_it = readers_.find (reader);
1412- // Find writer info in writers_
1413- writers_it = writers_.find (writer);
14141412
14151413 // Check in `participants_` whether the client with the reader has acknowledge the PDP of the client
14161414 // with the writer.
14171415 if (parts_reader_it != participants_.end ())
14181416 {
14191417 if (parts_reader_it->second .is_matched (writer.guidPrefix ))
14201418 {
1419+ // Find reader info in readers_
1420+ readers_it = readers_.find (reader);
14211421 // Check the status of the writer in `readers_[reader]::relevant_participants_builtin_ack_status`.
14221422 if (readers_it != readers_.end () &&
14231423 readers_it->second .is_relevant_participant (writer.guidPrefix ) &&
1424- !readers_it->second .is_matched (writer.guidPrefix ))
1424+ !readers_it->second .is_waiting_ack (writer.guidPrefix ))
14251425 {
14261426 // If the status is 0, add DATA(r) to a `edp_publications_to_send_` (if it's not there).
14271427 if (add_edp_subscriptions_to_send_ (readers_it->second .change ()))
14281428 {
14291429 EPROSIMA_LOG_INFO (DISCOVERY_DATABASE, " Addind DATA(r) to send: "
14301430 << readers_it->second .change ()->instanceHandle );
1431+ readers_it->second .add_or_update_ack_participant (writer.guidPrefix ,
1432+ ParticipantState::WAITING_ACK);
14311433 }
14321434 }
14331435 }
14341436 else if (parts_reader_it->second .is_relevant_participant (writer.guidPrefix ))
14351437 {
1436- // Add DATA(p) of the client with the writer to `pdp_to_send_` (if it's not there).
1437- if (add_pdp_to_send_ (parts_reader_it->second .change ()))
1438+ if (!parts_reader_it->second .is_waiting_ack (writer.guidPrefix ))
14381439 {
1439- EPROSIMA_LOG_INFO (DISCOVERY_DATABASE, " Addind readers' DATA(p) to send: "
1440- << parts_reader_it->second .change ()->instanceHandle );
1440+ // Add DATA(p) of the client with the writer to `pdp_to_send_` (if it's not there).
1441+ if (add_pdp_to_send_ (parts_reader_it->second .change ()))
1442+ {
1443+ EPROSIMA_LOG_INFO (DISCOVERY_DATABASE, " Adding readers' DATA(p) to send: "
1444+ << parts_reader_it->second .change ()->instanceHandle );
1445+ parts_reader_it->second .add_or_update_ack_participant (writer.guidPrefix ,
1446+ ParticipantState::WAITING_ACK);
1447+ }
14411448 }
14421449 // Set topic as not-clearable.
14431450 is_clearable = false ;
@@ -1450,26 +1457,35 @@ bool DiscoveryDataBase::process_dirty_topics()
14501457 {
14511458 if (parts_writer_it->second .is_matched (reader.guidPrefix ))
14521459 {
1460+ // Find writer info in writers_
1461+ writers_it = writers_.find (writer);
14531462 // Check the status of the reader in `writers_[writer]::relevant_participants_builtin_ack_status`.
14541463 if (writers_it != writers_.end () &&
14551464 writers_it->second .is_relevant_participant (reader.guidPrefix ) &&
1456- !writers_it->second .is_matched (reader.guidPrefix ))
1465+ !writers_it->second .is_waiting_ack (reader.guidPrefix ))
14571466 {
14581467 // If the status is 0, add DATA(w) to a `edp_subscriptions_to_send_` (if it's not there).
14591468 if (add_edp_publications_to_send_ (writers_it->second .change ()))
14601469 {
14611470 EPROSIMA_LOG_INFO (DISCOVERY_DATABASE, " Addind DATA(w) to send: "
14621471 << writers_it->second .change ()->instanceHandle );
1472+ writers_it->second .add_or_update_ack_participant (reader.guidPrefix ,
1473+ ParticipantState::WAITING_ACK);
14631474 }
14641475 }
14651476 }
14661477 else if (parts_writer_it->second .is_relevant_participant (reader.guidPrefix ))
14671478 {
1468- // Add DATA(p) of the client with the reader to `pdp_to_send_` (if it's not there).
1469- if (add_pdp_to_send_ (parts_writer_it->second .change ()))
1479+ if (!parts_writer_it->second .is_waiting_ack (reader.guidPrefix ))
14701480 {
1471- EPROSIMA_LOG_INFO (DISCOVERY_DATABASE, " Addind writers' DATA(p) to send: "
1472- << parts_writer_it->second .change ()->instanceHandle );
1481+ // Add DATA(p) of the client with the reader to `pdp_to_send_` (if it's not there).
1482+ if (add_pdp_to_send_ (parts_writer_it->second .change ()))
1483+ {
1484+ EPROSIMA_LOG_INFO (DISCOVERY_DATABASE, " Adding writers' DATA(p) to send: "
1485+ << parts_writer_it->second .change ()->instanceHandle );
1486+ parts_writer_it->second .add_or_update_ack_participant (reader.guidPrefix ,
1487+ ParticipantState::WAITING_ACK);
1488+ }
14731489 }
14741490 // Set topic as not-clearable.
14751491 is_clearable = false ;
@@ -2463,7 +2479,7 @@ bool DiscoveryDataBase::from_json(
24632479 // Populate GuidPrefix_t
24642480 std::istringstream (it_ack.key ()) >> prefix_aux_ack;
24652481
2466- dpi.add_or_update_ack_participant (prefix_aux_ack, it_ack.value ().get <bool >());
2482+ dpi.add_or_update_ack_participant (prefix_aux_ack, it_ack.value ().get <ParticipantState >());
24672483 }
24682484
24692485 // Add Participant
@@ -2501,7 +2517,7 @@ bool DiscoveryDataBase::from_json(
25012517 // Populate GuidPrefix_t
25022518 std::istringstream (it_ack.key ()) >> prefix_aux_ack;
25032519
2504- dei.add_or_update_ack_participant (prefix_aux_ack, it_ack.value ().get <bool >());
2520+ dei.add_or_update_ack_participant (prefix_aux_ack, it_ack.value ().get <ParticipantState >());
25052521 }
25062522
25072523 // Add Participant
@@ -2561,7 +2577,7 @@ bool DiscoveryDataBase::from_json(
25612577 // Populate GuidPrefix_t
25622578 std::istringstream (it_ack.key ()) >> prefix_aux_ack;
25632579
2564- dei.add_or_update_ack_participant (prefix_aux_ack, it_ack.value ().get <bool >());
2580+ dei.add_or_update_ack_participant (prefix_aux_ack, it_ack.value ().get <ParticipantState >());
25652581 }
25662582
25672583 // Add Participant
0 commit comments