11package rtsp
22
33import (
4+ "bytes"
45 "context"
56 "errors"
67 "log/slog"
@@ -49,8 +50,9 @@ const maxGOPCachePackets = 512
4950// LiveSource is a Source backed by an external H.264 producer. The producer
5051// (e.g. internal/rpicamera) pushes AccessUnit values onto Push; the source's
5152// Run goroutine encodes them into RTP packets and writes them to the
52- // gortsplib stream. DESCRIBE waits via Ready until the first IDR is
53- // observed so clients never see a pre-keyframe SDP.
53+ // gortsplib stream. DESCRIBE waits via Ready until SPS, PPS, and the first
54+ // IDR have all been observed so clients never see a pre-keyframe SDP nor
55+ // miss the parameter sets the SDP advertises.
5456//
5557// LiveSource does not own the producer's lifecycle; callers stop the
5658// producer separately when they cancel the Source's ctx.
@@ -69,6 +71,13 @@ type LiveSource struct {
6971 readyOnce sync.Once
7072 ready chan struct {}
7173
74+ // SPS/PPS cache for in-band repetition. Some producers (V4L2 /
75+ // libcamera path on Pi 3) emit parameter sets only at stream start, so
76+ // writeAU re-prepends them to every IDR that arrives without them.
77+ // Accessed only from Run, so no mutex is needed.
78+ psSPS []byte
79+ psPPS []byte
80+
7281 // gopMu guards gopCache. The cache holds the RTP packets emitted for
7382 // the most recent complete (or in-progress) GOP — i.e. the last IDR's
7483 // packets followed by every subsequent non-IDR's packets, until the
@@ -92,11 +101,9 @@ type cachedPacket struct {
92101}
93102
94103// NewLiveSource builds a LiveSource for the codec described by probe. SPS
95- // and PPS may be empty on the probe — they are extracted from the first
96- // IDR access unit and written into the gortsplib H264 format before the
97- // ready signal releases DESCRIBE, so the SDP carries sprop-parameter-sets
98- // and clients are not stuck on "non-existing PPS referenced". logger may
99- // be nil.
104+ // and PPS may be empty on the probe — they are extracted from the live
105+ // stream and mirrored into the gortsplib H264 format before Ready releases
106+ // DESCRIBE so the SDP carries sprop-parameter-sets. logger may be nil.
100107func NewLiveSource (probe * ProbeResult , logger * slog.Logger ) * LiveSource {
101108 if logger == nil {
102109 logger = obs .Discard ()
@@ -148,8 +155,8 @@ func (l *LiveSource) AttachStream(stream *gortsplib.ServerStream, media *descrip
148155 l .media = media
149156}
150157
151- // Ready satisfies Source. Blocks until the first IDR has been observed or
152- // ctx is canceled.
158+ // Ready satisfies Source. Blocks until SPS, PPS, and an IDR from the same
159+ // access unit have been observed, or ctx is canceled.
153160func (l * LiveSource ) Ready (ctx context.Context ) error {
154161 select {
155162 case <- l .ready :
@@ -160,8 +167,8 @@ func (l *LiveSource) Ready(ctx context.Context) error {
160167}
161168
162169// Run drains the AU channel until ctx is canceled or the channel is closed.
163- // Pre-IDR frames are dropped so DESCRIBE returns only when subsequent frames
164- // can decode standalone.
170+ // Frames before the source has seen SPS, PPS, and an IDR are dropped so
171+ // DESCRIBE returns only when subsequent frames can decode standalone.
165172func (l * LiveSource ) Run (ctx context.Context ) error {
166173 enc := & rtph264.Encoder {
167174 PayloadType : 96 ,
@@ -179,17 +186,14 @@ func (l *LiveSource) Run(ctx context.Context) error {
179186 if ! ok {
180187 return nil
181188 }
189+ l .absorbParameterSets (au .NALs )
182190 isIDR := hasH264IDR (au .NALs )
183- if isIDR {
184- l .fillH264ParameterSets (au .NALs )
191+ if isIDR && len (l .psSPS ) > 0 && len (l .psPPS ) > 0 {
185192 l .readyOnce .Do (func () { close (l .ready ) })
186193 }
187194 select {
188195 case <- l .ready :
189196 default :
190- // Pre-IDR frame: drop so the first packet a client sees
191- // after DESCRIBE is the keyframe that joinable decoding
192- // requires.
193197 continue
194198 }
195199 if err := l .writeAU (enc , au , isIDR ); err != nil {
@@ -203,7 +207,11 @@ func (l *LiveSource) Run(ctx context.Context) error {
203207}
204208
205209func (l * LiveSource ) writeAU (enc * rtph264.Encoder , au AccessUnit , isIDR bool ) error {
206- pkts , err := enc .Encode (au .NALs )
210+ nals := au .NALs
211+ if isIDR {
212+ nals = l .maybePrependParameterSets (nals )
213+ }
214+ pkts , err := enc .Encode (nals )
207215 if err != nil {
208216 l .logger .Warn ("rtsp livesource: rtp encode" , "err" , err )
209217 return nil
@@ -305,42 +313,95 @@ func (l *LiveSource) ReplayGOP(ss *gortsplib.ServerSession) {
305313 l .logger .Debug ("rtsp livesource: replayed gop" , "packets" , len (entries ))
306314}
307315
308- // fillH264ParameterSets pulls SPS (NAL type 7) and PPS (NAL type 8) out of
309- // the first IDR access unit and writes them into the attached gortsplib
310- // format.H264 so DESCRIBE emits sprop-parameter-sets in the SDP.
311- //
312- // Without this, clients that subscribe between IDRs never see SPS/PPS in
313- // either the SDP (empty) or in-band (already passed) and decode loops
314- // indefinitely on "non-existing PPS referenced".
315- func (l * LiveSource ) fillH264ParameterSets (nals [][]byte ) {
316+ // absorbParameterSets caches any SPS/PPS found in nals and mirrors the
317+ // latest pre-ready values into the gortsplib format so the SDP
318+ // sprop-parameter-sets stays current. Runs on every AU because some producers
319+ // emit parameter sets in a frame separate from the first IDR.
320+ func (l * LiveSource ) absorbParameterSets (nals [][]byte ) {
321+ var newSPS , newPPS []byte
322+ for _ , nal := range nals {
323+ if len (nal ) == 0 {
324+ continue
325+ }
326+ switch nal [0 ] & 0x1F {
327+ case h264NALTypeSPS :
328+ newSPS = nal
329+ case h264NALTypePPS :
330+ newPPS = nal
331+ }
332+ }
333+ if newSPS == nil && newPPS == nil {
334+ return
335+ }
336+ changed := false
337+ if newSPS != nil && ! bytes .Equal (l .psSPS , newSPS ) {
338+ l .psSPS = bytes .Clone (newSPS )
339+ changed = true
340+ }
341+ if newPPS != nil && ! bytes .Equal (l .psPPS , newPPS ) {
342+ l .psPPS = bytes .Clone (newPPS )
343+ changed = true
344+ }
345+ if changed {
346+ l .mirrorParameterSetsToFormat ()
347+ }
348+ }
349+
350+ func (l * LiveSource ) mirrorParameterSetsToFormat () {
351+ if l .isReady () {
352+ return
353+ }
316354 if l .media == nil || len (l .media .Formats ) == 0 {
317355 return
318356 }
319357 h , ok := l .media .Formats [0 ].(* format.H264 )
320358 if ! ok {
321359 return
322360 }
323- if len (h .SPS ) > 0 && len (h .PPS ) > 0 {
324- return
361+ if len (l .psSPS ) > 0 {
362+ h .SPS = bytes .Clone (l .psSPS )
363+ }
364+ if len (l .psPPS ) > 0 {
365+ h .PPS = bytes .Clone (l .psPPS )
366+ }
367+ }
368+
369+ func (l * LiveSource ) isReady () bool {
370+ select {
371+ case <- l .ready :
372+ return true
373+ default :
374+ return false
325375 }
326- var sps , pps []byte
376+ }
377+
378+ // maybePrependParameterSets returns nals with cached SPS/PPS prepended
379+ // when either is missing in-band. The input slice is not modified.
380+ func (l * LiveSource ) maybePrependParameterSets (nals [][]byte ) [][]byte {
381+ var hasSPS , hasPPS bool
327382 for _ , nal := range nals {
328383 if len (nal ) == 0 {
329384 continue
330385 }
331386 switch nal [0 ] & 0x1F {
332387 case h264NALTypeSPS :
333- sps = nal
388+ hasSPS = true
334389 case h264NALTypePPS :
335- pps = nal
390+ hasPPS = true
336391 }
337392 }
338- if len (h .SPS ) == 0 && len (sps ) > 0 {
339- h .SPS = sps
393+ if hasSPS && hasPPS {
394+ return nals
395+ }
396+ out := make ([][]byte , 0 , len (nals )+ 2 )
397+ if ! hasSPS && len (l .psSPS ) > 0 {
398+ out = append (out , l .psSPS )
340399 }
341- if len ( h . PPS ) == 0 && len (pps ) > 0 {
342- h . PPS = pps
400+ if ! hasPPS && len (l . psPPS ) > 0 {
401+ out = append ( out , l . psPPS )
343402 }
403+ out = append (out , nals ... )
404+ return out
344405}
345406
346407// hasH264IDR scans access-unit NAL units for an IDR slice (NAL type 5).
0 commit comments