Skip to content

Commit 2dca3c1

Browse files
committed
- Optimize bufer reclamation
- Optimize memory fragmentation - Optimize main read loop - Optimnize zbuf reader
1 parent 21203b8 commit 2dca3c1

4 files changed

Lines changed: 114 additions & 17 deletions

File tree

commons/zenoh-buffers/src/zbuf.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,17 @@ impl Buffer for ZBuf {
9595
.iter()
9696
.fold(0, |len, slice| len + slice.len())
9797
}
98+
99+
#[inline(always)]
100+
fn is_empty(&self) -> bool {
101+
// optimize compared to default implementation by avoiding the walkthouh
102+
for slice in self.slices.as_ref() {
103+
if !slice.is_empty() {
104+
return false;
105+
}
106+
}
107+
true
108+
}
98109
}
99110

100111
// SplitBuffer

commons/zenoh-uring/src/linux/api/reader/mod.rs

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ impl Reader {
8888
}
8989

9090
pub fn new(batch_size: usize, batch_count: BufferCount) -> ZResult<Self> {
91+
let batch_size = batch_size + batch_size / 2; // add some headroom to reduce ENOBUFS errors
92+
9193
// create eventfd to wake io_uring on demand by producing read events
9294
let waker = Arc::new(nix::sys::eventfd::EventFd::from_value_and_flags(
9395
0,
@@ -184,17 +186,12 @@ impl Reader {
184186
}
185187

186188
loop {
189+
#[cfg(feature = "uring_trace")]
190+
let mut i = 0;
191+
187192
while let Some(e) = unsafe { ring.completion_shared() }.next() {
188193
let mut sq = unsafe { ring.submission_shared() };
189194

190-
roll_cmds(
191-
&receiver,
192-
&mut context_storage,
193-
&arena,
194-
&mut sq,
195-
batch_count,
196-
)?;
197-
198195
match e.user_data() {
199196
IndexGeneration::INVALID_MIN => {
200197
tracing::debug!("Zero-user-data entry: {:?}", e);
@@ -205,9 +202,16 @@ impl Reader {
205202
unsafe { sq.push(&waker_read)? };
206203
}
207204
index => {
205+
#[cfg(feature = "uring_trace")]
206+
{
207+
i += 1;
208+
}
209+
208210
let index = unsafe { IndexGeneration::new_unchecked(index) };
209-
if Reader::multi(&context_storage, &e, index, &arena, &mut sq)? {
210-
let len = sq.len() as u32;
211+
let to_submit =
212+
Reader::multi(&context_storage, &e, index, &arena, &mut sq)?;
213+
let len = sq.len() as u32;
214+
if to_submit || len >= (batch_count / 2) as u32 {
211215
drop(sq);
212216
//ring.submit()?;
213217
unsafe {
@@ -223,6 +227,9 @@ impl Reader {
223227
}
224228
}
225229

230+
#[cfg(feature = "uring_trace")]
231+
tracing::info!("Processed {} completion entries", i);
232+
226233
// receive external submissions
227234
let mut sq = unsafe { ring.submission_shared() };
228235
roll_cmds(
@@ -354,16 +361,17 @@ impl Reader {
354361
) -> ZResult<bool> {
355362
let mut need_submit = false;
356363
if e.result() < 0 {
357-
tracing::trace!("Error entry: {:?}", e);
364+
tracing::debug!("Error entry: {:?}", e);
358365

359366
match e.result().neg() {
360367
libc::ENOBUFS => {
361368
// We are out of buffers
362369
tracing::debug!("ENOBUFS: Restart multishot receive for task {:?}", index);
363370

364-
let recv = opcode::RecvMulti::new(types::Fd(context.fd), context.buffer_group().id())
371+
let recv =
372+
opcode::RecvMulti::new(types::Fd(context.fd), context.buffer_group().id())
365373
.build()
366-
//.flags(io_uring::squeue::Flags::ASYNC)
374+
.flags(io_uring::squeue::Flags::ASYNC)
367375
.user_data(index.into());
368376

369377
unsafe { sq.push(&recv)? };
@@ -393,7 +401,7 @@ impl Reader {
393401
context.buffer_group().id(),
394402
)
395403
.build()
396-
//.flags(io_uring::squeue::Flags::ASYNC)
404+
.flags(io_uring::squeue::Flags::ASYNC)
397405
.user_data(index.into());
398406

399407
unsafe { sq.push(&recv)? };

commons/zenoh-uring/src/linux/reader/buffer_group.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,20 @@ impl BufferGroup {
139139
buf_len: usize,
140140
sq: &mut SubmissionQueue<'_>,
141141
) -> ZResult<RxBuffer> {
142+
let count = self
143+
.buffers_missing
144+
.load(std::sync::atomic::Ordering::Relaxed)
145+
+ 1;
146+
let leftover = self
147+
.arena
148+
.arena
149+
.provide_batches_to_group(self.id, count, sq)?;
142150
self.buffers_missing
143-
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
144-
self.ensure_batches_to_ring(sq)?;
151+
.store(leftover, std::sync::atomic::Ordering::Relaxed);
152+
153+
//self.buffers_missing
154+
// .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
155+
//self.ensure_batches_to_ring(sq)?;
145156

146157
let data = &mut unsafe {
147158
self.arena

commons/zenoh-uring/src/linux/reader/reservable_arena.rs

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@
1414

1515
use std::sync::Arc;
1616

17+
use io_uring::{opcode, squeue::Flags, SubmissionQueue};
18+
use zenoh_result::ZResult;
19+
1720
use crate::{
1821
api::types::BufferCount,
1922
batch_arena::{BatchArena, Batches},
2023
reader::submission::SubmissionIface,
21-
types::BufferId,
24+
types::{BufferGroupId, BufferId},
2225
};
2326

2427
pub(crate) struct ReservableArenaInner {
@@ -54,6 +57,70 @@ impl ReservableArenaInner {
5457
self.arena.batch_size()
5558
}
5659

60+
pub fn provide_batches_to_group(
61+
&self,
62+
group_id: BufferGroupId,
63+
mut count: BufferCount,
64+
sq: &mut SubmissionQueue<'_>,
65+
) -> ZResult<BufferCount> {
66+
// recycle batches from the recycled_batches queue first
67+
while let Some(buf_id) = self.recycled_batches.pop() {
68+
let data = unsafe { self.arena.index_mut_unchecked(buf_id as usize) };
69+
70+
let entry = opcode::ProvideBuffers::new(
71+
data.as_mut_ptr(),
72+
self.arena.batch_size() as i32,
73+
1,
74+
group_id,
75+
buf_id as BufferId,
76+
)
77+
.build()
78+
.flags(Flags::SKIP_SUCCESS);
79+
80+
unsafe {
81+
sq.push(&entry)?;
82+
}
83+
84+
count -= 1;
85+
86+
if count == 0 {
87+
break;
88+
}
89+
}
90+
91+
// allocate more memory if needed
92+
if count > 0 {
93+
if let Some(additional_batches) = self.arena.allocate_more_batches() {
94+
let (primary, to_recycle) =
95+
additional_batches.split(count as BufferCount, self.arena.batch_size());
96+
97+
// push the primary batch to the result
98+
let entry = opcode::ProvideBuffers::new(
99+
primary.addr,
100+
self.arena.batch_size() as i32,
101+
primary.nbufs,
102+
group_id,
103+
primary.start_bid as BufferId,
104+
)
105+
.build()
106+
.flags(Flags::SKIP_SUCCESS);
107+
108+
unsafe {
109+
sq.push(&entry)?;
110+
}
111+
112+
// recycle the leftover batches
113+
if let Some(to_recycle) = to_recycle {
114+
for buf_id in to_recycle.start_bid..to_recycle.start_bid + to_recycle.nbufs {
115+
self.recycle_batch(buf_id);
116+
}
117+
}
118+
}
119+
}
120+
121+
Ok(count)
122+
}
123+
57124
pub fn pop_batches(&self, count: BufferCount) -> Vec<Batches> {
58125
let mut result = Vec::with_capacity(count as usize);
59126

0 commit comments

Comments
 (0)