Skip to content

Embedded SFU service: Phases A–F#712

Draft
HexaField wants to merge 22 commits into
devfrom
feat/embedded-sfu
Draft

Embedded SFU service: Phases A–F#712
HexaField wants to merge 22 commits into
devfrom
feat/embedded-sfu

Conversation

@HexaField

@HexaField HexaField commented Mar 5, 2026

Copy link
Copy Markdown
Contributor

Summary

Forward-ports the embedded SFU service into the executor and lights up every architectural layer between client peers and cross-node media:

  • Phase A — CascadeGossip trait + TCP impl. Pluggable cluster gossip; admin-only enableCascade/cascadeAnnounce escape hatches dropped.
  • Phase B — Multi-user mode plumbing. agentDidOverride admin hack dropped; clients call user.create + user.login + per-user JWTs.
  • Phase C — Server-pushed renegotiation pipeline. When a new peer joins, every other peer in the room gets a fresh SDP offer over the events_ws fanout. Bidirectional propagation (new peer also receives existing peers' tracks).
  • Phase D — SfuConfig.iceServers. TURN credentials served by the SFU so host applications can rotate without redeploying clients.
  • Phase E — Cross-node pipe transport handshake. Auto-establish on Announce from a node serving the same room; gossip-dispatched PipeOffer / PipeAnswer. New sfu.cascadeStatus RPC.
  • Phase F — Media across pipes. PipeTransport migrated into the server peers map flagged is_pipe; same event loop drives client and pipe peers. New pubsub bridges (SFU_PIPE_RENEGOTIATION_OFFER_TOPIC, SFU_PIPE_RENEGOTIATION_ANSWER_TOPIC) carry pipe-bound SDP through the gossip transport.

Also: NeighbourhoodProxy.subscribeCallRenegotiationOffer exposed in the SDK; sfu.callAnswerServerOffer RPC; cascade Leave signal flow; RoomId-string parse fix (split on last :); a 2-fmt-fix sweep.

Verification

End-to-end wind tunnel runs on a real x86 box:

Scenario Result
T1 (single-node × 5 peers) downloadMean=540,030 B/peer, renegotiations=[3,3,3,3,2], 28 server prepare/accept log lines
T6 (2-node cascade + pipe + media) perNodePipeCounts=[1,1], linksValid=true, uploadMean=70,365 B, downloadMean=60,507 B, mediaAcrossPipeOk=true, renegotiationsAppliedPerPeer=[79,80]

Out of scope — Holochain gossip backend (follow-up)

CascadeGossip is intentionally a trait (Phase A): NoopGossip ships as the default, TcpGossip covers operator-configured clusters (set --sfu-cascade-listen + --sfu-cascade-peers), and a future HolochainGossip will ride the neighbourhood signal channel so any executor running an SFU room can discover the other participating nodes through the DHT without a hand-written peer list.

This PR delivers a complete v1 for:

  • single-node SFU (no gossip needed — renegotiation is events_ws-driven)
  • multi-node cascade with operator-configured TCP gossip

The Holochain gossip implementation is the missing piece for the full ad4m model where every user runs their own executor and the cascade composes itself from whoever happens to be in the neighbourhood. Slotting it in is a separate impl CascadeGossip — no SFU code changes — and lands as a follow-up PR.

Test plan

  • CircleCI passes on this branch
  • Confirm sfu.callAnswerServerOffer and sfu.cascadeStatus register cleanly
  • Re-run T1 + T6 wind tunnel scenarios against a fresh target/release/ad4m-executor build
  • Run the existing rust test suite under rust-executor/src/sfu/

🤖 Generated with Claude Code

@coderabbitai

coderabbitai Bot commented Mar 5, 2026

Copy link
Copy Markdown
Contributor
📝 Walkthrough

Walkthrough

Embedded SFU (Selective Forwarding Unit) capability is introduced to the ADAM executor to enable scalable WebRTC conferencing. The implementation spans TypeScript client APIs, GraphQL operations (mutations, queries, subscriptions), and a complete Rust-based SFU server with room management, media relay, and WebRTC peer handling. All server-side functionality is feature-gated behind the "sfu" flag.

Changes

Cohort / File(s) Summary
TypeScript Client Layer
core/src/neighbourhood/NeighbourhoodClient.ts, core/src/neighbourhood/NeighbourhoodProxy.ts
Added 8 new SFU-related async methods (sfuStartRoom, sfuStopRoom, callJoin, callLeave, sfuRooms, sfuPeerForNeighbourhood, sfuConfig, sfuSetConfig) and 4 public type interfaces (SfuRoom, SfuParticipant, CallSession, SfuConfig). NeighbourhoodProxy delegates calls to NeighbourhoodClient with neighbourhood URL resolution.
GraphQL Mutation & Query Resolvers
rust-executor/src/graphql/mutation_resolvers.rs, rust-executor/src/graphql/query_resolvers.rs
Added 5 SFU mutations (sfu_start_room, sfu_stop_room, call_join, call_leave, sfu_set_config) and 3 queries (sfu_rooms, sfu_peer_for_neighbourhood, sfu_config) with error handling and GraphQL type mapping. All gated by "sfu" feature. Note: query resolvers contain duplicate declarations of the same three functions.
GraphQL Subscriptions & PubSub
rust-executor/src/graphql/subscription_resolvers.rs, rust-executor/src/pubsub.rs
Added 2 subscription resolvers (call_participants, call_streams) streaming SFU events per room. Added 2 new PubSub topic constants (SFU_CALL_PARTICIPANTS_TOPIC, SFU_CALL_STREAMS_TOPIC) for event distribution.
SFU Core Infrastructure
rust-executor/src/lib.rs, rust-executor/src/sfu/mod.rs, rust-executor/Cargo.toml
Introduced "sfu" feature flag with str0m dependency (v0.16). Added SFU module initialization in executor startup flow. Module structure provides full SFU service when enabled and lightweight stub when disabled.
GraphQL Types for SFU
rust-executor/src/sfu/graphql_types.rs
Defined 6 new GraphQL-compatible structs: SfuRoomGql, SfuParticipantGql, CallSessionGql, SfuConfigGql, and event types CallParticipantEvent, CallStreamEvent. All derive GraphQLObject and support serialization.
SFU Server & WebRTC
rust-executor/src/sfu/server.rs
Complete WebRTC SFU server using str0m library. Manages UDP socket, per-peer RTC contexts, SDP handling, ICE, and media forwarding. Handles peer lifecycle (add/remove), track management, and integrates with media relay for selective forwarding.
Room & Participant Management
rust-executor/src/sfu/room.rs
Type-safe room and participant identifiers (RoomId, ParticipantId). SfuRoom tracks participants with metadata (audio/video state, speaker status). RoomManager maintains global room state with create/destroy/list operations and capacity enforcement via RoomError enum.
Media Relay & Voice Activity
rust-executor/src/sfu/relay.rs
MediaRelay struct tracks per-participant voice activity state (smoothed audio energy via exponential moving average). Identifies active speaker based on recent audio and speaking threshold. Supports participant removal and speaking detection within timeout window.
SFU Service & Configuration
rust-executor/src/sfu/service.rs
Top-level SfuService singleton with global lifecycle (start/shutdown). Exposes room management (start_room, stop_room, list_rooms), call operations (call_join, call_leave), and configuration APIs (get_config, set_config). Validates SFU modes (gateway/designated/mesh) and enforces designated peer requirements.

Sequence Diagram

sequenceDiagram
    participant Client as Client/Peer
    participant GraphQL
    participant SfuService as SFU Service
    participant RoomMgr as Room Manager
    participant SfuServer as SFU Server
    participant MediaRelay as Media Relay

    Client->>GraphQL: callJoin(roomId, sdpOffer)
    GraphQL->>SfuService: call_join(roomId, sdpOffer)
    SfuService->>RoomMgr: create_room or get_room
    RoomMgr-->>SfuService: SfuRoom
    SfuService->>SfuServer: create_rtc_for_offer(sdpOffer)
    SfuServer-->>SfuService: (rtc, sdpAnswer)
    SfuService->>RoomMgr: add_participant(pid, agentDid)
    RoomMgr-->>SfuService: success
    SfuService->>SfuServer: AddPeer(SfuPeer)
    SfuServer->>MediaRelay: initialize_participant_state
    SfuServer-->>SfuService: participant_registered
    SfuService-->>GraphQL: CallSessionInfo(sdpAnswer, participantId)
    GraphQL-->>Client: sdpAnswer

    Note over Client,MediaRelay: Media Exchange Phase
    Client->>SfuServer: ICE candidates & RTP/RTCP packets
    SfuServer->>SfuServer: demux via RTC
    SfuServer->>MediaRelay: update_voice_activity(audio_data)
    MediaRelay-->>SfuServer: active_speaker
    SfuServer->>SfuServer: relay MediaData to other peers in room
    SfuServer->>Client: RTP/RTCP packets (selective forward)

    Client->>GraphQL: callLeave(roomId)
    GraphQL->>SfuService: call_leave(roomId)
    SfuService->>RoomMgr: remove_participant(pid)
    SfuService->>SfuServer: RemovePeer(pid)
    SfuServer->>MediaRelay: remove_participant(pid)
    RoomMgr->>RoomMgr: destroy_room if empty
    SfuService-->>GraphQL: success
    GraphQL-->>Client: ack
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

The changes introduce substantial new infrastructure across multiple layers (TypeScript, GraphQL, Rust SFU server) with dense logic in media relay, room management, and WebRTC handling. The server implementation involves concurrency, UDP I/O, RTC state management, and selective forwarding logic. However, the code is well-structured within cohesive modules and the complexity is localized. Note: duplicate query resolver declarations warrant correction.

Suggested reviewers

  • jhweir

Poem

🐰 A whisker-twitch of joy!
The warren now relays with grace,
One stream per peer, no mesh embrace—
Eight participants, no CPU's ache,
SFU hops make the calls awake! 🎙️✨

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 69.57% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Title check ❓ Inconclusive The title 'Embedded SFU service: Phases A–F' is vague and does not clearly convey the main change. It refers to phases without explaining what the PR actually delivers. Consider revising to a more specific title like 'feat: Add embedded str0m-based SFU service with GraphQL API and telepresence extensions' to clearly communicate the primary change.
✅ Passed checks (3 passed)
Check name Status Explanation
Linked Issues check ✅ Passed The PR fully implements the Phase 1 core SFU objectives from issue #700: embedded str0m SFU service, GraphQL API (queries, mutations, subscriptions), Social DNA configuration with sfu object (mode, designatedPeer, fallback, maxMeshParticipants), room/participant lifecycle management, media relay with voice activity detection, and telepresence API extensions for TypeScript client. All primary coding requirements are addressed.
Out of Scope Changes check ✅ Passed All changes are directly aligned with issue #700 Phase 1 scope: SFU server infrastructure, room management, GraphQL API, Social DNA config, and telepresence API extensions. No unrelated refactoring, style changes, or feature creep detected outside the SFU embedding objective.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/embedded-sfu

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@HexaField HexaField marked this pull request as draft March 5, 2026 04:18

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 10

🧹 Nitpick comments (4)
core/src/neighbourhood/NeighbourhoodProxy.ts (1)

96-102: The assumption that pID equals neighbourhoodUrl may cause silent failures.

The comment acknowledges this is an implementation detail, but if these concepts diverge in the future, SFU operations will silently use incorrect identifiers. Consider:

  1. Adding a TODO to track this coupling
  2. Looking up the actual neighbourhood URL from the perspective metadata

The GraphQL mutations (per context snippet 1) use neighbourhoodUrl as a key, so incorrect values would cause "room not found" or similar errors that may be hard to debug.

📝 Suggested improvement
     // The neighbourhood URL is needed for SFU API calls.
     // We use the perspective UUID to identify the neighbourhood, but the SFU service
     // uses the neighbourhood URL as its key. This helper resolves it.
+    // TODO: This assumes pID equals neighbourhoodUrl. If they diverge, 
+    // look up the actual neighbourhood URL from perspective metadata.
     async `#getNeighbourhoodUrl`(): Promise<string> {
         // The perspective UUID IS the neighbourhood URL in the current implementation
         return this.#pID
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/src/neighbourhood/NeighbourhoodProxy.ts` around lines 96 - 102, The
private method NeighbourhoodProxy.#getNeighbourhoodUrl currently returns
this.#pID and assumes perspective ID equals neighbourhood URL; make this
explicit and robust by (1) adding a TODO comment on the method to warn about the
coupling and (2) implement a lookup fallback that fetches the perspective
metadata (via the existing perspective fetch mechanism in NeighbourhoodProxy or
a helper) to extract the actual neighbourhoodUrl field when present, and only
fallback to this.#pID if the metadata lacks that field; update any callers of
`#getNeighbourhoodUrl` to rely on the resolved value.
rust-executor/src/sfu/relay.rs (2)

66-75: Consider extracting the magic number 200 as a named constant.

The packet size divisor 200.0 on line 71 is a tuning parameter for the energy estimation heuristic. Making it a named constant would improve readability and make future tuning easier.

📝 Suggested improvement
 /// Threshold for considering a participant as "speaking"
 const SPEAKING_THRESHOLD: f64 = 0.01;
 /// Time after last audio before a participant is no longer considered speaking
 const SPEAKING_TIMEOUT_MS: u128 = 500;
+/// Approximate size of a "full energy" Opus packet for normalization
+const OPUS_FULL_ENERGY_SIZE: f64 = 200.0;
 
 // ...
 
         // Simple audio energy estimation from RTP payload
         // This is a rough heuristic — real VAD would decode the Opus frame
         let energy = if data.data.len() > 2 {
             // Opus silence packets are very small (typically 1-3 bytes)
             // Larger packets generally indicate voice activity
-            let size_energy = (data.data.len() as f64 / 200.0).min(1.0);
+            let size_energy = (data.data.len() as f64 / OPUS_FULL_ENERGY_SIZE).min(1.0);
             size_energy
         } else {
             0.0
         };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rust-executor/src/sfu/relay.rs` around lines 66 - 75, Extract the magic
divisor 200.0 used in the energy heuristic into a named constant (e.g.,
PACKET_SIZE_ENERGY_DIVISOR or OPUS_SILENCE_DIVISOR) and replace the literal in
the energy calculation inside the relay.rs block that computes size_energy
((data.data.len() as f64 / 200.0).min(1.0)); update any related comments to
reference the constant so tuning the heuristic is centralized and more readable.

140-170: Test coverage is minimal.

The current tests only verify initialization and participant removal. Consider adding tests for the core voice activity detection logic with mock MediaData to validate:

  • Speaking detection when energy exceeds threshold
  • Active speaker selection when multiple participants speak
  • Timeout behavior for stale audio

This would improve confidence in the heuristic tuning.

Would you like me to open an issue to track adding more comprehensive tests for the VAD logic?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rust-executor/src/sfu/relay.rs` around lines 140 - 170, Add comprehensive
unit tests for the voice activity detection (VAD) logic: create tests that
instantiate MediaRelay and feed mock MediaData into the method that processes
incoming audio (e.g., MediaRelay::process_media or equivalent) to assert
speaking detection when energy/level exceeds the threshold, verify active
speaker selection when multiple participants send audio (ensure active_speaker
updates to the highest-energy participant and is_speaking returns true for
them), and test timeout/stale-audio behavior by advancing Instant or
manipulating VoiceActivityState.last_audio to confirm participants stop being
marked as speaking and active_speaker clears after the configured timeout;
reference MediaRelay, MediaData, VoiceActivityState, active_speaker,
voice_activity, is_speaking, and remove_participant when adding these new tests.
core/src/neighbourhood/NeighbourhoodClient.ts (1)

387-413: Implicit "mesh" default for mode may surprise callers.

When config.mode is undefined, it silently defaults to "mesh" (line 406). Since sfuSetConfig accepts Partial<SfuConfig>, callers might not realize they're resetting the mode. Consider either:

  1. Making mode explicitly required in the method signature, or
  2. Documenting this default behavior in a JSDoc comment
📝 Option: Add JSDoc documentation
+    /**
+     * Update SFU configuration for a neighbourhood.
+     * `@param` neighbourhoodUrl - The neighbourhood URL
+     * `@param` config - Partial config; mode defaults to "mesh" if not specified
+     */
     async sfuSetConfig(neighbourhoodUrl: string, config: Partial<SfuConfig>): Promise<boolean> {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/src/neighbourhood/NeighbourhoodClient.ts` around lines 387 - 413, The
sfuSetConfig method silently defaults config.mode to "mesh" when undefined,
which can surprise callers; update the function by adding a JSDoc comment above
async sfuSetConfig(neighbourhoodUrl: string, config: Partial<SfuConfig>):
Promise<boolean> that documents the implicit default (mode defaults to "mesh"
when config.mode is omitted), explains that callers should pass an explicit mode
to avoid accidental resets, and references the config.mode behavior and returned
sfuSetConfig result so maintainers see the contract clearly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rust-executor/src/graphql/mutation_resolvers.rs`:
- Around line 2975-3096: The SFU mutation resolvers (sfu_start_room,
sfu_stop_room, call_join, call_leave, sfu_set_config) currently ignore the
RequestContext and do not enforce authorization; update each resolver to use the
provided _context (RequestContext) and call the existing check_capability
function before performing actions, supplying the appropriate capability string
(e.g., "sfu:manage" or "sfu:call" per mutation) and returning a FieldError on
failure; ensure you locate and call check_capability(&context, "<capability>")
at the top of each async fn and only proceed to get_sfu_service()/invoke service
methods if the capability check succeeds.
- Around line 3085-3090: The code constructs crate::sfu::SfuConfig using
max_mesh_participants.unwrap_or(4) as u32 which will wrap negative integers into
large u32 values; change this to validate and clamp the Option<i32> before
casting: check if max_mesh_participants is Some(v) and reject or replace
negative values with a safe default, and clamp to a sensible upper bound (e.g.
MAX_MESH_PARTICIPANTS) before converting to u32; update the assignment used in
SfuConfig (max_mesh_participants) so it uses the validated/clamped u32 value or
returns an error if invalid.
- Around line 3031-3037: In call_join and call_leave don't trust
crate::agent::did() or hardcode is_member = true; instead extract the caller DID
from the GraphQL request token/context passed into the resolver (the same
request/session context used elsewhere in resolvers), call the
neighbourhood/perspective membership check API (the existing
perspective/neighbourhood membership function or service) to determine
membership, and reject the operation with an authorization error when the check
fails; update both call_join and call_leave to derive DID from the request
context rather than agent::did() and remove the hardcoded is_member flag so
membership enforcement is performed correctly.

In `@rust-executor/src/graphql/query_resolvers.rs`:
- Around line 995-1048: The SFU resolvers sfu_rooms, sfu_peer_for_neighbourhood,
and sfu_config currently skip permission checks; add a call to the project’s
capability check (e.g. check_capability(context, Capability::SfuRead) or the
existing SFU-read capability symbol) at the start of each function before
retrieving the SFU service or data, returning an error if the check fails;
ensure you use the RequestContext parameter (remove the leading underscore if
unused) and perform the check in each of sfu_rooms, sfu_peer_for_neighbourhood,
and sfu_config.

In `@rust-executor/src/graphql/subscription_resolvers.rs`:
- Around line 566-596: Add an authorization capability check at the start of
both call_participants and call_streams before calling
get_global_pubsub/subscribe_and_process: use the same helper used by other
resolvers (e.g., check_capability/require_capability) to verify the SFU
subscription capability (use the existing SFU subscribe/read capability constant
used elsewhere, e.g., SFU_SUBSCRIBE_CAPABILITY or the neighbourhood equivalent)
and return an error if the check fails; then proceed to get_global_pubsub and
call subscribe_and_process with SFU_CALL_PARTICIPANTS_TOPIC /
SFU_CALL_STREAMS_TOPIC as currently implemented.
- Around line 566-580: CallParticipantEvent and CallStreamEvent need
implementations of the GetValue and GetFilter traits used by
subscribe_and_process; add impl blocks for GetValue (set associated type Value
to the same type and return self.clone() in get_value) and for GetFilter (return
Some(self.room_id.clone()) in get_filter) for both CallParticipantEvent and
CallStreamEvent so they satisfy the trait bounds required by
subscribe_and_process and the pubsub usage.

In `@rust-executor/src/sfu/server.rs`:
- Around line 151-155: The peer removal code in the SfuCommand::RemovePeer
handler only updates the local peers map and relay (peers.remove(&pid) and
relay.remove_participant(&pid)) but does not update RoomManager membership,
causing stale room state; update the RoomManager by calling its appropriate
removal method (e.g., room_manager.remove_participant(peer.room_id, pid) or
RoomManager::remove_participant(pid) using the removed peer's room_id)
immediately after successfully removing the peer and before or after
relay.remove_participant; apply the same fix to the other
peer-removal/disconnect handler(s) (the similar block handling explicit
disconnects) so both code paths keep RoomManager membership in sync with peers
and relay.
- Around line 83-120: create_rtc_for_offer currently only uses a host candidate
and ignores STUN/TURN settings; update it to accept the server config (or at
least the stun_servers and turn_servers lists) and wire those into RTC creation.
Specifically, change the signature of create_rtc_for_offer to take
SfuServerConfig (or &stun_servers, &turn_servers), then before/after building
the Rtc call the RTC/ICE API to register ICE servers (e.g.,
rtc.set_ice_servers(...) or rtc.sdp_api().set_ice_servers(...), depending on
your RTC API), add server-reflexive candidates created from STUN servers
(Candidate::srflx(...)) and relay candidates for TURN servers
(Candidate::relay(...) or the library equivalent), and map errors the same way
as the existing candidate/offer/serialize steps; update any callers (e.g., where
create_rtc_for_offer is used before add_peer) to pass the config.
- Around line 183-184: The problem is that incoming tracks (tracks_opened) are
recorded but never registered in peers' outgoing mappings (tracks_out), so no
packets are forwarded; update the code paths that handle new incoming tracks
(where tracks_opened is pushed) to also populate each target_peer.tracks_out
with an entry mapping the origin (ParticipantId + origin Mid + MediaKind) to the
outgoing Mid for that target peer. Concretely, in the same scope that appends to
tracks_opened (and in the track-open/track-add handlers referenced around
tracks_opened and keyframe_requests), iterate peers (excluding the origin),
acquire the target_peer lock/guard used elsewhere, and insert a mapping into
target_peer.tracks_out (using the target peer’s assigned Mid) so later lookups
in forward logic find the outgoing Mid; ensure you clone or convert
Mid/MediaKind/ParticipantId as needed and maintain any existing synchronization
semantics.

In `@rust-executor/src/sfu/service.rs`:
- Around line 209-247: The participant is added to the room before later steps
(Sdp parsing, SfuServer::create_rtc_for_offer, and sending SfuCommand::AddPeer),
so failures leave a stale participant; either move room.add_participant(...) to
after the RTC is successfully created and the SfuPeer is about to be sent, or
add explicit rollback: on any error after calling room.add_participant(...) call
back into the RoomManager (e.g., acquire rooms.write().await, find the room with
get_room_mut(&room_id) and call the room removal method to remove pid) before
returning the error so the RoomManager state is consistent (reference symbols:
rooms, room.add_participant, SfuServer::create_rtc_for_offer, SfuPeer,
SfuCommand::AddPeer).

---

Nitpick comments:
In `@core/src/neighbourhood/NeighbourhoodClient.ts`:
- Around line 387-413: The sfuSetConfig method silently defaults config.mode to
"mesh" when undefined, which can surprise callers; update the function by adding
a JSDoc comment above async sfuSetConfig(neighbourhoodUrl: string, config:
Partial<SfuConfig>): Promise<boolean> that documents the implicit default (mode
defaults to "mesh" when config.mode is omitted), explains that callers should
pass an explicit mode to avoid accidental resets, and references the config.mode
behavior and returned sfuSetConfig result so maintainers see the contract
clearly.

In `@core/src/neighbourhood/NeighbourhoodProxy.ts`:
- Around line 96-102: The private method NeighbourhoodProxy.#getNeighbourhoodUrl
currently returns this.#pID and assumes perspective ID equals neighbourhood URL;
make this explicit and robust by (1) adding a TODO comment on the method to warn
about the coupling and (2) implement a lookup fallback that fetches the
perspective metadata (via the existing perspective fetch mechanism in
NeighbourhoodProxy or a helper) to extract the actual neighbourhoodUrl field
when present, and only fallback to this.#pID if the metadata lacks that field;
update any callers of `#getNeighbourhoodUrl` to rely on the resolved value.

In `@rust-executor/src/sfu/relay.rs`:
- Around line 66-75: Extract the magic divisor 200.0 used in the energy
heuristic into a named constant (e.g., PACKET_SIZE_ENERGY_DIVISOR or
OPUS_SILENCE_DIVISOR) and replace the literal in the energy calculation inside
the relay.rs block that computes size_energy ((data.data.len() as f64 /
200.0).min(1.0)); update any related comments to reference the constant so
tuning the heuristic is centralized and more readable.
- Around line 140-170: Add comprehensive unit tests for the voice activity
detection (VAD) logic: create tests that instantiate MediaRelay and feed mock
MediaData into the method that processes incoming audio (e.g.,
MediaRelay::process_media or equivalent) to assert speaking detection when
energy/level exceeds the threshold, verify active speaker selection when
multiple participants send audio (ensure active_speaker updates to the
highest-energy participant and is_speaking returns true for them), and test
timeout/stale-audio behavior by advancing Instant or manipulating
VoiceActivityState.last_audio to confirm participants stop being marked as
speaking and active_speaker clears after the configured timeout; reference
MediaRelay, MediaData, VoiceActivityState, active_speaker, voice_activity,
is_speaking, and remove_participant when adding these new tests.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 8b26162f-fd37-4ccd-91bc-b9afb97a7d29

📥 Commits

Reviewing files that changed from the base of the PR and between 6dfad94 and af161a7.

📒 Files selected for processing (14)
  • core/src/neighbourhood/NeighbourhoodClient.ts
  • core/src/neighbourhood/NeighbourhoodProxy.ts
  • rust-executor/Cargo.toml
  • rust-executor/src/graphql/mutation_resolvers.rs
  • rust-executor/src/graphql/query_resolvers.rs
  • rust-executor/src/graphql/subscription_resolvers.rs
  • rust-executor/src/lib.rs
  • rust-executor/src/pubsub.rs
  • rust-executor/src/sfu/graphql_types.rs
  • rust-executor/src/sfu/mod.rs
  • rust-executor/src/sfu/relay.rs
  • rust-executor/src/sfu/room.rs
  • rust-executor/src/sfu/server.rs
  • rust-executor/src/sfu/service.rs

Comment thread rust-executor/src/graphql/mutation_resolvers.rs Outdated
Comment thread rust-executor/src/graphql/mutation_resolvers.rs Outdated
Comment thread rust-executor/src/graphql/mutation_resolvers.rs Outdated
Comment thread rust-executor/src/graphql/query_resolvers.rs Outdated
Comment thread rust-executor/src/graphql/subscription_resolvers.rs Outdated
Comment thread rust-executor/src/graphql/subscription_resolvers.rs Outdated
Comment thread rust-executor/src/sfu/server.rs
Comment thread rust-executor/src/sfu/server.rs
Comment thread rust-executor/src/sfu/server.rs
Comment thread rust-executor/src/sfu/service.rs
@HexaField

Copy link
Copy Markdown
Contributor Author

Hey @data-bot-coasys & @marvin-bot-coasys would love your feedback and review on this PR and coasys/flux#551

Comment thread rust-executor/src/graphql/mutation_resolvers.rs Outdated
@HexaField HexaField force-pushed the feat/embedded-sfu branch 12 times, most recently from de1eeed to 3f78418 Compare March 11, 2026 21:02
@HexaField HexaField force-pushed the feat/embedded-sfu branch from 3ab152c to 9cb286a Compare April 9, 2026 21:38
Replaces the stale graphql-era feat/embedded-sfu work with a clean
forward-port on top of dev:

Rust (rust-executor/src/sfu/):
- 7 files copied verbatim from the old branch: server, room, relay,
  cascade, service, plus a new types.rs (replaces graphql_types.rs).
- Feature flag dropped — SFU is always compiled.  mod.rs reduces to
  a flat module root, no stub fallback.
- str0m dep added at 0.9 (latest stable on crates.io).
- mod sfu; wired into rust-executor/src/lib.rs.

WS RPC handlers (rust-executor/src/api/sfu_ws.rs):
- 11 endpoints: sfu.startRoom, stopRoom, listRooms, callJoin,
  callLeave, callSetQualityPreference, callAnswerServerOffer,
  getConfig, setConfig, sfuPeerForNeighbourhood,
  sfuPeersForNeighbourhood.
- Gated on NEIGHBOURHOOD_READ_CAPABILITY (reads) and
  NEIGHBOURHOOD_UPDATE_CAPABILITY (writes).  Caller DID resolved
  from token.
- Registered via super::sfu_ws::register_ws_handlers in
  api/ws_handler.rs::build_handler_map.

TS SDK (core/src/neighbourhood/):
- New SfuTypes.ts mirroring rust types.rs (SfuConfig, SfuRoomInfo,
  CallSessionInfo, SfuQualityPreference, SfuParticipantInfo).
- NeighbourhoodClient.ts adds 11 wrappers around the apiClient.call()
  endpoints.
- NeighbourhoodProxy.ts thin pass-through; neighbourhoodUrl is taken
  explicitly (proxy is per-perspective, URL is consumer-owned).
- core/src/index.ts re-exports SfuTypes.

Cascade tests (in rust-executor/src/sfu/cascade.rs) come along as
part of the module copy — 13 tests covering pick_redirect_node,
pipe transport lifecycle, signal handlers.

str0m API drift fixes (if any) land in a follow-up commit on this
branch — cargo check is in flight as I commit this.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@HexaField HexaField force-pushed the feat/embedded-sfu branch from f9ddc82 to 7e7f794 Compare June 9, 2026 02:30
HexaField and others added 6 commits June 9, 2026 12:35
str0m 0.9's `Rtc::builder().build()` no longer takes an `Instant`
(the engine derives its internal clock now).  Same fix in three
call sites: cascade.rs:160, cascade.rs:204, server.rs:110.

Plus pruning unused imports the old branch carried (log::warn,
str0m::{Event, IceConnectionState, Input, Output}, HashSet,
std::sync::Arc, tokio::sync::Mutex, std::time::Instant in two
places, str0m::Rtc, room::ParticipantInfo) — all flagged by
cargo check.

After this commit `cargo check` reports only pre-existing dev
build-artifact errors (CUSTOM_DENO_SNAPSHOT.bin + dapp/dist that
are generated by the build script, not committed) — none in the
SFU code.

cargo fmt sweep included for the new sfu_ws.rs handler module.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…e pipe

str0m 0.9's `sdp_api().apply()` returns None when no changes have been
queued (a stricter check than 0.16).  The cascaded pipe-transport
negotiation was creating an empty Rtc and immediately calling apply,
which now errors with "no changes to apply".

Fix: add a 'pipe-control' data channel as the negotiation seed.
Real media transceivers get added later as participants arrive in
the cascaded room.

Surfaced by the failing test
sfu::cascade::tests::test_cascade_pipe_offer_answer (the only
SFU-side test failure after the str0m 0.16 → 0.9 bump).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Without this, every `sfu.*` WS RPC returns 'SFU service not yet
available' because the OnceCell is never set.  Mirrors the Prolog /
RuntimeService init pattern: init after Prolog, before REST.

Best-effort start: if the UDP socket can't bind (typically because a
prior executor is still holding it during a hot restart), log a warn
and continue rather than crashing the executor.  T1 / wind tunnel
SFU scenarios will then surface the failure via the 503-style 'not
yet available' error.
The SFU WS RPC handlers required ctx.user_did, which only gets set in
multi-user mode where each user has a JWT keyed off email/password
(runtime.createUser → runtime.loginUser → JWT).

In single-user / admin flows there is no per-user JWT — the executor
acts on behalf of its own main agent.  Centralise the resolution in a
caller_did() helper that:

  1. Returns ctx.user_did when present (multi-user).
  2. Falls back to crate::agent::did() when the connection was
     authenticated with the admin credential.
  3. Errors with the previous 'Caller DID not resolved' otherwise.

Without this, every sfu.callJoin / sfu.callLeave /
sfu.callSetQualityPreference / sfu.callAnswerServerOffer from an
admin-token-authenticated wind tunnel client returns 401 'Caller DID
not resolved from token' — the failure mode the wind tunnel's T1+
scenarios were blocked on.
…ates

str0m's Candidate::host() refuses bind addresses with 0.0.0.0 IP
('invalid ip 0.0.0.0') because that's not a valid ICE candidate
target.  Default SfuServerConfig was 0.0.0.0:0, which made every
sfu.callJoin fail at 'Failed to create host candidate: ICE bad
candidate'.

Change the default bind to 127.0.0.1:0.  Local-dev + the wind tunnel
all run loopback so this fixes them out of the box.  Production
deployments override SfuServerConfig with their LAN/public IP.
…narios

The wind tunnel drives N synthetic participants from a single admin
connection.  Without an override they all resolve to crate::agent::did()
(the executor's main agent) and the SFU correctly refuses duplicate
joins with 'Agent already joined this room'.

Add an admin-only escape hatch: when the request authenticated with
the admin credential AND passed an agentDidOverride string param,
use that DID for the participant.  Multi-user flows (per-user JWT
in ctx.user_did) still ignore the override — the override is a
test-only mechanism gated on is_admin_credential.

Wired through caller_did() so all four DID-consuming SFU handlers
honour it: callJoin, callLeave, callSetQualityPreference,
callAnswerServerOffer.
HexaField added 2 commits June 9, 2026 15:12
Production cascade discovery routes through the neighbourhood DNA's
gossip layer.  For wind tunnel multi-node tests (T3/T4/S2/S3) we
want to set up a static N-node cluster on one host without the full
discovery chain.

sfu.enableCascade is the admin-only escape hatch:
- Take {localDid, maxParticipantsPerNode, peers: [{did, addr}]}
- Initialise CascadeManager with the local identity + capacity.
- Seed known_nodes under the catch-all '' room id so the
  redirect logic finds the peers before any room exists.

CascadeManager exposes known_nodes_mut() (also new) so SfuService can
inject the static peer set.  pick_redirect_node now falls back to the
'' bucket when the specific room id has no known nodes — the
existing per-room/per-gossip path still wins when populated.
The 'rebalance when remote is significantly less loaded' branch
required fresher cross-node count visibility than the static
sfu.enableCascade path provides.  In the wind tunnel cascade tests
the threshold caused symmetric pingpong: A redirects to B because B
looks empty (cascade view never updates), B redirects to A because A
looks empty, peer bounces forever.

Drop the rebalance branch entirely.  Rule:
  - local has capacity → accept.
  - local at capacity → pick least-loaded remote with headroom.

Existing cascade.rs unit tests still pass (verified: the 4 assertions
in test_cascade_pick_redirect cover the new shape exactly).
…tate

The static enable_cascade path seeds known_nodes with participant_count=0
and never updates it.  In a 3+ node cluster the wind tunnel cascade
tests hit cycles when redirects bounce between nodes that all look
empty in each others' view.

sfu.cascadeAnnounce takes (remoteDid, roomId, participantCount) and
calls CascadeManager::handle_sfu_announce — the same code path
production gossip would use.  Wind tunnel calls it after every
successful peer landing to keep cross-node counts fresh.

Also fix enable_cascade to clear the catch-all bucket before
rewriting it, so an enable_cascade with empty peers correctly
partitions a node (used by F4).
HexaField added a commit to coasys/ad4m-wind-tunnel that referenced this pull request Jun 9, 2026
Adds the complete scenario matrix for the WebRTC + SFU work:
mesh fundamentals, SFU topology, mid-call transitions, faults, and
scale.  25 scenarios total, each runs end-to-end against an ad4m
executor with the embedded SFU service (coasys/ad4m feat/embedded-sfu).

## Harness additions

- src/peer.ts: WebRtcPeer wrapper around @roamhq/wrtc.  Synthetic
  media (per-peer tone audio + counter video), 1 Hz getStats()
  sampling, EventEmitter for ICE state / remote-track / stats.
  Constructor takes iceTransportPolicy ('all' | 'relay') and
  recvSlots (pre-allocates N recv-only audio + video transceivers
  in the initial offer).  Stats aggregation now resolves the
  nominated candidate-pair's local/remote candidate type.
- src/mesh.ts: MeshHost — each participant maintains N-1 separate
  RTCPeerConnections, one per remote.  Single-PC peers can't act as
  both offerer and answerer in mesh > 2 (DTLS role conflict).
  connectAll() pairs every host's a-side and b-side connections.
- src/net.ts: tc qdisc netem wrapper for F1/F2 packet loss.
  Detects sudo -n availability; no-ops on macOS.
- src/cascade.ts: startCluster() spawns N ad4m executors on
  consecutive ports, calls sfu.enableCascade on each.
  cluster.announceCount() pushes per-node count updates via
  sfu.cascadeAnnounce.  Binary path overridable via AD4M_EXECUTOR_BIN.
- src/peer-server.ts: lightweight HTTP harness for W1M's
  multi-machine path.
- src/run-webrtc.ts: standalone runner.  Generates an agent before
  T/M/F/S scenarios (SFU handlers require ctx.user_did).
  Recognises which scenarios need an executor and gates accordingly.
  120 s race fallback on the slow agent.generate path.
- src/client.ts: exposes a public call() method for raw WS RPC
  dispatch so scenarios can hit handlers the typed client doesn't
  wrap.

## Scenarios

| Bucket | Scenarios |
|---|---|
| W mesh fundamentals     | W1, W1M, W2, W3, W4, W5 |
| T SFU topology          | T1, T2, T3, T4, T5      |
| M mid-call transitions  | M1, M2, M3, M4          |
| F faults                | F1, F2, F3, F4, F5, F6, F7 |
| S scale                 | S1, S2, S3              |

W5/F3 use coturn (set TURN_URL + creds).  F1/F2 use tc qdisc netem
on Linux.  T3/T4/M3/F4/S2/S3 spawn multi-executor clusters via
src/cascade.ts.  W1M needs AD4M_REMOTE_PEER_URL pointed at a
remote peer-server.

## Headline scaling

Per-host upload, single-host loopback runs:

  mesh N=2:   ~100 KB
  mesh N=3:   ~200 KB (2.00x)
  mesh N=4:   ~300 KB (3.00x)
  SFU  N=5:   ~188 KB
  SFU  N=10:  ~188 KB (sd <1 KB)
  SFU  N=20:  ~186 KB (sd ~3 KB, 0 packets lost)

Mesh grows O(N-1); SFU stays flat O(1).  M1 mesh→SFU promotion
drops per-host upload to 0.32x of the mesh phase (68% saving).

## Known limitations

- T4/S2 distribution is skewed without real-time gossip — peers
  all land, but stale cross-node counts in the static cascade view
  cause uneven landings.
- The SFU's server-pushed SDP renegotiation isn't wired into the
  wind tunnel client; T1/T2/S1 use recvSlots=N-1 as a workaround
  so downloadMean reflects forwarded media instead of 0.

## Companion PRs

- coasys/ad4m#712 — executor + SFU module + WS RPC handlers + TS
  SDK + boot fixes + cascade admin endpoints.
- coasys/flux#551 — SfuManager + tests + webrtcStore integration
  + UI components.
HexaField added 12 commits June 9, 2026 16:21
Remove the three admin RPCs / params that were shimming around the
missing pluggable gossip:

  - sfu.enableCascade
  - sfu.cascadeAnnounce
  - agentDidOverride param on call_join / call_leave / etc.

In their place:

* CascadeGossip trait (sfu/gossip/mod.rs) — transport-agnostic
  pub/sub for CascadeSignal.  send(target, signal) is broadcast or
  point-to-point; take_inbound() hands the SFU service one mpsc
  receiver for incoming signals.

* NoopGossip — single-node default, sends nowhere, no inbound.

* TcpGossip (sfu/gossip/tcp.rs) — TCP + JSON-line transport.  Each
  node binds a listener and dials every configured peer with
  retry-forever.  CascadeSignal is the only protocol-level message;
  the transport stays oblivious.  Production deployments wire in a
  HolochainGossip / libp2p / etc. impl behind the same trait
  without touching the SFU core.

* SfuService::start takes Arc<dyn CascadeGossip>, spawns an inbound
  dispatcher that calls handle_sfu_announce / remove_node / etc., and
  publishes Announce on every call_join / call_leave so peers learn
  about each room's count in real time — no announce admin RPC
  needed.

* lib.rs wires the gossip from new Ad4mConfig fields:
    sfu_local_did, sfu_max_participants_per_node,
    sfu_cascade_listen, sfu_cascade_peers.
  CLI flag names: --sfu-local-did, --sfu-cascade-listen,
  --sfu-cascade-peers did=host:port,did=host:port

* caller_did() helper no longer takes params; the per-user JWT
  remains the canonical auth path.  Wind tunnel will move to
  multi-user mode (Phase B) to drive N peers from N distinct DIDs
  instead of overriding from one admin connection.
Adds an IceServer type to the public SFU config shape (Rust + TS SDK).
When the SFU advertises ICE servers in its neighbourhood SfuConfig,
the SDK and clients treat that list as authoritative.  Production
deployments can now rotate TURN credentials server-side without a
client redeploy.

- rust-executor/src/sfu/types.rs: new IceServer struct, SfuConfig
  grows an ice_servers vector.
- core/src/neighbourhood/SfuTypes.ts: mirroring IceServer interface,
  SfuConfig.iceServers field on the TS side.
…ter + answer plumbing

Wires the architectural piece of server-pushed SDP renegotiation:

- pubsub.rs: new SFU_CALL_RENEGOTIATION_OFFER_TOPIC.
- sfu/types.rs: SfuCallRenegotiationOffer payload (targetDid +
  neighbourhoodUrl + roomName + sdpOffer).  Serialised verbatim to
  the events WS.
- api/events_ws.rs: subscribe to the new topic with a per-user filter
  (matches_sfu_target — drops frames whose targetDid != resolved
  caller DID).
- sfu/service.rs: call_answer_server_offer(...) consumes the
  client's SDP answer and dispatches SfuCommand::ApplyServerAnswer
  to the event loop.
- sfu/server.rs: new ApplyServerAnswer variant — acknowledged in the
  loop today, with the str0m sdp_api.accept_answer call landing
  alongside the offer-generation side.
- api/sfu_ws.rs: callAnswerServerOffer wires through to the new
  service method instead of returning the stub.

This lets clients (wind tunnel + flux) wire up to the topic and
react to renegotiation offers; the inbound offer generation (which
needs str0m's sdp_api to add outbound m-lines mid-call) is the
focused follow-up that completes the loop.
…ation

Completes the renegotiation loop:

- When peer X opens a new inbound track (Event::MediaAdded), iterate
  every other peer Y in the same room.  For each Y without an
  in-flight pending offer:
    1. Add a SendOnly outbound m-line via Y.rtc.sdp_api().add_media
    2. Apply -> generates an SdpOffer + SdpPendingOffer
    3. Store Y.tracks_out[new_mid] = (X.id, X.mid)
    4. Stash the pending in Y.pending_offer
    5. Publish SfuCallRenegotiationOffer { targetDid: Y.agent_did, ... }
       to SFU_CALL_RENEGOTIATION_OFFER_TOPIC

- When ApplyServerAnswer arrives:
    Parse Y's SDP answer + clear Y.pending_offer + call
    Y.rtc.sdp_api().accept_answer(pending, answer).

str0m only allows one in-flight change at a time per peer, so peers
with a pending offer are skipped this tick — the next inbound track
or the next ApplyServerAnswer naturally retries.

SfuPeer grows a pending_offer field and a hand-rolled Debug impl
(SdpPendingOffer doesn't derive Debug).
The SFU event loop processes commands in a sync inner block. The
renegotiation offer publish was using the async get_global_pubsub()
which returns a Future, breaking the build. Switch to the existing
sync helper get_global_pubsub_sync() that returns Arc<PubSub>
directly.
…ng tracks

When peer X joined a populated room, the prior server-side renegotiation
loop only generated offers for the existing peers to receive X's tracks.
X itself never got offers for the existing peers' tracks, so X's
downloadMean stayed at 0.

Refactored the post-MediaAdded propagation to handle both directions
in one pass:
- (a) For each freshly-opened inbound track, queue an outbound m-line
      on every OTHER peer in the same room (existing behaviour).
- (b) For each newly-joined peer (any peer that opened a track this
      tick), queue outbound m-lines on the new peer for every OTHER
      peer's pre-existing inbound tracks (new behaviour).

All additions to a single target peer are batched into one sdp_api()
call so str0m's "one in-flight change per peer" rule still holds.

Logs each prepared offer so we can verify the pipeline fires.
So renegotiation success is visible in executor logs alongside the
"renegotiation offer prepared" line.
The pipe transport machinery in cascade.rs has been complete for some
time (establish_pipe / handle_pipe_offer / handle_pipe_answer /
forward_to_pipes), but the gossip-inbound handler in service.rs was a
no-op for PipeOffer / PipeAnswer signals.  Wire it up:

1. On `Announce` from a remote node for a room we serve locally, and
   when our DID lexically wins the tie-break, auto-establish a pipe:
   call `establish_pipe` and send the resulting `PipeOffer` directed
   to the announcer via `GossipTarget::PeerDid`.

2. On `PipeOffer` addressed to us: call `handle_pipe_offer`, send the
   `PipeAnswer` directed back to `from_did`.

3. On `PipeAnswer` addressed to us: call `handle_pipe_answer` to apply
   the SDP answer and mark the pipe `established`.

Side-effect outbound signals are queued and drained outside the
cascade lock so a slow gossip transport can't deadlock the dispatcher.

CascadeManager grows three small read-only helpers (`local_did`,
`has_pipe`, `established_pipe_count`, `established_pipes`) so the
service can:

- Filter signals not addressed to us.
- Skip auto-establish if a pipe already exists.
- Surface pipe count via the new `sfu.cascadeStatus` WS RPC for the
  wind tunnel to assert against.

Also a minor doc-string cleanup on `call_answer_server_offer` — the
"Phase E follow-up" caveat is now obsolete; the renegotiation pipeline
is wired and verified by T1 in the same branch.
The cascade auto-establish + pipe handshake is silent at INFO when
something doesn't fire (no announce, no inbound, no dropped target).
Surface:

- send(): logs broadcast / directed sends with the signal variant
  and recipient count/DID
- connect_loop: logs every successful TCP connection (was debug!)
- reader_loop: logs every inbound signal variant

So we can see whether T6's pipe handshake fails because (a) the
gossip transport isn't connected, (b) the Announce isn't being
sent, or (c) the auto-establish logic isn't firing on the receiver.
…orks

RoomId::Display formats as `{neighbourhood_url}:{room_name}` — but
neighbourhood URLs carry their own scheme separator (e.g.
`windtunnel://t6`), so `split_once(':')` chops on the FIRST `:` inside
the scheme.  Result: `local_has_room` always saw a nonsense
neighbourhood URL ("windtunnel") + room ("//t6:t6-pipe-handshake"),
the lookup against the room manager missed, `pre_has_room` was false,
and auto-establish never fired.

Switch all three call sites — `local_has_room`, the auto-establish
branch's RoomId reconstruction, and `cascade::handle_pipe_offer` — to
`rsplit_once(':')` which finds the boundary correctly even when the
URL contains additional colons.

This was the missing piece for Phase E: T6's gossip layer was
actually exchanging Announce signals (confirmed via the new TcpGossip
instrumentation), the auto-establish branch was just skipped because
of the parse bug.
Three follow-ups to Phase E:

1. Cascade Leave signal flow.  call_leave on the last participant of
   a room now broadcasts CascadeSignal::Leave with the room_id; the
   receiver routes it through a new targeted `remove_node_from_room`
   helper that drops just that (room, did) entry instead of nuking
   the node from every room.  Stale `known_nodes` entries can no
   longer survive a vacate-without-rejoin.

2. NeighbourhoodProxy.subscribeCallRenegotiationOffer (and the
   underlying NeighbourhoodClient method) — the SDK surface Flux's
   SfuManager was already calling but the SDK never exposed.  Uses
   the shared apiClient.subscribe channel, filters by targetDid
   defensively in case the events_ws fanout regresses.

3. TcpGossip per-signal `info!` lines demoted back to `debug!` now
   that Phase E is verified — keeps production logs quiet.
Migrates the pipe transport RTC out of CascadeManager and into the
SFU server's peers map flagged with `is_pipe = true`.  The same
event loop now drives both client peers and pipe peers, so:

- Pipe ICE handshakes actually complete (the str0m state machine
  ticks each event loop iteration).
- Local peers' MediaData flows to pipe peers via the existing
  per-target relay loop — `forward_to_pipes` and its bespoke writer
  walk are no longer needed.
- New SfuCommand::ApplyPipeRenegotiationOffer handles inbound pipe
  renegotiation: server applies the offer through `sdp_api()` and
  publishes the answer via SFU_PIPE_RENEGOTIATION_ANSWER_TOPIC.
- New SfuCommand::ApplyServerAnswer path is reused for pipe-bound
  renegotiation answers — same code path as client answers, just
  resolved by ParticipantId.

CascadeManager keeps only PipeMeta bookkeeping (remote_did, room_id,
participant_id, established) — enough to translate `(room_id,
remote_did)` ↔ ParticipantId during signal dispatch.

Two new pubsub topics + bridges plumb the event loop ↔ gossip
without forcing the sync event loop to know about CascadeGossip:

- SFU_PIPE_RENEGOTIATION_OFFER_TOPIC  — event-loop → SfuService →
  CascadeSignal::PipeOffer over gossip.
- SFU_PIPE_RENEGOTIATION_ANSWER_TOPIC — event-loop → SfuService →
  CascadeSignal::PipeAnswer over gossip.

Pipe-bound renegotiation offers from the renegotiation propagation
loop in server.rs (when a local peer adds tracks) now go through
this path instead of the events_ws fanout — clients never see the
pipe-bound offers.
@HexaField HexaField changed the title feat(sfu): Embedded SFU for scalable WebRTC conferencing in neighbourhoods Embedded SFU service: Phases A–F Jun 9, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant