Vg/feat/multi stream gpu #3430

Draft: wants to merge 2 commits into base: master
6 changes: 6 additions & 0 deletions ai/worker/container.go
@@ -96,6 +96,12 @@ func NewRunnerContainer(ctx context.Context, cfg RunnerContainerConfig, name str
}, nil
}

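+// IsBorrowed reports whether the container is currently borrowed, i.e. its BorrowCtx is set.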
+func (rc *RunnerContainer) IsBorrowed() bool {
+rc.RLock()
+defer rc.RUnlock()
+return rc.BorrowCtx != nil
+}
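For orientation, here is a minimal usage sketch of the new borrow semantics (hypothetical caller code, not part of the PR; it assumes the Borrow and returnContainer behavior shown in ai/worker/docker.go below):

// Hypothetical illustration only: Borrow no longer removes the container
// from m.containers; it just sets BorrowCtx, so IsBorrowed reports true
// until watchContainer sees the borrow context end (or an idle runner)
// and calls returnContainer.
func borrowExample(ctx context.Context, m *DockerManager) error {
	rc, err := m.Borrow(ctx, "live-video-to-video", "comfyui")
	if err != nil {
		return err
	}
	_ = rc.IsBorrowed() // true while borrowed
	return nil
}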

func runnerWaitUntilReady(ctx context.Context, client *ClientWithResponses, pollingInterval time.Duration) error {
ticker := time.NewTicker(pollingInterval)
defer ticker.Stop()
113 changes: 70 additions & 43 deletions ai/worker/docker.go
@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"log/slog"
"math"
"runtime/debug"
"strconv"
"strings"
@@ -43,17 +44,17 @@ var maxHealthCheckFailures = 2

// This only works right now on a single GPU because if there is another container
// using the GPU we stop it so we don't have to worry about having enough ports
-var containerHostPorts = map[string]string{
-"text-to-image": "8000",
-"image-to-image": "8100",
-"image-to-video": "8200",
-"upscale": "8300",
-"audio-to-text": "8400",
-"llm": "8500",
-"segment-anything-2": "8600",
-"image-to-text": "8700",
-"text-to-speech": "8800",
-"live-video-to-video": "8900",
+var containerHostPorts = map[string]int{
+"text-to-image": 8000,
+"image-to-image": 8100,
+"image-to-video": 8200,
+"upscale": 8300,
+"audio-to-text": 8400,
+"llm": 8500,
+"segment-anything-2": 8600,
+"image-to-text": 8700,
+"text-to-speech": 8800,
+"live-video-to-video": 8900,
}
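Switching this table from strings to ints is what enables per-container port offsets below: the old code built a host port by string concatenation (the first three characters of the base port plus the GPU index), while the new createContainer and portOffset add an integer offset for each container on the GPU.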

// Default pipeline container image mapping to use if no overrides are provided.
@@ -70,6 +71,22 @@ var livePipelineToImage = map[string]string{
"segment_anything_2": "livepeer/ai-runner:live-app-segment_anything_2",
"noop": "livepeer/ai-runner:live-app-noop",
}
+var livePipelineGpuUtilization = map[string]float64{
+"streamdiffusion": 0.5,
+"comfyui": 0.333,
+"segment_anything_2": 0.5,
+"noop": 0.333,
+}
+
+var maxContainersPerGpu int
+
+func init() {
+maxContainers := 1.0
+for _, gpuShare := range livePipelineGpuUtilization {
+maxContainers = math.Max(maxContainers, math.Floor(1/gpuShare))
+}
+maxContainersPerGpu = int(maxContainers)
+}
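Worked example of the init above: comfyui and noop use a 0.333 share, so floor(1/0.333) = 3; streamdiffusion and segment_anything_2 use 0.5, so floor(1/0.5) = 2. The loop keeps the maximum, so maxContainersPerGpu resolves to 3, which portOffset later uses to give each GPU a block of three consecutive host ports per pipeline.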

type ImageOverrides struct {
Default string `json:"default"`
@@ -103,7 +120,7 @@ type DockerManager struct {

dockerClient DockerClient
// gpu ID => container name
-gpuContainers map[string]string
+gpuContainers map[string][]string
// container name => container
containers map[string]*RunnerContainer
mu *sync.Mutex
@@ -122,7 +139,7 @@ func NewDockerManager(overrides ImageOverrides, gpus []string, modelDir string,
modelDir: modelDir,
overrides: overrides,
dockerClient: client,
-gpuContainers: make(map[string]string),
+gpuContainers: make(map[string][]string),
containers: make(map[string]*RunnerContainer),
mu: &sync.Mutex{},
}
@@ -182,7 +199,7 @@ func (m *DockerManager) Borrow(ctx context.Context, pipeline, modelID string) (*
var err error

for _, runner := range m.containers {
-if runner.Pipeline == pipeline && runner.ModelID == modelID {
+if runner.Pipeline == pipeline && runner.ModelID == modelID && !runner.IsBorrowed() {
rc = runner
break
}
@@ -197,8 +214,7 @@
}
}

-// Remove container and set the BorrowCtx so it is unavailable until returnContainer() is called by watchContainer()
-delete(m.containers, rc.Name)
+// Set the BorrowCtx so it is unavailable until returnContainer() is called by watchContainer()
rc.Lock()
rc.BorrowCtx = ctx
rc.Unlock()
@@ -209,14 +225,9 @@
// returnContainer returns a container to the pool so it can be reused. It is called automatically by watchContainer
// when the BorrowCtx of the container is done or the container is IDLE.
func (m *DockerManager) returnContainer(rc *RunnerContainer) {
-m.mu.Lock()
-defer m.mu.Unlock()
-
rc.Lock()
rc.BorrowCtx = nil
rc.Unlock()
-
-m.containers[rc.Name] = rc
}
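Note that returnContainer no longer acquires m.mu: since containers are no longer removed from m.containers on borrow, returning one only needs to clear BorrowCtx under the container's own lock.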

// getContainerImageName returns the image name for the given pipeline and model ID.
@@ -251,7 +262,7 @@ func (m *DockerManager) HasCapacity(ctx context.Context, pipeline, modelID strin

// Check if unused managed container exists for the requested model.
for _, rc := range m.containers {
-if rc.Pipeline == pipeline && rc.ModelID == modelID {
+if rc.Pipeline == pipeline && rc.ModelID == modelID && !rc.IsBorrowed() {
return true
}
}
@@ -263,7 +274,7 @@
}

// Check for available GPU to allocate for a new container for the requested model.
-_, err := m.allocGPU(ctx)
+_, err := m.allocGPU(ctx, dockerGpuAllocation(pipeline, modelID))
return err == nil
}

Expand Down Expand Up @@ -308,13 +319,16 @@ func (m *DockerManager) pullImage(ctx context.Context, imageName string) error {
}

func (m *DockerManager) createContainer(ctx context.Context, pipeline string, modelID string, keepWarm bool, optimizationFlags OptimizationFlags) (*RunnerContainer, error) {
-gpu, err := m.allocGPU(ctx)
+gpu, err := m.allocGPU(ctx, dockerGpuAllocation(pipeline, modelID))
if err != nil {
return nil, err
}

-// NOTE: We currently allow only one container per GPU for each pipeline.
-containerHostPort := containerHostPorts[pipeline][:3] + portOffset(gpu)
+offset, err := portOffset(gpu, len(m.gpuContainers[gpu]))
+if err != nil {
+return nil, err
+}
+containerHostPort := strconv.Itoa(containerHostPorts[pipeline] + offset)
containerName := dockerContainerName(pipeline, modelID, containerHostPort)
containerImage, err := m.getContainerImageName(pipeline, modelID)
if err != nil {
@@ -430,31 +444,33 @@ func (m *DockerManager) createContainer(ctx context.Context, pipeline string, mo
}

m.containers[containerName] = rc
-m.gpuContainers[gpu] = containerName
+m.gpuContainers[gpu] = append(m.gpuContainers[gpu], containerName)

go m.watchContainer(rc)

return rc, nil
}

-func (m *DockerManager) allocGPU(ctx context.Context) (string, error) {
-// Is there a GPU available?
+func (m *DockerManager) allocGPU(ctx context.Context, reqAllocation float64) (string, error) {
+// Is there a GPU with spare capacity or an idle container?
for _, gpu := range m.gpus {
-_, ok := m.gpuContainers[gpu]
+containersNames, ok := m.gpuContainers[gpu]
if !ok {
return gpu, nil
}
-}
-
-// Is there a GPU with an idle container?
-for _, gpu := range m.gpus {
-containerName := m.gpuContainers[gpu]
-// If the container exists in this map then it is idle and if it not marked as keep warm we remove it
-rc, ok := m.containers[containerName]
-if ok && !rc.KeepWarm {
-if err := m.destroyContainer(rc, true); err != nil {
-return "", err
+currAllocation := 0.0
+for _, containerName := range containersNames {
+// If the container is idle and not marked as keep warm, we remove it
+rc, ok := m.containers[containerName]
+if ok && !rc.IsBorrowed() && !rc.KeepWarm {
+if err := m.destroyContainer(rc, true); err != nil {
+return "", err
}
return gpu, nil
}
+currAllocation += dockerGpuAllocation(rc.Pipeline, rc.ModelID)
+}
+if currAllocation+reqAllocation <= 1.0 {
+return gpu, nil
+}
}
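To make the admission arithmetic concrete, a self-contained sketch (the fits helper is hypothetical, written only to illustrate the check above, with shares taken from livePipelineGpuUtilization):

package main

import "fmt"

// fits mirrors allocGPU's capacity check: the shares of the containers
// already on a GPU plus the requested share must not exceed the whole GPU.
func fits(current []float64, req float64) bool {
	sum := 0.0
	for _, share := range current {
		sum += share
	}
	return sum+req <= 1.0
}

func main() {
	fmt.Println(fits([]float64{0.333}, 0.5))        // true: 0.833
	fmt.Println(fits([]float64{0.5}, 0.5))          // true: exactly 1.0
	fmt.Println(fits([]float64{0.5, 0.333}, 0.333)) // false: 1.166
}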
@@ -610,6 +626,13 @@ func dockerContainerName(pipeline string, modelID string, suffix ...string) stri
return fmt.Sprintf("%s_%s", pipeline, sanitizedModelID)
}

+func dockerGpuAllocation(pipeline string, modelID string) float64 {
+if pipeline == "live-video-to-video" {
+return livePipelineGpuUtilization[modelID]
+}
+return 1
+}

func dockerRemoveContainer(client DockerClient, containerID string) error {
ctx, cancel := context.WithTimeout(context.Background(), containerRemoveTimeout)
defer cancel()
@@ -667,11 +690,15 @@ tickerLoop:
return nil
}

-func portOffset(gpu string) string {
+func portOffset(gpu string, index int) (int, error) {
if isEmulatedGPU(gpu) {
-return strings.Replace(gpu, "emulated-", "", 1)
+gpu = strings.Replace(gpu, "emulated-", "", 1)
}
-return gpu
+gpuIdx, err := strconv.Atoi(gpu)
+if err != nil {
+return 0, fmt.Errorf("invalid GPU: %s", gpu)
+}
+return maxContainersPerGpu*gpuIdx + index, nil
}
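Worked example of the new port scheme, assuming maxContainersPerGpu = 3 as derived above: the second container (index 1) on GPU "2" gets offset 3*2 + 1 = 7, so a live-video-to-video runner (base port 8900) binds host port 8907. The old scheme appended the GPU index to a three-character port prefix, which allowed only one container per GPU and pipeline.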

func isEmulatedGPU(gpu string) bool {
1 change: 1 addition & 0 deletions ai/worker/worker.go
@@ -666,6 +666,6 @@ func (w *Worker) EnsureImageAvailable(ctx context.Context, pipeline string, mode

func (w *Worker) Warm(ctx context.Context, pipeline string, modelID string, endpoint RunnerEndpoint, optimizationFlags OptimizationFlags) error {
if endpoint.URL == "" {
-w.manager.Warm(ctx, pipeline, modelID, optimizationFlags)
+return w.manager.Warm(ctx, pipeline, modelID, optimizationFlags)
}
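With this change, Warm on the container-managed path now propagates the manager's warm-up error to the caller instead of discarding it.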
