Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ai/live: Add a minimum segment duration for new WHIP. #3451

Merged
merged 3 commits into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion media/rtp_segmenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package media
import (
"errors"
"sync"
"time"

"github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts"
"github.com/pion/webrtc/v4"
Expand All @@ -26,6 +27,10 @@ type RTPSegmenter struct {
hasVideo bool
maxQueueSize int // Maximum number of packets to buffer per queue
tsWatermark int64 // Timestamp of the last written packet

segStartTs int64 // timestamp of the current segment
segStartTime time.Time // wall clock of when the current segment started
minSegDur time.Duration // minimum segment duration
}

type audioPacket struct {
Expand All @@ -47,10 +52,11 @@ type trackWriter struct {
writeVideo func(pts, dts int64, data [][]byte) error
}

func NewRTPSegmenter(tracks []RTPTrack, ssr *SwitchableSegmentReader) *RTPSegmenter {
func NewRTPSegmenter(tracks []RTPTrack, ssr *SwitchableSegmentReader, segDur time.Duration) *RTPSegmenter {
s := &RTPSegmenter{
ssr: ssr,
maxQueueSize: 20,
minSegDur: segDur,
}
s.tracks = s.setupTracks(tracks)
return s
Expand All @@ -73,7 +79,28 @@ func (s *RTPSegmenter) StartSegment(startTs int64) {
s.mpegtsWriter = mpegts.NewWriter(writer, newTracks)
s.ssr.Read(writer.MakeReader())
s.mediaWriter = writer
s.segStartTs = startTs
s.segStartTime = time.Now()
}

func (s *RTPSegmenter) ShouldStartSegment(pts int64, tb uint32) bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.segStartTime.IsZero() {
return true
}
// Enforce minimum real (wall-clock) time
if time.Since(s.segStartTime) < s.minSegDur {
return false
}
// Enforce minimum PTS time
needed := int64(s.minSegDur.Seconds() * float64(tb))
if (pts - s.segStartTs) < needed {
return false
}
return true
}

func (s *RTPSegmenter) WriteVideo(source RTPTrack, pts, dts int64, au [][]byte) error {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
65 changes: 60 additions & 5 deletions media/rtp_segmenter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package media
import (
"sync"
"testing"
"time"

"github.com/pion/webrtc/v4"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -34,7 +35,7 @@ func TestRTPSegmenterQueueLimit(t *testing.T) {

require := require.New(t)
ssr := NewSwitchableSegmentReader()
seg := NewRTPSegmenter([]RTPTrack{videoTrack, audioTrack}, ssr)
seg := NewRTPSegmenter([]RTPTrack{videoTrack, audioTrack}, ssr, 0)
seg.StartSegment(0)

// Override maxQueueSize for testing
Expand Down Expand Up @@ -81,7 +82,7 @@ func TestRTPSegmenterQueueLimit(t *testing.T) {
func TestRTPSegmenterVideoOnly(t *testing.T) {
require := require.New(t)
ssr := NewSwitchableSegmentReader()
seg := NewRTPSegmenter([]RTPTrack{videoTrack}, ssr)
seg := NewRTPSegmenter([]RTPTrack{videoTrack}, ssr, 0)

// Verify we have video but no audio
require.True(seg.hasVideo)
Expand All @@ -107,7 +108,7 @@ func TestRTPSegmenterVideoOnly(t *testing.T) {
func TestRTPSegmenterAudioOnly(t *testing.T) {
require := require.New(t)
ssr := NewSwitchableSegmentReader()
seg := NewRTPSegmenter([]RTPTrack{audioTrack}, ssr)
seg := NewRTPSegmenter([]RTPTrack{audioTrack}, ssr, 0)

// Verify we have audio but no video
require.True(seg.hasAudio)
Expand All @@ -133,7 +134,7 @@ func TestRTPSegmenterAudioOnly(t *testing.T) {
func TestRTPSegmenterConcurrency(t *testing.T) {
require := require.New(t)
ssr := NewSwitchableSegmentReader()
seg := NewRTPSegmenter([]RTPTrack{videoTrack, audioTrack}, ssr)
seg := NewRTPSegmenter([]RTPTrack{videoTrack, audioTrack}, ssr, 0)

// Start a segment
seg.StartSegment(0)
Expand Down Expand Up @@ -179,7 +180,7 @@ func TestRTPSegmenterConcurrency(t *testing.T) {
func TestRTPSegmenterLatePacketDropping(t *testing.T) {
require := require.New(t)
ssr := NewSwitchableSegmentReader()
seg := NewRTPSegmenter([]RTPTrack{videoTrack, audioTrack}, ssr)
seg := NewRTPSegmenter([]RTPTrack{videoTrack, audioTrack}, ssr, 0)

// Start a segment
seg.StartSegment(0)
Expand Down Expand Up @@ -228,3 +229,57 @@ func TestRTPSegmenterLatePacketDropping(t *testing.T) {
// Verify low watermark is updated
require.Equal(int64(7000), seg.videoQueue[0].pts)
}

func TestRTPSegmenterMinSegmentDurationWallClock(t *testing.T) {
require := require.New(t)

ssr := NewSwitchableSegmentReader()
// Use a short minSegDur so we don't slow tests too much
minSegDur := 100 * time.Millisecond
seg := NewRTPSegmenter([]RTPTrack{videoTrack}, ssr, minSegDur)

// Initially no segment
require.False(seg.IsReady(), "No active segment yet")

// Start an initial segment
seg.StartSegment(0)
require.True(seg.IsReady(), "Segment should be started")

// Immediately check if we should start a new segment
// Not enough wall time has passed
shouldStart := seg.ShouldStartSegment(1, 1) // ~1 second in 1hz timescale
require.False(shouldStart, "Should not start new segment (wall-clock < minDur)")

// Wait less than the minSegDur, ensure we still don't start
time.Sleep(minSegDur / 2)
shouldStart = seg.ShouldStartSegment(2, 1) // ~2 seconds
require.False(shouldStart, "Still under the wall-clock limit")

// Wait enough time, then we can start a new segment
time.Sleep(minSegDur / 2)
shouldStart = seg.ShouldStartSegment(3, 1) // ~3 seconds PTS
require.True(shouldStart, "Segment should have started")
}

func TestRTPSegmenterMinSegmentDurationPTS(t *testing.T) {
require := require.New(t)
ssr := NewSwitchableSegmentReader()

// 1 second min
minSegDur := 10 * time.Millisecond
seg := NewRTPSegmenter([]RTPTrack{videoTrack}, ssr, minSegDur)

// Start initial segment at pts=0
seg.StartSegment(0)
require.True(seg.IsReady(), "Segment should be active")

// Even if we wait in real clock, if PTS is still less than 1s, no new segment
time.Sleep(minSegDur)
require.False(seg.ShouldStartSegment(9, 1000)) // 9ms < 10ms minSegDur
require.True(seg.ShouldStartSegment(10, 1000)) // 10ms == minSegDur

seg.StartSegment(12)
time.Sleep(minSegDur) // because we also check the wall clock
require.False(seg.ShouldStartSegment(20, 1000), "PTS") // 20ms < 22ms (12ms start + 10ms minSegDur)
require.True(seg.ShouldStartSegment(23, 1000), "PTS") // 23ms > 22ms
}
11 changes: 9 additions & 2 deletions media/whip_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,16 @@ func (s *WHIPServer) CreateWHIP(ctx context.Context, ssr *SwitchableSegmentReade
if audioTrack != nil {
tracks = append(tracks, audioTrack)
}
minSegDur := 1 * time.Second
segDurEnv := os.Getenv("LIVE_AI_MIN_SEG_DUR")
if segDurEnv != "" {
if parsed, err := time.ParseDuration(segDurEnv); err == nil {
minSegDur = parsed
}
}
trackCodecs := make([]string, len(tracks))
timeDecoder := rtptime.NewGlobalDecoder2()
segmenter := NewRTPSegmenter(tracks, ssr)
segmenter := NewRTPSegmenter(tracks, ssr, minSegDur)
var wg sync.WaitGroup // to wait for all tracks to complete
for i, track := range tracks {
trackCodecs[i] = track.Codec().MimeType
Expand Down Expand Up @@ -286,7 +293,7 @@ func handleRTP(ctx context.Context, segmenter *RTPSegmenter, timeDecoder *rtptim
}

idr := h264.IsRandomAccess(au)
if idr {
if idr && segmenter.ShouldStartSegment(dts, track.Codec().ClockRate) {
segmenter.StartSegment(dts)
}

Expand Down
Loading