Skip to content

Commit f50cb27

Browse files
committed
Add scaffolding for routing integation tests
1 parent d8d8813 commit f50cb27

15 files changed

Lines changed: 1312 additions & 67 deletions

File tree

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,11 +210,11 @@ jobs:
210210

211211
- name: Run tests stable (sudo macos)
212212
if: ${{ matrix.os == 'macos-latest' }}
213-
run: sudo cargo nextest run -p zenoh
213+
run: sudo cargo nextest run -p zenoh -F test
214214

215215
- name: Run tests stable
216216
if: ${{ matrix.os != 'macos-latest' }}
217-
run: cargo nextest run -p zenoh
217+
run: cargo nextest run -p zenoh -F test
218218

219219
- name: Check for feature leaks
220220
if: ${{ matrix.os == 'ubuntu-latest' }}

_typos.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
extend-exclude = [
33
# Ignore files containing hexa.
44
"io/zenoh-transport/tests/*.rs",
5+
"zenoh/src/net/tests/regions",
56
"zenoh/tests/authentication.rs",
67
"zenoh/tests/open_time.rs",
78
]

commons/zenoh-protocol/src/core/region.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ pub enum Region {
9090
}
9191

9292
impl Region {
93+
pub const fn default_south(mode: WhatAmI) -> Self {
94+
Self::South { id: 0, mode }
95+
}
96+
9397
pub fn bound(&self) -> Bound {
9498
match self {
9599
Region::North => Bound::North,

io/zenoh-transport/src/unicast/test_helpers.rs

Lines changed: 148 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,156 @@
1111
// Contributors:
1212
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
1313
//
14+
use std::{fmt::DebugStruct, sync::Arc, time::Duration};
15+
16+
use async_trait::async_trait;
17+
use tokio::sync::MutexGuard as AsyncMutexGuard;
1418
use zenoh_core::zcondfeat;
19+
use zenoh_link::Link;
20+
use zenoh_protocol::{
21+
core::{Bound, RegionName, WhatAmI, ZenohIdProto},
22+
network::{NetworkBody, NetworkBodyMut, NetworkMessage, NetworkMessageMut},
23+
transport::TransportSn,
24+
};
25+
use zenoh_result::ZResult;
26+
27+
use crate::{
28+
unicast::{
29+
authentication::TransportAuthId,
30+
link::LinkUnicastWithOpenAck,
31+
transport_unicast_inner::{AddLinkResult, TransportStatus, TransportUnicastTrait},
32+
TransportConfigUnicast, TransportManagerBuilderUnicast, TransportUnicast,
33+
},
34+
TransportManager, TransportPeerEventHandler,
35+
};
36+
37+
/// A minimal stub implementation of [`TransportUnicastTrait`] for use in routing tests.
38+
///
39+
/// All methods panic with `unimplemented!()` except `get_zid`, `get_whatami`, and `schedule`.
40+
/// `schedule` converts each incoming [`NetworkMessageMut`] to an owned [`NetworkMessage`] and
41+
/// calls the `on_schedule` callback provided at construction time, allowing tests to observe
42+
/// every message that the routing layer sends to this face without a real network connection.
43+
pub struct MockTransportUnicastInner {
44+
zid: ZenohIdProto,
45+
whatami: WhatAmI,
46+
on_schedule: Arc<dyn Fn(NetworkMessage) + Send + Sync>,
47+
}
48+
49+
#[async_trait]
50+
impl TransportUnicastTrait for MockTransportUnicastInner {
51+
fn set_callback(&self, _callback: Arc<dyn TransportPeerEventHandler>) {}
52+
53+
async fn get_status(&self) -> AsyncMutexGuard<'_, TransportStatus> {
54+
unimplemented!("MockTransportUnicastInner::get_status")
55+
}
56+
57+
fn get_zid(&self) -> ZenohIdProto {
58+
self.zid
59+
}
60+
61+
fn get_whatami(&self) -> WhatAmI {
62+
self.whatami
63+
}
64+
65+
fn get_callback(&self) -> Option<Arc<dyn TransportPeerEventHandler>> {
66+
unimplemented!("MockTransportUnicastInner::get_callback")
67+
}
68+
69+
fn get_links(&self) -> Vec<Link> {
70+
vec![]
71+
}
1572

16-
use crate::{unicast::TransportManagerBuilderUnicast, TransportManager};
73+
fn get_auth_ids(&self) -> TransportAuthId {
74+
unimplemented!("MockTransportUnicastInner::get_auth_ids")
75+
}
76+
77+
#[cfg(feature = "shared-memory")]
78+
fn is_shm(&self) -> bool {
79+
false
80+
}
81+
82+
fn is_qos(&self) -> bool {
83+
false
84+
}
85+
86+
fn region_name(&self) -> Option<RegionName> {
87+
None
88+
}
89+
90+
fn get_bound(&self) -> Option<Bound> {
91+
None
92+
}
93+
94+
fn get_config(&self) -> &TransportConfigUnicast {
95+
unimplemented!("MockTransportUnicastInner::get_config")
96+
}
97+
98+
#[cfg(feature = "stats")]
99+
fn stats(&self) -> zenoh_stats::TransportStats {
100+
unimplemented!("MockTransportUnicastInner::stats")
101+
}
102+
103+
async fn add_link(
104+
&self,
105+
_link: LinkUnicastWithOpenAck,
106+
_other_initial_sn: TransportSn,
107+
_other_lease: Duration,
108+
) -> AddLinkResult {
109+
unimplemented!("MockTransportUnicastInner::add_link")
110+
}
111+
112+
fn schedule(&self, msg: NetworkMessageMut) -> ZResult<bool> {
113+
let body = match msg.body {
114+
NetworkBodyMut::Push(p) => NetworkBody::Push(p.clone()),
115+
NetworkBodyMut::Request(r) => NetworkBody::Request(r.clone()),
116+
NetworkBodyMut::Response(r) => NetworkBody::Response(r.clone()),
117+
NetworkBodyMut::ResponseFinal(r) => NetworkBody::ResponseFinal(r.clone()),
118+
NetworkBodyMut::Interest(i) => NetworkBody::Interest(i.clone()),
119+
NetworkBodyMut::Declare(d) => NetworkBody::Declare(d.clone()),
120+
NetworkBodyMut::OAM(o) => NetworkBody::OAM(o.clone()),
121+
};
122+
(self.on_schedule)(NetworkMessage {
123+
body,
124+
reliability: msg.reliability,
125+
});
126+
Ok(true)
127+
}
128+
129+
async fn close(&self, _reason: u8) -> ZResult<()> {
130+
Ok(())
131+
}
132+
133+
fn add_debug_fields<'a, 'b: 'a, 'c>(
134+
&self,
135+
s: &'c mut DebugStruct<'a, 'b>,
136+
) -> &'c mut DebugStruct<'a, 'b> {
137+
s
138+
}
139+
}
140+
141+
/// Creates a [`TransportUnicast`] backed by a [`MockTransportUnicastInner`].
142+
///
143+
/// Every message that the routing layer sends to this face is converted to an owned
144+
/// [`NetworkMessage`] and forwarded to `on_schedule`. The caller can use this callback to
145+
/// record or assert on outgoing messages without a real network connection.
146+
///
147+
/// The caller must keep the returned `Arc<MockTransportUnicastInner>` alive for the duration of
148+
/// any operation that uses the `TransportUnicast`, because `TransportUnicast` only holds a `Weak`
149+
/// reference.
150+
pub fn mock_transport_unicast(
151+
zid: ZenohIdProto,
152+
whatami: WhatAmI,
153+
on_schedule: Arc<dyn Fn(NetworkMessage) + Send + Sync>,
154+
) -> (TransportUnicast, Arc<MockTransportUnicastInner>) {
155+
let inner = Arc::new(MockTransportUnicastInner {
156+
zid,
157+
whatami,
158+
on_schedule,
159+
});
160+
let erased: Arc<dyn TransportUnicastTrait> = inner.clone();
161+
let transport = TransportUnicast::from(&erased);
162+
(transport, inner)
163+
}
17164

18165
pub fn make_transport_manager_builder(
19166
#[cfg(feature = "transport_multilink")] max_links: usize,

zenoh/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ shared-memory = [
5959
"zenoh-transport/shared-memory",
6060
]
6161
stats = ["zenoh-stats", "zenoh-transport/stats"]
62+
test = ["zenoh-transport/test"]
6263
tracing-instrument = [
6364
"zenoh-runtime/tracing-instrument",
6465
"zenoh-task/tracing-instrument",

zenoh/src/net/primitives/demux.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::net::routing::{
3232
};
3333

3434
pub struct DeMux {
35-
face: Face,
35+
pub(crate) face: Face,
3636
pub(crate) transport: Option<TransportUnicast>,
3737
pub(crate) interceptor: Arc<ArcSwapOption<InterceptorsChain>>,
3838
}

zenoh/src/net/routing/gateway.rs

Lines changed: 43 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -50,19 +50,21 @@ use crate::net::{
5050
pub struct GatewayBuilder<'c> {
5151
config: &'c ExpandedConfig,
5252
hlc: Option<Arc<HLC>>,
53-
hats: Vec<Region>,
5453
#[cfg(feature = "stats")]
5554
stats: Option<zenoh_stats::StatsRegistry>,
55+
#[cfg(test)]
56+
subregions: Option<Vec<Region>>,
5657
}
5758

5859
impl<'conf> GatewayBuilder<'conf> {
5960
pub fn new(config: &'conf ExpandedConfig) -> GatewayBuilder<'conf> {
6061
Self {
6162
config,
6263
hlc: None,
63-
hats: Vec::new(),
6464
#[cfg(feature = "stats")]
6565
stats: None,
66+
#[cfg(test)]
67+
subregions: None,
6668
}
6769
}
6870

@@ -72,8 +74,8 @@ impl<'conf> GatewayBuilder<'conf> {
7274
}
7375

7476
#[cfg(test)]
75-
pub fn hat(mut self, region: Region) -> Self {
76-
self.hats.push(region);
77+
pub fn subregions(mut self, subregions: Vec<Region>) -> Self {
78+
self.subregions.replace(subregions);
7779
self
7880
}
7981

@@ -83,40 +85,46 @@ impl<'conf> GatewayBuilder<'conf> {
8385
self
8486
}
8587

86-
pub fn build(mut self) -> ZResult<Gateway> {
87-
if self.hats.is_empty() {
88-
self.hats.extend([(Region::North), (Region::Local)]);
89-
}
90-
88+
pub fn build(self) -> ZResult<Gateway> {
9189
let mode = self.config.mode();
9290

93-
match self.config.gateway.south.clone().unwrap_or_default() {
94-
GatewaySouthConf::Preset(GatewayPresetConf::Auto) => match mode {
95-
WhatAmI::Router => {
96-
for mode in [WhatAmI::Client, WhatAmI::Peer] {
97-
self.hats.push(Region::South {
98-
id: usize::default(),
99-
mode,
100-
});
91+
let mut regions = vec![Region::North];
92+
93+
let mut set_regions_with_config = || {
94+
match self.config.gateway.south.clone().unwrap_or_default() {
95+
GatewaySouthConf::Preset(GatewayPresetConf::Auto) => match mode {
96+
WhatAmI::Router => {
97+
for mode in [WhatAmI::Client, WhatAmI::Peer] {
98+
regions.push(Region::default_south(mode));
99+
}
101100
}
102-
}
103-
WhatAmI::Peer => {
104-
self.hats.push(Region::South {
105-
id: usize::default(),
106-
mode: WhatAmI::Client,
107-
});
108-
}
109-
WhatAmI::Client => {}
110-
},
111-
GatewaySouthConf::Custom(subregions) => {
112-
for (id, _) in subregions.iter().enumerate() {
113-
// NOTE(regions): we create three hats per subregion.
114-
// If memory usage is an issue, we should create then lazily.
115-
for mode in [WhatAmI::Client, WhatAmI::Peer, WhatAmI::Router] {
116-
self.hats.push(Region::South { id, mode });
101+
WhatAmI::Peer => {
102+
regions.push(Region::default_south(WhatAmI::Client));
103+
}
104+
WhatAmI::Client => {}
105+
},
106+
GatewaySouthConf::Custom(subregions) => {
107+
for (id, _) in subregions.iter().enumerate() {
108+
// NOTE(regions): we create three hats per subregion.
109+
// If memory usage is an issue, we should create then lazily.
110+
for mode in [WhatAmI::Client, WhatAmI::Peer, WhatAmI::Router] {
111+
regions.push(Region::South { id, mode });
112+
}
117113
}
118114
}
119115
}
116+
117+
regions.push(Region::Local);
118+
};
119+
120+
#[cfg(not(test))]
121+
set_regions_with_config();
122+
123+
#[cfg(test)]
124+
if let Some(subregions) = self.subregions {
125+
regions.extend_from_slice(&subregions);
126+
} else {
127+
set_regions_with_config()
120128
}
121129

122130
let zid = ZenohIdProto::from(self.config.id());
@@ -126,7 +134,7 @@ impl<'conf> GatewayBuilder<'conf> {
126134
.stats
127135
.unwrap_or_else(|| zenoh_stats::StatsRegistry::new(zid, mode, &*crate::LONG_VERSION));
128136

129-
tracing::trace!(hats = ?self.hats, "New gateway");
137+
tracing::trace!(?regions, "New gateway");
130138

131139
Ok(Gateway {
132140
tables: Arc::new(TablesLock {
@@ -135,16 +143,15 @@ impl<'conf> GatewayBuilder<'conf> {
135143
zid,
136144
self.hlc,
137145
self.config,
138-
self.hats
146+
regions
139147
.iter()
140148
.copied()
141149
.map(|b| (b, tables::HatTablesData::new()))
142150
.collect(),
143151
#[cfg(feature = "stats")]
144152
stats,
145153
)?,
146-
hats: self
147-
.hats
154+
hats: regions
148155
.iter()
149156
.copied()
150157
.map(|region| -> (Region, Box<dyn HatTrait + Send + Sync>) {

zenoh/src/net/routing/hat/broker/interests.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -397,15 +397,15 @@ impl HatInterestTrait for Hat {
397397

398398
let wire_expr = Resource::decl_key(&res, &mut dst);
399399
(ctx.send_declare)(
400-
&dbg!(dst).primitives,
400+
&dst.primitives,
401401
RoutingContext::with_expr(
402-
dbg!(Declare {
402+
Declare {
403403
interest_id: Some(interest.src_interest_id),
404404
ext_qos: declare::ext::QoSType::DECLARE,
405405
ext_tstamp: None,
406406
ext_nodeid: declare::ext::NodeIdType::DEFAULT,
407407
body: DeclareBody::DeclareToken(DeclareToken { id, wire_expr }),
408-
}),
408+
},
409409
res.expr().to_string(),
410410
),
411411
);

0 commit comments

Comments
 (0)