@@ -11,9 +11,7 @@ import (
1111 "google.golang.org/protobuf/types/known/anypb"
1212 "google.golang.org/protobuf/types/known/structpb"
1313 "google.golang.org/protobuf/types/known/wrapperspb"
14- "istio.io/istio/pkg/kube/krt"
1514
16- "github.com/kgateway-dev/kgateway/v2/internal/kgateway/krtcollections"
1715 "github.com/kgateway-dev/kgateway/v2/internal/kgateway/utils"
1816 "github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/ir"
1917)
@@ -22,34 +20,57 @@ func processPoolBackendObjIR(
2220 ctx context.Context ,
2321 in ir.BackendObjectIR ,
2422 out * envoyclusterv3.Cluster ,
25- podIdx krt.Index [string , krtcollections.LocalityPod ],
2623) * ir.EndpointsForBackend {
27- // Build an endpoint list
2824 irPool := in .ObjIr .(* inferencePool )
29- poolEps := irPool .resolvePoolEndpoints (podIdx )
30- if len (poolEps ) == 0 {
31- logger .Warn ("no endpoints resolved for InferencePool" ,
32- "namespace" , irPool .obj .GetNamespace (),
33- "name" , irPool .obj .GetName ())
34- }
3525
36- // If the pool has errors, create an empty LoadAssignment to return a 503
26+ // Always set the cluster name up front so error paths program the right cluster.
27+ out .Name = in .ClusterName ()
28+ out .ClusterDiscoveryType = & envoyclusterv3.Cluster_Type {Type : envoyclusterv3 .Cluster_STATIC }
29+ out .LbPolicy = envoyclusterv3 .Cluster_ROUND_ROBIN
30+
31+ // If the pool has errors, create an empty LoadAssignment to return a 503.
3732 if irPool .hasErrors () {
33+ errs := irPool .snapshotErrors ()
3834 logger .Debug ("skipping endpoints due to InferencePool errors" ,
3935 "pool" , in .ResourceName (),
40- "errors" , irPool . errors ,
36+ "errors" , errs ,
4137 )
38+
4239 out .LoadAssignment = & envoyendpointv3.ClusterLoadAssignment {
4340 ClusterName : out .Name ,
4441 Endpoints : []* envoyendpointv3.LocalityLbEndpoints {{}},
4542 }
43+
44+ // Still set subset config so Envoy’s view is consistent, but it won’t matter with no endpoints.
45+ out .LbSubsetConfig = & envoyclusterv3.Cluster_LbSubsetConfig {
46+ SubsetSelectors : []* envoyclusterv3.Cluster_LbSubsetConfig_LbSubsetSelector {{
47+ Keys : []string {dstEndpointKey },
48+ }},
49+ FallbackPolicy : envoyclusterv3 .Cluster_LbSubsetConfig_ANY_ENDPOINT ,
50+ }
51+
52+ // TODO [danehans]: Set H1/H2 app protocol programmatically:
53+ // https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1273
54+ addHTTP1 (out )
55+ out .CircuitBreakers = & envoyclusterv3.CircuitBreakers {
56+ Thresholds : []* envoyclusterv3.CircuitBreakers_Thresholds {{
57+ MaxConnections : wrapperspb .UInt32 (defaultExtProcMaxRequests ),
58+ MaxPendingRequests : wrapperspb .UInt32 (defaultExtProcMaxRequests ),
59+ MaxRequests : wrapperspb .UInt32 (defaultExtProcMaxRequests ),
60+ }},
61+ }
62+
4663 return nil
4764 }
4865
49- // Static cluster with subset lb config
50- out .Name = in .ClusterName ()
51- out .ClusterDiscoveryType = & envoyclusterv3.Cluster_Type {Type : envoyclusterv3 .Cluster_STATIC }
52- out .LbPolicy = envoyclusterv3 .Cluster_ROUND_ROBIN
66+ // Build the static cluster with subset lb from IR endpoints.
67+ poolEps := irPool .getEndpoints ()
68+ if len (poolEps ) == 0 {
69+ logger .Warn ("no endpoints resolved for InferencePool" ,
70+ "namespace" , irPool .obj .GetNamespace (),
71+ "name" , irPool .obj .GetName ())
72+ }
73+
5374 out .LbSubsetConfig = & envoyclusterv3.Cluster_LbSubsetConfig {
5475 SubsetSelectors : []* envoyclusterv3.Cluster_LbSubsetConfig_LbSubsetSelector {{
5576 Keys : []string {dstEndpointKey },
@@ -78,15 +99,14 @@ func processPoolBackendObjIR(
7899 continue
79100 }
80101
81- // Build the LB endpoint
82- lbEp := & envoyendpointv3.LbEndpoint {
102+ lbEndpoints = append (lbEndpoints , & envoyendpointv3.LbEndpoint {
83103 HostIdentifier : & envoyendpointv3.LbEndpoint_Endpoint {
84104 Endpoint : & envoyendpointv3.Endpoint {
85105 Address : & envoycorev3.Address {
86106 Address : & envoycorev3.Address_SocketAddress {
87107 SocketAddress : & envoycorev3.SocketAddress {
88108 Address : ep .address ,
89- PortSpecifier : & envoycorev3.SocketAddress_PortValue {PortValue : uint32 (ep .port )}, //nolint:gosec // G115: ep.port is int32 representing a port number, always in valid range
109+ PortSpecifier : & envoycorev3.SocketAddress_PortValue {PortValue : uint32 (ep .port )}, //nolint:gosec // ep.port is a valid int32 port
90110 },
91111 },
92112 },
@@ -98,29 +118,20 @@ func processPoolBackendObjIR(
98118 envoyLbNamespace : mdStruct ,
99119 },
100120 },
101- }
102- lbEndpoints = append (lbEndpoints , lbEp )
121+ })
103122 }
104123
105- // Attach the endpoints to the cluster load assignment
106124 out .LoadAssignment = & envoyendpointv3.ClusterLoadAssignment {
107125 ClusterName : out .Name ,
108- Endpoints : []* envoyendpointv3.LocalityLbEndpoints {{
109- LbEndpoints : lbEndpoints ,
110- }},
126+ Endpoints : []* envoyendpointv3.LocalityLbEndpoints {{LbEndpoints : lbEndpoints }},
111127 }
112-
113128 out .CircuitBreakers = & envoyclusterv3.CircuitBreakers {
114- Thresholds : []* envoyclusterv3.CircuitBreakers_Thresholds {
115- {
116- MaxConnections : wrapperspb .UInt32 (defaultExtProcMaxRequests ),
117- MaxPendingRequests : wrapperspb .UInt32 (defaultExtProcMaxRequests ),
118- MaxRequests : wrapperspb .UInt32 (defaultExtProcMaxRequests ),
119- },
120- },
129+ Thresholds : []* envoyclusterv3.CircuitBreakers_Thresholds {{
130+ MaxConnections : wrapperspb .UInt32 (defaultExtProcMaxRequests ),
131+ MaxPendingRequests : wrapperspb .UInt32 (defaultExtProcMaxRequests ),
132+ MaxRequests : wrapperspb .UInt32 (defaultExtProcMaxRequests ),
133+ }},
121134 }
122-
123- // Return nil since we're building a static cluster
124135 return nil
125136}
126137
0 commit comments