Skip to content
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
46f79a1
wip
jmcguire98 Oct 27, 2025
da48459
draft impl
jmcguire98 Oct 28, 2025
4f49dff
start wiring up status reporting
jmcguire98 Oct 28, 2025
b4185f4
finish rough implementation
jmcguire98 Oct 28, 2025
4aa4cbe
update role to allow watching events
jmcguire98 Oct 28, 2025
f2867c0
register events with kclient and create them in the system ns so we c…
jmcguire98 Oct 28, 2025
e873205
fix nack publisher config order
jmcguire98 Oct 29, 2025
f569102
fix event creation
jmcguire98 Oct 29, 2025
62e7768
sort events on processing so acks recoveries work right
jmcguire98 Oct 29, 2025
375fc6b
various cleanups
jmcguire98 Oct 29, 2025
9f53605
replace status more correctly
jmcguire98 Oct 29, 2025
8662168
fix key in ack/nack map
jmcguire98 Oct 29, 2025
11fdf86
cleanup event registration, undo setup changes no longer needed
jmcguire98 Oct 29, 2025
5f1d214
cleanup no longer needed changes to startup
jmcguire98 Oct 29, 2025
1e25af9
comment and unused var cleanups
jmcguire98 Oct 29, 2025
2ce09bd
gen
jmcguire98 Oct 29, 2025
b566384
lints
jmcguire98 Oct 29, 2025
252fa09
remove use of systemnamespace since we no longer need it
jmcguire98 Oct 29, 2025
d93746e
delete gateways from nackStateStore on removeNack when they have no r…
jmcguire98 Oct 29, 2025
f76fe63
use eventRecorder to publish events in nackPublisher
jmcguire98 Oct 29, 2025
72c5f95
resolve context todo
jmcguire98 Oct 29, 2025
47ec6c9
address shouldRespondDelta todo
jmcguire98 Oct 29, 2025
bf06891
Merge branch 'main' into report-agw-nacks-on-gateways
jmcguire98 Oct 29, 2025
52f1bd6
regen
jmcguire98 Oct 29, 2025
0a54e70
remove unnecessary context use from publisher
jmcguire98 Oct 30, 2025
fa361f3
rename events.go -> eventmeta.go
jmcguire98 Oct 30, 2025
da1e355
unexport newPublisher and let NewNackHandler be responsible for creat…
jmcguire98 Oct 30, 2025
b713680
Merge branch 'main' into report-agw-nacks-on-gateways
jmcguire98 Oct 30, 2025
5bacfb7
add unit testing for nack handler
jmcguire98 Oct 31, 2025
12d35b0
add publisher unit tests
jmcguire98 Oct 31, 2025
cecd241
Merge branch 'main' into report-agw-nacks-on-gateways
jmcguire98 Oct 31, 2025
1e1dcdb
fix fmt
jmcguire98 Oct 31, 2025
e24a5de
lint
jmcguire98 Oct 31, 2025
ebc43f0
cleanup godoc comments, unexport some constants
jmcguire98 Nov 4, 2025
a98be59
Merge branch 'main' into report-agw-nacks-on-gateways
jmcguire98 Nov 4, 2025
631e0a6
clean up things we are no longer planning to do (ack handling, status…
jmcguire98 Nov 6, 2025
e96019d
publish messages with the deployment as the involved object as well
jmcguire98 Nov 6, 2025
f77d524
cleanup
jmcguire98 Nov 6, 2025
d0e1506
pare back event permissions since we don't need an event collection a…
jmcguire98 Nov 6, 2025
2e00fba
Merge branch 'main' into report-agw-nacks-on-gateways
jmcguire98 Nov 6, 2025
c3eb680
don't try to append resource names to event message for now
jmcguire98 Nov 6, 2025
261a216
Merge branch 'main' into report-agw-nacks-on-gateways
jmcguire98 Nov 6, 2025
538268a
fix merge conflict formatting
jmcguire98 Nov 6, 2025
16fcf34
plumb through context and get gtw and dep so we can set UID on the ev…
jmcguire98 Nov 6, 2025
b2f92e0
Merge branch 'main' into report-agw-nacks-on-gateways
jmcguire98 Nov 6, 2025
0dd472e
log k8s get errors and return early
jmcguire98 Nov 6, 2025
b3dacc9
fix unit tests by adding resources to the fake client
jmcguire98 Nov 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/v1alpha1/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ package v1alpha1

// Leases for leader election
// +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;create;update
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
// +kubebuilder:rbac:groups="",resources=events,verbs=get;list;watch;create;patch
3 changes: 3 additions & 0 deletions install/helm/kgateway/templates/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ rules:
- events
verbs:
- create
- get
- list
- patch
- watch
- apiGroups:
- ""
resources:
Expand Down
35 changes: 33 additions & 2 deletions internal/kgateway/agentgatewaysyncer/krtxds/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (

"github.com/kgateway-dev/kgateway/v2/pkg/logging"

"github.com/kgateway-dev/kgateway/v2/internal/kgateway/agentgatewaysyncer/nack"
kgwxds "github.com/kgateway-dev/kgateway/v2/internal/kgateway/xds"
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/krtutil"
)
Expand Down Expand Up @@ -152,8 +153,8 @@ func Collection[T IntoProto[TT], TT proto.Message](collection krt.Collection[T],
return PerGatewayCollection(collection, nil, krtopts)
}

// NewDiscoveryServer creates DiscoveryServer that sources data from Pilot's internal mesh data structures
func NewDiscoveryServer(debugger *krt.DebugHandler, reg ...Registration) *DiscoveryServer {
// NewDiscoveryServer creates a DiscoveryServer for agentgateway that sources data from KRT collections via registered generators
func NewDiscoveryServer(debugger *krt.DebugHandler, nackHandler *nack.NackHandler, reg ...Registration) *DiscoveryServer {
out := &DiscoveryServer{
concurrentPushLimit: make(chan struct{}, features.PushThrottle),
RequestRateLimit: rate.NewLimiter(rate.Limit(features.RequestLimit), 1),
Expand All @@ -164,6 +165,7 @@ func NewDiscoveryServer(debugger *krt.DebugHandler, reg ...Registration) *Discov
debugHandlers: map[string]string{},
adsClients: map[string]*Connection{},
krtDebugger: debugger,
nackHandler: nackHandler,
DebounceOptions: DebounceOptions{
DebounceAfter: features.DebounceAfter,
DebounceMax: features.DebounceMax,
Expand Down Expand Up @@ -223,6 +225,8 @@ type DiscoveryServer struct {
krtDebugger *krt.DebugHandler
pushOrder []string
registrations []CollectionRegistration

nackHandler *nack.NackHandler
}

// Proxy contains information about an specific instance of a proxy.
Expand Down Expand Up @@ -466,6 +470,33 @@ func (s *DiscoveryServer) processDeltaRequest(req *discovery.DeltaDiscoveryReque
stype := v3.GetShortType(req.TypeUrl)
log.Debug("ADS: REQ resources", "type", stype, "connection", con.ID(), "subscribe", len(req.ResourceNamesSubscribe), "unsubscribe", len(req.ResourceNamesUnsubscribe), "nonce", req.ResponseNonce)

// TODO: this code should probably be moved to shouldRespondDelta, and pass in the nack handler as a parameter.
// if we need to.
if req.ErrorDetail != nil && s.nackHandler != nil {
gateway := kgwxds.AgentgatewayID(con.node)
nackEvent := nack.NackEvent{
Gateway: gateway,
TypeUrl: req.TypeUrl,
ErrorMsg: req.ErrorDetail.GetMessage(),
Timestamp: time.Now(),
}
s.nackHandler.HandleNack(&nackEvent)
}
// Check for ACK that resolves a previous NACK before processing the request
if req.ErrorDetail == nil && s.nackHandler != nil && req.ResponseNonce != "" {
// Check if there was a previous error for this type
previousInfo := con.proxy.GetWatchedResource(req.TypeUrl)
if previousInfo != nil && previousInfo.LastError != "" {
gateway := kgwxds.AgentgatewayID(con.node)
ackEvent := nack.AckEvent{
Gateway: gateway,
TypeUrl: req.TypeUrl,
Timestamp: time.Now(),
}
s.nackHandler.HandleAck(&ackEvent)
}
}

shouldRespond := shouldRespondDelta(con, req)
if !shouldRespond {
log.Debug("no response needed")
Expand Down
4 changes: 2 additions & 2 deletions internal/kgateway/agentgatewaysyncer/krtxds/xds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func NewFakeDiscoveryServer(t *testing.T, initialAddress ...agentgatewaysyncer.A
krtxds.Collection[agentgatewaysyncer.Address, *api.Address](xdsAddress, opts),
krtxds.PerGatewayCollection[agwir.AgwResource, *api.Resource](xdsResource, agwResourcesByGateway, opts),
}

s := krtxds.NewDiscoveryServer(opts.Debugger, reg...)
// we won't need a mock nack handler for this testing, so we pass nil
s := krtxds.NewDiscoveryServer(opts.Debugger, nil, reg...)
s.Start(stop)
xdsAddress.WaitUntilSynced(stop)
xdsResource.WaitUntilSynced(stop)
Expand Down
41 changes: 41 additions & 0 deletions internal/kgateway/agentgatewaysyncer/nack/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package nack

import (
"crypto/sha256"
"encoding/hex"
"strings"
)

// Event reasons for Kubernetes Events created by agentgateway NACK/ACK detection.
const (
// ReasonNack is the Reason set on the Kubernetes Event published for a NACK.
ReasonNack = "AgentgatewayNACK"
// ReasonAck is the Reason set on the Kubernetes Event published for an ACK.
ReasonAck = "AgentgatewayACK"
)

// Annotation keys set on the Kubernetes Events published for NACKs and ACKs.
const (
// AnnotationNackID is the annotation key holding the NACK ID, a stable hash
// produced by ComputeNackID from the gateway name and the xDS type URL.
AnnotationNackID = "kgateway.dev/nack-id"

// AnnotationTypeURL is the annotation key holding the xDS type URL the event refers to.
AnnotationTypeURL = "kgateway.dev/type-url"

// AnnotationObservedAt is the annotation key holding the RFC3339 timestamp
// at which the NACK or ACK was observed.
AnnotationObservedAt = "kgateway.dev/observed-at"

// AnnotationRecoveryOf is the annotation key set on ACK events; its value is
// the NACK ID that the ACK resolves.
AnnotationRecoveryOf = "kgateway.dev/recovery-of"
)

// ComputeNackID derives a stable identifier for a NACK from the gateway's
// namespaced name and the xDS type URL. Identical inputs always yield the
// same ID.
//
// The two inputs are joined with "|" and hashed with SHA-256; the result is
// the first 16 lowercase hex characters (64 bits) of the digest, kept short
// for readability.
func ComputeNackID(gatewayNamespacedName, typeURL string) string {
	var preimage strings.Builder
	preimage.WriteString(gatewayNamespacedName)
	preimage.WriteByte('|')
	preimage.WriteString(typeURL)

	digest := sha256.Sum256([]byte(preimage.String()))
	return hex.EncodeToString(digest[:])[:16]
}
136 changes: 136 additions & 0 deletions internal/kgateway/agentgatewaysyncer/nack/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package nack

import (
"fmt"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
gwv1 "sigs.k8s.io/gateway-api/apis/v1"

"github.com/kgateway-dev/kgateway/v2/pkg/logging"
)

// nackHandlerLog is the logger used by NackHandler, created under the name "nack/handler".
var nackHandlerLog = logging.New("nack/handler")

// NackHandler forwards NACK/ACK notifications to a Publisher and keeps an
// in-memory record of the active NACKs per Gateway.
type NackHandler struct {
// nackStateStore maps a Gateway to its active NACKs, keyed by NACK ID with
// the event message as the value. It is read and written under mu.
nackStateStore map[types.NamespacedName]map[string]string
// nackPublisher is the publisher invoked by HandleNack and HandleAck.
nackPublisher *Publisher
// mu guards nackStateStore.
mu sync.RWMutex
}

// NackEvent represents a NACK received from an agentgateway gateway
type NackEvent struct {
// Gateway is the namespaced name of the Gateway that sent the NACK.
Gateway types.NamespacedName
// TypeUrl is the xDS type URL of the request that carried the error.
TypeUrl string
// ErrorMsg is the error message reported with the NACK; the publisher uses
// it as the Kubernetes Event message.
ErrorMsg string
// Timestamp is when the NACK was observed.
Timestamp time.Time
}

// AckEvent represents a successful ACK received from an agentgateway gateway
type AckEvent struct {
// Gateway is the namespaced name of the Gateway that sent the ACK.
Gateway types.NamespacedName
// TypeUrl is the xDS type URL that was acknowledged.
TypeUrl string
// Timestamp is when the ACK was observed.
Timestamp time.Time
}

// NewNackHandler returns a NackHandler that publishes through nackPublisher
// and starts with an empty NACK state store.
func NewNackHandler(nackPublisher *Publisher) *NackHandler {
	handler := new(NackHandler)
	handler.nackPublisher = nackPublisher
	handler.nackStateStore = map[types.NamespacedName]map[string]string{}
	// mu keeps its zero value, which is an unlocked RWMutex.
	return handler
}

// HandleNack hands a copy of nackEvent to the publisher, which records it as
// a Kubernetes Event.
func (h *NackHandler) HandleNack(nackEvent *NackEvent) {
	publisher := h.nackPublisher
	publisher.onNack(*nackEvent)
}

// HandleAck hands a copy of ackEvent to the publisher, which records it as a
// Kubernetes Event.
func (h *NackHandler) HandleAck(ackEvent *AckEvent) {
	publisher := h.nackPublisher
	publisher.onAck(*ackEvent)
}

// FilterEventsAndUpdateState applies a Kubernetes Event to the in-memory NACK
// state. Events whose Reason is neither ReasonNack nor ReasonAck are ignored.
//
// For a NACK event, the event message is stored under the event's NACK ID
// annotation for the involved object. For an ACK event, the NACK named by the
// recovery-of annotation is removed for the involved object.
//
// It returns an error when a NACK/ACK event has no NACK ID annotation, or when
// an ACK event has no recovery-of annotation. In every other case it returns nil.
func (h *NackHandler) FilterEventsAndUpdateState(event *corev1.Event) error {
	nackHandlerLog.Debug("processing event", "reason", event.Reason, "kind", event.InvolvedObject.Kind, "name", event.InvolvedObject.Name, "namespace", event.InvolvedObject.Namespace)

	if event.Reason != ReasonNack && event.Reason != ReasonAck {
		nackHandlerLog.Debug("ignoring event - not a NACK/ACK event", "reason", event.Reason)
		return nil
	}

	// A missing annotation reads as "", so one emptiness check covers both
	// the absent and the empty case.
	nackID := event.Annotations[AnnotationNackID]
	if nackID == "" {
		return fmt.Errorf("event missing NACK ID annotation: %v", event.Annotations)
	}

	// State is keyed by the object the event is about.
	gateway := types.NamespacedName{
		Name:      event.InvolvedObject.Name,
		Namespace: event.InvolvedObject.Namespace,
	}

	// ACK events are recoveries: they clear the NACK they reference.
	if event.Reason == ReasonAck {
		recoveryOf := event.Annotations[AnnotationRecoveryOf]
		if recoveryOf == "" {
			return fmt.Errorf("ACK event missing recovery annotation: %v", event.Annotations)
		}
		h.removeNack(gateway, recoveryOf)
		return nil
	}

	h.addNack(gateway, nackID, event.Message)
	return nil
}

// ComputeStatus derives a Gateway status condition from the NACKs currently
// recorded for gateway:
//   - no active NACKs: returns nil, leaving the status decision to the caller
//   - exactly one active NACK: Programmed=False carrying that NACK's message
//   - several active NACKs: Programmed=False carrying the number of errors
func (h *NackHandler) ComputeStatus(gateway types.NamespacedName) *metav1.Condition {
	h.mu.RLock()
	defer h.mu.RUnlock()

	pending := h.nackStateStore[gateway]
	count := len(pending)
	if count == 0 {
		return nil
	}

	var message string
	switch count {
	case 1:
		// The map holds a single entry; ranging is just how we read it.
		for _, only := range pending {
			message = fmt.Sprintf("Configuration rejected: %s", only)
		}
	default:
		message = fmt.Sprintf("Configuration rejected: %d errors found", count)
	}

	condition := new(metav1.Condition)
	condition.Type = string(gwv1.GatewayConditionProgrammed)
	condition.Status = metav1.ConditionFalse
	condition.Reason = string(gwv1.GatewayReasonInvalid)
	condition.Message = message
	condition.LastTransitionTime = metav1.Now()
	return condition
}

// addNack records message under nackID in the gateway's active NACK set,
// creating the set on first use. It is called when a NACK event arrives via
// the Kubernetes Event API.
func (h *NackHandler) addNack(gateway types.NamespacedName, nackID, message string) {
	h.mu.Lock()
	defer h.mu.Unlock()

	perGateway := h.nackStateStore[gateway]
	if perGateway == nil {
		perGateway = map[string]string{}
		h.nackStateStore[gateway] = perGateway
	}
	perGateway[nackID] = message
}

// removeNack deletes nackID from the gateway's active NACK set. It is called
// when an ACK event arrives via the Kubernetes Event API.
//
// When the last NACK for a gateway is removed, the gateway's entry is dropped
// from nackStateStore as well, so gateways that have recovered do not leave
// empty maps behind. Removing an unknown gateway or NACK ID is a no-op.
func (h *NackHandler) removeNack(gateway types.NamespacedName, nackID string) {
	h.mu.Lock()
	defer h.mu.Unlock()

	active, ok := h.nackStateStore[gateway]
	if !ok {
		return
	}
	delete(active, nackID)
	if len(active) == 0 {
		delete(h.nackStateStore, gateway)
	}
}
111 changes: 111 additions & 0 deletions internal/kgateway/agentgatewaysyncer/nack/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package nack

import (
"context"
"time"

"istio.io/istio/pkg/kube"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown"
"github.com/kgateway-dev/kgateway/v2/pkg/logging"
)

// log is the logger used by the Publisher, created under the name "nack/publisher".
var log = logging.New("nack/publisher")

// Publisher converts NACK events from the agentgateway xDS server into Kubernetes Events.
type Publisher struct {
// ctx is the context passed to every Event create call made by onNack and onAck.
ctx context.Context
// client is the Kubernetes client used to create Events.
client kube.Client
// systemNamespace is stored by NewPublisher; no code visible in this file reads it.
systemNamespace string
}

// NewPublisher builds a Publisher that creates Kubernetes Events through
// client, using ctx for each create call. systemNamespace is stored on the
// returned Publisher.
func NewPublisher(ctx context.Context, client kube.Client, systemNamespace string) *Publisher {
	pub := new(Publisher)
	pub.ctx = ctx
	pub.client = client
	pub.systemNamespace = systemNamespace
	return pub
}

// onNack creates a Warning-type Kubernetes Event in the Gateway's namespace
// describing the NACK. The Event references the Gateway as its involved
// object, uses the NACK error as its message, and carries the NACK ID, type
// URL and observation time as annotations.
//
// Failures are logged rather than returned; an AlreadyExists error from the
// API server is not treated as a failure.
func (p *Publisher) onNack(event NackEvent) {
	gw := event.Gateway
	nackID := ComputeNackID(gw.Namespace+"/"+gw.Name, event.TypeUrl)

	meta := metav1.ObjectMeta{
		GenerateName: "agentgateway-nack-",
		Namespace:    gw.Namespace,
		Annotations: map[string]string{
			AnnotationNackID:     nackID,
			AnnotationTypeURL:    event.TypeUrl,
			AnnotationObservedAt: event.Timestamp.Format(time.RFC3339),
		},
	}
	involved := corev1.ObjectReference{
		Kind:       wellknown.GatewayKind,
		APIVersion: wellknown.GatewayGVK.GroupVersion().String(),
		Name:       gw.Name,
		Namespace:  gw.Namespace,
	}
	k8sEvent := &corev1.Event{
		ObjectMeta:          meta,
		InvolvedObject:      involved,
		Reason:              ReasonNack,
		Message:             event.ErrorMsg,
		Type:                corev1.EventTypeWarning,
		LastTimestamp:       metav1.NewTime(event.Timestamp),
		Count:               1,
		ReportingController: wellknown.DefaultAgwControllerName,
	}

	eventsAPI := p.client.Kube().CoreV1().Events(gw.Namespace)
	if _, err := eventsAPI.Create(p.ctx, k8sEvent, metav1.CreateOptions{}); err != nil && !errors.IsAlreadyExists(err) {
		log.Error("failed to publish NACK event for Gateway", "gateway", event.Gateway, "error", err)
		return
	}

	log.Debug("published NACK event for Gateway", "gateway", event.Gateway, "nackID", nackID, "typeURL", event.TypeUrl)
}

// onAck creates a Normal-type Kubernetes Event in the Gateway's namespace
// recording that the Gateway accepted configuration for event.TypeUrl.
//
// The NACK ID for the (gateway, type URL) pair is computed once and written
// to both the nack-id and recovery-of annotations, so the two can never
// disagree.
//
// Failures are logged rather than returned; an AlreadyExists error from the
// API server is not treated as a failure.
func (p *Publisher) onAck(event AckEvent) {
	recoveredNackID := ComputeNackID(event.Gateway.Namespace+"/"+event.Gateway.Name, event.TypeUrl)

	k8sEvent := &corev1.Event{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: "agentgateway-ack-",
			Namespace:    event.Gateway.Namespace,
			Annotations: map[string]string{
				// Same ID under both keys: the ACK belongs to, and resolves, this NACK.
				AnnotationNackID:     recoveredNackID,
				AnnotationTypeURL:    event.TypeUrl,
				AnnotationRecoveryOf: recoveredNackID,
				AnnotationObservedAt: event.Timestamp.Format(time.RFC3339),
			},
		},
		InvolvedObject: corev1.ObjectReference{
			Kind:       wellknown.GatewayKind,
			APIVersion: wellknown.GatewayGVK.GroupVersion().String(),
			Name:       event.Gateway.Name,
			Namespace:  event.Gateway.Namespace,
		},
		Reason:              ReasonAck,
		Message:             "Configuration accepted successfully",
		Type:                corev1.EventTypeNormal,
		LastTimestamp:       metav1.NewTime(event.Timestamp),
		Count:               1,
		ReportingController: wellknown.DefaultAgwControllerName,
	}

	_, err := p.client.Kube().CoreV1().Events(event.Gateway.Namespace).Create(
		p.ctx, k8sEvent, metav1.CreateOptions{},
	)
	if err != nil && !errors.IsAlreadyExists(err) {
		log.Error("failed to publish ACK event for Gateway", "gateway", event.Gateway, "error", err)
		return
	}

	log.Debug("published ACK event for Gateway", "gateway", event.Gateway, "typeURL", event.TypeUrl)
}
2 changes: 1 addition & 1 deletion internal/kgateway/agentgatewaysyncer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ func (i WorkloadInfo) Equals(other WorkloadInfo) bool {
return equalUsingPremarshaled(i.Workload, i.MarshaledAddress, other.Workload, other.MarshaledAddress) &&
maps.Equal(i.Labels, other.Labels) &&
i.Source == other.Source &&
i.CreationTime == other.CreationTime
i.CreationTime.Equal(other.CreationTime)
}

func workloadResourceName(w *api.Workload) string {
Expand Down
Loading