Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions async/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ package async
import (
"context"
"sync"
"time"
)

const notifiedKeyTTL = 30 * time.Second

type rootWaiter struct {
key string
ch chan struct{}
Expand Down Expand Up @@ -56,6 +59,9 @@ type Notifier struct {
waiters map[string]*rootWaiter
// +checklocks:Mutex
waiterMap map[string]map[*Waiter]bool
// +checklocks:Mutex
// Sticky: remembers recent notifications so late waiters don't miss them.
notifiedAt map[string]time.Time
}

type ReleaseFunc func()
Expand All @@ -64,6 +70,15 @@ func (n *Notifier) NewWaiter(key string) (*Waiter, ReleaseFunc) {
n.Lock()
defer n.Unlock()

// If this key was notified recently, return an already-closed channel
// so the caller retries immediately without waiting.
if t, ok := n.notifiedAt[key]; ok && time.Since(t) < notifiedKeyTTL {
ch := make(chan struct{})
close(ch)
w := &Waiter{key: key, ch: ch}
return w, func() {}
}

waiter, found := n.waiters[key]
if !found {
waiter = &rootWaiter{
Expand Down Expand Up @@ -103,6 +118,7 @@ func (n *Notifier) Reset() {
}
n.waiters = nil
n.waiterMap = nil
n.notifiedAt = nil
}

func (n *Notifier) release(w *Waiter) {
Expand All @@ -126,6 +142,11 @@ func (n *Notifier) Notify(key string) {
n.Lock()
defer n.Unlock()

if n.notifiedAt == nil {
n.notifiedAt = make(map[string]time.Time)
}
n.notifiedAt[key] = time.Now()

if w, found := n.waiters[key]; found {
w.notify()
delete(n.waiters, w.key)
Expand Down
5 changes: 3 additions & 2 deletions sfu/janus/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,6 @@ func (p *janusSubscriber) joinRoom(ctx context.Context, stream *streamSelection,
return
}

waiter, stop := p.mcu.newPublisherConnectedWaiter(p.publisher, p.streamType)
defer stop()

loggedNotPublishingYet := false
retry:
Expand Down Expand Up @@ -253,11 +251,14 @@ retry:
sfuinternal.StatsWaitingForPublisherTotal.WithLabelValues(string(p.streamType)).Inc()
}

waiter, stop := p.mcu.newPublisherConnectedWaiter(p.publisher, p.streamType)
if err := waiter.Wait(ctx); err != nil {
stop()
p.Close(context.Background())
callback(err, nil)
return
}
stop()
p.logger.Printf("Retry subscribing %s from %s", p.streamType, p.publisher)
goto retry
default:
Expand Down