Skip to content

Commit 37d7409

Browse files
authored
Merge pull request #1762 from seydx/preload
Preload Streams
2 parents 549da02 + 4dd1f73 commit 37d7409

File tree

6 files changed

+194
-31
lines changed

6 files changed

+194
-31
lines changed

README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ Ultimate camera streaming application with support for RTSP, WebRTC, HomeKit, FF
7676
* [Incoming sources](#incoming-sources)
7777
* [Stream to camera](#stream-to-camera)
7878
* [Publish stream](#publish-stream)
79+
* [Preload stream](#preload-stream)
7980
* [Module: API](#module-api)
8081
* [Module: RTSP](#module-rtsp)
8182
* [Module: RTMP](#module-rtmp)
@@ -835,6 +836,26 @@ streams:
835836
- **Telegram Desktop App** > Any public or private channel or group (where you admin) > Live stream > Start with... > Start streaming.
836837
- **YouTube** > Create > Go live > Stream latency: Ultra low-latency > Copy: Stream URL + Stream key.
837838

839+
### Preload stream
840+
841+
You can preload any stream on go2rtc start. This is useful for cameras that take a long time to start up.
842+
843+
```yaml
844+
preload:
845+
camera1: # default: video&audio = ANY
846+
camera2: "video" # preload only video track
847+
camera3: "video=h264&audio=opus" # preload H264 video and OPUS audio
848+
849+
streams:
850+
camera1:
851+
- rtsp://192.168.1.100/stream
852+
camera2:
853+
- rtsp://192.168.1.101/stream
854+
camera3:
855+
- rtsp://192.168.1.102/h265stream
856+
- ffmpeg:camera3#video=h264#audio=opus#hardware
857+
```
858+
838859
### Module: API
839860

840861
The HTTP API is the main part for interacting with the application. Default address: `http://localhost:1984/`.

api/openapi.yaml

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,54 @@ paths:
237237

238238

239239

240+
/api/preload:
241+
put:
242+
summary: Preload new stream
243+
tags: [ Streams list ]
244+
parameters:
245+
- name: src
246+
in: query
247+
description: Stream source (name)
248+
required: true
249+
schema: { type: string }
250+
example: "camera1"
251+
- name: video
252+
in: query
253+
description: Video codecs filter
254+
required: false
255+
schema: { type: string }
256+
example: all,h264,h265,...
257+
- name: audio
258+
in: query
259+
description: Audio codecs filter
260+
required: false
261+
schema: { type: string }
262+
example: all,aac,opus,...
263+
- name: microphone
264+
in: query
265+
description: Microphone codecs filter
266+
required: false
267+
schema: { type: string }
268+
example: all,aac,opus,...
269+
responses:
270+
default:
271+
description: Default response
272+
delete:
273+
summary: Delete preloaded stream
274+
tags: [ Streams list ]
275+
parameters:
276+
- name: src
277+
in: query
278+
description: Stream source (name)
279+
required: true
280+
schema: { type: string }
281+
example: "camera1"
282+
responses:
283+
default:
284+
description: Default response
285+
286+
287+
240288
/api/streams?src={src}:
241289
get:
242290
summary: Get stream info in JSON format

internal/streams/api.go

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/AlexxIT/go2rtc/internal/api"
77
"github.com/AlexxIT/go2rtc/internal/app"
8+
"github.com/AlexxIT/go2rtc/pkg/core"
89
"github.com/AlexxIT/go2rtc/pkg/probe"
910
)
1011

@@ -27,7 +28,7 @@ func apiStreams(w http.ResponseWriter, r *http.Request) {
2728
return
2829
}
2930

30-
cons := probe.NewProbe(query)
31+
cons := probe.Create("probe", query)
3132
if len(cons.Medias) != 0 {
3233
cons.WithRequest(r)
3334
if err := stream.AddConsumer(cons); err != nil {
@@ -122,3 +123,51 @@ func apiStreamsDOT(w http.ResponseWriter, r *http.Request) {
122123

123124
api.Response(w, dot, "text/vnd.graphviz")
124125
}
126+
127+
func apiPreload(w http.ResponseWriter, r *http.Request) {
128+
query := r.URL.Query()
129+
src := query.Get("src")
130+
131+
// check if stream exists
132+
stream := Get(src)
133+
if stream == nil {
134+
http.Error(w, "", http.StatusNotFound)
135+
return
136+
}
137+
138+
switch r.Method {
139+
case "PUT":
140+
// it's safe to delete from map while iterating
141+
for k := range query {
142+
switch k {
143+
case core.KindVideo, core.KindAudio, "microphone":
144+
default:
145+
delete(query, k)
146+
}
147+
}
148+
149+
rawQuery := query.Encode()
150+
151+
if err := AddPreload(stream, rawQuery); err != nil {
152+
http.Error(w, err.Error(), http.StatusInternalServerError)
153+
return
154+
}
155+
156+
if err := app.PatchConfig([]string{"preload", src}, rawQuery); err != nil {
157+
http.Error(w, err.Error(), http.StatusInternalServerError)
158+
}
159+
160+
case "DELETE":
161+
if err := DelPreload(stream); err != nil {
162+
http.Error(w, err.Error(), http.StatusInternalServerError)
163+
return
164+
}
165+
166+
if err := app.PatchConfig([]string{"preload", src}, nil); err != nil {
167+
http.Error(w, err.Error(), http.StatusInternalServerError)
168+
}
169+
170+
default:
171+
http.Error(w, "", http.StatusMethodNotAllowed)
172+
}
173+
}

internal/streams/preload.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package streams
2+
3+
import (
4+
"errors"
5+
"net/url"
6+
"sync"
7+
8+
"github.com/AlexxIT/go2rtc/pkg/probe"
9+
)
10+
11+
var preloads = map[*Stream]*probe.Probe{}
12+
var preloadsMu sync.Mutex
13+
14+
func Preload(stream *Stream, rawQuery string) {
15+
if err := AddPreload(stream, rawQuery); err != nil {
16+
log.Error().Err(err).Caller().Send()
17+
}
18+
}
19+
20+
func AddPreload(stream *Stream, rawQuery string) error {
21+
if rawQuery == "" {
22+
rawQuery = "video&audio"
23+
}
24+
25+
query, err := url.ParseQuery(rawQuery)
26+
if err != nil {
27+
return err
28+
}
29+
30+
preloadsMu.Lock()
31+
defer preloadsMu.Unlock()
32+
33+
if cons := preloads[stream]; cons != nil {
34+
stream.RemoveConsumer(cons)
35+
}
36+
37+
cons := probe.Create("preload", query)
38+
39+
if err = stream.AddConsumer(cons); err != nil {
40+
return err
41+
}
42+
43+
preloads[stream] = cons
44+
return nil
45+
}
46+
47+
func DelPreload(stream *Stream) error {
48+
preloadsMu.Lock()
49+
defer preloadsMu.Unlock()
50+
51+
if cons := preloads[stream]; cons != nil {
52+
stream.RemoveConsumer(cons)
53+
delete(preloads, stream)
54+
return nil
55+
}
56+
57+
return errors.New("streams: preload not found")
58+
}

internal/streams/streams.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ import (
1414

1515
func Init() {
1616
var cfg struct {
17-
Streams map[string]any `yaml:"streams"`
18-
Publish map[string]any `yaml:"publish"`
17+
Streams map[string]any `yaml:"streams"`
18+
Publish map[string]any `yaml:"publish"`
19+
Preload map[string]string `yaml:"preload"`
1920
}
2021

2122
app.LoadConfig(&cfg)
@@ -28,17 +29,24 @@ func Init() {
2829

2930
api.HandleFunc("api/streams", apiStreams)
3031
api.HandleFunc("api/streams.dot", apiStreamsDOT)
32+
api.HandleFunc("api/preload", apiPreload)
3133

32-
if cfg.Publish == nil {
34+
if cfg.Publish == nil && cfg.Preload == nil {
3335
return
3436
}
3537

3638
time.AfterFunc(time.Second, func() {
39+
// range for nil map is OK
3740
for name, dst := range cfg.Publish {
3841
if stream := Get(name); stream != nil {
3942
Publish(stream, dst)
4043
}
4144
}
45+
for name, rawQuery := range cfg.Preload {
46+
if stream := Get(name); stream != nil {
47+
Preload(stream, rawQuery)
48+
}
49+
}
4250
})
4351
}
4452

pkg/probe/producer.go renamed to pkg/probe/consumer.go

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ type Probe struct {
1111
core.Connection
1212
}
1313

14-
func NewProbe(query url.Values) *Probe {
14+
func Create(name string, query url.Values) *Probe {
1515
medias := core.ParseQuery(query)
1616

1717
for _, value := range query["microphone"] {
@@ -32,39 +32,18 @@ func NewProbe(query url.Values) *Probe {
3232
return &Probe{
3333
Connection: core.Connection{
3434
ID: core.NewID(),
35-
FormatName: "probe",
35+
FormatName: name,
3636
Medias: medias,
3737
},
3838
}
3939
}
4040

41-
func (p *Probe) GetMedias() []*core.Media {
42-
return p.Medias
43-
}
44-
4541
func (p *Probe) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
4642
sender := core.NewSender(media, track.Codec)
47-
sender.Bind(track)
48-
p.Senders = append(p.Senders, sender)
49-
return nil
50-
}
51-
52-
func (p *Probe) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
53-
receiver := core.NewReceiver(media, codec)
54-
p.Receivers = append(p.Receivers, receiver)
55-
return receiver, nil
56-
}
57-
58-
func (p *Probe) Start() error {
59-
return nil
60-
}
61-
62-
func (p *Probe) Stop() error {
63-
for _, receiver := range p.Receivers {
64-
receiver.Close()
65-
}
66-
for _, sender := range p.Senders {
67-
sender.Close()
43+
sender.Handler = func(pkt *core.Packet) {
44+
p.Send += len(pkt.Payload)
6845
}
46+
sender.HandleRTP(track)
47+
p.Senders = append(p.Senders, sender)
6948
return nil
7049
}

0 commit comments

Comments
 (0)