Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri
if a.options.metricsPort > 0 {
a.metrics = metrics.NewAgentMetrics()
metrics.RegisterK8sClientMetrics()
metrics.RegisterAgentEventQueueDepthCollector(a.queues)
metrics.RegisterAgentEventWriterMetrics(a)
}

appInformer, err := informer.NewInformer(ctx, appInformerOptions...)
Expand Down Expand Up @@ -637,6 +639,12 @@ func (a *Agent) SetConnected(connected bool) {
a.connected.Store(connected)
}

// CurrentEventWriter hands out the agent's outbound event writer so that it
// can be inspected from outside the agent (it satisfies
// metrics.EventWriterLookup for Prometheus scraping). The result is nil until
// handleStreamEvents has created the writer.
//
// NOTE(review): the field is read here without a lock or atomic load, while
// handleStreamEvents assigns it; confirm whether both can run on different
// goroutines and, if so, guard the field.
func (a *Agent) CurrentEventWriter() *event.EventWriter {
	writer := a.eventWriter
	return writer
}
Comment on lines +642 to +646

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Data race on a.eventWriter.

CurrentEventWriter() reads a.eventWriter without synchronization, while handleStreamEvents in agent/connection.go (Line 188) assigns a.eventWriter = event.NewEventWriter(...) from a different goroutine. Metrics scrapes happen on the Prometheus HTTP handler goroutine, so this is a concurrent read/write of an interface/pointer value — a data race per the Go memory model, even though the pointer is never reassigned after the first write.

At best the scraper will see nil briefly and skip; at worst -race builds will flag it and on some architectures the read could observe a torn value.

🔒 Proposed fix using atomic.Pointer (or a mutex)
-	eventWriter *event.EventWriter
+	eventWriter atomic.Pointer[event.EventWriter]

Callers then use a.eventWriter.Load() / a.eventWriter.Store(ew) / a.eventWriter.CompareAndSwap(...). CurrentEventWriter() becomes:

func (a *Agent) CurrentEventWriter() *event.EventWriter {
    return a.eventWriter.Load()
}

And in agent/connection.go:

if ew := a.eventWriter.Load(); ew == nil {
    // build opts...
    a.eventWriter.Store(event.NewEventWriter("", stream, ewOpts...))
} else {
    ew.UpdateTarget(stream)
}

A dedicated sync.RWMutex guarding eventWriter would also work.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agent/agent.go` around lines 642 - 646, CurrentEventWriter currently reads
a.eventWriter without synchronization while handleStreamEvents assigns it from
another goroutine, causing a data race; change the Agent field backing
eventWriter to a synchronized holder (either use sync/atomic's
atomic.Pointer[event.EventWriter] or protect with a sync.RWMutex), update
CurrentEventWriter to return the safely loaded value (use Load() or read under
RLock), and update the code in handleStreamEvents (where it calls
event.NewEventWriter and ew.UpdateTarget) to Store/CompareAndSwap the new writer
or perform writes under Lock so reads from the Prometheus scrape handler see a
consistent value.


// log returns the "Agent" module logger derived from the default logger.
func log() *logrus.Entry {
	defaultLogger := logging.GetDefaultLogger()
	return defaultLogger.ModuleLogger("Agent")
}
Expand Down
9 changes: 8 additions & 1 deletion agent/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/argoproj-labs/argocd-agent/internal/kube"
"github.com/argoproj-labs/argocd-agent/internal/logging/logfields"
"github.com/argoproj-labs/argocd-agent/internal/manager"
"github.com/argoproj-labs/argocd-agent/internal/metrics"
"github.com/argoproj-labs/argocd-agent/internal/resync"
"github.com/argoproj-labs/argocd-agent/pkg/api/grpc/eventstreamapi"
"github.com/argoproj-labs/argocd-agent/pkg/types"
Expand Down Expand Up @@ -178,7 +179,13 @@ func (a *Agent) handleStreamEvents() error {
defer streamCancel()

if a.eventWriter == nil {
a.eventWriter = event.NewEventWriter("", stream)
var ewOpts []event.EventWriterOption
if a.metrics != nil {
ewOpts = append(ewOpts, event.WithOnRetryExhausted(func(string) {
metrics.IncAgentEventWriterRetriesExhaustedDrop()
}))
}
a.eventWriter = event.NewEventWriter("", stream, ewOpts...)
} else {
a.eventWriter.UpdateTarget(stream)
}
Expand Down
13 changes: 13 additions & 0 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ Here is the list of available metrics:
| `principal_events_sent` | counter | The total number of events sent by principal. |
| `principal_event_processing_time` | histogramVec | Histogram of time taken to process events (in seconds). |
| `principal_errors` | counterVec | The total number of errors that occurred in the principal. |
| `principal_event_queue_depth` | gaugeVec | Current number of CloudEvents in the principal send or receive queue for a queue pair (per connected agent / client). |
| `principal_event_writer_sent_pending` | gaugeVec | Resources with a CloudEvent sent to the agent and awaiting ACK or resend (principal EventWriter). |
| `principal_event_writer_resend_due` | gaugeVec | Subset of `sent_pending` where the retry timer has elapsed and a resend may run. |
| `principal_event_writer_resend_backoff_wait` | gaugeVec | Subset of `sent_pending` waiting on backoff before the next resend attempt. |
| `principal_event_writer_retries_exhausted_drop_total` | counterVec | CloudEvents dropped after exhausting send retries (principal to agent stream). |

### Agent Metrics
| Metric | Type | Description |
Expand All @@ -32,6 +37,11 @@ Here is the list of available metrics:
| `agent_events_sent` | counter | The total number of events sent by agent. |
| `agent_event_processing_time` | histogramVec | Histogram of time taken to process events (in seconds). |
| `agent_errors` | counterVec | The total number of errors that occurred in the agent. |
| `agent_event_queue_depth` | gaugeVec | Current number of CloudEvents in the agent send or receive queue for a queue pair. |
| `agent_event_writer_sent_pending` | gaugeVec | Resources with a CloudEvent sent to the principal and awaiting ACK or resend (agent EventWriter). |
| `agent_event_writer_resend_due` | gaugeVec | Subset of `sent_pending` where the retry timer has elapsed and a resend may run. |
| `agent_event_writer_resend_backoff_wait` | gaugeVec | Subset of `sent_pending` waiting on backoff before the next resend attempt. |
| `agent_event_writer_retries_exhausted_drop_total` | counterVec | CloudEvents dropped after exhausting send retries (agent to principal stream). |

Here is the list of available labels:

Expand All @@ -41,3 +51,6 @@ Here is the list of available labels:
| `call_status` | success | Status of event processing. Possible values are: success, failure, discarded, not-allowed. |
| `agent_name` | agent-managed | Name of Agent. Possible values are: agent-managed, agent-autonomous. |
| `resource_type` | application | Type of resource. Possible values are: application, app project, resource, resourceResync. |
| `queue` | default / agent name | Queue pair key: on the agent this is typically `default`; on the principal it identifies the agent or client queue pair. |
| `direction` | send / recv | Whether the depth is for the outbound (send) or inbound (receive) queue. |
| `agent` | client id / empty | Agent (client) id for principal EventWriter metrics; an empty string for the agent's own EventWriter counters and gauges, because the agent has a single outbound writer. |
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.3
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/client_model v0.6.2
github.com/redis/go-redis/v9 v9.18.0
github.com/rs/zerolog v1.35.0
github.com/sirupsen/logrus v1.9.4
Expand Down Expand Up @@ -148,7 +149,6 @@ require (
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/r3labs/diff/v3 v3.0.2 // indirect
Expand Down
62 changes: 58 additions & 4 deletions internal/event/event_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,24 @@ type EventWriter struct {
// agentName is the name of the agent for which this EventWriter is responsible.
agentName string

// onRetryExhausted is called once per resource when an event is dropped after
// exceeding maxEventRetries (optional; used for metrics).
onRetryExhausted func(resourceID string)

log *logrus.Entry
}

// EventWriterOption configures an EventWriter. Options are passed to
// NewEventWriter, which applies them in order after the writer's fields have
// been initialised.
type EventWriterOption func(*EventWriter)

// WithOnRetryExhausted returns an option that installs fn as the writer's
// retry-exhausted hook. The hook fires when a sent event is dropped from the
// writer after running out of retries; resourceID is the key of that event in
// the sent-events map (it is not meant to be used as a metric label).
//
// The hook is invoked by retrySentEvent while the EventWriter's mutex is held,
// so fn must return quickly and must not call back into the EventWriter.
func WithOnRetryExhausted(fn func(resourceID string)) EventWriterOption {
	apply := func(w *EventWriter) {
		w.onRetryExhausted = fn
	}
	return apply
}

type eventMessage struct {
// when this lock is owned, never attempt to THEN acquire `eventWriter.mu`, as this will lead to a deadlock.
// If you require `eventWriter.mu`, you must acquire that lock FIRST, before attempting to acquiring `mu` (to avoid deadlock)
Expand All @@ -71,14 +86,18 @@ type eventMessage struct {
// NewEventWriter creates a new EventWriter for the given target stream.
// If you create an EventWriter targeting the principal, an empty agentName
// should be used.
func NewEventWriter(agentName string, target streamWriter) *EventWriter {
return &EventWriter{
// NewEventWriter builds an EventWriter that writes to target. Pass an empty
// agentName when the writer targets the principal. Any opts are applied, in
// order, once the writer has been fully initialised.
func NewEventWriter(agentName string, target streamWriter, opts ...EventWriterOption) *EventWriter {
	writer := &EventWriter{
		unsentEvents: map[string]*eventQueue{},
		sentEvents:   map[string]*eventMessage{},
		target:       target,
		agentName:    agentName,
		// Tag every log line with the peer address of the stream and the agent name.
		log: logging.GetDefaultLogger().
			ModuleLogger("EventWriter").
			WithField(logfields.ClientAddr, grpcutil.AddressFromContext(target.Context())).
			WithField(logfields.Agent, agentName),
	}
	for i := range opts {
		opts[i](writer)
	}
	return writer
}

func (ew *EventWriter) UpdateTarget(target streamWriter) {
Expand Down Expand Up @@ -176,6 +195,28 @@ func (ew *EventWriter) Get(resID string) *eventMessage {
return nil
}

// ObserveSentResendState takes a snapshot of sentEvents and reports it to
// observer as three counts:
//
//   - sentPending:   the number of entries in sentEvents
//   - resendDue:     entries whose retryAfter is unset or is not after "now"
//   - resendBackoff: the remainder, i.e. sentPending - resendDue
//
// "now" is sampled once, before any lock is taken. Locks are acquired in the
// order EventWriter.mu (read) and then each eventMessage.mu (read); observer is
// called only after all locks have been released.
func (ew *EventWriter) ObserveSentResendState(observer func(sentPending, resendDue, resendBackoff int)) {
	var total, ready int
	snapshotAt := time.Now()

	ew.mu.RLock()
	total = len(ew.sentEvents)
	for _, sent := range ew.sentEvents {
		// Copy the deadline pointer under the message lock, compare outside it.
		sent.mu.RLock()
		deadline := sent.retryAfter
		sent.mu.RUnlock()

		if deadline != nil && deadline.After(snapshotAt) {
			// Still backing off; counted via total-ready below.
			continue
		}
		ready++
	}
	ew.mu.RUnlock()

	observer(total, ready, total-ready)
}

func (ew *EventWriter) Remove(ev *cloudevents.Event) {
ew.mu.Lock()
defer ew.mu.Unlock()
Expand Down Expand Up @@ -307,9 +348,13 @@ func (ew *EventWriter) retrySentEvent(resID string, sentMsg *eventMessage) {
logCtx.Warnf("Event failed after %d retries, giving up to unblock queue", sentMsg.retryCount)
sentMsg.mu.Unlock()

// Remove from sentEvents to unblock the queue
ew.mu.Lock()
delete(ew.sentEvents, resID)
if cur, ok := ew.sentEvents[resID]; ok && cur == sentMsg {
if ew.onRetryExhausted != nil {
ew.onRetryExhausted(resID)
}
delete(ew.sentEvents, resID)
}
ew.mu.Unlock()
return
}
Expand Down Expand Up @@ -466,6 +511,15 @@ func (ewm *EventWritersMap) Remove(agentName string) {
delete(ewm.eventWriters, agentName)
}

// ObserveWriters invokes fn once per registered EventWriter, passing the agent
// name it is stored under. The map's read lock is held for the whole
// iteration, including during every call to fn, so fn must not do anything
// that needs to write-lock the map. Visiting order is unspecified.
func (ewm *EventWritersMap) ObserveWriters(fn func(agentName string, ew *EventWriter)) {
	ewm.mu.RLock()
	defer ewm.mu.RUnlock()

	for agent := range ewm.eventWriters {
		fn(agent, ewm.eventWriters[agent])
	}
}

// eventQueue is a queue of eventMessages where the items are coalesced by type.
type eventQueue struct {
mu sync.RWMutex
Expand Down
70 changes: 70 additions & 0 deletions internal/event/event_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,76 @@ func TestEventWriter(t *testing.T) {
require.NotContains(t, evSender.sentEvents, resID)
})

t.Run("ObserveSentResendState counts pending due and backoff", func(t *testing.T) {
fs := &fakeStream{}
evSender := NewEventWriter("test", fs)
evSender.ObserveSentResendState(func(p, d, b int) {
require.Equal(t, 0, p)
require.Equal(t, 0, d)
require.Equal(t, 0, b)
})

ev := es.ApplicationEvent(Create, app1)
resID := createResourceID(app1.ObjectMeta)
evSender.Add(ev)
evSender.sendEvent(resID)
evSender.ObserveSentResendState(func(p, d, b int) {
require.Equal(t, 1, p)
require.Equal(t, 0, d)
require.Equal(t, 1, b)
})

sentMsg := evSender.sentEvents[resID]
past := time.Now().Add(-time.Second)
sentMsg.retryAfter = &past
evSender.ObserveSentResendState(func(p, d, b int) {
require.Equal(t, 1, p)
require.Equal(t, 1, d)
require.Equal(t, 0, b)
})
})

t.Run("onRetryExhausted called once when retries exhausted", func(t *testing.T) {
fs := &fakeStream{}
var hookResID string
var hookCalls int
evSender := NewEventWriter("test", fs, WithOnRetryExhausted(func(rid string) {
hookCalls++
hookResID = rid
}))

ev := es.ApplicationEvent(Create, app1)
resID := createResourceID(app1.ObjectMeta)
evSender.Add(ev)
evSender.sendEvent(resID)
sentMsg := evSender.sentEvents[resID]
require.NotNil(t, sentMsg)

for i := 0; i <= maxEventRetries; i++ {
pastTime := time.Now().Add(-1 * time.Second)
sentMsg.retryAfter = &pastTime
evSender.retrySentEvent(resID, sentMsg)
}

require.NotContains(t, evSender.sentEvents, resID)
require.Equal(t, 1, hookCalls)
require.Equal(t, resID, hookResID)
})

t.Run("EventWritersMap ObserveWriters visits each writer", func(t *testing.T) {
m := NewEventWritersMap()
fs1 := &fakeStream{}
fs2 := &fakeStream{}
m.Add("a1", NewEventWriter("a1", fs1))
m.Add("a2", NewEventWriter("a2", fs2))
var names []string
m.ObserveWriters(func(agent string, ew *EventWriter) {
names = append(names, agent)
require.NotNil(t, ew)
})
require.ElementsMatch(t, []string{"a1", "a2"}, names)
})

t.Run("should not send ACK events to sentEvents", func(t *testing.T) {
fs := &fakeStream{}
evSender := NewEventWriter("test", fs)
Expand Down
Loading
Loading