Skip to content

Commit 093e95f

Browse files
committed
feat: support traffic distribution
1 parent 8ce4af2 commit 093e95f

File tree

4 files changed

+194
-1
lines changed

4 files changed

+194
-1
lines changed

internal/ingress/controller/controller.go

+7
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,13 @@ func getIngressPodZone(svc *apiv1.Service) string {
156156
}
157157
}
158158
}
159+
if svc.Spec.TrafficDistribution != nil && *svc.Spec.TrafficDistribution == apiv1.ServiceTrafficDistributionPreferClose {
160+
if foundZone, ok := k8s.IngressNodeDetails.GetLabels()[apiv1.LabelTopologyZone]; ok {
161+
klog.V(3).Infof("Svc has traffic distribution enabled, try to use zone %q where controller pod is running for Service %q ", foundZone, svcKey)
162+
return foundZone
163+
}
164+
}
165+
159166
return emptyZone
160167
}
161168

internal/ingress/controller/endpointslices.go

+22
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto c
4444
return upsServers
4545
}
4646

47+
// we need to check if there is at least one endpoint with controller zone
48+
// if we use traffic distribution
49+
useTrafficDistribution := s.Spec.TrafficDistribution != nil && *s.Spec.TrafficDistribution == corev1.ServiceTrafficDistributionPreferClose
50+
4751
// using a map avoids duplicated upstream servers when the service
4852
// contains multiple port definitions sharing the same targetport
4953
processedUpstreamServers := make(map[string]struct{})
@@ -115,6 +119,24 @@ func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto c
115119
useTopologyHints = false
116120
if zoneForHints != emptyZone {
117121
useTopologyHints = true
122+
123+
// check if endpointslices have zone hints with controller zone
124+
if useTrafficDistribution {
125+
foundEndpointsForZone := false
126+
for _, ep := range eps.Endpoints {
127+
for _, epzone := range ep.Hints.ForZones {
128+
if epzone.Name == zoneForHints {
129+
foundEndpointsForZone = true
130+
break
131+
}
132+
}
133+
}
134+
if !foundEndpointsForZone {
135+
klog.V(3).Infof("No endpoints found for zone %q in Service %q", zoneForHints, svcKey)
136+
useTopologyHints = false
137+
}
138+
}
139+
118140
// check if all endpointslices have zone hints
119141
for _, ep := range eps.Endpoints {
120142
if ep.Hints == nil || len(ep.Hints.ForZones) == 0 {

internal/ingress/controller/endpointslices_test.go

+164
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"fmt"
2121
"testing"
2222

23+
"k8s.io/utils/ptr"
24+
2325
corev1 "k8s.io/api/core/v1"
2426
discoveryv1 "k8s.io/api/discovery/v1"
2527
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -916,6 +918,168 @@ func TestGetEndpointsFromSlices(t *testing.T) {
916918
},
917919
},
918920
},
921+
{
922+
"should return one endpoint which belongs to zone",
923+
&corev1.Service{
924+
Spec: corev1.ServiceSpec{
925+
Type: corev1.ServiceTypeClusterIP,
926+
ClusterIP: "1.1.1.1",
927+
Ports: []corev1.ServicePort{
928+
{
929+
Name: "default",
930+
TargetPort: intstr.FromString("port-1"),
931+
},
932+
},
933+
TrafficDistribution: ptr.To(corev1.ServiceTrafficDistributionPreferClose),
934+
},
935+
},
936+
&corev1.ServicePort{
937+
Name: "port-1",
938+
TargetPort: intstr.FromString("port-1"),
939+
},
940+
corev1.ProtocolTCP,
941+
"eu-west-1b",
942+
func(string) ([]*discoveryv1.EndpointSlice, error) {
943+
return []*discoveryv1.EndpointSlice{{
944+
ObjectMeta: metav1.ObjectMeta{
945+
Labels: map[string]string{discoveryv1.LabelServiceName: "default"},
946+
},
947+
Endpoints: []discoveryv1.Endpoint{
948+
{
949+
Addresses: []string{"1.1.1.1"},
950+
Conditions: discoveryv1.EndpointConditions{
951+
Ready: &[]bool{true}[0],
952+
},
953+
Hints: &[]discoveryv1.EndpointHints{{
954+
ForZones: []discoveryv1.ForZone{{
955+
Name: "eu-west-1b",
956+
}},
957+
}}[0],
958+
},
959+
{
960+
Addresses: []string{"1.1.1.2"},
961+
Conditions: discoveryv1.EndpointConditions{
962+
Ready: &[]bool{true}[0],
963+
},
964+
Hints: &[]discoveryv1.EndpointHints{{
965+
ForZones: []discoveryv1.ForZone{{
966+
Name: "eu-west-1a",
967+
}},
968+
}}[0],
969+
},
970+
{
971+
Addresses: []string{"1.1.1.3"},
972+
Conditions: discoveryv1.EndpointConditions{
973+
Ready: &[]bool{true}[0],
974+
},
975+
Hints: &[]discoveryv1.EndpointHints{{
976+
ForZones: []discoveryv1.ForZone{{
977+
Name: "eu-west-1c",
978+
}},
979+
}}[0],
980+
},
981+
},
982+
Ports: []discoveryv1.EndpointPort{
983+
{
984+
Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0],
985+
Port: &[]int32{80}[0],
986+
Name: &[]string{"port-1"}[0],
987+
},
988+
},
989+
}}, nil
990+
},
991+
[]ingress.Endpoint{
992+
{
993+
Address: "1.1.1.1",
994+
Port: "80",
995+
},
996+
},
997+
},
998+
{
999+
"should return all endpoints because no endpoints with controller zone",
1000+
&corev1.Service{
1001+
Spec: corev1.ServiceSpec{
1002+
Type: corev1.ServiceTypeClusterIP,
1003+
ClusterIP: "1.1.1.1",
1004+
Ports: []corev1.ServicePort{
1005+
{
1006+
Name: "default",
1007+
TargetPort: intstr.FromString("port-1"),
1008+
},
1009+
},
1010+
TrafficDistribution: ptr.To(corev1.ServiceTrafficDistributionPreferClose),
1011+
},
1012+
},
1013+
&corev1.ServicePort{
1014+
Name: "port-1",
1015+
TargetPort: intstr.FromString("port-1"),
1016+
},
1017+
corev1.ProtocolTCP,
1018+
"eu-west-1b",
1019+
func(string) ([]*discoveryv1.EndpointSlice, error) {
1020+
return []*discoveryv1.EndpointSlice{{
1021+
ObjectMeta: metav1.ObjectMeta{
1022+
Labels: map[string]string{discoveryv1.LabelServiceName: "default"},
1023+
},
1024+
Endpoints: []discoveryv1.Endpoint{
1025+
{
1026+
Addresses: []string{"1.1.1.1"},
1027+
Conditions: discoveryv1.EndpointConditions{
1028+
Ready: &[]bool{true}[0],
1029+
},
1030+
Hints: &[]discoveryv1.EndpointHints{{
1031+
ForZones: []discoveryv1.ForZone{{
1032+
Name: "eu-west-1a",
1033+
}},
1034+
}}[0],
1035+
},
1036+
{
1037+
Addresses: []string{"1.1.1.2"},
1038+
Conditions: discoveryv1.EndpointConditions{
1039+
Ready: &[]bool{true}[0],
1040+
},
1041+
Hints: &[]discoveryv1.EndpointHints{{
1042+
ForZones: []discoveryv1.ForZone{{
1043+
Name: "eu-west-1c",
1044+
}},
1045+
}}[0],
1046+
},
1047+
{
1048+
Addresses: []string{"1.1.1.3"},
1049+
Conditions: discoveryv1.EndpointConditions{
1050+
Ready: &[]bool{true}[0],
1051+
},
1052+
Hints: &[]discoveryv1.EndpointHints{{
1053+
ForZones: []discoveryv1.ForZone{{
1054+
Name: "eu-west-1c",
1055+
}},
1056+
}}[0],
1057+
},
1058+
},
1059+
Ports: []discoveryv1.EndpointPort{
1060+
{
1061+
Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0],
1062+
Port: &[]int32{80}[0],
1063+
Name: &[]string{"port-1"}[0],
1064+
},
1065+
},
1066+
}}, nil
1067+
},
1068+
[]ingress.Endpoint{
1069+
{
1070+
Address: "1.1.1.1",
1071+
Port: "80",
1072+
},
1073+
{
1074+
Address: "1.1.1.2",
1075+
Port: "80",
1076+
},
1077+
{
1078+
Address: "1.1.1.3",
1079+
Port: "80",
1080+
},
1081+
},
1082+
},
9191083
}
9201084

9211085
for _, testCase := range tests {

pkg/flags/flags.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ Takes the form "<host>:port". If not provided, no admission controller is starte
232232

233233
disableSyncEvents = flags.Bool("disable-sync-events", false, "Disables the creation of 'Sync' event resources")
234234

235-
enableTopologyAwareRouting = flags.Bool("enable-topology-aware-routing", false, "Enable topology aware routing feature, needs service object annotation service.kubernetes.io/topology-mode sets to auto.")
235+
enableTopologyAwareRouting = flags.Bool("enable-topology-aware-routing", false, "Enable topology aware routing feature, needs service object annotation service.kubernetes.io/topology-mode sets to auto or trafficDistribution.")
236236
)
237237

238238
flags.StringVar(&nginx.MaxmindMirror, "maxmind-mirror", "", `Maxmind mirror url (example: http://geoip.local/databases.`)

0 commit comments

Comments
 (0)