Skip to content

Commit 80c09a6

Browse files
authored
Merge pull request #104 from guzman-raphael/output_packet
Add `output_packet` to `PodResult`
2 parents e9d2a7b + e51c11d commit 80c09a6

File tree

24 files changed

+306
-176
lines changed

24 files changed

+306
-176
lines changed

.vscode/settings.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
"max_width=100"
1414
],
1515
"rust-analyzer.check.command": "clippy",
16+
"rust-analyzer.runnables.extraTestBinaryArgs": [
17+
"--nocapture"
18+
],
1619
"files.autoSave": "off",
1720
"gitlens.showWelcomeOnInstall": false,
1821
"gitlens.showWhatsNewAfterUpgrades": false,

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ name = "uniffi-bindgen"
8686
[dev-dependencies]
8787
# pretty multiline strings
8888
indoc = "2.0.5"
89+
# pretty assert statements
90+
pretty_assertions = "1.4.1"
8991
# creating temp directories
9092
tempfile = "3.13.0"
9193

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313
set -e # fail early on non-zero exit
1414
cargo clippy --all-targets -- -D warnings # Rust syntax and style tests
1515
cargo fmt --check # Rust formatting test
16-
cargo llvm-cov --ignore-filename-regex "bin/.*|lib\.rs" -- --nocapture # Rust integration tests w/ stdout coverage summary
17-
cargo llvm-cov --ignore-filename-regex "bin/.*|lib\.rs" --html -- --nocapture # Rust integration tests w/ HTML coverage report (target/llvm-cov/html/index.html)
18-
cargo llvm-cov --ignore-filename-regex "bin/.*|lib\.rs" --codecov --output-path target/llvm-cov-target/codecov.json -- --nocapture # Rust integration tests w/ codecov coverage report
19-
cargo llvm-cov --ignore-filename-regex "bin/.*|lib\.rs" --cobertura --output-path target/llvm-cov-target/cobertura.xml -- --nocapture # Rust integration tests w/ cobertura coverage report
16+
cargo llvm-cov --no-clean --ignore-filename-regex "bin/.*|lib\.rs" -- --nocapture # Rust integration tests w/ stdout coverage summary
17+
cargo llvm-cov --no-clean --ignore-filename-regex "bin/.*|lib\.rs" --html -- --nocapture # Rust integration tests w/ HTML coverage report (target/llvm-cov/html/index.html)
18+
cargo llvm-cov --no-clean --ignore-filename-regex "bin/.*|lib\.rs" --codecov --output-path target/llvm-cov-target/codecov.json -- --nocapture # Rust integration tests w/ codecov coverage report
19+
cargo llvm-cov --no-clean --ignore-filename-regex "bin/.*|lib\.rs" --cobertura --output-path target/llvm-cov-target/cobertura.xml -- --nocapture # Rust integration tests w/ cobertura coverage report
2020
. ~/.local/share/base/bin/activate && maturin develop --uv && export RUST_BACKTRACE=1 && python tests/extra/python/smoke_test.py -- tests/.tmp && python tests/extra/python/agent_test.py # Python integration tests
2121
```
2222

src/core/error.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,17 @@ impl From<BollardError> for OrcaError {
1616
fn from(error: BollardError) -> Self {
1717
Self {
1818
kind: Kind::BollardError {
19-
source: error,
19+
source: error.into(),
20+
backtrace: Some(Backtrace::capture()),
21+
},
22+
}
23+
}
24+
}
25+
impl From<chrono::ParseError> for OrcaError {
26+
fn from(error: chrono::ParseError) -> Self {
27+
Self {
28+
kind: Kind::ChronoParseError {
29+
source: error.into(),
2030
backtrace: Some(Backtrace::capture()),
2131
},
2232
}
@@ -36,7 +46,7 @@ impl From<glob::PatternError> for OrcaError {
3646
fn from(error: glob::PatternError) -> Self {
3747
Self {
3848
kind: Kind::GlobPatternError {
39-
source: error,
49+
source: error.into(),
4050
backtrace: Some(Backtrace::capture()),
4151
},
4252
}
@@ -46,7 +56,7 @@ impl From<io::Error> for OrcaError {
4656
fn from(error: io::Error) -> Self {
4757
Self {
4858
kind: Kind::IoError {
49-
source: error,
59+
source: error.into(),
5060
backtrace: Some(Backtrace::capture()),
5161
},
5262
}
@@ -56,7 +66,7 @@ impl From<path::StripPrefixError> for OrcaError {
5666
fn from(error: path::StripPrefixError) -> Self {
5767
Self {
5868
kind: Kind::PathPrefixError {
59-
source: error,
69+
source: error.into(),
6070
backtrace: Some(Backtrace::capture()),
6171
},
6272
}
@@ -66,7 +76,7 @@ impl From<serde_json::Error> for OrcaError {
6676
fn from(error: serde_json::Error) -> Self {
6777
Self {
6878
kind: Kind::SerdeJsonError {
69-
source: error,
79+
source: error.into(),
7080
backtrace: Some(Backtrace::capture()),
7181
},
7282
}
@@ -76,7 +86,7 @@ impl From<serde_yaml::Error> for OrcaError {
7686
fn from(error: serde_yaml::Error) -> Self {
7787
Self {
7888
kind: Kind::SerdeYamlError {
79-
source: error,
89+
source: error.into(),
8090
backtrace: Some(Backtrace::capture()),
8191
},
8292
}
@@ -86,7 +96,7 @@ impl From<task::JoinError> for OrcaError {
8696
fn from(error: task::JoinError) -> Self {
8797
Self {
8898
kind: Kind::TokioTaskJoinError {
89-
source: error,
99+
source: error.into(),
90100
backtrace: Some(Backtrace::capture()),
91101
},
92102
}
@@ -125,6 +135,7 @@ impl fmt::Debug for OrcaError {
125135
| Kind::NoRemainingServices { backtrace, .. }
126136
| Kind::NoTagFoundInContainerAltImage { backtrace, .. }
127137
| Kind::BollardError { backtrace, .. }
138+
| Kind::ChronoParseError { backtrace, .. }
128139
| Kind::DOTError { backtrace, .. }
129140
| Kind::GlobPatternError { backtrace, .. }
130141
| Kind::IoError { backtrace, .. }

src/core/orchestrator/agent.rs

Lines changed: 32 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
use crate::uniffi::{
22
error::{OrcaError, Result, selector},
3-
model::pod::{PodJob, PodResult},
43
orchestrator::agent::{Agent, AgentClient},
5-
store::ModelID,
64
};
7-
use chrono::Utc;
5+
use chrono::{DateTime, Utc};
86
use futures_util::future::FutureExt as _;
97
use regex::Regex;
108
use serde::{Deserialize, Serialize};
@@ -21,15 +19,20 @@ use tokio::{
2119
use tokio_util::task::TaskTracker;
2220

2321
#[expect(clippy::expect_used, reason = "Valid static regex")]
24-
static RE_PODJOB_ACTION: LazyLock<Regex> = LazyLock::new(|| {
22+
static RE_AGENT_KEY_EXPR: LazyLock<Regex> = LazyLock::new(|| {
2523
Regex::new(
2624
r"(?x)
2725
^
28-
group\/(?<group>[a-z_\-]+)\/
29-
(?<action>request|reservation|success|failure)\/
30-
pod_job\/(?<pod_job_hash>[0-9a-f]+)\/
31-
host\/(?<host>[a-z_]+)\/
32-
timestamp\/(?<timestamp>.*?)
26+
group/
27+
(?<group>[a-z_\-]+)/
28+
(?<action>request|success|failure)/
29+
(?<model_type>[a-z_]+)/
30+
(?<ref>[0-9a-f]+)/
31+
.*?
32+
host/
33+
(?<host>[a-z_]+)/
34+
timestamp/
35+
(?<timestamp>.*?)
3336
$
3437
",
3538
)
@@ -40,33 +43,14 @@ static RE_PODJOB_ACTION: LazyLock<Regex> = LazyLock::new(|| {
4043
dead_code,
4144
reason = "Need to be able to initialize to pass metadata as input."
4245
)]
43-
#[derive(Debug, Clone)]
46+
#[derive(Debug)]
4447
pub struct EventMetadata {
45-
group: String,
46-
host: String,
47-
subgroup: String,
48-
}
49-
50-
#[expect(
51-
dead_code,
52-
reason = "Need to be able to initialize to pass metadata as input."
53-
)]
54-
#[derive(Debug, Clone)]
55-
pub enum EventPayload {
56-
Request(PodJob),
57-
Reservation(ModelID),
58-
Success(PodResult),
59-
Failure(PodResult),
60-
}
61-
62-
#[expect(
63-
dead_code,
64-
reason = "Need to be able to initialize to pass metadata as input."
65-
)]
66-
#[derive(Debug, Clone)]
67-
pub struct Event {
68-
metadata: EventMetadata,
69-
payload: EventPayload,
48+
pub group: String,
49+
pub action: String,
50+
pub model_type: String,
51+
pub r#ref: String,
52+
pub host: String,
53+
pub timestamp: DateTime<Utc>,
7054
}
7155

7256
impl AgentClient {
@@ -95,7 +79,6 @@ impl AgentClient {
9579
///
9680
/// Will fail if there is an issue sending the message.
9781
pub(crate) async fn log(&self, message: &str) -> Result<()> {
98-
println!("{message}");
9982
self.publish("log", message).await
10083
}
10184
}
@@ -106,23 +89,20 @@ impl AgentClient {
10689
reason = "`result::Result<(), SendError<_>>` is the only uncaptured result since it would mean we can't transmit results over mpsc."
10790
)]
10891
pub async fn start_service<
109-
EventClassifierF, // function to classify the event payload e.g. EventPayload::{Request | Reservation | ..}
110-
RequestF, // function to run on requests
111-
RequestI, // input to the function for requests
112-
RequestR, // output to the function for requests
113-
ResponseF, // function to run on completing a request i.e. response
114-
ResponseI, // input to the function for responses
115-
ResponseR, // output to the function for responses
92+
RequestF, // function to run on requests
93+
RequestI, // input to the function for requests
94+
RequestR, // output to the function for requests
95+
ResponseF, // function to run on completing a request i.e. response
96+
ResponseI, // input to the function for responses
97+
ResponseR, // output to the function for responses
11698
>(
11799
agent: Arc<Agent>,
118100
request_key_expr: String,
119101
namespace_lookup: HashMap<String, PathBuf>,
120-
event_classifier: EventClassifierF,
121102
request_task: RequestF,
122103
response_task: ResponseF,
123104
) -> Result<()>
124105
where
125-
EventClassifierF: Fn(&RequestI) -> EventPayload + Send + 'static,
126106
RequestI: for<'serde> Deserialize<'serde> + Send + 'static,
127107
RequestF: FnOnce(Arc<Agent>, HashMap<String, PathBuf>, EventMetadata, RequestI) -> RequestR
128108
+ Clone
@@ -156,15 +136,17 @@ where
156136
while let Ok(sample) = subscriber.recv_async().await {
157137
if let (Ok(input), Some(metadata)) = (
158138
serde_json::from_slice::<RequestI>(&sample.payload().to_bytes()),
159-
RE_PODJOB_ACTION.captures(sample.key_expr().as_str()),
139+
RE_AGENT_KEY_EXPR.captures(sample.key_expr().as_str()),
160140
) {
161141
let inner_response_tx = response_tx.clone();
162142
let event_metadata = EventMetadata {
163143
group: metadata["group"].to_string(),
144+
action: metadata["action"].to_string(),
145+
model_type: metadata["model_type"].to_string(),
146+
r#ref: metadata["ref"].to_string(),
164147
host: metadata["host"].to_string(),
165-
subgroup: metadata["pod_job_hash"].to_string(),
148+
timestamp: DateTime::parse_from_rfc3339(&metadata["timestamp"])?.into(),
166149
};
167-
let _event_payload = event_classifier(&input);
168150
tasks.spawn({
169151
let inner_request_task = request_task.clone();
170152
let inner_inner_agent = Arc::clone(&inner_agent);
@@ -190,8 +172,8 @@ where
190172
}
191173
});
192174
services.spawn(async move {
193-
while let Some(content) = response_rx.recv().await {
194-
response_task(Arc::clone(&agent.client), content?).await?;
175+
while let Some(response) = response_rx.recv().await {
176+
response_task(Arc::clone(&agent.client), response?).await?;
195177
}
196178
Ok(())
197179
});

src/core/orchestrator/docker.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::{
33
uniffi::{
44
error::{Result, selector},
55
model::{packet::PathSet, pod::PodJob},
6-
orchestrator::{RunInfo, Status, docker::LocalDockerOrchestrator},
6+
orchestrator::{PodRunInfo, PodStatus, docker::LocalDockerOrchestrator},
77
},
88
};
99
use bollard::{
@@ -175,7 +175,7 @@ impl LocalDockerOrchestrator {
175175
pub(crate) async fn list_containers(
176176
&self,
177177
filters: HashMap<String, Vec<String>>, // https://docs.rs/bollard/latest/bollard/container/struct.ListContainersOptions.html#structfield.filters
178-
) -> Result<impl Iterator<Item = (String, RunInfo)>> {
178+
) -> Result<impl Iterator<Item = (String, PodRunInfo)>> {
179179
Ok(join_all(
180180
self.api
181181
.list_containers(Some(ListContainersOptions {
@@ -207,7 +207,7 @@ impl LocalDockerOrchestrator {
207207
.timestamp();
208208
Some((
209209
container_name,
210-
RunInfo {
210+
PodRunInfo {
211211
image: container_spec.config.as_ref()?.image.as_ref()?.clone(),
212212
created: container_summary.created? as u64,
213213
terminated: (terminated_timestamp > 0).then_some(terminated_timestamp as u64),
@@ -231,19 +231,19 @@ impl LocalDockerOrchestrator {
231231
container_spec.state.as_ref()?.status.as_ref()?,
232232
container_spec.state.as_ref()?.exit_code? as i16,
233233
) {
234-
(ContainerStateStatusEnum::RUNNING, _) => Status::Running,
234+
(ContainerStateStatusEnum::RUNNING, _) => PodStatus::Running,
235235
(
236236
ContainerStateStatusEnum::EXITED
237237
| ContainerStateStatusEnum::REMOVING
238238
| ContainerStateStatusEnum::DEAD,
239239
0,
240-
) => Status::Completed,
240+
) => PodStatus::Completed,
241241
(
242242
ContainerStateStatusEnum::EXITED
243243
| ContainerStateStatusEnum::REMOVING
244244
| ContainerStateStatusEnum::DEAD,
245245
code,
246-
) => Status::Failed(code),
246+
) => PodStatus::Failed(code),
247247
(_, code) => {
248248
todo!(
249249
"Unhandled container state: {}, exit code: {code}.",

src/core/store/filestore.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ static RE_MODEL_METADATA: LazyLock<Regex> = LazyLock::new(|| {
2828
Regex::new(
2929
r"(?x)
3030
^
31-
(?<store_directory>.*)\/
32-
(?<namespace>[a-z_]+)\/
33-
(?<class>[a-z_]+)\/
34-
(?<hash>[0-9a-f]+)\/
31+
(?<store_directory>.*?)/
32+
(?<namespace>[a-z_]+)/
33+
(?<class>[a-z_]+)/
34+
(?<hash>[0-9a-f]+)/
3535
(
36-
annotation\/
36+
annotation/
3737
(?<name>[0-9a-zA-Z\-]+)
3838
-
3939
(?<version>[0-9]+\.[0-9]+\.[0-9]+)

src/uniffi/error.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use std::{
1818
};
1919
use tokio::task;
2020
use uniffi;
21-
/// Shorthand for a Result that returns an `OrcaError`.
21+
/// Shorthand for a Result that returns an [`OrcaError`].
2222
pub type Result<T, E = OrcaError> = result::Result<T, E>;
2323
/// Possible errors you may encounter.
2424
#[derive(Snafu, Debug, uniffi::Error)]
@@ -91,7 +91,12 @@ pub(crate) enum Kind {
9191
},
9292
#[snafu(transparent)]
9393
BollardError {
94-
source: BollardError,
94+
source: Box<BollardError>,
95+
backtrace: Option<Backtrace>,
96+
},
97+
#[snafu(transparent)]
98+
ChronoParseError {
99+
source: Box<chrono::ParseError>,
95100
backtrace: Option<Backtrace>,
96101
},
97102
#[snafu(transparent)]
@@ -101,32 +106,32 @@ pub(crate) enum Kind {
101106
},
102107
#[snafu(transparent)]
103108
GlobPatternError {
104-
source: glob::PatternError,
109+
source: Box<glob::PatternError>,
105110
backtrace: Option<Backtrace>,
106111
},
107112
#[snafu(transparent)]
108113
IoError {
109-
source: io::Error,
114+
source: Box<io::Error>,
110115
backtrace: Option<Backtrace>,
111116
},
112117
#[snafu(transparent)]
113118
PathPrefixError {
114-
source: path::StripPrefixError,
119+
source: Box<path::StripPrefixError>,
115120
backtrace: Option<Backtrace>,
116121
},
117122
#[snafu(transparent)]
118123
SerdeJsonError {
119-
source: serde_json::Error,
124+
source: Box<serde_json::Error>,
120125
backtrace: Option<Backtrace>,
121126
},
122127
#[snafu(transparent)]
123128
SerdeYamlError {
124-
source: serde_yaml::Error,
129+
source: Box<serde_yaml::Error>,
125130
backtrace: Option<Backtrace>,
126131
},
127132
#[snafu(transparent)]
128133
TokioTaskJoinError {
129-
source: task::JoinError,
134+
source: Box<task::JoinError>,
130135
backtrace: Option<Backtrace>,
131136
},
132137
}

0 commit comments

Comments
 (0)