Skip to content

Commit e4b5ce9

Browse files
committed
Switch from tokio.rs to async_std
1 parent 6de5ca8 commit e4b5ce9

14 files changed

+422
-115
lines changed

Cargo.lock

+326-5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+3-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ uuid = { version = "0.8", features = ["v4"] }
2020
anyhow = "1.0"
2121
clap = "3.0.0-beta.2"
2222
lazy_static = "1.4"
23-
tokio = { version = "1.7", features = ["rt-multi-thread", "macros", "sync", "net", "time", "io-util"] }
23+
tokio = { version = "1.7", features = ["macros", "net"] }
24+
async-std = { version = "^1.0", features = ["attributes", "unstable"] }
2425
wasmtime = { version = "0.29", git = "https://github.com/bytecodealliance/wasmtime.git", branch = "main" }
2526
wasmtime-wasi = { version = "0.29", git = "https://github.com/bytecodealliance/wasmtime.git", branch = "main" }
2627
wasmparser = "0.79"
@@ -34,6 +35,7 @@ semver = "1"
3435
wat = "1.0"
3536
wabt = "0.10"
3637
pretty_assertions = "0.7"
38+
tokio = { version = "1.7", features = ["rt-multi-thread"] }
3739
criterion = { version = "0.3", features = ["async_tokio"] }
3840

3941
[build-dependencies]

benches/benchmark.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,12 @@ fn criterion_benchmark(c: &mut Criterion) {
1414

1515
c.bench_function("spawn process", |b| {
1616
b.to_async(&rt).iter(|| async {
17-
module.spawn("hello", Vec::new(), None).await.unwrap();
17+
module
18+
.spawn("hello", Vec::new(), None)
19+
.await
20+
.unwrap()
21+
.0
22+
.await;
1823
});
1924
});
2025

src/api/mailbox.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ fn send_receive_skip_search(
511511
.or_trap("lunatic::message::send_receive_skip_search")?;
512512
process.send(Signal::Message(message));
513513
if let Some(message) = tokio::select! {
514-
_ = tokio::time::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
514+
_ = async_std::task::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
515515
message = caller.data_mut().message_mailbox.pop_skip_search(tag) => Some(message)
516516
} {
517517
// Put the message into the scratch area
@@ -550,7 +550,7 @@ fn receive(
550550
tag => Some(tag),
551551
};
552552
if let Some(message) = tokio::select! {
553-
_ = tokio::time::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
553+
_ = async_std::task::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
554554
message = caller.data_mut().message_mailbox.pop(tag) => Some(message)
555555
} {
556556
let result = match message {

src/api/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ fn namespace_matches_filter(namespace: &str, name: &str, namespace_filter: &[Str
7575
}
7676

7777
mod tests {
78-
#[tokio::test]
78+
#[async_std::test]
7979
async fn import_filter_signature_matches() {
8080
use crate::{EnvConfig, Environment};
8181

src/api/networking.rs

+12-28
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,11 @@ use std::convert::TryInto;
22
use std::future::Future;
33
use std::io::IoSlice;
44
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
5-
use std::sync::Arc;
65
use std::time::Duration;
76

87
use anyhow::Result;
9-
use tokio::io::{AsyncReadExt, AsyncWriteExt};
10-
use tokio::net::{TcpListener, TcpStream};
11-
use tokio::sync::Mutex;
8+
use async_std::io::{ReadExt, WriteExt};
9+
use async_std::net::{TcpListener, TcpStream};
1210
use wasmtime::{Caller, FuncType, Linker, ValType};
1311
use wasmtime::{Memory, Trap};
1412

@@ -211,7 +209,7 @@ fn resolve(
211209
let name = std::str::from_utf8(buffer.as_slice()).or_trap("lunatic::network::resolve")?;
212210
// Check for timeout during lookup
213211
let return_ = if let Some(result) = tokio::select! {
214-
_ = tokio::time::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
212+
_ = async_std::task::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
215213
result = tokio::net::lookup_host(name) => Some(result)
216214
} {
217215
let (iter_or_error_id, result) = match result {
@@ -463,11 +461,7 @@ fn tcp_accept(
463461

464462
let (tcp_stream_or_error_id, peer_addr_iter, result) = match tcp_listener.accept().await {
465463
Ok((stream, socket_addr)) => {
466-
let stream_id = caller
467-
.data_mut()
468-
.resources
469-
.tcp_streams
470-
.add(Arc::new(Mutex::new(stream)));
464+
let stream_id = caller.data_mut().resources.tcp_streams.add(stream);
471465
let dns_iter_id = caller
472466
.data_mut()
473467
.resources
@@ -539,18 +533,11 @@ fn tcp_connect(
539533
)?;
540534

541535
if let Some(result) = tokio::select! {
542-
_ = tokio::time::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
536+
_ = async_std::task::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
543537
result = TcpStream::connect(socket_addr) => Some(result)
544538
} {
545539
let (stream_or_error_id, result) = match result {
546-
Ok(stream) => (
547-
caller
548-
.data_mut()
549-
.resources
550-
.tcp_streams
551-
.add(Arc::new(Mutex::new(stream))),
552-
0,
553-
),
540+
Ok(stream) => (caller.data_mut().resources.tcp_streams.add(stream), 0),
554541
Err(error) => (caller.data_mut().errors.add(error.into()), 1),
555542
};
556543

@@ -660,17 +647,17 @@ fn tcp_write_vectored(
660647
.collect();
661648
let vec_slices = vec_slices?;
662649

663-
let stream_mutex = caller
650+
let mut stream = caller
664651
.data()
665652
.resources
666653
.tcp_streams
667654
.get(stream_id)
668655
.or_trap("lunatic::network::tcp_write_vectored")?
669656
.clone();
670-
let mut stream = stream_mutex.lock().await;
657+
671658
// Check for timeout
672659
if let Some(result) = tokio::select! {
673-
_ = tokio::time::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
660+
_ = async_std::task::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
674661
result = stream.write_vectored(vec_slices.as_slice()) => Some(result)
675662
} {
676663
let (opaque, return_) = match result {
@@ -722,16 +709,14 @@ fn tcp_read(
722709
opaque_ptr: u32,
723710
) -> Box<dyn Future<Output = Result<u32, Trap>> + Send + '_> {
724711
Box::new(async move {
725-
let stream_mutex = caller
712+
let mut stream = caller
726713
.data()
727714
.resources
728715
.tcp_streams
729716
.get(stream_id)
730717
.or_trap("lunatic::network::tcp_read")?
731718
.clone();
732719

733-
let mut stream = stream_mutex.lock().await;
734-
735720
let memory = get_memory(&mut caller)?;
736721
let buffer = memory
737722
.data_mut(&mut caller)
@@ -740,7 +725,7 @@ fn tcp_read(
740725

741726
// Check for timeout first
742727
if let Some(result) = tokio::select! {
743-
_ = tokio::time::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
728+
_ = async_std::task::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
744729
result = stream.read(buffer) => Some(result)
745730
} {
746731
let (opaque, return_) = match result {
@@ -783,14 +768,13 @@ fn tcp_flush(
783768
error_id_ptr: u32,
784769
) -> Box<dyn Future<Output = Result<u32, Trap>> + Send + '_> {
785770
Box::new(async move {
786-
let stream_mutex = caller
771+
let mut stream = caller
787772
.data()
788773
.resources
789774
.tcp_streams
790775
.get(stream_id)
791776
.or_trap("lunatic::network::tcp_flush")?
792777
.clone();
793-
let mut stream = stream_mutex.lock().await;
794778

795779
let (error_id, result) = match stream.flush().await {
796780
Ok(()) => (0, 0),

src/api/process.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -754,7 +754,7 @@ fn clone_process(mut caller: Caller<ProcessState>, process_id: u64) -> Result<u6
754754
//% Suspend process for `millis`.
755755
fn sleep_ms(_: Caller<ProcessState>, millis: u64) -> Box<dyn Future<Output = ()> + Send + '_> {
756756
Box::new(async move {
757-
tokio::time::sleep(Duration::from_millis(millis)).await;
757+
async_std::task::sleep(Duration::from_millis(millis)).await;
758758
})
759759
}
760760

@@ -771,7 +771,7 @@ fn die_when_link_dies(mut caller: Caller<ProcessState>, trap: u32) {
771771
caller
772772
.data_mut()
773773
.signal_mailbox
774-
.send(Signal::DieWhenLinkDies(trap != 0))
774+
.try_send(Signal::DieWhenLinkDies(trap != 0))
775775
.expect("The signal is sent to itself and the receiver must exist at this point");
776776
}
777777

@@ -847,7 +847,7 @@ fn link(mut caller: Caller<ProcessState>, tag: i64, process_id: u64) -> Result<(
847847
caller
848848
.data_mut()
849849
.signal_mailbox
850-
.send(Signal::Link(tag, process))
850+
.try_send(Signal::Link(tag, process))
851851
.expect("The signal is sent to itself and the receiver must exist at this point");
852852
Ok(())
853853
}
@@ -878,7 +878,7 @@ fn unlink(mut caller: Caller<ProcessState>, process_id: u64) -> Result<(), Trap>
878878
caller
879879
.data_mut()
880880
.signal_mailbox
881-
.send(Signal::UnLink(process))
881+
.try_send(Signal::UnLink(process))
882882
.expect("The signal is sent to itself and the receiver must exist at this point");
883883
Ok(())
884884
}

src/environment.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use anyhow::Result;
22
use lazy_static::lazy_static;
3-
use tokio::task;
43
use wasmtime::{Config, Engine, InstanceAllocationStrategy, Linker, OptLevel, ProfilingStrategy};
54

65
use super::config::EnvConfig;
@@ -77,13 +76,13 @@ impl Environment {
7776
let env = self.clone();
7877
let new_module = patch_module(&data, self.config.plugins())?;
7978
// The compilation of a module is a CPU intensive tasks and can take some time.
80-
let module = task::spawn_blocking(move || {
79+
let module = async_std::task::spawn_blocking(move || {
8180
match wasmtime::Module::new(env.engine(), new_module.as_slice()) {
8281
Ok(wasmtime_module) => Ok(Module::new(data, env, wasmtime_module)),
8382
Err(err) => Err(err),
8483
}
8584
})
86-
.await??;
85+
.await?;
8786
Ok(module)
8887
}
8988

src/mailbox.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ mod tests {
151151

152152
use super::{Message, MessageMailbox};
153153

154-
#[tokio::test]
154+
#[async_std::test]
155155
async fn no_tag_signal_message() {
156156
let mailbox = MessageMailbox::default();
157157
let message = Message::Signal(None);
@@ -163,7 +163,7 @@ mod tests {
163163
}
164164
}
165165

166-
#[tokio::test]
166+
#[async_std::test]
167167
async fn tag_signal_message() {
168168
let mailbox = MessageMailbox::default();
169169
let tag = Some(1337);
@@ -173,7 +173,7 @@ mod tests {
173173
assert_eq!(message.tag(), tag);
174174
}
175175

176-
#[tokio::test]
176+
#[async_std::test]
177177
async fn selective_receive_tag_signal_message() {
178178
let mailbox = MessageMailbox::default();
179179
let tag1 = Some(1);

src/main.rs

+17-22
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use clap::{crate_version, App, Arg, ArgSettings};
55
use anyhow::{Context, Result};
66
use lunatic_runtime::{EnvConfig, Environment};
77

8-
fn main() -> Result<()> {
8+
#[async_std::main]
9+
async fn main() -> Result<()> {
910
// Init logger
1011
env_logger::init();
1112
// Parse command line arguments
@@ -60,25 +61,19 @@ fn main() -> Result<()> {
6061
}
6162
let env = Environment::new(config)?;
6263

63-
let rt = tokio::runtime::Builder::new_multi_thread()
64-
.enable_all()
65-
.build()?;
66-
67-
rt.block_on(async {
68-
// Spawn main process
69-
let path = args.value_of("wasm").unwrap();
70-
let path = Path::new(path);
71-
let module = fs::read(path)?;
72-
let module = env.create_module(module).await?;
73-
let (task, _) = module
74-
.spawn("_start", Vec::new(), None)
75-
.await
76-
.context(format!(
77-
"Failed to spawn process from {}::_start()",
78-
path.to_string_lossy()
79-
))?;
80-
// Wait on the main process to finish
81-
task.await?;
82-
Ok(())
83-
})
64+
// Spawn main process
65+
let path = args.value_of("wasm").unwrap();
66+
let path = Path::new(path);
67+
let module = fs::read(path)?;
68+
let module = env.create_module(module).await?;
69+
let (task, _) = module
70+
.spawn("_start", Vec::new(), None)
71+
.await
72+
.context(format!(
73+
"Failed to spawn process from {}::_start()",
74+
path.to_string_lossy()
75+
))?;
76+
// Wait on the main process to finish
77+
task.await;
78+
Ok(())
8479
}

src/message.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::{
1010
sync::Arc,
1111
};
1212

13-
use tokio::{net::TcpStream, sync::Mutex};
13+
use async_std::net::TcpStream;
1414

1515
use crate::Process;
1616

@@ -65,7 +65,7 @@ impl DataMessage {
6565
}
6666

6767
/// Adds a TCP stream to the message and returns the index of it inside of the message
68-
pub fn add_tcp_stream(&mut self, tcp_stream: Arc<Mutex<TcpStream>>) -> usize {
68+
pub fn add_tcp_stream(&mut self, tcp_stream: TcpStream) -> usize {
6969
self.resources.push(Resource::TcpStream(tcp_stream));
7070
self.resources.len() - 1
7171
}
@@ -94,7 +94,7 @@ impl DataMessage {
9494
///
9595
/// If the index is out of bound or the resource is not a tcp stream the function will return
9696
/// None.
97-
pub fn take_tcp_stream(&mut self, index: usize) -> Option<Arc<Mutex<TcpStream>>> {
97+
pub fn take_tcp_stream(&mut self, index: usize) -> Option<TcpStream> {
9898
if let Some(resource_ref) = self.resources.get_mut(index) {
9999
let resource = std::mem::replace(resource_ref, Resource::None);
100100
match resource {
@@ -147,12 +147,12 @@ impl Read for DataMessage {
147147
}
148148
}
149149

150-
/// A resource ([`WasmProcess`](crate::WasmProcess), [`TcpStream`](tokio::net::TcpStream),
150+
/// A resource ([`WasmProcess`](crate::WasmProcess), [`TcpStream`](async_std::net::TcpStream),
151151
/// ...) that is attached to a [`DataMessage`].
152152
pub enum Resource {
153153
None,
154154
Process(Arc<dyn Process>),
155-
TcpStream(Arc<Mutex<TcpStream>>),
155+
TcpStream(TcpStream),
156156
}
157157

158158
impl Debug for Resource {

0 commit comments

Comments
 (0)