Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEMP - ffmpeg output debugging #3406

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions media/mediamtx.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
if err != nil {
return fmt.Errorf("failed to kick connection: %w", err)
}
defer resp.Body.Close()

Check warning on line 80 in media/mediamtx.go

View check run for this annotation

Codecov / codecov/patch

media/mediamtx.go#L80

Added line #L80 was not covered by tests
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusBadRequest {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("kick connection failed with status code: %d body: %s", resp.StatusCode, body)
Expand All @@ -100,6 +101,7 @@
if err != nil {
return false, fmt.Errorf("failed to get stream: %w", err)
}
defer resp.Body.Close()

Check warning on line 104 in media/mediamtx.go

View check run for this annotation

Codecov / codecov/patch

media/mediamtx.go#L104

Added line #L104 was not covered by tests
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusBadRequest {
body, _ := io.ReadAll(resp.Body)
return false, fmt.Errorf("get stream failed with status code: %d body: %s", resp.StatusCode, body)
Expand Down
10 changes: 8 additions & 2 deletions media/rtmp2segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@

clog.Infof(ctx, "Starting segmentation for %s", outFilePattern)

// TODO processSegments needs to also be re-invoked after each retry;
// processes that don't immediately fail are not fully retryable otherwise
//

Check warning on line 49 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L47-L49

Added lines #L47 - L49 were not covered by tests
// create first named pipe to preempt races between ffmpeg and processSegments
createNamedPipe(fmt.Sprintf(outFilePattern, 0))
go func() {
Expand All @@ -69,8 +72,12 @@
clog.Errorf(ctx, "Stopping segmentation in=%s err=%s", in, err)
break
}
if retryCount > 0 {
time.Sleep(5 * time.Second)
}

Check warning on line 77 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L75-L77

Added lines #L75 - L77 were not covered by tests
clog.Infof(ctx, "Starting segmentation. in=%s retryCount=%d", in, retryCount)
cmd := exec.CommandContext(procCtx, "ffmpeg",
"-analyzeduration", "2500000", // 2.5 seconds

Check warning on line 80 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L80

Added line #L80 was not covered by tests
"-i", in,
"-c:a", "copy",
"-c:v", "copy",
Expand All @@ -79,11 +86,10 @@
)
output, err := cmd.CombinedOutput()
if err != nil {
clog.Errorf(ctx, "Error receiving RTMP: %v", err)
clog.Errorf(ctx, "Error receiving RTMP: %v ffmpeg output: %s", err, output)

Check warning on line 89 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L89

Added line #L89 was not covered by tests
break
}
clog.Infof(ctx, "Segmentation stopped, will retry. retryCount=%d ffmpeg output: %s", retryCount, output)
time.Sleep(5 * time.Second)
retryCount++
}
completionSignal <- true
Expand Down
11 changes: 9 additions & 2 deletions media/segment_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
type SwitchableSegmentReader struct {
mu sync.RWMutex
reader SegmentHandler
seg CloneableReader
}

func NewSwitchableSegmentReader() *SwitchableSegmentReader {
Expand All @@ -35,12 +36,18 @@
sr.mu.Lock()
defer sr.mu.Unlock()
sr.reader = newReader
if sr.seg != nil {
// immediately send the current segment instead of waiting for the next one
// clone since current segment may have already been partially consumed
sr.reader(sr.seg.Clone())
}

Check warning on line 43 in media/segment_reader.go

View check run for this annotation

Codecov / codecov/patch

media/segment_reader.go#L39-L43

Added lines #L39 - L43 were not covered by tests
}

func (sr *SwitchableSegmentReader) Read(reader CloneableReader) {
sr.mu.RLock()
defer sr.mu.RUnlock()
sr.mu.Lock()
defer sr.mu.Unlock()

Check warning on line 48 in media/segment_reader.go

View check run for this annotation

Codecov / codecov/patch

media/segment_reader.go#L47-L48

Added lines #L47 - L48 were not covered by tests
sr.reader(reader)
sr.seg = reader

Check warning on line 50 in media/segment_reader.go

View check run for this annotation

Codecov / codecov/patch

media/segment_reader.go#L50

Added line #L50 was not covered by tests
}

func (sr *SwitchableSegmentReader) Close() {
Expand Down
41 changes: 32 additions & 9 deletions server/ai_live_video.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package server

import (
"bufio"
"context"
"encoding/json"
"errors"
Expand Down Expand Up @@ -154,7 +155,7 @@
func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, onFistSegment func()) {
// subscribe to the outputs and send them into LPMS
subscriber := trickle.NewTrickleSubscriber(url.String())
r, w, err := os.Pipe()
_, w, err := os.Pipe()

Check warning on line 158 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L158

Added line #L158 was not covered by tests
if err != nil {
params.liveParams.stopPipeline(fmt.Errorf("error getting pipe for trickle-ffmpeg. url=%s %w", url, err))
return
Expand All @@ -168,7 +169,7 @@
ctx = clog.AddVal(ctx, "outputRTMPURL", params.liveParams.outputRTMPURL)
ctx = clog.AddVal(ctx, "mediaMTXOutputRTMPURL", params.liveParams.mediaMTXOutputRTMPURL)

multiWriter := &multiWriter{ctx: ctx, writers: []io.Writer{w, wMediaMTX}}
// multiWriter := &multiWriter{ctx: ctx, writers: []io.Writer{w, wMediaMTX}}

Check warning on line 172 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L172

Added line #L172 was not covered by tests

// read segments from trickle subscription
go func() {
Expand Down Expand Up @@ -215,7 +216,7 @@
seq := trickle.GetSeq(segment)
clog.V(8).Infof(ctx, "trickle subscribe read data received seq=%d", seq)

n, err := copySegment(segment, multiWriter)
n, err := copySegment(segment, wMediaMTX, seq)

Check warning on line 219 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L219

Added line #L219 was not covered by tests
if err != nil {
params.liveParams.stopPipeline(fmt.Errorf("trickle subscribe error copying: %w", err))
return
Expand All @@ -229,7 +230,7 @@
}()

// Studio Output ffmpeg process
go ffmpegOutput(ctx, params.liveParams.outputRTMPURL, r, params)
// go ffmpegOutput(ctx, params.liveParams.outputRTMPURL, r, params)

// MediaMTX Output ffmpeg process
go ffmpegOutput(ctx, params.liveParams.mediaMTXOutputRTMPURL, rMediaMTX, params)
Expand All @@ -250,22 +251,35 @@
}
}()
for {
clog.V(6).Infof(ctx, "Starting output rtmp")
clog.V(6).Infof(ctx, "Starting output rtmp to %s", outputUrl)

Check warning on line 254 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L254

Added line #L254 was not covered by tests
if !params.inputStreamExists() {
clog.Errorf(ctx, "Stopping output rtmp stream, input stream does not exist.")
break
}

cmd := exec.Command("ffmpeg",
"-loglevel", "debug",
"-analyzeduration", "2500000", // 2.5 seconds
"-skip_estimate_duration_from_pts", "true",

Check warning on line 263 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L261-L263

Added lines #L261 - L263 were not covered by tests
"-i", "pipe:0",
"-c:a", "copy",
"-c:v", "copy",
"-f", "flv",
outputUrl,
)
cmd.Stdin = r
output, err := cmd.CombinedOutput()
clog.Infof(ctx, "Process output: %s", output)
stdout, err := cmd.StderrPipe()
if err != nil {
clog.Errorf(ctx, "Error getting stdout pipe %v", err)
}
go func() {
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
clog.Infof(ctx, "ffmpeg output:%s", scanner.Text())
}

Check warning on line 279 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L271-L279

Added lines #L271 - L279 were not covered by tests
}()
cmd.Start()
err = cmd.Wait()

Check warning on line 282 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L281-L282

Added lines #L281 - L282 were not covered by tests
if err != nil {
clog.Errorf(ctx, "Error sending RTMP out: %v", err)
return
Expand All @@ -274,9 +288,18 @@
}
}

func copySegment(segment *http.Response, w io.Writer) (int64, error) {
func copySegment(segment *http.Response, w io.Writer, nb int) (int64, error) {

Check warning on line 291 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L291

Added line #L291 was not covered by tests
defer segment.Body.Close()
return io.Copy(w, segment.Body)
var r io.Reader = segment.Body
if nb < 10 {
outputfile, err := os.Create(fmt.Sprintf("/tmp/seg-%d.ts", nb))
if err != nil {
clog.Errorf(context.Background(), "Error creating first segment file: %v", err)
} else {
r = io.TeeReader(segment.Body, outputfile)
}

Check warning on line 300 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L293-L300

Added lines #L293 - L300 were not covered by tests
}
return io.Copy(w, r)

Check warning on line 302 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L302

Added line #L302 was not covered by tests
}

func startControlPublish(control *url.URL, params aiRequestParams) {
Expand Down
13 changes: 7 additions & 6 deletions server/ai_mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,13 @@
// If auth webhook is set and returns an output URL, this will be replaced
outputURL := qp.Get("rtmpOutput")

mediaMTXOutputURL := fmt.Sprintf("rtmp://%s/aiWebrtc/%s-out", remoteHost, streamName)
// Currently for webrtc we need to add a path prefix due to the ingress setup
mediaMTXStreamPrefix := r.PathValue("prefix")
if mediaMTXStreamPrefix != "" {
mediaMTXStreamPrefix = mediaMTXStreamPrefix + "/"
}

Check warning on line 436 in server/ai_mediaserver.go

View check run for this annotation

Codecov / codecov/patch

server/ai_mediaserver.go#L432-L436

Added lines #L432 - L436 were not covered by tests

mediaMTXOutputURL := fmt.Sprintf("rtmp://%s/%s%s-out", remoteHost, mediaMTXStreamPrefix, streamName)

Check warning on line 438 in server/ai_mediaserver.go

View check run for this annotation

Codecov / codecov/patch

server/ai_mediaserver.go#L438

Added line #L438 was not covered by tests
if outputURL == "" {
// re-publish to ourselves for now
// Not sure if we want this to be permanent
Expand Down Expand Up @@ -515,11 +521,6 @@
// Kick off the RTMP pull and segmentation as soon as possible
ssr := media.NewSwitchableSegmentReader()
go func() {
// Currently for webrtc we need to add a path prefix due to the ingress setup
mediaMTXStreamPrefix := r.PathValue("prefix")
if mediaMTXStreamPrefix != "" {
mediaMTXStreamPrefix = mediaMTXStreamPrefix + "/"
}
ms := media.MediaSegmenter{Workdir: ls.LivepeerNode.WorkDir, MediaMTXClient: mediaMTXClient}
ms.RunSegmentation(ctx, fmt.Sprintf("rtmp://%s/%s%s", remoteHost, mediaMTXStreamPrefix, streamName), ssr.Read)
ssr.Close()
Expand Down
Loading