Skip to content
Open
427 changes: 216 additions & 211 deletions apis/go/mlops/scheduler/scheduler.pb.go

Large diffs are not rendered by default.

83 changes: 42 additions & 41 deletions apis/mlops/scheduler/scheduler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ message LlmSpec {

message ModelRuntimeInfo {
oneof modelRuntimeInfo {
MLServerModelSettings mlserver = 1;
MLServerModelSettings mlserver = 1;
TritonModelConfig triton = 2;
}
}
}

message MLServerModelSettings {
Expand Down Expand Up @@ -150,24 +150,24 @@ message ModelVersionStatus {
uint32 version = 2;
string serverName = 3;
optional KubernetesMeta kubernetesMeta = 4;
map<int32,ModelReplicaStatus> modelReplicaState = 5;
map<int32, ModelReplicaStatus> modelReplicaState = 5;
ModelStatus state = 6;
optional Model modelDefn = 7;
}

message ModelStatus {
enum ModelState {
ModelStateUnknown = 0;
ModelProgressing = 1;
ModelAvailable = 2;
ModelFailed = 3;
ModelTerminating = 4;
ModelTerminated = 5;
ModelTerminateFailed = 6;
ScheduleFailed = 7;
ModelScaledDown = 8;
ModelCreate = 9;
ModelTerminate = 10;
ModelStateUnknown = 0;
ModelProgressing = 1;
ModelAvailable = 2;
ModelFailed = 3;
ModelTerminating = 4;
ModelTerminated = 5;
ModelTerminateFailed = 6;
ScheduleFailed = 7;
ModelScaledDown = 8;
ModelCreate = 9;
ModelTerminate = 10;
}
ModelState state = 1;
string reason = 2;
Expand All @@ -180,19 +180,19 @@ message ModelStatus {

message ModelReplicaStatus {
enum ModelReplicaState {
ModelReplicaStateUnknown = 0;
LoadRequested = 1;
Loading = 2;
Loaded = 3;
LoadFailed = 4;
UnloadRequested = 5;
Unloading = 6;
Unloaded = 7;
UnloadFailed = 8;
Available = 9;
LoadedUnavailable = 10;
UnloadEnvoyRequested = 11;
Draining = 12;
ModelReplicaStateUnknown = 0;
LoadRequested = 1;
Loading = 2;
Loaded = 3;
LoadFailed = 4;
UnloadRequested = 5;
Unloading = 6;
Unloaded = 7;
UnloadFailed = 8;
Available = 9;
LoadedUnavailable = 10;
UnloadEnvoyRequested = 11;
Draining = 12;
}
ModelReplicaState state = 1;
string reason = 2;
Expand Down Expand Up @@ -356,7 +356,7 @@ message PipelineStep {
string name = 1;
repeated string inputs = 2;
optional uint32 joinWindowMs = 3; // Join window millisecs, some nonzero default (TBD)
map<string,string> tensorMap = 4; // optional map of tensor name mappings
map<string, string> tensorMap = 4; // optional map of tensor name mappings
JoinOp inputsJoin = 5;
repeated string triggers = 6;
JoinOp triggersJoin = 7;
Expand All @@ -379,7 +379,7 @@ message PipelineInput {
optional uint32 joinWindowMs = 3; // Join window millisecs for output, default 0
JoinOp joinType = 4;
JoinOp triggersJoin = 5;
map<string,string> tensorMap = 6; // optional map of tensor name mappings
map<string, string> tensorMap = 6; // optional map of tensor name mappings
}

message PipelineOutput {
Expand All @@ -391,7 +391,7 @@ message PipelineOutput {
repeated string steps = 1;
uint32 joinWindowMs = 2; // Join window millisecs for output, default 0
JoinOp stepsJoin = 3;
map<string,string> tensorMap = 4; // optional map of tensor name mappings
map<string, string> tensorMap = 4; // optional map of tensor name mappings
}

message LoadPipelineResponse {
Expand Down Expand Up @@ -446,6 +446,7 @@ message PipelineVersionState {
PipelineTerminating = 6;
PipelineTerminated = 7;
PipelineRebalancing = 8;
PipelineFailedTerminating = 9;
}
uint32 pipelineVersion = 1;
PipelineStatus status = 2;
Expand Down Expand Up @@ -481,17 +482,17 @@ message ControlPlaneResponse {


message ModelUpdateMessage {
enum ModelOperation {
Unknown = 0;
Create = 1;
Delete = 2;
}
ModelOperation op = 1;
string model = 2;
uint32 version = 3;
string uid = 4;
uint64 timestamp = 5;
string stream = 6;
enum ModelOperation {
Unknown = 0;
Create = 1;
Delete = 2;
}
ModelOperation op = 1;
string model = 2;
uint32 version = 3;
string uid = 4;
uint64 timestamp = 5;
string stream = 6;
}

message ModelUpdateStatusMessage {
Expand Down
51 changes: 51 additions & 0 deletions tests/integration/godog/components/bootstrap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Copyright (c) 2024 Seldon Technologies Ltd.

Use of this software is governed BY
(1) the license included in the LICENSE file or
(2) if the license included in the LICENSE file is the Business Source License 1.1,
the Change License after the Change Date as each is defined in accordance with the LICENSE file.
*/

package components

import (
"fmt"

"github.com/seldonio/seldon-core/tests/integration/godog/k8sclient"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
)

// KafkaNodePool is the ComponentName given to the KafkaNodePool component.
const KafkaNodePool ComponentName = "KafkaNodePool"

// StartComponents constructs the components used by the integration tests and
// bundles them into an EnvManager.
//
// Two components are created with k8sClient: a Kubernetes object component for
// the kafka.strimzi.io/v1beta2 KafkaNodePool called "kafka" in namespace
// (configured with UnavailableByDeleting), and a Seldon runtime component built
// from namespace and "seldon".
//
// It returns the EnvManager, or an error wrapping the NewEnvManager failure.
func StartComponents(k8sClient *k8sclient.K8sClient, namespace string) (*EnvManager, error) {
	kafkaNodePoolGVK := schema.GroupVersionKind{
		Group:   "kafka.strimzi.io",
		Version: "v1beta2",
		Kind:    "KafkaNodePool",
	}

	kafkaNodePool := NewK8sObjectComponent(
		k8sClient,
		KafkaNodePool,
		kafkaNodePoolGVK,
		types.NamespacedName{Namespace: namespace, Name: "kafka"},
		UnavailableByDeleting,
	)

	runtime := NewSeldonRuntimeComponent(
		k8sClient,
		namespace,
		"seldon",
	)

	env, err := NewEnvManager(kafkaNodePool, runtime)
	if err != nil {
		// %w keeps the underlying error reachable through errors.Is / errors.As.
		return nil, fmt.Errorf("failed to bootstrap components: %w", err)
	}

	return env, nil
}
97 changes: 97 additions & 0 deletions tests/integration/godog/components/component_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
Copyright (c) 2024 Seldon Technologies Ltd.

Use of this software is governed BY
(1) the license included in the LICENSE file or
(2) if the license included in the LICENSE file is the Business Source License 1.1,
the Change License after the Change Date as each is defined in accordance with the LICENSE file.
*/

package components

import (
"context"
"fmt"
"sync"
)

// Component is the set of operations EnvManager and the tests rely on for a
// single piece of the test environment.
type Component interface {
	// Name returns the identifier the component is registered under.
	Name() ComponentName

	// Snapshot and Restore are invoked for every component by
	// EnvManager.SnapshotAll and EnvManager.RestoreAll respectively.
	// NOTE(review): presumably Snapshot records state that Restore later
	// reinstates — confirm against the implementations.
	Snapshot(ctx context.Context) error
	Restore(ctx context.Context) error

	// MakeUnavailable and MakeAvailable toggle the component's availability.
	// NOTE(review): exact semantics are implementation-defined — confirm.
	MakeUnavailable(ctx context.Context) error
	MakeAvailable(ctx context.Context) error

	// Scale requests the given number of replicas. This is an optional
	// capability.
	Scale(ctx context.Context, replicas int32) error
}

// ComponentName is the string key that identifies a Component within an EnvManager.
type ComponentName string

// EnvManager holds a set of uniquely named components and runs snapshot and
// restore across them in a fixed order.
type EnvManager struct {
	// mu is held for the duration of SnapshotAll and RestoreAll.
	mu sync.Mutex
	// components indexes each registered component by its name.
	components map[ComponentName]Component
	// order lists the component names in the sequence used for snapshot and
	// restore, keeping that sequence deterministic.
	order []ComponentName
}

// NewEnvManager builds an EnvManager from cs. The argument order becomes the
// order in which components are later snapshotted and restored.
//
// It returns an error, and no manager, if any component is nil, reports an
// empty name, or reports a name already used by an earlier component.
func NewEnvManager(cs ...Component) (*EnvManager, error) {
	env := &EnvManager{
		components: make(map[ComponentName]Component, len(cs)),
		order:      make([]ComponentName, 0, len(cs)),
	}

	for _, comp := range cs {
		if comp == nil {
			return nil, fmt.Errorf("nil component provided")
		}

		key := comp.Name()
		_, seen := env.components[key]
		switch {
		case key == "":
			return nil, fmt.Errorf("component has empty name: %T", comp)
		case seen:
			return nil, fmt.Errorf("duplicate component name: %q", key)
		}

		env.components[key] = comp
		env.order = append(env.order, key)
	}

	return env, nil
}

// Component looks up the component registered under name. It returns an error
// when no component with that name is known to the manager.
func (e *EnvManager) Component(name ComponentName) (Component, error) {
	if found, present := e.components[name]; present {
		return found, nil
	}
	return nil, fmt.Errorf("unknown component: %q", name)
}

// SnapshotAll calls Snapshot on each component, following the manager's stored
// order, while holding the manager's mutex. It stops at the first failure and
// returns that error wrapped with the failing component's name.
func (e *EnvManager) SnapshotAll(ctx context.Context) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	for i := range e.order {
		id := e.order[i]
		err := e.components[id].Snapshot(ctx)
		if err != nil {
			return fmt.Errorf("snapshot %s: %w", id, err)
		}
	}

	return nil
}

// RestoreAll calls Restore on each component, following the manager's stored
// order, while holding the manager's mutex. It stops at the first failure and
// returns that error wrapped with the failing component's name.
func (e *EnvManager) RestoreAll(ctx context.Context) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	for i := range e.order {
		id := e.order[i]
		err := e.components[id].Restore(ctx)
		if err != nil {
			return fmt.Errorf("restore %s: %w", id, err)
		}
	}

	return nil
}

// Runtime returns the first component whose concrete type is
// *SeldonRuntimeComponent, or nil when the manager holds none.
//
// Components are scanned in the manager's stored order rather than by ranging
// over the map, so the result is stable even if more than one runtime
// component is present.
func (e *EnvManager) Runtime() *SeldonRuntimeComponent {
	for _, name := range e.order {
		if rt, ok := e.components[name].(*SeldonRuntimeComponent); ok {
			return rt
		}
	}
	return nil
}
Loading
Loading