Skip to content

Commit 3b56c38

Browse files
authored
Merge pull request #91 from guzman-raphael/agent_without_reservations
Minimal orchestrator agent w/o reservations
2 parents ad1e3a1 + 7e3d08c commit 3b56c38

File tree

19 files changed

+718
-23
lines changed

19 files changed

+718
-23
lines changed

.devcontainer/Dockerfile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ RUN \
4343
curl -LsSf https://astral.sh/uv/install.sh | sh && \
4444
uv venv -p 3.10 ~/.local/share/base && \
4545
# pip package based on C lib/client
46-
uv pip install cffi maturin ipykernel -p ~/.local/share/base && \
46+
uv pip install cffi maturin[patchelf] -p ~/.local/share/base && \
47+
# useful in examples
48+
uv pip install ipykernel eclipse-zenoh -p ~/.local/share/base && \
4749
echo '. ~/.local/share/base/bin/activate' >> ~/.bashrc
4850
ENV VIRTUAL_ENV=/home/vscode/.local/share/base
4951
CMD ["bash", "-c", "sudo rm /var/run/docker.pid; sudo dockerd"]

.devcontainer/gpu/Dockerfile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ RUN \
4343
curl -LsSf https://astral.sh/uv/install.sh | sh && \
4444
uv venv -p 3.10 ~/.local/share/base && \
4545
# pip package based on C lib/client
46-
uv pip install cffi maturin ipykernel -p ~/.local/share/base && \
46+
uv pip install cffi maturin[patchelf] -p ~/.local/share/base && \
47+
# useful in examples
48+
uv pip install ipykernel eclipse-zenoh -p ~/.local/share/base && \
4749
echo '. ~/.local/share/base/bin/activate' >> ~/.bashrc
4850
ENV VIRTUAL_ENV=/root/.local/share/base
4951
CMD ["bash", "-c", "sudo rm /var/run/docker.pid; sudo dockerd"]

.github/workflows/tests.yaml

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,24 @@ jobs:
4646
runs-on: ubuntu-latest
4747
steps:
4848
- uses: actions/checkout@v4
49-
- name: Install Python + components
49+
- name: Install Python, dependencies, and orcapod
5050
run: |
51-
curl -LsSf https://astral.sh/uv/install.sh | sh && \
52-
uv venv -p 3.10 ~/.local/share/base && \
53-
uv pip install cffi maturin -p ~/.local/share/base
54-
- name: Run integration tests
51+
set -e
52+
curl -LsSf https://astral.sh/uv/install.sh | sh
53+
uv venv -p 3.10 ~/.local/share/base
54+
uv pip install cffi maturin[patchelf] -p ~/.local/share/base
55+
uv pip install eclipse-zenoh -p ~/.local/share/base
56+
. ~/.local/share/base/bin/activate
57+
maturin develop --uv
58+
- name: Run smoke test
5559
env:
5660
RUST_BACKTRACE: full
5761
run: |
5862
. ~/.local/share/base/bin/activate
59-
maturin develop --uv
6063
python tests/extra/python/smoke_test.py -- tests/.tmp
64+
- name: Run agent test
65+
env:
66+
RUST_BACKTRACE: full
67+
run: |
68+
. ~/.local/share/base/bin/activate
69+
python tests/extra/python/agent_test.py -- tests/.tmp

.vscode/launch.json

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@
9595
"cwd": "${workspaceFolder}",
9696
},
9797
{
98-
"name": "Python: Debug File",
98+
"name": "Python: Debug smoke test",
9999
"type": "debugpy",
100100
"request": "launch",
101101
"console": "integratedTerminal",
@@ -106,7 +106,22 @@
106106
},
107107
"program": "tests/extra/python/smoke_test.py",
108108
"args": [
109-
"./tests/.tmp"
109+
"tests/.tmp"
110+
]
111+
},
112+
{
113+
"name": "Python: Debug agent test",
114+
"type": "debugpy",
115+
"request": "launch",
116+
"console": "integratedTerminal",
117+
"justMyCode": false,
118+
"preLaunchTask": "package_orcapod_python",
119+
"env": {
120+
"RUST_BACKTRACE": "1"
121+
},
122+
"program": "tests/extra/python/agent_test.py",
123+
"args": [
124+
"tests/.tmp"
110125
]
111126
}
112127
]

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ chrono = "0.4.39"
3737
colored = "2.1.0"
3838
# derive utilities for new types
3939
derive_more = { version = "2.0.1", features = ["display"] }
40+
# allow async functions to be called from sync code using `block_on`
41+
futures-executor = "0.3.31"
4042
# chaining async calls and processing stream data in local docker orchestrator
4143
futures-util = "0.3.31"
4244
# auto derive getter access methods on structs
@@ -67,6 +69,8 @@ tokio = { version = "1.41.0", features = ["full"] }
6769
tokio-util = "0.7.13"
6870
# automated CFFI + bindings in other languages
6971
uniffi = { version = "0.29.1", features = ["cli", "tokio"] }
72+
# shared, distributed memory via communication
73+
zenoh = { version = "1.3.4" }
7074

7175
[[bin]]
7276
name = "uniffi-bindgen"

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ cargo llvm-cov --ignore-filename-regex "bin/.*|lib\.rs" -- --nocapture # Rust i
1717
cargo llvm-cov --ignore-filename-regex "bin/.*|lib\.rs" --html -- --nocapture # Rust integration tests w/ HTML coverage report (target/llvm-cov/html/index.html)
1818
cargo llvm-cov --ignore-filename-regex "bin/.*|lib\.rs" --codecov --output-path target/llvm-cov-target/codecov.json -- --nocapture # Rust integration tests w/ codecov coverage report
1919
cargo llvm-cov --ignore-filename-regex "bin/.*|lib\.rs" --cobertura --output-path target/llvm-cov-target/cobertura.xml -- --nocapture # Rust integration tests w/ cobertura coverage report
20-
. ~/.local/share/base/bin/activate && maturin develop --uv && RUST_BACKTRACE=1 python tests/extra/python/smoke_test.py -- tests/.tmp # Python integration tests
20+
. ~/.local/share/base/bin/activate && maturin develop --uv && export RUST_BACKTRACE=1 && python tests/extra/python/smoke_test.py -- tests/.tmp && python tests/extra/python/agent_test.py -- tests/.tmp # Python integration tests
2121
```
2222

2323
## Docs
@@ -84,7 +84,7 @@ You can easily enforce resource limits by adding the following to `devcontainer.
8484
// ..
8585
"runArgs": [
8686
// ..
87-
"--cpus=2",,
87+
"--cpus=2",
8888
"--memory=8gb",
8989
// ..
9090
],

cspell.json

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
"numpy",
1616
"graphviz",
1717
"uniffi",
18-
"cffi"
18+
"cffi",
19+
"zenoh",
20+
"PodJob",
1921
],
2022
"ignoreWords": [
2123
"relpath",
@@ -73,7 +75,8 @@
7375
"getset",
7476
"strsim",
7577
"getrandom",
76-
"wasi"
78+
"wasi",
79+
"patchelf"
7780
],
7881
"useGitignore": false,
7982
"ignorePaths": [

src/core/error.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::{
99
io,
1010
path::{self},
1111
};
12+
use tokio::task;
1213

1314
impl From<BollardError> for OrcaError {
1415
fn from(error: BollardError) -> Self {
@@ -70,6 +71,16 @@ impl From<serde_yaml::Error> for OrcaError {
7071
}
7172
}
7273
}
74+
impl From<task::JoinError> for OrcaError {
75+
fn from(error: task::JoinError) -> Self {
76+
Self {
77+
kind: Kind::TokioTaskJoinError {
78+
source: error,
79+
backtrace: Some(Backtrace::capture()),
80+
},
81+
}
82+
}
83+
}
7384
impl From<Kind> for OrcaError {
7485
fn from(error: Kind) -> Self {
7586
Self { kind: error }
@@ -89,7 +100,8 @@ fn format_stack(backtrace: Option<&Backtrace>) -> String {
89100
impl fmt::Debug for OrcaError {
90101
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
91102
match &self.kind {
92-
Kind::EmptyResponseWhenLoadingContainerAltImage { backtrace, .. }
103+
Kind::AgentCommunicationFailure { backtrace, .. }
104+
| Kind::EmptyResponseWhenLoadingContainerAltImage { backtrace, .. }
93105
| Kind::GeneratedNamesOverflow { backtrace, .. }
94106
| Kind::InvalidFilepath { backtrace, .. }
95107
| Kind::InvalidPodResultTerminatedDatetime { backtrace, .. }
@@ -98,13 +110,15 @@ impl fmt::Debug for OrcaError {
98110
| Kind::NoContainerNames { backtrace, .. }
99111
| Kind::NoFileName { backtrace, .. }
100112
| Kind::NoMatchingPodRun { backtrace, .. }
113+
| Kind::NoRemainingServices { backtrace, .. }
101114
| Kind::NoTagFoundInContainerAltImage { backtrace, .. }
102115
| Kind::BollardError { backtrace, .. }
103116
| Kind::GlobPatternError { backtrace, .. }
104117
| Kind::IoError { backtrace, .. }
105118
| Kind::PathPrefixError { backtrace, .. }
106119
| Kind::SerdeJsonError { backtrace, .. }
107-
| Kind::SerdeYamlError { backtrace, .. } => {
120+
| Kind::SerdeYamlError { backtrace, .. }
121+
| Kind::TokioTaskJoinError { backtrace, .. } => {
108122
write!(f, "{}{}", self.kind, format_stack(backtrace.as_ref()))
109123
}
110124
}

src/core/orchestrator/agent.rs

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
use crate::uniffi::{
2+
error::{OrcaError, Result, selector},
3+
model::{PodJob, PodResult},
4+
orchestrator::agent::{Agent, AgentClient},
5+
store::ModelID,
6+
};
7+
use chrono::Utc;
8+
use futures_util::future::FutureExt as _;
9+
use regex::Regex;
10+
use serde::{Deserialize, Serialize};
11+
use snafu::{OptionExt as _, ResultExt as _};
12+
use std::{
13+
collections::HashMap,
14+
path::PathBuf,
15+
sync::{Arc, LazyLock},
16+
};
17+
use tokio::{
18+
sync::mpsc::{self, error::SendError},
19+
task::JoinSet,
20+
};
21+
use tokio_util::task::TaskTracker;
22+
23+
#[expect(clippy::expect_used, reason = "Valid static regex")]
24+
static RE_PODJOB_ACTION: LazyLock<Regex> = LazyLock::new(|| {
25+
Regex::new(
26+
r"(?x)
27+
^
28+
group\/(?<group>[a-z_]+)\/
29+
(?<action>request|reservation|success|failure)\/
30+
pod_job\/(?<pod_job_hash>[0-9a-f]+)\/
31+
host\/(?<host>[a-z_]+)\/
32+
timestamp\/(?<timestamp>.*?)
33+
$
34+
",
35+
)
36+
.expect("Invalid PodJob action regex.")
37+
});
38+
39+
#[expect(
40+
dead_code,
41+
reason = "Need to be able to initialize to pass metadata as input."
42+
)]
43+
#[derive(Debug, Clone)]
44+
pub struct EventMetadata {
45+
group: String,
46+
host: String,
47+
subgroup: String,
48+
}
49+
50+
#[expect(
51+
dead_code,
52+
reason = "Need to be able to initialize to pass metadata as input."
53+
)]
54+
#[derive(Debug, Clone)]
55+
pub enum EventPayload {
56+
Request(PodJob),
57+
Reservation(ModelID),
58+
Success(PodResult),
59+
Failure(PodResult),
60+
}
61+
62+
#[expect(
63+
dead_code,
64+
reason = "Need to be able to initialize to pass metadata as input."
65+
)]
66+
#[derive(Debug, Clone)]
67+
pub struct Event {
68+
metadata: EventMetadata,
69+
payload: EventPayload,
70+
}
71+
72+
impl AgentClient {
73+
pub(crate) async fn publish<T>(&self, topic: &str, payload: &T) -> Result<()>
74+
where
75+
T: Serialize + Sync + ?Sized,
76+
{
77+
Ok(self
78+
.session
79+
.put(
80+
format!(
81+
"group/{}/{}/host/{}/timestamp/{}",
82+
self.group,
83+
topic,
84+
self.host,
85+
Utc::now().to_rfc3339()
86+
),
87+
&serde_json::to_vec(payload)?,
88+
)
89+
.await
90+
.context(selector::AgentCommunicationFailure {})?)
91+
}
92+
/// Send a log message to the agent network.
93+
///
94+
/// # Errors
95+
///
96+
/// Will fail if there is an issue sending the message.
97+
pub(crate) async fn log(&self, message: &str) -> Result<()> {
98+
println!("{message}");
99+
self.publish("log", message).await
100+
}
101+
}
102+
103+
#[expect(
104+
clippy::excessive_nesting,
105+
clippy::let_underscore_must_use,
106+
reason = "`result::Result<(), SendError<_>>` is the only uncaptured result since it would mean we can't transmit results over mpsc."
107+
)]
108+
pub async fn start_service<
109+
EventClassifierF, // function to classify the event payload e.g. EventPayload::{Request | Reservation | ..}
110+
RequestF, // function to run on requests
111+
RequestI, // input to the function for requests
112+
RequestR, // output to the function for requests
113+
ResponseF, // function to run on completing a request i.e. response
114+
ResponseI, // input to the function for responses
115+
ResponseR, // output to the function for responses
116+
>(
117+
agent: Arc<Agent>,
118+
request_key_expr: String,
119+
namespace_lookup: HashMap<String, PathBuf>,
120+
event_classifier: EventClassifierF,
121+
request_task: RequestF,
122+
response_task: ResponseF,
123+
) -> Result<()>
124+
where
125+
EventClassifierF: Fn(&RequestI) -> EventPayload + Send + 'static,
126+
RequestI: for<'serde> Deserialize<'serde>,
127+
RequestF: Fn(Arc<Agent>, HashMap<String, PathBuf>, EventMetadata, RequestI) -> RequestR
128+
+ Send
129+
+ 'static,
130+
RequestR: Future<Output = Result<ResponseI>> + Send + 'static,
131+
ResponseI: Send + 'static,
132+
ResponseF: Fn(Arc<AgentClient>, ResponseI) -> ResponseR + Send + 'static,
133+
ResponseR: Future<Output = Result<()>> + Send + 'static,
134+
{
135+
agent
136+
.client
137+
.log(&format!("Started `{request_key_expr}` service."))
138+
.await?;
139+
let (response_tx, mut response_rx) = mpsc::channel(100);
140+
141+
let mut services = JoinSet::new();
142+
services.spawn({
143+
let inner_agent = Arc::clone(&agent);
144+
async move {
145+
let tasks = TaskTracker::new();
146+
let subscriber = inner_agent
147+
.client
148+
.session
149+
.declare_subscriber(format!(
150+
"group/{}/{}",
151+
inner_agent.client.group, request_key_expr
152+
))
153+
.await
154+
.context(selector::AgentCommunicationFailure {})?;
155+
while let Ok(sample) = subscriber.recv_async().await {
156+
if let (Ok(input), Some(metadata)) = (
157+
serde_json::from_slice::<RequestI>(&sample.payload().to_bytes()),
158+
RE_PODJOB_ACTION.captures(sample.key_expr().as_str()),
159+
) {
160+
let inner_response_tx = response_tx.clone();
161+
let event_metadata = EventMetadata {
162+
group: metadata["group"].to_string(),
163+
host: metadata["host"].to_string(),
164+
subgroup: metadata["pod_job_hash"].to_string(),
165+
};
166+
let _event_payload = event_classifier(&input);
167+
tasks.spawn(
168+
request_task(
169+
Arc::clone(&inner_agent),
170+
namespace_lookup.clone(),
171+
event_metadata,
172+
input,
173+
)
174+
.then(move |response| async move {
175+
let _: Result<(), SendError<Result<ResponseI>>> =
176+
inner_response_tx.send(response).await;
177+
Ok::<_, OrcaError>(())
178+
}),
179+
);
180+
}
181+
}
182+
Ok(())
183+
}
184+
});
185+
services.spawn(async move {
186+
while let Some(content) = response_rx.recv().await {
187+
response_task(Arc::clone(&agent.client), content?).await?;
188+
}
189+
Ok(())
190+
});
191+
192+
services
193+
.join_next()
194+
.await
195+
.context(selector::NoRemainingServices {})??
196+
}

src/core/orchestrator/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,5 @@ impl PodRun {
2525
}
2626
}
2727

28+
pub mod agent;
2829
pub mod docker;

0 commit comments

Comments
 (0)