Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
"max_width=100"
],
"rust-analyzer.check.command": "clippy",
"rust-analyzer.runnables.extraTestBinaryArgs": [
"--nocapture"
],
"files.autoSave": "off",
"gitlens.showWelcomeOnInstall": false,
"gitlens.showWhatsNewAfterUpgrades": false,
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ name = "uniffi-bindgen"
[dev-dependencies]
# pretty multiline strings
indoc = "2.0.5"
# pretty assert statements
pretty_assertions = "1.4.1"
# creating temp directories
tempfile = "3.13.0"

Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
set -e # fail early on non-zero exit
cargo clippy --all-targets -- -D warnings # Rust syntax and style tests
cargo fmt --check # Rust formatting test
cargo llvm-cov --ignore-filename-regex "bin/.*|lib\.rs" -- --nocapture # Rust integration tests w/ stdout coverage summary
cargo llvm-cov --ignore-filename-regex "bin/.*|lib\.rs" --html -- --nocapture # Rust integration tests w/ HTML coverage report (target/llvm-cov/html/index.html)
cargo llvm-cov --ignore-filename-regex "bin/.*|lib\.rs" --codecov --output-path target/llvm-cov-target/codecov.json -- --nocapture # Rust integration tests w/ codecov coverage report
cargo llvm-cov --ignore-filename-regex "bin/.*|lib\.rs" --cobertura --output-path target/llvm-cov-target/cobertura.xml -- --nocapture # Rust integration tests w/ cobertura coverage report
cargo llvm-cov --no-clean --ignore-filename-regex "bin/.*|lib\.rs" -- --nocapture # Rust integration tests w/ stdout coverage summary
cargo llvm-cov --no-clean --ignore-filename-regex "bin/.*|lib\.rs" --html -- --nocapture # Rust integration tests w/ HTML coverage report (target/llvm-cov/html/index.html)
cargo llvm-cov --no-clean --ignore-filename-regex "bin/.*|lib\.rs" --codecov --output-path target/llvm-cov-target/codecov.json -- --nocapture # Rust integration tests w/ codecov coverage report
cargo llvm-cov --no-clean --ignore-filename-regex "bin/.*|lib\.rs" --cobertura --output-path target/llvm-cov-target/cobertura.xml -- --nocapture # Rust integration tests w/ cobertura coverage report
. ~/.local/share/base/bin/activate && maturin develop --uv && export RUST_BACKTRACE=1 && python tests/extra/python/smoke_test.py -- tests/.tmp && python tests/extra/python/agent_test.py # Python integration tests
```

Expand Down
25 changes: 18 additions & 7 deletions src/core/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,17 @@ impl From<BollardError> for OrcaError {
fn from(error: BollardError) -> Self {
Self {
kind: Kind::BollardError {
source: error,
source: error.into(),
backtrace: Some(Backtrace::capture()),
},
}
}
}
impl From<chrono::ParseError> for OrcaError {
fn from(error: chrono::ParseError) -> Self {
Self {
kind: Kind::ChronoParseError {
source: error.into(),
backtrace: Some(Backtrace::capture()),
},
}
Expand All @@ -36,7 +46,7 @@ impl From<glob::PatternError> for OrcaError {
fn from(error: glob::PatternError) -> Self {
Self {
kind: Kind::GlobPatternError {
source: error,
source: error.into(),
backtrace: Some(Backtrace::capture()),
},
}
Expand All @@ -46,7 +56,7 @@ impl From<io::Error> for OrcaError {
fn from(error: io::Error) -> Self {
Self {
kind: Kind::IoError {
source: error,
source: error.into(),
backtrace: Some(Backtrace::capture()),
},
}
Expand All @@ -56,7 +66,7 @@ impl From<path::StripPrefixError> for OrcaError {
fn from(error: path::StripPrefixError) -> Self {
Self {
kind: Kind::PathPrefixError {
source: error,
source: error.into(),
backtrace: Some(Backtrace::capture()),
},
}
Expand All @@ -66,7 +76,7 @@ impl From<serde_json::Error> for OrcaError {
fn from(error: serde_json::Error) -> Self {
Self {
kind: Kind::SerdeJsonError {
source: error,
source: error.into(),
backtrace: Some(Backtrace::capture()),
},
}
Expand All @@ -76,7 +86,7 @@ impl From<serde_yaml::Error> for OrcaError {
fn from(error: serde_yaml::Error) -> Self {
Self {
kind: Kind::SerdeYamlError {
source: error,
source: error.into(),
backtrace: Some(Backtrace::capture()),
},
}
Expand All @@ -86,7 +96,7 @@ impl From<task::JoinError> for OrcaError {
fn from(error: task::JoinError) -> Self {
Self {
kind: Kind::TokioTaskJoinError {
source: error,
source: error.into(),
backtrace: Some(Backtrace::capture()),
},
}
Expand Down Expand Up @@ -125,6 +135,7 @@ impl fmt::Debug for OrcaError {
| Kind::NoRemainingServices { backtrace, .. }
| Kind::NoTagFoundInContainerAltImage { backtrace, .. }
| Kind::BollardError { backtrace, .. }
| Kind::ChronoParseError { backtrace, .. }
| Kind::DOTError { backtrace, .. }
| Kind::GlobPatternError { backtrace, .. }
| Kind::IoError { backtrace, .. }
Expand Down
82 changes: 32 additions & 50 deletions src/core/orchestrator/agent.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use crate::uniffi::{
error::{OrcaError, Result, selector},
model::pod::{PodJob, PodResult},
orchestrator::agent::{Agent, AgentClient},
store::ModelID,
};
use chrono::Utc;
use chrono::{DateTime, Utc};
use futures_util::future::FutureExt as _;
use regex::Regex;
use serde::{Deserialize, Serialize};
Expand All @@ -21,15 +19,20 @@ use tokio::{
use tokio_util::task::TaskTracker;

#[expect(clippy::expect_used, reason = "Valid static regex")]
static RE_PODJOB_ACTION: LazyLock<Regex> = LazyLock::new(|| {
static RE_AGENT_KEY_EXPR: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(
r"(?x)
^
group\/(?<group>[a-z_\-]+)\/
(?<action>request|reservation|success|failure)\/
pod_job\/(?<pod_job_hash>[0-9a-f]+)\/
host\/(?<host>[a-z_]+)\/
timestamp\/(?<timestamp>.*?)
group/
(?<group>[a-z_\-]+)/
(?<action>request|success|failure)/
(?<model_type>[a-z_]+)/
(?<ref>[0-9a-f]+)/
.*?
host/
(?<host>[a-z_]+)/
timestamp/
(?<timestamp>.*?)
$
",
)
Expand All @@ -40,33 +43,14 @@ static RE_PODJOB_ACTION: LazyLock<Regex> = LazyLock::new(|| {
dead_code,
reason = "Need to be able to initialize to pass metadata as input."
)]
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct EventMetadata {
group: String,
host: String,
subgroup: String,
}

#[expect(
dead_code,
reason = "Need to be able to initialize to pass metadata as input."
)]
#[derive(Debug, Clone)]
pub enum EventPayload {
Request(PodJob),
Reservation(ModelID),
Success(PodResult),
Failure(PodResult),
}

#[expect(
dead_code,
reason = "Need to be able to initialize to pass metadata as input."
)]
#[derive(Debug, Clone)]
pub struct Event {
metadata: EventMetadata,
payload: EventPayload,
pub group: String,
pub action: String,
pub model_type: String,
pub r#ref: String,
pub host: String,
pub timestamp: DateTime<Utc>,
}

impl AgentClient {
Expand Down Expand Up @@ -95,7 +79,6 @@ impl AgentClient {
///
/// Will fail if there is an issue sending the message.
pub(crate) async fn log(&self, message: &str) -> Result<()> {
println!("{message}");
self.publish("log", message).await
}
}
Expand All @@ -106,23 +89,20 @@ impl AgentClient {
reason = "`result::Result<(), SendError<_>>` is the only uncaptured result since it would mean we can't transmit results over mpsc."
)]
pub async fn start_service<
EventClassifierF, // function to classify the event payload e.g. EventPayload::{Request | Reservation | ..}
RequestF, // function to run on requests
RequestI, // input to the function for requests
RequestR, // output to the function for requests
ResponseF, // function to run on completing a request i.e. response
ResponseI, // input to the function for responses
ResponseR, // output to the function for responses
RequestF, // function to run on requests
RequestI, // input to the function for requests
RequestR, // output to the function for requests
ResponseF, // function to run on completing a request i.e. response
ResponseI, // input to the function for responses
ResponseR, // output to the function for responses
>(
agent: Arc<Agent>,
request_key_expr: String,
namespace_lookup: HashMap<String, PathBuf>,
event_classifier: EventClassifierF,
request_task: RequestF,
response_task: ResponseF,
) -> Result<()>
where
EventClassifierF: Fn(&RequestI) -> EventPayload + Send + 'static,
RequestI: for<'serde> Deserialize<'serde> + Send + 'static,
RequestF: FnOnce(Arc<Agent>, HashMap<String, PathBuf>, EventMetadata, RequestI) -> RequestR
+ Clone
Expand Down Expand Up @@ -156,15 +136,17 @@ where
while let Ok(sample) = subscriber.recv_async().await {
if let (Ok(input), Some(metadata)) = (
serde_json::from_slice::<RequestI>(&sample.payload().to_bytes()),
RE_PODJOB_ACTION.captures(sample.key_expr().as_str()),
RE_AGENT_KEY_EXPR.captures(sample.key_expr().as_str()),
) {
let inner_response_tx = response_tx.clone();
let event_metadata = EventMetadata {
group: metadata["group"].to_string(),
action: metadata["action"].to_string(),
model_type: metadata["model_type"].to_string(),
r#ref: metadata["ref"].to_string(),
host: metadata["host"].to_string(),
subgroup: metadata["pod_job_hash"].to_string(),
timestamp: DateTime::parse_from_rfc3339(&metadata["timestamp"])?.into(),
};
let _event_payload = event_classifier(&input);
tasks.spawn({
let inner_request_task = request_task.clone();
let inner_inner_agent = Arc::clone(&inner_agent);
Expand All @@ -190,8 +172,8 @@ where
}
});
services.spawn(async move {
while let Some(content) = response_rx.recv().await {
response_task(Arc::clone(&agent.client), content?).await?;
while let Some(response) = response_rx.recv().await {
response_task(Arc::clone(&agent.client), response?).await?;
}
Ok(())
});
Expand Down
12 changes: 6 additions & 6 deletions src/core/orchestrator/docker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
uniffi::{
error::{Result, selector},
model::{packet::PathSet, pod::PodJob},
orchestrator::{RunInfo, Status, docker::LocalDockerOrchestrator},
orchestrator::{PodRunInfo, PodStatus, docker::LocalDockerOrchestrator},
},
};
use bollard::{
Expand Down Expand Up @@ -175,7 +175,7 @@ impl LocalDockerOrchestrator {
pub(crate) async fn list_containers(
&self,
filters: HashMap<String, Vec<String>>, // https://docs.rs/bollard/latest/bollard/container/struct.ListContainersOptions.html#structfield.filters
) -> Result<impl Iterator<Item = (String, RunInfo)>> {
) -> Result<impl Iterator<Item = (String, PodRunInfo)>> {
Ok(join_all(
self.api
.list_containers(Some(ListContainersOptions {
Expand Down Expand Up @@ -207,7 +207,7 @@ impl LocalDockerOrchestrator {
.timestamp();
Some((
container_name,
RunInfo {
PodRunInfo {
image: container_spec.config.as_ref()?.image.as_ref()?.clone(),
created: container_summary.created? as u64,
terminated: (terminated_timestamp > 0).then_some(terminated_timestamp as u64),
Expand All @@ -231,19 +231,19 @@ impl LocalDockerOrchestrator {
container_spec.state.as_ref()?.status.as_ref()?,
container_spec.state.as_ref()?.exit_code? as i16,
) {
(ContainerStateStatusEnum::RUNNING, _) => Status::Running,
(ContainerStateStatusEnum::RUNNING, _) => PodStatus::Running,
(
ContainerStateStatusEnum::EXITED
| ContainerStateStatusEnum::REMOVING
| ContainerStateStatusEnum::DEAD,
0,
) => Status::Completed,
) => PodStatus::Completed,
(
ContainerStateStatusEnum::EXITED
| ContainerStateStatusEnum::REMOVING
| ContainerStateStatusEnum::DEAD,
code,
) => Status::Failed(code),
) => PodStatus::Failed(code),
(_, code) => {
todo!(
"Unhandled container state: {}, exit code: {code}.",
Expand Down
10 changes: 5 additions & 5 deletions src/core/store/filestore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ static RE_MODEL_METADATA: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(
r"(?x)
^
(?<store_directory>.*)\/
(?<namespace>[a-z_]+)\/
(?<class>[a-z_]+)\/
(?<hash>[0-9a-f]+)\/
(?<store_directory>.*?)/
(?<namespace>[a-z_]+)/
(?<class>[a-z_]+)/
(?<hash>[0-9a-f]+)/
(
annotation\/
annotation/
(?<name>[0-9a-zA-Z\-]+)
-
(?<version>[0-9]+\.[0-9]+\.[0-9]+)
Expand Down
21 changes: 13 additions & 8 deletions src/uniffi/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{
};
use tokio::task;
use uniffi;
/// Shorthand for a Result that returns an `OrcaError`.
/// Shorthand for a Result that returns an [`OrcaError`].
pub type Result<T, E = OrcaError> = result::Result<T, E>;
/// Possible errors you may encounter.
#[derive(Snafu, Debug, uniffi::Error)]
Expand Down Expand Up @@ -91,7 +91,12 @@ pub(crate) enum Kind {
},
#[snafu(transparent)]
BollardError {
source: BollardError,
source: Box<BollardError>,
backtrace: Option<Backtrace>,
},
#[snafu(transparent)]
ChronoParseError {
source: Box<chrono::ParseError>,
backtrace: Option<Backtrace>,
},
#[snafu(transparent)]
Expand All @@ -101,32 +106,32 @@ pub(crate) enum Kind {
},
#[snafu(transparent)]
GlobPatternError {
source: glob::PatternError,
source: Box<glob::PatternError>,
backtrace: Option<Backtrace>,
},
#[snafu(transparent)]
IoError {
source: io::Error,
source: Box<io::Error>,
backtrace: Option<Backtrace>,
},
#[snafu(transparent)]
PathPrefixError {
source: path::StripPrefixError,
source: Box<path::StripPrefixError>,
backtrace: Option<Backtrace>,
},
#[snafu(transparent)]
SerdeJsonError {
source: serde_json::Error,
source: Box<serde_json::Error>,
backtrace: Option<Backtrace>,
},
#[snafu(transparent)]
SerdeYamlError {
source: serde_yaml::Error,
source: Box<serde_yaml::Error>,
backtrace: Option<Backtrace>,
},
#[snafu(transparent)]
TokioTaskJoinError {
source: task::JoinError,
source: Box<task::JoinError>,
backtrace: Option<Backtrace>,
},
}
Expand Down
Loading