Skip to content

Commit 1eedc89

Browse files
committed
feat: add delay RAII release for BufferPool
1 parent c2512b6 commit 1eedc89

6 files changed

Lines changed: 256 additions & 17 deletions

File tree

compio-driver/src/buffer_pool.rs

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::{
2-
cell::UnsafeCell,
2+
cell::{RefCell, UnsafeCell},
3+
collections::VecDeque,
34
fmt::Debug,
45
io,
56
mem::{self, MaybeUninit},
@@ -123,6 +124,15 @@ struct Inner {
123124

124125
/// Buffer pointers
125126
bufs: Vec<Slot>,
127+
128+
/// Queue used by extra buffer pools to ask the proactor to release them.
129+
release_queue: Option<ReleaseQueue>,
130+
}
131+
132+
#[derive(Debug)]
133+
struct ReleaseQueue {
134+
buffer_group: u16,
135+
queue: Weak<RefCell<VecDeque<u16>>>,
126136
}
127137

128138
impl BufferPoolRoot {
@@ -132,6 +142,8 @@ impl BufferPoolRoot {
132142
num_of_bufs: u16,
133143
buffer_size: usize,
134144
flags: u16,
145+
buffer_group: u16,
146+
release_queue: Option<Weak<RefCell<VecDeque<u16>>>>,
135147
) -> io::Result<Self> {
136148
let size: u32 = buffer_size.try_into().map_err(|_| {
137149
io::Error::new(
@@ -142,7 +154,11 @@ impl BufferPoolRoot {
142154
let bufs = (0..num_of_bufs.next_power_of_two())
143155
.map(|_| Some((alloc.allocate)(size)))
144156
.collect::<Vec<_>>();
145-
let ctrl = unsafe { BufControl::new(driver, &bufs, size, flags) }?;
157+
let ctrl = unsafe { BufControl::new(driver, &bufs, size, flags, buffer_group) }?;
158+
let release_queue = release_queue.map(|queue| ReleaseQueue {
159+
buffer_group,
160+
queue,
161+
});
146162

147163
Ok(Self {
148164
shared: Shared {
@@ -151,6 +167,7 @@ impl BufferPoolRoot {
151167
ctrl,
152168
size,
153169
bufs,
170+
release_queue,
154171
}
155172
.into(),
156173
}
@@ -361,6 +378,24 @@ impl Shared {
361378
fn len(&self) -> u32 {
362379
unsafe { self.with(|inner| inner.size) }
363380
}
381+
382+
fn queue_release_if_unused(self: &Rc<Self>) {
383+
if Rc::weak_count(self) != 1 {
384+
return;
385+
}
386+
387+
unsafe {
388+
self.with(|inner| {
389+
let Some(release_queue) = &inner.release_queue else {
390+
return;
391+
};
392+
393+
if let Some(queue) = release_queue.queue.upgrade() {
394+
queue.borrow_mut().push_back(release_queue.buffer_group);
395+
}
396+
})
397+
}
398+
}
364399
}
365400

366401
impl BufferRef {
@@ -429,8 +464,17 @@ impl Drop for BufferRef {
429464
if let Some(shared) = self.shared.upgrade() {
430465
// If the buffer pool is alive, set the pointer back
431466
shared.reset(self.buffer_id, self.ptr);
467+
shared.queue_release_if_unused();
432468
} else {
433469
unsafe { (self.alloc.deallocate)(self.ptr, self.full_cap) }
434470
}
435471
}
436472
}
473+
474+
impl Drop for BufferPool {
475+
fn drop(&mut self) {
476+
if let Some(shared) = self.shared.upgrade() {
477+
shared.queue_release_if_unused();
478+
}
479+
}
480+
}

compio-driver/src/lib.rs

Lines changed: 70 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@
1515
)]
1616

1717
use std::{
18+
cell::RefCell,
19+
collections::{HashMap, VecDeque},
1820
io,
1921
num::NonZero,
22+
rc::Rc,
2023
task::{Poll, Waker},
2124
time::Duration,
2225
};
@@ -101,7 +104,10 @@ impl<K, R> PushEntry<K, R> {
101104
pub struct Proactor {
102105
driver: Driver,
103106
buffer_pool: BufferPoolState,
104-
extra_pools: Vec<BufferPoolRoot>,
107+
extra_pools: HashMap<u16, BufferPoolRoot>,
108+
pending_extra_pool_releases: Rc<RefCell<VecDeque<u16>>>,
109+
free_extra_bgids: VecDeque<u16>,
110+
next_extra_bgid: u16,
105111
}
106112

107113
enum BufferPoolState {
@@ -115,6 +121,8 @@ enum BufferPoolState {
115121
}
116122

117123
impl BufferPoolState {
124+
const DEFAULT_BUFFER_GROUP: u16 = 1;
125+
118126
fn get(&mut self, driver: &mut Driver) -> io::Result<BufferPool> {
119127
loop {
120128
match self {
@@ -130,6 +138,8 @@ impl BufferPoolState {
130138
*num_of_bufs,
131139
*buffer_len,
132140
*flags,
141+
Self::DEFAULT_BUFFER_GROUP,
142+
None,
133143
)?);
134144
}
135145
BufferPoolState::Init(root) => return Ok(root.get_pool()),
@@ -143,7 +153,7 @@ impl Drop for Proactor {
143153
if let BufferPoolState::Init(buffer_pool) = &mut self.buffer_pool {
144154
_ = unsafe { buffer_pool.release(&mut self.driver) };
145155
}
146-
for pool in &mut self.extra_pools {
156+
for (_, mut pool) in self.extra_pools.drain() {
147157
_ = unsafe { pool.release(&mut self.driver) };
148158
}
149159
}
@@ -172,10 +182,45 @@ impl Proactor {
172182
buffer_len: builder.buffer_pool_buffer_len,
173183
flags: builder.buffer_pool_flag,
174184
},
175-
extra_pools: Vec::new(),
185+
extra_pools: HashMap::new(),
186+
pending_extra_pool_releases: Rc::default(),
187+
free_extra_bgids: VecDeque::new(),
188+
next_extra_bgid: BufferPoolState::DEFAULT_BUFFER_GROUP + 1,
176189
})
177190
}
178191

192+
fn alloc_extra_bgid(&mut self) -> io::Result<u16> {
193+
if let Some(buffer_group) = self.free_extra_bgids.pop_front() {
194+
return Ok(buffer_group);
195+
}
196+
197+
let buffer_group = self.next_extra_bgid;
198+
self.next_extra_bgid = buffer_group
199+
.checked_add(1)
200+
.ok_or_else(|| io::Error::other("no buffer group id available"))?;
201+
202+
Ok(buffer_group)
203+
}
204+
205+
fn release_extra_bgid(&mut self, buffer_group: u16) {
206+
self.free_extra_bgids.push_back(buffer_group);
207+
}
208+
209+
fn release_unused_extra_pools(&mut self) -> io::Result<()> {
210+
while let Some(buffer_group) = { self.pending_extra_pool_releases.borrow_mut().pop_front() }
211+
{
212+
let Some(mut root) = self.extra_pools.remove(&buffer_group) else {
213+
continue;
214+
};
215+
216+
unsafe { root.release(&mut self.driver)? };
217+
218+
self.release_extra_bgid(buffer_group)
219+
}
220+
221+
Ok(())
222+
}
223+
179224
/// Get a default [`Extra`] for underlying driver.
180225
pub fn default_extra(&self) -> Extra {
181226
Extra::new(&self.driver)
@@ -295,6 +340,7 @@ impl Proactor {
295340
/// You need to call [`Proactor::pop`] to get the pushed
296341
/// operations.
297342
pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
343+
self.release_unused_extra_pools()?;
298344
self.driver.poll(timeout)
299345
}
300346

@@ -477,11 +523,29 @@ impl Proactor {
477523
buffer_size: usize,
478524
flags: u16,
479525
) -> io::Result<BufferPool> {
526+
self.release_unused_extra_pools()?;
480527
let alloc = BufferAlloc::new::<A>();
481-
let root =
482-
BufferPoolRoot::new(&mut self.driver, alloc, num_of_bufs, buffer_size, flags)?;
528+
let buffer_group = self.alloc_extra_bgid()?;
529+
let release_queue = Rc::downgrade(&self.pending_extra_pool_releases);
530+
let root = match BufferPoolRoot::new(
531+
&mut self.driver,
532+
alloc,
533+
num_of_bufs,
534+
buffer_size,
535+
flags,
536+
buffer_group,
537+
Some(release_queue),
538+
) {
539+
Ok(root) => root,
540+
Err(err) => {
541+
self.release_extra_bgid(buffer_group);
542+
return Err(err);
543+
}
544+
};
545+
483546
let pool = root.get_pool();
484-
self.extra_pools.push(root);
547+
self.extra_pools.insert(buffer_group, root);
548+
485549
Ok(pool)
486550
}
487551
}

compio-driver/src/sys/buffer_pool/fusion.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ impl BufControl {
2121
bufs: &[Slot],
2222
bufs_len: u32,
2323
flags: u16,
24+
buffer_group: u16,
2425
) -> io::Result<Self> {
2526
let inner = if driver.as_iour().is_some() {
26-
let ctrl = unsafe { iour::BufControl::new(driver, bufs, bufs_len, flags) }?;
27+
let ctrl =
28+
unsafe { iour::BufControl::new(driver, bufs, bufs_len, flags, buffer_group) }?;
2729
Inner::IoUring(ctrl)
2830
} else {
2931
Inner::Fallback(fallback::BufControl::new(bufs))

compio-driver/src/sys/buffer_pool/iour.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@ pub(in crate::sys) struct BufControl {
2323
len: NonZeroU16,
2424
/// Total size of the mmap
2525
size: usize,
26+
/// Buffer group registered in io_uring.
27+
buffer_group: u16,
2628
}
2729

2830
assert_not_impl!(BufControl, Send);
2931
assert_not_impl!(BufControl, Sync);
3032

3133
impl BufControl {
32-
const BUF_GROUP: u16 = 1;
33-
3434
/// # Safety
3535
///
3636
/// Caller must ensure the buffers will:
@@ -43,6 +43,7 @@ impl BufControl {
4343
bufs: &[Slot],
4444
bufs_len: u32,
4545
flags: u16,
46+
buffer_group: u16,
4647
) -> io::Result<Self> {
4748
debug_assert!(bufs.len().is_power_of_two());
4849

@@ -57,13 +58,18 @@ impl BufControl {
5758
.expect("mmap failed")
5859
.cast::<BufRingEntry>();
5960

60-
let mut this = Self { ptr, len, size };
61+
let mut this = Self {
62+
ptr,
63+
len,
64+
size,
65+
buffer_group,
66+
};
6167

6268
unsafe {
6369
driver.inner().submitter().register_buf_ring_with_flags(
6470
ptr.addr().get() as u64,
6571
len.get(),
66-
Self::BUF_GROUP,
72+
buffer_group,
6773
flags,
6874
)
6975
}?;
@@ -88,7 +94,7 @@ impl BufControl {
8894

8995
/// Get the buffer group id
9096
pub const fn buffer_group(&self) -> u16 {
91-
Self::BUF_GROUP
97+
self.buffer_group
9298
}
9399

94100
/// Reset the buffer and make it available to the kernel
@@ -113,7 +119,7 @@ impl BufControl {
113119
driver
114120
.inner()
115121
.submitter()
116-
.unregister_buf_ring(Self::BUF_GROUP)?;
122+
.unregister_buf_ring(self.buffer_group)?;
117123
unsafe { munmap(self.ptr.cast().as_ptr(), self.size) }?;
118124

119125
Ok(())

compio-driver/src/sys/buffer_pool/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,15 @@ impl BufControl {
3434
bufs: &[Slot],
3535
buf_len: u32,
3636
flags: u16,
37+
buffer_group: u16,
3738
) -> io::Result<BufControl> {
3839
#[cfg(io_uring)]
39-
let inner = unsafe { imp::BufControl::new(driver, bufs, buf_len, flags)? };
40+
let inner = unsafe { imp::BufControl::new(driver, bufs, buf_len, flags, buffer_group)? };
4041

4142
#[cfg(not(io_uring))]
4243
let inner = fallback::BufControl::new(bufs);
4344

44-
_ = (driver, buf_len, flags);
45+
_ = (driver, buf_len, flags, buffer_group);
4546

4647
Ok(Self(inner))
4748
}

0 commit comments

Comments
 (0)