Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions argocd/assets/configuration/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,39 @@ files:
items:
type: string
example: []
- name: genresources_stream_applications_enabled
hidden: true
fleet_configurable: false
description: |
Opt-in. When true, maintain a persistent stream to the Argo CD
``/api/v1/stream/applications`` endpoint and emit Application changes in
near-real-time. Clusters and Repositories are still collected by the
periodic rescrape. Default false (polling only, the previous behavior).
value:
type: boolean
example: false
- name: genresources_rescrape_interval_seconds
hidden: true
fleet_configurable: false
description: |
Seconds between full rescrapes of all resource types when streaming is
enabled. The rescrape refreshes TTLs, detects deletions, and backfills
anything the stream missed. Minimum of 1.
value:
type: integer
example: 1200
minimum: 1
- name: genresources_stream_backoff_max_seconds
hidden: true
fleet_configurable: false
description: |
Maximum backoff, in seconds, between application stream reconnection
attempts. Backoff starts at 1 second and doubles up to this cap,
resetting on a successful connection. Minimum of 1.
value:
type: integer
example: 30
minimum: 1
- template: instances/openmetrics
overrides:
openmetrics_endpoint.required: false
Expand Down
5 changes: 5 additions & 0 deletions argocd/datadog_checks/argocd/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ def check(self, instance):
self.log.exception("genresources: collection cycle failed")
super().check(instance)

def cancel(self):
if self._resource_collector is not None:
self._resource_collector.stop()
super().cancel()

def parse_config(self):
endpoint_configs = [
("app_controller_endpoint", APP_CONTROLLER_NAMESPACE, APPLICATION_CONTROLLER_METRICS),
Expand Down
12 changes: 12 additions & 0 deletions argocd/datadog_checks/argocd/config_models/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ def instance_genresources_max_resources_per_cycle():
return 10000


def instance_genresources_rescrape_interval_seconds():
return 1200


def instance_genresources_stream_applications_enabled():
return False


def instance_genresources_stream_backoff_max_seconds():
return 30


def instance_genresources_ttl_seconds():
return 21600

Expand Down
3 changes: 3 additions & 0 deletions argocd/datadog_checks/argocd/config_models/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ class InstanceConfig(BaseModel):
genresources_exclude_paths: Optional[tuple[str, ...]] = None
genresources_extra_include_paths: Optional[tuple[str, ...]] = None
genresources_max_resources_per_cycle: Optional[int] = Field(None, ge=1)
genresources_rescrape_interval_seconds: Optional[int] = Field(None, ge=1)
genresources_stream_applications_enabled: Optional[bool] = None
genresources_stream_backoff_max_seconds: Optional[int] = Field(None, ge=1)
genresources_ttl_seconds: Optional[int] = Field(None, ge=1)
headers: Optional[MappingProxyType[str, Any]] = None
histogram_buckets_as_distributions: Optional[bool] = None
Expand Down
103 changes: 84 additions & 19 deletions argocd/datadog_checks/argocd/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import hashlib
import json
import os
import threading
import time
from collections.abc import Callable
from dataclasses import dataclass
Expand All @@ -28,6 +29,7 @@
REPOSITORY_INCLUDE,
URL_CREDENTIALS_PATTERN,
)
from .stream_listener import ArgocdApplicationStreamListener

if TYPE_CHECKING:
from .check import ArgocdCheck
Expand Down Expand Up @@ -181,6 +183,8 @@ def _repository_key(item: dict) -> str:
),
)

APPLICATION_SPEC = next(spec for spec in RESOURCE_TYPE_SPECS if spec.resource_type == "argocd_application")


def _is_excluded(path: str, exclude_paths: tuple[str, ...]) -> bool:
"""True if a path equals an exclude entry or is nested beneath one (subtree match)."""
Expand Down Expand Up @@ -217,6 +221,12 @@ def __init__(self, check: "ArgocdCheck") -> None:
self._resubmit_interval: int = max(1, self._ttl_seconds // 2)
self._collection_interval: int = config.genresources_collection_interval_seconds
self._last_collect: float = 0.0
self._stream_enabled: bool = bool(config.genresources_stream_applications_enabled)
self._rescrape_interval: int = config.genresources_rescrape_interval_seconds
self._backoff_max: int = config.genresources_stream_backoff_max_seconds
self._last_rescrape: float = 0.0
self._submitted_lock = threading.RLock()
self._listener: ArgocdApplicationStreamListener | None = None
self._includes: dict[str, dict[str, list[str]]] = {
spec.resource_type: _build_include(spec.include, self._extra_paths, self._exclude_paths)
for spec in RESOURCE_TYPE_SPECS
Expand All @@ -238,16 +248,21 @@ def __init__(self, check: "ArgocdCheck") -> None:
)

def collect(self) -> None:
seen_at = int(time.time())
if seen_at - self._last_collect < self._collection_interval:
return
self._last_collect = seen_at

if not self._endpoint:
self.check.log.warning("collect_genresources is enabled but genresources_endpoint is not set; skipping")
for spec in RESOURCE_TYPE_SPECS:
self.check.gauge(GENRESOURCES_API_UP_METRIC, 0, tags=[f"resource_type:{spec.resource_type}"])
return
if self._stream_enabled:
self._collect_with_stream()
else:
self._collect_polling()

def _collect_polling(self) -> None:
seen_at = int(time.time())
if seen_at - self._last_collect < self._collection_interval:
return
self._last_collect = seen_at

expire_at = seen_at + self._ttl_seconds
force_full = (seen_at - self._last_full_submit) >= self._resubmit_interval
Expand All @@ -257,6 +272,54 @@ def collect(self) -> None:
for spec in RESOURCE_TYPE_SPECS:
self._collect_type(spec, seen_at=seen_at, expire_at=expire_at, force_full=force_full)

def _collect_with_stream(self) -> None:
"""Supervise the application stream listener and run a periodic full rescrape of every type."""
self._ensure_listener()
seen_at = int(time.time())
if seen_at - self._last_rescrape < self._rescrape_interval:
return
self._last_rescrape = seen_at
expire_at = seen_at + self._ttl_seconds
for spec in RESOURCE_TYPE_SPECS:
self._collect_type(spec, seen_at=seen_at, expire_at=expire_at, force_full=True)

def _ensure_listener(self) -> None:
if self._listener is None:
self._listener = ArgocdApplicationStreamListener(
self.check,
self,
endpoint=self._endpoint,
auth_token=self._auth_token,
backoff_max_seconds=self._backoff_max,
)
if not self._listener.is_alive():
self._listener.start()

def stop(self) -> None:
"""Stop the stream listener cleanly; called from the check's cancel()."""
if self._listener is not None:
self._listener.cancel()
self._listener.join(timeout=10)

def emit_stream_application(self, application: dict) -> None:
"""Emit a single application received from the stream (ADDED/MODIFIED) through the shared pipeline."""
seen_at = int(time.time())
self._emit_item(
application, APPLICATION_SPEC, seen_at=seen_at, expire_at=seen_at + self._ttl_seconds, force_full=False
)

def forget_application(self, application: dict) -> None:
"""Drop a deleted application from the dedup cache so it stops refreshing and expires via TTL."""
try:
key = _application_key(application)
except Exception:
return
if self._instance_prefix:
key = f"{self._instance_prefix}{KEY_SEPARATOR}{key}"
cache_key = f"argocd_application{KEY_SEPARATOR}{key}"
with self._submitted_lock:
self._submitted.pop(cache_key, None)

def _collect_type(self, spec: ResourceTypeSpec, *, seen_at: int, expire_at: int, force_full: bool) -> None:
tags = [f"resource_type:{spec.resource_type}"]
try:
Expand Down Expand Up @@ -284,7 +347,8 @@ def _collect_type(self, spec: ResourceTypeSpec, *, seen_at: int, expire_at: int,
if cache_key is not None:
seen.add(cache_key)
namespace = f"{spec.resource_type}{KEY_SEPARATOR}"
self._submitted = {k: v for k, v in self._submitted.items() if not k.startswith(namespace) or k in seen}
with self._submitted_lock:
self._submitted = {k: v for k, v in self._submitted.items() if not k.startswith(namespace) or k in seen}

def _fetch(self, api_path: str) -> list[dict]:
url = self._endpoint.rstrip("/") + api_path
Expand Down Expand Up @@ -314,17 +378,18 @@ def _emit_item(
key = f"{self._instance_prefix}{KEY_SEPARATOR}{key}"
include = self._includes[spec.resource_type]
cache_key = f"{spec.resource_type}{KEY_SEPARATOR}{key}"
if force_full or self._submitted.get(cache_key) != token:
try:
self.check.submit_generic_resource(
type=spec.resource_type,
key=key,
fields=item,
include=include,
seen_at=seen_at,
expire_at=expire_at,
)
self._submitted[cache_key] = token
except Exception:
self.check.log.exception("genresources: failed to submit %s (key=%s)", spec.resource_type, key)
with self._submitted_lock:
if force_full or self._submitted.get(cache_key) != token:
try:
self.check.submit_generic_resource(
type=spec.resource_type,
key=key,
fields=item,
include=include,
seen_at=seen_at,
expire_at=expire_at,
)
self._submitted[cache_key] = token
except Exception:
self.check.log.exception("genresources: failed to submit %s (key=%s)", spec.resource_type, key)
return cache_key
3 changes: 3 additions & 0 deletions argocd/datadog_checks/argocd/resources_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
KEY_SEPARATOR = "|"

GENRESOURCES_API_UP_METRIC = "argocd.genresources.api.up"
GENRESOURCES_STREAM_UP_METRIC = "argocd.genresources.stream.up"
GENRESOURCES_STREAM_EVENTS_METRIC = "argocd.genresources.stream.events_received"
GENRESOURCES_STREAM_RECONNECTS_METRIC = "argocd.genresources.stream.reconnects"

APPLICATION_INCLUDE: dict[str, tuple[str, ...]] = {
"paths": (
Expand Down
145 changes: 145 additions & 0 deletions argocd/datadog_checks/argocd/stream_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)

"""Persistent listener for the ArgoCD application watch stream (/api/v1/stream/applications)."""

from __future__ import annotations

import json
import threading
from typing import TYPE_CHECKING

from .resources_constants import (
GENRESOURCES_STREAM_EVENTS_METRIC,
GENRESOURCES_STREAM_RECONNECTS_METRIC,
GENRESOURCES_STREAM_UP_METRIC,
)

if TYPE_CHECKING:
from .check import ArgocdCheck
from .resources import ArgocdResourceCollector

STREAM_PATH = "/api/v1/stream/applications"
CONNECT_TIMEOUT_SECONDS = 10
READ_TIMEOUT_SECONDS = 60
INITIAL_BACKOFF_SECONDS = 1


class ArgocdApplicationStreamListener:
"""Holds a persistent connection to the ArgoCD application watch stream and emits changes via the collector.

Each watch event arrives as a line ``{"result": {"type": "ADDED|MODIFIED|DELETED", "application": {...}}}``;
the embedded application is the same shape as a polled one, so it flows through the collector's existing
sanitize/allow-list/dedup/submit pipeline unchanged. Runs on a single dedicated daemon thread: the clean
shutdown path is ``cancel()`` (signal + close the socket) followed by ``join()``; ``daemon=True`` is only a
backstop for a hard interpreter exit that never calls ``cancel()``.
"""

def __init__(
self,
check: "ArgocdCheck",
collector: "ArgocdResourceCollector",
*,
endpoint: str,
auth_token: str | None,
backoff_max_seconds: int,
) -> None:
self.check = check
self._collector = collector
self._endpoint = endpoint.rstrip("/")
self._auth_token = auth_token
self._backoff_max = max(1, backoff_max_seconds)
self._stop = threading.Event()
self._thread: threading.Thread | None = None
self._response = None

def start(self) -> None:
if self.is_alive():
return
self._stop.clear()
self._thread = threading.Thread(target=self._run, name="argocd-genresources-stream", daemon=True)
self._thread.start()

def is_alive(self) -> bool:
return self._thread is not None and self._thread.is_alive()

def cancel(self) -> None:
"""Signal the loop to stop and close the active connection to unblock ``iter_lines``. Does not join."""
self._stop.set()
response = self._response
if response is not None:
try:
response.close()
except Exception:
pass

def join(self, timeout: float | None = None) -> None:
if self._thread is not None:
self._thread.join(timeout)

def _sleep(self, seconds: float) -> None:
"""Interruptible backoff sleep that wakes immediately on cancel()."""
self._stop.wait(seconds)

def _run(self) -> None:
backoff = INITIAL_BACKOFF_SECONDS
while not self._stop.is_set():
got_data = False
try:
got_data = self._stream_once()
except Exception as exc:
if not self._stop.is_set():
self.check.log.warning("genresources: application stream error: %s", exc)
if self._stop.is_set():
break
# Back off on every disconnect (clean or error); reset only after a connection that
# actually delivered data, so a server that closes on connect can't become a hot loop.
self.check.gauge(GENRESOURCES_STREAM_UP_METRIC, 0)
self.check.count(GENRESOURCES_STREAM_RECONNECTS_METRIC, 1)
if got_data:
backoff = INITIAL_BACKOFF_SECONDS
self._sleep(backoff)
if not got_data:
backoff = min(backoff * 2, self._backoff_max)
self.check.gauge(GENRESOURCES_STREAM_UP_METRIC, 0)

def _stream_once(self) -> bool:
"""Open the stream and process events until it ends or stop is signaled; return True if any line arrived."""
url = self._endpoint + STREAM_PATH
kwargs: dict = {"stream": True, "timeout": (CONNECT_TIMEOUT_SECONDS, READ_TIMEOUT_SECONDS)}
if self._auth_token:
kwargs["headers"] = {"Authorization": f"Bearer {self._auth_token}"}
response = self.check.http.get(url, **kwargs)
self._response = response
got_data = False
try:
response.raise_for_status()
self.check.gauge(GENRESOURCES_STREAM_UP_METRIC, 1)
for line in response.iter_lines():
if self._stop.is_set():
break
if line:
got_data = True
self._handle_line(line)
finally:
self._response = None
response.close()
return got_data

def _handle_line(self, line: bytes) -> None:
try:
event = json.loads(line)
except (ValueError, TypeError):
return
result = event.get("result") if isinstance(event, dict) else None
if not isinstance(result, dict):
return
application = result.get("application")
if not isinstance(application, dict):
return
self.check.count(GENRESOURCES_STREAM_EVENTS_METRIC, 1)
if result.get("type") == "DELETED":
self._collector.forget_application(application)
else:
self._collector.emit_stream_application(application)
Loading
Loading