Skip to content

Commit 9daae40

Browse files
plebhashShourya742
authored andcommitted
modularize LoopControl on JDS
1 parent 0d4e0cd commit 9daae40

3 files changed

Lines changed: 84 additions & 17 deletions

File tree

pool-apps/jd-server/src/lib/error.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@ pub enum Action {
4848
Shutdown,
4949
}
5050

51+
/// Loop control signal for message-processing loops.
52+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
53+
pub enum LoopControl {
54+
Continue,
55+
Break,
56+
}
57+
5158
/// Marker type for errors originating from the [`crate::job_declarator::JobDeclarator`] layer.
5259
#[derive(Debug)]
5360
pub struct JobDeclarator;

pool-apps/jd-server/src/lib/job_declarator/downstream/mod.rs

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use super::{
88
DownstreamJobDeclarationMessage, JobDeclarationMessage, ALLOCATED_TOKEN_TIMEOUT_SECS,
99
JANITOR_INTERVAL_SECS,
1010
};
11-
use crate::{error, error::JDSResult, io_task::spawn_io_tasks};
11+
use crate::{error, error::JDSResult, error::LoopControl, io_task::spawn_io_tasks};
1212
use async_channel::{unbounded, Receiver, Sender};
1313
use bitcoin_core_sv2::job_declaration_protocol::CancellationToken;
1414
use dashmap::DashMap;
@@ -73,6 +73,39 @@ pub struct Downstream {
7373

7474
#[cfg_attr(not(test), hotpath::measure_all)]
7575
impl Downstream {
76+
fn handle_error_action(
77+
&self,
78+
context: &str,
79+
e: &error::JDSError<error::Downstream>,
80+
) -> LoopControl {
81+
match e.action {
82+
error::Action::Log => {
83+
warn!(
84+
downstream_id = self.downstream_id,
85+
error_kind = ?e.kind,
86+
"{context} returned a log-only error"
87+
);
88+
LoopControl::Continue
89+
}
90+
error::Action::Disconnect(_) => {
91+
warn!(
92+
downstream_id = self.downstream_id,
93+
error_kind = ?e.kind,
94+
"{context} requested disconnect"
95+
);
96+
LoopControl::Break
97+
}
98+
error::Action::Shutdown => {
99+
warn!(
100+
downstream_id = self.downstream_id,
101+
error_kind = ?e.kind,
102+
"{context} requested shutdown"
103+
);
104+
LoopControl::Break
105+
}
106+
}
107+
}
108+
76109
/// Creates a new [`Downstream`] and spawns its Noise I/O tasks.
77110
#[allow(clippy::too_many_arguments)]
78111
pub fn new(
@@ -171,20 +204,22 @@ impl Downstream {
171204
res = self_clone_1.handle_message_from_downstream() => {
172205
if let Err(e) = res {
173206
error!(?e, "Error handling downstream message for {downstream_id}");
174-
match e.action {
175-
error::Action::Disconnect(_) => break,
176-
error::Action::Shutdown => break,
177-
error::Action::Log => {}
207+
if let LoopControl::Break = self.handle_error_action(
208+
"Downstream::handle_message_from_downstream",
209+
&e,
210+
) {
211+
break;
178212
}
179213
}
180214
}
181215
res = self_clone_2.handle_job_declarator_message() => {
182216
if let Err(e) = res {
183217
error!(?e, "Error handling job declarator message for {downstream_id}");
184-
match e.action {
185-
error::Action::Disconnect(_) => break,
186-
error::Action::Shutdown => break,
187-
error::Action::Log => {}
218+
if let LoopControl::Break = self.handle_error_action(
219+
"Downstream::handle_job_declarator_message",
220+
&e,
221+
) {
222+
break;
188223
}
189224
}
190225
}

pool-apps/jd-server/src/lib/job_declarator/mod.rs

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
88
use crate::{
99
error,
10-
error::{JDSError, JDSErrorKind, JDSResult},
10+
error::{JDSError, JDSErrorKind, JDSResult, LoopControl},
1111
job_declarator::{
1212
downstream::Downstream,
1313
job_validation::{JobValidationEngine, SetCustomMiningJobResult},
@@ -37,7 +37,7 @@ use stratum_apps::{
3737
utils::types::{DownstreamId, JdToken},
3838
};
3939
use tokio::net::TcpListener;
40-
use tracing::{debug, error, info};
40+
use tracing::{debug, error, info, warn};
4141

4242
// see https://github.com/stratum-mining/sv2-apps/issues/335
4343
const TEMPORARY_TIMEOUT_MULTIPLIER: u64 = 144;
@@ -147,6 +147,32 @@ impl JobDeclarator {
147147

148148
/// Generic implementation for all [`JobValidationEngine`] types.
149149
impl JobDeclarator {
150+
fn handle_error_action(
151+
&self,
152+
context: &str,
153+
e: &JDSError<error::JobDeclarator>,
154+
) -> LoopControl {
155+
match e.action {
156+
error::Action::Log => {
157+
warn!(error_kind = ?e.kind, "{context} returned a log-only error");
158+
LoopControl::Continue
159+
}
160+
error::Action::Disconnect(downstream_id) => {
161+
warn!(
162+
downstream_id,
163+
error_kind = ?e.kind,
164+
"{context} requested downstream disconnect"
165+
);
166+
self.cleanup_downstream(downstream_id);
167+
LoopControl::Continue
168+
}
169+
error::Action::Shutdown => {
170+
warn!(error_kind = ?e.kind, "{context} requested shutdown");
171+
LoopControl::Break
172+
}
173+
}
174+
}
175+
150176
/// Binds a TCP listener and spawns the accept loop that creates a `Downstream`
151177
/// for every new Noise-encrypted connection.
152178
#[allow(clippy::too_many_arguments)]
@@ -280,12 +306,11 @@ impl JobDeclarator {
280306
res = self.handle_jdp_message() => {
281307
if let Err(e) = res {
282308
error!(?e, "Error handling Job Declaration message");
283-
match e.action {
284-
error::Action::Disconnect(downstream_id) => {
285-
self.cleanup_downstream(downstream_id);
286-
}
287-
error::Action::Shutdown => break,
288-
error::Action::Log => {}
309+
if let LoopControl::Break = self.handle_error_action(
310+
"JobDeclarator::handle_jdp_message",
311+
&e,
312+
) {
313+
break;
289314
}
290315
}
291316
}

0 commit comments

Comments
 (0)