Skip to content

Commit 75c3102

Browse files
author
Ryan Alameddine
committed
feat(hydro_cli): added basic wrapper for hydro deploy Maelstrom integration
1 parent 3136e0f commit 75c3102

File tree

9 files changed

+495
-1
lines changed

9 files changed

+495
-1
lines changed

Cargo.lock

+13
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ members = [
44
"benches",
55
"hydro_cli",
66
"hydro_cli_examples",
7+
"hydro_cli_maelstrom",
78
"hydroflow",
89
"hydroflow_cli_integration",
910
"hydroflow_datalog",

hydro_cli/src/core/hydroflow_crate/mod.rs

+10
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,16 @@ impl Service for HydroflowCrate {
247247
&args,
248248
)
249249
.await?;
250+
251+
//send the id over
252+
binary
253+
.write()
254+
.await
255+
.stdin()
256+
.await
257+
.send(format!("id: {}\n", self.id))
258+
.await?;
259+
250260

251261
let mut bind_config = HashMap::new();
252262
for (port_name, bind_type) in self.port_to_bind.iter() {

hydro_cli_examples/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ name = "pn_counter_delta"
4646
[[example]]
4747
name = "ws_chat_server"
4848

49+
[[example]]
50+
name = "maelstrom_unique_id"
51+
4952
[dev-dependencies]
5053
hydroflow = { path = "../hydroflow", features = [ "cli_integration" ] }
5154
hydroflow_datalog = { path = "../hydroflow_datalog" }
+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use hydroflow::hydroflow_syntax;
2+
use hydroflow::util::cli::{ConnectedDirect, ConnectedSource, ConnectedSink};
3+
use hydroflow::util::serialize_to_bytes;
4+
use serde::{Serialize, Deserialize};
5+
use serde_json::Value;
6+
7+
#[derive(Debug, Clone, Serialize, Deserialize)]
8+
pub struct EchoMsg {
9+
pub msg_id: Value,
10+
pub echo: String
11+
}
12+
13+
#[derive(Debug, Clone, Serialize, Deserialize)]
14+
pub struct EchoOkMsg {
15+
pub echo: String,
16+
pub in_reply_to: Value
17+
}
18+
19+
impl EchoMsg {
20+
/// Generate EchoOkMsg response to this EchoMsg
21+
fn response(EchoMsg {echo, msg_id: source_msg_id}: Self) -> EchoOkMsg{
22+
EchoOkMsg {echo, in_reply_to: source_msg_id}
23+
}
24+
}
25+
26+
27+
#[hydroflow::main]
28+
async fn main() {
29+
let mut ports = hydroflow::util::cli::init().await;
30+
31+
// TODO: use ConnectedDemux?
32+
let echo_in = ports
33+
.port("echo_in")
34+
.connect::<ConnectedDirect>()
35+
.await
36+
.into_source();
37+
let echo_out = ports
38+
.port("echo_out")
39+
.connect::<ConnectedDirect>()
40+
.await
41+
.into_sink();
42+
43+
let df = hydroflow_syntax! {
44+
input = source_stream(echo_in)
45+
-> map(Result::unwrap)
46+
-> map(|x| x.to_vec())
47+
-> map(String::from_utf8)
48+
-> map(Result::unwrap);
49+
50+
output = map(|x| serde_json::to_string(&x))
51+
-> map(Result::unwrap)
52+
-> map(serialize_to_bytes)
53+
-> dest_sink(echo_out);
54+
55+
56+
input
57+
-> map(|x| serde_json::from_str::<EchoMsg>(&x).unwrap())
58+
//-> map(|x| EchoMsg {msg_id: x.msg_id, echo: x.echo + "hi"})
59+
-> map(EchoMsg::response)
60+
-> output;
61+
};
62+
63+
hydroflow::util::cli::launch_flow(df).await;
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use hydroflow::hydroflow_syntax;
2+
use hydroflow::util::cli::{ConnectedDirect, ConnectedSource, ConnectedSink};
3+
use hydroflow::util::serialize_to_bytes;
4+
use serde::{Serialize, Deserialize};
5+
use serde_json::{Value, json};
6+
7+
#[derive(Debug, Clone, Serialize, Deserialize)]
8+
pub struct Generate {
9+
pub msg_id: Value
10+
}
11+
12+
#[derive(Debug, Clone, Serialize, Deserialize)]
13+
pub struct GenerateOk {
14+
pub id: Value,
15+
pub in_reply_to: Value
16+
}
17+
18+
impl Generate {
19+
/// Generate GenerateOk response to this Generate message
20+
pub fn respond(self, i: usize, node_id: &str) -> GenerateOk{
21+
let id = json!([i, node_id]);
22+
23+
GenerateOk {id, in_reply_to: self.msg_id}
24+
}
25+
}
26+
27+
28+
#[hydroflow::main]
29+
async fn main() {
30+
let mut ports = hydroflow::util::cli::init().await;
31+
let node_id = ports.node_id.clone();
32+
33+
// TODO: use ConnectedDemux?
34+
let gen_in = ports
35+
.port("gen_in")
36+
.connect::<ConnectedDirect>()
37+
.await
38+
.into_source();
39+
let ok_out = ports
40+
.port("ok_out")
41+
.connect::<ConnectedDirect>()
42+
.await
43+
.into_sink();
44+
45+
let df = hydroflow_syntax! {
46+
input = source_stream(gen_in)
47+
-> map(Result::unwrap)
48+
-> map(|x| x.to_vec())
49+
-> map(String::from_utf8)
50+
-> map(Result::unwrap);
51+
52+
output = map(|x| serde_json::to_string(&x))
53+
-> map(Result::unwrap)
54+
-> map(serialize_to_bytes)
55+
-> dest_sink(ok_out);
56+
57+
58+
input
59+
-> map(|x| serde_json::from_str::<Generate>(&x).unwrap())
60+
-> enumerate::<'static>() //-> enumerate() will fail!
61+
-> map(|(i, x)| x.respond(i, &node_id))
62+
-> output;
63+
};
64+
65+
hydroflow::util::cli::launch_flow(df).await;
66+
}

hydro_cli_maelstrom/Cargo.toml

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[package]
2+
name = "hydro_cli_maelstrom"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7+
8+
[dependencies]
9+
serde_json = "1"
10+
serde = { version = "1", features = [ "derive" ] }
11+
futures = { version = "0.3" }
12+
bytes = "1.1.0"
13+
bincode = "1.3"
14+
15+
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
16+
tokio = { version = "1.16", features = [ "full" ] }
17+
tokio-util = { version = "0.7.4", features = [ "net", "codec" ] }
18+
19+
[target.'cfg(target_arch = "wasm32")'.dependencies]
20+
tokio = { version = "1.16", features = [ "rt" , "sync", "macros", "io-util", "time" ] }
21+
tokio-util = { version = "0.7.4", features = [ "codec" ] }

0 commit comments

Comments
 (0)