Skip to content

Commit 6239306

Browse files
committed
ai/live: Add a minimum segment duration for new WHIP.
1 parent 04fd5d4 commit 6239306

File tree

3 files changed

+97
-8
lines changed

3 files changed

+97
-8
lines changed

media/rtp_segmenter.go

+28-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package media
33
import (
44
"errors"
55
"sync"
6+
"time"
67

78
"github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts"
89
"github.com/pion/webrtc/v4"
@@ -26,6 +27,10 @@ type RTPSegmenter struct {
2627
hasVideo bool
2728
maxQueueSize int // Maximum number of packets to buffer per queue
2829
tsWatermark int64 // Timestamp of the last written packet
30+
31+
segStartTs int64 // timestamp of the current segment
32+
segStartTime time.Time // wall clock of when the current segment started
33+
minSegDur time.Duration // minimum segment duration
2934
}
3035

3136
type audioPacket struct {
@@ -47,10 +52,11 @@ type trackWriter struct {
4752
writeVideo func(pts, dts int64, data [][]byte) error
4853
}
4954

50-
func NewRTPSegmenter(tracks []RTPTrack, ssr *SwitchableSegmentReader) *RTPSegmenter {
55+
func NewRTPSegmenter(tracks []RTPTrack, ssr *SwitchableSegmentReader, segDur time.Duration) *RTPSegmenter {
5156
s := &RTPSegmenter{
5257
ssr: ssr,
5358
maxQueueSize: 20,
59+
minSegDur: segDur,
5460
}
5561
s.tracks = s.setupTracks(tracks)
5662
return s
@@ -73,7 +79,28 @@ func (s *RTPSegmenter) StartSegment(startTs int64) {
7379
s.mpegtsWriter = mpegts.NewWriter(writer, newTracks)
7480
s.ssr.Read(writer.MakeReader())
7581
s.mediaWriter = writer
82+
s.segStartTs = startTs
83+
s.segStartTime = time.Now()
7684
}
85+
86+
func (s *RTPSegmenter) ShouldStartSegment(pts int64, tb uint32) bool {
87+
s.mu.Lock()
88+
defer s.mu.Unlock()
89+
if s.segStartTime.IsZero() {
90+
return true
91+
}
92+
// Enforce minimum real (wall-clock) time
93+
if time.Since(s.segStartTime) < s.minSegDur {
94+
return false
95+
}
96+
// Enforce minimum PTS time
97+
needed := int64(s.minSegDur.Seconds() * float64(tb))
98+
if (pts - s.segStartTs) < needed {
99+
return false
100+
}
101+
return true
102+
}
103+
77104
func (s *RTPSegmenter) WriteVideo(source RTPTrack, pts, dts int64, au [][]byte) error {
78105
s.mu.Lock()
79106
defer s.mu.Unlock()

media/rtp_segmenter_test.go

+60-5
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package media
33
import (
44
"sync"
55
"testing"
6+
"time"
67

78
"github.com/pion/webrtc/v4"
89
"github.com/stretchr/testify/require"
@@ -34,7 +35,7 @@ func TestRTPSegmenterQueueLimit(t *testing.T) {
3435

3536
require := require.New(t)
3637
ssr := NewSwitchableSegmentReader()
37-
seg := NewRTPSegmenter([]RTPTrack{videoTrack, audioTrack}, ssr)
38+
seg := NewRTPSegmenter([]RTPTrack{videoTrack, audioTrack}, ssr, 0)
3839
seg.StartSegment(0)
3940

4041
// Override maxQueueSize for testing
@@ -81,7 +82,7 @@ func TestRTPSegmenterQueueLimit(t *testing.T) {
8182
func TestRTPSegmenterVideoOnly(t *testing.T) {
8283
require := require.New(t)
8384
ssr := NewSwitchableSegmentReader()
84-
seg := NewRTPSegmenter([]RTPTrack{videoTrack}, ssr)
85+
seg := NewRTPSegmenter([]RTPTrack{videoTrack}, ssr, 0)
8586

8687
// Verify we have video but no audio
8788
require.True(seg.hasVideo)
@@ -107,7 +108,7 @@ func TestRTPSegmenterVideoOnly(t *testing.T) {
107108
func TestRTPSegmenterAudioOnly(t *testing.T) {
108109
require := require.New(t)
109110
ssr := NewSwitchableSegmentReader()
110-
seg := NewRTPSegmenter([]RTPTrack{audioTrack}, ssr)
111+
seg := NewRTPSegmenter([]RTPTrack{audioTrack}, ssr, 0)
111112

112113
// Verify we have audio but no video
113114
require.True(seg.hasAudio)
@@ -133,7 +134,7 @@ func TestRTPSegmenterAudioOnly(t *testing.T) {
133134
func TestRTPSegmenterConcurrency(t *testing.T) {
134135
require := require.New(t)
135136
ssr := NewSwitchableSegmentReader()
136-
seg := NewRTPSegmenter([]RTPTrack{videoTrack, audioTrack}, ssr)
137+
seg := NewRTPSegmenter([]RTPTrack{videoTrack, audioTrack}, ssr, 0)
137138

138139
// Start a segment
139140
seg.StartSegment(0)
@@ -179,7 +180,7 @@ func TestRTPSegmenterConcurrency(t *testing.T) {
179180
func TestRTPSegmenterLatePacketDropping(t *testing.T) {
180181
require := require.New(t)
181182
ssr := NewSwitchableSegmentReader()
182-
seg := NewRTPSegmenter([]RTPTrack{videoTrack, audioTrack}, ssr)
183+
seg := NewRTPSegmenter([]RTPTrack{videoTrack, audioTrack}, ssr, 0)
183184

184185
// Start a segment
185186
seg.StartSegment(0)
@@ -228,3 +229,57 @@ func TestRTPSegmenterLatePacketDropping(t *testing.T) {
228229
// Verify low watermark is updated
229230
require.Equal(int64(7000), seg.videoQueue[0].pts)
230231
}
232+
233+
func TestRTPSegmenterMinSegmentDurationWallClock(t *testing.T) {
234+
require := require.New(t)
235+
236+
ssr := NewSwitchableSegmentReader()
237+
// Use a short minSegDur so we don't slow tests too much
238+
minSegDur := 100 * time.Millisecond
239+
seg := NewRTPSegmenter([]RTPTrack{videoTrack}, ssr, minSegDur)
240+
241+
// Initially no segment
242+
require.False(seg.IsReady(), "No active segment yet")
243+
244+
// Start an initial segment
245+
seg.StartSegment(0)
246+
require.True(seg.IsReady(), "Segment should be started")
247+
248+
// Immediately check if we should start a new segment
249+
// Not enough wall time has passed
250+
shouldStart := seg.ShouldStartSegment(1, 1) // ~1 second in 1hz timescale
251+
require.False(shouldStart, "Should not start new segment (wall-clock < minDur)")
252+
253+
// Wait less than the minSegDur, ensure we still don't start
254+
time.Sleep(minSegDur / 2)
255+
shouldStart = seg.ShouldStartSegment(2, 1) // ~2 seconds
256+
require.False(shouldStart, "Still under the wall-clock limit")
257+
258+
// Wait enough time, then we can start a new segment
259+
time.Sleep(minSegDur / 2)
260+
shouldStart = seg.ShouldStartSegment(3, 1) // ~3 seconds PTS
261+
require.True(shouldStart, "Segment should have started")
262+
}
263+
264+
func TestRTPSegmenterMinSegmentDurationPTS(t *testing.T) {
265+
require := require.New(t)
266+
ssr := NewSwitchableSegmentReader()
267+
268+
// 1 second min
269+
minSegDur := 10 * time.Millisecond
270+
seg := NewRTPSegmenter([]RTPTrack{videoTrack}, ssr, minSegDur)
271+
272+
// Start initial segment at pts=0
273+
seg.StartSegment(0)
274+
require.True(seg.IsReady(), "Segment should be active")
275+
276+
// Even if we wait in real clock, if PTS is still less than 1s, no new segment
277+
time.Sleep(minSegDur)
278+
require.False(seg.ShouldStartSegment(9, 1000)) // 9ms < 10ms minSegDur
279+
require.True(seg.ShouldStartSegment(10, 1000)) // 10ms == minSegDur
280+
281+
seg.StartSegment(12)
282+
time.Sleep(minSegDur) // because we also check the wall clock
283+
require.False(seg.ShouldStartSegment(20, 1000), "PTS") // 20ms < 22ms (12ms start + 10ms minSegDur)
284+
require.True(seg.ShouldStartSegment(23, 1000), "PTS") // 23ms > 22ms
285+
}

media/whip_server.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -183,9 +183,16 @@ func (s *WHIPServer) CreateWHIP(ctx context.Context, ssr *SwitchableSegmentReade
183183
if audioTrack != nil {
184184
tracks = append(tracks, audioTrack)
185185
}
186+
minSegDur := 1 * time.Second
187+
segDurEnv := os.Getenv("LIVE_AI_MIN_SEG_DUR")
188+
if segDurEnv != "" {
189+
if parsed, err := time.ParseDuration(segDurEnv); err == nil {
190+
minSegDur = parsed
191+
}
192+
}
186193
trackCodecs := make([]string, len(tracks))
187194
timeDecoder := rtptime.NewGlobalDecoder2()
188-
segmenter := NewRTPSegmenter(tracks, ssr)
195+
segmenter := NewRTPSegmenter(tracks, ssr, minSegDur)
189196
var wg sync.WaitGroup // to wait for all tracks to complete
190197
for i, track := range tracks {
191198
trackCodecs[i] = track.Codec().MimeType
@@ -286,7 +293,7 @@ func handleRTP(ctx context.Context, segmenter *RTPSegmenter, timeDecoder *rtptim
286293
}
287294

288295
idr := h264.IsRandomAccess(au)
289-
if idr {
296+
if idr && segmenter.ShouldStartSegment(dts, track.Codec().ClockRate) {
290297
segmenter.StartSegment(dts)
291298
}
292299

0 commit comments

Comments
 (0)