Skip to content

Commit 76f2117

Browse files
committed
Let network faults affect in-flight messages
When injecting network faults (partition, hold) into a link, also update the state of messages already in the `sent` queue, instead of having the fault only affect messages that are enqueued in the future. This changes enables writing tests in which one observes the contents of a link to find the point in time in which a given message is sent, to then prevent that message from getting delivered.
1 parent e099732 commit 76f2117

File tree

2 files changed

+87
-0
lines changed

2 files changed

+87
-0
lines changed

src/top.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,8 @@ impl Link {
449449
if do_rand {
450450
self.state_a_b = State::RandPartition;
451451
self.state_b_a = State::RandPartition;
452+
453+
self.sent.clear();
452454
}
453455
}
454456
(State::RandPartition, _) | (_, State::RandPartition) => {
@@ -463,6 +465,10 @@ impl Link {
463465
fn hold(&mut self) {
464466
self.state_a_b = State::Hold;
465467
self.state_b_a = State::Hold;
468+
469+
for sent in &mut self.sent {
470+
sent.status = DeliveryStatus::Hold;
471+
}
466472
}
467473

468474
// This link becomes healthy, and any held messages are scheduled for delivery.
@@ -479,6 +485,8 @@ impl Link {
479485
fn explicit_partition(&mut self) {
480486
self.state_a_b = State::ExplicitPartition;
481487
self.state_b_a = State::ExplicitPartition;
488+
489+
self.sent.clear();
482490
}
483491

484492
fn partition_oneway(&mut self, from: IpAddr, to: IpAddr) {
@@ -487,6 +495,8 @@ impl Link {
487495
} else {
488496
self.state_b_a = State::ExplicitPartition;
489497
}
498+
499+
self.sent.retain(|sent| sent.src.ip() != from);
490500
}
491501

492502
fn repair_oneway(&mut self, from: IpAddr, to: IpAddr) {

tests/tcp.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1393,3 +1393,80 @@ fn try_write() -> Result {
13931393

13941394
sim.run()
13951395
}
1396+
1397+
#[test]
1398+
fn network_partition_drops_inflight_messages() {
1399+
let mut sim = Builder::new()
1400+
.min_message_latency(Duration::from_millis(10))
1401+
.build();
1402+
1403+
sim.host("server", || async {
1404+
let listener = bind().await?;
1405+
let (mut sock, _) = listener.accept().await?;
1406+
sock.write_u8(1).await?;
1407+
turmoil::partition("server", "client");
1408+
Ok(())
1409+
});
1410+
1411+
sim.client("client", async {
1412+
let mut sock = TcpStream::connect(("server", PORT)).await?;
1413+
sock.read_u8().await?;
1414+
Ok(())
1415+
});
1416+
1417+
let result = sim.run();
1418+
assert!(result.is_err());
1419+
}
1420+
1421+
#[test]
1422+
fn network_partition_oneway_drops_inflight_messages() {
1423+
let mut sim = Builder::new()
1424+
.min_message_latency(Duration::from_millis(10))
1425+
.build();
1426+
1427+
sim.host("server", || async {
1428+
let listener = bind().await?;
1429+
let (mut sock, _) = listener.accept().await?;
1430+
sock.write_u8(1).await?;
1431+
turmoil::partition_oneway("server", "client");
1432+
Ok(())
1433+
});
1434+
1435+
sim.client("client", async {
1436+
let mut sock = TcpStream::connect(("server", PORT)).await?;
1437+
sock.read_u8().await?;
1438+
Ok(())
1439+
});
1440+
1441+
let result = sim.run();
1442+
assert!(result.is_err());
1443+
}
1444+
1445+
#[test]
1446+
fn network_hold_holds_inflight_messages() {
1447+
let mut sim = Builder::new()
1448+
.min_message_latency(Duration::from_millis(10))
1449+
.build();
1450+
1451+
sim.host("server", || async {
1452+
let listener = bind().await?;
1453+
let (mut sock, _) = listener.accept().await?;
1454+
sock.write_u8(1).await?;
1455+
1456+
turmoil::hold("server", "client");
1457+
tokio::time::sleep(Duration::from_secs(1)).await;
1458+
turmoil::release("server", "client");
1459+
1460+
Ok(())
1461+
});
1462+
1463+
sim.client("client", async {
1464+
let mut sock = TcpStream::connect(("server", PORT)).await?;
1465+
let result = timeout(Duration::from_millis(500), sock.read_u8()).await;
1466+
assert!(result.is_err());
1467+
sock.read_u8().await?;
1468+
Ok(())
1469+
});
1470+
1471+
sim.run().unwrap();
1472+
}

0 commit comments

Comments
 (0)