From 5494847a8c8d88ccb66452f2c5212084c4dcd3c5 Mon Sep 17 00:00:00 2001 From: Vladimir Poluliashenko Date: Fri, 29 May 2026 13:29:49 +0100 Subject: [PATCH] Fix race condition: subscriber misses publisher-connected notification When subscribers reconnect simultaneously with publishers (e.g. after a proxy timeout), the publisher's webrtcup event fires before the subscriber's waiter is created. The notification is lost, the subscriber waits until context timeout, and the call participant cannot hear others. Two issues fixed: 1. async/notifier.go: add sticky notification with 30-second TTL. Notify() now records the notification time. NewWaiter() checks if the key was notified recently and returns an already-closed channel so the caller retries immediately instead of waiting indefinitely. 2. sfu/janus/subscriber.go: move waiter creation inside the retry loop. The original code created one waiter before the loop and reused it, causing a tight busy-loop when the waiter channel was pre-closed by the sticky notifier. Creating a fresh waiter per retry eliminates this. Observed in production: clusters of 10-44 "No such feed" errors per reconnect event affecting 2-5 users, occurring ~10x/day. After fix: 0. --- async/notifier.go | 21 +++++++++++++++++++++ sfu/janus/subscriber.go | 5 +++-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/async/notifier.go b/async/notifier.go index 747ec5ea..a24ad162 100644 --- a/async/notifier.go +++ b/async/notifier.go @@ -24,8 +24,11 @@ package async import ( "context" "sync" + "time" ) +const notifiedKeyTTL = 30 * time.Second + type rootWaiter struct { key string ch chan struct{} @@ -56,6 +59,9 @@ type Notifier struct { waiters map[string]*rootWaiter // +checklocks:Mutex waiterMap map[string]map[*Waiter]bool + // +checklocks:Mutex + // Sticky: remembers recent notifications so late waiters don't miss them. + notifiedAt map[string]time.Time } type ReleaseFunc func() @@ -64,6 +70,15 @@ func (n *Notifier) NewWaiter(key string) (*Waiter, ReleaseFunc) { n.Lock() defer n.Unlock() + // If this key was notified recently, return an already-closed channel + // so the caller retries immediately without waiting. + if t, ok := n.notifiedAt[key]; ok && time.Since(t) < notifiedKeyTTL { + ch := make(chan struct{}) + close(ch) + w := &Waiter{key: key, ch: ch} + return w, func() {} + } + waiter, found := n.waiters[key] if !found { waiter = &rootWaiter{ @@ -103,6 +118,7 @@ func (n *Notifier) Reset() { } n.waiters = nil n.waiterMap = nil + n.notifiedAt = nil } func (n *Notifier) release(w *Waiter) { @@ -126,6 +142,11 @@ func (n *Notifier) Notify(key string) { n.Lock() defer n.Unlock() + if n.notifiedAt == nil { + n.notifiedAt = make(map[string]time.Time) + } + n.notifiedAt[key] = time.Now() + if w, found := n.waiters[key]; found { w.notify() delete(n.waiters, w.key) diff --git a/sfu/janus/subscriber.go b/sfu/janus/subscriber.go index 0d3bd736..b6d6fcbc 100644 --- a/sfu/janus/subscriber.go +++ b/sfu/janus/subscriber.go @@ -169,8 +169,6 @@ func (p *janusSubscriber) joinRoom(ctx context.Context, stream *streamSelection, return } - waiter, stop := p.mcu.newPublisherConnectedWaiter(p.publisher, p.streamType) - defer stop() loggedNotPublishingYet := false retry: @@ -253,11 +251,14 @@ retry: sfuinternal.StatsWaitingForPublisherTotal.WithLabelValues(string(p.streamType)).Inc() } + waiter, stop := p.mcu.newPublisherConnectedWaiter(p.publisher, p.streamType) if err := waiter.Wait(ctx); err != nil { + stop() p.Close(context.Background()) callback(err, nil) return } + stop() p.logger.Printf("Retry subscribing %s from %s", p.streamType, p.publisher) goto retry default: