4 changes: 2 additions & 2 deletions internal/di/k8sxds.go
@@ -22,9 +22,9 @@ var K8sXdsSet = wire.NewSet(
 	ProvideLRSServer,
 )
 
-func ProvideSnapshotter(ctx context.Context, k8sClient kubernetes.Interface) (*snapshot.Snapshotter, func()) {
+func ProvideSnapshotter(ctx context.Context, k8sClient kubernetes.Interface, subZoneLabel snapshot.SubZoneLabel) (*snapshot.Snapshotter, func()) {
 	stopCtx, stop := context.WithCancel(ctx)
-	snapshotter := snapshot.New(k8sClient)
+	snapshotter := snapshot.New(k8sClient, subZoneLabel)
 
 	go func() {
 		err := snapshotter.Start(stopCtx)
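snapshot.SubZoneLabel and snapshot.DefaultSubZoneLabel are consumed by this PR but declared outside the diff; presumably the named string type exists so wire can tell this dependency apart from other string-typed inputs. A minimal sketch of the assumed declaration (the default value shown is a placeholder, not the repo's actual constant):

package snapshot

// SubZoneLabel names the Kubernetes node label whose value becomes the
// Envoy locality sub_zone. A distinct named type lets google/wire inject
// it without conflating it with other strings.
type SubZoneLabel string

// DefaultSubZoneLabel is used by main.go as the flag default. It is kept
// untyped so it satisfies flag.String's string parameter directly; the
// concrete value here is a placeholder, not shown in this diff.
const DefaultSubZoneLabel = "topology.kubernetes.io/sub-zone"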
5 changes: 3 additions & 2 deletions internal/di/wire.go
@@ -6,6 +6,7 @@ import (
 	"context"
 	"github.com/google/wire"
 	"github.com/wongnai/xds/debug"
+	"github.com/wongnai/xds/snapshot"
 	"google.golang.org/grpc"
 	"k8s.io/client-go/kubernetes"
 )
@@ -25,7 +26,7 @@ type DevServer struct {
 	GrpcServer *grpc.Server
 }
 
-func InitializeServer(ctx context.Context, statsIntervalSeconds StatsIntervalSeconds) (Servers, func(), error) {
+func InitializeServer(ctx context.Context, statsIntervalSeconds StatsIntervalSeconds, subZoneLabel snapshot.SubZoneLabel) (Servers, func(), error) {
 	wire.Build(
 		KubernetesSet,
 		GrpcSet,
@@ -38,7 +39,7 @@ func InitializeServer(ctx context.Context, statsIntervalSeconds StatsIntervalSec
 	return Servers{}, nil, nil
 }
 
-func InitializeTestServer(ctx context.Context, kubeClient kubernetes.Interface, statsIntervalSeconds StatsIntervalSeconds) (TestServer, func(), error) {
+func InitializeTestServer(ctx context.Context, kubeClient kubernetes.Interface, statsIntervalSeconds StatsIntervalSeconds, subZoneLabel snapshot.SubZoneLabel) (TestServer, func(), error) {
 	wire.Build(
 		GrpcSet,
 		K8sXdsSet,
9 changes: 5 additions & 4 deletions internal/di/wire_gen.go

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion main.go
@@ -10,6 +10,7 @@ import (
 
 	"github.com/wongnai/xds/internal/di"
 	"github.com/wongnai/xds/meter"
+	"github.com/wongnai/xds/snapshot"
 	_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
 	"k8s.io/klog/v2"
 )
@@ -19,13 +20,15 @@ func main() {
 
 	var statsIntervalInSeconds int64
 	flag.CommandLine.Int64Var(&statsIntervalInSeconds, "statsinterval", 300, "stats update interval in seconds")
+	subZoneLabel := flag.String("sub-zone-label", snapshot.DefaultSubZoneLabel,
+		"Kubernetes node label read as sub-zone when a Service requests sub_zone locality")
 	flag.Parse()
 
 	ctx := context.Background()
 
 	meter.InstallPromExporter()
 
-	servers, stop, err := di.InitializeServer(context.Background(), statsIntervalInSeconds)
+	servers, stop, err := di.InitializeServer(context.Background(), statsIntervalInSeconds, snapshot.SubZoneLabel(*subZoneLabel))
 	if err != nil {
 		klog.Fatal(err)
 	}
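For reference, opting a deployment into a custom label would look something like this (the binary name and label value are illustrative; only the -statsinterval and -sub-zone-label flags come from this diff):

./xds -statsinterval=300 -sub-zone-label=example.com/rack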
99 changes: 66 additions & 33 deletions snapshot/endpoints.go
@@ -3,13 +3,14 @@ package snapshot
 import (
 	"context"
 	"fmt"
+	"maps"
+	"slices"
 	"sort"
 
 	"github.com/ccoveille/go-safecast/v2"
 	corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
 	endpointv3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
 	"github.com/envoyproxy/go-control-plane/pkg/cache/types"
-	"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
 	"github.com/wongnai/xds/meter"
 	"go.opentelemetry.io/otel/metric"
 	"google.golang.org/protobuf/types/known/wrapperspb"
@@ -22,15 +23,15 @@ import (
 )
 
 type endpointCacheItem struct {
-	version   string
-	resources []types.Resource
+	version     string
+	nodeVersion uint64
+	mode        LocalityMode
+	resources   []types.Resource
 }
 
 func (s *Snapshotter) startEndpoints(ctx context.Context) error {
-	emit := func() {}
-
 	store := k8scache.NewUndeltaStore(func(v []interface{}) {
-		emit()
+		s.triggerEndpointsEmit()
 	}, k8scache.DeletionHandlingMetaNamespaceKeyFunc)
 
 	reflector := k8scache.NewReflector(&k8scache.ListWatch{
@@ -44,7 +45,7 @@ func (s *Snapshotter) startEndpoints(ctx context.Context) error {
 
 	var lastSnapshotHash uint64
 
-	emit = func() {
+	emit := func() {
 		version := reflector.LastSyncResourceVersion()
 		s.kubeEventCounter.Add(ctx, 1, metric.WithAttributes(meter.ResourceAttrKey.String("endpoints")))
 
@@ -53,7 +54,7 @@ func (s *Snapshotter) startEndpoints(ctx context.Context) error {
 		hash, err := resourcesHash(endpointsResources)
 		if err == nil {
 			if hash == lastSnapshotHash {
-				klog.V(5).Info("new snapshot is equivalent to the previous one")
+				klog.V(5).Info("new endpoints snapshot is equivalent to the previous one")
 				return
 			}
 			lastSnapshotHash = hash
@@ -64,13 +65,11 @@ func (s *Snapshotter) startEndpoints(ctx context.Context) error {
 		resourcesByType := resourcesToMap(endpointsResources)
 		s.setEndpointResourcesByType(resourcesByType)
 
-		snapshot, err := cache.NewSnapshot(version, resourcesByType)
-		if err != nil {
-			panic(err)
-		}
-
-		s.endpointsCache.SetSnapshot(ctx, "", snapshot)
+		s.endpointsCache.setResources(ctx, version, resourcesByType)
 	}
+	s.emitMu.Lock()
+	s.emitEndpointsFn = emit
+	s.emitMu.Unlock()
 
 	reflector.Run(ctx.Done())
 	return nil
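The emit closure is no longer called directly from the UndeltaStore callback; it is registered under s.emitMu and invoked via s.triggerEndpointsEmit, presumably so other watchers (such as a node-locality watcher) can force a re-emit when node labels change. A sketch of what that trigger could look like, assuming only the emitMu and emitEndpointsFn fields visible above (the Snapshotter internals are otherwise not shown in this diff):

// Sketch only: reads the registered emit function under the mutex and
// calls it outside the critical section.
func (s *Snapshotter) triggerEndpointsEmit() {
	s.emitMu.Lock()
	emit := s.emitEndpointsFn
	s.emitMu.Unlock()

	if emit != nil { // startEndpoints may not have registered it yet
		emit()
	}
}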
@@ -101,40 +100,41 @@ func (s *Snapshotter) kubeEndpointToResources(ep *corev1.Endpoints) []types.Reso
 		klog.Errorf("fail to get object key: %s", err)
 		return nil
 	}
-	if val, ok := s.endpointResourceCache[name]; ok && val.version == ep.ResourceVersion {
+
+	mode := s.localityModeFor(ep.Namespace, ep.Name)
+	nodeVersion := s.nodeLocality.getVersion()
+
+	if val, ok := s.endpointResourceCache[name]; ok &&
+		val.version == ep.ResourceVersion &&
+		val.nodeVersion == nodeVersion &&
+		val.mode == mode {
 		return val.resources
 	}
 
 	var out []types.Resource
 
 	for _, subset := range ep.Subsets {
 		for _, port := range subset.Ports {
-			var portName string
+			var clusterName string
 			if port.Name == "" {
-				portName = fmt.Sprintf("%s.%s:%d", ep.Name, ep.Namespace, port.Port)
+				clusterName = fmt.Sprintf("%s.%s:%d", ep.Name, ep.Namespace, port.Port)
 			} else {
-				portName = fmt.Sprintf("%s.%s:%s", ep.Name, ep.Namespace, port.Name)
+				clusterName = fmt.Sprintf("%s.%s:%s", ep.Name, ep.Namespace, port.Name)
 			}
 
 			cla := &endpointv3.ClusterLoadAssignment{
-				ClusterName: portName,
-				Endpoints: []*endpointv3.LocalityLbEndpoints{
-					{
-						LoadBalancingWeight: wrapperspb.UInt32(1),
-						Locality:            &corev3.Locality{},
-						LbEndpoints:         []*endpointv3.LbEndpoint{},
-					},
-				},
+				ClusterName: clusterName,
 			}
 			out = append(out, cla)
 
 			sortedAddresses := subset.Addresses
 			sort.SliceStable(sortedAddresses, func(i, j int) bool {
Review comment:
nit: might as well use slices.SortStable if you're changing this code.

Also, not sure if this was a problem before, but both of these sort the slice in place. They don't make a copy.

-				l := sortedAddresses[i].IP
-				r := sortedAddresses[j].IP
-				return l < r
+				return sortedAddresses[i].IP < sortedAddresses[j].IP
 			})
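A sketch of the reviewer's suggestion, which also addresses the in-place mutation by cloning first (this PR already imports slices; cmp would be a new import):

// Clone before sorting so the informer-owned slice is left untouched,
// and sort with the slices package instead of sort.SliceStable.
sortedAddresses := slices.Clone(subset.Addresses)
slices.SortStableFunc(sortedAddresses, func(a, b corev1.EndpointAddress) int {
	return cmp.Compare(a.IP, b.IP)
})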

+			portU32 := safecast.MustConvert[uint32](port.Port)
+			groups := map[string]*endpointv3.LocalityLbEndpoints{}
+
 			for _, addr := range sortedAddresses {
 				hostname := addr.Hostname
 				if hostname == "" && addr.TargetRef != nil {
@@ -143,9 +143,18 @@ func (s *Snapshotter) kubeEndpointToResources(ep *corev1.Endpoints) []types.Reso
 				if hostname == "" && addr.NodeName != nil {
 					hostname = *addr.NodeName
 				}
-				portU32 := safecast.MustConvert[uint32](port.Port)
 
-				cla.Endpoints[0].LbEndpoints = append(cla.Endpoints[0].LbEndpoints, &endpointv3.LbEndpoint{
+				loc := s.localityForAddress(addr, mode)
+				key := loc.GetZone() + localityKeySep + loc.GetSubZone()
+				g, ok := groups[key]
+				if !ok {
+					g = &endpointv3.LocalityLbEndpoints{
+						Locality:            loc,
+						LoadBalancingWeight: wrapperspb.UInt32(1),
+					}
+					groups[key] = g
+				}
+				g.LbEndpoints = append(g.LbEndpoints, &endpointv3.LbEndpoint{
Review comment:
we want to do this even if !ok (g is the zero value)?

 					HostIdentifier: &endpointv3.LbEndpoint_Endpoint{
 						Endpoint: &endpointv3.Endpoint{
 							Address: &corev3.Address{
@@ -164,13 +173,37 @@ func (s *Snapshotter) kubeEndpointToResources(ep *corev1.Endpoints) []types.Reso
 					},
 				})
 			}
+
+			// Emit in sorted locality-key order so hash is stable.
+			for _, k := range slices.Sorted(maps.Keys(groups)) {
+				cla.Endpoints = append(cla.Endpoints, groups[k])
+			}
 		}
 	}
 
 	s.endpointResourceCache[name] = endpointCacheItem{
-		version:   ep.ResourceVersion,
-		resources: out,
+		version:     ep.ResourceVersion,
+		nodeVersion: nodeVersion,
+		mode:        mode,
+		resources:   out,
 	}
 
 	return out
 }
+
+// localityForAddress builds the Locality for a single endpoint
+// address according to the service's locality mode.
+func (s *Snapshotter) localityForAddress(addr corev1.EndpointAddress, mode LocalityMode) *corev3.Locality {
+	if mode == LocalityNone || addr.NodeName == nil {
+		return &corev3.Locality{}
+	}
+	info := s.nodeLocality.get(*addr.NodeName)
+	switch mode {
+	case LocalityZone:
+		return &corev3.Locality{Zone: info.zone}
+	case LocalitySubZone:
+		return &corev3.Locality{Zone: info.zone, SubZone: info.subZone}
+	default:
+		return &corev3.Locality{}
+	}
+}
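localityForAddress leans on s.nodeLocality, which is not part of this file's diff. From its usage here (get, getVersion, and the zone/subZone fields), it is presumably a versioned cache of per-node labels fed by a node informer; the version is what makes the nodeVersion field in endpointCacheItem a valid invalidation key. A sketch under those assumptions, not the PR's actual implementation:

package snapshot

import "sync"

// Sketch: a versioned cache of per-node locality. The version bumps on
// every change, so a cached endpointCacheItem built against an older
// nodeVersion is recomputed when any node's labels move.
type nodeLocalityInfo struct {
	zone    string
	subZone string
}

type nodeLocalityCache struct {
	mu      sync.RWMutex
	version uint64
	byNode  map[string]nodeLocalityInfo
}

func (c *nodeLocalityCache) get(node string) nodeLocalityInfo {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.byNode[node] // zero value for unknown nodes
}

func (c *nodeLocalityCache) getVersion() uint64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.version
}

func (c *nodeLocalityCache) set(node string, info nodeLocalityInfo) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.byNode[node] != info {
		c.byNode[node] = info
		c.version++
	}
}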