Skip to content

Commit e88b571

Browse files
committed
feat(net-svc): first build of net-svc
1 parent af03d7c commit e88b571

21 files changed

Lines changed: 5263 additions & 27 deletions

File tree

Cargo.lock

Lines changed: 1379 additions & 24 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ members = [
88
"crates/common",
99
"crates/compute-exec",
1010
"crates/compute-fw",
11-
"crates/job/types",
11+
"crates/jobs", "crates/net-svc", "crates/net-types", "crates/net-wire",
1212
"crates/network/api",
1313
"crates/rpc/api",
1414
"crates/rpc/provider",
@@ -76,12 +76,11 @@ rand_core = "0.6"
7676
serde = { version = "1.0", features = ["derive"] }
7777
strata-codec = { git = "https://github.com/alpenlabs/strata-common.git" }
7878
thiserror = "2.0"
79-
tokio = { version = "1.48.0", features = ["rt", "sync", "macros", "time"] }
8079
tokio-util = "0.7"
8180
tracing = "0.1"
8281

8382
[profile.release]
84-
opt-level = "z" # Optimized for size, use 3 for speed
83+
opt-level = 3 # Optimized for size, use 3 for speed
8584
lto = true # Enable Link Time Optimization
8685
codegen-units = 1 # Reduced to increase optimizations
8786
panic = "unwind" # Graceful shutdown, even on panic

crates/jobs/Cargo.toml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
[package]
2+
name = "jobs"
3+
version.workspace = true
4+
edition.workspace = true
5+
authors.workspace = true
6+
description.workspace = true
7+
license.workspace = true
8+
readme.workspace = true
9+
repository.workspace = true
10+
categories.workspace = true
11+
keywords.workspace = true
12+
13+
[dependencies]
14+
kanal = "0.1.1"
15+
monoio = "0.2.4"
16+
17+
# [lints]
18+
# workspace = true

crates/jobs/src/lib.rs

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
use kanal::{AsyncReceiver, AsyncSender};
2+
use monoio::io::stream::Stream;
3+
use std::{collections::VecDeque, fmt::Debug, sync::Arc};
4+
5+
pub fn add(left: u64, right: u64) -> u64 {
6+
left + right
7+
}
8+
9+
struct IncomingNetworkMessage;
10+
struct OutgoingNetworkMessage;
11+
12+
enum Action {}
13+
14+
#[repr(i32)]
15+
#[derive(PartialEq, Eq, PartialOrd, Ord)]
16+
enum MessagePriority {
17+
Low = -1,
18+
Normal = 0,
19+
High = 1,
20+
}
21+
22+
impl Default for MessagePriority {
23+
fn default() -> Self {
24+
MessagePriority::Normal
25+
}
26+
}
27+
28+
/// Queues
29+
struct JobQueues {
30+
/// Network jobs are prioritized over other jobs. They're cheap to execute.
31+
net_jobs: VecDeque<Box<dyn Job>>,
32+
garbling_jobs: VecDeque<Box<dyn Job>>,
33+
other_jobs: VecDeque<Box<dyn Job>>,
34+
}
35+
36+
trait NetworkSendingClient {
37+
async fn send_and_wait_for_ack(
38+
&self,
39+
to: PeerId,
40+
priority: Option<MessagePriority>,
41+
message: OutgoingNetworkMessage,
42+
) -> Result<(), Error>;
43+
}
44+
45+
trait NetworkReceivingClient {
46+
type Stream: Stream<Item = (IncomingNetworkMessage,)>;
47+
async fn receive(&self) -> Result<Option, Error>;
48+
}
49+
50+
trait Acker {
51+
async fn ack(&self) -> Result<(), Error>;
52+
}
53+
54+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
55+
struct Utilisation {
56+
/// Memory usage in bytes
57+
memory: usize,
58+
/// Threads used for job execution
59+
threads: usize,
60+
}
61+
62+
impl Utilisation {
63+
fn checked_sub(&self, other: &Utilisation) -> Option<Utilisation> {
64+
if self.memory < other.memory || self.threads < other.threads {
65+
None
66+
} else {
67+
Some(Utilisation {
68+
memory: self.memory - other.memory,
69+
threads: self.threads - other.threads,
70+
})
71+
}
72+
}
73+
74+
fn checked_add(&self, other: &Utilisation, max: &Utilisation) -> Option<Utilisation> {
75+
if self.memory + other.memory > max.memory || self.threads + other.threads > max.threads {
76+
None
77+
} else {
78+
Some(Utilisation {
79+
memory: self.memory + other.memory,
80+
threads: self.threads + other.threads,
81+
})
82+
}
83+
}
84+
}
85+
86+
pub struct JobScheduler {
87+
/// Immutable context shared with all jobs.
88+
///
89+
/// We use an Arc as jobs are sent to individual threads
90+
context: Arc<JobContext>,
91+
max_util: Utilisation,
92+
cur_util: Utilisation,
93+
94+
workers: Vec<SchedulerToWorker>,
95+
}
96+
97+
struct SchedulerToWorker {
98+
to: AsyncSender<Job>,
99+
}
100+
101+
impl JobScheduler {
102+
fn available_resources(&self) -> Utilisation {
103+
self.max_util
104+
.checked_sub(&self.cur_util)
105+
.expect("Utilisation underflow")
106+
}
107+
108+
fn release_resources(&mut self, util: Utilisation) {
109+
self.cur_util = self
110+
.cur_util
111+
.checked_sub(&util)
112+
.expect("Utilisation overflow");
113+
}
114+
115+
fn use_resources(&mut self, util: Utilisation) {
116+
self.cur_util = self
117+
.cur_util
118+
.checked_add(&util, &self.max_util)
119+
.expect("Utilisation overflow");
120+
}
121+
}
122+
123+
struct JobContext {
124+
net_in: AsyncReceiver<IncomingNetworkMessage>,
125+
net_out: AsyncSender<OutgoingNetworkMessage>,
126+
127+
actions_in: AsyncReceiver<Action>,
128+
action_results_out: AsyncSender<ActionResult>,
129+
}
130+
131+
// SMExecutors send Actions to the JobScheduler
132+
// JobScheduler transforms each Action into a Job and queues it for execution
133+
134+
trait Job {
135+
type Output;
136+
137+
/// Returns an estimated memory requirement of the job in bytes.
138+
///
139+
/// This helps the JobScheduler budget memory usage.
140+
fn memory_requirement(&self) -> Option<usize>;
141+
142+
/// Executes the job with a global job context.
143+
async fn execute(&self, context: &JobContext) -> Result<Self::Output, Box<dyn Debug>>;
144+
}
145+
146+
#[cfg(test)]
147+
mod tests {
148+
use super::*;
149+
150+
#[test]
151+
fn it_works() {
152+
let result = add(2, 2);
153+
assert_eq!(result, 4);
154+
}
155+
}

crates/net-svc/Cargo.toml

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
[package]
2+
name = "net-svc"
3+
version.workspace = true
4+
edition.workspace = true
5+
authors.workspace = true
6+
description.workspace = true
7+
license.workspace = true
8+
readme.workspace = true
9+
repository.workspace = true
10+
categories.workspace = true
11+
keywords.workspace = true
12+
13+
[dependencies]
14+
hashbrown = "0.16.1"
15+
kanal = "0.1.1"
16+
tokio = { version = "1.49.0", default-features = false, features = ["macros", "net", "parking_lot", "rt", "time", "io-util", "sync"] }
17+
tokio-util = { version = "0.7", features = ["rt"] }
18+
19+
# TLS / QUIC
20+
quinn = "0.11"
21+
rustls = { version = "0.23", default-features = false, features = ["ring", "std"] }
22+
rcgen = { version = "0.13", default-features = false, features = ["ring"] }
23+
x509-parser = "0.17"
24+
ed25519-dalek = { version = "2", features = ["pkcs8"] }
25+
26+
# Wire format
27+
net-wire = { path = "../net-wire" }
28+
29+
# Utilities
30+
hex = "0.4"
31+
blake3 = "1"
32+
tracing = "0.1"
33+
ahash = "0.8"
34+
35+
[dev-dependencies]
36+
rand = "0.8"
37+
38+
# [lints]
39+
# workspace = true

0 commit comments

Comments
 (0)