Skip to content

feat: add support for MPEG-TS over unix #4389

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Live streams can be published to the server with:
|[RTMP cameras and servers](#rtmp-cameras-and-servers)|RTMP, RTMPS, Enhanced RTMP|AV1, VP9, H265, H264|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G711 (PCMA, PCMU), LPCM|
|[HLS cameras and servers](#hls-cameras-and-servers)|Low-Latency HLS, MP4-based HLS, legacy HLS|AV1, VP9, [H265](#supported-browsers-1), H264|Opus, MPEG-4 Audio (AAC)|
|[UDP/MPEG-TS](#udpmpeg-ts)|Unicast, broadcast, multicast|H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3|
|[Unix/MPEG-TS](#unixmpeg-ts)||H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3|
|[Raspberry Pi Cameras](#raspberry-pi-cameras)||H264||

Live streams can be read from the server with:
Expand Down Expand Up @@ -858,6 +859,26 @@ The resulting stream will be available in path `/mypath`.

Known clients that can publish with WebRTC and WHIP are [FFmpeg](#ffmpeg) and [GStreamer](#gstreamer).

#### Unix/MPEG-TS

The server supports ingesting Unix/MPEG-TS packets (i.e. MPEG-TS packets sent over unix sockets). For instance, you can generate a multicast Unix/MPEG-TS stream with FFmpeg:

```sh
ffmpeg -re -f lavfi -i testsrc=size=1280x720:rate=30 \
-c:v libx264 -pix_fmt yuv420p -preset ultrafast -b:v 600k \
-f mpegts unix:/tmp/ffmpeg.socket
```

Edit `mediamtx.yml` and replace everything inside section `paths` with the following content:

```yml
paths:
mypath:
source: unix:/tmp/ffmpeg.socket
```

The resulting stream will be available in path `/mypath`.

## Read from the server

### By software
Expand Down
6 changes: 6 additions & 0 deletions internal/conf/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,12 @@
return fmt.Errorf("'%s' is not a valid UDP URL", pconf.Source)
}

case strings.HasPrefix(pconf.Source, "unix:"):
_, _, err := net.SplitHostPort(pconf.Source)
if err != nil {
return fmt.Errorf("'%s' is not a valid unix socket", pconf.Source)
}

Check warning on line 403 in internal/conf/path.go

View check run for this annotation

Codecov / codecov/patch

internal/conf/path.go#L399-L403

Added lines #L399 - L403 were not covered by tests

case strings.HasPrefix(pconf.Source, "srt://"):
_, err := gourl.Parse(pconf.Source)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions internal/core/static_source_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
rtspsource "github.com/bluenviron/mediamtx/internal/staticsources/rtsp"
srtsource "github.com/bluenviron/mediamtx/internal/staticsources/srt"
udpsource "github.com/bluenviron/mediamtx/internal/staticsources/udp"
unixsource "github.com/bluenviron/mediamtx/internal/staticsources/unix"
webrtcsource "github.com/bluenviron/mediamtx/internal/staticsources/webrtc"
)

Expand Down Expand Up @@ -102,6 +103,12 @@
Parent: s,
}

case strings.HasPrefix(s.conf.Source, "unix:"):
s.instance = &unixsource.Source{
ReadTimeout: s.readTimeout,
Parent: s,
}

Check warning on line 110 in internal/core/static_source_handler.go

View check run for this annotation

Codecov / codecov/patch

internal/core/static_source_handler.go#L106-L110

Added lines #L106 - L110 were not covered by tests

case strings.HasPrefix(s.conf.Source, "srt://"):
s.instance = &srtsource.Source{
ReadTimeout: s.readTimeout,
Expand Down
140 changes: 140 additions & 0 deletions internal/staticsources/unix/source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Package unix contains the UNIX static source.
package unix

import (
"fmt"
"net"
"os"
"time"

"github.com/bluenviron/gortsplib/v4/pkg/description"
mcmpegts "github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts"

"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/counterdumper"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/mpegts"
"github.com/bluenviron/mediamtx/internal/stream"
)

// Source is a unix static source.
type Source struct {
ReadTimeout conf.Duration
Parent defs.StaticSourceParent
}

// Log implements logger.Writer.
func (s *Source) Log(level logger.Level, format string, args ...interface{}) {
s.Parent.Log(level, "[unix source] "+format, args...)
}

// Run implements StaticSource.
func (s *Source) Run(params defs.StaticSourceRunParams) error {
s.Log(logger.Debug, "connecting")

network, address, err := net.SplitHostPort(params.ResolvedSource)
if err != nil {
return err
}

Check warning on line 39 in internal/staticsources/unix/source.go

View check run for this annotation

Codecov / codecov/patch

internal/staticsources/unix/source.go#L38-L39

Added lines #L38 - L39 were not covered by tests

err = os.Remove(address)
if err != nil {
// not really important if it fails
s.Log(logger.Debug, "Failed to remove previous unix socket", err)
}

var socket net.Listener
socket, err = net.Listen(network, address)
if err != nil {
return err
}

Check warning on line 51 in internal/staticsources/unix/source.go

View check run for this annotation

Codecov / codecov/patch

internal/staticsources/unix/source.go#L50-L51

Added lines #L50 - L51 were not covered by tests
defer socket.Close()

conn, err := socket.Accept()
if err != nil {
return err
}

Check warning on line 57 in internal/staticsources/unix/source.go

View check run for this annotation

Codecov / codecov/patch

internal/staticsources/unix/source.go#L56-L57

Added lines #L56 - L57 were not covered by tests
defer conn.Close()

readerErr := make(chan error)
go func() {
readerErr <- s.runReader(conn)
}()

select {
case err := <-readerErr:
return err

Check warning on line 67 in internal/staticsources/unix/source.go

View check run for this annotation

Codecov / codecov/patch

internal/staticsources/unix/source.go#L66-L67

Added lines #L66 - L67 were not covered by tests

case <-params.Context.Done():
socket.Close()
<-readerErr
return fmt.Errorf("terminated")
}
}

func (s *Source) runReader(conn net.Conn) error {
conn.SetReadDeadline(time.Now().Add(time.Duration(s.ReadTimeout)))
r := &mcmpegts.Reader{
R: conn,
}
err := r.Initialize()
if err != nil {
return err
}

Check warning on line 84 in internal/staticsources/unix/source.go

View check run for this annotation

Codecov / codecov/patch

internal/staticsources/unix/source.go#L83-L84

Added lines #L83 - L84 were not covered by tests

decodeErrors := &counterdumper.CounterDumper{
OnReport: func(val uint64) {
s.Log(logger.Warn, "%d decode %s",
val,
func() string {
if val == 1 {
return "error"
}
return "errors"

Check warning on line 94 in internal/staticsources/unix/source.go

View check run for this annotation

Codecov / codecov/patch

internal/staticsources/unix/source.go#L88-L94

Added lines #L88 - L94 were not covered by tests
}())
},
}

decodeErrors.Start()
defer decodeErrors.Stop()

r.OnDecodeError(func(_ error) {
decodeErrors.Increase()
})

Check warning on line 104 in internal/staticsources/unix/source.go

View check run for this annotation

Codecov / codecov/patch

internal/staticsources/unix/source.go#L103-L104

Added lines #L103 - L104 were not covered by tests

var stream *stream.Stream

medias, err := mpegts.ToStream(r, &stream, s)
if err != nil {
return err
}

Check warning on line 111 in internal/staticsources/unix/source.go

View check run for this annotation

Codecov / codecov/patch

internal/staticsources/unix/source.go#L110-L111

Added lines #L110 - L111 were not covered by tests

res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{
Desc: &description.Session{Medias: medias},
GenerateRTPPackets: true,
})
if res.Err != nil {
return res.Err
}

Check warning on line 119 in internal/staticsources/unix/source.go

View check run for this annotation

Codecov / codecov/patch

internal/staticsources/unix/source.go#L118-L119

Added lines #L118 - L119 were not covered by tests

defer s.Parent.SetNotReady(defs.PathSourceStaticSetNotReadyReq{})

stream = res.Stream

for {
conn.SetReadDeadline(time.Now().Add(time.Duration(s.ReadTimeout)))
err := r.Read()
if err != nil {
return err
}
}
}

// APISourceDescribe implements StaticSource.
func (*Source) APISourceDescribe() defs.APIPathSourceOrReader {
return defs.APIPathSourceOrReader{
Type: "unixSource",
ID: "",
}

Check warning on line 139 in internal/staticsources/unix/source.go

View check run for this annotation

Codecov / codecov/patch

internal/staticsources/unix/source.go#L135-L139

Added lines #L135 - L139 were not covered by tests
}
62 changes: 62 additions & 0 deletions internal/staticsources/unix/source_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package unix

import (
"bufio"
"net"
"testing"
"time"

"github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts"
"github.com/stretchr/testify/require"

"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/test"
)

func TestSource(t *testing.T) {
te := test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
ReadTimeout: conf.Duration(10 * time.Second),
Parent: p,
}
},
"unix:/tmp/mediamtx-test.sock",
&conf.Path{},
)
defer te.Close()

time.Sleep(50 * time.Millisecond)

conn, err := net.Dial("unix", "/tmp/mediamtx-test.sock")
require.NoError(t, err)
defer conn.Close()

track := &mpegts.Track{
Codec: &mpegts.CodecH264{},
}

bw := bufio.NewWriter(conn)
w := &mpegts.Writer{
W: bw,
Tracks: []*mpegts.Track{track},
}
err = w.Initialize()
require.NoError(t, err)

err = w.WriteH264(track, 0, 0, [][]byte{{ // IDR
5, 1,
}})
require.NoError(t, err)

err = w.WriteH264(track, 0, 0, [][]byte{{ // non-IDR
5, 2,
}})
require.NoError(t, err)

err = bw.Flush()
require.NoError(t, err)

<-te.Unit
}