Skip to content

Commit 8c6629f

Browse files
committed
stream
1 parent 1373743 commit 8c6629f

File tree

3 files changed

+870
-610
lines changed

3 files changed

+870
-610
lines changed

crates/node/src/lib.rs

Lines changed: 65 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ pub trait NodeContext: Send + Sync + 'static {
7878
#[async_trait]
7979
pub trait NodeBidder<C>: Send + Sync + 'static {
8080
/// Bid on requests.
81-
async fn bid(&self, ctx: &C) -> Result<()>;
81+
async fn run_bidding_service(&self, ctx: Arc<C>) -> Result<()>;
8282
}
8383

8484
/// The prover for a node.
@@ -88,7 +88,7 @@ pub trait NodeBidder<C>: Send + Sync + 'static {
8888
#[async_trait]
8989
pub trait NodeProver<C>: Send + Sync + 'static {
9090
/// Prove requests.
91-
async fn prove(&self, ctx: &C) -> Result<()>;
91+
async fn run_proving_service(&self, ctx: Arc<C>) -> Result<()>;
9292
}
9393

9494
/// The monitor for a node.
@@ -118,48 +118,90 @@ pub struct NodeMetrics {
118118
impl<C: NodeContext, B: NodeBidder<C>, P: NodeProver<C>, M: NodeMonitor<C>> Node<C, B, P, M> {
119119
/// Run the node.
120120
pub async fn run(self) -> Result<()> {
121-
// Run the bid and prove task.
122-
let ctx = self.ctx.clone();
121+
// Run the bid task.
122+
let ctx_bidder = self.ctx.clone();
123123
let bidder = self.bidder.clone();
124-
let prover = self.prover.clone();
125-
let bid_and_prove_task = tokio::spawn(async move {
126-
let result: Result<()> = async {
127-
loop {
128-
let bid_future = bidder.bid(&ctx);
129-
let prove_future = prover.prove(&ctx);
130-
let _ = tokio::join!(bid_future, prove_future);
124+
let bid_task = tokio::spawn(async move {
125+
if let Err(e) = bidder.run_bidding_service(ctx_bidder).await {
126+
tracing::error!("Bidding service failed: {:?}", e);
127+
return Err(e);
128+
}
129+
Ok(())
130+
});
131131

132-
sleep(Duration::from_secs(3)).await;
133-
}
132+
// Run the prove task.
133+
let ctx_prover = self.ctx.clone();
134+
let prover = self.prover.clone();
135+
let prove_task = tokio::spawn(async move {
136+
if let Err(e) = prover.run_proving_service(ctx_prover).await {
137+
tracing::error!("Proving service failed: {:?}", e);
138+
return Err(e);
134139
}
135-
.await;
136-
result
140+
Ok(())
137141
});
138142

139143
// Run the system monitor task.
140-
let ctx = self.ctx.clone();
144+
let ctx_monitor = self.ctx.clone();
141145
let monitor = self.monitor.clone();
142146
let monitor_task = tokio::spawn(async move {
143147
let result: Result<()> = async {
144148
loop {
145-
monitor.record(&ctx).await?;
149+
if let Err(e) = monitor.record(&ctx_monitor).await {
150+
tracing::warn!("Monitor task record failed: {:?}", e);
151+
// Decide if this should be fatal or just log and continue
152+
}
146153
sleep(Duration::from_secs(30)).await;
147154
}
148155
}
149156
.await;
150-
result
157+
// If the loop exits, it implies an issue or it was designed to exit.
158+
// For a persistent monitor, it would ideally not exit without error.
159+
if let Err(e) = result {
160+
tracing::error!("Monitor service failed: {:?}", e);
161+
return Err(e);
162+
}
163+
Ok(())
151164
});
152165

153166
// Wait until one of the tasks fails.
154167
tokio::select! {
155-
result = bid_and_prove_task => {
156-
if let Err(e) = result {
157-
return Err(e.into());
168+
result = bid_task => {
169+
match result {
170+
Ok(Ok(())) => tracing::info!("Bidding service completed successfully."),
171+
Ok(Err(e)) => {
172+
tracing::error!("Bidding service exited with error: {:?}", e);
173+
return Err(e.into());
174+
}
175+
Err(e) => {
176+
tracing::error!("Bidding service panicked: {:?}", e);
177+
return Err(e.into());
178+
}
179+
}
180+
},
181+
result = prove_task => {
182+
match result {
183+
Ok(Ok(())) => tracing::info!("Proving service completed successfully."),
184+
Ok(Err(e)) => {
185+
tracing::error!("Proving service exited with error: {:?}", e);
186+
return Err(e.into());
187+
}
188+
Err(e) => {
189+
tracing::error!("Proving service panicked: {:?}", e);
190+
return Err(e.into());
191+
}
158192
}
159193
},
160194
result = monitor_task => {
161-
if let Err(e) = result {
162-
return Err(e.into());
195+
match result {
196+
Ok(Ok(())) => tracing::info!("Monitor service completed successfully."),
197+
Ok(Err(e)) => {
198+
tracing::error!("Monitor service exited with error: {:?}", e);
199+
return Err(e.into());
200+
}
201+
Err(e) => {
202+
tracing::error!("Monitor service panicked: {:?}", e);
203+
return Err(e.into());
204+
}
163205
}
164206
},
165207
}

0 commit comments

Comments
 (0)