Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 33 additions & 25 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,42 +184,59 @@ func (r *Runner) Run(ctx context.Context) error {
return err
}

mgr, _, err := r.Setup(ctx, cfg, opts)
if err != nil {
return err
}

// --- Start Manager ---
// This blocks until a signal is received.
setupLog.Info("Controller manager starting")
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "Error starting controller manager")
return err
}
setupLog.Info("Controller manager terminated")
return nil
}

func (r *Runner) Setup(ctx context.Context, cfg *rest.Config, opts *runserver.Options) (ctrl.Manager, datastore.Datastore, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment here explaining that the setup was split into a separate function to enable integration testing of the setup.

rawConfig, err := r.parseConfigurationPhaseOne(ctx, opts)
if err != nil {
setupLog.Error(err, "Failed to parse configuration")
return err
return nil, nil, err
}

// --- Setup Datastore ---
epf, err := r.setupMetricsCollection(r.featureGates[datalayer.ExperimentalDatalayerFeatureGate], opts)
if err != nil {
return err
return nil, nil, err
}

gknn, err := extractGKNN(opts.PoolName, opts.PoolGroup, opts.PoolNamespace, opts.EndpointSelector)
if err != nil {
setupLog.Error(err, "Failed to extract GKNN")
return err
return nil, nil, err
}

startCrdReconcilers := opts.EndpointSelector == "" // If endpointSelector is empty, it means it's not in the standalone mode. Then we should start the inferencePool and other CRD Reconciler.
controllerCfg := runserver.NewControllerConfig(startCrdReconcilers)

if err := controllerCfg.PopulateControllerConfig(cfg); err != nil {
setupLog.Error(err, "Failed to populate controller config")
return err
return nil, nil, err
}

ds, err := setupDatastore(ctx, epf, int32(opts.ModelServerMetricsPort), startCrdReconcilers,
opts.PoolName, opts.PoolNamespace, opts.EndpointSelector, opts.EndpointTargetPorts)
if err != nil {
setupLog.Error(err, "Failed to setup datastore")
return err
return nil, nil, err
}
eppConfig, err := r.parseConfigurationPhaseTwo(ctx, rawConfig, ds)
if err != nil {
setupLog.Error(err, "Failed to parse configuration")
return err
return nil, nil, err
}

// --- Setup Metrics Server ---
Expand Down Expand Up @@ -248,7 +265,7 @@ func (r *Runner) Run(ctx context.Context) error {
mgr, err := runserver.NewDefaultManager(controllerCfg, *gknn, cfg, metricsServerOptions, opts.EnableLeaderElection)
if err != nil {
setupLog.Error(err, "Failed to create controller manager")
return err
return nil, nil, err
}

if opts.EnableLeaderElection {
Expand All @@ -267,15 +284,15 @@ func (r *Runner) Run(ctx context.Context) error {
setupLog.Info("Setting pprof handlers")
if err = profiling.SetupPprofHandlers(mgr); err != nil {
setupLog.Error(err, "Failed to setup pprof handlers")
return err
return nil, nil, err
}
}

// --- Initialize Core EPP Components ---
if r.schedulerConfig == nil {
err := errors.New("scheduler config must be set either by config api or through code")
setupLog.Error(err, "failed to create scheduler")
return err
return nil, nil, err
}

setupLog.Info("parsed config", "scheduler-config", r.schedulerConfig)
Expand All @@ -285,7 +302,7 @@ func (r *Runner) Run(ctx context.Context) error {
datalayerMetricsEnabled := r.featureGates[datalayer.ExperimentalDatalayerFeatureGate]
if err := r.setupDataLayer(datalayerMetricsEnabled, eppConfig.DataConfig, epf, mgr); err != nil {
setupLog.Error(err, "failed to initialize data layer")
return err
return nil, nil, err
}

saturationDetector := utilizationdetector.NewDetector(eppConfig.SaturationDetectorConfig, setupLog)
Expand All @@ -299,7 +316,7 @@ func (r *Runner) Run(ctx context.Context) error {
setupLog.Info("Initializing experimental Flow Control layer")
registry, err := fcregistry.NewFlowRegistry(eppConfig.FlowControlConfig.Registry, setupLog)
if err != nil {
return fmt.Errorf("failed to initialize Flow Registry: %w", err)
return nil, nil, fmt.Errorf("failed to initialize Flow Registry: %w", err)
}
fc, err := fccontroller.NewFlowController(
ctx,
Expand All @@ -308,7 +325,7 @@ func (r *Runner) Run(ctx context.Context) error {
locator,
)
if err != nil {
return fmt.Errorf("failed to initialize Flow Controller: %w", err)
return nil, nil, fmt.Errorf("failed to initialize Flow Controller: %w", err)
}
go registry.Run(ctx)
admissionController = requestcontrol.NewFlowControlAdmissionController(fc, opts.PoolName)
Expand Down Expand Up @@ -337,29 +354,20 @@ func (r *Runner) Run(ctx context.Context) error {
}
if err := serverRunner.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "Failed to setup EPP controllers")
return err
return nil, nil, err
}

// --- Add Runnables to Manager ---
// Register health server.
if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), ds, opts.GRPCHealthPort, isLeader, opts.EnableLeaderElection); err != nil {
return err
return nil, nil, err
}

// Register ext-proc server.
if err := registerExtProcServer(mgr, serverRunner, ctrl.Log.WithName("ext-proc")); err != nil {
return err
}

// --- Start Manager ---
// This blocks until a signal is received.
setupLog.Info("Controller manager starting")
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "Error starting controller manager")
return err
return nil, nil, err
}
setupLog.Info("Controller manager terminated")
return nil
return mgr, ds, nil
}

// NewEndpointPoolFromOptions constructs an EndpointPool from standalone options.
Expand Down
127 changes: 127 additions & 0 deletions test/integration/epp/hermetic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ import (
"path/filepath"
"strings"
"testing"
"time"

configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/yaml"
Expand All @@ -41,8 +45,10 @@ import (

v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
eppRunner "sigs.k8s.io/gateway-api-inference-extension/cmd/epp/runner"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
eppServer "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request"
"sigs.k8s.io/gateway-api-inference-extension/test/integration"
)
Expand Down Expand Up @@ -454,3 +460,124 @@ func loadBaseResources() []*unstructured.Unstructured {
}
return objs
}

// TestCRDWatchers verifies that the EPP correctly populates its internal datastore
// by watching InferenceModelRewrite and InferenceObjective resources.
func TestCRDWatchers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create dedicated namespace for the whole test
uid := uuid.New().String()[:8]
testNamespaceName := "epp-test-" + uid
ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespaceName}}
require.NoError(t, k8sClient.Create(ctx, ns), "failed to create test namespace")
defer func() {
if err := k8sClient.Delete(context.Background(), ns); err != nil {
t.Logf("failed to cleanup namespace %s: %v", testNamespaceName, err)
}
}()

const testPoolName = "test-pool"
opts := eppServer.NewOptions()
opts.PoolName = testPoolName
opts.PoolNamespace = testNamespaceName

metricsPort, err := integration.GetFreePort()
require.NoError(t, err)
opts.MetricsPort = metricsPort

grpcPort, err := integration.GetFreePort()
require.NoError(t, err)
opts.GRPCPort = grpcPort

healthPort, err := integration.GetFreePort()
require.NoError(t, err)
opts.GRPCHealthPort = healthPort
opts.EndpointTargetPorts = []int{8000}

mgr, dataStore, err := eppRunner.NewRunner().Setup(ctx, testEnv.Config, opts)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have this test invoke NewTestHarness, and have NewTestHarness call runner.Setup?

if err != nil {
t.Fatalf("Error getting the epp manager")
}
go func() {
err := mgr.Start(ctx)
if err != nil && !strings.Contains(err.Error(), "context canceled") {
t.Errorf("RunInternal failed: %v", err)
}
}()

// Create the Pool and CRs
pool := &v1.InferencePool{
ObjectMeta: metav1.ObjectMeta{Name: testPoolName, Namespace: testNamespaceName},
Spec: v1.InferencePoolSpec{
TargetPorts: []v1.Port{{Number: 8000}},
Selector: v1.LabelSelector{
MatchLabels: map[v1.LabelKey]v1.LabelValue{
"app": "test",
},
},
EndpointPickerRef: v1.EndpointPickerRef{
Name: v1.ObjectName("epp"),
Port: &v1.Port{Number: 8080},
},
},
}
require.NoError(t, k8sClient.Create(ctx, pool))

rewrite := &v1alpha2.InferenceModelRewrite{
ObjectMeta: metav1.ObjectMeta{Name: "test-rewrite", Namespace: testNamespaceName},
Spec: v1alpha2.InferenceModelRewriteSpec{
PoolRef: &v1alpha2.PoolObjectReference{
Name: v1alpha2.ObjectName("test-pool"),
Group: v1alpha2.Group(v1.GroupVersion.Group), // Make sure this is set to the correct group version
Kind: v1alpha2.Kind("InferencePool"),
},
Rules: []v1alpha2.InferenceModelRewriteRule{
{
Matches: []v1alpha2.Match{
{
Model: &v1alpha2.ModelMatch{
Value: "source-model",
},
},
},
Targets: []v1alpha2.TargetModel{
{
ModelRewrite: "target-model",
Weight: 1,
},
},
},
},
},
}
require.NoError(t, k8sClient.Create(ctx, rewrite))

objective := &v1alpha2.InferenceObjective{
ObjectMeta: metav1.ObjectMeta{Name: "test-objective", Namespace: testNamespaceName},
Spec: v1alpha2.InferenceObjectiveSpec{
PoolRef: v1alpha2.PoolObjectReference{
Name: v1alpha2.ObjectName("test-pool"),
Group: v1alpha2.Group(v1.GroupVersion.Group), // Make sure this is set to the correct group version
Kind: v1alpha2.Kind("InferencePool"),
},
},
}
require.NoError(t, k8sClient.Create(ctx, objective))

require.Eventually(t, func() bool {
if dataStore == nil {
return false
}
// Verify Rewrite
if rule, _ := dataStore.ModelRewriteGet("source-model"); rule == nil {
return false
}
// Verify Objective
if obj := dataStore.ObjectiveGet("test-objective"); obj == nil {
return false
}
return true
}, 10*time.Second, 100*time.Millisecond, "Failed to watch InferenceModelRewrite or InferenceObjective")
}