Skip to content

Swap Orchestrator if no first segment recevied before timeout #3398

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.AIVerboseLogs = flag.Bool("aiVerboseLogs", *cfg.AIVerboseLogs, "Set to true to enable verbose logs for the AI runner containers created by the worker")
cfg.AIRunnerImageOverrides = flag.String("aiRunnerImageOverrides", *cfg.AIRunnerImageOverrides, `Specify overrides for the Docker images used by the AI runner. Example: '{"default": "livepeer/ai-runner:v1.0", "batch": {"text-to-speech": "livepeer/ai-runner:text-to-speech-v1.0"}, "live": {"another-pipeline": "livepeer/ai-runner:another-pipeline-v1.0"}}'`)
cfg.AIProcessingRetryTimeout = flag.Duration("aiProcessingRetryTimeout", *cfg.AIProcessingRetryTimeout, "Timeout for retrying to initiate AI processing request")
cfg.AIStartupOrchSwapTimeout = flag.Duration("aiStartupOrchSwapTimeout", *cfg.AIStartupOrchSwapTimeout, "Timeout to wait for Orchestrator return first output segment")
cfg.AIRunnerContainersPerGPU = flag.Int("aiRunnerContainersPerGPU", *cfg.AIRunnerContainersPerGPU, "Number of AI runner containers to run per GPU; default to 1")

// Live AI:
Expand Down
4 changes: 4 additions & 0 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
AIRunnerImageOverrides *string
AIVerboseLogs *bool
AIProcessingRetryTimeout *time.Duration
AIStartupOrchSwapTimeout *time.Duration
AIRunnerContainersPerGPU *int
KafkaBootstrapServers *string
KafkaUsername *string
Expand Down Expand Up @@ -220,6 +221,7 @@
defaultAIRunnerImage := "livepeer/ai-runner:latest"
defaultAIVerboseLogs := false
defaultAIProcessingRetryTimeout := 2 * time.Second
defaultAIStartupOrchSwapTimeout := 15 * time.Second

Check warning on line 224 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L224

Added line #L224 was not covered by tests
defaultAIRunnerContainersPerGPU := 1
defaultAIRunnerImageOverrides := ""
defaultLiveAIAuthWebhookURL := ""
Expand Down Expand Up @@ -333,6 +335,7 @@
AIRunnerImage: &defaultAIRunnerImage,
AIVerboseLogs: &defaultAIVerboseLogs,
AIProcessingRetryTimeout: &defaultAIProcessingRetryTimeout,
AIStartupOrchSwapTimeout: &defaultAIStartupOrchSwapTimeout,

Check warning on line 338 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L338

Added line #L338 was not covered by tests
AIRunnerContainersPerGPU: &defaultAIRunnerContainersPerGPU,
AIRunnerImageOverrides: &defaultAIRunnerImageOverrides,
LiveAIAuthWebhookURL: &defaultLiveAIAuthWebhookURL,
Expand Down Expand Up @@ -523,6 +526,7 @@
glog.Errorf("Error creating livepeer node: %v", err)
}
n.AIProcesssingRetryTimeout = *cfg.AIProcessingRetryTimeout
n.AIStartupOrchSwapTimeout = *cfg.AIStartupOrchSwapTimeout

Check warning on line 529 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L529

Added line #L529 was not covered by tests

if *cfg.OrchSecret != "" {
n.OrchSecret, _ = common.ReadFromFile(*cfg.OrchSecret)
Expand Down
1 change: 1 addition & 0 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type LivepeerNode struct {
AIWorker AI
AIWorkerManager *RemoteAIWorkerManager
AIProcesssingRetryTimeout time.Duration
AIStartupOrchSwapTimeout time.Duration

// Transcoder public fields
SegmentChans map[ManifestID]SegmentChan
Expand Down
35 changes: 19 additions & 16 deletions server/ai_live_video.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@

func startTricklePublish(ctx context.Context, url *url.URL, params aiRequestParams, sess *AISession) {
ctx = clog.AddVal(ctx, "url", url.Redacted())
mid := extractMid(url.Path)

Check warning on line 30 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L30

Added line #L30 was not covered by tests
publisher, err := trickle.NewTricklePublisher(url.String())
if err != nil {
clog.Infof(ctx, "error publishing trickle. err=%s", err)
params.liveParams.stopPipeline(fmt.Errorf("Error publishing trickle %w", err))
params.liveParams.stop(mid, fmt.Errorf("Error publishing trickle %w", err))

Check warning on line 34 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L34

Added line #L34 was not covered by tests
return
}

Expand All @@ -45,7 +46,7 @@
sess: sess.BroadcastSession,
inPixels: inPixels,
priceInfo: priceInfo,
mid: extractMid(url.Path),
mid: mid,

Check warning on line 49 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L49

Added line #L49 was not covered by tests
})
}
paymentProcessor = NewLivePaymentProcessor(ctx, params.liveParams.paymentProcessInterval, sendPaymentFunc)
Expand All @@ -69,7 +70,7 @@
thisSeq, atMax := slowOrchChecker.BeginSegment()
if atMax {
clog.Infof(ctx, "Orchestrator is slow - terminating")
params.liveParams.stopPipeline(fmt.Errorf("slow orchestrator"))
params.liveParams.stop(mid, fmt.Errorf("slow orchestrator"))

Check warning on line 73 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L73

Added line #L73 was not covered by tests
cancel()
return
// TODO switch orchestrators
Expand Down Expand Up @@ -119,7 +120,7 @@
}
if errors.Is(err, trickle.StreamNotFoundErr) {
clog.Infof(ctx, "Stream no longer exists on orchestrator; terminating")
params.liveParams.stopPipeline(fmt.Errorf("Stream does not exist"))
params.liveParams.stop(mid, fmt.Errorf("Stream does not exist"))

Check warning on line 123 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L123

Added line #L123 was not covered by tests
return
}
// Retry segment only if nothing has been sent yet
Expand Down Expand Up @@ -170,16 +171,17 @@
}

func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, sess *AISession, onFistSegment func()) {
mid := extractMid(url.Path)

Check warning on line 174 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L174

Added line #L174 was not covered by tests
// subscribe to the outputs and send them into LPMS
subscriber := trickle.NewTrickleSubscriber(url.String())
r, w, err := os.Pipe()
if err != nil {
params.liveParams.stopPipeline(fmt.Errorf("error getting pipe for trickle-ffmpeg. url=%s %w", url, err))
params.liveParams.stop(mid, fmt.Errorf("error getting pipe for trickle-ffmpeg. url=%s %w", url, err))

Check warning on line 179 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L179

Added line #L179 was not covered by tests
return
}
rMediaMTX, wMediaMTX, err := os.Pipe()
if err != nil {
params.liveParams.stopPipeline(fmt.Errorf("error getting pipe for MediaMTX trickle-ffmpeg. url=%s %w", url, err))
params.liveParams.stop(mid, fmt.Errorf("error getting pipe for MediaMTX trickle-ffmpeg. url=%s %w", url, err))

Check warning on line 184 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L184

Added line #L184 was not covered by tests
return
}
ctx = clog.AddVal(ctx, "url", url.Redacted())
Expand Down Expand Up @@ -210,7 +212,7 @@
segment, err = subscriber.Read()
if err != nil {
if errors.Is(err, trickle.EOS) || errors.Is(err, trickle.StreamNotFoundErr) {
params.liveParams.stopPipeline(fmt.Errorf("trickle subscribe end of stream: %w", err))
params.liveParams.stop(mid, fmt.Errorf("trickle subscribe end of stream: %w", err))

Check warning on line 215 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L215

Added line #L215 was not covered by tests
return
}
var sequenceNonexistent *trickle.SequenceNonexistent
Expand All @@ -222,7 +224,7 @@
err = fmt.Errorf("trickle subscribe error reading: %w", err)
clog.Infof(ctx, "%s", err)
if retries > maxRetries {
params.liveParams.stopPipeline(err)
params.liveParams.stop(mid, err)

Check warning on line 227 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L227

Added line #L227 was not covered by tests
return
}
retries++
Expand All @@ -236,7 +238,7 @@

n, err := copySegment(segment, multiWriter)
if err != nil {
params.liveParams.stopPipeline(fmt.Errorf("trickle subscribe error copying: %w", err))
params.liveParams.stop(mid, fmt.Errorf("trickle subscribe error copying: %w", err))

Check warning on line 241 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L241

Added line #L241 was not covered by tests
return
}
if firstSegment {
Expand Down Expand Up @@ -266,14 +268,14 @@

// Studio Output ffmpeg process
if params.liveParams.outputRTMPURL != "" {
go ffmpegOutput(ctx, params.liveParams.outputRTMPURL, r, params)
go ffmpegOutput(ctx, mid, params.liveParams.outputRTMPURL, r, params)

Check warning on line 271 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L271

Added line #L271 was not covered by tests
}

// MediaMTX Output ffmpeg process
go ffmpegOutput(ctx, params.liveParams.mediaMTXOutputRTMPURL, rMediaMTX, params)
go ffmpegOutput(ctx, mid, params.liveParams.mediaMTXOutputRTMPURL, rMediaMTX, params)

Check warning on line 275 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L275

Added line #L275 was not covered by tests
}

func ffmpegOutput(ctx context.Context, outputUrl string, r io.ReadCloser, params aiRequestParams) {
func ffmpegOutput(ctx context.Context, mid string, outputUrl string, r io.ReadCloser, params aiRequestParams) {

Check warning on line 278 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L278

Added line #L278 was not covered by tests
ctx = clog.AddVal(ctx, "rtmpOut", outputUrl)
defer func() {
r.Close()
Expand All @@ -284,7 +286,7 @@
err = errors.New("unknown error")
}
clog.Errorf(ctx, "LPMS panic err=%v", err)
params.liveParams.stopPipeline(fmt.Errorf("LPMS panic %w", err))
params.liveParams.stop(mid, fmt.Errorf("LPMS panic %w", err))

Check warning on line 289 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L289

Added line #L289 was not covered by tests
}
}()
for {
Expand All @@ -294,7 +296,7 @@
break
}

cmd := exec.Command("ffmpeg",
cmd := exec.CommandContext(ctx, "ffmpeg",

Check warning on line 299 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L299

Added line #L299 was not covered by tests
"-analyzeduration", "2500000", // 2.5 seconds
"-i", "pipe:0",
"-c:a", "copy",
Expand Down Expand Up @@ -377,6 +379,7 @@
const clearStreamDelay = 1 * time.Minute

func startEventsSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, sess *AISession) {
mid := extractMid(url.Path)

Check warning on line 382 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L382

Added line #L382 was not covered by tests
subscriber := trickle.NewTrickleSubscriber(url.String())
stream := params.liveParams.stream
streamId := params.liveParams.streamID
Expand Down Expand Up @@ -425,7 +428,7 @@
if retries > maxRetries {
clog.Infof(ctx, "Too many errors reading events; stopping subscription err=%v", err)
err = fmt.Errorf("Error reading subscription: %w", err)
params.liveParams.stopPipeline(err)
params.liveParams.stop(mid, err)

Check warning on line 431 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L431

Added line #L431 was not covered by tests
return
}
clog.Infof(ctx, "Error reading events subscription: err=%v retry=%d", err, retries)
Expand Down Expand Up @@ -526,7 +529,7 @@
eventTime := lastEvent
lastEventMu.Unlock()
if time.Now().Sub(eventTime) > maxEventGap {
params.liveParams.stopPipeline(errors.New("timeout waiting for events"))
params.liveParams.stop(mid, errors.New("timeout waiting for events"))

Check warning on line 532 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L532

Added line #L532 was not covered by tests
eventTicker.Stop()
return
}
Expand Down
2 changes: 2 additions & 0 deletions server/ai_mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@
streamID: streamID,
pipelineID: pipelineID,
stopPipeline: stopPipeline,
initCompleted: make(chan struct{}),

Check warning on line 594 in server/ai_mediaserver.go

View check run for this annotation

Codecov / codecov/patch

server/ai_mediaserver.go#L594

Added line #L594 was not covered by tests
sendErrorEvent: sendErrorEvent,
},
}
Expand Down Expand Up @@ -883,6 +884,7 @@
streamID: streamID,
pipelineID: pipelineID,
stopPipeline: stopPipeline,
initCompleted: make(chan struct{}),

Check warning on line 887 in server/ai_mediaserver.go

View check run for this annotation

Codecov / codecov/patch

server/ai_mediaserver.go#L887

Added line #L887 was not covered by tests
sendErrorEvent: sendErrorEvent,
orchestrator: orchestrator,
},
Expand Down
48 changes: 46 additions & 2 deletions server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"path/filepath"
"strconv"
"strings"
"sync"
"time"

"github.com/livepeer/go-livepeer/ai/worker"
Expand Down Expand Up @@ -115,12 +116,33 @@
paymentProcessInterval time.Duration

// Stops the pipeline with an error. Also kicks the input
stopPipeline func(error)
stopPipeline func(error)
lastMid string
initCompleted chan struct{}
mu sync.Mutex

// Report an error event
sendErrorEvent func(error)
}

func (p liveRequestParams) start(mid string) {
p.mu.Lock()
defer p.mu.Unlock()
p.lastMid = mid

Check warning on line 131 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L128-L131

Added lines #L128 - L131 were not covered by tests
}

func (p liveRequestParams) stop(mid string, err error) {
<-p.initCompleted

p.mu.Lock()
lastMid := p.lastMid
p.mu.Unlock()

if mid == lastMid {
p.stopPipeline(err)
}

Check warning on line 143 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L134-L143

Added lines #L134 - L143 were not covered by tests
}

// CalculateTextToImageLatencyScore computes the time taken per pixel for an text-to-image request.
func CalculateTextToImageLatencyScore(took time.Duration, req worker.GenTextToImageJSONRequestBody, outPixels int64) float64 {
if outPixels <= 0 {
Expand Down Expand Up @@ -1088,6 +1110,9 @@
return nil, fmt.Errorf("invalid events URL: %w", err)
}
clog.V(common.VERBOSE).Infof(ctx, "pub %s sub %s control %s events %s", pub, sub, control, events)
firstSegmentReceived := make(chan struct{}, 1)
params.liveParams.start(extractMid(pub.Path))
ctx, cancelCtx := context.WithCancel(ctx)

Check warning on line 1115 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L1113-L1115

Added lines #L1113 - L1115 were not covered by tests

startControlPublish(ctx, control, params)
startTricklePublish(ctx, pub, params, sess)
Expand All @@ -1108,10 +1133,21 @@
})
}
clog.V(common.VERBOSE).Infof(ctx, "First Segment delay=%dms streamID=%s", delayMs, params.liveParams.streamID)
select {
case firstSegmentReceived <- struct{}{}:
default:

Check warning on line 1138 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L1136-L1138

Added lines #L1136 - L1138 were not covered by tests
}

})
startEventsSubscribe(ctx, events, params, sess)
return resp, nil
select {
case <-firstSegmentReceived:
return resp, nil
case <-time.After(params.node.AIStartupOrchSwapTimeout):
cancelCtx()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not clear on the overall flow, where does the O swap happen?

return nil, errors.New("timeout waiting for first segment")

Check warning on line 1148 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L1143-L1148

Added lines #L1143 - L1148 were not covered by tests
}

}

// extractMid extracts the mid (manifest ID) from the publish URL
Expand Down Expand Up @@ -1500,6 +1536,7 @@
cctx, cancel := context.WithTimeout(ctx, processingRetryTimeout)
defer cancel()

defer completeAIRequest(params)

Check warning on line 1539 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L1539

Added line #L1539 was not covered by tests
tries := 0
var retryableSessions []*AISession
for tries < maxTries {
Expand Down Expand Up @@ -1579,6 +1616,13 @@
return resp, nil
}

func completeAIRequest(params aiRequestParams) {
if params.liveParams.initCompleted == nil {
return
}
close(params.liveParams.initCompleted)

Check warning on line 1623 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L1619-L1623

Added lines #L1619 - L1623 were not covered by tests
}

// isRetryableError checks if the error is a transient error that can be retried.
func isRetryableError(err error) bool {
return errContainsMsg(err, "ticketparams expired")
Expand Down
Loading