Skip to content

Commit 1c6203e

Browse files
committed
Merge remote-tracking branch 'upstream' into worktree-docker-patch
2 parents 26cb12e + a713eaa commit 1c6203e

File tree

10 files changed

+377
-69
lines changed

10 files changed

+377
-69
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ heck = "0.5.0"
5757
hex = "0.4.3"
5858
# hashmaps that preserve insertion order
5959
indexmap = { version = "2.9.0", features = ["serde"] }
60-
# utilities for iterables
60+
# utilities for iterables e.g. cartesian products
6161
itertools = "0.14.0"
6262
# random name generator
6363
names = "0.14.0"
@@ -160,6 +160,7 @@ std_instead_of_core = { level = "allow", priority = 127 } # we shou
160160
string_add = { level = "allow", priority = 127 } # simple concat ok
161161
string_lit_chars_any = { level = "allow", priority = 127 } # favor readability until a perf case comes up
162162
use_debug = { level = "warn", priority = 127 } # debug print
163+
wildcard_enum_match_arm = { level = "allow", priority = 127 } # allow wildcard match arm in enums
163164

164165
# temporary
165166
single_call_fn = { level = "allow", priority = 127 } # remove once more models need pointer serializers/deserializers

src/core/error.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@ use serde_yaml;
77
use std::{
88
backtrace::{Backtrace, BacktraceStatus},
99
fmt::{self, Formatter},
10-
io,
11-
path::{self},
10+
io, path,
1211
};
1312
use tokio::task;
1413

src/core/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ inner_attr_to_each! {
1818
#![cfg(feature = "default")]
1919
pub(crate) mod crypto;
2020
pub(crate) mod model;
21+
pub(crate) mod operator;
2122
pub(crate) mod orchestrator;
2223
}
2324

@@ -34,5 +35,6 @@ inner_attr_to_each! {
3435
)]
3536
pub mod crypto;
3637
pub mod model;
38+
pub mod operator;
3739
pub mod orchestrator;
3840
}
Lines changed: 3 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,12 @@
1-
use crate::{
2-
core::util::get_type_name,
3-
uniffi::{
4-
error::Result,
5-
model::pod::{Pod, PodJob},
6-
},
7-
};
1+
use crate::{core::util::get_type_name, uniffi::error::Result};
82
use heck::ToSnakeCase as _;
93
use indexmap::IndexMap;
10-
use serde::{Deserialize as _, Deserializer, Serialize, Serializer};
4+
use serde::{Serialize, Serializer};
115
use serde_yaml::{self, Value};
126
use std::{
137
collections::{BTreeMap, HashMap},
148
hash::BuildHasher,
159
result,
16-
sync::Arc,
1710
};
1811
/// Converts a model instance into a consistent yaml.
1912
///
@@ -64,48 +57,4 @@ where
6457
sorted.serialize(serializer)
6558
}
6659

67-
#[expect(clippy::expect_used, reason = "Serde requires this signature.")]
68-
pub fn deserialize_pod<'de, D>(deserializer: D) -> result::Result<Arc<Pod>, D::Error>
69-
where
70-
D: Deserializer<'de>,
71-
{
72-
let value = Value::deserialize(deserializer)?;
73-
(value).as_str().map_or_else(
74-
|| {
75-
Ok(serde_yaml::from_value(value.clone())
76-
.expect("Failed to convert from serde value to specific type."))
77-
},
78-
|hash| {
79-
Ok({
80-
Pod {
81-
hash: hash.to_owned(),
82-
..Pod::default()
83-
}
84-
.into()
85-
})
86-
},
87-
)
88-
}
89-
90-
#[expect(clippy::expect_used, reason = "Serde requires this signature.")]
91-
pub fn deserialize_pod_job<'de, D>(deserializer: D) -> result::Result<Arc<PodJob>, D::Error>
92-
where
93-
D: Deserializer<'de>,
94-
{
95-
let value = Value::deserialize(deserializer)?;
96-
(value).as_str().map_or_else(
97-
|| {
98-
Ok(serde_yaml::from_value(value.clone())
99-
.expect("Failed to convert from serde value to specific type."))
100-
},
101-
|hash| {
102-
Ok({
103-
PodJob {
104-
hash: hash.to_owned(),
105-
..PodJob::default()
106-
}
107-
.into()
108-
})
109-
},
110-
)
111-
}
60+
pub mod pod;

src/core/model/pod.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
use crate::uniffi::model::pod::{Pod, PodJob};
2+
use serde::{Deserialize as _, Deserializer};
3+
use serde_yaml::{self, Value};
4+
use std::{result, sync::Arc};
5+
6+
#[expect(clippy::expect_used, reason = "Serde requires this signature.")]
7+
pub fn deserialize_pod<'de, D>(deserializer: D) -> result::Result<Arc<Pod>, D::Error>
8+
where
9+
D: Deserializer<'de>,
10+
{
11+
let value = Value::deserialize(deserializer)?;
12+
(value).as_str().map_or_else(
13+
|| {
14+
Ok(serde_yaml::from_value(value.clone())
15+
.expect("Failed to convert from serde value to specific type."))
16+
},
17+
|hash| {
18+
Ok({
19+
Pod {
20+
hash: hash.to_owned(),
21+
..Pod::default()
22+
}
23+
.into()
24+
})
25+
},
26+
)
27+
}
28+
29+
#[expect(clippy::expect_used, reason = "Serde requires this signature.")]
30+
pub fn deserialize_pod_job<'de, D>(deserializer: D) -> result::Result<Arc<PodJob>, D::Error>
31+
where
32+
D: Deserializer<'de>,
33+
{
34+
let value = Value::deserialize(deserializer)?;
35+
(value).as_str().map_or_else(
36+
|| {
37+
Ok(serde_yaml::from_value(value.clone())
38+
.expect("Failed to convert from serde value to specific type."))
39+
},
40+
|hash| {
41+
Ok({
42+
PodJob {
43+
hash: hash.to_owned(),
44+
..PodJob::default()
45+
}
46+
.into()
47+
})
48+
},
49+
)
50+
}

src/core/operator.rs

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
use crate::uniffi::{error::Result, model::packet::Packet};
2+
use async_trait;
3+
use itertools::Itertools as _;
4+
use std::{clone::Clone, collections::HashMap, iter::IntoIterator, sync::Arc};
5+
use tokio::sync::Mutex;
6+
7+
#[async_trait::async_trait]
8+
pub trait Operator {
9+
async fn next(&self, stream_name: String, packet: Packet) -> Result<Vec<Packet>>;
10+
}
11+
12+
pub struct JoinOperator {
13+
parent_count: usize,
14+
received_packets: Arc<Mutex<HashMap<String, Vec<Packet>>>>,
15+
}
16+
17+
impl JoinOperator {
18+
pub fn new(parent_count: usize) -> Self {
19+
Self {
20+
parent_count,
21+
received_packets: Arc::new(Mutex::new(HashMap::new())),
22+
}
23+
}
24+
}
25+
26+
#[async_trait::async_trait]
27+
impl Operator for JoinOperator {
28+
async fn next(&self, stream_name: String, packet: Packet) -> Result<Vec<Packet>> {
29+
let mut received_packets = self.received_packets.lock().await;
30+
received_packets
31+
.entry(stream_name.clone())
32+
.or_default()
33+
.push(packet.clone());
34+
Ok(
35+
if self.parent_count - usize::from(!received_packets.contains_key(&stream_name))
36+
== received_packets.len()
37+
{
38+
let packets_to_multiplex = received_packets
39+
.iter()
40+
.filter_map(|(parent_stream, parent_packets)| {
41+
(parent_stream != &stream_name).then_some(parent_packets.clone())
42+
})
43+
.chain(vec![vec![packet.clone()]].into_iter())
44+
.collect::<Vec<_>>();
45+
drop(received_packets);
46+
47+
packets_to_multiplex
48+
.into_iter()
49+
.multi_cartesian_product()
50+
.map(|packet_combinations_to_merge| {
51+
packet_combinations_to_merge
52+
.into_iter()
53+
.flat_map(IntoIterator::into_iter)
54+
.collect::<HashMap<_, _>>()
55+
})
56+
.collect()
57+
} else {
58+
vec![]
59+
},
60+
)
61+
}
62+
}
63+
64+
pub struct MapOperator {
65+
map: HashMap<String, String>,
66+
}
67+
68+
impl MapOperator {
69+
pub fn new(map: &HashMap<String, String>) -> Self {
70+
Self { map: map.clone() }
71+
}
72+
}
73+
74+
#[async_trait::async_trait]
75+
impl Operator for MapOperator {
76+
async fn next(&self, _: String, packet: Packet) -> Result<Vec<Packet>> {
77+
Ok(vec![
78+
packet
79+
.iter()
80+
.map(|(packet_key, path_set)| {
81+
(
82+
self.map
83+
.get(packet_key)
84+
.map_or_else(|| packet_key.clone(), Clone::clone),
85+
path_set.clone(),
86+
)
87+
})
88+
.collect(),
89+
])
90+
}
91+
}

src/core/store/filestore.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use heck::ToSnakeCase as _;
1616
use regex::Regex;
1717
use serde::{Serialize, de::DeserializeOwned};
1818
use serde_yaml;
19-
use snafu::OptionExt as _;
19+
use snafu::{OptionExt as _, ResultExt as _};
2020
use std::{
2121
fmt, fs,
2222
path::{Path, PathBuf},
@@ -189,15 +189,17 @@ impl LocalFileStore {
189189
model_id: &ModelID,
190190
) -> Result<(T, Option<Annotation>, String)> {
191191
match model_id {
192-
ModelID::Hash(hash) => Ok((
193-
serde_yaml::from_str(&fs::read_to_string(self.make_path(
194-
&T::default(),
195-
hash,
196-
Self::SPEC_RELPATH,
197-
))?)?,
198-
None,
199-
hash.to_owned(),
200-
)),
192+
ModelID::Hash(hash) => {
193+
let path = self.make_path(&T::default(), hash, Self::SPEC_RELPATH);
194+
Ok((
195+
serde_yaml::from_str(
196+
&fs::read_to_string(path.clone())
197+
.context(selector::InvalidFilepath { path })?,
198+
)?,
199+
None,
200+
hash.to_owned(),
201+
))
202+
}
201203
ModelID::Annotation(name, version) => {
202204
let hash = self.lookup_hash(&T::default(), name, version)?;
203205
Ok((

src/uniffi/model/pod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use crate::{
22
core::{
33
crypto::{hash_blob, hash_buffer},
44
model::{
5-
deserialize_pod, deserialize_pod_job, serialize_hashmap, serialize_hashmap_option,
6-
to_yaml,
5+
pod::{deserialize_pod, deserialize_pod_job},
6+
serialize_hashmap, serialize_hashmap_option, to_yaml,
77
},
88
util::get,
99
validation::validate_packet,

src/uniffi/orchestrator/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::uniffi::{
55
pod::{PodJob, PodResult},
66
},
77
};
8+
use async_trait;
89
use serde::{Deserialize, Serialize};
910
use std::{collections::HashMap, fmt, path::PathBuf, sync::Arc};
1011
use uniffi;

0 commit comments

Comments
 (0)