Skip to content

Commit 02096e4

Browse files
authored
Merge pull request #109 from 191220029/IP_management
2 parents a46579e + 702315f commit 02096e4

File tree

5 files changed

+248
-16
lines changed

5 files changed

+248
-16
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ default-members = [".", "dagrs-derive"]
1919

2020
[dependencies]
2121
dagrs-derive = { path = "dagrs-derive", optional = true, version = "0.4.3" }
22-
tokio = { version = "1.28", features = ["rt", "sync", "rt-multi-thread"] }
22+
tokio = { version = "1.28", features = ["rt", "sync", "rt-multi-thread", "time"] }
2323
log = "0.4"
2424
async-trait = "0.1.83"
2525
futures = "0.3.31"

examples/recv_any_example.rs

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
//! # Example: recv_any_example
2+
//! Demonstrates how to use the `recv_any` method of `InChannels` to receive data from any available channel.
3+
//!
4+
//!
5+
//! # Output
6+
//! When running this example, you will see output similar to:
7+
//! ```
8+
//! Received message 'Hello from Sender' from node NodeId(1)
9+
//! Received message 'Hello from SlowSender' from node NodeId(2)
10+
//! ```
11+
//!
12+
//! The first message comes from the normal sender, and the second message comes from the slow sender
13+
//! after a 500ms delay.
14+
//!
15+
//! //! This example creates a graph with two senders and one receiver:
16+
//! - A normal sender that sends messages immediately
17+
//! - A slow sender that delays 500ms before sending messages
18+
//! - A receiver that uses `recv_any` to receive messages from either sender
19+
//!
20+
//! # Output
21+
//! When running this example, you will see output similar to:
22+
//! ```
23+
//! Received message 'Hello from Sender' from node NodeId(1)
24+
//! Received message 'Hello from SlowSender' from node NodeId(2)
25+
//! ```
26+
//!
27+
//! The first message comes from the normal sender, and the second message comes from the slow sender
28+
//! after a 500ms delay.
29+
30+
use std::{sync::Arc, time::Duration};
31+
32+
use async_trait::async_trait;
33+
use dagrs::{
34+
connection::{in_channel::TypedInChannels, out_channel::TypedOutChannels},
35+
node::typed_action::TypedAction,
36+
DefaultNode, EnvVar, Graph, Node, NodeTable, Output,
37+
};
38+
use tokio::time::sleep;
39+
40+
/// An action that sends a message to its output channel
41+
#[derive(Default)]
42+
pub struct SenderAction {
43+
message: String,
44+
}
45+
46+
impl SenderAction {
47+
pub fn new(message: String) -> Self {
48+
Self { message }
49+
}
50+
}
51+
52+
#[async_trait]
53+
impl TypedAction for SenderAction {
54+
type I = ();
55+
type O = String;
56+
57+
async fn run(
58+
&self,
59+
_: TypedInChannels<Self::I>,
60+
out: TypedOutChannels<Self::O>,
61+
_: Arc<EnvVar>,
62+
) -> Output {
63+
// Send the message to all receivers
64+
out.broadcast(self.message.clone()).await;
65+
Output::Out(None)
66+
}
67+
}
68+
69+
/// An action that sends a message to its output channel after a delay
70+
#[derive(Default)]
71+
pub struct SlowSenderAction {
72+
message: String,
73+
}
74+
75+
impl SlowSenderAction {
76+
pub fn new(message: String) -> Self {
77+
Self { message }
78+
}
79+
}
80+
81+
#[async_trait]
82+
impl TypedAction for SlowSenderAction {
83+
type I = ();
84+
type O = String;
85+
86+
async fn run(
87+
&self,
88+
_: TypedInChannels<Self::I>,
89+
out: TypedOutChannels<Self::O>,
90+
_: Arc<EnvVar>,
91+
) -> Output {
92+
// Wait for 500ms before sending
93+
sleep(Duration::from_millis(500)).await;
94+
// Send the message to all receivers
95+
out.broadcast(self.message.clone()).await;
96+
Output::Out(None)
97+
}
98+
}
99+
100+
/// An action that receives messages from any available channel
101+
#[derive(Default)]
102+
pub struct ReceiverAction;
103+
104+
#[async_trait]
105+
impl TypedAction for ReceiverAction {
106+
type I = String;
107+
type O = ();
108+
109+
async fn run(
110+
&self,
111+
mut input: TypedInChannels<Self::I>,
112+
_: TypedOutChannels<Self::O>,
113+
_: Arc<EnvVar>,
114+
) -> Output {
115+
// Receive from any available channel
116+
match input.recv_any().await {
117+
Ok((sender_id, content)) => {
118+
let message = content.unwrap();
119+
println!("Received message '{}' from node {:?}", message, sender_id);
120+
}
121+
Err(e) => {
122+
eprintln!("Error receiving message: {:?}", e);
123+
}
124+
}
125+
126+
match input.recv_any().await {
127+
Ok((sender_id, content)) => {
128+
let message = content.unwrap();
129+
println!("Received message '{}' from node {:?}", message, sender_id);
130+
}
131+
Err(e) => {
132+
eprintln!("Error receiving message: {:?}", e);
133+
}
134+
}
135+
136+
Output::Out(None)
137+
}
138+
}
139+
140+
fn main() {
141+
// Create a node table
142+
let mut node_table = NodeTable::new();
143+
144+
// Create sender nodes
145+
let sender1 = DefaultNode::with_action(
146+
"Sender1".to_string(),
147+
SenderAction::new("Hello from Sender".to_string()),
148+
&mut node_table,
149+
);
150+
let sender2 = DefaultNode::with_action(
151+
"Sender2".to_string(),
152+
SlowSenderAction::new("Hello from SlowSender".to_string()),
153+
&mut node_table,
154+
);
155+
156+
// Create receiver node
157+
let receiver = DefaultNode::with_action(
158+
"Receiver".to_string(),
159+
ReceiverAction::default(),
160+
&mut node_table,
161+
);
162+
163+
// Get node IDs before adding nodes to the graph
164+
let sender1_id = sender1.id();
165+
let sender2_id = sender2.id();
166+
let receiver_id = receiver.id();
167+
168+
// Create a graph
169+
let mut graph = Graph::new();
170+
171+
// Add nodes to the graph
172+
graph.add_node(sender1);
173+
graph.add_node(sender2);
174+
graph.add_node(receiver);
175+
176+
// Add edges: both senders connect to the receiver
177+
graph.add_edge(sender1_id, vec![receiver_id]);
178+
graph.add_edge(sender2_id, vec![receiver_id]);
179+
180+
// Run the graph
181+
match graph.start() {
182+
Ok(_) => (),
183+
Err(e) => {
184+
eprintln!("Graph execution failed: {:?}", e);
185+
}
186+
}
187+
}

examples/typed_action.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ fn main() {
116116
assert_eq!(*res, 272)
117117
}
118118
Err(e) => {
119-
panic!("图执行失败: {:?}", e);
119+
panic!("Graph execution failed: {:?}", e);
120120
}
121121
}
122122
}

src/connection/in_channel.rs

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::{collections::HashMap, marker::PhantomData, sync::Arc};
22

33
use futures::future::join_all;
4+
use futures::future::select_ok;
45
use tokio::sync::{broadcast, mpsc, Mutex};
56

67
use crate::node::node::NodeId;
@@ -29,6 +30,31 @@ impl InChannels {
2930
}
3031
}
3132

33+
/// Receives data from any available channel and returns both the sender's ID and the content.
34+
/// This method will wait until any channel has data available.
35+
pub async fn recv_any(&mut self) -> Result<(NodeId, Content), RecvErr> {
36+
let mut futures = Vec::new();
37+
let ids: Vec<NodeId> = self.keys();
38+
39+
for id in ids {
40+
let channel = self.get(&id).ok_or(RecvErr::NoSuchChannel)?;
41+
let fut = Box::pin(async move {
42+
let content = channel.lock().await.recv().await?;
43+
Ok::<_, RecvErr>((id, content))
44+
});
45+
futures.push(fut);
46+
}
47+
48+
if futures.is_empty() {
49+
return Err(RecvErr::NoSuchChannel);
50+
}
51+
52+
match select_ok(futures).await {
53+
Ok((result, _)) => Ok(result),
54+
Err(_) => Err(RecvErr::Closed),
55+
}
56+
}
57+
3258
/// Calls `blocking_recv` for all the [`InChannel`]s, and applies transformation `f` to
3359
/// the return values of the call.
3460
pub fn blocking_map<F, T>(&mut self, mut f: F) -> Vec<T>
@@ -184,6 +210,31 @@ impl<T: Send + Sync + 'static> TypedInChannels<T> {
184210
}
185211
}
186212

213+
/// Receives typed data from any available channel and returns both the sender's ID and the typed content.
214+
/// This method will wait until any channel has data available.
215+
pub async fn recv_any(&mut self) -> Result<(NodeId, Option<Arc<T>>), RecvErr> {
216+
let mut futures = Vec::new();
217+
let ids: Vec<NodeId> = self.keys();
218+
219+
for id in ids {
220+
let channel = self.get(&id).ok_or(RecvErr::NoSuchChannel)?;
221+
let fut = Box::pin(async move {
222+
let content: Content = channel.lock().await.recv().await?;
223+
Ok::<_, RecvErr>((id, content.into_inner()))
224+
});
225+
futures.push(fut);
226+
}
227+
228+
if futures.is_empty() {
229+
return Err(RecvErr::NoSuchChannel);
230+
}
231+
232+
match select_ok(futures).await {
233+
Ok((result, _)) => Ok(result),
234+
Err(_) => Err(RecvErr::Closed),
235+
}
236+
}
237+
187238
/// Calls `blocking_recv` for all the [`InChannel`]s, and applies transformation `f` to
188239
/// the return values of the call.
189240
pub fn blocking_map<F, U>(&mut self, mut f: F) -> Vec<U>
@@ -225,14 +276,6 @@ impl<T: Send + Sync + 'static> TypedInChannels<T> {
225276
}
226277
}
227278

228-
pub(crate) fn insert(&mut self, node_id: NodeId, channel: Arc<Mutex<InChannel>>) {
229-
self.0.insert(node_id, channel);
230-
}
231-
232-
pub(crate) fn close_all(&mut self) {
233-
self.0.values_mut().for_each(|c| c.blocking_lock().close());
234-
}
235-
236279
fn get(&self, id: &NodeId) -> Option<Arc<Mutex<InChannel>>> {
237280
match self.0.get(id) {
238281
Some(c) => Some(c.clone()),

src/connection/out_channel.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ impl OutChannels {
6969
pub(crate) fn insert(&mut self, node_id: NodeId, channel: Arc<Mutex<OutChannel>>) {
7070
self.0.insert(node_id, channel);
7171
}
72+
73+
/// Returns a list of all available receiver node IDs.
74+
pub fn get_receiver_ids(&self) -> Vec<NodeId> {
75+
self.0.keys().copied().collect()
76+
}
7277
}
7378

7479
/// # Output Channel
@@ -179,18 +184,15 @@ impl<T: Send + Sync + 'static> TypedOutChannels<T> {
179184
}
180185
}
181186

182-
pub(crate) fn close_all(&mut self) {
183-
self.0.clear();
184-
}
185-
186187
fn get(&self, id: &NodeId) -> Option<Arc<Mutex<OutChannel>>> {
187188
match self.0.get(id) {
188189
Some(c) => Some(c.clone()),
189190
None => None,
190191
}
191192
}
192193

193-
pub(crate) fn insert(&mut self, node_id: NodeId, channel: Arc<Mutex<OutChannel>>) {
194-
self.0.insert(node_id, channel);
194+
/// Returns a list of all available receiver node IDs.
195+
pub fn get_receiver_ids(&self) -> Vec<NodeId> {
196+
self.0.keys().copied().collect()
195197
}
196198
}

0 commit comments

Comments
 (0)