Skip to content

Commit 4487490

Browse files
authored
Merge pull request #31 from ZettaScaleLabs/PR-local/2332
fix(uring): review fixups for eclipse-zenoh#2332
2 parents caca5f0 + 8ee2239 commit 4487490

16 files changed

Lines changed: 74 additions & 549 deletions

File tree

commons/zenoh-uring/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ io-uring = { workspace = true }
3434
libc = { workspace = true }
3535
nix = { workspace = true, features = ["event"] }
3636
rand = { workspace = true, features = ["default"] }
37-
thread-priority = { workspace = true }
3837
tokio = { workspace = true }
3938
tracing = { workspace = true }
4039
zenoh-buffers = { workspace = true }

commons/zenoh-uring/src/linux/api/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,3 @@
2020
2121
pub mod reader;
2222
pub mod types;
23-
pub mod writer;

commons/zenoh-uring/src/linux/api/reader/mod.rs

Lines changed: 3 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ use io_uring::{
2929
squeue, types, IoUring, SubmissionQueue,
3030
};
3131
use nix::sys::eventfd::EfdFlags;
32-
//use thread_priority::{RealtimeThreadSchedulePolicy, ThreadBuilder, ThreadPriority};
3332
use zenoh_core::bail;
3433
use zenoh_result::ZResult;
3534
use zenoh_runtime::ZRuntime;
@@ -215,7 +214,6 @@ impl Reader {
215214
let len = sq.len() as u32;
216215
if to_submit || len >= (batch_count / 2) as u32 {
217216
drop(sq);
218-
//ring.submit()?;
219217
unsafe {
220218
ring.submitter().enter::<libc::sigset_t>(
221219
len,
@@ -241,7 +239,6 @@ impl Reader {
241239
&mut sq,
242240
batch_count,
243241
)?;
244-
//let len = sq.len();
245242
drop(sq);
246243

247244
if c_exit_flag.load(std::sync::atomic::Ordering::SeqCst) {
@@ -250,17 +247,6 @@ impl Reader {
250247

251248
// this wait can be interrupted by Self::wake_reader_thread
252249
ring.submit_and_wait(1)?;
253-
//std::thread::sleep(std::time::Duration::from_millis(1));
254-
//ring.submit()?;
255-
256-
//unsafe {
257-
// ring.submitter().enter::<libc::sigset_t>(
258-
// len as u32,
259-
// 0,
260-
// io_uring::EnterFlags::GETEVENTS.bits(),
261-
// None,
262-
// )?;
263-
//}
264250
}
265251
Ok(())
266252
};
@@ -270,56 +256,9 @@ impl Reader {
270256
tracing::error!("Uring reactor error: {e}");
271257
let _ = join_sender.send(e.to_string());
272258
}
273-
tracing::debug!("Urng reactor thread finished!");
259+
tracing::debug!("Uring reactor thread finished!");
274260
});
275261

276-
/*
277-
#[cfg(unix)]
278-
let builder = ThreadBuilder::default()
279-
.name("uring_task")
280-
.policy(thread_priority::ThreadSchedulePolicy::Realtime(
281-
RealtimeThreadSchedulePolicy::Fifo,
282-
))
283-
.priority(ThreadPriority::Min);
284-
285-
let _ = builder.spawn(move |result| {
286-
if let Err(e) = result {
287-
let mut err = format!(
288-
"{:?}: error setting scheduling priority for thread: {:?}, will run with ",
289-
std::thread::current().name(),
290-
e
291-
);
292-
#[cfg(windows)]
293-
{
294-
err.push_str("the default one. ");
295-
}
296-
#[cfg(unix)]
297-
{
298-
use thread_priority::ThreadPriorityValue;
299-
300-
for priority in (ThreadPriorityValue::MIN..ThreadPriorityValue::MAX).rev() {
301-
if let Ok(p) = priority.try_into() {
302-
use thread_priority::set_current_thread_priority;
303-
304-
if set_current_thread_priority(ThreadPriority::Crossplatform(p)).is_ok()
305-
{
306-
err.push_str(&format!("priority {priority}. "));
307-
break;
308-
}
309-
}
310-
}
311-
}
312-
err.push_str("This is not an hard error and it can be safely ignored under normal operating conditions. \
313-
Though the SHM subsystem may experience some timeouts in case of an heavy congested system where this watchdog thread may not be scheduled at the required frequency.");
314-
tracing::debug!( "{}", err);
315-
}
316-
317-
if let Err(e) = ring_worker() {
318-
let _ = join_sender.send(e.to_string());
319-
}
320-
});
321-
*/
322-
323262
let inner = Arc::new(ReaderInner::new(submitter, exit_flag));
324263

325264
Ok(Self {
@@ -414,9 +353,7 @@ impl Reader {
414353
let buffer = Arc::new(context.buffer_group().read_buffer(buf_id, buf_len, sq)?);
415354
context.run_callback(buffer);
416355
}
417-
None => {
418-
//bail!("no IORING_CQE_F_BUFFER: {:?}", e);
419-
}
356+
None => {}
420357
};
421358
}
422359
Ok(need_submit)
@@ -430,9 +367,7 @@ impl Reader {
430367
arena.recycle_batch(buf_id);
431368
return true;
432369
}
433-
None => {
434-
//bail!("no IORING_CQE_F_BUFFER: {:?}", e);
435-
}
370+
None => {}
436371
}
437372
}
438373
false

commons/zenoh-uring/src/linux/api/writer/mod.rs

Lines changed: 0 additions & 133 deletions
This file was deleted.

commons/zenoh-uring/src/linux/memory/mod.rs

Lines changed: 0 additions & 48 deletions
This file was deleted.

0 commit comments

Comments
 (0)