Skip to content

Commit e51c11d

Browse files
Clean up event metadata, remove event classifier, make agent regex more flexible, remove print to console from agent client log, and simplify store regex.
1 parent c2af9e1 commit e51c11d

File tree

7 files changed

+65
-63
lines changed

7 files changed

+65
-63
lines changed

src/core/error.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,16 @@ impl From<BollardError> for OrcaError {
2222
}
2323
}
2424
}
25+
impl From<chrono::ParseError> for OrcaError {
26+
fn from(error: chrono::ParseError) -> Self {
27+
Self {
28+
kind: Kind::ChronoParseError {
29+
source: error.into(),
30+
backtrace: Some(Backtrace::capture()),
31+
},
32+
}
33+
}
34+
}
2535
impl From<PestError> for OrcaError {
2636
fn from(error: PestError) -> Self {
2737
Self {
@@ -125,6 +135,7 @@ impl fmt::Debug for OrcaError {
125135
| Kind::NoRemainingServices { backtrace, .. }
126136
| Kind::NoTagFoundInContainerAltImage { backtrace, .. }
127137
| Kind::BollardError { backtrace, .. }
138+
| Kind::ChronoParseError { backtrace, .. }
128139
| Kind::DOTError { backtrace, .. }
129140
| Kind::GlobPatternError { backtrace, .. }
130141
| Kind::IoError { backtrace, .. }

src/core/orchestrator/agent.rs

Lines changed: 32 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
use crate::uniffi::{
22
error::{OrcaError, Result, selector},
3-
model::pod::{PodJob, PodResult},
43
orchestrator::agent::{Agent, AgentClient},
5-
store::ModelID,
64
};
7-
use chrono::Utc;
5+
use chrono::{DateTime, Utc};
86
use futures_util::future::FutureExt as _;
97
use regex::Regex;
108
use serde::{Deserialize, Serialize};
@@ -21,15 +19,20 @@ use tokio::{
2119
use tokio_util::task::TaskTracker;
2220

2321
#[expect(clippy::expect_used, reason = "Valid static regex")]
24-
static RE_PODJOB_ACTION: LazyLock<Regex> = LazyLock::new(|| {
22+
static RE_AGENT_KEY_EXPR: LazyLock<Regex> = LazyLock::new(|| {
2523
Regex::new(
2624
r"(?x)
2725
^
28-
group\/(?<group>[a-z_\-]+)\/
29-
(?<action>request|reservation|success|failure)\/
30-
pod_job\/(?<pod_job_hash>[0-9a-f]+)\/
31-
host\/(?<host>[a-z_]+)\/
32-
timestamp\/(?<timestamp>.*?)
26+
group/
27+
(?<group>[a-z_\-]+)/
28+
(?<action>request|success|failure)/
29+
(?<model_type>[a-z_]+)/
30+
(?<ref>[0-9a-f]+)/
31+
.*?
32+
host/
33+
(?<host>[a-z_]+)/
34+
timestamp/
35+
(?<timestamp>.*?)
3336
$
3437
",
3538
)
@@ -40,33 +43,14 @@ static RE_PODJOB_ACTION: LazyLock<Regex> = LazyLock::new(|| {
4043
dead_code,
4144
reason = "Need to be able to initialize to pass metadata as input."
4245
)]
43-
#[derive(Debug, Clone)]
46+
#[derive(Debug)]
4447
pub struct EventMetadata {
45-
group: String,
46-
host: String,
47-
subgroup: String,
48-
}
49-
50-
#[expect(
51-
dead_code,
52-
reason = "Need to be able to initialize to pass metadata as input."
53-
)]
54-
#[derive(Debug, Clone)]
55-
pub enum EventPayload {
56-
Request(PodJob),
57-
Reservation(ModelID),
58-
Success(PodResult),
59-
Failure(PodResult),
60-
}
61-
62-
#[expect(
63-
dead_code,
64-
reason = "Need to be able to initialize to pass metadata as input."
65-
)]
66-
#[derive(Debug, Clone)]
67-
pub struct Event {
68-
metadata: EventMetadata,
69-
payload: EventPayload,
48+
pub group: String,
49+
pub action: String,
50+
pub model_type: String,
51+
pub r#ref: String,
52+
pub host: String,
53+
pub timestamp: DateTime<Utc>,
7054
}
7155

7256
impl AgentClient {
@@ -95,7 +79,6 @@ impl AgentClient {
9579
///
9680
/// Will fail if there is an issue sending the message.
9781
pub(crate) async fn log(&self, message: &str) -> Result<()> {
98-
println!("{message}");
9982
self.publish("log", message).await
10083
}
10184
}
@@ -106,23 +89,20 @@ impl AgentClient {
10689
reason = "`result::Result<(), SendError<_>>` is the only uncaptured result since it would mean we can't transmit results over mpsc."
10790
)]
10891
pub async fn start_service<
109-
EventClassifierF, // function to classify the event payload e.g. EventPayload::{Request | Reservation | ..}
110-
RequestF, // function to run on requests
111-
RequestI, // input to the function for requests
112-
RequestR, // output to the function for requests
113-
ResponseF, // function to run on completing a request i.e. response
114-
ResponseI, // input to the function for responses
115-
ResponseR, // output to the function for responses
92+
RequestF, // function to run on requests
93+
RequestI, // input to the function for requests
94+
RequestR, // output to the function for requests
95+
ResponseF, // function to run on completing a request i.e. response
96+
ResponseI, // input to the function for responses
97+
ResponseR, // output to the function for responses
11698
>(
11799
agent: Arc<Agent>,
118100
request_key_expr: String,
119101
namespace_lookup: HashMap<String, PathBuf>,
120-
event_classifier: EventClassifierF,
121102
request_task: RequestF,
122103
response_task: ResponseF,
123104
) -> Result<()>
124105
where
125-
EventClassifierF: Fn(&RequestI) -> EventPayload + Send + 'static,
126106
RequestI: for<'serde> Deserialize<'serde> + Send + 'static,
127107
RequestF: FnOnce(Arc<Agent>, HashMap<String, PathBuf>, EventMetadata, RequestI) -> RequestR
128108
+ Clone
@@ -156,15 +136,17 @@ where
156136
while let Ok(sample) = subscriber.recv_async().await {
157137
if let (Ok(input), Some(metadata)) = (
158138
serde_json::from_slice::<RequestI>(&sample.payload().to_bytes()),
159-
RE_PODJOB_ACTION.captures(sample.key_expr().as_str()),
139+
RE_AGENT_KEY_EXPR.captures(sample.key_expr().as_str()),
160140
) {
161141
let inner_response_tx = response_tx.clone();
162142
let event_metadata = EventMetadata {
163143
group: metadata["group"].to_string(),
144+
action: metadata["action"].to_string(),
145+
model_type: metadata["model_type"].to_string(),
146+
r#ref: metadata["ref"].to_string(),
164147
host: metadata["host"].to_string(),
165-
subgroup: metadata["pod_job_hash"].to_string(),
148+
timestamp: DateTime::parse_from_rfc3339(&metadata["timestamp"])?.into(),
166149
};
167-
let _event_payload = event_classifier(&input);
168150
tasks.spawn({
169151
let inner_request_task = request_task.clone();
170152
let inner_inner_agent = Arc::clone(&inner_agent);
@@ -190,8 +172,8 @@ where
190172
}
191173
});
192174
services.spawn(async move {
193-
while let Some(content) = response_rx.recv().await {
194-
response_task(Arc::clone(&agent.client), content?).await?;
175+
while let Some(response) = response_rx.recv().await {
176+
response_task(Arc::clone(&agent.client), response?).await?;
195177
}
196178
Ok(())
197179
});

src/core/store/filestore.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ static RE_MODEL_METADATA: LazyLock<Regex> = LazyLock::new(|| {
2828
Regex::new(
2929
r"(?x)
3030
^
31-
(?<store_directory>.*)\/
32-
(?<namespace>[a-z_]+)\/
33-
(?<class>[a-z_]+)\/
34-
(?<hash>[0-9a-f]+)\/
31+
(?<store_directory>.*?)/
32+
(?<namespace>[a-z_]+)/
33+
(?<class>[a-z_]+)/
34+
(?<hash>[0-9a-f]+)/
3535
(
36-
annotation\/
36+
annotation/
3737
(?<name>[0-9a-zA-Z\-]+)
3838
-
3939
(?<version>[0-9]+\.[0-9]+\.[0-9]+)

src/uniffi/error.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ pub(crate) enum Kind {
9595
backtrace: Option<Backtrace>,
9696
},
9797
#[snafu(transparent)]
98+
ChronoParseError {
99+
source: Box<chrono::ParseError>,
100+
backtrace: Option<Backtrace>,
101+
},
102+
#[snafu(transparent)]
98103
DOTError {
99104
source: Box<PestError>,
100105
backtrace: Option<Backtrace>,

src/uniffi/orchestrator/agent.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use crate::{
2-
core::orchestrator::agent::{EventPayload, start_service},
2+
core::orchestrator::agent::start_service,
33
uniffi::{
44
error::{OrcaError, Result, selector},
5-
model::pod::{PodJob, PodResult},
5+
model::pod::PodJob,
66
orchestrator::{Orchestrator, PodStatus, docker::LocalDockerOrchestrator},
77
store::{Store as _, filestore::LocalFileStore},
88
},
@@ -152,7 +152,6 @@ impl Agent {
152152
Arc::new(self.clone()),
153153
"request/pod_job/**".to_owned(),
154154
namespace_lookup.clone(),
155-
|pod_job: &PodJob| EventPayload::Request(pod_job.clone()),
156155
async |agent, inner_namespace_lookup, _, pod_job| {
157156
let pod_run = agent
158157
.orchestrator
@@ -187,7 +186,6 @@ impl Agent {
187186
Arc::new(self.clone()),
188187
"success/pod_job/**".to_owned(),
189188
namespace_lookup.clone(),
190-
|pod_result: &PodResult| EventPayload::Success(pod_result.clone()),
191189
{
192190
let inner_store = Arc::clone(&store);
193191
async move |_, _, _, pod_result| {
@@ -201,7 +199,6 @@ impl Agent {
201199
Arc::new(self.clone()),
202200
"failure/pod_job/**".to_owned(),
203201
namespace_lookup.clone(),
204-
|pod_result: &PodResult| EventPayload::Failure(pod_result.clone()),
205202
async move |_, _, _, pod_result| {
206203
store.save_pod_result(&pod_result)?;
207204
Ok(())

tests/error.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
)]
77

88
pub mod fixture;
9+
use chrono::DateTime;
910
use dot_parser::ast::Graph as DOTGraph;
1011
use fixture::{NAMESPACE_LOOKUP_READ_ONLY, pod_custom, pod_job_custom, pod_job_style, str_to_vec};
1112
use glob::glob;
@@ -45,6 +46,14 @@ fn external_bollard() -> Result<()> {
4546
Ok(())
4647
}
4748

49+
#[test]
50+
fn external_chrono() {
51+
assert!(
52+
DateTime::parse_from_rfc3339("Whoops").is_err_and(contains_debug),
53+
"Did not raise a chrono error."
54+
);
55+
}
56+
4857
#[test]
4958
fn external_dot() {
5059
assert!(

tests/pipeline.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
)]
88

99
pub mod fixture;
10-
use fixture::pod_custom;
10+
use fixture::{NAMESPACE_LOOKUP_READ_ONLY, pod_custom};
1111
use indoc::indoc;
1212
use orcapod::uniffi::{
1313
error::Result,
@@ -18,8 +18,6 @@ use orcapod::uniffi::{
1818
};
1919
use std::collections::HashMap;
2020

21-
use crate::fixture::NAMESPACE_LOOKUP_READ_ONLY;
22-
2321
#[test]
2422
fn input_packet_checksum() -> Result<()> {
2523
let pipeline = Pipeline::new(

0 commit comments

Comments
 (0)