Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
46f79a1
wip
jmcguire98 Oct 27, 2025
da48459
draft impl
jmcguire98 Oct 28, 2025
4f49dff
start wiring up status reporting
jmcguire98 Oct 28, 2025
b4185f4
finish rough implementation
jmcguire98 Oct 28, 2025
4aa4cbe
update role to allow watching events
jmcguire98 Oct 28, 2025
f2867c0
register events with kclient and create them in the system ns so we c…
jmcguire98 Oct 28, 2025
e873205
fix nack publisher config order
jmcguire98 Oct 29, 2025
f569102
fix event creation
jmcguire98 Oct 29, 2025
62e7768
sort events on processing so acks recoveries work right
jmcguire98 Oct 29, 2025
375fc6b
various cleanups
jmcguire98 Oct 29, 2025
9f53605
replace status more correctly
jmcguire98 Oct 29, 2025
8662168
fix key in ack/nack map
jmcguire98 Oct 29, 2025
11fdf86
cleanup event registration, undo setup changes no longer needed
jmcguire98 Oct 29, 2025
5f1d214
cleanup no longer needed changes to startup
jmcguire98 Oct 29, 2025
1e25af9
comment and unused var cleanups
jmcguire98 Oct 29, 2025
2ce09bd
gen
jmcguire98 Oct 29, 2025
b566384
lints
jmcguire98 Oct 29, 2025
252fa09
remove use of systemnamespace since we no longer need it
jmcguire98 Oct 29, 2025
d93746e
delete gateways from nackStateStore on removeNack when they have no r…
jmcguire98 Oct 29, 2025
f76fe63
use eventRecorder to publish events in nackPublisher
jmcguire98 Oct 29, 2025
72c5f95
resolve context todo
jmcguire98 Oct 29, 2025
47ec6c9
address shouldRespondDelta todo
jmcguire98 Oct 29, 2025
bf06891
Merge branch 'main' into report-agw-nacks-on-gateways
jmcguire98 Oct 29, 2025
52f1bd6
regen
jmcguire98 Oct 29, 2025
0a54e70
remove unnecessary context use from publisher
jmcguire98 Oct 30, 2025
fa361f3
rename events.go -> eventmeta.go
jmcguire98 Oct 30, 2025
da1e355
unexport newPublisher and let NewNackHandler be responsible for creat…
jmcguire98 Oct 30, 2025
b713680
Merge branch 'main' into report-agw-nacks-on-gateways
jmcguire98 Oct 30, 2025
5bacfb7
add unit testing for nack handler
jmcguire98 Oct 31, 2025
12d35b0
add publisher unit tests
jmcguire98 Oct 31, 2025
cecd241
Merge branch 'main' into report-agw-nacks-on-gateways
jmcguire98 Oct 31, 2025
1e1dcdb
fix fmt
jmcguire98 Oct 31, 2025
e24a5de
lint
jmcguire98 Oct 31, 2025
ebc43f0
cleanup godoc comments, unexport some constants
jmcguire98 Nov 4, 2025
a98be59
Merge branch 'main' into report-agw-nacks-on-gateways
jmcguire98 Nov 4, 2025
631e0a6
clean up things we are no longer planning to do (ack handling, status…
jmcguire98 Nov 6, 2025
e96019d
publish messages with the deployment as the involved object as well
jmcguire98 Nov 6, 2025
f77d524
cleanup
jmcguire98 Nov 6, 2025
d0e1506
pare back event permissions since we don't need an event collection a…
jmcguire98 Nov 6, 2025
2e00fba
Merge branch 'main' into report-agw-nacks-on-gateways
jmcguire98 Nov 6, 2025
c3eb680
don't try to append resource names to event message for now
jmcguire98 Nov 6, 2025
261a216
Merge branch 'main' into report-agw-nacks-on-gateways
jmcguire98 Nov 6, 2025
538268a
fix merge conflict formatting
jmcguire98 Nov 6, 2025
16fcf34
plumb through context and get gtw and dep so we can set UID on the ev…
jmcguire98 Nov 6, 2025
b2f92e0
Merge branch 'main' into report-agw-nacks-on-gateways
jmcguire98 Nov 6, 2025
0dd472e
log k8s get errors and return early
jmcguire98 Nov 6, 2025
b3dacc9
fix unit tests by adding resources to the fake client
jmcguire98 Nov 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,7 @@ func (tc TestCase) Run(
// Instead of calling full Init(), manually initialize just what we need for testing
// to avoid race conditions with XDS collection building
agentGwSyncer := NewAgwSyncer(
context.TODO(),
wellknown.DefaultAgwControllerName,
cli,
agwCollections,
Expand Down
23 changes: 19 additions & 4 deletions internal/kgateway/agentgatewaysyncer/krtxds/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (

_ "istio.io/istio/pkg/util/protomarshal" // Ensure we get the more efficient vtproto gRPC encoder

"github.com/kgateway-dev/kgateway/v2/internal/kgateway/agentgatewaysyncer/nack"
kgwxds "github.com/kgateway-dev/kgateway/v2/internal/kgateway/xds"
"github.com/kgateway-dev/kgateway/v2/pkg/logging"
"github.com/kgateway-dev/kgateway/v2/pkg/metrics"
Expand Down Expand Up @@ -162,8 +163,8 @@ func Collection[T IntoProto[TT], TT proto.Message](collection krt.Collection[T],
return PerGatewayCollection(collection, nil, krtopts)
}

// NewDiscoveryServer creates DiscoveryServer that sources data from Pilot's internal mesh data structures
func NewDiscoveryServer(debugger *krt.DebugHandler, reg ...Registration) *DiscoveryServer {
// NewDiscoveryServer creates a DiscoveryServer for agentgateway that sources data from KRT collections via registered generators
func NewDiscoveryServer(debugger *krt.DebugHandler, eventPublisher *nack.NackEventPublisher, reg ...Registration) *DiscoveryServer {
out := &DiscoveryServer{
concurrentPushLimit: make(chan struct{}, features.PushThrottle),
RequestRateLimit: rate.NewLimiter(rate.Limit(features.RequestLimit), 1),
Expand All @@ -174,6 +175,7 @@ func NewDiscoveryServer(debugger *krt.DebugHandler, reg ...Registration) *Discov
debugHandlers: map[string]string{},
adsClients: map[string]*Connection{},
krtDebugger: debugger,
nackHandler: eventPublisher,
DebounceOptions: DebounceOptions{
DebounceAfter: features.DebounceAfter,
DebounceMax: features.DebounceMax,
Expand Down Expand Up @@ -233,6 +235,8 @@ type DiscoveryServer struct {
krtDebugger *krt.DebugHandler
pushOrder []string
registrations []CollectionRegistration

nackHandler *nack.NackEventPublisher
}

// Proxy contains information about a specific instance of a proxy.
Expand Down Expand Up @@ -473,7 +477,7 @@ func (s *DiscoveryServer) processDeltaRequest(req *discovery.DeltaDiscoveryReque
stype := v3.GetShortType(req.TypeUrl)
log.Debug("ADS: REQ resources", "type", stype, "connection", con.ID(), "subscribe", len(req.ResourceNamesSubscribe), "unsubscribe", len(req.ResourceNamesUnsubscribe), "nonce", req.ResponseNonce)

shouldRespond := shouldRespondDelta(con, req)
shouldRespond := shouldRespondDelta(con, req, s.nackHandler)
if !shouldRespond {
log.Debug("no response needed")
return nil
Expand All @@ -498,7 +502,7 @@ func (s *DiscoveryServer) processDeltaRequest(req *discovery.DeltaDiscoveryReque

// shouldRespondDelta determines whether this request needs to be responded back. It applies the ack/nack rules as per xds protocol
// using WatchedResource for previous state and discovery request for the current state.
func shouldRespondDelta(con *Connection, request *discovery.DeltaDiscoveryRequest) bool {
func shouldRespondDelta(con *Connection, request *discovery.DeltaDiscoveryRequest, nackHandler *nack.NackEventPublisher) bool {
stype := v3.GetShortType(request.TypeUrl)

// If there is an error in request that means previous response is erroneous.
Expand All @@ -513,6 +517,17 @@ func shouldRespondDelta(con *Connection, request *discovery.DeltaDiscoveryReques
wr.LastError = request.ErrorDetail.GetMessage()
return wr
})

if nackHandler != nil {
gateway := kgwxds.AgentgatewayID(con.node)
nackEvent := nack.NackEvent{
Gateway: gateway,
TypeUrl: request.TypeUrl,
ErrorMsg: request.ErrorDetail.GetMessage(),
Timestamp: time.Now(),
}
nackHandler.PublishNack(&nackEvent)
}
return false
}

Expand Down
4 changes: 2 additions & 2 deletions internal/kgateway/agentgatewaysyncer/krtxds/xds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func NewFakeDiscoveryServer(t *testing.T, initialAddress ...agentgatewaysyncer.A
krtxds.Collection[agentgatewaysyncer.Address, *api.Address](xdsAddress, opts),
krtxds.PerGatewayCollection[agwir.AgwResource, *api.Resource](xdsResource, agwResourcesByGateway, opts),
}

s := krtxds.NewDiscoveryServer(opts.Debugger, reg...)
// we won't need a mock nack event publisher for this testing, so we pass nil
s := krtxds.NewDiscoveryServer(opts.Debugger, nil, reg...)
s.Start(stop)
xdsAddress.WaitUntilSynced(stop)
xdsResource.WaitUntilSynced(stop)
Expand Down
34 changes: 34 additions & 0 deletions internal/kgateway/agentgatewaysyncer/nack/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package nack

import (
"context"
"time"

"istio.io/istio/pkg/kube"
"k8s.io/apimachinery/pkg/types"
)

// NackEventPublisher is the handle callers use to report NACKs; it forwards
// each NackEvent to an internal Publisher.
type NackEventPublisher struct {
// nackPublisher receives every event passed to PublishNack.
nackPublisher *Publisher
// ctx is captured at construction time and handed to nackPublisher.onNack on
// every PublishNack call.
ctx context.Context
}

// NackEvent represents a NACK received from an agentgateway gateway
type NackEvent struct {
// Gateway is the namespace/name used to look up the Gateway (and the
// same-named Deployment) that the resulting Kubernetes Events are attached to.
Gateway types.NamespacedName
// TypeUrl is the type URL of the xDS request that carried the NACK.
TypeUrl string
// ErrorMsg is the error text reported with the NACK; it becomes the event message.
ErrorMsg string
// Timestamp is the time at which the caller recorded the NACK.
Timestamp time.Time
}

// NewNackEventPublisher builds a NackEventPublisher whose internal Publisher is
// created from client. ctx is retained and passed along on every PublishNack
// call.
func NewNackEventPublisher(ctx context.Context, client kube.Client) *NackEventPublisher {
	publisher := new(NackEventPublisher)
	publisher.nackPublisher = newPublisher(client)
	publisher.ctx = ctx
	return publisher
}

// PublishNack reports a NACK by forwarding the event to the underlying
// Publisher, which records it through the Kubernetes Event API.
//
// The call is a no-op when the receiver, its publisher, or nackEvent is nil, so
// a caller running without event publishing configured (or passing a nil
// event) does not panic on the dereference below.
func (n *NackEventPublisher) PublishNack(nackEvent *NackEvent) {
	if n == nil || n.nackPublisher == nil || nackEvent == nil {
		return
	}
	n.nackPublisher.onNack(n.ctx, *nackEvent)
}
83 changes: 83 additions & 0 deletions internal/kgateway/agentgatewaysyncer/nack/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package nack

import (
"context"

"istio.io/istio/pkg/kube"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"

"github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown"
"github.com/kgateway-dev/kgateway/v2/pkg/logging"
"github.com/kgateway-dev/kgateway/v2/pkg/schemes"
)

// log is the package-level logger, created with the name "nack/publisher".
var log = logging.New("nack/publisher")

// ReasonNack is the reason string set on the Kubernetes Events that are
// created when an agentgateway NACK is detected.
const ReasonNack = "AgentGatewayNackError"

// Publisher converts NACK events from the agentgateway xDS server into Kubernetes Events.
type Publisher struct {
// client is used to fetch the Gateway and Deployment so that their UIDs can be
// set on the event object references.
client kube.Client
// eventRecorder emits the Kubernetes Events; the unit tests replace it with a
// fake recorder.
eventRecorder record.EventRecorder
}

// newPublisher wires up a client-go event broadcaster/recorder pair and returns
// a Publisher that emits Kubernetes Events through it.
//
// Events are attributed to the wellknown.DefaultAgwControllerName component and
// are written through the core/v1 Events client obtained from client.
//
// NOTE(review): the broadcaster is neither retained nor shut down here; confirm
// that its lifetime is intended to match the process.
func newPublisher(client kube.Client) *Publisher {
	broadcaster := record.NewBroadcaster()

	source := corev1.EventSource{Component: wellknown.DefaultAgwControllerName}
	recorder := broadcaster.NewRecorder(schemes.DefaultScheme(), source)

	sink := &typedcorev1.EventSinkImpl{Interface: client.Kube().CoreV1().Events("")}
	broadcaster.StartRecordingToSink(sink)

	publisher := &Publisher{client: client}
	publisher.eventRecorder = recorder
	return publisher
}

// onNack records a NACK as Warning-type Kubernetes Events on both the Gateway
// and the Deployment named by event.Gateway.
//
// Both objects are fetched first so that each event reference carries the
// object's UID. If either lookup fails, the error is logged and no event is
// published.
//
// The NACK error text is handed to the recorder as a plain message (Event)
// rather than as a format string (Eventf): the text comes from the NACK itself
// and may contain '%' characters, which would otherwise be interpreted as
// formatting verbs and corrupt the message.
func (p *Publisher) onNack(ctx context.Context, event NackEvent) {
	gw, err := p.client.GatewayAPI().GatewayV1().Gateways(event.Gateway.Namespace).Get(ctx, event.Gateway.Name, metav1.GetOptions{})
	if err != nil {
		log.Error("failed to get gateway", "gateway", event.Gateway, "error", err)
		return
	}
	dep, err := p.client.Kube().AppsV1().Deployments(event.Gateway.Namespace).Get(ctx, event.Gateway.Name, metav1.GetOptions{})
	if err != nil {
		log.Error("failed to get deployment", "deployment", event.Gateway, "error", err)
		return
	}

	gatewayRef := nackObjectRef(
		wellknown.GatewayKind,
		wellknown.GatewayGVK.GroupVersion().String(),
		event.Gateway,
		gw.GetUID(),
	)
	deploymentRef := nackObjectRef(
		wellknown.DeploymentGVK.Kind,
		wellknown.DeploymentGVK.GroupVersion().String(),
		event.Gateway,
		dep.GetUID(),
	)

	p.eventRecorder.Event(gatewayRef, corev1.EventTypeWarning, ReasonNack, event.ErrorMsg)
	p.eventRecorder.Event(deploymentRef, corev1.EventTypeWarning, ReasonNack, event.ErrorMsg)

	log.Debug("published NACK event for Gateway", "gateway", event.Gateway, "typeURL", event.TypeUrl)
}

// nackObjectRef builds the object reference an event is attached to, identified
// by kind, API version, namespace/name and UID.
func nackObjectRef(kind, apiVersion string, key types.NamespacedName, uid types.UID) *corev1.ObjectReference {
	return &corev1.ObjectReference{
		Kind:       kind,
		APIVersion: apiVersion,
		Name:       key.Name,
		Namespace:  key.Namespace,
		UID:        uid,
	}
}
71 changes: 71 additions & 0 deletions internal/kgateway/agentgatewaysyncer/nack/publisher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package nack

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"istio.io/istio/pkg/kube"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
apiv1 "sigs.k8s.io/gateway-api/apis/v1"
)

// Shared fixtures for the publisher tests.
var (
// testGateway is the namespace/name given to both the Gateway and the
// Deployment created in the fake client.
testGateway = types.NamespacedName{Name: "test-gw", Namespace: "default"}
// testTypeURL is the type URL placed on the test NACK event.
testTypeURL = "type.googleapis.com/agentgateway.dev.resource.Resource"
// testErrorMessage is the error text expected to appear in the recorded event.
testErrorMessage = "test error"
// testNackEvent is the event passed to onNack in the tests.
testNackEvent = NackEvent{
Gateway: testGateway,
TypeUrl: testTypeURL,
ErrorMsg: testErrorMessage,
Timestamp: time.Now(),
}
)

func TestNewPublisher(t *testing.T) {
client := kube.NewFakeClient()
publisher := newPublisher(client)

assert.NotNil(t, publisher)
assert.NotNil(t, publisher.client)
assert.NotNil(t, publisher.eventRecorder)
}

// TestPublisher_OnNack verifies that onNack records a Warning event carrying
// the NACK reason and error message for each of the two involved objects (the
// Gateway and the Deployment).
func TestPublisher_OnNack(t *testing.T) {
	client := kube.NewFakeClient()
	publisher := newPublisher(client)

	// Swap in a fake recorder so emitted events can be read back from a channel.
	fakeRecorder := record.NewFakeRecorder(10)
	publisher.eventRecorder = fakeRecorder

	ctx := context.Background()

	// Ensure involved objects exist so UID lookups succeed. Setup failures abort
	// the test immediately instead of surfacing later as a missing event.
	if _, err := client.GatewayAPI().GatewayV1().Gateways(testGateway.Namespace).Create(ctx, &apiv1.Gateway{
		ObjectMeta: metav1.ObjectMeta{
			Name:      testGateway.Name,
			Namespace: testGateway.Namespace,
		},
	}, metav1.CreateOptions{}); err != nil {
		t.Fatalf("creating test gateway: %v", err)
	}
	if _, err := client.Kube().AppsV1().Deployments(testGateway.Namespace).Create(ctx, &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      testGateway.Name,
			Namespace: testGateway.Namespace,
		},
	}, metav1.CreateOptions{}); err != nil {
		t.Fatalf("creating test deployment: %v", err)
	}

	publisher.onNack(ctx, testNackEvent)

	// onNack publishes one event per involved object, Gateway first and then
	// Deployment; verify that both were recorded.
	for _, involved := range []string{"Gateway", "Deployment"} {
		select {
		case event := <-fakeRecorder.Events:
			assert.Contains(t, event, "Warning")
			assert.Contains(t, event, ReasonNack)
			assert.Contains(t, event, testErrorMessage)
		default:
			t.Fatalf("Expected event for %s to be recorded but none was found", involved)
		}
	}
}
2 changes: 1 addition & 1 deletion internal/kgateway/agentgatewaysyncer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ func (i WorkloadInfo) Equals(other WorkloadInfo) bool {
return equalUsingPremarshaled(i.Workload, i.MarshaledAddress, other.Workload, other.MarshaledAddress) &&
maps.Equal(i.Labels, other.Labels) &&
i.Source == other.Source &&
i.CreationTime == other.CreationTime
i.CreationTime.Equal(other.CreationTime)
}

func workloadResourceName(w *api.Workload) string {
Expand Down
6 changes: 6 additions & 0 deletions internal/kgateway/agentgatewaysyncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/kgateway-dev/kgateway/v2/api/v1alpha1"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/agentgatewaysyncer/krtxds"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/agentgatewaysyncer/nack"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/agentgatewaysyncer/status"
agwir "github.com/kgateway-dev/kgateway/v2/pkg/agentgateway/ir"
"github.com/kgateway-dev/kgateway/v2/pkg/agentgateway/plugins"
Expand Down Expand Up @@ -60,11 +61,15 @@ type Syncer struct {
waitForSync []cache.InformerSynced
ready atomic.Bool

// NACK handling
EventPublisher *nack.NackEventPublisher

// features
Registrations []krtxds.Registration
}

func NewAgwSyncer(
ctx context.Context,
controllerName string,
client kube.Client,
agwCollections *plugins.AgwCollections,
Expand All @@ -79,6 +84,7 @@ func NewAgwSyncer(
additionalGatewayClasses: additionalGatewayClasses,
client: client,
statusCollections: &status.StatusCollections{},
EventPublisher: nack.NewNackEventPublisher(ctx, client),
}
}

Expand Down
1 change: 1 addition & 0 deletions internal/kgateway/controller/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func NewControllerBuilder(ctx context.Context, cfg StartConfig) (*ControllerBuil
agwMergedPlugins := agwPluginFactory(cfg)(ctx, cfg.AgwCollections)

agwSyncer = agentgatewaysyncer.NewAgwSyncer(
ctx,
cfg.AgwControllerName,
cfg.Client,
cfg.AgwCollections,
Expand Down
4 changes: 3 additions & 1 deletion internal/kgateway/setup/controlplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/certwatcher"

"github.com/kgateway-dev/kgateway/v2/internal/kgateway/agentgatewaysyncer/krtxds"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/agentgatewaysyncer/nack"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/xds"
"github.com/kgateway-dev/kgateway/v2/pkg/metrics"
)
Expand Down Expand Up @@ -136,14 +137,15 @@ func NewAgwControlPlane(
authenticators []security.Authenticator,
xdsAuth bool,
certWatcher *certwatcher.CertWatcher,
eventPublisher *nack.NackEventPublisher,
reg ...krtxds.Registration,
) {
baseLogger := slog.Default().With("component", "agentgateway-controlplane")

serverOpts := getGRPCServerOpts(authenticators, xdsAuth, certWatcher, baseLogger)
grpcServer := grpc.NewServer(serverOpts...)

ds := krtxds.NewDiscoveryServer(nil, reg...)
ds := krtxds.NewDiscoveryServer(nil, eventPublisher, reg...)
stop := make(chan struct{})
context.AfterFunc(ctx, func() {
close(stop)
Expand Down
2 changes: 1 addition & 1 deletion internal/kgateway/setup/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func (s *setup) Start(ctx context.Context) error {
}

if s.agwXdsListener != nil && agw != nil {
NewAgwControlPlane(ctx, s.agwXdsListener, authenticators, s.globalSettings.XdsAuth, certWatcher, agw.Registrations...)
NewAgwControlPlane(ctx, s.agwXdsListener, authenticators, s.globalSettings.XdsAuth, certWatcher, agw.EventPublisher, agw.Registrations...)
}

slog.Info("starting admin server")
Expand Down