Skip to content

Commit

Permalink
Rewrite exec handler
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexxIT committed Feb 23, 2025
1 parent 7d41dc2 commit 6fb5994
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 71 deletions.
39 changes: 0 additions & 39 deletions internal/exec/closer.go

This file was deleted.

60 changes: 37 additions & 23 deletions internal/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"io"
"net/url"
"os"
"os/exec"
"strings"
"sync"
"syscall"
"time"

"github.com/AlexxIT/go2rtc/internal/app"
Expand Down Expand Up @@ -49,7 +49,7 @@ func Init() {
log = app.GetLogger("exec")
}

func execHandle(rawURL string) (core.Producer, error) {
func execHandle(rawURL string) (prod core.Producer, err error) {
rawURL, rawQuery, _ := strings.Cut(rawURL, "#")
query := streams.ParseQuery(rawQuery)

Expand All @@ -67,39 +67,55 @@ func execHandle(rawURL string) (core.Producer, error) {
rawURL = rawURL[:i] + "rtsp://127.0.0.1:" + rtsp.Port + path + rawURL[i+8:]
}

args := shell.QuoteSplit(rawURL[5:]) // remove `exec:`
cmd := exec.Command(args[0], args[1:]...)
cmd := shell.NewCommand(rawURL[5:]) // remove `exec:`
cmd.Stderr = &logWriter{
buf: make([]byte, 512),
debug: log.Debug().Enabled(),
}

if s := query.Get("killsignal"); s != "" {
sig := syscall.Signal(core.Atoi(s))
cmd.Cancel = func() error {
log.Debug().Msgf("[exec] kill with signal=%d", sig)
return cmd.Process.Signal(sig)
}
}

if s := query.Get("killtimeout"); s != "" {
cmd.WaitDelay = time.Duration(core.Atoi(s)) * time.Second
}

if query.Get("backchannel") == "1" {
return stdin.NewClient(cmd)
}

cl := &closer{cmd: cmd, query: query}

if path == "" {
return handlePipe(rawURL, cmd, cl)
prod, err = handlePipe(rawURL, cmd)
} else {
prod, err = handleRTSP(rawURL, cmd, path)
}

return handleRTSP(rawURL, cmd, cl, path)
if err != nil {
_ = cmd.Close()
}

return
}

func handlePipe(source string, cmd *exec.Cmd, cl io.Closer) (core.Producer, error) {
func handlePipe(source string, cmd *shell.Command) (core.Producer, error) {
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}

rc := struct {
rd := struct {
io.Reader
io.Closer
}{
// add buffer for pipe reader to reduce syscall
bufio.NewReaderSize(stdout, core.BufferSize),
cl,
// stop cmd on close pipe call
cmd,
}

log.Debug().Strs("args", cmd.Args).Msg("[exec] run pipe")
Expand All @@ -110,9 +126,8 @@ func handlePipe(source string, cmd *exec.Cmd, cl io.Closer) (core.Producer, erro
return nil, err
}

prod, err := magic.Open(rc)
prod, err := magic.Open(rd)
if err != nil {
_ = rc.Close()
return nil, fmt.Errorf("exec/pipe: %w\n%s", err, cmd.Stderr)
}

Expand All @@ -126,7 +141,7 @@ func handlePipe(source string, cmd *exec.Cmd, cl io.Closer) (core.Producer, erro
return prod, nil
}

func handleRTSP(source string, cmd *exec.Cmd, cl io.Closer, path string) (core.Producer, error) {
func handleRTSP(source string, cmd *shell.Command, path string) (core.Producer, error) {
if log.Trace().Enabled() {
cmd.Stdout = os.Stdout
}
Expand All @@ -152,23 +167,22 @@ func handleRTSP(source string, cmd *exec.Cmd, cl io.Closer, path string) (core.P
return nil, err
}

done := make(chan error, 1)
go func() {
done <- cmd.Wait()
}()
timeout := time.NewTimer(30 * time.Second)
defer timeout.Stop()

select {
case <-time.After(time.Minute):
case <-timeout.C:
// haven't received data from app in timeout
log.Error().Str("source", source).Msg("[exec] timeout")
_ = cl.Close()
return nil, errors.New("exec: timeout")
case <-done:
// limit message size
case <-cmd.Done():
// app fail before we receive any data
return nil, fmt.Errorf("exec/rtsp\n%s", cmd.Stderr)
case prod := <-waiter:
// app started successfully
log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run rtsp")
setRemoteInfo(prod, source, cmd.Args)
prod.OnClose = cl.Close
prod.OnClose = cmd.Close
return prod, nil
}
}
Expand Down
59 changes: 59 additions & 0 deletions pkg/shell/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package shell

import (
"context"
"os/exec"
)

// Command like exec.Cmd, but with support:
// - io.Closer interface
// - Wait from multiple places
// - Done channel
type Command struct {
*exec.Cmd
ctx context.Context
cancel context.CancelFunc
err error
}

func NewCommand(s string) *Command {
ctx, cancel := context.WithCancel(context.Background())
args := QuoteSplit(s)
cmd := exec.CommandContext(ctx, args[0], args[1:]...)
cmd.SysProcAttr = procAttr
return &Command{cmd, ctx, cancel, nil}
}

func (c *Command) Start() error {
if err := c.Cmd.Start(); err != nil {
return err
}

go func() {
c.err = c.Cmd.Wait()
c.cancel() // release context resources
}()

return nil
}

func (c *Command) Wait() error {
<-c.ctx.Done()
return c.err
}

func (c *Command) Run() error {
if err := c.Start(); err != nil {
return err
}
return c.Wait()
}

func (c *Command) Done() <-chan struct{} {
return c.ctx.Done()
}

func (c *Command) Close() error {
c.cancel()
return nil
}
7 changes: 7 additions & 0 deletions pkg/shell/procattr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//go:build !linux

package shell

import "syscall"

var procAttr *syscall.SysProcAttr
6 changes: 6 additions & 0 deletions pkg/shell/procattr_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package shell

import "syscall"

// will stop child if parent died (even with SIGKILL)
var procAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGTERM}
6 changes: 1 addition & 5 deletions pkg/stdin/backchannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package stdin

import (
"encoding/json"
"errors"

"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
Expand Down Expand Up @@ -42,10 +41,7 @@ func (c *Client) Stop() (err error) {
if c.sender != nil {
c.sender.Close()
}
if c.cmd.Process == nil {
return nil
}
return errors.Join(c.cmd.Process.Kill(), c.cmd.Wait())
return c.cmd.Close()
}

func (c *Client) MarshalJSON() ([]byte, error) {
Expand Down
7 changes: 3 additions & 4 deletions pkg/stdin/client.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
package stdin

import (
"os/exec"

"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/shell"
)

// Deprecated: should be rewritten to core.Connection
type Client struct {
cmd *exec.Cmd
cmd *shell.Command

medias []*core.Media
sender *core.Sender
send int
}

func NewClient(cmd *exec.Cmd) (*Client, error) {
func NewClient(cmd *shell.Command) (*Client, error) {
c := &Client{
cmd: cmd,
medias: []*core.Media{
Expand Down

0 comments on commit 6fb5994

Please sign in to comment.