Skip to content

Commit e9d2a7b

Browse files
authored
Merge pull request #105 from guzman-raphael/command_as_args
Make `Pod`'s command more flexible
2 parents 4948925 + 3e3c744 commit e9d2a7b

File tree

14 files changed

+73
-67
lines changed

14 files changed

+73
-67
lines changed

src/core/orchestrator/docker.rs

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -130,12 +130,6 @@ impl LocalDockerOrchestrator {
130130
),
131131
("org.orcapod.pod_job.hash".to_owned(), pod_job.hash.clone()),
132132
]);
133-
let command = pod_job
134-
.pod
135-
.command
136-
.split_whitespace()
137-
.map(String::from)
138-
.collect::<Vec<_>>();
139133

140134
Ok((
141135
container_name.clone(),
@@ -145,8 +139,8 @@ impl LocalDockerOrchestrator {
145139
}),
146140
Config {
147141
image: Some(image),
148-
entrypoint: Some(command[..1].to_vec()),
149-
cmd: Some(command[1..].to_vec()),
142+
entrypoint: Some(pod_job.pod.command[..1].to_vec()),
143+
cmd: Some(pod_job.pod.command[1..].to_vec()),
150144
env: pod_job.env_vars.as_ref().map(|provided_env_vars| {
151145
provided_env_vars
152146
.iter()
@@ -170,7 +164,6 @@ impl LocalDockerOrchestrator {
170164
clippy::cast_precision_loss,
171165
clippy::cast_possible_truncation,
172166
clippy::indexing_slicing,
173-
clippy::too_many_lines,
174167
reason = r#"
175168
- Timestamp and memory should always have a value > 0
176169
- Container will always have a name with more than 1 character
@@ -229,16 +222,11 @@ impl LocalDockerOrchestrator {
229222
.map(|(key, value)| (key.to_owned(), value.to_owned()))
230223
})
231224
.collect(),
232-
command: format!(
233-
"{} {}",
234-
container_spec
235-
.config
236-
.as_ref()?
237-
.entrypoint
238-
.as_ref()?
239-
.join(" "),
240-
container_spec.config.as_ref()?.cmd.as_ref()?.join(" ")
241-
),
225+
command: [
226+
container_spec.config.as_ref()?.entrypoint.as_ref()?.clone(),
227+
container_spec.config.as_ref()?.cmd.as_ref()?.clone(),
228+
]
229+
.concat(),
242230
status: match (
243231
container_spec.state.as_ref()?.status.as_ref()?,
244232
container_spec.state.as_ref()?.exit_code? as i16,

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,6 @@ pub mod core;
2828
/// values are owned by Rust
2929
/// 1. No default trait implementations
3030
/// 1. (Rust limitation) No associated functions in traits e.g. class methods in Python
31+
/// 1. Hint: Enum variants with named fields offer a better UX (e.g. in Python) as opposed to
32+
/// unnamed enum fields i.e. will show up in help.
3133
pub mod uniffi;

src/uniffi/model/pod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@ pub struct Pod {
3737
pub hash: String,
3838
/// Reproducible environment for compute.
3939
pub image: String,
40-
/// Space-delimited shell command to begin computation.
41-
pub command: String,
40+
/// Shell command to begin computation. First element is the executable and remaining elements
41+
/// are the arguments.
42+
pub command: Vec<String>,
4243
/// Exposed, internal input specification.
4344
#[serde(serialize_with = "serialize_hashmap")]
4445
pub input_spec: HashMap<String, PathInfo>,
@@ -68,7 +69,7 @@ impl Pod {
6869
pub fn new(
6970
annotation: Option<Annotation>,
7071
image: String,
71-
command: String,
72+
command: Vec<String>,
7273
input_spec: HashMap<String, PathInfo>,
7374
output_dir: PathBuf,
7475
output_spec: HashMap<String, PathInfo>,

src/uniffi/orchestrator/agent.rs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::{
77
store::{Store as _, filestore::LocalFileStore},
88
},
99
};
10+
use colored::Colorize as _;
1011
use derive_more::Display;
1112
use futures_executor::block_on;
1213
use futures_util::future::join_all;
@@ -70,9 +71,9 @@ impl AgentClient {
7071
})?,
7172
})
7273
}
73-
/// Submit many pod jobs to be processed in parallel.
74+
/// Start many pod jobs to be processed in parallel.
7475
/// Return order will match inputs, casting outputs to `String` (since `uniffi` doesn't support sending unwrapped `Result`s).
75-
pub async fn submit_pod_jobs(&self, pod_jobs: Vec<Arc<PodJob>>) -> Vec<Response> {
76+
pub async fn start_pod_jobs(&self, pod_jobs: Vec<Arc<PodJob>>) -> Vec<Response> {
7677
join_all(pod_jobs.iter().map(|pod_job| async {
7778
match self
7879
.publish(&format!("request/pod_job/{}", pod_job.hash), pod_job)
@@ -98,7 +99,7 @@ impl AgentClient {
9899
.context(selector::AgentCommunicationFailure {})?;
99100
while let Ok(sample) = subscriber.recv_async().await {
100101
let value = serde_json::from_slice::<Value>(&sample.payload().to_bytes())?;
101-
println!("{}: {value:#}", sample.key_expr().as_str());
102+
println!("{}: {value:#}", sample.key_expr().as_str().yellow());
102103
}
103104
Ok(())
104105
}
@@ -162,13 +163,19 @@ impl Agent {
162163
Ok(pod_result)
163164
},
164165
async |client, pod_result| {
165-
let response_topic = match &pod_result.status {
166-
Status::Completed => &format!("success/pod_job/{}", pod_result.pod_job.hash),
167-
Status::Running | Status::Failed(_) | Status::Unset => {
168-
&format!("failure/pod_job/{}", pod_result.pod_job.hash)
169-
}
170-
};
171-
client.publish(response_topic, &pod_result).await
166+
client
167+
.publish(
168+
&format!(
169+
"{}/pod_job/{}",
170+
match &pod_result.status {
171+
Status::Completed => "success",
172+
Status::Running | Status::Failed(_) | Status::Unset => "failure",
173+
},
174+
pod_result.pod_job.hash
175+
),
176+
&pod_result,
177+
)
178+
.await
172179
},
173180
));
174181
if let Some(store) = available_store {

src/uniffi/orchestrator/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ pub struct RunInfo {
4242
/// Environment variables set in environment.
4343
pub env_vars: HashMap<String, String>,
4444
/// Command used to start run.
45-
pub command: String,
45+
pub command: Vec<String>,
4646
/// Current run status.
4747
pub status: Status,
4848
/// Mounted volume binds to the environment.

tests/agent.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ async fn parallel_four_cores() -> Result<()> {
126126
});
127127
// submit requests
128128
client
129-
.submit_pod_jobs(pod_jobs_stresser(image_reference, run_duration_secs, 3, 1)?)
129+
.start_pod_jobs(pod_jobs_stresser(image_reference, run_duration_secs, 3, 1)?)
130130
.await;
131131

132132
services

tests/error.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
pub mod fixture;
99
use dot_parser::ast::Graph as DOTGraph;
10-
use fixture::{NAMESPACE_LOOKUP_READ_ONLY, pod_custom, pod_job_custom, pod_job_style};
10+
use fixture::{NAMESPACE_LOOKUP_READ_ONLY, pod_custom, pod_job_custom, pod_job_style, str_to_vec};
1111
use glob::glob;
1212
use orcapod::{
1313
core::crypto::hash_file,
@@ -124,7 +124,7 @@ fn internal_incomplete_packet() -> Result<()> {
124124
pod_job_custom(
125125
&pod_custom(
126126
"alpine:3.14",
127-
"echo",
127+
&["echo".into()],
128128
HashMap::from([(
129129
"key_1".into(),
130130
PathInfo {
@@ -159,15 +159,15 @@ fn internal_key_missing() {
159159
}
160160

161161
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
162-
async fn submit_pod_jobs() -> Result<()> {
162+
async fn start_pod_jobs() -> Result<()> {
163163
let client = AgentClient::new("error".into(), "host".into())?;
164164
let mut pod_job = pod_job_custom(
165-
&pod_custom("alpine:3.14", "sleep 5", HashMap::new())?,
165+
&pod_custom("alpine:3.14", &str_to_vec("sleep 5"), HashMap::new())?,
166166
HashMap::new(),
167167
&NAMESPACE_LOOKUP_READ_ONLY,
168168
)?;
169169
pod_job.hash = "bad?hash".into();
170-
let responses = client.submit_pod_jobs(vec![pod_job.into()]).await;
170+
let responses = client.start_pod_jobs(vec![pod_job.into()]).await;
171171
assert!(
172172
responses.len() == 1,
173173
"Client received an unexpected number of pod job request responses."

tests/extra/python/agent_test.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def count(sample):
3131
with session.declare_subscriber(
3232
f"group/{group}/success/pod_job/**", count
3333
) as subscriber:
34-
await asyncio.sleep(20)
34+
await asyncio.sleep(20) # wait for results
3535

3636
if counter != pod_job_count:
3737
raise Exception(f"Unexpected successful pod job count: {counter}.")
@@ -48,7 +48,7 @@ async def main(client, agent, test_dir, namespace_lookup, pod_jobs):
4848
await asyncio.sleep(5) # ensure service ready
4949

5050
try:
51-
await client.submit_pod_jobs(pod_jobs=pod_jobs)
51+
await client.start_pod_jobs(pod_jobs=pod_jobs)
5252
await verify(client.group(), len(pod_jobs))
5353
finally:
5454
shutil.rmtree(test_dir)
@@ -82,7 +82,9 @@ async def main(client, agent, test_dir, namespace_lookup, pod_jobs):
8282
pod=Pod(
8383
annotation=None,
8484
image="ghcr.io/colinianking/stress-ng:e2f96874f951a72c1c83ff49098661f0e013ac40",
85-
command="stress-ng --cpu 1 --cpu-load 100 --timeout 5 --metrics-brief",
85+
command="stress-ng --cpu 1 --cpu-load 100 --timeout 5 --metrics-brief".split(
86+
" "
87+
),
8688
input_spec={},
8789
output_dir="/tmp/output",
8890
output_spec={},

tests/extra/python/smoke_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def create_pod(data, _):
2828
version="1.0.0",
2929
),
3030
image="alpine:3.14",
31-
command="sleep 1",
31+
command="sleep 1".split(" "),
3232
input_spec={},
3333
output_dir="/tmp/output",
3434
output_spec={},

tests/fixture/mod.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ pub fn pod_style() -> Result<Pod> {
4141
version: "1.0.0".to_owned(),
4242
}),
4343
"example.server.com/user/style-transfer:1.0.0".to_owned(),
44-
"python /run.py".to_owned(),
44+
str_to_vec("python /run.py"),
4545
HashMap::from([
4646
(
4747
"extra-style".to_owned(),
@@ -148,7 +148,7 @@ pub fn pod_result_style(
148148

149149
pub fn pod_custom(
150150
image_reference: &str,
151-
command: &str,
151+
command: &[String],
152152
input_spec: HashMap<String, PathInfo, RandomState>,
153153
) -> Result<Pod> {
154154
Pod::new(
@@ -197,7 +197,7 @@ pub fn pod_jobs_stresser(
197197
return Ok(pod_job_custom(
198198
&pod_custom(
199199
image_reference,
200-
&format!("stress-ng --cpu 1 --cpu-load 100 --timeout {run_duration_secs} --metrics-brief"),
200+
&str_to_vec(&format!("stress-ng --cpu 1 --cpu-load 100 --timeout {run_duration_secs} --metrics-brief")),
201201
HashMap::new()
202202
)?,
203203
HashMap::new(),
@@ -206,7 +206,7 @@ pub fn pod_jobs_stresser(
206206
.into());
207207
}
208208
Ok(pod_job_custom(
209-
&pod_custom(image_reference, "sleep crash", HashMap::new())?,
209+
&pod_custom(image_reference, &str_to_vec("sleep crash"), HashMap::new())?,
210210
HashMap::new(),
211211
&NAMESPACE_LOOKUP_READ_ONLY,
212212
)?
@@ -272,6 +272,10 @@ pub fn pull_image(reference: &str) -> Result<()> {
272272

273273
// --- util ---
274274

275+
pub fn str_to_vec(v: &str) -> Vec<String> {
276+
v.split_whitespace().map(String::from).collect()
277+
}
278+
275279
pub struct TestDirs(pub HashMap<String, TempDir>);
276280

277281
impl TestDirs {

0 commit comments

Comments
 (0)