11use super :: {
22 config:: Config ,
33 fetcher:: { Config as FetcherConfig , Fetcher } ,
4+ inflight:: Inflight ,
45 ingress:: { FetchRequest , Mailbox , Message } ,
56 metrics, wire, Producer ,
67} ;
@@ -24,7 +25,7 @@ use commonware_utils::{
2425} ;
2526use futures:: future:: { self , Either } ;
2627use rand:: Rng ;
27- use std:: { collections :: HashMap , marker:: PhantomData } ;
28+ use std:: marker:: PhantomData ;
2829use tracing:: { debug, error, trace, warn} ;
2930
3031/// Represents a pending serve operation.
@@ -42,17 +43,14 @@ pub struct Engine<
4243 D : Provider < PublicKey = P > ,
4344 B : Blocker < PublicKey = P > ,
4445 Key : Span ,
45- Con : Consumer < Key = Key , Value = Bytes , Failure = ( ) > ,
46+ Con : Consumer < Key = Key , Value = Bytes > ,
4647 Pro : Producer < Key = Key > ,
4748 NetS : Sender < PublicKey = P > ,
4849 NetR : Receiver < PublicKey = P > ,
4950> {
5051 /// Context used to spawn tasks, manage time, etc.
5152 context : ContextCell < E > ,
5253
53- /// Consumes data that is fetched from the network
54- consumer : Con ,
55-
5654 /// Produces data for incoming requests
5755 producer : Pro ,
5856
@@ -71,8 +69,8 @@ pub struct Engine<
7169 /// Manages outgoing fetch requests
7270 fetcher : Fetcher < E , P , Key , NetS > ,
7371
74- /// Track the start time of fetch operations
75- fetch_timers : HashMap < Key , histogram :: Timer < E > > ,
72+ /// Tracks all in-flight fetch state
73+ inflight : Inflight < E , Con , P , Key > ,
7674
7775 /// Holds futures that resolve once the `Producer` has produced the data.
7876 /// Once the future is resolved, the data (or an error) is sent to the peer.
9694 D : Provider < PublicKey = P > ,
9795 B : Blocker < PublicKey = P > ,
9896 Key : Span ,
99- Con : Consumer < Key = Key , Value = Bytes , Failure = ( ) > ,
97+ Con : Consumer < Key = Key , Value = Bytes > ,
10098 Pro : Producer < Key = Key > ,
10199 NetS : Sender < PublicKey = P > ,
102100 NetR : Receiver < PublicKey = P > ,
@@ -123,17 +121,16 @@ impl<
123121 (
124122 Self {
125123 context : ContextCell :: new ( context) ,
126- consumer : cfg. consumer ,
127124 producer : cfg. producer ,
128125 peer_provider : cfg. peer_provider ,
129126 blocker : cfg. blocker ,
130127 last_peer_set_id : None ,
131128 mailbox : receiver,
132129 fetcher,
130+ inflight : Inflight :: new ( cfg. consumer ) ,
133131 serves : FuturesPool :: default ( ) ,
134132 priority_responses : cfg. priority_responses ,
135133 metrics,
136- fetch_timers : HashMap :: new ( ) ,
137134 _r : PhantomData ,
138135 } ,
139136 Mailbox :: new ( sender) ,
@@ -189,6 +186,7 @@ impl<
189186 } ,
190187 on_stopped => {
191188 debug!( "shutdown" ) ;
189+ self . inflight. drain( ) ;
192190 self . serves. cancel_all( ) ;
193191 } ,
194192 // Handle peer set updates
@@ -224,7 +222,7 @@ impl<
224222 trace!( ?key, "mailbox: fetch" ) ;
225223
226224 // Check if the fetch is already in progress
227- let is_new = !self . fetch_timers . contains_key ( & key) ;
225+ let is_new = !self . inflight . contains ( & key) ;
228226
229227 // Update targets
230228 match targets {
@@ -241,7 +239,7 @@ impl<
241239
242240 // Only start new fetch if not already in progress
243241 if is_new {
244- self . fetch_timers
242+ self . inflight
245243 . insert( key. clone( ) , self . metrics. fetch_duration. timer( ) ) ;
246244 self . fetcher. add_ready( key) ;
247245 } else {
@@ -252,59 +250,36 @@ impl<
252250 Message :: Cancel { key } => {
253251 trace!( ?key, "mailbox: cancel" ) ;
254252 let mut guard = self . metrics. cancel. guard( Status :: Dropped ) ;
255- if self . fetcher. cancel( & key) {
253+ self . fetcher. cancel( & key) ;
254+ if self . inflight. cancel( & key) {
256255 guard. set( Status :: Success ) ;
257- self . fetch_timers. remove( & key) . unwrap( ) . cancel( ) ; // must exist, don't record metric
258- self . consumer. failed( key. clone( ) , ( ) ) . await ;
259256 }
260257 }
261258 Message :: Retain { predicate } => {
262259 trace!( "mailbox: retain" ) ;
263260
264- // Remove from fetcher
265261 self . fetcher. retain( & predicate) ;
266-
267- // Clean up timers and notify consumer
268- let before = self . fetch_timers. len( ) ;
269- let removed = self
270- . fetch_timers
271- . extract_if( |k, _| !predicate( k) )
272- . collect:: <Vec <_>>( ) ;
273- for ( key, timer) in removed {
274- timer. cancel( ) ;
275- self . consumer. failed( key, ( ) ) . await ;
276- }
277-
278- // Metrics
279- let removed = ( before - self . fetch_timers. len( ) ) as u64 ;
280- if removed == 0 {
281- self . metrics. cancel. inc( Status :: Dropped ) ;
282- } else {
283- self . metrics. cancel. inc_by( Status :: Success , removed) ;
284- }
262+ let count = self . inflight. retain( predicate) as u64 ;
263+ self . record_cancellations( count) ;
285264 }
286265 Message :: Clear => {
287266 trace!( "mailbox: clear" ) ;
288267
289- // Clear fetcher
290268 self . fetcher. clear( ) ;
291-
292- // Drain timers and notify consumer
293- let removed = self . fetch_timers. len( ) as u64 ;
294- for ( key, timer) in self . fetch_timers. drain( ) {
295- timer. cancel( ) ;
296- self . consumer. failed( key, ( ) ) . await ;
297- }
298-
299- // Metrics
300- if removed == 0 {
301- self . metrics. cancel. inc( Status :: Dropped ) ;
302- } else {
303- self . metrics. cancel. inc_by( Status :: Success , removed) ;
304- }
269+ let count = self . inflight. drain( ) as u64 ;
270+ self . record_cancellations( count) ;
305271 }
306272 }
307- assert_eq!( self . fetcher. len( ) , self . fetch_timers. len( ) ) ;
273+ } ,
274+ // Handle completed consumer deliveries
275+ delivery = self . inflight. next_delivery( ) => {
276+ // If the delivery was aborted, its inflight entry was dropped (via
277+ // Cancel, Retain, Clear, or shutdown) before the consumer finished validating.
278+ let ( peer, key, valid) = match delivery {
279+ Ok ( delivery) => delivery,
280+ Err ( _) => continue ,
281+ } ;
282+ self . handle_delivery( peer, key, valid) . await ;
308283 } ,
309284 // Handle completed server requests
310285 serve = self . serves. next_completed( ) => {
@@ -352,15 +327,23 @@ impl<
352327 } ;
353328 match msg. payload {
354329 wire:: Payload :: Request ( key) => self . handle_network_request( peer, msg. id, key) ,
355- wire:: Payload :: Response ( response) => {
356- self . handle_network_response( peer, msg. id, response) . await
357- }
330+ wire:: Payload :: Response ( response) =>
331+ self . handle_network_response( peer, msg. id, response) ,
358332 wire:: Payload :: Error => self . handle_network_error_response( peer, msg. id) ,
359333 } ;
360334 } ,
361335 }
362336 }
363337
338+ /// Record cancellation metrics for a retain-style operation.
339+ fn record_cancellations ( & mut self , count : u64 ) {
340+ if count == 0 {
341+ self . metrics . cancel . inc ( Status :: Dropped ) ;
342+ } else {
343+ self . metrics . cancel . inc_by ( Status :: Success , count) ;
344+ }
345+ }
346+
364347 /// Handles the case where the application responds to a request from an external peer.
365348 async fn handle_serve (
366349 & mut self ,
@@ -409,7 +392,7 @@ impl<
409392 }
410393
411394 /// Handle a network response from a peer.
412- async fn handle_network_response ( & mut self , peer : P , id : u64 , response : Bytes ) {
395+ fn handle_network_response ( & mut self , peer : P , id : u64 , response : Bytes ) {
413396 trace ! ( ?peer, ?id, "peer response: data" ) ;
414397
415398 // Get the key associated with the response, if any
@@ -418,13 +401,15 @@ impl<
418401 return ;
419402 } ;
420403
421- // The peer had the data, so we can deliver it to the consumer
422- if self . consumer . deliver ( key. clone ( ) , response) . await {
423- // Record metrics
424- self . metrics . fetch . inc ( Status :: Success ) ;
425- self . fetch_timers . remove ( & key) . unwrap ( ) ; // must exist in the map, records metric on drop
404+ // The peer had the data, so deliver it to the consumer without blocking the engine.
405+ self . inflight . deliver ( key, peer, response) ;
406+ }
426407
427- // Clear all targets for this key
408+ /// Handle completed delivery to the consumer.
409+ async fn handle_delivery ( & mut self , peer : P , key : Key , valid : bool ) {
410+ if valid {
411+ self . metrics . fetch . inc ( Status :: Success ) ;
412+ self . inflight . complete ( & key) ; // records duration on drop
428413 self . fetcher . clear_targets ( & key) ;
429414 return ;
430415 }
0 commit comments