Skip to content

Commit 8e79db4

Browse files
authored
fix(tee): re-announce TeeAttestationAnnounce until admitted (fleet-join) (#2460)
1 parent 1430731 commit 8e79db4

2 files changed

Lines changed: 339 additions & 14 deletions

File tree

crates/node/primitives/src/client.rs

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,33 @@ impl NodeClient {
422422
Ok(())
423423
}
424424

425+
/// Publish raw payload on the namespace topic `ns/<hex(namespace_id)>`
426+
/// immediately, without the "wait for mesh, then publish anyway" loop of
427+
/// [`publish_on_namespace`](Self::publish_on_namespace).
428+
///
429+
/// Returns the mesh peer count observed at publish time so a caller running
430+
/// its own re-announce loop (e.g. the fleet-join admission wait) can tell a
431+
/// publish into a live mesh apart from a publish into an empty mesh — the
432+
/// latter is lost forever because gossipsub does not replay. Re-announcing
433+
/// each poll cycle means a *later* mesh window still receives a fresh copy,
434+
/// which a single up-front publish (the bug this fixes) never could.
435+
///
436+
/// Kept separate from [`publish_on_namespace`](Self::publish_on_namespace)
437+
/// so the up-front wait-then-publish semantics other callers rely on are
438+
/// untouched; this is the opt-in, per-cycle building block.
439+
pub async fn publish_on_namespace_now(
440+
&self,
441+
namespace_id: [u8; 32],
442+
payload: Vec<u8>,
443+
) -> eyre::Result<usize> {
444+
let topic_str = format!("ns/{}", hex::encode(namespace_id));
445+
let topic = TopicHash::from_raw(topic_str);
446+
447+
let mesh_peers = self.network_client.mesh_peer_count(topic.clone()).await;
448+
let _ignored = self.network_client.publish(topic, payload).await?;
449+
Ok(mesh_peers)
450+
}
451+
425452
pub async fn get_peers_count(&self, context: Option<&ContextId>) -> usize {
426453
let Some(context) = context else {
427454
return self.network_client.peer_count().await;
@@ -731,3 +758,217 @@ impl NodeClient {
731758
.await
732759
}
733760
}
761+
762+
#[cfg(test)]
763+
mod publish_on_namespace_now_tests {
764+
//! Unit tests for the re-announce building block
765+
//! [`NodeClient::publish_on_namespace_now`] and the re-announce-until-
766+
//! admitted loop it powers in the fleet-join handler.
767+
//!
768+
//! The fleet-join admission wait lives in `calimero-server` (it needs the
769+
//! `ctx_client` admission read, which this crate does not see), so the loop
770+
//! itself is reproduced here against the same publish primitive. This is the
771+
//! smallest real unit: a live `NetworkClient` backed by a stub network actor
772+
//! that counts `Publish` messages and reports a settable mesh peer count —
773+
//! no libp2p transport, no server crate. The full owner-side admission path
774+
//! is covered by `calimero-node`'s `local_governance_node_e2e.rs`.
775+
//!
776+
//! Runs under `#[actix::test]` (single-threaded actix System) so the stub
777+
//! actor's mailbox is pumped by the same runtime that drives the client's
778+
//! `.await`s — `Actor::create` + `LazyRecipient::init`, the documented
779+
//! pattern from `calimero-utils-actix`'s own `lazy_tests.rs`.
780+
781+
use std::sync::atomic::{AtomicUsize, Ordering};
782+
use std::sync::Arc;
783+
use std::time::Duration;
784+
785+
use actix::Actor;
786+
use calimero_blobstore::config::BlobStoreConfig;
787+
use calimero_blobstore::{BlobManager as BlobStore, FileSystem};
788+
use calimero_network_primitives::client::NetworkClient;
789+
use calimero_network_primitives::messages::{MessageId, NetworkMessage};
790+
use calimero_store::db::InMemoryDB;
791+
use calimero_store::Store;
792+
use calimero_utils_actix::LazyRecipient;
793+
use tokio::sync::{broadcast, mpsc};
794+
795+
use super::{BlobManager, NodeClient, SyncClient};
796+
797+
/// Stub network actor: records how many times a `Publish` is requested and
798+
/// reports whatever mesh peer count the test sets via the shared atomic.
799+
/// Resolves `Publish`/`MeshPeerCount` outcomes so the awaiting client future
800+
/// completes; every other variant is dropped (none are reached here).
801+
struct CountingNetworkActor {
802+
publish_count: Arc<AtomicUsize>,
803+
mesh_peers: Arc<AtomicUsize>,
804+
}
805+
806+
impl Actor for CountingNetworkActor {
807+
type Context = actix::Context<Self>;
808+
}
809+
810+
impl actix::Handler<NetworkMessage> for CountingNetworkActor {
811+
type Result = ();
812+
813+
fn handle(&mut self, msg: NetworkMessage, _ctx: &mut Self::Context) -> Self::Result {
814+
match msg {
815+
NetworkMessage::MeshPeerCount { outcome, .. } => {
816+
let _ = outcome.send(self.mesh_peers.load(Ordering::SeqCst));
817+
}
818+
NetworkMessage::Publish { outcome, .. } => {
819+
let _prev = self.publish_count.fetch_add(1, Ordering::SeqCst);
820+
let _ = outcome.send(Ok(MessageId(b"stub".to_vec())));
821+
}
822+
_ => {}
823+
}
824+
}
825+
}
826+
827+
/// Build a `NodeClient` whose `network_client` is wired to a freshly started
828+
/// [`CountingNetworkActor`] on the current actix System. Only the network
829+
/// path is exercised by `publish_on_namespace_now`; the remaining fields are
830+
/// minimal real stubs. Returns the client plus the shared publish-count and
831+
/// mesh-peer atomics for assertions. The `TempDir` is returned so the
832+
/// caller keeps the blobstore filesystem alive for the test's duration.
833+
async fn make_client() -> (
834+
NodeClient,
835+
Arc<AtomicUsize>,
836+
Arc<AtomicUsize>,
837+
tempfile::TempDir,
838+
) {
839+
let tmp = tempfile::tempdir().expect("tempdir");
840+
let store = Store::new(Arc::new(InMemoryDB::owned()));
841+
842+
let blob_cfg =
843+
BlobStoreConfig::new(tmp.path().to_path_buf().try_into().expect("utf8 blob path"));
844+
let fs = FileSystem::new(&blob_cfg).await.expect("blob fs");
845+
let blob_manager = BlobManager::new(BlobStore::new(store.clone(), fs));
846+
847+
let network_recipient = LazyRecipient::<NetworkMessage>::new();
848+
let network_client = NetworkClient::new(network_recipient.clone());
849+
850+
let publish_count = Arc::new(AtomicUsize::new(0));
851+
let mesh_peers = Arc::new(AtomicUsize::new(0));
852+
853+
let actor = CountingNetworkActor {
854+
publish_count: Arc::clone(&publish_count),
855+
mesh_peers: Arc::clone(&mesh_peers),
856+
};
857+
let _addr = CountingNetworkActor::create(move |ctx| {
858+
assert!(network_recipient.init(ctx), "network recipient init");
859+
actor
860+
});
861+
862+
let (event_sender, _) = broadcast::channel(16);
863+
let (ctx_sync_tx, _ctx_sync_rx) = mpsc::channel(8);
864+
let (ns_sync_tx, _ns_sync_rx) = mpsc::channel(8);
865+
let (ns_join_tx, _ns_join_rx) = mpsc::channel(8);
866+
let (open_subgroup_join_tx, _open_rx) = mpsc::channel(8);
867+
let sync_client =
868+
SyncClient::new(ctx_sync_tx, ns_sync_tx, ns_join_tx, open_subgroup_join_tx);
869+
870+
let node_client = NodeClient::new(
871+
store,
872+
blob_manager,
873+
network_client,
874+
LazyRecipient::new(),
875+
event_sender,
876+
sync_client,
877+
String::new(),
878+
None,
879+
);
880+
881+
(node_client, publish_count, mesh_peers, tmp)
882+
}
883+
884+
/// `publish_on_namespace_now` publishes exactly once per call and reports the
885+
/// mesh peer count observed at publish time (here: 0 — the empty-mesh case
886+
/// that silently dropped the one-shot announce before this fix).
887+
#[actix::test]
888+
async fn publishes_once_and_reports_empty_mesh() {
889+
let (client, publish_count, _mesh, _tmp) = make_client().await;
890+
891+
let observed = client
892+
.publish_on_namespace_now([0x11; 32], b"announce".to_vec())
893+
.await
894+
.expect("publish_on_namespace_now");
895+
896+
assert_eq!(observed, 0, "no mesh peers were set");
897+
assert_eq!(
898+
publish_count.load(Ordering::SeqCst),
899+
1,
900+
"exactly one publish per call"
901+
);
902+
}
903+
904+
/// `publish_on_namespace_now` surfaces a non-zero mesh peer count when one
905+
/// is present — the signal a caller uses to know the announce landed in a
906+
/// live mesh rather than an empty one.
907+
#[actix::test]
908+
async fn reports_live_mesh_peer_count() {
909+
let (client, _publish_count, mesh, _tmp) = make_client().await;
910+
mesh.store(2, Ordering::SeqCst);
911+
912+
let observed = client
913+
.publish_on_namespace_now([0x33; 32], b"announce".to_vec())
914+
.await
915+
.expect("publish_on_namespace_now");
916+
917+
assert_eq!(observed, 2, "must report the live mesh peer count");
918+
}
919+
920+
/// Locks in the P1 fix: a re-announce loop that publishes every cycle while
921+
/// not admitted publishes MORE THAN ONCE over the wait window (the one-shot
922+
/// bug published exactly once), and STOPS the moment admission is observed —
923+
/// no further announces after admitted. This mirrors the integrated loop in
924+
/// `crates/server/src/admin/handlers/tee/fleet_join.rs`.
925+
#[actix::test]
926+
async fn reannounce_loop_publishes_more_than_once_then_stops_on_admission() {
927+
let (client, publish_count, _mesh, _tmp) = make_client().await;
928+
929+
// Mirror the fleet-join handler loop: publish up front, then on each
930+
// not-yet-admitted cycle re-check admission, sleep, then re-publish
931+
// (re-publish AFTER the sleep, as the handler does, so the first
932+
// re-announce doesn't fire back-to-back with the up-front publish).
933+
// Admission flips true after a few cycles; once true the loop must break
934+
// BEFORE publishing again. A fast poll keeps the test sub-second.
935+
const ADMIT_AFTER_CYCLES: usize = 3;
936+
const POLL: Duration = Duration::from_millis(10);
937+
const MAX_CYCLES: usize = 50; // hard safety bound
938+
939+
// First (up-front) announce, as the handler does before its loop.
940+
let _ = client
941+
.publish_on_namespace_now([0x22; 32], b"announce".to_vec())
942+
.await
943+
.expect("first announce");
944+
945+
let mut cycles = 0;
946+
let mut admitted = false;
947+
while cycles < MAX_CYCLES {
948+
// Admission check FIRST so we never re-announce after admitted.
949+
if cycles >= ADMIT_AFTER_CYCLES {
950+
admitted = true;
951+
break;
952+
}
953+
tokio::time::sleep(POLL).await;
954+
let _ = client
955+
.publish_on_namespace_now([0x22; 32], b"announce".to_vec())
956+
.await
957+
.expect("re-announce");
958+
cycles += 1;
959+
}
960+
961+
assert!(admitted, "loop must observe admission");
962+
let total = publish_count.load(Ordering::SeqCst);
963+
assert!(
964+
total > 1,
965+
"re-announce must publish more than once over the wait window, got {total}"
966+
);
967+
// up-front (1) + one per not-yet-admitted cycle (ADMIT_AFTER_CYCLES).
968+
assert_eq!(
969+
total,
970+
1 + ADMIT_AFTER_CYCLES,
971+
"must stop announcing the instant admission is observed"
972+
);
973+
}
974+
}

crates/server/src/admin/handlers/tee/fleet_join.rs

Lines changed: 98 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,17 @@ pub async fn handler(
119119
.into_response();
120120
}
121121

122+
// Fire the first announce up front. A single publish at fleet-join time is
123+
// lost forever if it lands in an empty gossipsub mesh (no replay), which is
124+
// the common case for a NAT'd/relay owner whose mesh forms only
125+
// intermittently. The admission loop below therefore RE-announces every
126+
// poll cycle until admitted or the deadline, so a *later* mesh window still
127+
// receives a fresh copy. If even this first publish errors at the transport
128+
// level we bail (subscription with no announce is useless); a publish into
129+
// an empty mesh is *not* an error and is expected to be retried below.
122130
if let Err(err) = state
123131
.node_client
124-
.publish_on_namespace(group_id_bytes, payload)
132+
.publish_on_namespace_now(group_id_bytes, payload.clone())
125133
.await
126134
{
127135
warn!(error=?err, "Failed to broadcast, unsubscribing from namespace");
@@ -139,29 +147,65 @@ pub async fn handler(
139147
info!(
140148
group_id = %req.group_id,
141149
%our_public_key,
142-
"TeeAttestationAnnounce broadcast; waiting for admission then joining contexts"
150+
"TeeAttestationAnnounce broadcast; re-announcing until admission then joining contexts"
143151
);
144152

145-
// Poll for group admission, then auto-join all contexts in the namespace
153+
// Poll for group admission, then auto-join all contexts in the namespace.
154+
//
155+
// Re-announce strategy: this loop both (a) checks for admission and (b)
156+
// re-publishes the announce each cycle the node is not yet admitted. The
157+
// re-announce is request-scoped (bounded by `MAX_ADMISSION_WAIT`) rather
158+
// than a long-lived background task: the mdma sidecar already re-polls
159+
// should-join and re-invokes fleet-join, so each call covering one mesh
160+
// window is sufficient, and a request-scoped loop needs no extra actor /
161+
// lifecycle management. See the handler-level rationale comment.
146162
let mut contexts_joined = Vec::new();
147163
let mut admitted = false;
148164
let mut auto_follow_enabled = false;
149165

166+
// Overall bound for one fleet-join call. The sidecar re-invokes across a
167+
// larger window, so this only needs to cover a single mesh-formation
168+
// attempt comfortably.
150169
const MAX_ADMISSION_WAIT: std::time::Duration = std::time::Duration::from_secs(30);
170+
// Interval between admission checks AND between re-announces — short enough
171+
// that a transient mesh window (mesh peers appear, then vanish) is hit by a
172+
// fresh publish, but not so tight it spams the topic.
151173
const ADMISSION_POLL: std::time::Duration = std::time::Duration::from_secs(2);
152174

153175
let deadline = tokio::time::Instant::now() + MAX_ADMISSION_WAIT;
154176

155-
while tokio::time::Instant::now() < deadline {
156-
match state
157-
.ctx_client
158-
.list_group_contexts(ListGroupContextsRequest {
159-
group_id,
160-
offset: 0,
161-
limit: 100,
162-
})
163-
.await
164-
{
177+
// `loop {}` (not `while now < deadline`) so the deadline is only checked
178+
// *after* an admission check, never right after a sleep — otherwise an
179+
// admission that completes during the final sleep would be lost to a false
180+
// "timed out" / `admitted:false`. The deadline break lives in the `Err`
181+
// arm below, immediately after the (failed) admission check.
182+
loop {
183+
// Bound each admission check so a stuck context-manager actor can't
184+
// extend the handler past MAX_ADMISSION_WAIT: a check that exceeds the
185+
// poll interval is mapped to a (retriable) error and handled by the
186+
// `Err` arm below, exactly like a not-yet-admitted result. A
187+
// slow-but-not-stuck actor whose check nears ADMISSION_POLL makes the
188+
// effective cycle up to ~2x ADMISSION_POLL; that's acceptable, and we
189+
// keep the budget at ADMISSION_POLL (rather than shrinking it) so a
190+
// normally-fast actor isn't spuriously timed out. The overall deadline
191+
// still bounds total wall-clock either way.
192+
let admission = tokio::time::timeout(
193+
ADMISSION_POLL,
194+
state
195+
.ctx_client
196+
.list_group_contexts(ListGroupContextsRequest {
197+
group_id,
198+
offset: 0,
199+
limit: 100,
200+
}),
201+
)
202+
.await
203+
.unwrap_or_else(|_| {
204+
Err(eyre::eyre!(
205+
"list_group_contexts exceeded the admission poll budget"
206+
))
207+
});
208+
match admission {
165209
Ok(entries) => {
166210
info!(
167211
group_id = %req.group_id,
@@ -239,7 +283,47 @@ pub async fn handler(
239283
}
240284
Err(err) => {
241285
tracing::debug!(error=?err, "Admission check not yet successful, retrying...");
242-
tokio::time::sleep(ADMISSION_POLL).await;
286+
287+
// Stop once past the deadline — but only here, AFTER the
288+
// admission check above, so an admission that landed during the
289+
// previous sleep is observed on this iteration instead of being
290+
// lost to a false "timed out".
291+
if tokio::time::Instant::now() >= deadline {
292+
break;
293+
}
294+
295+
// Cap the poll sleep to the remaining budget so the loop wakes
296+
// for its final admission check right at the deadline rather
297+
// than up to ADMISSION_POLL past it.
298+
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
299+
tokio::time::sleep(remaining.min(ADMISSION_POLL)).await;
300+
301+
// Re-announce AFTER the poll sleep, and only if we're still
302+
// before the deadline. Doing it here (rather than before the
303+
// sleep) avoids both a duplicate publish fired back-to-back with
304+
// the up-front one at t=0 and a wasted publish right as we give
305+
// up. A single up-front publish is lost if the mesh was empty at
306+
// fleet-join (gossipsub does not replay), so re-publishing each
307+
// cycle delivers a fresh copy to a mesh window that opens later.
308+
// Best effort — a transport error here is logged, not fatal.
309+
if tokio::time::Instant::now() < deadline {
310+
match state
311+
.node_client
312+
.publish_on_namespace_now(group_id_bytes, payload.clone())
313+
.await
314+
{
315+
Ok(mesh_peers) => tracing::debug!(
316+
group_id = %req.group_id,
317+
mesh_peers,
318+
"re-announced TeeAttestationAnnounce while awaiting admission"
319+
),
320+
Err(reannounce_err) => warn!(
321+
group_id = %req.group_id,
322+
error = ?reannounce_err,
323+
"re-announce publish failed; will retry next cycle"
324+
),
325+
}
326+
}
243327
}
244328
}
245329
}

0 commit comments

Comments
 (0)