@@ -25,6 +25,13 @@ struct Waiter<M> {
2525 responder : oneshot:: Sender < M > ,
2626}
2727
28+ /// Result of buffering an incoming or locally sent digest (inserted, duplicate, or ineligible).
29+ enum InsertMessageResult {
30+ Inserted ,
31+ Duplicate ,
32+ Ineligible ,
33+ }
34+
2835/// Instance of the main engine for the module.
2936///
3037/// It is responsible for:
9097 /// the message once.
9198 counts : BTreeMap < M :: Digest , usize > ,
9299
100+ /// Latest primary peer set allowed to keep buffered messages resident.
101+ latest_primary_peers : Set < P > ,
102+
93103 ////////////////////////////////////////
94104 // Metrics
95105 ////////////////////////////////////////
@@ -124,6 +134,7 @@ where
124134 deques : BTreeMap :: new ( ) ,
125135 items : BTreeMap :: new ( ) ,
126136 counts : BTreeMap :: new ( ) ,
137+ latest_primary_peers : Set :: default ( ) ,
127138 peer_provider : cfg. peer_provider ,
128139 metrics,
129140 } ;
@@ -159,6 +170,14 @@ where
159170 on_stopped => {
160171 debug!( "shutdown" ) ;
161172 } ,
173+ // Handle peer set subscription messages
174+ Some ( update) = peer_set_subscription. recv( ) else {
175+ debug!( "peer set subscription closed" ) ;
176+ break ;
177+ } => {
178+ // Evict by latest primary only; see buffered module docs.
179+ self . update_latest_primary_peers( update. latest. primary) ;
180+ } ,
162181 // Handle mailbox messages
163182 Some ( msg) = self . mailbox_receiver. recv( ) else {
164183 error!( "mailbox receiver failed" ) ;
@@ -210,12 +229,6 @@ where
210229 . inc( ) ;
211230 self . handle_network( peer, msg) ;
212231 } ,
213- Some ( ( _, _, tracked_peers) ) = peer_set_subscription. recv( ) else {
214- debug!( "peer set subscription closed" ) ;
215- break ;
216- } => {
217- self . evict_untracked_peers( & tracked_peers) ;
218- } ,
219232 }
220233 }
221234
@@ -232,7 +245,8 @@ where
232245 responder : oneshot:: Sender < Vec < P > > ,
233246 ) {
234247 // Store the message, continue even if it was already stored
235- let _ = self . insert_message ( self . public_key . clone ( ) , msg. clone ( ) ) ;
248+ let digest = msg. digest ( ) ;
249+ let _ = self . insert_message ( self . public_key . clone ( ) , digest, msg. clone ( ) ) ;
236250
237251 // Broadcast the message to the network
238252 let sent_to = sender
@@ -271,13 +285,20 @@ where
271285
272286 /// Handles a message that was received from a peer.
273287 fn handle_network ( & mut self , peer : P , msg : M ) {
274- if !self . insert_message ( peer. clone ( ) , msg) {
275- debug ! ( ?peer, "message already stored" ) ;
276- self . metrics . receive . inc ( Status :: Dropped ) ;
277- return ;
288+ let digest = msg. digest ( ) ;
289+ match self . insert_message ( peer. clone ( ) , digest, msg) {
290+ InsertMessageResult :: Inserted => {
291+ self . metrics . receive . inc ( Status :: Success ) ;
292+ }
293+ InsertMessageResult :: Duplicate => {
294+ debug ! ( ?peer, "message already stored" ) ;
295+ self . metrics . receive . inc ( Status :: Dropped ) ;
296+ }
297+ InsertMessageResult :: Ineligible => {
298+ debug ! ( ?peer, "message from peer outside latest.primary not cached" ) ;
299+ self . metrics . receive . inc ( Status :: Dropped ) ;
300+ }
278301 }
279-
280- self . metrics . receive . inc ( Status :: Success ) ;
281302 }
282303
283304 ////////////////////////////////////////
@@ -286,18 +307,21 @@ where
286307
287308 /// Inserts a message into the cache.
288309 ///
289- /// Returns `true` if the message was inserted, `false` if it was already present.
290- /// Updates the deque, item count, and message cache, potentially evicting an old message.
291- fn insert_message ( & mut self , peer : P , msg : M ) -> bool {
292- let digest = msg. digest ( ) ;
293-
310+ /// Waiters are notified even when a sender is not eligible to keep a
311+ /// buffered cache entry resident.
312+ fn insert_message ( & mut self , peer : P , digest : M :: Digest , msg : M ) -> InsertMessageResult {
294313 // Send the message to the waiters, if any
295314 if let Some ( waiters) = self . waiters . remove ( & digest) {
296315 for waiter in waiters {
297316 self . respond_subscribe ( waiter. responder , msg. clone ( ) ) ;
298317 }
299318 }
300319
320+ // Only peers listed in `latest.primary` may buffer
321+ if self . latest_primary_peers . position ( & peer) . is_none ( ) {
322+ return InsertMessageResult :: Ineligible ;
323+ }
324+
301325 // Get the relevant deque for the peer
302326 let deque = self
303327 . deques
@@ -310,7 +334,7 @@ where
310334 let v = deque. remove ( i) . unwrap ( ) ; // Must exist
311335 deque. push_front ( v) ;
312336 }
313- return false ;
337+ return InsertMessageResult :: Duplicate ;
314338 } ;
315339
316340 // - Insert the digest into the peer cache
@@ -336,20 +360,20 @@ where
336360 decrement_digest_refcount ( & mut self . counts , & mut self . items , & stale) ;
337361 }
338362
339- true
363+ InsertMessageResult :: Inserted
340364 }
341365
342- fn evict_untracked_peers ( & mut self , tracked_peers : & Set < P > ) {
343- let tracked = tracked_peers. as_ref ( ) ;
366+ fn update_latest_primary_peers ( & mut self , peers : Set < P > ) {
344367 for ( peer, deque) in self
345368 . deques
346- . extract_if ( .., |peer, _| !tracked . contains ( peer) )
369+ . extract_if ( .., |peer, _| peers . position ( peer) . is_none ( ) )
347370 {
348371 debug ! ( ?peer, digests = deque. len( ) , "evicting disconnected peer" ) ;
349372 for digest in deque {
350373 decrement_digest_refcount ( & mut self . counts , & mut self . items , & digest) ;
351374 }
352375 }
376+ self . latest_primary_peers = peers;
353377 }
354378
355379 ////////////////////////////////////////
0 commit comments