diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index d0fa93a1..fc9c207a 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -22,7 +22,7 @@ jobs: - name: Install code coverage uses: taiki-e/install-action@cargo-llvm-cov - name: Run syntax and style tests - run: cargo clippy --all-targets -- -D warnings + run: cargo clippy --no-default-features --features=test --all-targets -- -D warnings - name: Run format test run: cargo fmt --check - name: Run integration tests w/ coverage report @@ -31,6 +31,8 @@ jobs: run: | mkdir -p tests/.tmp cargo llvm-cov \ + --no-default-features \ + --features=test \ --ignore-filename-regex "bin/.*|lib\.rs" \ --cobertura \ --output-path target/llvm-cov-target/cobertura.xml \ diff --git a/.vscode/launch.json b/.vscode/launch.json index e1f2a9f8..38373d9b 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -8,6 +8,8 @@ "cargo": { "args": [ "test", + "--no-default-features", + "--features=test", "--no-run", "--package=orcapod", "--test", @@ -31,6 +33,8 @@ "cargo": { "args": [ "test", + "--no-default-features", + "--features=test", "--no-run", "--package=orcapod", "--test", @@ -53,6 +57,8 @@ "cargo": { "args": [ "test", + "--no-default-features", + "--features=test", "--no-run", "--lib", "--package=orcapod" @@ -86,6 +92,8 @@ "cargo": { "args": [ "test", + "--no-default-features", + "--features=test", "--no-run", "--package=orcapod", "--bin=exe_file_stem" diff --git a/.vscode/settings.json b/.vscode/settings.json index dadd4bc3..695f87c2 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,4 +1,7 @@ { + "[markdown]": { + "editor.defaultFormatter": null + }, "editor.formatOnPaste": false, "editor.formatOnSave": true, "editor.rulers": [ @@ -6,12 +9,13 @@ ], "files.autoSave": "off", "files.insertFinalNewline": true, - "[markdown]": { - "editor.defaultFormatter": null - }, "gitlens.showWelcomeOnInstall": false, "gitlens.showWhatsNewAfterUpgrades": false, "lldb.consoleMode": "evaluate", + "rust-analyzer.cargo.features": [ + "test" + ], + "rust-analyzer.cargo.noDefaultFeatures": true, "rust-analyzer.check.command": "clippy", "rust-analyzer.runnables.extraTestBinaryArgs": [ "--nocapture" @@ -22,10 +26,10 @@ ], "jupyter.kernels.excludePythonEnvironments": [ "/bin/python3", - "/usr/bin/python3", + "/usr/bin/python3" ], "notebook.formatOnSave.enabled": true, "notebook.output.scrolling": true, "python.defaultInterpreterPath": "~/.local/share/base/bin/python3", - "python.terminal.activateEnvironment": false, + "python.terminal.activateEnvironment": false } diff --git a/Cargo.toml b/Cargo.toml index 287a75c9..13f015fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,10 @@ categories = [ license = "MIT license" edition = "2024" +[features] +default = [] +test = [] + [lib] crate-type = ["rlib", "cdylib"] @@ -53,6 +57,8 @@ heck = "0.5.0" hex = "0.4.3" # hashmaps that preserve insertion order indexmap = { version = "2.9.0", features = ["serde"] } +# utilities for iterables +itertools = "0.14.0" # random name generator names = "0.14.0" # graph algorithms @@ -106,6 +112,7 @@ restriction = "deny" style = "deny" suspicious = "deny" +allow_attributes = { level = "allow", priority = 127 } # Useful when suppressing warnings is also desired. arbitrary_source_item_ordering = { level = "allow", priority = 127 } # allow arbitrary ordering to keep relevant code nearby arithmetic_side_effects = { level = "allow", priority = 127 } # allow arithmetic for convenience though it could overflow as_conversions = { level = "allow", priority = 127 } # allow casting diff --git a/README.md b/README.md index a2970201..50875d4e 100644 --- a/README.md +++ b/README.md @@ -11,13 +11,13 @@ ```bash #!/bin/bash set -e # fail early on non-zero exit -cargo clippy --all-targets -- -D warnings # Rust syntax and style tests +cargo clippy --no-default-features --features=test --all-targets -- -D warnings # Rust syntax and style tests cargo fmt --check # Rust formatting test -cargo llvm-cov --no-clean --ignore-filename-regex "bin/.*|lib\.rs" -- --nocapture # Rust integration tests w/ stdout coverage summary -cargo llvm-cov --no-clean --ignore-filename-regex "bin/.*|lib\.rs" --html -- --nocapture # Rust integration tests w/ HTML coverage report (target/llvm-cov/html/index.html) -cargo llvm-cov --no-clean --ignore-filename-regex "bin/.*|lib\.rs" --codecov --output-path target/llvm-cov-target/codecov.json -- --nocapture # Rust integration tests w/ codecov coverage report -cargo llvm-cov --no-clean --ignore-filename-regex "bin/.*|lib\.rs" --cobertura --output-path target/llvm-cov-target/cobertura.xml -- --nocapture # Rust integration tests w/ cobertura coverage report -. ~/.local/share/base/bin/activate && maturin develop --uv && export RUST_BACKTRACE=1 && python tests/extra/python/smoke_test.py -- tests/.tmp && python tests/extra/python/agent_test.py # Python integration tests +cargo llvm-cov --no-clean --no-default-features --features=test --ignore-filename-regex "bin/.*|lib\.rs" -- --nocapture # Rust integration tests w/ stdout coverage summary +cargo llvm-cov --no-clean --no-default-features --features=test --ignore-filename-regex "bin/.*|lib\.rs" --html -- --nocapture # Rust integration tests w/ HTML coverage report (target/llvm-cov/html/index.html) +cargo llvm-cov --no-clean --no-default-features --features=test --ignore-filename-regex "bin/.*|lib\.rs" --codecov --output-path target/llvm-cov-target/codecov.json -- --nocapture # Rust integration tests w/ codecov coverage report +cargo llvm-cov --no-clean --no-default-features --features=test --ignore-filename-regex "bin/.*|lib\.rs" --cobertura --output-path target/llvm-cov-target/cobertura.xml -- --nocapture # Rust integration tests w/ cobertura coverage report +. ~/.local/share/base/bin/activate && maturin develop --uv && export RUST_BACKTRACE=1 && python tests/extra/python/smoke_test.py -- tests/.tmp && python tests/extra/python/agent_test.py -- tests/.tmp # Python integration tests ``` ## Docs @@ -29,6 +29,15 @@ cargo modules dependencies --lib --no-uses --no-fns --focus-on "orcapod::uniffi: cargo modules dependencies --lib --no-uses --no-fns --focus-on "orcapod::uniffi::{model::{Pod,PodJob,PodResult},store::filestore::LocalFileStore,orchestrator::{PodRun,docker::LocalDockerOrchestrator}}" --layout dot | dot -T svg > docs/images/orcapod_diagram.svg # orcapod diagram as SVG ``` +## Git Worktrees + +Update your git to at least version 2.48 since that is when the `--relative-paths` option was added to git worktree. That option makes it compatible with launching using VSCode DevContainers. + +```bash +git worktree add /path/to/new/dir-name branch-name --relative-paths # create +git worktree remove dir-name # delete +``` + ## Project Management Progress is tracked under GH project [orcapod](https://github.com/orgs/walkerlab/projects/2). diff --git a/cspell.json b/cspell.json index 7a2b6e42..4f00f0b8 100644 --- a/cspell.json +++ b/cspell.json @@ -78,7 +78,8 @@ "strsim", "getrandom", "wasi", - "patchelf" + "patchelf", + "itertools" ], "useGitignore": false, "ignorePaths": [ diff --git a/src/core/crypto.rs b/src/core/crypto.rs index 3fa667d9..31933463 100644 --- a/src/core/crypto.rs +++ b/src/core/crypto.rs @@ -26,7 +26,7 @@ use std::{ clippy::indexing_slicing, reason = "Reading less than 0 is impossible." )] -pub(crate) fn hash_stream(stream: &mut impl Read) -> Result { +pub fn hash_stream(stream: &mut impl Read) -> Result { const BUFFER_SIZE: usize = 8 << 10; // 8KB chunks to match with page size typically found let mut hash = Sha256::new(); @@ -88,7 +88,7 @@ pub fn hash_dir(dirpath: impl AsRef) -> Result { /// # Errors /// /// Will return error if hashing fails on file or directory. -pub(crate) fn hash_blob( +pub fn hash_blob( namespace_lookup: &HashMap, blob: &Blob, ) -> Result { @@ -102,7 +102,7 @@ pub(crate) fn hash_blob( }) } -pub(crate) fn make_random_hash() -> String { +pub fn make_random_hash() -> String { let mut bytes = [0; 32]; rand::rng().fill_bytes(&mut bytes); hex::encode(bytes) diff --git a/src/core/error.rs b/src/core/error.rs index 06e41ad6..a42aa82f 100644 --- a/src/core/error.rs +++ b/src/core/error.rs @@ -122,18 +122,9 @@ impl fmt::Debug for OrcaError { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match &self.kind { Kind::AgentCommunicationFailure { backtrace, .. } - | Kind::EmptyResponseWhenLoadingContainerAltImage { backtrace, .. } - | Kind::GeneratedNamesOverflow { backtrace, .. } | Kind::IncompletePacket { backtrace, .. } | Kind::InvalidFilepath { backtrace, .. } - | Kind::InvalidPodResultTerminatedDatetime { backtrace, .. } - | Kind::KeyMissing { backtrace, .. } - | Kind::NoAnnotationFound { backtrace, .. } - | Kind::NoContainerNames { backtrace, .. } - | Kind::NoFileName { backtrace, .. } - | Kind::NoMatchingPodRun { backtrace, .. } - | Kind::NoRemainingServices { backtrace, .. } - | Kind::NoTagFoundInContainerAltImage { backtrace, .. } + | Kind::MissingInfo { backtrace, .. } | Kind::BollardError { backtrace, .. } | Kind::ChronoParseError { backtrace, .. } | Kind::DOTError { backtrace, .. } diff --git a/src/core/mod.rs b/src/core/mod.rs index 1b5a78fa..bd7337ee 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -1,11 +1,38 @@ -/// State change verification via cryptographic utilities. -pub mod crypto; +macro_rules! inner_attr_to_each { + { #!$attr:tt $($it:item)* } => { + $( + #$attr + $it + )* + } +} + pub(crate) mod error; pub(crate) mod graph; -/// Components of the data model. -pub mod model; -pub(crate) mod orchestrator; pub(crate) mod pipeline; pub(crate) mod store; pub(crate) mod util; pub(crate) mod validation; + +inner_attr_to_each! { + #![cfg(feature = "default")] + pub(crate) mod crypto; + pub(crate) mod model; + pub(crate) mod orchestrator; +} + +#[cfg(feature = "test")] +inner_attr_to_each! { + #![cfg_attr( + feature = "test", + allow( + missing_docs, + clippy::missing_errors_doc, + clippy::missing_panics_doc, + reason = "Documentation not necessary since private API.", + ), + )] + pub mod crypto; + pub mod model; + pub mod orchestrator; +} diff --git a/src/core/model.rs b/src/core/model.rs index 60a95bca..6406a7b0 100644 --- a/src/core/model.rs +++ b/src/core/model.rs @@ -11,6 +11,7 @@ use serde::{Deserialize as _, Deserializer, Serialize, Serializer}; use serde_yaml::{self, Value}; use std::{ collections::{BTreeMap, HashMap}, + hash::BuildHasher, result, sync::Arc, }; @@ -38,8 +39,8 @@ pub fn to_yaml(instance: &T) -> Result { Ok(yaml) } -pub(crate) fn serialize_hashmap( - map: &HashMap, +pub fn serialize_hashmap( + map: &HashMap, serializer: S, ) -> result::Result where @@ -49,9 +50,9 @@ where sorted.serialize(serializer) } -#[expect(clippy::ref_option, reason = "Serde requires this signature.")] -pub(crate) fn serialize_hashmap_option( - map_option: &Option>, +#[allow(clippy::ref_option, reason = "Serde requires this signature.")] +pub fn serialize_hashmap_option( + map_option: &Option>, serializer: S, ) -> result::Result where @@ -63,11 +64,8 @@ where sorted.serialize(serializer) } -#[expect( - clippy::expect_used, - reason = "Function signature required by serde API." -)] -pub(crate) fn deserialize_pod<'de, D>(deserializer: D) -> result::Result, D::Error> +#[expect(clippy::expect_used, reason = "Serde requires this signature.")] +pub fn deserialize_pod<'de, D>(deserializer: D) -> result::Result, D::Error> where D: Deserializer<'de>, { @@ -89,11 +87,8 @@ where ) } -#[expect( - clippy::expect_used, - reason = "Function signature required by serde API." -)] -pub(crate) fn deserialize_pod_job<'de, D>(deserializer: D) -> result::Result, D::Error> +#[expect(clippy::expect_used, reason = "Serde requires this signature.")] +pub fn deserialize_pod_job<'de, D>(deserializer: D) -> result::Result, D::Error> where D: Deserializer<'de>, { diff --git a/src/core/orchestrator/agent.rs b/src/core/orchestrator/agent.rs index a3d71eca..9cbc7904 100644 --- a/src/core/orchestrator/agent.rs +++ b/src/core/orchestrator/agent.rs @@ -4,13 +4,16 @@ use crate::uniffi::{ }; use chrono::{DateTime, Utc}; use futures_util::future::FutureExt as _; -use regex::Regex; +use itertools::Itertools as _; use serde::{Deserialize, Serialize}; use snafu::{OptionExt as _, ResultExt as _}; use std::{ - collections::HashMap, + borrow::ToOwned, + collections::{BTreeMap, HashMap}, + fmt::Write as _, + hash::RandomState, path::PathBuf, - sync::{Arc, LazyLock}, + sync::Arc, }; use tokio::{ sync::mpsc::{self, error::SendError}, @@ -18,56 +21,59 @@ use tokio::{ }; use tokio_util::task::TaskTracker; -#[expect(clippy::expect_used, reason = "Valid static regex")] -static RE_AGENT_KEY_EXPR: LazyLock = LazyLock::new(|| { - Regex::new( - r"(?x) - ^ - group/ - (?[a-z_\-]+)/ - (?request|success|failure)/ - (?[a-z_]+)/ - (?[0-9a-f]+)/ - .*? - host/ - (?[a-z_]+)/ - timestamp/ - (?.*?) - $ - ", - ) - .expect("Invalid PodJob action regex.") -}); - -#[expect( - dead_code, - reason = "Need to be able to initialize to pass metadata as input." -)] -#[derive(Debug)] -pub struct EventMetadata { - pub group: String, - pub action: String, - pub model_type: String, - pub r#ref: String, - pub host: String, - pub timestamp: DateTime, +pub fn extract_metadata(key_expr: &str) -> HashMap { + key_expr + .split('/') + .map(ToOwned::to_owned) + .tuples() + .collect() } impl AgentClient { - pub(crate) async fn publish(&self, topic: &str, payload: &T) -> Result<()> + #[expect( + clippy::let_underscore_must_use, + reason = "write! on a `String` cannot fail. https://rust-lang.github.io/rust-clippy/master/index.html#format_collect" + )] + pub(crate) fn make_key_expr( + &self, + is_subscriber: bool, + topic: &str, + mut metadata: BTreeMap<&str, String>, + ) -> String { + metadata.insert("group", self.group.clone()); + metadata.insert("topic", topic.to_owned()); + + let delimiter = if is_subscriber { + "**/".to_owned() + } else { + metadata.insert("host", self.host.clone()); + metadata.insert("timestamp", Utc::now().to_rfc3339()); + String::new() + }; + + metadata + .iter() + .fold(delimiter.clone(), |mut key_expr, (key, value)| { + let _ = write!(key_expr, "{key}/{value}/{delimiter}"); + key_expr + }) + .trim_end_matches('/') + .to_owned() + } + + pub(crate) async fn publish( + &self, + topic: &str, + metadata: BTreeMap<&str, String>, + payload: &T, + ) -> Result<()> where T: Serialize + Sync + ?Sized, { Ok(self .session .put( - format!( - "group/{}/{}/host/{}/timestamp/{}", - self.group, - topic, - self.host, - Utc::now().to_rfc3339() - ), + self.make_key_expr(false, topic, metadata), &serde_json::to_vec(payload)?, ) .await @@ -79,7 +85,7 @@ impl AgentClient { /// /// Will fail if there is an issue sending the message. pub(crate) async fn log(&self, message: &str) -> Result<()> { - self.publish("log", message).await + self.publish("log", BTreeMap::new(), message).await } } @@ -97,14 +103,20 @@ pub async fn start_service< ResponseR, // output to the function for responses >( agent: Arc, - request_key_expr: String, - namespace_lookup: HashMap, + request_topic: &str, + request_metadata: BTreeMap<&'static str, String>, + namespace_lookup: HashMap, request_task: RequestF, response_task: ResponseF, ) -> Result<()> where RequestI: for<'serde> Deserialize<'serde> + Send + 'static, - RequestF: FnOnce(Arc, HashMap, EventMetadata, RequestI) -> RequestR + RequestF: FnOnce( + Arc, + HashMap, + (DateTime, HashMap), + RequestI, + ) -> RequestR + Clone + Send + 'static, @@ -115,71 +127,76 @@ where { agent .client - .log(&format!("Started `{request_key_expr}` service.")) + .log(&format!( + "Started `{request_topic}` service for {request_metadata:?}." + )) .await?; let (response_tx, mut response_rx) = mpsc::channel(100); let mut services = JoinSet::new(); services.spawn({ let inner_agent = Arc::clone(&agent); + let inner_request_topic = request_topic.to_owned(); async move { let tasks = TaskTracker::new(); let subscriber = inner_agent .client .session - .declare_subscriber(format!( - "group/{}/{}", - inner_agent.client.group, request_key_expr + .declare_subscriber(inner_agent.client.make_key_expr( + true, + &inner_request_topic, + request_metadata, )) .await .context(selector::AgentCommunicationFailure {})?; - while let Ok(sample) = subscriber.recv_async().await { - if let (Ok(input), Some(metadata)) = ( - serde_json::from_slice::(&sample.payload().to_bytes()), - RE_AGENT_KEY_EXPR.captures(sample.key_expr().as_str()), - ) { - let inner_response_tx = response_tx.clone(); - let event_metadata = EventMetadata { - group: metadata["group"].to_string(), - action: metadata["action"].to_string(), - model_type: metadata["model_type"].to_string(), - r#ref: metadata["ref"].to_string(), - host: metadata["host"].to_string(), - timestamp: DateTime::parse_from_rfc3339(&metadata["timestamp"])?.into(), - }; - tasks.spawn({ - let inner_request_task = request_task.clone(); - let inner_inner_agent = Arc::clone(&inner_agent); - let inner_namespace_lookup = namespace_lookup.clone(); - async move { - inner_request_task( - inner_inner_agent, - inner_namespace_lookup, - event_metadata, - input, - ) - .then(move |response| async move { - let _: Result<(), SendError>> = - inner_response_tx.send(response).await; - Ok::<_, OrcaError>(()) - }) - .await - } - }); - } + loop { + let sample = subscriber + .recv_async() + .await + .context(selector::AgentCommunicationFailure {})?; + let input = serde_json::from_slice::(&sample.payload().to_bytes())?; + let inner_response_tx = response_tx.clone(); + let mut event_metadata = extract_metadata(sample.key_expr().as_str()); + let timestamp = + event_metadata + .remove("timestamp") + .context(selector::MissingInfo { + details: "timestamp", + })?; + let event_timestamp = + DateTime::::from(DateTime::parse_from_rfc3339(×tamp)?); + tasks.spawn({ + let inner_request_task = request_task.clone(); + let inner_inner_agent = Arc::clone(&inner_agent); + let inner_namespace_lookup = namespace_lookup.clone(); + async move { + inner_request_task( + inner_inner_agent, + inner_namespace_lookup, + (event_timestamp, event_metadata), + input, + ) + .then(move |response| async move { + let _: Result<(), SendError>> = + inner_response_tx.send(response).await; + Ok::<_, OrcaError>(()) + }) + .await + } + }); } - Ok(()) } }); services.spawn(async move { - while let Some(response) = response_rx.recv().await { + loop { + let response = response_rx.recv().await.context(selector::MissingInfo { + details: "channel empty or closed", + })?; response_task(Arc::clone(&agent.client), response?).await?; } - Ok(()) }); - services - .join_next() - .await - .context(selector::NoRemainingServices {})?? + services.join_next().await.context(selector::MissingInfo { + details: "no available services", + })?? } diff --git a/src/core/orchestrator/docker.rs b/src/core/orchestrator/docker.rs index e38b7314..1569f946 100644 --- a/src/core/orchestrator/docker.rs +++ b/src/core/orchestrator/docker.rs @@ -79,8 +79,11 @@ impl LocalDockerOrchestrator { stream_info .path .join(blob.location.path.file_name().context( - selector::NoFileName { - path: blob.location.path.clone() + selector::MissingInfo { + details: format!( + "file or directory name where path = {}", + blob.location.path.to_string_lossy() + ), } )?) .to_string_lossy(), @@ -115,9 +118,12 @@ impl LocalDockerOrchestrator { )> { // Prepare configuration let (input_binds, output_bind) = Self::prepare_mount_binds(namespace_lookup, pod_job)?; - let container_name = Generator::with_naming(Name::Plain) - .next() - .context(selector::GeneratedNamesOverflow)?; + let container_name = + Generator::with_naming(Name::Plain) + .next() + .context(selector::MissingInfo { + details: "unable to generate a random name", + })?; let labels = HashMap::from([ ("org.orcapod".to_owned(), "true".to_owned()), ( @@ -164,6 +170,7 @@ impl LocalDockerOrchestrator { clippy::cast_precision_loss, clippy::cast_possible_truncation, clippy::indexing_slicing, + clippy::too_many_lines, reason = r#" - Timestamp and memory should always have a value > 0 - Container will always have a name with more than 1 character @@ -186,10 +193,13 @@ impl LocalDockerOrchestrator { .await? .iter() .map(|container_summary| async { - let container_name = &container_summary - .names - .as_ref() - .context(selector::NoContainerNames)?[0][1..]; + let container_name = + &container_summary + .names + .as_ref() + .context(selector::MissingInfo { + details: "container name(s)".to_owned(), + })?[0][1..]; Ok(( container_name.to_owned(), container_summary.clone(), diff --git a/src/core/store/filestore.rs b/src/core/store/filestore.rs index 2dcd4057..3fa3ef56 100644 --- a/src/core/store/filestore.rs +++ b/src/core/store/filestore.rs @@ -102,10 +102,11 @@ impl LocalFileStore { Self::make_annotation_relpath(name, version), ))? .next() - .context(selector::NoAnnotationFound { - class: parse_debug_name(model).to_snake_case(), - name: name.to_owned(), - version: version.to_owned(), + .context(selector::MissingInfo { + details: format!( + "annotation where class = {}, name = {name}, version = {version}", + parse_debug_name(model).to_snake_case() + ), })?; Ok(model_info.hash) } diff --git a/src/core/util.rs b/src/core/util.rs index 8721cb90..f41e4356 100644 --- a/src/core/util.rs +++ b/src/core/util.rs @@ -31,7 +31,7 @@ where Q: ?Sized + hash::Hash + Eq + fmt::Debug, K: Borrow + hash::Hash + Eq, { - Ok(map.get(key).context(selector::KeyMissing { - key: format!("{key:?}"), + Ok(map.get(key).context(selector::MissingInfo { + details: format!("key = {key:?}"), })?) } diff --git a/src/lib.rs b/src/lib.rs index 4f3a2871..99d625b1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,10 @@ //! mind. extern crate uniffi as uniffi_external; uniffi_external::setup_scaffolding!(); + +#[cfg(all(feature = "default", feature = "test"))] +compile_error!(r#"Feature "default" and feature "test" cannot be enabled at the same time."#); + /// Pure Rust source. pub mod core; /// Exposed CFFI client based on [uniffi](https://crates.io/crates/uniffi). diff --git a/src/uniffi/error.rs b/src/uniffi/error.rs index e5d06ed2..f6e63a17 100644 --- a/src/uniffi/error.rs +++ b/src/uniffi/error.rs @@ -30,15 +30,6 @@ pub(crate) enum Kind { source: Box, backtrace: Option, }, - #[snafu(display( - "Received an empty response when attempting to load the alternate container image file: {path:?}." - ))] - EmptyResponseWhenLoadingContainerAltImage { - path: PathBuf, - backtrace: Option, - }, - #[snafu(display("Out of generated random names."))] - GeneratedNamesOverflow { backtrace: Option }, #[snafu(display("Incomplete {kind} packet. Missing `{missing_keys:?}` keys."))] IncompletePacket { kind: String, @@ -51,42 +42,9 @@ pub(crate) enum Kind { source: io::Error, backtrace: Option, }, - #[snafu(display( - "An invalid datetime was set for pod result for pod job (hash: {pod_job_hash})." - ))] - InvalidPodResultTerminatedDatetime { - pod_job_hash: String, - backtrace: Option, - }, - #[snafu(display("Key '{key}' was not found in map."))] - KeyMissing { - key: String, - backtrace: Option, - }, - #[snafu(display("No annotation found for `{name}:{version}` {class}."))] - NoAnnotationFound { - class: String, - name: String, - version: String, - backtrace: Option, - }, - #[snafu(display("No known container names."))] - NoContainerNames { backtrace: Option }, - #[snafu(display("Missing file or directory name ({path:?})."))] - NoFileName { - path: PathBuf, - backtrace: Option, - }, - #[snafu(display("No corresponding pod run found for pod job (hash: {pod_job_hash})."))] - NoMatchingPodRun { - pod_job_hash: String, - backtrace: Option, - }, - #[snafu(display("All services have completed."))] - NoRemainingServices { backtrace: Option }, - #[snafu(display("No tags found in provided container alternate image: {path:?}."))] - NoTagFoundInContainerAltImage { - path: PathBuf, + #[snafu(display("Missing info. Details: {details}."))] + MissingInfo { + details: String, backtrace: Option, }, #[snafu(transparent)] @@ -146,11 +104,11 @@ pub struct OrcaError { #[uniffi::export] impl OrcaError { /// Returns `true` if the error was caused by an invalid model annotation. - pub const fn is_invalid_annotation(&self) -> bool { - matches!(self.kind, Kind::NoAnnotationFound { .. }) + pub fn is_invalid_annotation(&self) -> bool { + matches!(&self.kind, Kind::MissingInfo { details, .. } if details.contains("annotation")) } /// Returns `true` if the error was caused by querying a purged pod run. - pub const fn is_purged_pod_run(&self) -> bool { - matches!(self.kind, Kind::NoMatchingPodRun { .. }) + pub fn is_purged_pod_run(&self) -> bool { + matches!(&self.kind, Kind::MissingInfo { details, .. } if details.contains("pod run")) } } diff --git a/src/uniffi/orchestrator/agent.rs b/src/uniffi/orchestrator/agent.rs index f07a811d..c2d8d00e 100644 --- a/src/uniffi/orchestrator/agent.rs +++ b/src/uniffi/orchestrator/agent.rs @@ -14,7 +14,11 @@ use futures_util::future::join_all; use getset::CloneGetters; use serde_json::Value; use snafu::{OptionExt as _, ResultExt as _}; -use std::{collections::HashMap, path::PathBuf, sync::Arc}; +use std::{ + collections::{BTreeMap, HashMap}, + path::PathBuf, + sync::Arc, +}; use tokio::task::JoinSet; use uniffi; use zenoh; @@ -76,7 +80,14 @@ impl AgentClient { pub async fn start_pod_jobs(&self, pod_jobs: Vec>) -> Vec { join_all(pod_jobs.iter().map(|pod_job| async { match self - .publish(&format!("request/pod_job/{}", pod_job.hash), pod_job) + .publish( + "pod_job", + BTreeMap::from([ + ("action", "request".to_owned()), + ("hash", pod_job.hash.clone()), + ]), + pod_job, + ) .await { Ok(()) => Response::Ok, @@ -97,11 +108,14 @@ impl AgentClient { .declare_subscriber(&key_expr) .await .context(selector::AgentCommunicationFailure {})?; - while let Ok(sample) = subscriber.recv_async().await { + loop { + let sample = subscriber + .recv_async() + .await + .context(selector::AgentCommunicationFailure {})?; let value = serde_json::from_slice::(&sample.payload().to_bytes())?; println!("{}: {value:#}", sample.key_expr().as_str().yellow()); } - Ok(()) } } @@ -150,7 +164,8 @@ impl Agent { let mut services = JoinSet::new(); services.spawn(start_service( Arc::new(self.clone()), - "request/pod_job/**".to_owned(), + "pod_job", + BTreeMap::from([("action", "request".to_owned())]), namespace_lookup.clone(), async |agent, inner_namespace_lookup, _, pod_job| { let pod_run = agent @@ -167,15 +182,20 @@ impl Agent { async |client, pod_result| { client .publish( - &format!( - "{}/pod_job/{}", - match &pod_result.status { - PodStatus::Completed => "success", - PodStatus::Running | PodStatus::Failed(_) | PodStatus::Unset => - "failure", - }, - pod_result.pod_job.hash - ), + "pod_job", + BTreeMap::from([ + ( + "action", + match &pod_result.status { + PodStatus::Completed => "success", + PodStatus::Running + | PodStatus::Failed(_) + | PodStatus::Unset => "failure", + } + .to_owned(), + ), + ("hash", pod_result.pod_job.hash.clone()), + ]), &pod_result, ) .await @@ -184,7 +204,8 @@ impl Agent { if let Some(store) = available_store { services.spawn(start_service( Arc::new(self.clone()), - "success/pod_job/**".to_owned(), + "pod_job", + BTreeMap::from([("action", "success".to_owned())]), namespace_lookup.clone(), { let inner_store = Arc::clone(&store); @@ -197,7 +218,8 @@ impl Agent { )); services.spawn(start_service( Arc::new(self.clone()), - "failure/pod_job/**".to_owned(), + "pod_job", + BTreeMap::from([("action", "failure".to_owned())]), namespace_lookup.clone(), async move |_, _, _, pod_result| { store.save_pod_result(&pod_result)?; @@ -206,9 +228,8 @@ impl Agent { async |_, ()| Ok(()), )); } - services - .join_next() - .await - .context(selector::NoRemainingServices {})?? + services.join_next().await.context(selector::MissingInfo { + details: "no available services".to_owned(), + })?? } } diff --git a/src/uniffi/orchestrator/docker.rs b/src/uniffi/orchestrator/docker.rs index 6ed00d78..a08137b3 100644 --- a/src/uniffi/orchestrator/docker.rs +++ b/src/uniffi/orchestrator/docker.rs @@ -109,14 +109,15 @@ impl Orchestrator for LocalDockerOrchestrator { let mut local_image = String::new(); while let Some(response) = stream.next().await { local_image = RE_IMAGE_TAG - .captures_iter(&response?.stream.context( - selector::EmptyResponseWhenLoadingContainerAltImage { - path: location.clone(), - }, - )?) + .captures_iter(&response?.stream.context(selector::MissingInfo { + details: location.to_string_lossy(), + })?) .find_map(|x| x.name("image").map(|name| name.as_str().to_owned())) - .context(selector::NoTagFoundInContainerAltImage { - path: location.clone(), + .context(selector::MissingInfo { + details: format!( + "container tags in provided container alternate image where path = {}", + location.to_string_lossy() + ), })?; } Self::prepare_container_start_inputs( @@ -192,8 +193,8 @@ impl Orchestrator for LocalDockerOrchestrator { .list_containers(HashMap::from([("label".to_owned(), labels)])) .await? .next() - .context(selector::NoMatchingPodRun { - pod_job_hash: pod_run.pod_job.hash.clone(), + .context(selector::MissingInfo { + details: format!("pod run where pod_job.hash = {}", pod_run.pod_job.hash), })?; Ok(run_info) } @@ -233,11 +234,12 @@ impl Orchestrator for LocalDockerOrchestrator { pod_run.assigned_name.clone(), result_info.status, result_info.created, - result_info - .terminated - .context(selector::InvalidPodResultTerminatedDatetime { - pod_job_hash: pod_run.pod_job.hash.clone(), - })?, + result_info.terminated.context(selector::MissingInfo { + details: format!( + "terminated where pod_run.assigned_name = {}, pod_run.pod_job.hash = {}", + pod_run.assigned_name, pod_run.pod_job.hash + ), + })?, namespace_lookup, ) } diff --git a/tests/agent.rs b/tests/agent.rs index 7ef774cb..750c94b9 100644 --- a/tests/agent.rs +++ b/tests/agent.rs @@ -9,14 +9,17 @@ pub mod fixture; use fixture::{NAMESPACE_LOOKUP_READ_ONLY, TestDirs, pod_jobs_stresser, pull_image}; -use orcapod::uniffi::{ - error::Result, - model::pod::PodResult, - orchestrator::{ - agent::{Agent, AgentClient}, - docker::LocalDockerOrchestrator, +use orcapod::{ + core::orchestrator::agent::extract_metadata, + uniffi::{ + error::Result, + model::pod::PodResult, + orchestrator::{ + agent::{Agent, AgentClient}, + docker::LocalDockerOrchestrator, + }, + store::{ModelID, Store as _, filestore::LocalFileStore}, }, - store::{ModelID, Store as _, filestore::LocalFileStore}, }; use std::{ collections::HashMap, @@ -47,13 +50,8 @@ async fn parallel_four_cores() -> Result<()> { // config let image_reference = "ghcr.io/colinianking/stress-ng:e2f96874f951a72c1c83ff49098661f0e013ac40"; pull_image(image_reference)?; - let margin_millis = 2000; + let margin_millis = 1000; let run_duration_secs = 5; - let current_timestamp = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Current time is earlier than start of epoch (1970-01-01 00:00:00).") - .as_millis(); - println!("current_timestamp: {current_timestamp}"); let (group, host) = ("agent_parallel-four-cores", "host"); // api let client = AgentClient::new(group.to_owned(), host.to_owned())?; @@ -64,10 +62,15 @@ async fn parallel_four_cores() -> Result<()> { )?; // background services let mut services = JoinSet::new(); + services.spawn(async { + async_sleep(Duration::from_secs(60)).await; + panic!("Test took too long. Killing..."); + }); services.spawn({ let inner_client = client.clone(); async move { inner_client.watch("**".to_owned()).await } }); + async_sleep(Duration::from_secs(5)).await; // ensure watch is ready services.spawn({ let inner_agent = agent.clone(); let inner_store = store.clone(); @@ -77,18 +80,29 @@ async fn parallel_four_cores() -> Result<()> { .await } }); + async_sleep(Duration::from_secs(5)).await; // ensure services are ready services.spawn(async move { + let current_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Current time is earlier than start of epoch (1970-01-01 00:00:00).") + .as_millis(); + println!("current_timestamp: {current_timestamp}"); let session = zenoh::open(zenoh::Config::default()) .await .expect("Unable to create a zenoh session."); let subscriber = session - .declare_subscriber(&format!("group/{group}/*/pod_job/**")) + .declare_subscriber(&format!("**/group/{group}/**/topic/pod_job/**")) .await .expect("Unable to create subscriber."); let mut success_counter = 0; let mut failure_counter = 0; - while let Ok(sample) = subscriber.recv_async().await { - let topic_kind = sample.key_expr().as_str().split('/').collect::>()[2]; + loop { + let sample = subscriber + .recv_async() + .await + .expect("All senders have dropped."); + let metadata = extract_metadata(sample.key_expr().as_str()); + let topic_kind = metadata["action"].as_str(); if ["success", "failure"].contains(&topic_kind) { let pod_result = serde_json::from_slice::(&sample.payload().to_bytes())?; assert!( @@ -120,10 +134,6 @@ async fn parallel_four_cores() -> Result<()> { } Ok(()) }); - services.spawn(async { - async_sleep(Duration::from_secs(60)).await; - panic!("Test took too long. Killing..."); - }); // submit requests client .start_pod_jobs(pod_jobs_stresser(image_reference, run_duration_secs, 3, 1)?) diff --git a/tests/extra/python/agent_test.py b/tests/extra/python/agent_test.py index 18c48f48..7e35ea79 100644 --- a/tests/extra/python/agent_test.py +++ b/tests/extra/python/agent_test.py @@ -29,7 +29,7 @@ def count(sample): with zenoh.open(zenoh.Config()) as session: with session.declare_subscriber( - f"group/{group}/success/pod_job/**", count + f"**/action/success/**/group/{group}/**/topic/pod_job/**", count ) as subscriber: await asyncio.sleep(20) # wait for results