Skip to content

Commit 755cf74

Browse files
committed
Add UDP broadcast simulation
1 parent 3bf1762 commit 755cf74

File tree

5 files changed

+180
-5
lines changed

5 files changed

+180
-5
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[package]
2+
name = "udp_ipv4_broadcast"
3+
version = "0.1.0"
4+
edition = "2024"
5+
publish = false
6+
7+
[dependencies]
8+
tokio = "1"
9+
turmoil = { path = "../.." }
10+
tracing = "0.1"
11+
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
use std::{net::Ipv4Addr, time::Duration};
2+
use tracing::info;
3+
use turmoil::{IpVersion, net::UdpSocket};
4+
5+
const N_STEPS: usize = 3;
6+
7+
fn main() {
8+
tracing_subscriber::fmt()
9+
.with_env_filter(
10+
tracing_subscriber::EnvFilter::builder()
11+
.with_default_directive(tracing::level_filters::LevelFilter::INFO.into())
12+
.from_env_lossy(),
13+
)
14+
.init();
15+
16+
let tick = Duration::from_millis(100);
17+
let mut sim = turmoil::Builder::new()
18+
.tick_duration(tick)
19+
.ip_version(IpVersion::V4)
20+
.build();
21+
22+
let broadcast_port = 9000;
23+
24+
for server_index in 0..2 {
25+
sim.client(format!("server-{server_index}"), async move {
26+
let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, broadcast_port)).await?;
27+
28+
let mut buf = [0; 1024];
29+
for _ in 0..N_STEPS {
30+
let (n, addr) = socket.recv_from(&mut buf).await?;
31+
let data = &buf[0..n];
32+
33+
info!("UDP packet from {} has been received: {:?}", addr, data);
34+
}
35+
Ok(())
36+
});
37+
}
38+
39+
sim.client("client", async move {
40+
let dst = (Ipv4Addr::BROADCAST, broadcast_port);
41+
let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).await?;
42+
socket.set_broadcast(true)?;
43+
44+
for _ in 0..N_STEPS {
45+
let _ = socket.send_to(&[1, 2, 3], dst).await?;
46+
info!("UDP packet has been sent");
47+
48+
tokio::time::sleep(tick).await;
49+
}
50+
51+
Ok(())
52+
});
53+
54+
sim.run().unwrap();
55+
}

src/host.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use std::sync::Arc;
1313
use tokio::sync::{mpsc, Notify};
1414
use tokio::time::{Duration, Instant};
1515

16+
const DEFAULT_BROADCAST: bool = true;
1617
const DEFAULT_MULTICAST_LOOP: bool = true;
1718

1819
/// A host in the simulated network.
@@ -165,6 +166,7 @@ pub(crate) struct Udp {
165166

166167
struct UdpBind {
167168
bind_addr: SocketAddr,
169+
broadcast: bool,
168170
multicast_loop: bool,
169171
queue: mpsc::Sender<(Datagram, SocketAddr)>,
170172
}
@@ -177,17 +179,30 @@ impl Udp {
177179
}
178180
}
179181

180-
fn is_port_assigned(&self, port: u16) -> bool {
182+
pub(crate) fn is_port_assigned(&self, port: u16) -> bool {
181183
self.binds.keys().any(|p| *p == port)
182184
}
183185

186+
pub(crate) fn is_broadcast_enabled(&self, port: u16) -> bool {
187+
self.binds
188+
.get(&port)
189+
.map(|bind| bind.broadcast)
190+
.unwrap_or(DEFAULT_BROADCAST)
191+
}
192+
184193
pub(crate) fn is_multicast_loop_enabled(&self, port: u16) -> bool {
185194
self.binds
186195
.get(&port)
187196
.map(|bind| bind.multicast_loop)
188197
.unwrap_or(DEFAULT_MULTICAST_LOOP)
189198
}
190199

200+
pub(crate) fn set_broadcast(&mut self, port: u16, on: bool) {
201+
self.binds
202+
.entry(port)
203+
.and_modify(|bind| bind.broadcast = on);
204+
}
205+
191206
pub(crate) fn set_multicast_loop(&mut self, port: u16, on: bool) {
192207
self.binds
193208
.entry(port)
@@ -198,6 +213,7 @@ impl Udp {
198213
let (tx, rx) = mpsc::channel(self.capacity);
199214
let bind = UdpBind {
200215
bind_addr: addr,
216+
broadcast: DEFAULT_BROADCAST,
201217
multicast_loop: DEFAULT_MULTICAST_LOOP,
202218
queue: tx,
203219
};

src/net/udp.rs

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,29 @@ impl UdpSocket {
344344
}
345345

346346
match dst {
347+
SocketAddr::V4(dst) if dst.ip().is_broadcast() => {
348+
let host = world.current_host();
349+
match host.udp.is_broadcast_enabled(src.port()) {
350+
true => world
351+
.hosts
352+
.iter()
353+
.filter(|(_, host)| host.udp.is_port_assigned(dst.port()))
354+
.map(|(addr, _)| SocketAddr::new(*addr, dst.port()))
355+
.collect::<Vec<_>>()
356+
.into_iter()
357+
.try_for_each(|dst| match dst {
358+
dst if src.ip() == dst.ip() => {
359+
send_loopback(src, dst, Protocol::Udp(packet.clone()));
360+
Ok(())
361+
}
362+
dst => world.send_message(src, dst, Protocol::Udp(packet.clone())),
363+
}),
364+
false => Err(Error::new(
365+
ErrorKind::PermissionDenied,
366+
"Broadcast is not enabled",
367+
)),
368+
}
369+
}
347370
dst if dst.ip().is_multicast() => world
348371
.multicast_groups
349372
.destination_addresses(dst)
@@ -357,12 +380,38 @@ impl UdpSocket {
357380
Ok(())
358381
}
359382
dst => world.send_message(src, dst, Protocol::Udp(packet.clone())),
360-
})?,
361-
dst if is_same(src, dst) => send_loopback(src, dst, Protocol::Udp(packet)),
362-
_ => world.send_message(src, dst, Protocol::Udp(packet))?,
383+
}),
384+
dst if is_same(src, dst) => {
385+
send_loopback(src, dst, Protocol::Udp(packet));
386+
Ok(())
387+
}
388+
_ => world.send_message(src, dst, Protocol::Udp(packet)),
363389
}
390+
}
364391

365-
Ok(())
392+
/// Gets the value of the `SO_BROADCAST` option for this socket.
393+
///
394+
/// For more information about this option, see [`set_broadcast`].
395+
///
396+
/// [`set_broadcast`]: method@Self::set_broadcast
397+
pub fn broadcast(&self) -> io::Result<bool> {
398+
let local_port = self.local_addr.port();
399+
World::current(|world| Ok(world.current_host().udp.is_broadcast_enabled(local_port)))
400+
}
401+
402+
/// Sets the value of the `SO_BROADCAST` option for this socket.
403+
///
404+
/// When enabled, this socket is allowed to send packets to a broadcast
405+
/// address.
406+
pub fn set_broadcast(&self, on: bool) -> io::Result<()> {
407+
let local_port = match self.local_addr {
408+
SocketAddr::V4(addr) => addr.port(),
409+
_ => return Ok(()),
410+
};
411+
World::current(|world| {
412+
world.current_host_mut().udp.set_broadcast(local_port, on);
413+
Ok(())
414+
})
366415
}
367416

368417
/// Gets the value of the `IP_MULTICAST_LOOP` option for this socket.

tests/udp.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,50 @@ fn try_recv_pong(sock: &net::UdpSocket) -> Result<()> {
9595
Ok(())
9696
}
9797

98+
#[test]
99+
fn udp_ipv4_broadcast() -> Result {
100+
let mut sim = Builder::new()
101+
.tick_duration(Duration::from_millis(50))
102+
.ip_version(IpVersion::V4)
103+
.build();
104+
105+
let non_broadcast_port = 8000;
106+
let broadcast_port = 9000;
107+
sim.client("server-non-broadcast-port", async move {
108+
let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, non_broadcast_port)).await?;
109+
110+
let mut buf = [0; 1];
111+
let is_timed_out = timeout(Duration::from_secs(1), socket.recv_from(&mut buf))
112+
.await
113+
.is_err();
114+
assert!(is_timed_out);
115+
116+
Ok(())
117+
});
118+
for server_index in 0..3 {
119+
sim.client(format!("server-{server_index}"), async move {
120+
let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, broadcast_port)).await?;
121+
122+
let mut buf = [0; 1];
123+
socket.recv_from(&mut buf).await?;
124+
assert_eq!([1], buf);
125+
126+
Ok(())
127+
});
128+
}
129+
sim.client("client", async move {
130+
let dst = (Ipv4Addr::BROADCAST, broadcast_port);
131+
let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).await?;
132+
socket.set_broadcast(true)?;
133+
134+
let _ = socket.send_to(&[1], dst).await?;
135+
136+
Ok(())
137+
});
138+
139+
sim.run()
140+
}
141+
98142
#[test]
99143
fn udp_ipv4_multicast() -> Result {
100144
let mut sim = Builder::new()

0 commit comments

Comments
 (0)