Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 114 additions & 18 deletions pkg/kgateway/extensions2/plugins/waypoint/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
gwv1 "sigs.k8s.io/gateway-api/apis/v1"

apisettings "github.com/kgateway-dev/kgateway/v2/api/settings"
"github.com/kgateway-dev/kgateway/v2/pkg/kgateway/extensions2/plugins/waypoint/waypointquery"
"github.com/kgateway-dev/kgateway/v2/pkg/kgateway/query"
"github.com/kgateway-dev/kgateway/v2/pkg/kgateway/utils"
"github.com/kgateway-dev/kgateway/v2/pkg/kgateway/wellknown"
sdk "github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk"
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/collections"
Expand Down Expand Up @@ -116,15 +118,23 @@ func (t *PerClientProcessor) processBackend(kctx krt.HandlerContext, ctx context
}

// All preliminary checks passed, process the ingress use waypoint
processIngressUseWaypoint(in, out)
processIngressUseWaypoint(in, out, &t.commonCols.Settings)
}

// processIngressUseWaypoint configures the cluster of the connected gateway to have a static
// inlined addresses of the destination service. This will cause the traffic from the kgateway
// to be redirected to the waypoint by the ztunnel.
func processIngressUseWaypoint(in ir.BackendObjectIR, out *envoyclusterv3.Cluster) {
// Addresses are sorted based on DNS lookup family setting, with the primary address in Address
// and additional addresses in AdditionalAddresses.
func processIngressUseWaypoint(in ir.BackendObjectIR, out *envoyclusterv3.Cluster, settings *apisettings.Settings) {
addresses := waypointquery.BackendAddresses(in)

// Sort addresses based on DNS lookup family setting. Since this is a static cluster
// with inlined addresses, we can't use DnsLookupFamily (which only applies to DNS-based
// discovery). Instead, we sort the addresses based on the setting and use the primary
// address in Address and additional addresses in AdditionalAddresses.
sortedAddresses := sortAddressesByDnsLookupFamily(addresses, settings)

// Set the output cluster to be of type STATIC and instead of the default EDS and add
// the addresses of the backend embedded into the CLA of this cluster config.
out.ClusterDiscoveryType = &envoyclusterv3.Cluster_Type{
Expand All @@ -133,37 +143,123 @@ func processIngressUseWaypoint(in ir.BackendObjectIR, out *envoyclusterv3.Cluste
out.EdsClusterConfig = nil
out.LoadAssignment = &envoyendpointv3.ClusterLoadAssignment{
ClusterName: out.GetName(),
Endpoints: make([]*envoyendpointv3.LocalityLbEndpoints, 0, len(addresses)),
Endpoints: make([]*envoyendpointv3.LocalityLbEndpoints, 0, 1),
}

for _, addr := range addresses {
out.GetLoadAssignment().Endpoints = append(out.GetLoadAssignment().GetEndpoints(), claEndpoint(addr, uint32(in.Port))) //nolint:gosec // G115: BackendObjectIR.Port is int32 representing a port number, always in valid range
if endpoint := claEndpoint(sortedAddresses, uint32(in.Port)); endpoint != nil { //nolint:gosec // G115: BackendObjectIR.Port is int32 representing a port number, always in valid range
out.GetLoadAssignment().Endpoints = append(out.GetLoadAssignment().GetEndpoints(), endpoint)
}
}

func claEndpoint(address string, port uint32) *envoyendpointv3.LocalityLbEndpoints {
// claEndpoint creates a LocalityLbEndpoints with the primary address in Address
// and additional addresses in AdditionalAddresses.
func claEndpoint(addresses []string, port uint32) *envoyendpointv3.LocalityLbEndpoints {
if len(addresses) == 0 {
return nil
}

// Primary address goes in Address
primaryAddr := addresses[0]
endpoint := &envoyendpointv3.Endpoint{
Address: &envoycorev3.Address{
Address: &envoycorev3.Address_SocketAddress{
SocketAddress: &envoycorev3.SocketAddress{
Address: primaryAddr,
PortSpecifier: &envoycorev3.SocketAddress_PortValue{
PortValue: port,
},
},
},
},
}

// Additional addresses go in AdditionalAddresses
if len(addresses) > 1 {
additionalAddresses := make([]*envoyendpointv3.Endpoint_AdditionalAddress, 0, len(addresses)-1)
for _, addr := range addresses[1:] {
additionalAddresses = append(additionalAddresses, &envoyendpointv3.Endpoint_AdditionalAddress{
Address: &envoycorev3.Address{
Address: &envoycorev3.Address_SocketAddress{
SocketAddress: &envoycorev3.SocketAddress{
Address: addr,
PortSpecifier: &envoycorev3.SocketAddress_PortValue{
PortValue: port,
},
},
},
},
})
}
endpoint.AdditionalAddresses = additionalAddresses
}

return &envoyendpointv3.LocalityLbEndpoints{
LbEndpoints: []*envoyendpointv3.LbEndpoint{
{
HostIdentifier: &envoyendpointv3.LbEndpoint_Endpoint{
Endpoint: &envoyendpointv3.Endpoint{
Address: &envoycorev3.Address{
Address: &envoycorev3.Address_SocketAddress{
SocketAddress: &envoycorev3.SocketAddress{
Address: address,
PortSpecifier: &envoycorev3.SocketAddress_PortValue{
PortValue: port,
},
},
},
},
},
Endpoint: endpoint,
},
},
},
}
}

// sortAddressesByDnsLookupFamily sorts addresses based on the DNS lookup family setting.
// Returns a sorted list of addresses where the first address will be used as primary
// (in Address) and the rest as additional (in AdditionalAddresses).
// Since static clusters can't use DnsLookupFamily (it only applies to DNS-based discovery),
// we sort the addresses based on the setting.
func sortAddressesByDnsLookupFamily(addresses []string, settings *apisettings.Settings) []string {
if settings == nil {
// Default to V4_PREFERRED if settings are not available
return sortAddressesByDnsLookupFamily(addresses, &apisettings.Settings{
DnsLookupFamily: apisettings.DnsLookupFamilyV4Preferred,
})
}

// For ALL mode, we don't need to separate by family - just return all addresses
if settings.DnsLookupFamily == apisettings.DnsLookupFamilyAll {
return addresses
}

// Separate IPv4 and IPv6 addresses for other modes
var ipv4Addrs, ipv6Addrs []string
for _, addr := range addresses {
validIPv4, _, err := utils.IsIpv4Address(addr)
if err != nil {
// Skip invalid addresses
continue
}
if validIPv4 {
ipv4Addrs = append(ipv4Addrs, addr)
} else {
ipv6Addrs = append(ipv6Addrs, addr)
}
}

// Sort based on DNS lookup family setting
var sortedAddresses []string
switch settings.DnsLookupFamily {
case apisettings.DnsLookupFamilyV4Only:
// Only IPv4 addresses
sortedAddresses = ipv4Addrs
case apisettings.DnsLookupFamilyV6Only:
// Only IPv6 addresses
sortedAddresses = ipv6Addrs
case apisettings.DnsLookupFamilyV4Preferred:
// IPv4 first, then IPv6 as additional addresses
sortedAddresses = append(ipv4Addrs, ipv6Addrs...)
case apisettings.DnsLookupFamilyAuto:
// IPv6 first, then IPv4 as additional addresses
sortedAddresses = append(ipv6Addrs, ipv4Addrs...)
default:
// Default to V4_PREFERRED for unknown values
sortedAddresses = append(ipv4Addrs, ipv6Addrs...)
}

return sortedAddresses
}

// hasIngressUseWaypointLabel checks if the backend or any relevant namespace/alias has the ingress-use-waypoint label.
func hasIngressUseWaypointLabel(kctx krt.HandlerContext, commonCols *collections.CommonCollections, in ir.BackendObjectIR) bool {
// Check the backend's own label first
Expand Down
Loading