Skip to content

Commit a3d20f9

Browse files
authored
feat: refactored coroutine pipeline so nothing can get stuck (#166)
* big refactor (that's broken) * feat: refactored coroutine pipeline so nothing can get stuck - added lots of gucci comments - nearly lost my mind understanding async rust * aligned some terminology
1 parent b2f7371 commit a3d20f9

12 files changed

Lines changed: 502 additions & 426 deletions

bin/onlyswaps-verifier/src/app.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
use crate::chain_state::NetworkBus;
2+
use crate::channel_manager::TaskManager;
3+
use crate::config::AppConfig;
4+
use crate::control_plane::DefaultControlPlane;
5+
use crate::retry_runtime::RetryScheduler;
6+
use crate::verification_events::EventManagement;
7+
use futures::{StreamExt, stream};
8+
use std::sync::Arc;
9+
10+
pub(crate) struct App {}
11+
12+
impl App {
13+
pub async fn start(app_config: &AppConfig) -> anyhow::Result<()> {
14+
// the `network_bus` manages access to all the chains at once for pulling state or submitting txs
15+
let network_bus = Arc::new(NetworkBus::new(app_config).await?);
16+
17+
// the `control_plane` provides the app phases each request goes through; consider it the
18+
// inner circle of the domain-driven-design nested circles.
19+
// Each validation goes through the following phases:
20+
// - receive (event appears from the chain)
21+
// - resolve (we resolve the state relating to that requestId)
22+
// - evaluate (we evaluate the resolved state to see if it's a complete swap)
23+
// - sign (we sign the necessary parameters of verified swaps)
24+
// - submit (we ship the payload back to the contract)
25+
// and optionally error handling (we can stage many of the errors for retry)
26+
let control_plane = DefaultControlPlane::new(app_config, network_bus.clone()).await?;
27+
28+
// the `task_manager` connects all the above phases to one another and the outside world
29+
// and manages sending errors on the right channels. It abstracts movement between
30+
// phases to make the control plane easier to test
31+
let task_manager = TaskManager::new(control_plane);
32+
33+
// `EventManagement` sets up some streaming-related structs; there are some weird semantics
34+
// around dropping that make it a nightmare to actually hide anything away in its implementation,
35+
// hence why so much of the streaming logic is in this function :(
36+
let EventManagement {
37+
omnievent,
38+
event_ids,
39+
} = EventManagement::new(app_config).await?;
40+
//
41+
let live_stream = omnievent
42+
.get_ethereum_multi_event_stream(event_ids.clone())
43+
.await?
44+
.filter_map(|maybe_event| async move {
45+
match maybe_event {
46+
Ok(event) => match event.data.try_into() {
47+
Ok(verification) => Some(verification),
48+
_ => {
49+
tracing::warn!("received an invalid RPC event");
50+
None
51+
}
52+
},
53+
_ => None,
54+
}
55+
});
56+
57+
// the `retry_scheduler` allows errors at any phase to drop back in at the relevant stage.
58+
// e.g. if an RPC is down during submission, it may be possible to just resubmit the
59+
// verified signature in a short while rather than pull all the state again
60+
let retry_scheduler = RetryScheduler::new(app_config);
61+
let retry_tx = retry_scheduler.tx();
62+
let retry_stream = retry_scheduler.into_stream();
63+
64+
// we get outstanding pending verifiations that might have accumulated while our
65+
// node was down. It's not a 'stream' in the traditional sense, hence we have to do some
66+
// iter -> chain magic below
67+
let pending_verifications = network_bus
68+
.fetch_pending_verifications()
69+
.await
70+
.unwrap_or_default();
71+
72+
let live_streams = stream::select(live_stream, retry_stream);
73+
let stream = stream::iter(pending_verifications).chain(live_streams);
74+
75+
// in order to allow the `task_manager` to send retries back 'up the funnel', we pass in a
76+
// tx "Sender" into its `run` method.
77+
task_manager.run(retry_tx, Box::pin(stream)).await;
78+
79+
// if the `run` function ever ends, it's night night
80+
anyhow::bail!("onlyswaps closed unexpectedly")
81+
}
82+
}

bin/onlyswaps-verifier/src/chain_state.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::chain_state_pending::{RequestId, Verification, extract_pending_verifications};
2-
use crate::config::TimeoutConfig;
3-
use crate::signing::VerifiedSwap;
2+
use crate::config::{AppConfig, TimeoutConfig};
3+
use crate::signing::SignedVerification;
44
use alloy::network::EthereumWallet;
55
use alloy::primitives::{Address, Bytes, FixedBytes};
66
use alloy::providers::{DynProvider, Provider, ProviderBuilder, WsConnect};
@@ -33,16 +33,14 @@ pub(crate) struct Network<P> {
3333
}
3434

3535
impl NetworkBus<DynProvider> {
36-
pub async fn new(
37-
eth_private_key: impl Into<Arc<PrivateKeySigner>>,
38-
network_configs: &[NetworkConfig],
39-
timeout_config: &TimeoutConfig,
40-
) -> anyhow::Result<Self> {
41-
let private_key = eth_private_key.into();
36+
pub async fn new(app_config: &AppConfig) -> anyhow::Result<Self> {
37+
let private_key = PrivateKeySigner::from_slice(app_config.eth_private_key.as_slice())?;
38+
let private_key = Arc::new(private_key);
4239
let mut networks = HashMap::new();
4340

44-
for config in network_configs.iter() {
45-
let network = Network::new(private_key.clone(), config, timeout_config.clone()).await?;
41+
for config in app_config.networks.iter() {
42+
let network =
43+
Network::new(private_key.clone(), config, app_config.timeout.clone()).await?;
4644
networks.insert(config.chain_id, network);
4745
}
4846

@@ -84,7 +82,7 @@ impl NetworkBus<DynProvider> {
8482

8583
pub(crate) async fn submit_verification(
8684
&self,
87-
verified_swap: &VerifiedSwap,
85+
verified_swap: &SignedVerification,
8886
) -> anyhow::Result<()> {
8987
let chain_id: u64 = verified_swap.src_chain_id.try_into()?;
9088
let transport = self.networks.get(&chain_id).ok_or(anyhow!(
@@ -98,11 +96,10 @@ impl NetworkBus<DynProvider> {
9896

9997
impl Network<DynProvider> {
10098
pub async fn new(
101-
signer: impl Into<Arc<PrivateKeySigner>>,
99+
signer: Arc<PrivateKeySigner>,
102100
config: &NetworkConfig,
103101
timeout_config: TimeoutConfig,
104102
) -> anyhow::Result<Self> {
105-
let signer = signer.into();
106103
let url = config.rpc_url.clone();
107104
let own_addr = signer.address();
108105
let provider = ProviderBuilder::new()
@@ -178,7 +175,10 @@ impl<P: Provider> Network<P> {
178175
.await?)
179176
}
180177

181-
pub async fn submit_verified_swap(&self, verified_swap: &VerifiedSwap) -> anyhow::Result<()> {
178+
pub async fn submit_verified_swap(
179+
&self,
180+
verified_swap: &SignedVerification,
181+
) -> anyhow::Result<()> {
182182
// nodes can be configured not to write the signature to save gas
183183
if !self.should_write {
184184
return Ok(());
@@ -196,7 +196,7 @@ impl<P: Provider> Network<P> {
196196
}
197197
}
198198

199-
async fn rebalance(&self, verified_swap: &VerifiedSwap) -> anyhow::Result<()> {
199+
async fn rebalance(&self, verified_swap: &SignedVerification) -> anyhow::Result<()> {
200200
let tx = self
201201
.router
202202
.rebalanceSolver(

bin/onlyswaps-verifier/src/chain_state_pending.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ enum Status {
8888
}
8989
pub type RequestId = FixedBytes<32>;
9090

91-
#[derive(Debug, PartialEq, Eq, Hash)]
91+
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
9292
pub struct Verification<ID> {
9393
pub dest_chain_id: u64,
9494
pub request_id: ID,
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
use crate::chain_state_pending::{RequestId, Verification};
2+
use crate::control_plane::{ControlPlane, ResolvedState, VerificationError};
3+
use crate::retry_runtime::RetrySender;
4+
use crate::signing::SignedVerification;
5+
use futures::Stream;
6+
use std::pin::Pin;
7+
use std::sync::Arc;
8+
use tokio::sync::mpsc::unbounded_channel;
9+
use tokio::task::JoinSet;
10+
use tokio_stream::StreamExt;
11+
12+
pub(crate) struct TaskManager<CP> {
13+
control_plane: Arc<CP>,
14+
}
15+
16+
impl<CP: ControlPlane> TaskManager<CP>
17+
where
18+
CP: Send + Sync + 'static,
19+
{
20+
pub fn new(control_plane: impl Into<Arc<CP>>) -> Self {
21+
Self {
22+
control_plane: control_plane.into(),
23+
}
24+
}
25+
26+
// `run` spawns a coroutine for each of the steps of processing, and each
27+
// request also gets its own sub(?)-coroutine to ensure that no slow RPC or
28+
// dodgy request can block the pipeline for other valid ones
29+
pub async fn run(
30+
&self,
31+
retry_tx: RetrySender,
32+
mut event_stream: Pin<Box<impl Stream<Item = Verification<RequestId>> + Send + 'static>>,
33+
) {
34+
tracing::info!("starting channel manager");
35+
let mut tasks = JoinSet::new();
36+
let control_plane = self.control_plane.clone();
37+
38+
let (tx_verifications, mut rx_verifications) =
39+
unbounded_channel::<Verification<RequestId>>();
40+
let (tx_resolve, mut rx_resolve) = unbounded_channel::<ResolvedState>();
41+
let (tx_sign, mut rx_sign) = unbounded_channel::<ResolvedState>();
42+
let (tx_submit, mut rx_submit) = unbounded_channel::<SignedVerification>();
43+
let (tx_done, mut rx_done) = unbounded_channel::<SignedVerification>();
44+
let (tx_err, mut rx_err) = unbounded_channel::<VerificationError>();
45+
46+
// 'receive' step
47+
{
48+
let tx_verifications = tx_verifications.clone();
49+
tasks.spawn(async move {
50+
while let Some(verification) = event_stream.next().await {
51+
tx_verifications
52+
.send(verification)
53+
.expect("failed to send verification on channel");
54+
}
55+
});
56+
}
57+
58+
// 'resolve' step
59+
{
60+
let control_plane = control_plane.clone();
61+
let tx_resolve = tx_resolve.clone();
62+
let tx_err = tx_err.clone();
63+
64+
// resolve
65+
tasks.spawn(async move {
66+
while let Some(verification) = rx_verifications.recv().await {
67+
let control_plane = control_plane.clone();
68+
let tx_resolve = tx_resolve.clone();
69+
let tx_err = tx_err.clone();
70+
71+
tokio::spawn(async move {
72+
match control_plane.resolve_state(verification.clone()).await {
73+
Ok(v) => tx_resolve.send(v).expect("error writing on channel"),
74+
Err(_) => tx_err
75+
.send(VerificationError::Resolve(verification))
76+
.expect("error writing on channel"),
77+
}
78+
});
79+
}
80+
});
81+
}
82+
83+
// 'evaluate' step
84+
{
85+
let control_plane = control_plane.clone();
86+
let tx_err = tx_err.clone();
87+
88+
tasks.spawn(async move {
89+
while let Some(valid) = rx_resolve.recv().await {
90+
let control_plane = control_plane.clone();
91+
let tx_sign = tx_sign.clone();
92+
let tx_err = tx_err.clone();
93+
94+
tokio::spawn(async move {
95+
match control_plane.evaluate_state(valid.clone()).await {
96+
Ok(v) => tx_sign.send(v).expect("error writing on channel"),
97+
Err(_) => tx_err
98+
.send(VerificationError::Evaluate(valid))
99+
.expect("error writing on channel"),
100+
}
101+
});
102+
}
103+
});
104+
}
105+
106+
// 'sign' step
107+
{
108+
let control_plane = control_plane.clone();
109+
let tx_err = tx_err.clone();
110+
111+
tasks.spawn(async move {
112+
while let Some(state) = rx_sign.recv().await {
113+
let control_plane = control_plane.clone();
114+
let tx_submit = tx_submit.clone();
115+
let tx_err = tx_err.clone();
116+
117+
tokio::spawn(async move {
118+
match control_plane.sign_state(state.clone()).await {
119+
Ok(v) => tx_submit.send(v).expect("error writing on channel"),
120+
Err(_) => tx_err
121+
.send(VerificationError::Sign(state))
122+
.expect("error writing on channel"),
123+
}
124+
});
125+
}
126+
});
127+
}
128+
129+
// 'submit' step
130+
{
131+
let control_plane = control_plane.clone();
132+
let tx_done = tx_done.clone();
133+
let tx_err = tx_err.clone();
134+
135+
tasks.spawn(async move {
136+
while let Some(signed_verification) = rx_submit.recv().await {
137+
let control_plane = control_plane.clone();
138+
let tx_done = tx_done.clone();
139+
let tx_err = tx_err.clone();
140+
141+
tokio::spawn(async move {
142+
match control_plane
143+
.submit_state(signed_verification.clone())
144+
.await
145+
{
146+
Ok(v) => tx_done.send(v).expect("error writing on channel"),
147+
Err(_) => tx_err
148+
.send(VerificationError::Submit(signed_verification))
149+
.expect("error writing on channel"),
150+
}
151+
});
152+
}
153+
});
154+
}
155+
156+
// 'done' step
157+
tasks.spawn(async move {
158+
while let Some(state) = rx_done.recv().await {
159+
tracing::info!(
160+
chain_id = state.src_chain_id.to_string(),
161+
request_id = state.request_id.to_string(),
162+
"verification completed successfully"
163+
);
164+
}
165+
});
166+
167+
// 'error handling' step
168+
{
169+
let control_plane = control_plane.clone();
170+
let retry_tx = retry_tx.clone();
171+
172+
tasks.spawn(async move {
173+
while let Some(err) = rx_err.recv().await {
174+
control_plane.handle_error(&err, retry_tx.clone()).await;
175+
}
176+
});
177+
}
178+
179+
// join all the things so we don't cede control back to the caller
180+
tracing::info!("all tasks started");
181+
if tasks.join_next().await.is_some() {
182+
panic!("a task manager task ended, but they should run forever")
183+
}
184+
}
185+
}

0 commit comments

Comments
 (0)