Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,194 changes: 1,177 additions & 17 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ members = [
"crates/common",
"crates/compute-exec",
"crates/compute-fw",
"crates/job/types",
# "crates/jobs",
"crates/net-svc",
# "crates/net-types",
"crates/net-wire",
"crates/network/api",
"crates/rpc/api",
"crates/rpc/provider",
Expand Down Expand Up @@ -70,18 +73,17 @@ async-trait = "0.1"
bytes = "1.10"
futures = "0.3.31"
jsonrpsee = { version = "0.26.0", features = ["macros"] }
jsonrpsee-types = "*" # constrained by jsonrpsee dep
jsonrpsee-types = "*" # constrained by jsonrpsee dep
rand = "0.8"
rand_core = "0.6"
serde = { version = "1.0", features = ["derive"] }
strata-codec = { git = "https://github.com/alpenlabs/strata-common.git" }
thiserror = "2.0"
tokio = { version = "1.48.0", features = ["rt", "sync", "macros", "time"] }
tokio-util = "0.7"
tracing = "0.1"

[profile.release]
opt-level = "z" # Optimized for size, use 3 for speed
opt-level = 3 # Optimized for size, use 3 for speed
lto = true # Enable Link Time Optimization
codegen-units = 1 # Reduced to increase optimizations
panic = "unwind" # Graceful shutdown, even on panic
Expand Down
18 changes: 18 additions & 0 deletions crates/jobs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "jobs"
version.workspace = true
edition.workspace = true
authors.workspace = true
description.workspace = true
license.workspace = true
readme.workspace = true
repository.workspace = true
categories.workspace = true
keywords.workspace = true

[dependencies]
kanal = "0.1.1"
monoio = "0.2.4"

# [lints]
# workspace = true
155 changes: 155 additions & 0 deletions crates/jobs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
use kanal::{AsyncReceiver, AsyncSender};
use monoio::io::stream::Stream;
use std::{collections::VecDeque, fmt::Debug, sync::Arc};

pub fn add(left: u64, right: u64) -> u64 {
left + right
}
Comment thread
AaronFeickert marked this conversation as resolved.

struct IncomingNetworkMessage;
struct OutgoingNetworkMessage;

enum Action {}

#[repr(i32)]
#[derive(PartialEq, Eq, PartialOrd, Ord)]
enum MessagePriority {
Low = -1,
Normal = 0,
High = 1,
}

impl Default for MessagePriority {
fn default() -> Self {
MessagePriority::Normal
}
}

/// Queues
struct JobQueues {
/// Network jobs are prioritized over other jobs. They're cheap to execute.
net_jobs: VecDeque<Box<dyn Job>>,
garbling_jobs: VecDeque<Box<dyn Job>>,
other_jobs: VecDeque<Box<dyn Job>>,
}

trait NetworkSendingClient {
async fn send_and_wait_for_ack(
&self,
to: PeerId,
priority: Option<MessagePriority>,
message: OutgoingNetworkMessage,
) -> Result<(), Error>;
}

trait NetworkReceivingClient {
type Stream: Stream<Item = (IncomingNetworkMessage,)>;
async fn receive(&self) -> Result<Option, Error>;
}

trait Acker {
async fn ack(&self) -> Result<(), Error>;
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Utilisation {
/// Memory usage in bytes
memory: usize,
/// Threads used for job execution
threads: usize,
}

impl Utilisation {
fn checked_sub(&self, other: &Utilisation) -> Option<Utilisation> {
if self.memory < other.memory || self.threads < other.threads {
None
} else {
Some(Utilisation {
memory: self.memory - other.memory,
threads: self.threads - other.threads,
})
}
}

fn checked_add(&self, other: &Utilisation, max: &Utilisation) -> Option<Utilisation> {
if self.memory + other.memory > max.memory || self.threads + other.threads > max.threads {
None
} else {
Some(Utilisation {
memory: self.memory + other.memory,
threads: self.threads + other.threads,
})
}
}
}

pub struct JobScheduler {
/// Immutable context shared with all jobs.
///
/// We use an Arc as jobs are sent to individual threads
context: Arc<JobContext>,
max_util: Utilisation,
cur_util: Utilisation,

workers: Vec<SchedulerToWorker>,
}

struct SchedulerToWorker {
to: AsyncSender<Job>,
}

impl JobScheduler {
fn available_resources(&self) -> Utilisation {
self.max_util
.checked_sub(&self.cur_util)
.expect("Utilisation underflow")
}

fn release_resources(&mut self, util: Utilisation) {
self.cur_util = self
.cur_util
.checked_sub(&util)
.expect("Utilisation overflow");
}

fn use_resources(&mut self, util: Utilisation) {
self.cur_util = self
.cur_util
.checked_add(&util, &self.max_util)
.expect("Utilisation overflow");
}
}

struct JobContext {
net_in: AsyncReceiver<IncomingNetworkMessage>,
net_out: AsyncSender<OutgoingNetworkMessage>,

actions_in: AsyncReceiver<Action>,
action_results_out: AsyncSender<ActionResult>,
}

// SMExecutors send Actions to the JobScheduler
// JobScheduler transforms each Action into a Job and queues it for execution

trait Job {
type Output;

/// Returns an estimated memory requirement of the job in bytes.
///
/// This helps the JobScheduler budget memory usage.
fn memory_requirement(&self) -> Option<usize>;

/// Executes the job with a global job context.
async fn execute(&self, context: &JobContext) -> Result<Self::Output, Box<dyn Debug>>;
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
}
Comment thread
AaronFeickert marked this conversation as resolved.
50 changes: 50 additions & 0 deletions crates/net-svc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
[package]
name = "net-svc"
version.workspace = true
edition.workspace = true
authors.workspace = true
description.workspace = true
license.workspace = true
readme.workspace = true
repository.workspace = true
categories.workspace = true
keywords.workspace = true

[dependencies]
hashbrown = "0.16.1"
kanal = "0.1.1"
tokio = { version = "1.49.0", default-features = false, features = [
"macros",
"net",
"parking_lot",
"rt",
"time",
"io-util",
"sync",
] }
tokio-util = { version = "0.7", features = ["rt"] }

# TLS / QUIC
ed25519-dalek = { version = "2", features = ["pkcs8"] }
quinn = "0.11"
rcgen = { version = "0.13", default-features = false, features = ["ring"] }
rustls = { version = "0.23", default-features = false, features = [
"ring",
"std",
] }
x509-parser = "0.17"

# Wire format
net-wire = { path = "../net-wire" }

# Utilities
ahash = "0.8"
blake3 = "1"
hex = "0.4"
tracing = "0.1"

[dev-dependencies]
rand = "0.8"

# [lints]
# workspace = true
Loading
Loading