forked from compio-rs/compio
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlib.rs
More file actions
89 lines (74 loc) · 2.58 KB
/
lib.rs
File metadata and controls
89 lines (74 loc) · 2.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
//! Runtime-compatibility layers for compio.
//!
//! This crate provides a compatibility layer for compio's runtime, allowing it
//! to be used with different underlying event loop implementations, e.g.,
//! `tokio` or `smol`.
#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(missing_docs)]
#![deny(rustdoc::broken_intra_doc_links)]
#![doc(
html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
)]
#![doc(
html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
)]
use std::{
io,
ops::Deref,
task::{Context, Poll},
time::Duration,
};
use compio_log::error;
use compio_runtime::Runtime;
use mod_use::mod_use;
mod_use![sys];
/// A compatibility layer for [`Runtime`]. It is driven by the underlying
/// [`Adapter`].
pub struct RuntimeCompat<A> {
runtime: A,
}
impl<A: Adapter> RuntimeCompat<A> {
/// Creates a new [`RuntimeCompat`] with the given runtime.
pub fn new(runtime: Runtime) -> io::Result<Self> {
let runtime = A::new(runtime)?;
Ok(Self { runtime })
}
/// Executes the given future on the runtime, driving it to completion.
pub async fn execute<F: Future>(&self, f: F) -> F::Output {
let waker = self.runtime.waker();
let mut context = Context::from_waker(&waker);
let mut future = std::pin::pin!(f);
loop {
if let Poll::Ready(result) = self.runtime.enter(|| future.as_mut().poll(&mut context)) {
self.runtime.enter(|| self.runtime.run());
return result;
}
let mut remaining_tasks = self.runtime.enter(|| self.runtime.run());
remaining_tasks |= self.runtime.flush();
let timeout = if remaining_tasks {
Some(Duration::ZERO)
} else {
self.runtime.current_timeout()
};
match self.runtime.wait(timeout).await {
Ok(_) => {}
Err(e)
if matches!(
e.kind(),
io::ErrorKind::TimedOut | io::ErrorKind::Interrupted
) => {}
Err(e) => panic!("failed to wait for driver: {e:?}"),
}
if let Err(_e) = self.runtime.clear() {
error!("failed to clear notifier: {_e:?}");
}
self.runtime.poll_with(Some(Duration::ZERO));
}
}
}
impl<A: Adapter> Deref for RuntimeCompat<A> {
type Target = Runtime;
fn deref(&self) -> &Self::Target {
&self.runtime
}
}