Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions demos/std/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion demos/std/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ members = [
"log-router-tcp",
"log-router-serial",
"stream-plotting",
"tilt-app", "ergot-bridge-pair-udp",
"tilt-app",
"ergot-bridge-pair-udp",
]
resolver = "2"

Expand Down
2 changes: 2 additions & 0 deletions demos/std/ergot-bridge-pair-udp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ log = { workspace = true }
ergot = { workspace = true }
mutex = { workspace = true }
tokio = { workspace = true }
serde = { version = "1.0.219", default-features = false, features = ["derive"] }
postcard-schema = { version = "0.2.5", features = ["derive"] }

[[bin]]
name = "controller"
Expand Down
64 changes: 64 additions & 0 deletions demos/std/ergot-bridge-pair-udp/src/bin/controller.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use ergot::{
endpoint,
toolkits::tokio_udp::{
EdgeStack, new_controller_stack, new_std_queue, register_edge_interface,
},
topic,
well_known::DeviceInfo,
Address,
};
use log::{debug, info, warn};
use postcard_schema::Schema;
use serde::{Deserialize, Serialize};
use tokio::{net::UdpSocket, select, time, time::sleep};

use ergot::interface_manager::profiles::direct_edge::tokio_udp::InterfaceKind;
Expand All @@ -16,6 +20,15 @@ use tokio::time::interval;

topic!(YeetTopic, u64, "topic/yeet");

// Define the calculator endpoint: Request is AddRequest, Response is i32
#[derive(Serialize, Deserialize, Schema, Debug, Clone)]
pub struct AddRequest {
pub a: i32,
pub b: i32,
}

endpoint!(CalculatorEndpoint, AddRequest, i32, "calc/add");

#[tokio::main]
async fn main() -> io::Result<()> {
let queue = new_std_queue(4096);
Expand All @@ -33,6 +46,7 @@ async fn main() -> io::Result<()> {
tokio::task::spawn(basic_services(stack.clone(), port));
tokio::task::spawn(yeeter(stack.clone()));
tokio::task::spawn(yeet_listener(stack.clone(), 0));
tokio::task::spawn(calculator_client(stack.clone()));

register_edge_interface(&stack, udp_socket, &queue, InterfaceKind::Controller)
.await
Expand Down Expand Up @@ -97,6 +111,7 @@ async fn yeeter(stack: EdgeStack) {
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
info!("Sending broadcast message from controller");
println!("📤 Controller sending YeetTopic: counter = {}", ctr);
stack.topics().broadcast::<YeetTopic>(&ctr, None).unwrap();
ctr += 1;
}
Expand All @@ -119,7 +134,56 @@ async fn yeet_listener(stack: EdgeStack, id: u8) {
msg = hdl.recv() => {
packets_this_interval += 1;
debug!("{}: Listener id:{} got {}", msg.hdr, id, msg.t);
println!("📨 Received YeetTopic message: counter = {}", msg.t);
}
}
}
}

async fn calculator_client(stack: EdgeStack) {
// Wait for services to start
tokio::time::sleep(Duration::from_secs(3)).await;

// Controller is on network 1, node 1 (CENTRAL_NODE_ID)
// Target is on network 1, node 2 (EDGE_NODE_ID)
// Calculator server will be on port 1 on the target
let calc_addr = Address {
network_id: 1,
node_id: 2,
port_id: 1,
};

let mut counter = 1;
loop {
tokio::time::sleep(Duration::from_secs(7)).await;

let request = AddRequest {
a: counter * 10,
b: counter * 5,
};

info!("Sending calculator request: {} + {} to target (1.2:1)", request.a, request.b);
println!("➡️ Sending request: {} + {} to target (1.2:1)", request.a, request.b);

// Send to target node with 3 second timeout
match tokio::time::timeout(
Duration::from_secs(3),
stack.endpoints().request::<CalculatorEndpoint>(calc_addr, &request, None)
).await {
Ok(Ok(response)) => {
info!("Got response: {}", response);
println!("⬅️ Got response: {}", response);
}
Ok(Err(e)) => {
warn!("Request failed: {:?}", e);
println!("❌ Request failed: {:?}", e);
}
Err(_) => {
warn!("Request timeout");
println!("❌ Request timeout after 3s");
}
}

counter += 1;
}
}
40 changes: 40 additions & 0 deletions demos/std/ergot-bridge-pair-udp/src/bin/target.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use ergot::{
endpoint,
toolkits::tokio_udp::{EdgeStack, new_std_queue, new_target_stack, register_edge_interface},
topic,
well_known::DeviceInfo,
};
use log::{debug, info};
use postcard_schema::Schema;
use serde::{Deserialize, Serialize};
use tokio::{net::UdpSocket, select, time, time::sleep};

use ergot::interface_manager::profiles::direct_edge::tokio_udp::InterfaceKind;
Expand All @@ -13,6 +16,15 @@ use std::{io, pin::pin, time::Duration};

topic!(YeetTopic, u64, "topic/yeet");

// Define the calculator endpoint: Request is AddRequest, Response is i32
#[derive(Serialize, Deserialize, Schema, Debug, Clone)]
pub struct AddRequest {
pub a: i32,
pub b: i32,
}

endpoint!(CalculatorEndpoint, AddRequest, i32, "calc/add");

#[tokio::main]
async fn main() -> io::Result<()> {
//env_logger::init();
Expand All @@ -34,6 +46,7 @@ async fn main() -> io::Result<()> {
tokio::task::spawn(basic_services(stack.clone(), port));
tokio::task::spawn(yeeter(stack.clone()));
tokio::task::spawn(yeet_listener(stack.clone(), 0));
tokio::task::spawn(calculator_server(stack.clone()));

register_edge_interface(&stack, udp_socket, &queue, InterfaceKind::Target)
.await
Expand Down Expand Up @@ -66,6 +79,7 @@ async fn yeeter(stack: EdgeStack) {
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
info!("Sending broadcast message from target");
println!("📤 Target sending YeetTopic: counter = {}", ctr);
stack.topics().broadcast::<YeetTopic>(&ctr, None).unwrap();
ctr += 1;
}
Expand All @@ -88,7 +102,33 @@ async fn yeet_listener(stack: EdgeStack, id: u8) {
msg = hdl.recv() => {
packets_this_interval += 1;
debug!("{}: Listener id:{} got {}", msg.hdr, id, msg.t);
println!("📨 Received YeetTopic message: counter = {}", msg.t);
}
}
}
}

async fn calculator_server(stack: EdgeStack) {
info!("Starting calculator endpoint server");
println!("🧮 Calculator endpoint server started");

// Use None for auto-assigned port
let server = stack.endpoints().bounded_server::<CalculatorEndpoint, 4>(None);
let server = pin!(server);
let mut server_hdl = server.attach();

let port = server_hdl.port();
info!("Calculator server listening on port {}", port);
println!("🧮 Calculator server listening on port {}", port);

loop {
let _ = server_hdl
.serve(async |req: &AddRequest| {
let result = req.a + req.b;
info!("Calculator: {} + {} = {}", req.a, req.b, result);
println!("🧮 Request: {} + {} = {}", req.a, req.b, result);
result
})
.await;
}
}