@@ -10,7 +10,8 @@ use futures_util::FutureExt;
1010use tokio:: sync:: mpsc;
1111
1212use std:: {
13- collections:: VecDeque ,
13+ cmp:: { Ordering , Reverse } ,
14+ collections:: { BinaryHeap , VecDeque } ,
1415 convert:: TryFrom ,
1516 pin:: Pin ,
1617 str:: FromStr ,
@@ -62,7 +63,7 @@ impl From<Conn> for IdlingConn {
6263/// This is fine as long as we never do expensive work while holding the lock!
6364#[ derive( Debug ) ]
6465struct Exchange {
65- waiting : VecDeque < Waker > ,
66+ waiting : BinaryHeap < QueuedWaker > ,
6667 available : VecDeque < IdlingConn > ,
6768 exist : usize ,
6869 // only used to spawn the recycler the first time we're in async context
@@ -87,6 +88,51 @@ impl Exchange {
8788 }
8889}
8990
91+ const QUEUE_END_ID : QueueId = QueueId ( Reverse ( u64:: MAX ) ) ;
92+
93+ #[ derive( Debug , Copy , Clone , Eq , PartialEq , Ord , PartialOrd ) ]
94+ pub ( crate ) struct QueueId ( Reverse < u64 > ) ;
95+
96+ impl QueueId {
97+ fn next ( ) -> Self {
98+ static NEXT_QUEUE_ID : atomic:: AtomicU64 = atomic:: AtomicU64 :: new ( 0 ) ;
99+ let id = NEXT_QUEUE_ID . fetch_add ( 1 , atomic:: Ordering :: SeqCst ) ;
100+ QueueId ( Reverse ( id) )
101+ }
102+ }
103+
104+ #[ derive( Debug ) ]
105+ struct QueuedWaker {
106+ queue_id : QueueId ,
107+ waker : Waker ,
108+ }
109+
110+ impl QueuedWaker {
111+ fn new ( queue_id : QueueId , waker : Waker ) -> Self {
112+ QueuedWaker { queue_id, waker }
113+ }
114+ }
115+
116+ impl Eq for QueuedWaker { }
117+
118+ impl PartialEq for QueuedWaker {
119+ fn eq ( & self , other : & Self ) -> bool {
120+ self . queue_id == other. queue_id
121+ }
122+ }
123+
124+ impl Ord for QueuedWaker {
125+ fn cmp ( & self , other : & Self ) -> Ordering {
126+ self . queue_id . cmp ( & other. queue_id )
127+ }
128+ }
129+
130+ impl PartialOrd for QueuedWaker {
131+ fn partial_cmp ( & self , other : & Self ) -> Option < Ordering > {
132+ Some ( self . cmp ( other) )
133+ }
134+ }
135+
90136/// Connection pool data.
91137#[ derive( Debug ) ]
92138pub struct Inner {
@@ -131,7 +177,7 @@ impl Pool {
131177 closed : false . into ( ) ,
132178 exchange : Mutex :: new ( Exchange {
133179 available : VecDeque :: with_capacity ( pool_opts. constraints ( ) . max ( ) ) ,
134- waiting : VecDeque :: new ( ) ,
180+ waiting : BinaryHeap :: new ( ) ,
135181 exist : 0 ,
136182 recycler : Some ( ( rx, pool_opts) ) ,
137183 } ) ,
@@ -181,8 +227,8 @@ impl Pool {
181227 let mut exchange = self . inner . exchange . lock ( ) . unwrap ( ) ;
182228 if exchange. available . len ( ) < self . opts . pool_opts ( ) . active_bound ( ) {
183229 exchange. available . push_back ( conn. into ( ) ) ;
184- if let Some ( w ) = exchange. waiting . pop_front ( ) {
185- w . wake ( ) ;
230+ if let Some ( qw ) = exchange. waiting . pop ( ) {
231+ qw . waker . wake ( ) ;
186232 }
187233 return ;
188234 }
@@ -216,17 +262,27 @@ impl Pool {
216262 let mut exchange = self . inner . exchange . lock ( ) . unwrap ( ) ;
217263 exchange. exist -= 1 ;
218264 // we just enabled the creation of a new connection!
219- if let Some ( w ) = exchange. waiting . pop_front ( ) {
220- w . wake ( ) ;
265+ if let Some ( qw ) = exchange. waiting . pop ( ) {
266+ qw . waker . wake ( ) ;
221267 }
222268 }
223269
224270 /// Poll the pool for an available connection.
225- fn poll_new_conn ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < GetConn > > {
226- self . poll_new_conn_inner ( cx)
227- }
228-
229- fn poll_new_conn_inner ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < GetConn > > {
271+ fn poll_new_conn (
272+ self : Pin < & mut Self > ,
273+ cx : & mut Context < ' _ > ,
274+ queued : bool ,
275+ queue_id : QueueId ,
276+ ) -> Poll < Result < GetConnInner > > {
277+ self . poll_new_conn_inner ( cx, queued, queue_id)
278+ }
279+
280+ fn poll_new_conn_inner (
281+ self : Pin < & mut Self > ,
282+ cx : & mut Context < ' _ > ,
283+ queued : bool ,
284+ queue_id : QueueId ,
285+ ) -> Poll < Result < GetConnInner > > {
230286 let mut exchange = self . inner . exchange . lock ( ) . unwrap ( ) ;
231287
232288 // NOTE: this load must happen while we hold the lock,
@@ -238,18 +294,23 @@ impl Pool {
238294
239295 exchange. spawn_futures_if_needed ( & self . inner ) ;
240296
297+ // Check if others are waiting and we're not queued.
298+ if !exchange. waiting . is_empty ( ) && !queued {
299+ exchange
300+ . waiting
301+ . push ( QueuedWaker :: new ( queue_id, cx. waker ( ) . clone ( ) ) ) ;
302+ return Poll :: Pending ;
303+ }
304+
241305 while let Some ( IdlingConn { mut conn, .. } ) = exchange. available . pop_back ( ) {
242306 if !conn. expired ( ) {
243- return Poll :: Ready ( Ok ( GetConn {
244- pool : Some ( self . clone ( ) ) ,
245- inner : GetConnInner :: Checking (
246- async move {
247- conn. stream_mut ( ) ?. check ( ) . await ?;
248- Ok ( conn)
249- }
250- . boxed ( ) ,
251- ) ,
252- } ) ) ;
307+ return Poll :: Ready ( Ok ( GetConnInner :: Checking (
308+ async move {
309+ conn. stream_mut ( ) ?. check ( ) . await ?;
310+ Ok ( conn)
311+ }
312+ . boxed ( ) ,
313+ ) ) ) ;
253314 } else {
254315 self . send_to_recycler ( conn) ;
255316 }
@@ -261,14 +322,15 @@ impl Pool {
261322 // we are allowed to make a new connection, so we will!
262323 exchange. exist += 1 ;
263324
264- return Poll :: Ready ( Ok ( GetConn {
265- pool : Some ( self . clone ( ) ) ,
266- inner : GetConnInner :: Connecting ( Conn :: new ( self . opts . clone ( ) ) . boxed ( ) ) ,
267- } ) ) ;
325+ return Poll :: Ready ( Ok ( GetConnInner :: Connecting (
326+ Conn :: new ( self . opts . clone ( ) ) . boxed ( ) ,
327+ ) ) ) ;
268328 }
269329
270- // no go -- we have to wait
271- exchange. waiting . push_back ( cx. waker ( ) . clone ( ) ) ;
330+ // Polled, but no conn available? Back into the queue.
331+ exchange
332+ . waiting
333+ . push ( QueuedWaker :: new ( queue_id, cx. waker ( ) . clone ( ) ) ) ;
272334 Poll :: Pending
273335 }
274336}
0 commit comments