Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,6 @@ bytemuck = { version = "1.13", features = ["derive"] }
zerocopy = "0.8"
serio = { version = "0.2" }

# io
uid-mux = { version = "0.2" }

# testing
rstest = "0.12"
pretty_assertions = "1"
Expand Down
4 changes: 1 addition & 3 deletions crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ default = []
executor = []
sync = ["tokio/sync"]
future = []
test-utils = ["uid-mux/test-utils", "tokio/io-util", "tokio-util/compat"]
test-utils = ["tokio/io-util", "tokio-util/compat"]
ideal = ["tokio/sync"]

[dependencies]
Expand All @@ -18,7 +18,6 @@ bytes = { workspace = true }
pin-project-lite.workspace = true
thiserror.workspace = true
serio.workspace = true
uid-mux = { workspace = true }
serde = { workspace = true, features = ["derive"] }
pollster.workspace = true
cfg-if.workspace = true
Expand All @@ -34,7 +33,6 @@ tokio = { workspace = true, features = [
"net",
] }
tokio-util = { workspace = true, features = ["compat"] }
uid-mux = { workspace = true, features = ["test-utils"] }
tracing-subscriber = { workspace = true, features = ["fmt"] }
criterion = { workspace = true, features = ["async_tokio"] }
rstest = { workspace = true }
Expand Down
10 changes: 5 additions & 5 deletions crates/common/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl Context {
match &mut self.mode {
Mode::St => Ok(st::map(self, items, f).await),
Mode::Mt { threads } => {
let threads = threads.get(threads.concurrency()).await?;
let threads = threads.get(threads.concurrency())?;
mt::map(threads, items, f, weight).await
}
}
Expand All @@ -138,7 +138,7 @@ impl Context {
match &mut self.mode {
Mode::St => Ok(st::join(self, a, b).await),
Mode::Mt { threads } => {
let threads = threads.get(2).await?;
let threads = threads.get(2)?;
mt::join(threads, a, b).await
}
}
Expand Down Expand Up @@ -167,7 +167,7 @@ impl Context {
match &mut self.mode {
Mode::St => Ok(st::try_join(self, a, b).await),
Mode::Mt { threads } => {
let threads = threads.get(2).await?;
let threads = threads.get(2)?;
mt::try_join(threads, a, b).await
}
}
Expand All @@ -192,7 +192,7 @@ impl Context {
match &mut self.mode {
Mode::St => Ok(st::try_join3(self, a, b, c).await),
Mode::Mt { threads } => {
let threads = threads.get(3).await?;
let threads = threads.get(3)?;
mt::try_join3(threads, a, b, c).await
}
}
Expand Down Expand Up @@ -220,7 +220,7 @@ impl Context {
match &mut self.mode {
Mode::St => Ok(st::try_join4(self, a, b, c, d).await),
Mode::Mt { threads } => {
let threads = threads.get(4).await?;
let threads = threads.get(4)?;
mt::try_join4(threads, a, b, c, d).await
}
}
Expand Down
26 changes: 14 additions & 12 deletions crates/common/src/context/mt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ impl Multithread {
ContextError::new(ErrorKind::Thread, "thread ID overflow".to_string())
})?;

let io_fut = { self.builder.lock().unwrap().mux.open(id.clone()) };

let io = io_fut
.await
let io = self
.builder
.lock()
.unwrap()
.mux
.open(id.clone())
.map_err(|e| ContextError::new(ErrorKind::Mux, e))?;

let ctx =
Expand All @@ -60,15 +62,16 @@ pub(crate) struct ThreadBuilder {
}

impl ThreadBuilder {
async fn spawn(
fn spawn(
this: Arc<Mutex<Self>>,
id: ThreadId,
config: Arc<MtConfig>,
) -> Result<Handle, ContextError> {
let io_fut = { this.lock().unwrap().mux.open(id.clone()) };

let io = io_fut
.await
let io = this
.lock()
.unwrap()
.mux
.open(id.clone())
.map_err(|e| ContextError::new(ErrorKind::Mux, e))?;

let ctx = Context::new_multi_threaded(id.clone(), io, config, this.clone());
Expand Down Expand Up @@ -115,7 +118,7 @@ impl Threads {
self.config.concurrency
}

pub(crate) async fn get(&mut self, count: usize) -> Result<&[Handle], ContextError> {
pub(crate) fn get(&mut self, count: usize) -> Result<&[Handle], ContextError> {
if count > self.config.concurrency {
return Err(ContextError::new(
ErrorKind::Thread,
Expand All @@ -128,8 +131,7 @@ impl Threads {
ContextError::new(ErrorKind::Thread, "thread ID overflow".to_string())
})?;

let child =
ThreadBuilder::spawn(self.builder.clone(), id, self.config.clone()).await?;
let child = ThreadBuilder::spawn(self.builder.clone(), id, self.config.clone())?;
self.children.push(child);
}
}
Expand Down
16 changes: 2 additions & 14 deletions crates/common/src/context/mt/builder.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::sync::{Arc, Mutex};

use uid_mux::UidMux;

use crate::{
ThreadId,
context::{
Expand Down Expand Up @@ -73,18 +71,8 @@ impl<S> MultithreadBuilder<S> {
}

/// Sets the multiplexer.
pub fn mux<M>(mut self, mux: M) -> Self
where
M: UidMux<ThreadId> + Clone + Send + Sync + 'static,
<M as UidMux<ThreadId>>::Error: std::error::Error + Send + Sync + 'static,
{
self.mux = Some(Box::new(mux));
self
}

#[allow(dead_code)]
pub(crate) fn mux_internal(mut self, mux: Box<dyn Mux + Send>) -> Self {
self.mux = Some(mux);
pub fn mux<M: Into<Box<dyn Mux + Send>>>(mut self, mux: M) -> Self {
self.mux = Some(mux.into());
self
}

Expand Down
14 changes: 7 additions & 7 deletions crates/common/src/context/test/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! Basic test context helpers.

use crate::mux::test_framed_mux;
use futures::{AsyncRead, AsyncWrite};
use serio::channel::duplex;
use uid_mux::test_utils::test_framed_mux;

use crate::{
context::{Context, Multithread, SpawnError},
Expand Down Expand Up @@ -41,8 +41,8 @@ pub fn test_mt_context(io_buffer: usize) -> (Multithread, Multithread) {
let mux_1: Box<dyn Mux + Send> = Box::new(mux_1);

(
Multithread::builder().mux_internal(mux_0).build().unwrap(),
Multithread::builder().mux_internal(mux_1).build().unwrap(),
Multithread::builder().mux(mux_0).build().unwrap(),
Multithread::builder().mux(mux_1).build().unwrap(),
)
}

Expand All @@ -62,12 +62,12 @@ where
(
Multithread::builder()
.spawn_handler(spawn.clone())
.mux_internal(mux_0)
.mux(mux_0)
.build()
.unwrap(),
Multithread::builder()
.spawn_handler(spawn)
.mux_internal(mux_1)
.mux(mux_1)
.build()
.unwrap(),
)
Expand Down Expand Up @@ -95,13 +95,13 @@ where
Multithread::builder()
.concurrency(concurrency)
.spawn_handler(spawn.clone())
.mux_internal(mux_0)
.mux(mux_0)
.build()
.unwrap(),
Multithread::builder()
.concurrency(concurrency)
.spawn_handler(spawn)
.mux_internal(mux_1)
.mux(mux_1)
.build()
.unwrap(),
)
Expand Down
Loading
Loading