Skip to content

Commit 5abc39e

Browse files
authored
Merge pull request #94 from guzman-raphael/agent_store_pod_result
`Agent`: Add configurable storage of `PodResult`
2 parents 3b56c38 + 35b8673 commit 5abc39e

File tree

11 files changed

+342
-112
lines changed

11 files changed

+342
-112
lines changed

cspell.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
"cffi",
1919
"zenoh",
2020
"PodJob",
21+
"stresser"
2122
],
2223
"ignoreWords": [
2324
"relpath",

src/core/orchestrator/agent.rs

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,9 @@ pub async fn start_service<
123123
) -> Result<()>
124124
where
125125
EventClassifierF: Fn(&RequestI) -> EventPayload + Send + 'static,
126-
RequestI: for<'serde> Deserialize<'serde>,
127-
RequestF: Fn(Arc<Agent>, HashMap<String, PathBuf>, EventMetadata, RequestI) -> RequestR
126+
RequestI: for<'serde> Deserialize<'serde> + Send + 'static,
127+
RequestF: FnOnce(Arc<Agent>, HashMap<String, PathBuf>, EventMetadata, RequestI) -> RequestR
128+
+ Clone
128129
+ Send
129130
+ 'static,
130131
RequestR: Future<Output = Result<ResponseI>> + Send + 'static,
@@ -164,19 +165,25 @@ where
164165
subgroup: metadata["pod_job_hash"].to_string(),
165166
};
166167
let _event_payload = event_classifier(&input);
167-
tasks.spawn(
168-
request_task(
169-
Arc::clone(&inner_agent),
170-
namespace_lookup.clone(),
171-
event_metadata,
172-
input,
173-
)
174-
.then(move |response| async move {
175-
let _: Result<(), SendError<Result<ResponseI>>> =
176-
inner_response_tx.send(response).await;
177-
Ok::<_, OrcaError>(())
178-
}),
179-
);
168+
tasks.spawn({
169+
let inner_request_task = request_task.clone();
170+
let inner_inner_agent = Arc::clone(&inner_agent);
171+
let inner_namespace_lookup = namespace_lookup.clone();
172+
async move {
173+
inner_request_task(
174+
inner_inner_agent,
175+
inner_namespace_lookup,
176+
event_metadata,
177+
input,
178+
)
179+
.then(move |response| async move {
180+
let _: Result<(), SendError<Result<ResponseI>>> =
181+
inner_response_tx.send(response).await;
182+
Ok::<_, OrcaError>(())
183+
})
184+
.await
185+
}
186+
});
180187
}
181188
}
182189
Ok(())

src/core/orchestrator/docker.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ impl LocalDockerOrchestrator {
170170
clippy::cast_precision_loss,
171171
clippy::cast_possible_truncation,
172172
clippy::indexing_slicing,
173+
clippy::too_many_lines,
173174
reason = r#"
174175
- Timestamp and memory should always have a value > 0
175176
- Container will always have a name with more than 1 character
@@ -210,13 +211,13 @@ impl LocalDockerOrchestrator {
210211
let terminated_timestamp =
211212
DateTime::parse_from_rfc3339(container_spec.state.as_ref()?.finished_at.as_ref()?)
212213
.ok()?
213-
.timestamp() as u64;
214+
.timestamp();
214215
Some((
215216
container_name,
216217
RunInfo {
217218
image: container_spec.config.as_ref()?.image.as_ref()?.clone(),
218219
created: container_summary.created? as u64,
219-
terminated: (terminated_timestamp > 0).then_some(terminated_timestamp),
220+
terminated: (terminated_timestamp > 0).then_some(terminated_timestamp as u64),
220221
env_vars: container_spec
221222
.config
222223
.as_ref()?
@@ -243,9 +244,20 @@ impl LocalDockerOrchestrator {
243244
container_spec.state.as_ref()?.exit_code? as i16,
244245
) {
245246
(ContainerStateStatusEnum::RUNNING, _) => Status::Running,
246-
(ContainerStateStatusEnum::EXITED, 0) => Status::Completed,
247-
(ContainerStateStatusEnum::EXITED, code) => Status::Failed(code),
248-
_ => todo!(),
247+
(
248+
ContainerStateStatusEnum::EXITED | ContainerStateStatusEnum::REMOVING,
249+
0,
250+
) => Status::Completed,
251+
(
252+
ContainerStateStatusEnum::EXITED | ContainerStateStatusEnum::REMOVING,
253+
code,
254+
) => Status::Failed(code),
255+
(_, code) => {
256+
todo!(
257+
"Unhandled container state: {}, exit code: {code}.",
258+
container_spec.state.as_ref()?.status.as_ref()?
259+
)
260+
}
249261
},
250262
mounts: container_spec
251263
.mounts

src/uniffi/orchestrator/agent.rs

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ use crate::{
22
core::orchestrator::agent::{EventPayload, start_service},
33
uniffi::{
44
error::{OrcaError, Result, selector},
5-
model::PodJob,
5+
model::{PodJob, PodResult},
66
orchestrator::{Orchestrator, Status, docker::LocalDockerOrchestrator},
7+
store::{Store as _, filestore::LocalFileStore},
78
},
89
};
910
use derive_more::Display;
@@ -17,6 +18,19 @@ use tokio::task::JoinSet;
1718
use uniffi;
1819
use zenoh;
1920

21+
/// A response, similar to Rust's `Result` but casting error to `String`.
22+
///
23+
/// This is a workaround due to `UniFFI` limitations when trying to send a collection of `Result`
24+
/// over the CFFI boundary e.g. concurrent calls where it is necessary to know the status of each
25+
/// request to determine what to retry.
26+
#[derive(uniffi::Enum)]
27+
pub enum Response {
28+
/// Success
29+
Ok,
30+
/// Error cast to `String`
31+
Err(String),
32+
}
33+
2034
/// Client to connect to an execution agent within a coordinated fleet. Connection optimized/rerouted by Zenoh.
2135
#[expect(
2236
clippy::field_scoped_visibility_modifiers,
@@ -58,14 +72,14 @@ impl AgentClient {
5872
}
5973
/// Submit many pod jobs to be processed in parallel.
6074
/// Return order will match inputs, casting outputs to `String` (since `uniffi` doesn't support sending unwrapped `Result`s).
61-
pub async fn submit_pod_jobs(&self, pod_jobs: Vec<Arc<PodJob>>) -> Vec<String> {
75+
pub async fn submit_pod_jobs(&self, pod_jobs: Vec<Arc<PodJob>>) -> Vec<Response> {
6276
join_all(pod_jobs.iter().map(|pod_job| async {
6377
match self
6478
.publish(&format!("request/pod_job/{}", pod_job.hash), pod_job)
6579
.await
6680
{
67-
Ok(()) => "ok".into(),
68-
Err(error) => error.to_string(),
81+
Ok(()) => Response::Ok,
82+
Err(error) => Response::Err(error.to_string()),
6983
}
7084
}))
7185
.await
@@ -126,13 +140,18 @@ impl Agent {
126140
/// # Errors
127141
///
128142
/// Will stop and return an error if encounters an error while processing any pod job request.
129-
pub async fn start(&self, namespace_lookup: &HashMap<String, PathBuf>) -> Result<()> {
143+
#[expect(clippy::excessive_nesting, reason = "Nesting manageable.")]
144+
pub async fn start(
145+
&self,
146+
namespace_lookup: &HashMap<String, PathBuf>,
147+
available_store: Option<Arc<LocalFileStore>>,
148+
) -> Result<()> {
130149
let mut services = JoinSet::new();
131150
services.spawn(start_service(
132151
Arc::new(self.clone()),
133152
"request/pod_job/**".to_owned(),
134153
namespace_lookup.clone(),
135-
|input: &PodJob| EventPayload::Request(input.clone()),
154+
|pod_job: &PodJob| EventPayload::Request(pod_job.clone()),
136155
async |agent, inner_namespace_lookup, _, pod_job| {
137156
let pod_run = agent
138157
.orchestrator
@@ -152,6 +171,33 @@ impl Agent {
152171
client.publish(response_topic, &pod_result).await
153172
},
154173
));
174+
if let Some(store) = available_store {
175+
services.spawn(start_service(
176+
Arc::new(self.clone()),
177+
"success/pod_job/**".to_owned(),
178+
namespace_lookup.clone(),
179+
|pod_result: &PodResult| EventPayload::Success(pod_result.clone()),
180+
{
181+
let inner_store = Arc::clone(&store);
182+
async move |_, _, _, pod_result| {
183+
inner_store.save_pod_result(&pod_result)?;
184+
Ok(())
185+
}
186+
},
187+
async |_, ()| Ok(()),
188+
));
189+
services.spawn(start_service(
190+
Arc::new(self.clone()),
191+
"failure/pod_job/**".to_owned(),
192+
namespace_lookup.clone(),
193+
|pod_result: &PodResult| EventPayload::Failure(pod_result.clone()),
194+
async move |_, _, _, pod_result| {
195+
store.save_pod_result(&pod_result)?;
196+
Ok(())
197+
},
198+
async |_, ()| Ok(()),
199+
));
200+
}
155201
services
156202
.join_next()
157203
.await

src/uniffi/orchestrator/docker.rs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,21 @@ use crate::{
66
uniffi::{
77
error::{OrcaError, Result, selector},
88
model::{PodJob, PodResult},
9-
orchestrator::{ImageKind, Orchestrator, PodRun, RunInfo},
9+
orchestrator::{ImageKind, Orchestrator, PodRun, RunInfo, Status},
1010
},
1111
};
1212
use async_trait;
1313
use bollard::{
1414
Docker,
1515
container::{RemoveContainerOptions, StartContainerOptions, WaitContainerOptions},
16+
errors::Error::DockerContainerWaitError,
1617
image::{CreateImageOptions, ImportImageOptions},
1718
};
1819
use derive_more::Display;
1920
use futures_util::stream::{StreamExt as _, TryStreamExt as _};
2021
use snafu::{OptionExt as _, futures::TryFutureExt as _};
21-
use std::{collections::HashMap, path::PathBuf, sync::Arc};
22-
use tokio::fs::File;
22+
use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration};
23+
use tokio::{fs::File, time::sleep as async_sleep};
2324
use tokio_util::{
2425
bytes::{Bytes, BytesMut},
2526
codec::{BytesCodec, FramedRead},
@@ -192,12 +193,32 @@ impl Orchestrator for LocalDockerOrchestrator {
192193
})?;
193194
Ok(run_info)
194195
}
196+
#[expect(
197+
clippy::wildcard_enum_match_arm,
198+
reason = "Favor readability due to complexity in external dependency."
199+
)]
195200
async fn get_result(&self, pod_run: &PodRun) -> Result<PodResult> {
196-
self.api
201+
match self
202+
.api
197203
.wait_container(&pod_run.assigned_name, None::<WaitContainerOptions<String>>)
198204
.try_collect::<Vec<_>>()
199-
.await?;
200-
let result_info = self.get_info(pod_run).await?;
205+
.await
206+
{
207+
Ok(_) => (),
208+
Err(err) => match err {
209+
DockerContainerWaitError { .. } => (),
210+
_ => return Err(OrcaError::from(err)),
211+
},
212+
}
213+
214+
let mut result_info: RunInfo;
215+
while {
216+
result_info = self.get_info(pod_run).await?;
217+
matches!(&result_info.status, Status::Running)
218+
} {
219+
async_sleep(Duration::from_millis(100)).await;
220+
}
221+
201222
PodResult::new(
202223
None,
203224
Arc::clone(&pod_run.pod_job),

src/uniffi/store/filestore.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use getset::CloneGetters;
88
use std::{fs, path::PathBuf};
99
use uniffi;
1010
/// Support for a storage backend on a local filesystem directory.
11-
#[derive(uniffi::Object, Debug, Display, CloneGetters)]
11+
#[derive(uniffi::Object, Debug, Display, CloneGetters, Clone)]
1212
#[getset(get_clone, impl_attrs = "#[uniffi::export]")]
1313
#[display("{self:#?}")]
1414
#[uniffi::export(Display)]

0 commit comments

Comments
 (0)