Skip to content

Commit 67222b2

Browse files
committed
Wrap connections in waiter channels in a drop guard
1 parent a89e062 commit 67222b2

File tree

2 files changed

+80
-36
lines changed

2 files changed

+80
-36
lines changed

bb8/src/inner.rs

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,11 @@ use std::time::{Duration, Instant};
66

77
use futures_channel::oneshot;
88
use futures_util::stream::{FuturesUnordered, StreamExt};
9-
use parking_lot::Mutex;
109
use tokio::spawn;
1110
use tokio::time::{interval_at, sleep, timeout, Interval};
1211

1312
use crate::api::{Builder, ManageConnection, PooledConnection, RunError};
14-
use crate::internals::{Approval, ApprovalIter, Conn, PoolInternals, State};
13+
use crate::internals::{Approval, ApprovalIter, Conn, SharedPool, State};
1514

1615
pub(crate) struct PoolInner<M>
1716
where
@@ -25,11 +24,7 @@ where
2524
M: ManageConnection + Send,
2625
{
2726
pub(crate) fn new(builder: Builder<M>, manager: M) -> Self {
28-
let inner = Arc::new(SharedPool {
29-
statics: builder,
30-
manager,
31-
internals: Mutex::new(PoolInternals::default()),
32-
});
27+
let inner = Arc::new(SharedPool::new(builder, manager));
3328

3429
if inner.statics.max_lifetime.is_some() || inner.statics.idle_timeout.is_some() {
3530
let s = Arc::downgrade(&inner);
@@ -120,7 +115,7 @@ where
120115
};
121116

122117
match timeout(self.inner.statics.connection_timeout, rx).await {
123-
Ok(Ok(conn)) => Ok(PooledConnection::new(self, conn)),
118+
Ok(Ok(mut guard)) => Ok(PooledConnection::new(self, guard.extract())),
124119
_ => Err(RunError::TimedOut),
125120
}
126121
}
@@ -141,7 +136,7 @@ where
141136

142137
let mut locked = self.inner.internals.lock();
143138
match conn {
144-
Some(conn) => locked.put(conn, None),
139+
Some(conn) => locked.put(conn, None, self.inner.clone()),
145140
None => {
146141
let approvals = locked.dropped(1, &self.inner.statics);
147142
self.spawn_replenishing_approvals(approvals);
@@ -177,7 +172,10 @@ where
177172
match shared.manager.connect().await {
178173
Ok(conn) => {
179174
let conn = Conn::new(conn);
180-
shared.internals.lock().put(conn, Some(approval));
175+
shared
176+
.internals
177+
.lock()
178+
.put(conn, Some(approval), self.inner.clone());
181179
return Ok(());
182180
}
183181
Err(e) => {
@@ -216,17 +214,6 @@ where
216214
}
217215
}
218216

219-
/// The guts of a `Pool`.
220-
#[allow(missing_debug_implementations)]
221-
struct SharedPool<M>
222-
where
223-
M: ManageConnection + Send,
224-
{
225-
statics: Builder<M>,
226-
manager: M,
227-
internals: Mutex<PoolInternals<M>>,
228-
}
229-
230217
fn schedule_reaping<M>(mut interval: Interval, weak_shared: Weak<SharedPool<M>>)
231218
where
232219
M: ManageConnection,

bb8/src/internals.rs

Lines changed: 72 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,44 @@
11
use std::cmp::min;
2+
use std::sync::Arc;
23
use std::time::Instant;
34

45
use futures_channel::oneshot;
6+
use parking_lot::Mutex;
57

68
use crate::api::{Builder, ManageConnection};
79
use std::collections::VecDeque;
810

11+
/// The guts of a `Pool`.
12+
#[allow(missing_debug_implementations)]
13+
pub(crate) struct SharedPool<M>
14+
where
15+
M: ManageConnection + Send,
16+
{
17+
pub(crate) statics: Builder<M>,
18+
pub(crate) manager: M,
19+
pub(crate) internals: Mutex<PoolInternals<M>>,
20+
}
21+
22+
impl<M> SharedPool<M>
23+
where
24+
M: ManageConnection + Send,
25+
{
26+
pub(crate) fn new(statics: Builder<M>, manager: M) -> Self {
27+
Self {
28+
statics,
29+
manager,
30+
internals: Mutex::new(PoolInternals::default()),
31+
}
32+
}
33+
}
34+
935
/// The pool data that must be protected by a lock.
1036
#[allow(missing_debug_implementations)]
1137
pub(crate) struct PoolInternals<M>
1238
where
1339
M: ManageConnection,
1440
{
15-
waiters: VecDeque<oneshot::Sender<Conn<M::Connection>>>,
41+
waiters: VecDeque<oneshot::Sender<InternalsGuard<M>>>,
1642
conns: VecDeque<IdleConn<M::Connection>>,
1743
num_conns: u32,
1844
pending_conns: u32,
@@ -31,27 +57,31 @@ where
3157
.map(|idle| (idle.conn, self.wanted(config)))
3258
}
3359

34-
pub(crate) fn put(&mut self, conn: Conn<M::Connection>, approval: Option<Approval>) {
35-
let mut conn = IdleConn::from(conn);
60+
pub(crate) fn put(
61+
&mut self,
62+
conn: Conn<M::Connection>,
63+
approval: Option<Approval>,
64+
pool: Arc<SharedPool<M>>,
65+
) {
3666
if approval.is_some() {
3767
self.pending_conns -= 1;
3868
self.num_conns += 1;
3969
}
4070

41-
loop {
42-
if let Some(waiter) = self.waiters.pop_front() {
43-
// This connection is no longer idle, send it back out.
44-
match waiter.send(conn.conn) {
45-
Ok(_) => break,
46-
// Oops, that receiver was gone. Loop and try again.
47-
Err(c) => conn.conn = c,
71+
let mut guard = InternalsGuard::new(conn, pool);
72+
while let Some(waiter) = self.waiters.pop_front() {
73+
// This connection is no longer idle, send it back out
74+
match waiter.send(guard) {
75+
Ok(()) => return,
76+
Err(g) => {
77+
guard = g;
4878
}
49-
} else {
50-
// Queue it in the idle queue.
51-
self.conns.push_back(conn);
52-
break;
5379
}
5480
}
81+
82+
// Queue it in the idle queue
83+
self.conns
84+
.push_back(IdleConn::from(guard.conn.take().unwrap()));
5585
}
5686

5787
pub(crate) fn connect_failed(&mut self, _: Approval) {
@@ -77,7 +107,7 @@ where
77107

78108
pub(crate) fn push_waiter(
79109
&mut self,
80-
waiter: oneshot::Sender<Conn<M::Connection>>,
110+
waiter: oneshot::Sender<InternalsGuard<M>>,
81111
config: &Builder<M>,
82112
) -> ApprovalIter {
83113
self.waiters.push_back(waiter);
@@ -137,6 +167,33 @@ where
137167
}
138168
}
139169

170+
pub(crate) struct InternalsGuard<M: ManageConnection> {
171+
conn: Option<Conn<M::Connection>>,
172+
pool: Arc<SharedPool<M>>,
173+
}
174+
175+
impl<M: ManageConnection> InternalsGuard<M> {
176+
fn new(conn: Conn<M::Connection>, pool: Arc<SharedPool<M>>) -> Self {
177+
Self {
178+
conn: Some(conn),
179+
pool,
180+
}
181+
}
182+
183+
pub(crate) fn extract(&mut self) -> Conn<M::Connection> {
184+
self.conn.take().unwrap() // safe: can only be `None` after `Drop`
185+
}
186+
}
187+
188+
impl<M: ManageConnection> Drop for InternalsGuard<M> {
189+
fn drop(&mut self) {
190+
if let Some(conn) = self.conn.take() {
191+
let mut locked = self.pool.internals.lock();
192+
locked.put(conn, None, self.pool.clone());
193+
}
194+
}
195+
}
196+
140197
#[must_use]
141198
pub(crate) struct ApprovalIter {
142199
num: usize,

0 commit comments

Comments
 (0)