Skip to content

Commit c25efdc

Browse files
committed
Fix merging error and update to new design + change order of args
1 parent 52e0dd3 commit c25efdc

File tree

10 files changed

+131
-145
lines changed

10 files changed

+131
-145
lines changed

src/core/operator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ impl Operator for JoinOperator {
4040
.filter_map(|(parent_stream, parent_packets)| {
4141
(parent_stream != &stream_name).then_some(parent_packets.clone())
4242
})
43-
.chain(vec![vec![packet.clone()]].into_iter())
43+
.chain(vec![vec![packet.clone()]])
4444
.collect::<Vec<_>>();
4545
drop(received_packets);
4646

src/core/orchestrator/docker.rs

Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,6 @@ impl LocalDockerOrchestrator {
168168
#[expect(
169169
clippy::string_slice,
170170
clippy::indexing_slicing,
171-
clippy::too_many_lines,
172171
reason = r#"
173172
- Timestamp and memory should always have a value > 0
174173
- Container will always have a name with more than 1 character
@@ -231,7 +230,7 @@ impl LocalDockerOrchestrator {
231230
fn extract_run_info(
232231
container_summary: &ContainerSummary,
233232
container_inspect_response: &ContainerInspectResponse,
234-
) -> Option<RunInfo> {
233+
) -> Option<PodRunInfo> {
235234
let terminated_timestamp = DateTime::parse_from_rfc3339(
236235
container_inspect_response
237236
.state
@@ -241,7 +240,7 @@ impl LocalDockerOrchestrator {
241240
)
242241
.ok()?
243242
.timestamp() as u64;
244-
Some(RunInfo {
243+
Some(PodRunInfo {
245244
image: container_inspect_response
246245
.config
247246
.as_ref()?
@@ -261,47 +260,40 @@ impl LocalDockerOrchestrator {
261260
.map(|(key, value)| (key.to_owned(), value.to_owned()))
262261
})
263262
.collect(),
264-
command: format!(
265-
"{} {}",
263+
command: [
266264
container_inspect_response
267265
.config
268266
.as_ref()?
269267
.entrypoint
270268
.as_ref()?
271-
.join(" "),
269+
.clone(),
272270
container_inspect_response
273271
.config
274272
.as_ref()?
275273
.cmd
276274
.as_ref()?
277-
.join(" ")
278-
),
275+
.clone(),
276+
]
277+
.concat(),
279278
status: match (
280279
container_inspect_response.state.as_ref()?.status?,
281280
container_inspect_response.state.as_ref()?.exit_code? as i16,
282281
) {
283-
(ContainerStateStatusEnum::RUNNING, _) => Status::Running,
284-
(ContainerStateStatusEnum::EXITED, 0) => Status::Completed,
282+
(ContainerStateStatusEnum::RUNNING | ContainerStateStatusEnum::RESTARTING, _) => {
283+
PodStatus::Running
284+
}
285+
(ContainerStateStatusEnum::EXITED, 0) => PodStatus::Completed,
285286
(ContainerStateStatusEnum::EXITED | ContainerStateStatusEnum::DEAD, code) => {
286-
Status::Failed(code)
287+
PodStatus::Failed(code)
287288
}
288-
(
289-
ContainerStateStatusEnum::CREATED | ContainerStateStatusEnum::RESTARTING,
290-
code,
291-
) => {
292-
if container_inspect_response
293-
.state
294-
.as_ref()?
295-
.error
296-
.as_ref()?
297-
.is_empty()
298-
{
299-
Status::Starting
289+
(ContainerStateStatusEnum::CREATED, code) => {
290+
if container_inspect_response.state.as_ref()?.error.is_some() {
291+
PodStatus::Failed(code)
300292
} else {
301-
Status::Failed(code)
293+
PodStatus::Running
302294
}
303295
}
304-
_ => Status::Undefined,
296+
_ => PodStatus::Undefined,
305297
},
306298
mounts: container_inspect_response
307299
.mounts

src/uniffi/error.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ impl OrcaError {
116116
matches!(&self.kind, Kind::MissingInfo { details, .. } if details.contains("annotation"))
117117
}
118118
/// Returns `true` if the error was caused by querying a purged pod run.
119-
pub const fn is_purged_pod_run(&self) -> bool {
120-
matches!(self.kind, Kind::NoMatchingPodRun { .. })
119+
pub fn is_purged_pod_run(&self) -> bool {
120+
matches!(&self.kind, Kind::MissingInfo { details, .. } if details.contains("pod run"))
121121
}
122122
/// Returns `true` if the error was caused by an invalid file or directory path.
123123
pub const fn is_failed_to_start_pod(&self) -> bool {

src/uniffi/orchestrator/agent.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,11 +170,11 @@ impl Agent {
170170
async |agent, inner_namespace_lookup, _, pod_job| {
171171
let pod_run = agent
172172
.orchestrator
173-
.start(&inner_namespace_lookup, &pod_job)
173+
.start(&pod_job, &inner_namespace_lookup)
174174
.await?;
175175
let pod_result = agent
176176
.orchestrator
177-
.get_result(&inner_namespace_lookup, &pod_run)
177+
.get_result(&pod_run, &inner_namespace_lookup)
178178
.await?;
179179
agent.orchestrator.delete(&pod_run).await?;
180180
Ok(pod_result)
@@ -190,6 +190,7 @@ impl Agent {
190190
PodStatus::Completed => "success",
191191
PodStatus::Running
192192
| PodStatus::Failed(_)
193+
| PodStatus::Undefined
193194
| PodStatus::Unset => "failure",
194195
}
195196
.to_owned(),

src/uniffi/orchestrator/docker.rs

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::{
66
uniffi::{
77
error::{Kind, OrcaError, Result, selector},
88
model::pod::{PodJob, PodResult},
9-
orchestrator::{ImageKind, Orchestrator, PodRun, PodRunInfo, PodStatus, Status},
9+
orchestrator::{ImageKind, Orchestrator, PodRun, PodRunInfo, PodStatus},
1010
},
1111
};
1212
use async_trait;
@@ -19,14 +19,8 @@ use bollard::{
1919
use derive_more::Display;
2020
use futures_util::stream::{StreamExt as _, TryStreamExt as _};
2121
use snafu::{OptionExt as _, futures::TryFutureExt as _};
22-
use std::{
23-
backtrace::Backtrace, collections::HashMap, path::PathBuf, sync::Arc, time::Duration,
24-
time::Duration,
25-
};
26-
use tokio::{
27-
time::sleep as async_sleep,
28-
{fs::File, time::sleep},
29-
};
22+
use std::{backtrace::Backtrace, collections::HashMap, path::PathBuf, sync::Arc, time::Duration};
23+
use tokio::{fs::File, time::sleep as async_sleep};
3024
use tokio_util::{
3125
bytes::{Bytes, BytesMut},
3226
codec::{BytesCodec, FramedRead},
@@ -47,18 +41,18 @@ pub struct LocalDockerOrchestrator {
4741
impl Orchestrator for LocalDockerOrchestrator {
4842
fn start_with_altimage_blocking(
4943
&self,
50-
namespace_lookup: &HashMap<String, PathBuf>,
5144
pod_job: &PodJob,
5245
image: &ImageKind,
46+
namespace_lookup: &HashMap<String, PathBuf>,
5347
) -> Result<PodRun> {
54-
ASYNC_RUNTIME.block_on(self.start_with_altimage(namespace_lookup, pod_job, image))
48+
ASYNC_RUNTIME.block_on(self.start_with_altimage(pod_job, image, namespace_lookup))
5549
}
5650
fn start_blocking(
5751
&self,
58-
namespace_lookup: &HashMap<String, PathBuf>,
5952
pod_job: &PodJob,
53+
namespace_lookup: &HashMap<String, PathBuf>,
6054
) -> Result<PodRun> {
61-
ASYNC_RUNTIME.block_on(self.start(namespace_lookup, pod_job))
55+
ASYNC_RUNTIME.block_on(self.start(pod_job, namespace_lookup))
6256
}
6357
fn list_blocking(&self) -> Result<Vec<PodRun>> {
6458
ASYNC_RUNTIME.block_on(self.list())
@@ -71,10 +65,10 @@ impl Orchestrator for LocalDockerOrchestrator {
7165
}
7266
fn get_result_blocking(
7367
&self,
74-
namespace_lookup: &HashMap<String, PathBuf>,
7568
pod_run: &PodRun,
69+
namespace_lookup: &HashMap<String, PathBuf>,
7670
) -> Result<PodResult> {
77-
ASYNC_RUNTIME.block_on(self.get_result(namespace_lookup, pod_run))
71+
ASYNC_RUNTIME.block_on(self.get_result(pod_run, namespace_lookup))
7872
}
7973
#[expect(
8074
clippy::try_err,
@@ -86,9 +80,9 @@ impl Orchestrator for LocalDockerOrchestrator {
8680
)]
8781
async fn start_with_altimage(
8882
&self,
89-
namespace_lookup: &HashMap<String, PathBuf>,
9083
pod_job: &PodJob,
9184
image: &ImageKind,
85+
namespace_lookup: &HashMap<String, PathBuf>,
9286
) -> Result<PodRun> {
9387
let (assigned_name, container_options, container_config) = match image {
9488
ImageKind::Published(remote_image) => Self::prepare_container_start_inputs(
@@ -155,8 +149,8 @@ impl Orchestrator for LocalDockerOrchestrator {
155149
}
156150
async fn start(
157151
&self,
158-
namespace_lookup: &HashMap<String, PathBuf>,
159152
pod_job: &PodJob,
153+
namespace_lookup: &HashMap<String, PathBuf>,
160154
) -> Result<PodRun> {
161155
let image_options = Some(CreateImageOptions {
162156
from_image: pod_job.pod.image.clone(),
@@ -167,9 +161,9 @@ impl Orchestrator for LocalDockerOrchestrator {
167161
.try_collect::<Vec<_>>()
168162
.await?;
169163
self.start_with_altimage(
170-
namespace_lookup,
171164
pod_job,
172165
&ImageKind::Published(pod_job.pod.image.clone()),
166+
namespace_lookup,
173167
)
174168
.await
175169
}
@@ -232,8 +226,8 @@ impl Orchestrator for LocalDockerOrchestrator {
232226
)]
233227
async fn get_result(
234228
&self,
235-
namespace_lookup: &HashMap<String, PathBuf>,
236229
pod_run: &PodRun,
230+
namespace_lookup: &HashMap<String, PathBuf>,
237231
) -> Result<PodResult> {
238232
match self
239233
.api

src/uniffi/orchestrator/mod.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ pub enum PodStatus {
2727
Completed,
2828
/// Run failed with the provided error code.
2929
Failed(i16),
30-
/// For created or restarting container
31-
Starting,
3230
/// For other container states that are not listed.
3331
Undefined,
3432
/// No status set.
@@ -81,9 +79,9 @@ pub trait Orchestrator: Send + Sync + fmt::Debug {
8179
/// Will return `Err` if there is an issue starting the container.
8280
fn start_with_altimage_blocking(
8381
&self,
84-
namespace_lookup: &HashMap<String, PathBuf>,
8582
pod_job: &PodJob,
8683
image: &ImageKind,
84+
namespace_lookup: &HashMap<String, PathBuf>,
8785
) -> Result<PodRun>;
8886
/// How to synchronously start containers. Assumes `PodJob` image is published.
8987
///
@@ -92,8 +90,8 @@ pub trait Orchestrator: Send + Sync + fmt::Debug {
9290
/// Will return `Err` if there is an issue starting the container.
9391
fn start_blocking(
9492
&self,
95-
namespace_lookup: &HashMap<String, PathBuf>,
9693
pod_job: &PodJob,
94+
namespace_lookup: &HashMap<String, PathBuf>,
9795
) -> Result<PodRun>;
9896
/// How to synchronously query containers.
9997
///
@@ -120,8 +118,8 @@ pub trait Orchestrator: Send + Sync + fmt::Debug {
120118
/// Will return `Err` if there is an issue creating a pod result.
121119
fn get_result_blocking(
122120
&self,
123-
namespace_lookup: &HashMap<String, PathBuf>,
124121
pod_run: &PodRun,
122+
namespace_lookup: &HashMap<String, PathBuf>,
125123
) -> Result<PodResult>;
126124
/// How to asynchronously start containers with an alternate image.
127125
///
@@ -130,9 +128,9 @@ pub trait Orchestrator: Send + Sync + fmt::Debug {
130128
/// Will return `Err` if there is an issue starting the container.
131129
async fn start_with_altimage(
132130
&self,
133-
namespace_lookup: &HashMap<String, PathBuf>,
134131
pod_job: &PodJob,
135132
image: &ImageKind,
133+
namespace_lookup: &HashMap<String, PathBuf>,
136134
) -> Result<PodRun>;
137135
/// How to asynchronously start containers. Assumes `PodJob` image is published.
138136
///
@@ -141,8 +139,8 @@ pub trait Orchestrator: Send + Sync + fmt::Debug {
141139
/// Will return `Err` if there is an issue starting the container.
142140
async fn start(
143141
&self,
144-
namespace_lookup: &HashMap<String, PathBuf>,
145142
pod_job: &PodJob,
143+
namespace_lookup: &HashMap<String, PathBuf>,
146144
) -> Result<PodRun>;
147145
/// How to asynchronously query containers.
148146
///
@@ -169,8 +167,8 @@ pub trait Orchestrator: Send + Sync + fmt::Debug {
169167
/// Will return `Err` if there is an issue creating a pod result.
170168
async fn get_result(
171169
&self,
172-
namespace_lookup: &HashMap<String, PathBuf>,
173170
pod_run: &PodRun,
171+
namespace_lookup: &HashMap<String, PathBuf>,
174172
) -> Result<PodResult>;
175173
}
176174
/// Orchestration execution agent daemon and client.

tests/error.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ fn external_bollard() -> Result<()> {
3939
pod.image = "nonexistent_image".to_owned();
4040
pod_job.pod = Arc::new(pod);
4141
assert!(
42-
orch.start_blocking(&NAMESPACE_LOOKUP_READ_ONLY, &pod_job)
42+
orch.start_blocking(&pod_job, &NAMESPACE_LOOKUP_READ_ONLY)
4343
.is_err_and(contains_debug),
4444
"Did not raise a bollard error."
4545
);
@@ -134,9 +134,9 @@ async fn internal_agent_communication_failure() -> Result<()> {
134134
fn internal_incomplete_packet() -> Result<()> {
135135
assert!(
136136
pod_job_custom(
137-
&pod_custom(
137+
pod_custom(
138138
"alpine:3.14",
139-
&["echo".into()],
139+
vec!["echo".into()],
140140
HashMap::from([(
141141
"key_1".into(),
142142
PathInfo {
@@ -174,7 +174,7 @@ fn internal_key_missing() {
174174
async fn internal_start_pod_jobs() -> Result<()> {
175175
let client = AgentClient::new("error_internal-start-pod-jobs".into(), "host".into())?;
176176
let mut pod_job = pod_job_custom(
177-
&pod_custom("alpine:3.14", &str_to_vec("sleep 5"), HashMap::new())?,
177+
pod_custom("alpine:3.14", str_to_vec("sleep 5"), HashMap::new())?,
178178
HashMap::new(),
179179
&NAMESPACE_LOOKUP_READ_ONLY,
180180
)?;

tests/fixture/mod.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -158,13 +158,13 @@ pub fn pod_result_style(
158158

159159
pub fn pod_custom(
160160
image_reference: &str,
161-
command: &[String],
161+
command: Vec<String>,
162162
input_spec: HashMap<String, PathInfo, RandomState>,
163163
) -> Result<Pod> {
164164
Pod::new(
165165
None,
166166
image_reference.into(),
167-
command.into(),
167+
command,
168168
input_spec,
169169
PathBuf::from("/tmp/output"),
170170
HashMap::new(),
@@ -176,13 +176,13 @@ pub fn pod_custom(
176176
}
177177

178178
pub fn pod_job_custom(
179-
pod: &Pod,
179+
pod: Pod,
180180
input_packet: Packet,
181181
namespace_lookup: &HashMap<String, PathBuf, RandomState>,
182182
) -> Result<PodJob> {
183183
PodJob::new(
184184
None,
185-
Arc::new(pod.clone()),
185+
pod.into(),
186186
input_packet,
187187
URI {
188188
namespace: "default".to_owned(),
@@ -205,9 +205,9 @@ pub fn pod_jobs_stresser(
205205
.map(|i| {
206206
if i <= success_count {
207207
return Ok(pod_job_custom(
208-
&pod_custom(
208+
pod_custom(
209209
image_reference,
210-
&str_to_vec(&format!("stress-ng --cpu 1 --cpu-load 100 --timeout {run_duration_secs} --metrics-brief")),
210+
str_to_vec(&format!("stress-ng --cpu 1 --cpu-load 100 --timeout {run_duration_secs} --metrics-brief")),
211211
HashMap::new()
212212
)?,
213213
HashMap::new(),
@@ -216,7 +216,7 @@ pub fn pod_jobs_stresser(
216216
.into());
217217
}
218218
Ok(pod_job_custom(
219-
&pod_custom(image_reference, &str_to_vec("sleep crash"), HashMap::new())?,
219+
pod_custom(image_reference, str_to_vec("sleep crash"), HashMap::new())?,
220220
HashMap::new(),
221221
&NAMESPACE_LOOKUP_READ_ONLY,
222222
)?

0 commit comments

Comments
 (0)