Skip to content

Commit ae856dc

Browse files
committed
ai/live: Add a minimum segment duration for new WHIP.
1 parent bceec72 commit ae856dc

File tree

2 files changed

+37
-3
lines changed

2 files changed

+37
-3
lines changed

media/rtp_segmenter.go

+28-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package media
33
import (
44
"errors"
55
"sync"
6+
"time"
67

78
"github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts"
89
"github.com/pion/webrtc/v4"
@@ -26,6 +27,10 @@ type RTPSegmenter struct {
2627
hasVideo bool
2728
maxQueueSize int // Maximum number of packets to buffer per queue
2829
tsWatermark int64 // Timestamp of the last written packet
30+
31+
segStartTs int64 // timestamp of the current segment
32+
segStartTime time.Time // wall clock of when the current segment started
33+
minSegDur time.Duration // minimum segment duration
2934
}
3035

3136
type audioPacket struct {
@@ -47,10 +52,11 @@ type trackWriter struct {
4752
writeVideo func(pts, dts int64, data [][]byte) error
4853
}
4954

50-
func NewRTPSegmenter(tracks []RTPTrack, ssr *SwitchableSegmentReader) *RTPSegmenter {
55+
func NewRTPSegmenter(tracks []RTPTrack, ssr *SwitchableSegmentReader, segDur time.Duration) *RTPSegmenter {
5156
s := &RTPSegmenter{
5257
ssr: ssr,
5358
maxQueueSize: 20,
59+
minSegDur: segDur,
5460
}
5561
s.tracks = s.setupTracks(tracks)
5662
return s
@@ -73,7 +79,28 @@ func (s *RTPSegmenter) StartSegment(startTs int64) {
7379
s.mpegtsWriter = mpegts.NewWriter(writer, newTracks)
7480
s.ssr.Read(writer.MakeReader())
7581
s.mediaWriter = writer
82+
s.segStartTs = startTs
83+
s.segStartTime = time.Now()
7684
}
85+
86+
func (s *RTPSegmenter) ShouldStartSegment(pts int64, tb uint32) bool {
87+
s.mu.Lock()
88+
defer s.mu.Unlock()
89+
if s.segStartTime.IsZero() {
90+
return true
91+
}
92+
// Enforce minimum real (wall-clock) time
93+
if time.Since(s.segStartTime) < s.minSegDur {
94+
return false
95+
}
96+
// Enforce minimum PTS time
97+
needed := int64(s.minSegDur.Seconds() * float64(tb))
98+
if (pts - s.segStartTs) < needed {
99+
return false
100+
}
101+
return true
102+
}
103+
77104
func (s *RTPSegmenter) WriteVideo(source RTPTrack, pts, dts int64, au [][]byte) error {
78105
s.mu.Lock()
79106
defer s.mu.Unlock()

media/whip_server.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -183,9 +183,16 @@ func (s *WHIPServer) CreateWHIP(ctx context.Context, ssr *SwitchableSegmentReade
183183
if audioTrack != nil {
184184
tracks = append(tracks, audioTrack)
185185
}
186+
minSegDur := 1 * time.Second
187+
segDurEnv := os.Getenv("LIVE_AI_MIN_SEG_DUR")
188+
if segDurEnv != "" {
189+
if parsed, err := time.ParseDuration(segDurEnv); err == nil {
190+
minSegDur = parsed
191+
}
192+
}
186193
trackCodecs := make([]string, len(tracks))
187194
timeDecoder := rtptime.NewGlobalDecoder2()
188-
segmenter := NewRTPSegmenter(tracks, ssr)
195+
segmenter := NewRTPSegmenter(tracks, ssr, minSegDur)
189196
var wg sync.WaitGroup // to wait for all tracks to complete
190197
for i, track := range tracks {
191198
trackCodecs[i] = track.Codec().MimeType
@@ -286,7 +293,7 @@ func handleRTP(ctx context.Context, segmenter *RTPSegmenter, timeDecoder *rtptim
286293
}
287294

288295
idr := h264.IsRandomAccess(au)
289-
if idr {
296+
if idr && segmenter.ShouldStartSegment(dts, track.Codec().ClockRate) {
290297
segmenter.StartSegment(dts)
291298
}
292299

0 commit comments

Comments
 (0)