diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 00000000..5daab017 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,5 @@ +[build] +rustflags = [ + "-C", + "force-frame-pointers=y", +] diff --git a/Cargo.toml b/Cargo.toml index c657cc78..0d5ae04f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,2 +1,5 @@ [workspace] -members = ["store", "crypto", "network", "mempool", "consensus", "node"] \ No newline at end of file +members = ["store", "crypto", "network", "mempool", "consensus", "node", "profile"] + +[profile.release] +debug = true diff --git a/benchmark/async_profile.py b/benchmark/async_profile.py new file mode 100644 index 00000000..597d6426 --- /dev/null +++ b/benchmark/async_profile.py @@ -0,0 +1,179 @@ +import re +import sys + +log_regexp = "^\[.*T(.*):(.*):(.*)\.(.*)Z DEBUG .*\] (.*)$" +log_lines = re.findall(log_regexp, open(sys.argv[1]).read(), re.M) + +print(f"Log lines {len(log_lines)}") + +task_names = {} +task_parents = {} +task_wake = {} +task_states = {} + + +class TaskState: + + def __init__(self, no, time): + self.no = no + self.tot_time = 0 + self.blocked_time = 0 + self.pending_time = 0 + self.running_time = 0 + + # inner + self.prev_time = time + self.can_work = True + self.running = False + self.never_wake = True + self.calls = 0 + + def node_label(self): + name = self.name.split(":")[0] + if self.never_wake or self.calls == 0 or not self.to_print(): + return name + + per_call = (self.running_time + self.pending_time) / self.calls + return f"{name} | {self.running_time}ms / {self.calls} | ({int(per_call*1000):}us P:{self.pending_time / (self.running_time + self.pending_time):.2%})" + + def to_print(self): + return (self.running_time + self.pending_time > 10) or self.never_wake + + def summary(self): + return f"R:{self.running_time:6} P:{self.pending_time:6} B:{self.blocked_time:6} {self.name} " + + def name(self, name): + self.name = name + + def resume(self, time): + assert not self.running + self.calls += 1 + period = time - self.prev_time + if self.can_work: + self.pending_time += period + else: + self.blocked_time += period + self.tot_time += period + + # reset + self.can_work = False + self.running = True + self.prev_time = time + + def pause(self, time): + assert self.running + period = time - self.prev_time + self.running_time += period + self.tot_time += period + + # reset + self.running = False + self.prev_time = time + + def signal(self, time): + self.never_wake = False + if self.can_work: + return + self.can_work = True + if not self.running: + period = time - self.prev_time + self.blocked_time += period + self.tot_time += period + + # reset + self.prev_time = time + + +for (H,M,S,Mill,line) in log_lines: + # print((H,M,S,Mill,line)) + time_ms = int(H)*60*60*1000 + int(M)*60*1000 + int(S)*1000 + int(Mill) + + # Task creation + if line[:4] == "Task": + # Define a task + match_obj = re.match("Task (.*) from (.*) defined (.*)", line) + task_no = match_obj.group(1) + parent_no = match_obj.group(2) + if parent_no == "None": + parent_no = None + else: + # Strip the Some(*) + parent_no = parent_no[5:-1] + + source = match_obj.group(3) + + task_names[task_no] = f"{source}-{task_no}" + task_parents[task_no] = parent_no + + if task_no not in task_states: + task_states[task_no] = TaskState(task_no, time_ms) + task_states[task_no].name(task_names[task_no]) + + # Wake relations + if line[:4] == "Wake": + match_obj = re.match("Wake: (.*) -> (.*)", line) + source = match_obj.group(1) + if source == "None": + source = None + else: + source = source[5:-1] + target = match_obj.group(2) + + pair = (source, target) + task_states[target].signal(time_ms) + if pair not in task_wake: + task_wake[pair] = 1 + else: + task_wake[pair] += 1 + + if line[:4] == "Paus": + task_no = line[len("Pause task: "):] + task_states[task_no].pause(time_ms) + + if line[:4] == "Resu": + task_no = line[len("Resume task: "):] + if task_no not in task_states: + task_states[task_no] = TaskState(task_no, time_ms) + task_states[task_no].resume(time_ms) + +wake_number = sum(task_wake.values()) + +show = {} + +# Make a graph of task parent relations +parent_graph = open('parentgraph.dot', 'w') +print("digraph regexp {", file=parent_graph) +print('graph [ rankdir = "LR" ];', file=parent_graph) + +for task_no in task_names: + if task_states[task_no].to_print(): + print(f'{task_no} [label="{task_states[task_no].node_label()}", shape = "record"];', file=parent_graph) + show[task_no] = True + else: + show[task_no] = False + +for task_no in task_parents: + if task_parents[task_no] is None: + continue + if task_states[task_no].to_print(): + if not show[task_parents[task_no]]: + print(f'{task_parents[task_no]} [label="{task_states[task_parents[task_no]].node_label()}", shape = "record"];', file=parent_graph) + show[task_parents[task_no]] = True + + print(f'{task_parents[task_no]} -> {task_no};', file=parent_graph) + +print(f'edge [weight=1000 style=dashed color=dimgrey]', file=parent_graph) + +for (source_no, target_no) in task_wake: + pc = task_wake[(source_no, target_no)] / wake_number + + if source_no is None: + source_no = "Env" + + if (source_no == "Env" or task_states[source_no].to_print()) and task_states[target_no].to_print(): + print(f'{source_no} -> {target_no} [label="{pc:.2%}"];', file=parent_graph) + +print("}", file=parent_graph) + +for task_no in task_states: + print(task_states[task_no].summary()) diff --git a/benchmark/fabfile.py b/benchmark/fabfile.py index 09683a55..a424f36a 100644 --- a/benchmark/fabfile.py +++ b/benchmark/fabfile.py @@ -15,10 +15,10 @@ def local(ct): bench_params = { 'nodes': 4, - 'txs': 250_000, + 'txs': 1_000_000, 'size': 512, - 'rate': 150_000, - 'duration': 20, + 'rate': 100_000, + 'duration': 60, } node_params = { 'consensus': { @@ -27,11 +27,11 @@ def local(ct): }, 'mempool': { 'queue_capacity': 10_000, - 'max_payload_size': 100_000 + 'max_payload_size': 2_000_000 } } try: - ret = LocalBench(bench_params, node_params).run(debug=False).result() + ret = LocalBench(bench_params, node_params).run(debug=True).result() print(ret) except BenchError as e: Print.error(e) diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 9e4a5bc4..5ebfb227 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -21,6 +21,7 @@ store = { path = "../store" } crypto = { path = "../crypto" } network = { path = "../network" } mempool = { path = "../mempool" } +profile = { path = "../profile" } [dev-dependencies] -rand = "0.7.3" \ No newline at end of file +rand = "0.7.3" diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index e76b8cef..a86f8aeb 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -11,6 +11,10 @@ use network::{NetReceiver, NetSender}; use store::Store; use tokio::sync::mpsc::{channel, Sender}; +use profile::pspawn; +use profile::*; + + #[cfg(test)] #[path = "tests/consensus_tests.rs"] pub mod consensus_tests; @@ -36,12 +40,12 @@ impl Consensus { x })?; let network_receiver = NetReceiver::new(address, tx_core.clone()); - tokio::spawn(async move { + pspawn!("Net-Receiver", { network_receiver.run().await; }); let mut network_sender = NetSender::new(rx_network); - tokio::spawn(async move { + pspawn!("Net-Sender", { network_sender.run().await; }); @@ -76,7 +80,7 @@ impl Consensus { /* network_channel */ tx_network, commit_channel, ); - tokio::spawn(async move { + pspawn!("Consensus-Core", { core.run().await; }); diff --git a/consensus/src/mempool.rs b/consensus/src/mempool.rs index 1d965b55..2419a928 100644 --- a/consensus/src/mempool.rs +++ b/consensus/src/mempool.rs @@ -10,6 +10,9 @@ use std::collections::HashMap; use store::Store; use tokio::sync::mpsc::{channel, Receiver, Sender}; +use profile::pspawn; +use profile::*; + type DriverMessage = (Vec, Block, Receiver<()>); pub struct MempoolDriver { @@ -22,7 +25,7 @@ impl MempoolDriver { pub fn new(mempool: Mempool, core_channel: Sender, store: Store) -> Self { let (tx_inner, mut rx_inner): (_, Receiver) = channel(1000); let mut waiting = FuturesUnordered::new(); - tokio::spawn(async move { + pspawn!("Mempool", { loop { tokio::select! { Some((wait_on, block, handler)) = rx_inner.recv().fuse() => { diff --git a/consensus/src/synchronizer.rs b/consensus/src/synchronizer.rs index 336ce13c..b5ad6092 100644 --- a/consensus/src/synchronizer.rs +++ b/consensus/src/synchronizer.rs @@ -14,6 +14,9 @@ use std::collections::HashSet; use store::Store; use tokio::sync::mpsc::{channel, Receiver, Sender}; +use profile::pspawn; +use profile::*; + #[cfg(test)] #[path = "tests/synchronizer_tests.rs"] pub mod synchronizer_tests; @@ -37,7 +40,7 @@ impl Synchronizer { timer.schedule(sync_retry_delay, true).await; let store_copy = store.clone(); - tokio::spawn(async move { + pspawn!("Synchronizer", { let mut waiting = FuturesUnordered::new(); let mut pending = HashSet::new(); let mut requests = HashSet::new(); diff --git a/consensus/src/timer.rs b/consensus/src/timer.rs index 7ea0e734..0301e0ed 100644 --- a/consensus/src/timer.rs +++ b/consensus/src/timer.rs @@ -6,6 +6,9 @@ use std::time::Duration; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::time::sleep; +use profile::pspawn; +use profile::*; + #[cfg(test)] #[path = "tests/timer_tests.rs"] pub mod timer_tests; @@ -26,7 +29,7 @@ impl Timer { pub fn new() -> Self { let (tx_notifier, rx_notifier) = channel(100); let (tx_inner, mut rx_inner): (Sender>, _) = channel(100); - tokio::spawn(async move { + pspawn!("Timer", { let mut waiting = FuturesUnordered::new(); let mut pending = HashMap::new(); loop { diff --git a/crypto/Cargo.toml b/crypto/Cargo.toml index b5c20ae9..33262e3e 100644 --- a/crypto/Cargo.toml +++ b/crypto/Cargo.toml @@ -10,4 +10,6 @@ tokio = { version = "1.1.0", features = ["sync", "rt", "macros"] } ed25519-dalek = { version = "1.0.1", features = ["batch"] } serde = { version = "1.0", features = ["derive"] } rand = "0.7.3" -base64 = "0.13.0" \ No newline at end of file +base64 = "0.13.0" + +profile = { path = "../profile" } diff --git a/crypto/src/lib.rs b/crypto/src/lib.rs index 16fb1093..da769b2c 100644 --- a/crypto/src/lib.rs +++ b/crypto/src/lib.rs @@ -10,6 +10,9 @@ use std::fmt; use tokio::sync::mpsc::{channel, Sender}; use tokio::sync::oneshot; +use profile::pspawn; +use profile::*; + #[cfg(test)] #[path = "tests/crypto_tests.rs"] pub mod crypto_tests; @@ -211,7 +214,7 @@ pub struct SignatureService { impl SignatureService { pub fn new(secret: SecretKey) -> Self { let (tx, mut rx): (Sender<(_, oneshot::Sender<_>)>, _) = channel(100); - tokio::spawn(async move { + pspawn!("Signature-Service", { while let Some((digest, sender)) = rx.recv().await { let signature = Signature::new(&digest, &secret); let _ = sender.send(signature); diff --git a/mempool/Cargo.toml b/mempool/Cargo.toml index fc6429e2..d3f52c94 100644 --- a/mempool/Cargo.toml +++ b/mempool/Cargo.toml @@ -20,6 +20,7 @@ futures = "0.3.8" crypto = { path = "../crypto" } store = { path = "../store" } network = { path = "../network" } +profile = { path = "../profile" } [dev-dependencies] -rand = "0.7.3" \ No newline at end of file +rand = "0.7.3" diff --git a/mempool/src/front.rs b/mempool/src/front.rs index 223d277b..ece4a9aa 100644 --- a/mempool/src/front.rs +++ b/mempool/src/front.rs @@ -6,6 +6,9 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc::Sender; use tokio_util::codec::{Framed, LengthDelimitedCodec}; +use profile::pspawn; +use profile::*; + pub struct Front { address: SocketAddr, deliver: Sender, @@ -38,7 +41,7 @@ impl Front { } async fn spawn_worker(socket: TcpStream, peer: SocketAddr, deliver: Sender) { - tokio::spawn(async move { + pspawn!("Front-Worker", { let mut transport = Framed::new(socket, LengthDelimitedCodec::new()); while let Some(frame) = transport.next().await { match frame { diff --git a/mempool/src/simple.rs b/mempool/src/simple.rs index 196dda53..88083162 100644 --- a/mempool/src/simple.rs +++ b/mempool/src/simple.rs @@ -11,6 +11,9 @@ use store::Store; use tokio::sync::mpsc::{channel, Sender}; use tokio::sync::oneshot; +use profile::pspawn; +use profile::*; + #[cfg(test)] #[path = "tests/mempool_tests.rs"] pub mod mempool_tests; @@ -45,7 +48,7 @@ impl SimpleMempool { })?; let front = Front::new(address, tx_client); - tokio::spawn(async move { + pspawn!("Simple-Mempool-Front", { front.run().await; }); @@ -55,12 +58,12 @@ impl SimpleMempool { x })?; let network_receiver = NetReceiver::new(address, tx_core); - tokio::spawn(async move { + pspawn!("Simple-Mempool-Receiver", { network_receiver.run().await; }); let mut network_sender = NetSender::new(rx_network); - tokio::spawn(async move { + pspawn!("Simple-Mempool-Sender", { network_sender.run().await; }); @@ -76,7 +79,7 @@ impl SimpleMempool { /* client_channel */ rx_client, /* network_channel */ tx_network, ); - tokio::spawn(async move { + pspawn!("Core-Runner", { core.run().await; }); diff --git a/network/Cargo.toml b/network/Cargo.toml index 57497d47..e352f983 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -13,4 +13,6 @@ bytes = "1.0.1" bincode = "1.3.1" log = "0.4.0" futures = "0.3.8" -serde = "1.0" \ No newline at end of file +serde = "1.0" + +profile = { path = "../profile" } diff --git a/network/src/lib.rs b/network/src/lib.rs index 903b31d8..c1c29e8c 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -11,6 +11,9 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio_util::codec::{Framed, LengthDelimitedCodec}; +use profile::pspawn; +use profile::*; + #[cfg(test)] #[path = "tests/network_tests.rs"] pub mod network_tests; @@ -60,7 +63,7 @@ impl NetSender { async fn spawn_worker(address: SocketAddr) -> Sender { // Each worker handle a TCP connection with on address. let (tx, mut rx) = channel(1000); - tokio::spawn(async move { + pspawn!(format!("Net-Sender-{:?}", address), { let stream = match TcpStream::connect(address).await { Ok(stream) => { info!("Outgoing connection established with {}", address); @@ -118,7 +121,7 @@ impl NetReceiver { } async fn spawn_worker(socket: TcpStream, peer: SocketAddr, deliver: Sender) { - tokio::spawn(async move { + pspawn!(format!("Net-Receiver-{:?}", peer), { let mut transport = Framed::new(socket, LengthDelimitedCodec::new()); while let Some(frame) = transport.next().await { match frame diff --git a/node/Cargo.toml b/node/Cargo.toml index 07e24ca8..7e545b6f 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -24,11 +24,12 @@ crypto = { path = "../crypto" } store = { path = "../store" } consensus = { path = "../consensus" } mempool = { path = "../mempool" } +profile = { path = "../profile" } [features] benchmark = [] -[[bin]] -name = "client" -path = "src/client.rs" -required-features = ["benchmark"] \ No newline at end of file +[[bin]] +name = "client" +path = "src/client.rs" +required-features = ["benchmark"] diff --git a/node/src/main.rs b/node/src/main.rs index 6c49cf6c..12443ee9 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -13,8 +13,12 @@ use mempool::Committee as MempoolCommittee; use std::fs; use tokio::task::JoinHandle; +use profile::pspawn; +use profile::*; + #[tokio::main] async fn main() { + let matches = App::new(crate_name!()) .version(crate_version!()) .about("A research implementation of the HostStuff protocol.") @@ -60,17 +64,22 @@ async fn main() { } } ("run", Some(subm)) => { - let key_file = subm.value_of("keys").unwrap(); - let committee_file = subm.value_of("committee").unwrap(); - let parameters_file = subm.value_of("parameters"); - let store_path = subm.value_of("store").unwrap(); - match Node::new(committee_file, key_file, store_path, parameters_file).await { - Ok(mut node) => { - // Sink the commit channel. - while node.commit.recv().await.is_some() {} + let subm = subm.clone(); + let handle = pspawn!("Main", { + let key_file = subm.value_of("keys").unwrap(); + let committee_file = subm.value_of("committee").unwrap(); + let parameters_file = subm.value_of("parameters"); + let store_path = subm.value_of("store").unwrap(); + match Node::new(committee_file, key_file, store_path, parameters_file).await { + Ok(mut node) => { + // Sink the commit channel. + while node.commit.recv().await.is_some() {} + } + Err(e) => error!("{}", e), } - Err(e) => error!("{}", e), - } + }); + handle.await; + return; } ("deploy", Some(subm)) => { let nodes = subm.value_of("nodes").unwrap(); @@ -86,6 +95,7 @@ async fn main() { } _ => unreachable!(), } + } fn deploy_testbed(nodes: usize) -> Result>, Box> { diff --git a/profile/Cargo.toml b/profile/Cargo.toml new file mode 100644 index 00000000..4e110f87 --- /dev/null +++ b/profile/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "profile" +version = "0.1.0" +authors = ["George "] +edition = "2018" +publish = false + +[dependencies] +tokio = { version = "1.1.0", features = ["time", "macros", "net", "rt-multi-thread"] } +tokio-util = { version = "0.6.2", features= ["codec"] } +log = "0.4.0" +futures = "0.3.8" +env_logger = "0.8.2" +pin-project-lite = "0.2" diff --git a/profile/src/lib.rs b/profile/src/lib.rs new file mode 100644 index 00000000..35f02373 --- /dev/null +++ b/profile/src/lib.rs @@ -0,0 +1,185 @@ +use std::marker::Unpin; +use std::pin::Pin; +use futures::task::{Context, Poll}; +use futures::prelude::*; +use pin_project_lite::pin_project; +use std::thread_local; +use std::cell::RefCell; +use futures::task::ArcWake; +use std::task::{Waker,}; +use std::sync::Arc; +use futures::task::waker; +pub use std::sync::atomic::{AtomicUsize, Ordering}; +pub use log::*; + + +use tokio::sync::mpsc::{channel, Receiver, Sender}; + +pub static TASK_NUM: AtomicUsize = AtomicUsize::new(0); + +thread_local! { + pub static CURRENT_PROFILE_TASK: RefCell> = RefCell::new(None); +} + +pub struct ProfileWaker { + inner_waker : Waker, + from: usize, +} + +impl ArcWake for ProfileWaker { + fn wake(self: Arc) { + + let mut this_name = None; + CURRENT_PROFILE_TASK.with(|f| { + this_name = f.borrow().clone(); + }); + + debug!(" Wake: {:?} -> {:?}", this_name, self.from); + self.inner_waker.clone().wake(); + } + + fn wake_by_ref(arc_self: &Arc){ + + let mut this_name = None; + CURRENT_PROFILE_TASK.with(|f| { + this_name = f.borrow().clone(); + }); + + debug!(" Wake: {:?} -> {:?}", this_name, arc_self.from); + arc_self.inner_waker.clone().wake(); + } +} + +pin_project! { + pub struct ProfiledTask { + name: usize, + #[pin] + fut: Fut, + } +} + +impl ProfiledTask { + pub fn new_waker(&self, cx: &mut Context<'_>) -> Waker { + let pw = Arc::new(ProfileWaker { + inner_waker : cx.waker().clone(), + from : self.name, + }); + waker(pw) + } + +} + +impl ProfiledTask { + pub fn new(name: usize, fut: Fut) -> ProfiledTask { + ProfiledTask { + name, + fut, + } + } +} + +impl Future for ProfiledTask { + type Output = Fut::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll{ + + let mut old_name = None; + CURRENT_PROFILE_TASK.with(|f| { + old_name = f.borrow().clone(); + }); + + // Set the name of the task in thread + CURRENT_PROFILE_TASK.with(|f| { + *f.borrow_mut() = Some(self.name); + }); + debug!(" Resume task: {:?}", self.name); + + // Make a waker with name + let w = self.new_waker(cx); + let mut ct = Context::from_waker(&w); + + let ret = self.as_mut().project().fut.poll(&mut ct); + + // Set the name of the task in thread, back to None + CURRENT_PROFILE_TASK.with(|f| { + *f.borrow_mut() = old_name; + }); + debug!(" Pause task: {:?}", self.name); + + ret + } + +} + +pub fn profile_name_thread(name : &str) { + let num = TASK_NUM.fetch_add(1, Ordering::Relaxed); + + // Set the name of the task in thread + CURRENT_PROFILE_TASK.with(|f| { + *f.borrow_mut() = Some(num); + }); + debug!(" Task {:?} from None defined {}", num, name); + +} + +#[macro_export] +macro_rules! pspawn { + ($task_name:expr, $x:block) => { + + { + let num = TASK_NUM.fetch_add(1, Ordering::Relaxed); + let _h2 = tokio::spawn(ProfiledTask::new(num, async move { + $x + })); + + let mut this_name = None; + CURRENT_PROFILE_TASK.with(|f| { + this_name = f.borrow().clone(); + }); + + let current_file = file!(); + let current_line = line!(); + debug!(" Task {:?} from {:?} defined {}:{}:{}", num, this_name, $task_name, current_file, current_line); + // Potentially dump a back-trace here to help identify deeper dependencies? + + _h2 + } + + }; +} + +// Tokio testing reactor loop (single thread) +#[tokio::test] +async fn test_profile() { + + let (mut tx, mut rx) = channel(10); + + + let h1 = tokio::spawn(ProfiledTask::new(0, async move { + let _x = rx.recv().await; + })); + + let _h2 = tokio::spawn(ProfiledTask::new(1, async move { + let _x = tx.send(10usize).await; + })); + + h1.await.unwrap(); + +} + +#[tokio::test] +async fn test_profile_macro() { + + let (mut tx, mut rx) = channel(10); + + let h1 = pspawn!("Name1", { + let _x = rx.recv().await; + }); + + let _h2 = pspawn!("Name2", { + let _x = tx.send(10usize).await; + }); + + h1.await.unwrap(); + +} diff --git a/store/Cargo.toml b/store/Cargo.toml index 026c30b4..dedc7732 100644 --- a/store/Cargo.toml +++ b/store/Cargo.toml @@ -7,4 +7,6 @@ publish = false [dependencies] rocksdb = "0.15.0" -tokio = { version = "1.1.0", features = ["sync", "macros", "rt"] } \ No newline at end of file +tokio = { version = "1.1.0", features = ["sync", "macros", "rt"] } + +profile = { path = "../profile" } diff --git a/store/src/lib.rs b/store/src/lib.rs index dbbe33c4..204c4f4c 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -2,6 +2,9 @@ use std::collections::{HashMap, VecDeque}; use tokio::sync::mpsc::{channel, Sender}; use tokio::sync::oneshot; +use profile::pspawn; +use profile::*; + #[cfg(test)] #[path = "tests/store_tests.rs"] pub mod store_tests; @@ -28,7 +31,7 @@ impl Store { let db = rocksdb::DB::open_default(path)?; let mut obligations = HashMap::<_, VecDeque>>::new(); let (tx, mut rx) = channel(100); - tokio::spawn(async move { + pspawn!("Store", { while let Some(command) = rx.recv().await { match command { StoreCommand::Write(key, value, sender) => {