Swap Orchestrator if no first segment received before timeout #3398
base: master
Conversation
Codecov Report
Attention: Patch coverage is 0% with 9 lines in your changes missing coverage.
Additional details and impacted files

@@             Coverage Diff              @@
##              master       #3398        +/- ##
===================================================
- Coverage   32.14408%   32.13699%   -0.00709%
===================================================
  Files            147         147
  Lines          40754       40763           +9
===================================================
  Hits           13100       13100
- Misses         26880       26889           +9
  Partials         774         774
	return resp, nil
	select {
	case <-firstSegmentReceived:
		cancelCtx()
Did you mean to cancel on the other case here, i.e. on timeout? IIUC this will cancel all trickle subs for the stream.
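A minimal sketch of what the suggestion would look like (the `firstSegmentTimeout` constant, `waitForFirstSegment` wrapper, and package name are illustrative, not from the patch): the context is cancelled only when the timeout fires, so a healthy stream keeps its trickle subscriptions alive.

```go
package server // package name only for the sketch

import (
	"context"
	"fmt"
	"time"
)

// firstSegmentTimeout mirrors the 20s value hardcoded in this PR.
const firstSegmentTimeout = 20 * time.Second

// waitForFirstSegment blocks until the first segment arrives or the timeout fires.
// Only the timeout path cancels the context, so the trickle subscriptions keep
// running when the stream is healthy.
func waitForFirstSegment(cancelCtx context.CancelFunc, firstSegmentReceived <-chan struct{}) error {
	select {
	case <-firstSegmentReceived:
		// First segment arrived in time; leave the context alone.
		return nil
	case <-time.After(firstSegmentTimeout):
		// No segment in time: tear down this Orchestrator's subscriptions
		// and let the Gateway go back to selection.
		cancelCtx()
		return fmt.Errorf("no first segment within %s", firstSegmentTimeout)
	}
}
```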
@@ -1098,10 +1101,17 @@ func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *A
		monitor.AIFirstSegmentDelay(delayMs, sess.OrchestratorInfo)
	}
	clog.V(common.VERBOSE).Infof(ctx, "First Segment delay=%dms streamID=%s", delayMs, params.liveParams.streamID)
	firstSegmentReceived <- struct{}{}
Might need a select with default in case timeout has already fired, or a buffered chan
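A small sketch of the non-blocking send being suggested (the `signalFirstSegment` helper is illustrative, not part of the patch): if the receiving select has already given up because the timeout fired, the `default` branch keeps the segment-handling goroutine from blocking forever.

```go
// signalFirstSegment performs a non-blocking send on the signalling channel.
func signalFirstSegment(firstSegmentReceived chan<- struct{}) {
	select {
	case firstSegmentReceived <- struct{}{}:
		// The waiting select received the signal.
	default:
		// Nobody is listening anymore (timeout already fired); drop the signal.
	}
}
```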
	case <-firstSegmentReceived:
		cancelCtx()
		return resp, nil
	case <-time.After(20 * time.Second):
I'm torn about keeping this timeout so short. Missing the 20s mark does not mean the O is bad: delivering the first segment that fast is only possible if the runner was already warm when the stream started, which isn't always the case until we implement something in the selection algorithm to only route to Os that already have the pipeline loaded (and are not, for example, still deploying or restarting the container, which is the 60s+ we've seen). Another problem could be loading any workflow other than the default one. I'm not sure how fast comfystream would load new nodes and models, but it feels like it could easily take more than 20s.
So maybe 20s is too short for a threshold that actually blocks Os from being used again, but could be OK if it's just a point at which we time out and restart the process with another O on the gateway side, without flagging the O as malfunctioning. I believe this PR is the latter, though, but with a "malfunctioning flag", right? In that case I think it'd be better to have a higher threshold here. WDYT?
@@ -1090,6 +1090,9 @@ func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *A
	}
	clog.V(common.VERBOSE).Infof(ctx, "pub %s sub %s control %s events %s", pub, sub, control, events)

	firstSegmentReceived := make(chan struct{})
Maybe make this at least size 1 to minimize the chance of late writes blocking after a timeout
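For the buffered-channel alternative, a minimal sketch (assuming nothing else about the surrounding code): with capacity 1, one late send after the timeout completes immediately instead of blocking, even without a select/default on the sender side.

```go
// Capacity 1 lets a single late write succeed even if the receiver is gone.
firstSegmentReceived := make(chan struct{}, 1)

// Later, the plain send no longer risks blocking forever after a timeout:
firstSegmentReceived <- struct{}{}
```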
Introduce a timeout for the first segment: if the Gateway does not receive the first segment within the specified timeout, it goes back to the selection logic and selects a different Orchestrator.

Then, we would have 2 timeout values defined as flags (a sketch of the flag wiring follows after the list):

- `firstSegmentTimeout` => timeout to swap the Orchestrator if the first segment was not received
- `aiProcessingRetryTimeout` => timeout for the whole Gateway processing; if the Gateway is not able to process the request from the Orchestrator in that time, it fails the whole processing (currently set to `45s` in staging/prod)

TODO:

- Move the hardcoded `20s` to the `firstSegmentTimeout` flag
- `firstSegmentTimeout` and `aiProcessingRetryTimeout`
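A hedged sketch of how the proposed `firstSegmentTimeout` flag could replace the hardcoded value (the flag registration and package shown below are assumptions based on the TODO list, not code from this PR):

```go
package server // placement of the flag is an assumption for the sketch

import (
	"flag"
	"time"
)

// Proposed flag; the 20s default matches the value currently hardcoded in the PR.
var firstSegmentTimeout = flag.Duration("firstSegmentTimeout", 20*time.Second,
	"Timeout to swap Orchestrator if the first segment was not received")

// The hardcoded `case <-time.After(20 * time.Second):` would then become
// `case <-time.After(*firstSegmentTimeout):`, and the value should stay below
// aiProcessingRetryTimeout so the Gateway still has time to retry with another
// Orchestrator before the whole request fails.
```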