Skip to content

Commit d8d8813

Browse files
committed
Rework route computation and invalidation
One major issue with the previous implementation is that is I previously assumed routes to only depend on the source bound; this is false. This commit also makes route invalidation more precise: the dependencies of `compute_data_routes` are better understood, although this needs testing.
1 parent 64287e2 commit d8d8813

23 files changed

Lines changed: 369 additions & 239 deletions

File tree

zenoh/src/net/routing/dispatcher/face.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ use crate::net::{
4444
dispatcher::{
4545
interests::{finalize_pending_interests, RemoteInterest},
4646
queries::{
47-
disable_matches_query_routes, finalize_pending_queries, merge_qabl_infos,
48-
route_send_response, route_send_response_final, Query,
47+
finalize_pending_queries, merge_qabl_infos, route_send_response,
48+
route_send_response_final, Query,
4949
},
5050
region::RegionMap,
5151
},
@@ -132,7 +132,7 @@ pub struct FaceState {
132132
pub(crate) pending_queries: HashMap<RequestId, (Arc<Query>, CancellationToken)>,
133133
pub(crate) mcast_group: Option<TransportMulticast>,
134134
pub(crate) in_interceptors: Option<Arc<ArcSwapOption<InterceptorsChain>>>,
135-
/// Downcasts to `HatFace`.
135+
/// Map from `Region` to `HatFace`.
136136
pub(crate) hats: RegionMap<Box<dyn Any + Send + Sync>>,
137137
pub(crate) task_controller: TaskController,
138138
pub(crate) is_local: bool,
@@ -520,8 +520,6 @@ impl Primitives for Face {
520520
declares.push((p.clone(), m))
521521
});
522522

523-
wtables.data.disable_all_routes();
524-
525523
drop(wtables);
526524
drop(ctrl_lock);
527525
for (p, m) in declares {
@@ -582,7 +580,7 @@ impl Primitives for Face {
582580
let src_fid = ctx.src_face.id;
583581

584582
for mut res in hats[region].unregister_face_subscribers(ctx.reborrow()) {
585-
disable_matches_data_routes(ctx.tables, &mut res);
583+
hats[region].disable_data_routes(ctx.tables, &mut res);
586584

587585
let mut remaining = hats
588586
.values_mut()
@@ -601,7 +599,7 @@ impl Primitives for Face {
601599
}
602600

603601
for mut res in hats[region].unregister_face_queryables(ctx.reborrow()) {
604-
disable_matches_query_routes(ctx.tables, &mut res);
602+
hats[region].disable_query_routes(ctx.tables, &mut res);
605603

606604
let remaining = hats
607605
.iter()

zenoh/src/net/routing/dispatcher/interests.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -422,8 +422,6 @@ impl Face {
422422
_node_id: NodeId,
423423
send_declare: &mut SendDeclare,
424424
) {
425-
let _span = tracing::debug_span!("declare_final", interest_id).entered();
426-
427425
let tables = &mut *wtables;
428426

429427
let mut ctx = DispatcherContext {
@@ -437,13 +435,15 @@ impl Face {
437435
let region = ctx.src_face.region;
438436

439437
if region.bound().is_south() {
440-
tracing::error!(
441-
"Received current interest finalization from south-bound face. \
442-
This message should only flow downstream"
443-
);
438+
tracing::error!("Received DeclareFinal from south-bound face");
444439
return;
445440
}
446441

442+
// TODO(regions): this is too conservative, the north hat should be able to decide what
443+
// keyexpr(s)—if not all—are affected and whether this finalization concerns subscribers
444+
// or queryables or borth.
445+
hats[region].disable_all_routes(ctx.tables);
446+
447447
match hats[region].route_declare_final(ctx.reborrow(), interest_id) {
448448
RouteCurrentDeclareResult::Noop | RouteCurrentDeclareResult::NoBreadcrumb => {} // ¯\_(ツ)_/¯
449449
RouteCurrentDeclareResult::Breadcrumb { interest } => {

zenoh/src/net/routing/dispatcher/pubsub.rs

Lines changed: 5 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use zenoh_protocol::{
2121
core::{Region, Reliability, WireExpr},
2222
network::{declare::SubscriberId, push::ext, Push},
2323
};
24-
use zenoh_sync::get_mut_unchecked;
2524

2625
use super::{
2726
face::FaceState,
@@ -32,7 +31,6 @@ use crate::net::routing::{
3231
dispatcher::{
3332
face::Face,
3433
local_resources::{LocalResourceInfoTrait, LocalResources},
35-
tables::TablesData,
3634
},
3735
gateway::{get_or_set_route, node_id_as_source, Direction, RouteBuilder},
3836
hat::{DispatcherContext, SendDeclare},
@@ -105,6 +103,8 @@ impl Face {
105103
sub_info,
106104
);
107105

106+
hats[region].disable_data_routes(ctx.tables, &mut res);
107+
108108
for region in hats.regions().copied().collect_vec() {
109109
let other_info = hats
110110
.values()
@@ -113,8 +113,6 @@ impl Face {
113113
.reduce(|_, _| SubscriberInfo);
114114

115115
hats[region].propagate_subscriber(ctx.reborrow(), res.clone(), other_info);
116-
117-
disable_matches_data_routes(ctx.tables, &mut res);
118116
}
119117

120118
drop(wtables);
@@ -192,7 +190,7 @@ impl Face {
192190
if let Some(mut res) =
193191
hats[region].unregister_subscriber(ctx.reborrow(), id, res.clone(), node_id)
194192
{
195-
disable_matches_data_routes(ctx.tables, &mut res);
193+
hats[region].disable_data_routes(ctx.tables, &mut res);
196194

197195
let mut remaining = tables
198196
.hats
@@ -212,34 +210,6 @@ impl Face {
212210
}
213211
}
214212

215-
/// Disables data routes for the given [`Resource`].
216-
///
217-
/// ## Note
218-
///
219-
/// **Changes in data/query routes are not hat-local**. For example, a north peer hat has routes for data
220-
/// that originate from south-bound remotes but has no routes for data that originate in its north
221-
/// region, thus a change in a broker's data routes affects the routes of the north peer hat.
222-
pub(crate) fn disable_matches_data_routes(_tables: &mut TablesData, res: &mut Arc<Resource>) {
223-
if res.ctx.is_some() {
224-
for hat in get_mut_unchecked(res).context_mut().hats.values_mut() {
225-
hat.disable_data_routes();
226-
}
227-
228-
for match_ in &res.context().matches {
229-
let mut match_ = match_.upgrade().unwrap();
230-
if !Arc::ptr_eq(&match_, res) {
231-
for hat in get_mut_unchecked(&mut match_)
232-
.context_mut()
233-
.hats
234-
.values_mut()
235-
{
236-
hat.disable_data_routes();
237-
}
238-
}
239-
}
240-
}
241-
}
242-
243213
macro_rules! treat_timestamp {
244214
($hlc:expr, $payload:expr, $drop:expr) => {
245215
// if an HLC was configured (via Config.add_timestamp),
@@ -286,7 +256,7 @@ fn get_data_route(
286256
) -> Arc<Route> {
287257
let node_id = tables.hats[region].map_routing_context(&tables.data, src_face, node_id);
288258
let compute_route =
289-
|| tables.hats[region].compute_data_route(&tables.data, src_face, expr, node_id);
259+
|| tables.hats[region].compute_data_route(&tables.data, &src_face.region, expr, node_id);
290260
match expr
291261
.resource()
292262
.as_ref()
@@ -296,7 +266,7 @@ fn get_data_route(
296266
Some(data_routes) => get_or_set_route(
297267
data_routes,
298268
tables.data.hats[region].routes_version,
299-
&src_face.region.bound(),
269+
&src_face.region,
300270
node_id,
301271
compute_route,
302272
),

zenoh/src/net/routing/dispatcher/queries.rs

Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use crate::net::routing::{
4343
dispatcher::{
4444
face::Face,
4545
local_resources::{LocalResourceInfoTrait, LocalResources},
46-
tables::{Tables, TablesData},
46+
tables::Tables,
4747
},
4848
gateway::{get_or_set_route, node_id_as_source, QueryDirection, RouteBuilder},
4949
hat::{DispatcherContext, SendDeclare, UnregisterEntityResult},
@@ -127,9 +127,9 @@ impl Face {
127127
qabl_info,
128128
);
129129

130-
for region in hats.regions().copied().collect_vec() {
131-
disable_matches_query_routes(ctx.tables, &mut res);
130+
hats[region].disable_query_routes(ctx.tables, &mut res);
132131

132+
for region in hats.regions().copied().collect_vec() {
133133
let other_info = hats
134134
.values()
135135
.filter(|hat| hat.region() != region)
@@ -213,7 +213,7 @@ impl Face {
213213
match hats[region].unregister_queryable(ctx.reborrow(), id, res.clone(), node_id) {
214214
UnregisterEntityResult::Noop => {} // ¯\_(ツ)_/¯
215215
UnregisterEntityResult::InfoUpdate { mut res } => {
216-
disable_matches_query_routes(ctx.tables, &mut res);
216+
hats[region].disable_query_routes(ctx.tables, &mut res);
217217

218218
for region in hats.regions().copied().collect_vec() {
219219
let other_info = hats
@@ -226,6 +226,8 @@ impl Face {
226226
}
227227
}
228228
UnregisterEntityResult::LastUnregistered { mut res } => {
229+
hats[region].disable_query_routes(ctx.tables, &mut res);
230+
229231
let remainder = hats
230232
.values()
231233
.filter_map(|hat| {
@@ -600,34 +602,6 @@ impl Timed for QueryCleanup {
600602
}
601603
}
602604

603-
/// Disables query routes for the given [`Resource`].
604-
///
605-
/// ## Note
606-
///
607-
/// **Changes in data/query routes are not hat-local**. For example, a north peer hat has routes for query
608-
/// that originate from south-bound remotes but has no routes for query that originate in its north
609-
/// region, thus a change in a broker's query routes affects the routes of the north peer hat.
610-
pub(crate) fn disable_matches_query_routes(_tables: &mut TablesData, res: &mut Arc<Resource>) {
611-
if res.ctx.is_some() {
612-
for hat in get_mut_unchecked(res).context_mut().hats.values_mut() {
613-
hat.disable_query_routes();
614-
}
615-
616-
for match_ in &res.context().matches {
617-
let mut match_ = match_.upgrade().unwrap();
618-
if !Arc::ptr_eq(&match_, res) {
619-
for hat in get_mut_unchecked(&mut match_)
620-
.context_mut()
621-
.hats
622-
.values_mut()
623-
{
624-
hat.disable_query_routes();
625-
}
626-
}
627-
}
628-
}
629-
}
630-
631605
#[inline]
632606
fn get_query_route(
633607
tables: &Tables,
@@ -638,7 +612,7 @@ fn get_query_route(
638612
) -> Arc<QueryTargetQablSet> {
639613
let node_id = tables.hats[region].map_routing_context(&tables.data, src_face, routing_context);
640614
let compute_route =
641-
|| tables.hats[region].compute_query_route(&tables.data, src_face, expr, node_id);
615+
|| tables.hats[region].compute_query_route(&tables.data, &src_face.region, expr, node_id);
642616
if let Some(query_routes) = expr
643617
.resource()
644618
.as_ref()
@@ -648,7 +622,7 @@ fn get_query_route(
648622
return get_or_set_route(
649623
query_routes,
650624
tables.data.hats[region].routes_version,
651-
&src_face.region.bound(),
625+
&src_face.region,
652626
node_id,
653627
compute_route,
654628
);

zenoh/src/net/routing/dispatcher/region.rs

Lines changed: 17 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,29 @@ use std::{
1616
ops::{Index, IndexMut},
1717
};
1818

19-
use zenoh_protocol::core::{Bound, Region};
19+
use zenoh_protocol::core::Region;
2020

2121
// TODO(regions): optimization
2222
#[derive(Debug, Default)]
2323
pub(crate) struct RegionMap<D>(hashbrown::HashMap<Region, D>);
2424

2525
impl<D> RegionMap<D> {
26+
pub(crate) fn get(&self, region: &Region) -> Option<&D> {
27+
self.0.get(region)
28+
}
29+
30+
pub(crate) fn clear(&mut self) {
31+
self.0.clear();
32+
}
33+
34+
pub(crate) fn get_mut(&mut self, region: &Region) -> Option<&mut D> {
35+
self.0.get_mut(region)
36+
}
37+
38+
pub(crate) fn insert(&mut self, region: Region, value: D) -> Option<D> {
39+
self.0.insert(region, value)
40+
}
41+
2642
pub(crate) fn iter(&self) -> impl Iterator<Item = (&Region, &D)> {
2743
self.0.iter()
2844
}
@@ -125,37 +141,3 @@ impl<D> IndexMut<Region> for RegionMap<D> {
125141
self.index_mut(&region)
126142
}
127143
}
128-
129-
#[derive(Debug, Default)]
130-
pub(crate) struct BoundMap<D> {
131-
north: Option<D>,
132-
south: Option<D>,
133-
}
134-
135-
impl<D> BoundMap<D> {
136-
pub(crate) fn clear(&mut self) {
137-
self.north.take();
138-
self.south.take();
139-
}
140-
141-
pub(crate) fn get(&self, bound: &Bound) -> Option<&D> {
142-
match bound {
143-
Bound::North => self.north.as_ref(),
144-
Bound::South => self.south.as_ref(),
145-
}
146-
}
147-
148-
pub(crate) fn get_mut(&mut self, bound: &Bound) -> Option<&mut D> {
149-
match bound {
150-
Bound::North => self.north.as_mut(),
151-
Bound::South => self.south.as_mut(),
152-
}
153-
}
154-
155-
pub(crate) fn insert(&mut self, bound: &Bound, value: D) -> Option<D> {
156-
match bound {
157-
Bound::North => self.north.replace(value),
158-
Bound::South => self.south.replace(value),
159-
}
160-
}
161-
}

0 commit comments

Comments
 (0)