Skip to content

Commit b575b1a

Browse files
Sh4d1tisonkun
andauthored
feat: add configurable behavior for cancelled get() calls (#17)
Signed-off-by: Patrik Cyvoct <patrik@ptrk.io> Co-authored-by: tison <wander4096@gmail.com>
1 parent 2b02d8a commit b575b1a

6 files changed

Lines changed: 425 additions & 14 deletions

File tree

fastpool/src/bounded.rs

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ use mea::semaphore::Semaphore;
8686
use crate::ManageObject;
8787
use crate::ObjectStatus;
8888
use crate::QueueStrategy;
89+
use crate::RecycleCancelledStrategy;
8990
use crate::RetainResult;
9091
use crate::mutex::Mutex;
9192
use crate::retain_spec;
@@ -101,6 +102,9 @@ pub struct PoolConfig {
101102
///
102103
/// Determines the order of objects being queued and dequeued.
103104
pub queue_strategy: QueueStrategy,
105+
106+
/// Strategy when recycling object has been cancelled.
107+
pub recycle_cancelled_strategy: RecycleCancelledStrategy,
104108
}
105109

106110
impl PoolConfig {
@@ -109,6 +113,7 @@ impl PoolConfig {
109113
Self {
110114
max_size,
111115
queue_strategy: QueueStrategy::default(),
116+
recycle_cancelled_strategy: RecycleCancelledStrategy::default(),
112117
}
113118
}
114119

@@ -117,6 +122,15 @@ impl PoolConfig {
117122
self.queue_strategy = queue_strategy;
118123
self
119124
}
125+
126+
/// Returns a new [`PoolConfig`] with the specified recycle cancelled strategy.
127+
pub fn with_recycle_cancelled_strategy(
128+
mut self,
129+
recycle_cancelled_strategy: RecycleCancelledStrategy,
130+
) -> Self {
131+
self.recycle_cancelled_strategy = recycle_cancelled_strategy;
132+
self
133+
}
120134
}
121135

122136
/// The current pool status.
@@ -310,6 +324,7 @@ impl<M: ManageObject> Pool<M> {
310324
let mut unready_object = UnreadyObject {
311325
state: Some(object),
312326
pool: Arc::downgrade(self),
327+
recycle_cancelled_strategy: self.config.recycle_cancelled_strategy,
313328
};
314329

315330
let state = unready_object.state();
@@ -323,6 +338,10 @@ impl<M: ManageObject> Pool<M> {
323338
state.status.recycle_count += 1;
324339
state.status.recycled = Some(std::time::Instant::now());
325340
break unready_object.ready(permit);
341+
} else {
342+
// We need to manually detach here as the drop implementation
343+
// depends on the recycle cancelled strategy.
344+
unready_object.detach();
326345
}
327346
}
328347
};
@@ -396,6 +415,11 @@ impl<M: ManageObject> Pool<M> {
396415
}
397416

398417
fn push_back(&self, o: ObjectState<M::Object>) {
418+
self.return_to_pool(o);
419+
self.users.fetch_sub(1, Ordering::Relaxed);
420+
}
421+
422+
fn return_to_pool(&self, o: ObjectState<M::Object>) {
399423
let mut slots = self.slots.lock();
400424

401425
assert!(
@@ -406,9 +430,6 @@ impl<M: ManageObject> Pool<M> {
406430
);
407431

408432
slots.deque.push_back(o);
409-
drop(slots);
410-
411-
self.users.fetch_sub(1, Ordering::Relaxed);
412433
}
413434

414435
fn detach_object(&self, o: &mut M::Object, ready: bool) {
@@ -517,17 +538,30 @@ impl<M: ManageObject> Object<M> {
517538
}
518539
}
519540

520-
/// A wrapper of ObjectStatus that detaches the object from the pool when dropped.
541+
/// A wrapper of ObjectState used during the `is_recyclable` check in `Pool::get`.
542+
///
543+
/// If the check passes, the object is converted to a ready `Object` via `ready()`.
544+
/// If the check fails, `detach()` should be called to permanently remove the object
545+
/// from the pool. If dropped without calling either method (due to being cancelled),
546+
/// the behavior depends on the pool's [`RecycleCancelledStrategy`] configuration.
521547
struct UnreadyObject<M: ManageObject> {
522548
state: Option<ObjectState<M::Object>>,
523549
pool: Weak<Pool<M>>,
550+
recycle_cancelled_strategy: RecycleCancelledStrategy,
524551
}
525552

526553
impl<M: ManageObject> Drop for UnreadyObject<M> {
527554
fn drop(&mut self) {
528555
if let Some(mut state) = self.state.take() {
529556
if let Some(pool) = self.pool.upgrade() {
530-
pool.detach_object(&mut state.o, false);
557+
match self.recycle_cancelled_strategy {
558+
RecycleCancelledStrategy::Detach => {
559+
pool.detach_object(&mut state.o, false);
560+
}
561+
RecycleCancelledStrategy::ReturnToPool => {
562+
pool.return_to_pool(state);
563+
}
564+
}
531565
}
532566
}
533567
}
@@ -545,6 +579,14 @@ impl<M: ManageObject> UnreadyObject<M> {
545579
}
546580
}
547581

582+
fn detach(&mut self) {
583+
if let Some(mut state) = self.state.take() {
584+
if let Some(pool) = self.pool.upgrade() {
585+
pool.detach_object(&mut state.o, false);
586+
}
587+
}
588+
}
589+
548590
fn state(&mut self) -> &mut ObjectState<M::Object> {
549591
// SAFETY: `state` is always `Some` when `UnreadyObject` is owned.
550592
self.state.as_mut().unwrap()

fastpool/src/common.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,3 +90,25 @@ pub enum QueueStrategy {
9090
/// This strategy behaves like a stack.
9191
Lifo,
9292
}
93+
94+
/// Strategy when recycling object has been cancelled.
95+
///
96+
/// This enum controls the behavior when the recycling process (specifically the
97+
/// [`ManageObject::is_recyclable`] check) is cancelled; for example, when the
98+
/// `get()` future is dropped.
99+
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
100+
pub enum RecycleCancelledStrategy {
101+
/// Detach the object from the pool.
102+
///
103+
/// This is the safest option. If the recycling check is cancelled, we assume the object might
104+
/// be in an unknown state or that the check was taking too long for a reason. The object will
105+
/// detach from the pool.
106+
#[default]
107+
Detach,
108+
109+
/// Return the object to the pool for potential reuse.
110+
///
111+
/// This assumes that interrupting the check does not invalidate the object. The object is put
112+
/// back into the pool.
113+
ReturnToPool,
114+
}

fastpool/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@
198198
pub use common::ManageObject;
199199
pub use common::ObjectStatus;
200200
pub use common::QueueStrategy;
201+
pub use common::RecycleCancelledStrategy;
201202
pub use retain_spec::RetainResult;
202203

203204
mod common;

fastpool/src/unbounded.rs

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ use std::sync::Weak;
102102
use crate::ManageObject;
103103
use crate::ObjectStatus;
104104
use crate::QueueStrategy;
105+
use crate::RecycleCancelledStrategy;
105106
use crate::RetainResult;
106107
use crate::mutex::Mutex;
107108
use crate::retain_spec;
@@ -114,6 +115,9 @@ pub struct PoolConfig {
114115
///
115116
/// Determines the order of objects being queued and dequeued.
116117
pub queue_strategy: QueueStrategy,
118+
119+
/// Strategy when recycling object has been cancelled.
120+
pub recycle_cancelled_strategy: RecycleCancelledStrategy,
117121
}
118122

119123
impl Default for PoolConfig {
@@ -127,6 +131,7 @@ impl PoolConfig {
127131
pub fn new() -> Self {
128132
Self {
129133
queue_strategy: QueueStrategy::default(),
134+
recycle_cancelled_strategy: RecycleCancelledStrategy::default(),
130135
}
131136
}
132137

@@ -135,6 +140,15 @@ impl PoolConfig {
135140
self.queue_strategy = queue_strategy;
136141
self
137142
}
143+
144+
/// Returns a new [`PoolConfig`] with the specified recycle cancelled strategy.
145+
pub fn with_recycle_cancelled_strategy(
146+
mut self,
147+
recycle_cancelled_strategy: RecycleCancelledStrategy,
148+
) -> Self {
149+
self.recycle_cancelled_strategy = recycle_cancelled_strategy;
150+
self
151+
}
138152
}
139153

140154
/// The current pool status.
@@ -324,6 +338,7 @@ impl<T, M: ManageObject<Object = T>> Pool<T, M> {
324338
let mut unready_object = UnreadyObject {
325339
state: Some(object),
326340
pool: Arc::downgrade(self),
341+
recycle_cancelled_strategy: self.config.recycle_cancelled_strategy,
327342
};
328343

329344
let state = unready_object.state();
@@ -337,6 +352,10 @@ impl<T, M: ManageObject<Object = T>> Pool<T, M> {
337352
state.status.recycle_count += 1;
338353
state.status.recycled = Some(std::time::Instant::now());
339354
break unready_object.ready();
355+
} else {
356+
// We need to manually detach here as the drop implementation
357+
// depends on the recycle cancelled strategy.
358+
unready_object.detach();
340359
}
341360
}
342361
};
@@ -538,17 +557,30 @@ impl<T, M: ManageObject<Object = T>> Object<T, M> {
538557
}
539558
}
540559

541-
/// A wrapper of ObjectStatus that detaches the object from the pool when dropped.
560+
/// A wrapper of ObjectState used during the `is_recyclable` check in `Pool::get`.
561+
///
562+
/// If the check passes, the object is converted to a ready `Object` via `ready()`.
563+
/// If the check fails, `detach()` should be called to permanently remove the object
564+
/// from the pool. If dropped without calling either method (due to being cancelled),
565+
/// the behavior depends on the pool's [`RecycleCancelledStrategy`] configuration.
542566
struct UnreadyObject<T, M: ManageObject<Object = T> = NeverManageObject<T>> {
543567
state: Option<ObjectState<T>>,
544568
pool: Weak<Pool<T, M>>,
569+
recycle_cancelled_strategy: RecycleCancelledStrategy,
545570
}
546571

547572
impl<T, M: ManageObject<Object = T>> Drop for UnreadyObject<T, M> {
548573
fn drop(&mut self) {
549574
if let Some(mut state) = self.state.take() {
550575
if let Some(pool) = self.pool.upgrade() {
551-
pool.detach_object(&mut state.o);
576+
match self.recycle_cancelled_strategy {
577+
RecycleCancelledStrategy::Detach => {
578+
pool.detach_object(&mut state.o);
579+
}
580+
RecycleCancelledStrategy::ReturnToPool => {
581+
pool.push_back(state);
582+
}
583+
}
552584
}
553585
}
554586
}
@@ -562,6 +594,14 @@ impl<T, M: ManageObject<Object = T>> UnreadyObject<T, M> {
562594
Object { state, pool }
563595
}
564596

597+
fn detach(&mut self) {
598+
if let Some(mut state) = self.state.take() {
599+
if let Some(pool) = self.pool.upgrade() {
600+
pool.detach_object(&mut state.o);
601+
}
602+
}
603+
}
604+
565605
fn state(&mut self) -> &mut ObjectState<T> {
566606
// SAFETY: `state` is always `Some` when `UnreadyObject` is owned.
567607
self.state.as_mut().unwrap()

0 commit comments

Comments
 (0)