Skip to content

Commit c1fec0c

Browse files
committed
Save progress
1 parent 88fad98 commit c1fec0c

File tree

6 files changed

+108
-43
lines changed

6 files changed

+108
-43
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ colored = "2.1.0"
4444
derive_more = { version = "2.0.1", features = ["display"] }
4545
# traverse DOT graphs
4646
dot-parser = "0.5.1"
47+
encode = "1.0.0"
4748
# allow sync functions to be called from async using `block_on`
4849
futures-executor = "0.3.31"
4950
# chaining async calls and processing stream data in local docker orchestrator

src/model/packet.rs

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
1+
use arrow::array::RecordBatch;
12
use serde::{Deserialize, Serialize};
2-
use std::{collections::HashMap, path::PathBuf};
3+
use sha2::Sha256;
4+
use starfix::arrow_digester::{self, ArrowDigester};
5+
use std::{
6+
collections::{HashMap, HashSet},
7+
path::PathBuf,
8+
};
39
use uniffi;
410

511
use crate::{error::Result, util::get};
@@ -92,5 +98,51 @@ impl PathSet {
9298
}
9399
}
94100

95-
/// A complete set of inputs to be provided to a computational unit.
96-
pub type Packet = HashMap<String, PathSet>;
101+
/// Common interface for a unit of data flowing through the pipeline.
trait Packet {
    /// Get the unique content hash identifying this packet.
    fn get_hash(&self) -> Vec<u8>;

    /// Get the field names of the arrow table contained in the packet, not including tags
    fn get_fields_name(&self) -> HashSet<String>;

    /// Get tags column names if any
    fn get_tags_col_names(&self) -> Option<HashSet<String>>;
}
110+
111+
/// Unit of data that is used within pipeline and pods
112+
pub struct ArrowPacket {
113+
/// Unique hash identifier for the packet
114+
hash: Vec<u8>,
115+
/// Arrow Record Batch containing the data for the packet
116+
record_batch: RecordBatch,
117+
/// Optional tags associated with the packet, which should be fields names of the arrow_table
118+
tags: Option<HashSet<String>>,
119+
}
120+
121+
impl ArrowPacket {
122+
pub fn new(record_batch: RecordBatch, tags: Option<HashSet<String>>) -> Self {
123+
Self {
124+
hash: ArrowDigester::<Sha256>::hash_record_batch(&record_batch),
125+
record_batch,
126+
tags,
127+
}
128+
}
129+
}
130+
131+
impl Packet for ArrowPacket {
132+
fn get_hash(&self) -> Vec<u8> {
133+
self.hash.clone()
134+
}
135+
136+
fn get_fields_name(&self) -> HashSet<String> {
137+
self.record_batch
138+
.schema()
139+
.fields()
140+
.iter()
141+
.map(|f| f.name().clone())
142+
.collect()
143+
}
144+
145+
fn get_tags_col_names(&self) -> Option<String> {
146+
self.tags.clone()
147+
}
148+
}

src/model/pod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{
1010
error::{Kind, OrcaError, Result},
1111
model::{
1212
Annotation, ToYaml,
13-
packet::{Blob, BlobKind, Packet, PathInfo, PathSet, URI},
13+
packet::{ArrowPacket, Blob, BlobKind, PathInfo, PathSet, URI},
1414
serialize_hashmap, serialize_hashmap_option,
1515
},
1616
util::get,
@@ -159,7 +159,7 @@ pub struct PodJob {
159159
pub pod: Arc<Pod>,
160160
/// Attached, external input packet.
161161
#[serde(serialize_with = "serialize_hashmap")]
162-
pub input_packet: Packet,
162+
pub input_packet: ArrowPacket,
163163
/// Attached, external output directory.
164164
pub output_dir: URI,
165165
/// Maximum allowable cores in fractional cores for the computation.
@@ -182,7 +182,7 @@ impl PodJob {
182182
pub fn new(
183183
annotation: Option<Annotation>,
184184
pod: Arc<Pod>,
185-
mut input_packet: Packet,
185+
mut input_packet: ArrowPacket,
186186
output_dir: URI,
187187
cpu_limit: f32,
188188
memory_limit: u64,
@@ -251,7 +251,7 @@ pub struct PodResult {
251251
pub pod_job: Arc<PodJob>,
252252
/// Produced, external output packet.
253253
#[serde(serialize_with = "serialize_hashmap")]
254-
pub output_packet: Packet,
254+
pub output_packet: ArrowPacket,
255255
/// Name given by orchestrator.
256256
pub assigned_name: String,
257257
/// Status of compute run when terminated.

src/operator.rs

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,18 @@ use tokio::sync::Mutex;
77
use crate::{
88
crypto::hash_buffer,
99
error::Result,
10-
model::{ToYaml, packet::Packet, serialize_hashmap},
10+
model::{ToYaml, packet::ArrowPacket, serialize_hashmap},
1111
};
1212

1313
/// Trait that all operators must implement for it to work in the pipeline
1414
#[async_trait::async_trait]
1515
pub trait Operator {
1616
/// Method where the operator get pass a packet for processing one at a time
17-
async fn process_packet(&self, stream_name: String, packet: Packet) -> Result<Vec<Packet>>;
17+
async fn process_packet(
18+
&self,
19+
stream_name: String,
20+
packet: ArrowPacket,
21+
) -> Result<Vec<ArrowPacket>>;
1822
}
1923

2024
/// Operator class that map `input_keys` to `output_key`, effectively renaming it
@@ -52,7 +56,7 @@ impl MapOperator {
5256
/// Operator class that join packets from multiple parent streams into one packet
5357
pub struct JoinOperator {
5458
parent_count: usize,
55-
received_packets: Arc<Mutex<HashMap<String, Vec<Packet>>>>,
59+
received_packets: Arc<Mutex<HashMap<String, Vec<ArrowPacket>>>>,
5660
}
5761

5862
impl JoinOperator {
@@ -67,7 +71,11 @@ impl JoinOperator {
6771

6872
#[async_trait::async_trait]
6973
impl Operator for JoinOperator {
70-
async fn process_packet(&self, stream_name: String, packet: Packet) -> Result<Vec<Packet>> {
74+
async fn process_packet(
75+
&self,
76+
stream_name: String,
77+
packet: ArrowPacket,
78+
) -> Result<Vec<ArrowPacket>> {
7179
let mut received_packets = self.received_packets.lock().await;
7280
received_packets
7381
.entry(stream_name.clone())
@@ -105,7 +113,7 @@ impl Operator for JoinOperator {
105113

106114
#[async_trait::async_trait]
107115
impl Operator for MapOperator {
108-
async fn process_packet(&self, _: String, packet: Packet) -> Result<Vec<Packet>> {
116+
async fn process_packet(&self, _: String, packet: ArrowPacket) -> Result<Vec<ArrowPacket>> {
109117
Ok(vec![
110118
packet
111119
.iter()
@@ -140,7 +148,7 @@ mod tests {
140148

141149
use crate::{
142150
error::Result,
143-
model::packet::{Blob, BlobKind, Packet, PathSet, URI},
151+
model::packet::{ArrowPacket, Blob, BlobKind, PathSet, URI},
144152
operator::{JoinOperator, MapOperator, Operator},
145153
};
146154
use std::{collections::HashMap, path::PathBuf};
@@ -161,8 +169,8 @@ mod tests {
161169

162170
async fn next_batch(
163171
operator: impl Operator,
164-
packets: Vec<(String, Packet)>,
165-
) -> Result<Vec<Packet>> {
172+
packets: Vec<(String, ArrowPacket)>,
173+
) -> Result<Vec<ArrowPacket>> {
166174
let mut next_packets = vec![];
167175
for (stream_name, packet) in packets {
168176
next_packets.extend(operator.process_packet(stream_name, packet).await?);
@@ -178,7 +186,7 @@ mod tests {
178186
.map(|i| {
179187
(
180188
"left".into(),
181-
Packet::from([make_packet_key(
189+
ArrowPacket::from([make_packet_key(
182190
"subject".into(),
183191
format!("left/subject{i}.png"),
184192
)]),
@@ -190,7 +198,7 @@ mod tests {
190198
.map(|i| {
191199
(
192200
"right".into(),
193-
Packet::from([make_packet_key(
201+
ArrowPacket::from([make_packet_key(
194202
"style".into(),
195203
format!("right/style{i}.t7"),
196204
)]),
@@ -204,27 +212,27 @@ mod tests {
204212
assert_eq!(
205213
next_batch(operator, input_streams).await?,
206214
vec![
207-
Packet::from([
215+
ArrowPacket::from([
208216
make_packet_key("subject".into(), "left/subject0.png".into()),
209217
make_packet_key("style".into(), "right/style0.t7".into()),
210218
]),
211-
Packet::from([
219+
ArrowPacket::from([
212220
make_packet_key("subject".into(), "left/subject1.png".into()),
213221
make_packet_key("style".into(), "right/style0.t7".into()),
214222
]),
215-
Packet::from([
223+
ArrowPacket::from([
216224
make_packet_key("subject".into(), "left/subject2.png".into()),
217225
make_packet_key("style".into(), "right/style0.t7".into()),
218226
]),
219-
Packet::from([
227+
ArrowPacket::from([
220228
make_packet_key("subject".into(), "left/subject0.png".into()),
221229
make_packet_key("style".into(), "right/style1.t7".into()),
222230
]),
223-
Packet::from([
231+
ArrowPacket::from([
224232
make_packet_key("subject".into(), "left/subject1.png".into()),
225233
make_packet_key("style".into(), "right/style1.t7".into()),
226234
]),
227-
Packet::from([
235+
ArrowPacket::from([
228236
make_packet_key("subject".into(), "left/subject2.png".into()),
229237
make_packet_key("style".into(), "right/style1.t7".into()),
230238
]),
@@ -243,7 +251,7 @@ mod tests {
243251
operator
244252
.process_packet(
245253
"right".into(),
246-
Packet::from([make_packet_key("style".into(), "right/style0.t7".into())])
254+
ArrowPacket::from([make_packet_key("style".into(), "right/style0.t7".into())])
247255
)
248256
.await?,
249257
vec![],
@@ -254,7 +262,7 @@ mod tests {
254262
operator
255263
.process_packet(
256264
"right".into(),
257-
Packet::from([make_packet_key("style".into(), "right/style1.t7".into())])
265+
ArrowPacket::from([make_packet_key("style".into(), "right/style1.t7".into())])
258266
)
259267
.await?,
260268
vec![],
@@ -265,18 +273,18 @@ mod tests {
265273
operator
266274
.process_packet(
267275
"left".into(),
268-
Packet::from([make_packet_key(
276+
ArrowPacket::from([make_packet_key(
269277
"subject".into(),
270278
"left/subject0.png".into()
271279
)])
272280
)
273281
.await?,
274282
vec![
275-
Packet::from([
283+
ArrowPacket::from([
276284
make_packet_key("subject".into(), "left/subject0.png".into()),
277285
make_packet_key("style".into(), "right/style0.t7".into()),
278286
]),
279-
Packet::from([
287+
ArrowPacket::from([
280288
make_packet_key("subject".into(), "left/subject0.png".into()),
281289
make_packet_key("style".into(), "right/style1.t7".into()),
282290
]),
@@ -291,7 +299,7 @@ mod tests {
291299
.map(|i| {
292300
(
293301
"left".into(),
294-
Packet::from([make_packet_key(
302+
ArrowPacket::from([make_packet_key(
295303
"subject".into(),
296304
format!("left/subject{i}.png"),
297305
)]),
@@ -301,19 +309,19 @@ mod tests {
301309
)
302310
.await?,
303311
vec![
304-
Packet::from([
312+
ArrowPacket::from([
305313
make_packet_key("subject".into(), "left/subject1.png".into()),
306314
make_packet_key("style".into(), "right/style0.t7".into()),
307315
]),
308-
Packet::from([
316+
ArrowPacket::from([
309317
make_packet_key("subject".into(), "left/subject1.png".into()),
310318
make_packet_key("style".into(), "right/style1.t7".into()),
311319
]),
312-
Packet::from([
320+
ArrowPacket::from([
313321
make_packet_key("subject".into(), "left/subject2.png".into()),
314322
make_packet_key("style".into(), "right/style0.t7".into()),
315323
]),
316-
Packet::from([
324+
ArrowPacket::from([
317325
make_packet_key("subject".into(), "left/subject2.png".into()),
318326
make_packet_key("style".into(), "right/style1.t7".into()),
319327
]),
@@ -332,13 +340,13 @@ mod tests {
332340
operator
333341
.process_packet(
334342
"parent".into(),
335-
Packet::from([
343+
ArrowPacket::from([
336344
make_packet_key("key_old".into(), "some/key.txt".into()),
337345
make_packet_key("subject".into(), "some/subject.txt".into()),
338346
]),
339347
)
340348
.await?,
341-
vec![Packet::from([
349+
vec![ArrowPacket::from([
342350
make_packet_key("key_new".into(), "some/key.txt".into()),
343351
make_packet_key("subject".into(), "some/subject.txt".into()),
344352
]),],

src/pipeline_runner.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::{
1919
selector::{self},
2020
},
2121
model::{
22-
packet::{Packet, PathSet, URI},
22+
packet::{ArrowPacket, PathSet, URI},
2323
pipeline::{Kernel, PipelineJob, PipelineNode, PipelineResult, PipelineStatus},
2424
pod::{Pod, PodJob, PodResult, PodStatus},
2525
serialize_hashmap,
@@ -75,7 +75,7 @@ impl PipelineRunInternal {
7575
}
7676

7777
// Utils functions
78-
async fn send_packets(&self, node_id: &str, output_packets: &Vec<Packet>) -> Result<()> {
78+
async fn send_packets(&self, node_id: &str, output_packets: &Vec<ArrowPacket>) -> Result<()> {
7979
Ok(self
8080
.session
8181
.put(
@@ -344,7 +344,7 @@ impl DockerPipelineRunner {
344344

345345
while let Ok(payload) = subscriber.recv_async().await {
346346
// Extract the message from the payload
347-
let packets: Vec<Packet> = serde_json::from_slice(&payload.payload().to_bytes())?;
347+
let packets: Vec<ArrowPacket> = serde_json::from_slice(&payload.payload().to_bytes())?;
348348

349349
if packets.is_empty() {
350350
// Output node exited, thus we can exit the capture task too
@@ -540,7 +540,7 @@ impl DockerPipelineRunner {
540540
.context(selector::AgentCommunicationFailure)?;
541541

542542
// Extract out the packets
543-
let packets: Vec<Packet> = serde_json::from_slice(&sample.payload().to_bytes())?;
543+
let packets: Vec<ArrowPacket> = serde_json::from_slice(&sample.payload().to_bytes())?;
544544

545545
// Check if the packets are empty, if so that means the node is finished processing
546546
if packets.is_empty() {
@@ -589,7 +589,11 @@ impl DockerPipelineRunner {
589589
/// As a result, each processor only needs to worry about writing their own function to process the msg
590590
#[async_trait]
591591
trait NodeProcessor: Send + Sync {
592-
async fn process_incoming_packet(&mut self, sender_node_hash: &str, incoming_packet: &Packet);
592+
async fn process_incoming_packet(
593+
&mut self,
594+
sender_node_hash: &str,
595+
incoming_packet: &ArrowPacket,
596+
);
593597

594598
/// Notifies the processor that the parent node has completed processing
595599
/// If it is the last parent to complete, it will wait for all processing task to finish
@@ -626,7 +630,7 @@ impl PodProcessor {
626630
node_hash: String,
627631
pod: Arc<Pod>,
628632
incoming_packet: HashMap<String, PathSet>,
629-
) -> Result<Packet> {
633+
) -> Result<ArrowPacket> {
630634
// Hash the input_packet to create a unique identifier for the pod job
631635
let input_packet_hash = {
632636
let mut buf = Vec::new();

tests/fixture/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use orcapod::{
1212
error::Result,
1313
model::{
1414
Annotation,
15-
packet::{Blob, BlobKind, Packet, PathInfo, PathSet, URI},
15+
packet::{Blob, BlobKind, ArrowPacket, PathInfo, PathSet, URI},
1616
pipeline::{Kernel, NodeURI, Pipeline, PipelineJob},
1717
pod::{Pod, PodJob, PodResult, PodStatus, RecommendedSpecs},
1818
},
@@ -181,7 +181,7 @@ pub fn pod_custom(
181181

182182
pub fn pod_job_custom(
183183
pod: Pod,
184-
input_packet: Packet,
184+
input_packet: ArrowPacket,
185185
namespace_lookup: &HashMap<String, PathBuf, RandomState>,
186186
) -> Result<PodJob> {
187187
PodJob::new(

0 commit comments

Comments
 (0)