-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
Copy pathsource.go
140 lines (116 loc) · 2.87 KB
/
source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
// Package unix contains the UNIX static source.
package unix
import (
"fmt"
"net"
"os"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/description"
mcmpegts "github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/counterdumper"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/mpegts"
"github.com/bluenviron/mediamtx/internal/stream"
)
// Source is a unix static source.
type Source struct {
	// ReadTimeout is the duration added to the current time to compute the
	// read deadline that is set on the accepted connection before each read.
	ReadTimeout conf.Duration
	// Parent receives the log lines of the source and is notified when the
	// source becomes ready or not ready.
	Parent defs.StaticSourceParent
}
// Log implements logger.Writer.
//
// The message is forwarded to the parent with the "[unix source] " prefix
// prepended to the format string.
func (s *Source) Log(level logger.Level, format string, args ...interface{}) {
	prefixed := "[unix source] " + format
	s.Parent.Log(level, prefixed, args...)
}
// Run implements StaticSource.
//
// It splits params.ResolvedSource with net.SplitHostPort and uses the two
// resulting parts as the network and the address to listen on. It then
// accepts a single connection and reads from it until the reader fails or
// params.Context is canceled.
//
// It returns the split, listen, accept or reader error, or a "terminated"
// error when params.Context is canceled.
func (s *Source) Run(params defs.StaticSourceRunParams) error {
	s.Log(logger.Debug, "connecting")

	network, address, err := net.SplitHostPort(params.ResolvedSource)
	if err != nil {
		return err
	}

	// Remove a socket file left over from a previous run.
	// It is not really important if this fails, so the error is only logged.
	if err = os.Remove(address); err != nil {
		s.Log(logger.Debug, "Failed to remove previous unix socket: %v", err)
	}

	socket, err := net.Listen(network, address)
	if err != nil {
		return err
	}
	defer socket.Close()

	// Accept must not outlive the context, otherwise Run would hang until a
	// peer connects even after termination has been requested.
	conn, err := acceptOrDone(socket, params.Context.Done())
	if err != nil {
		return err
	}
	defer conn.Close()

	readerErr := make(chan error)
	go func() {
		readerErr <- s.runReader(conn)
	}()

	select {
	case err := <-readerErr:
		return err

	case <-params.Context.Done():
		socket.Close()
		// Closing the listener does not close the accepted connection:
		// close it too, so that the pending read returns immediately
		// instead of waiting for the read deadline to expire.
		conn.Close()
		<-readerErr
		return fmt.Errorf("terminated")
	}
}

// acceptOrDone accepts one connection from l, giving up as soon as done is
// closed. When it gives up, l is closed and a "terminated" error is returned.
func acceptOrDone(l net.Listener, done <-chan struct{}) (net.Conn, error) {
	type result struct {
		conn net.Conn
		err  error
	}

	// Buffered so that the goroutine can always deliver its result and exit.
	accepted := make(chan result, 1)
	go func() {
		c, err := l.Accept()
		accepted <- result{conn: c, err: err}
	}()

	select {
	case res := <-accepted:
		return res.conn, res.err

	case <-done:
		// Closing the listener unblocks the pending Accept.
		l.Close()
		// A connection may have been accepted right before the close:
		// do not leak it.
		if res := <-accepted; res.err == nil {
			res.conn.Close()
		}
		return nil, fmt.Errorf("terminated")
	}
}
// runReader reads from conn through an MPEG-TS reader until a read fails.
//
// It initializes the reader, hands it to mpegts.ToStream to obtain the
// medias, marks the parent as ready with those medias, and then reads in a
// loop. The read deadline of conn is pushed forward by s.ReadTimeout before
// the initialization and before every read. The parent is marked as not ready
// when the function returns after having been marked as ready.
//
// It always returns a non-nil error once the parent is ready, since the read
// loop only exits on error.
func (s *Source) runReader(conn net.Conn) error {
	// extendDeadline pushes the read deadline of conn forward by ReadTimeout.
	extendDeadline := func() {
		conn.SetReadDeadline(time.Now().Add(time.Duration(s.ReadTimeout)))
	}

	extendDeadline()

	reader := &mcmpegts.Reader{
		R: conn,
	}
	if err := reader.Initialize(); err != nil {
		return err
	}

	// Decode errors are counted and reported through the counter dumper
	// instead of being logged one by one.
	decodeErrCounter := &counterdumper.CounterDumper{
		OnReport: func(val uint64) {
			noun := "errors"
			if val == 1 {
				noun = "error"
			}
			s.Log(logger.Warn, "%d decode %s", val, noun)
		},
	}
	decodeErrCounter.Start()
	defer decodeErrCounter.Stop()

	reader.OnDecodeError(func(_ error) {
		decodeErrCounter.Increase()
	})

	// ToStream receives a pointer to strm, which is filled in only after the
	// parent has been marked as ready.
	var strm *stream.Stream

	medias, err := mpegts.ToStream(reader, &strm, s)
	if err != nil {
		return err
	}

	ready := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{
		Desc:               &description.Session{Medias: medias},
		GenerateRTPPackets: true,
	})
	if ready.Err != nil {
		return ready.Err
	}
	defer s.Parent.SetNotReady(defs.PathSourceStaticSetNotReadyReq{})

	strm = ready.Stream

	for {
		extendDeadline()
		if err := reader.Read(); err != nil {
			return err
		}
	}
}
// APISourceDescribe implements StaticSource.
func (*Source) APISourceDescribe() defs.APIPathSourceOrReader {
return defs.APIPathSourceOrReader{
Type: "unixSource",
ID: "",
}
}