Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ import (
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/wrapperspb"
"istio.io/istio/pkg/kube/krt"

"github.com/kgateway-dev/kgateway/v2/internal/kgateway/krtcollections"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/utils"
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/ir"
)
Expand All @@ -22,34 +20,57 @@ func processPoolBackendObjIR(
ctx context.Context,
in ir.BackendObjectIR,
out *envoyclusterv3.Cluster,
podIdx krt.Index[string, krtcollections.LocalityPod],
) *ir.EndpointsForBackend {
// Build an endpoint list
irPool := in.ObjIr.(*inferencePool)
poolEps := irPool.resolvePoolEndpoints(podIdx)
if len(poolEps) == 0 {
logger.Warn("no endpoints resolved for InferencePool",
"namespace", irPool.obj.GetNamespace(),
"name", irPool.obj.GetName())
}

// If the pool has errors, create an empty LoadAssignment to return a 503
// Always set the cluster name up front so error paths program the right cluster.
out.Name = in.ClusterName()
out.ClusterDiscoveryType = &envoyclusterv3.Cluster_Type{Type: envoyclusterv3.Cluster_STATIC}
out.LbPolicy = envoyclusterv3.Cluster_ROUND_ROBIN

// If the pool has errors, create an empty LoadAssignment to return a 503.
if irPool.hasErrors() {
errs := irPool.snapshotErrors()
logger.Debug("skipping endpoints due to InferencePool errors",
"pool", in.ResourceName(),
"errors", irPool.errors,
"errors", errs,
)

out.LoadAssignment = &envoyendpointv3.ClusterLoadAssignment{
ClusterName: out.Name,
Endpoints: []*envoyendpointv3.LocalityLbEndpoints{{}},
}

// Still set subset config so Envoy’s view is consistent, but it won’t matter with no endpoints.
out.LbSubsetConfig = &envoyclusterv3.Cluster_LbSubsetConfig{
SubsetSelectors: []*envoyclusterv3.Cluster_LbSubsetConfig_LbSubsetSelector{{
Keys: []string{dstEndpointKey},
}},
FallbackPolicy: envoyclusterv3.Cluster_LbSubsetConfig_ANY_ENDPOINT,
}

// TODO [danehans]: Set H1/H2 app protocol programmatically:
// https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1273
addHTTP1(out)
out.CircuitBreakers = &envoyclusterv3.CircuitBreakers{
Thresholds: []*envoyclusterv3.CircuitBreakers_Thresholds{{
MaxConnections: wrapperspb.UInt32(defaultExtProcMaxRequests),
MaxPendingRequests: wrapperspb.UInt32(defaultExtProcMaxRequests),
MaxRequests: wrapperspb.UInt32(defaultExtProcMaxRequests),
}},
}

return nil
}

// Static cluster with subset lb config
out.Name = in.ClusterName()
out.ClusterDiscoveryType = &envoyclusterv3.Cluster_Type{Type: envoyclusterv3.Cluster_STATIC}
out.LbPolicy = envoyclusterv3.Cluster_ROUND_ROBIN
// Build the static cluster with subset lb from IR endpoints.
poolEps := irPool.getEndpoints()
if len(poolEps) == 0 {
logger.Warn("no endpoints resolved for InferencePool",
"namespace", irPool.obj.GetNamespace(),
"name", irPool.obj.GetName())
}

out.LbSubsetConfig = &envoyclusterv3.Cluster_LbSubsetConfig{
SubsetSelectors: []*envoyclusterv3.Cluster_LbSubsetConfig_LbSubsetSelector{{
Keys: []string{dstEndpointKey},
Expand Down Expand Up @@ -78,15 +99,14 @@ func processPoolBackendObjIR(
continue
}

// Build the LB endpoint
lbEp := &envoyendpointv3.LbEndpoint{
lbEndpoints = append(lbEndpoints, &envoyendpointv3.LbEndpoint{
HostIdentifier: &envoyendpointv3.LbEndpoint_Endpoint{
Endpoint: &envoyendpointv3.Endpoint{
Address: &envoycorev3.Address{
Address: &envoycorev3.Address_SocketAddress{
SocketAddress: &envoycorev3.SocketAddress{
Address: ep.address,
PortSpecifier: &envoycorev3.SocketAddress_PortValue{PortValue: uint32(ep.port)}, //nolint:gosec // G115: ep.port is int32 representing a port number, always in valid range
PortSpecifier: &envoycorev3.SocketAddress_PortValue{PortValue: uint32(ep.port)}, //nolint:gosec // ep.port is a valid int32 port
},
},
},
Expand All @@ -98,29 +118,20 @@ func processPoolBackendObjIR(
envoyLbNamespace: mdStruct,
},
},
}
lbEndpoints = append(lbEndpoints, lbEp)
})
}

// Attach the endpoints to the cluster load assignment
out.LoadAssignment = &envoyendpointv3.ClusterLoadAssignment{
ClusterName: out.Name,
Endpoints: []*envoyendpointv3.LocalityLbEndpoints{{
LbEndpoints: lbEndpoints,
}},
Endpoints: []*envoyendpointv3.LocalityLbEndpoints{{LbEndpoints: lbEndpoints}},
}

out.CircuitBreakers = &envoyclusterv3.CircuitBreakers{
Thresholds: []*envoyclusterv3.CircuitBreakers_Thresholds{
{
MaxConnections: wrapperspb.UInt32(defaultExtProcMaxRequests),
MaxPendingRequests: wrapperspb.UInt32(defaultExtProcMaxRequests),
MaxRequests: wrapperspb.UInt32(defaultExtProcMaxRequests),
},
},
Thresholds: []*envoyclusterv3.CircuitBreakers_Thresholds{{
MaxConnections: wrapperspb.UInt32(defaultExtProcMaxRequests),
MaxPendingRequests: wrapperspb.UInt32(defaultExtProcMaxRequests),
MaxRequests: wrapperspb.UInt32(defaultExtProcMaxRequests),
}},
}

// Return nil since we're building a static cluster
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
structpb "google.golang.org/protobuf/types/known/structpb"
"istio.io/istio/pkg/kube/krt"
"istio.io/istio/pkg/kube/krt/krttest"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
inf "sigs.k8s.io/gateway-api-inference-extension/api/v1"

"github.com/kgateway-dev/kgateway/v2/internal/kgateway/krtcollections"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown"
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/ir"
krtpkg "github.com/kgateway-dev/kgateway/v2/pkg/utils/krtutil"
)

func makeBackendIR(pool *inf.InferencePool) *ir.BackendObjectIR {
Expand All @@ -45,42 +40,22 @@ func TestProcessPoolBackendObjIR_BuildsLoadAssignment(t *testing.T) {
Selector: inf.LabelSelector{
MatchLabels: map[inf.LabelKey]inf.LabelValue{"app": "test"},
},
TargetPorts: []inf.Port{inf.Port{Number: 9000}},
TargetPorts: []inf.Port{{Number: 9000}},
EndpointPickerRef: inf.EndpointPickerRef{
Name: "svc",
Port: &inf.Port{Number: inf.PortNumber(9002)},
},
},
}

// Build a fake Pod and wrap it into a LocalityPod
corePod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "ns",
Labels: map[string]string{"app": "test"},
},
Status: corev1.PodStatus{PodIP: "10.0.0.1"},
}
fakeLP := krtcollections.LocalityPod{
Named: krt.NewNamed(corePod),
AugmentedLabels: corePod.Labels,
Addresses: []string{corePod.Status.PodIP},
}

// Create a mock and with the LocalityPod collection
mock := krttest.NewMock(t, []any{fakeLP})
podCol := krttest.GetMockCollection[krtcollections.LocalityPod](mock)

// Index the pods
poolKey := fmt.Sprintf("%s/%s", pool.Namespace, pool.Name)
podIdx := krtpkg.UnnamedIndex(podCol, func(p krtcollections.LocalityPod) []string {
return []string{poolKey}
})
// Build the Backend IR and seed endpoints
beIR := makeBackendIR(pool)
irp := beIR.ObjIr.(*inferencePool)
irp.setEndpoints([]endpoint{{address: "10.0.0.1", port: 9000}})

// Call the code under test
cluster := &envoyclusterv3.Cluster{}
ret := processPoolBackendObjIR(context.Background(), *makeBackendIR(pool), cluster, podIdx)
ret := processPoolBackendObjIR(context.Background(), *beIR, cluster)
assert.Nil(t, ret, "Should return nil for a static cluster")

// Validate the generated LoadAssignment
Expand All @@ -107,7 +82,7 @@ func TestProcessPoolBackendObjIR_SkipsOnErrors(t *testing.T) {
pool := &inf.InferencePool{
ObjectMeta: metav1.ObjectMeta{Name: "p", Namespace: "ns"},
Spec: inf.InferencePoolSpec{
TargetPorts: []inf.Port{inf.Port{Number: 9000}},
TargetPorts: []inf.Port{{Number: 9000}},
EndpointPickerRef: inf.EndpointPickerRef{
Name: "svc",
Port: &inf.Port{Number: inf.PortNumber(9002)},
Expand All @@ -119,13 +94,8 @@ func TestProcessPoolBackendObjIR_SkipsOnErrors(t *testing.T) {
irp := beIR.ObjIr.(*inferencePool)
irp.setErrors([]error{fmt.Errorf("failure injected")})

// Empty pod index
mock := krttest.NewMock(t, []any{})
podCol := krttest.GetMockCollection[krtcollections.LocalityPod](mock)
podIdx := krtpkg.UnnamedIndex(podCol, func(krtcollections.LocalityPod) []string { return nil })

cluster := &envoyclusterv3.Cluster{}
ret := processPoolBackendObjIR(context.Background(), *beIR, cluster, podIdx)
ret := processPoolBackendObjIR(context.Background(), *beIR, cluster)
assert.Nil(t, ret)

cla := cluster.LoadAssignment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ func initInferencePoolCollections(
poolCol,
func(ctx krt.HandlerContext, ip *inf.InferencePool) *ir.BackendObjectIR {
irPool := newInferencePool(ip)

// Propagate validation errors to the DP.
if errs := validatePool(ip, commonCol.Services); len(errs) > 0 {
irPool.setErrors(errs)
}

pods := krt.Fetch(ctx, commonCol.LocalityPods, krt.FilterGeneric(func(obj any) bool {
pod, ok := obj.(krtcollections.LocalityPod)
if !ok {
Expand All @@ -121,9 +127,8 @@ func initInferencePoolCollections(
eps = append(eps, endpoint{address: ip, port: irPool.targetPorts[0].number})
}
}
if len(eps) == 0 {
return nil
}
// Always return a backend IR so the static cluster exists.
// Endpoints may be empty on first pass, they'll populate in subsequent passes.
irPool.setEndpoints(eps)
return buildBackendObjIrFromPool(irPool)
},
Expand All @@ -135,7 +140,7 @@ func initInferencePoolCollections(
backendsDP,
func(_ krt.HandlerContext, be ir.BackendObjectIR) *ir.EndpointsForBackend {
stub := &envoyclusterv3.Cluster{Name: be.ClusterName()}
return processPoolBackendObjIR(ctx, be, stub, podIdx)
return processPoolBackendObjIR(ctx, be, stub)
},
)

Expand Down
Loading