-
Notifications
You must be signed in to change notification settings - Fork 1.5k
[n8n] Fix metric mappings and add full v2 metric coverage #23635
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 19 commits
7f24059
2991295
12f3122
8523188
1be3b3d
af60d11
43e7fc8
3db752d
fc4db3d
66e5dc3
7d4e58c
8c3703a
e8dfb08
9418e2d
8ec545d
d0b3a90
1f407d5
fbe1dd8
2844832
c5cae88
a67c26b
cc56e02
a0e259b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| Overhaul the n8n metric coverage and test harness, verified live against n8n 1.118.1 and 2.19.5: | ||
|
|
||
| - Expand baseline metric coverage with the event-driven counters that were previously missing or mis-mapped (``audit.workflow.*``, ``queue.job.stalled``, ``queue.job.dequeued``, ``nodejs.active.requests``). | ||
| - Add the n8n 2.x metric families: workflow execution duration histogram, audit workflow lifecycle counters, embed and token exchange auth counters, workflow statistics (executions, users, workflows, credentials), VM-isolated expression engine metrics, and ``process.pss.bytes``. Each family is gated on the corresponding n8n env flag; see the README for the full matrix. | ||
| - Map the broader event-bus surface (around 70 dynamic counters) so audit, AI node, user, package, variable, runner, worker, and workflow cancellation events are picked up automatically when n8n emits them. The integration test environment cannot exercise these families end to end, so they are documented as rare-event metrics and excluded from the symmetric metadata assertion. | ||
| - Add worker-only families (``node.started``, ``node.finished``, ``queue.job.dequeued``, ``runner.task.requested``) and document scraping the n8n worker process as a separate Datadog instance. | ||
| - Stop gating OpenMetrics scraping on ``/healthz/readiness``: ``n8n.readiness.check`` is still submitted, but metrics keep flowing when readiness reports degraded so SRE-relevant signals (queue depth, process state) are not lost during incidents. | ||
| - Fix the ``openmetrics_endpoint`` example in ``conf.yaml.example`` / ``spec.yaml`` to use the actual ``/metrics`` URL (the host root would silently mismatch the scrape path) and document that ``raw_metric_prefix`` must be kept in sync with a customised ``N8N_METRICS_PREFIX``. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggestion: this feels a bit too long and technical, could we make it shorter and more customer-friendly?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried hahaha, this is the third version, but there are so many things that change in this version that shortening it more seems we are leaving too much information out. Let me give it another round. |
||
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,58 +2,55 @@ | |||||||||
| # All rights reserved | ||||||||||
| # Licensed under a 3-clause BSD style license (see LICENSE) | ||||||||||
|
|
||||||||||
| from urllib.parse import urljoin | ||||||||||
| from functools import cached_property | ||||||||||
| from typing import Any | ||||||||||
| from urllib.parse import urljoin, urlparse | ||||||||||
|
|
||||||||||
| from requests.exceptions import RequestException | ||||||||||
|
|
||||||||||
| from datadog_checks.base import OpenMetricsBaseCheckV2 | ||||||||||
| from datadog_checks.n8n.metrics import METRIC_MAP, RENAME_LABELS_MAP | ||||||||||
|
|
||||||||||
| from .config_models import ConfigMixin | ||||||||||
|
|
||||||||||
| DEFAULT_READY_ENDPOINT = '/healthz/readiness' | ||||||||||
| DEFAULT_READY_PATH = '/healthz/readiness' | ||||||||||
|
|
||||||||||
|
|
||||||||||
| class N8nCheck(OpenMetricsBaseCheckV2, ConfigMixin): | ||||||||||
| __NAMESPACE__ = 'n8n' | ||||||||||
| DEFAULT_METRIC_LIMIT = 0 | ||||||||||
|
|
||||||||||
| def __init__(self, name, init_config, instances=None): | ||||||||||
| super(N8nCheck, self).__init__( | ||||||||||
| name, | ||||||||||
| init_config, | ||||||||||
| instances, | ||||||||||
| ) | ||||||||||
| self.openmetrics_endpoint = self.instance["openmetrics_endpoint"] | ||||||||||
| self.tags = self.instance.get('tags', []) | ||||||||||
| self._ready_endpoint = DEFAULT_READY_ENDPOINT | ||||||||||
|
|
||||||||||
| def get_default_config(self): | ||||||||||
| def get_default_config(self) -> dict[str, Any]: | ||||||||||
| return { | ||||||||||
| 'metrics': [METRIC_MAP], | ||||||||||
| 'rename_labels': RENAME_LABELS_MAP, | ||||||||||
| 'raw_metric_prefix': 'n8n_', | ||||||||||
| } | ||||||||||
|
|
||||||||||
| def _check_n8n_readiness(self): | ||||||||||
| endpoint = urljoin(self.openmetrics_endpoint, self._ready_endpoint) | ||||||||||
| response = self.http.get(endpoint) | ||||||||||
|
|
||||||||||
| # Determine metric value and status_code tag | ||||||||||
| if response.status_code is None: | ||||||||||
| self.log.warning("The readiness endpoint did not return a status code") | ||||||||||
| metric_value = 0 | ||||||||||
| metric_tags = self.tags + ['status_code:null'] | ||||||||||
| elif response.status_code == 200: | ||||||||||
| # Ready - submit 1 | ||||||||||
| metric_value = 1 | ||||||||||
| metric_tags = self.tags + [f'status_code:{response.status_code}'] | ||||||||||
| else: | ||||||||||
| # Not ready - submit 0 | ||||||||||
| metric_value = 0 | ||||||||||
| metric_tags = self.tags + [f'status_code:{response.status_code}'] | ||||||||||
|
|
||||||||||
| # Submit metric with appropriate value and status_code tag | ||||||||||
| self.gauge('readiness.check', metric_value, tags=metric_tags) | ||||||||||
|
|
||||||||||
| def check(self, instance): | ||||||||||
| super().check(instance) | ||||||||||
| @cached_property | ||||||||||
| def _readiness_endpoint(self) -> str: | ||||||||||
| parsed = urlparse(self.config.openmetrics_endpoint) | ||||||||||
| base = f'{parsed.scheme}://{parsed.netloc}' | ||||||||||
| return urljoin(base, DEFAULT_READY_PATH) | ||||||||||
|
|
||||||||||
| def _check_n8n_readiness(self) -> None: | ||||||||||
| endpoint = self._readiness_endpoint | ||||||||||
| tags = list(self.config.tags or ()) | ||||||||||
|
|
||||||||||
| try: | ||||||||||
| response = self.http.get(endpoint) | ||||||||||
| except RequestException as e: | ||||||||||
| self.log.warning("Could not reach n8n readiness endpoint %s: %s", endpoint, e) | ||||||||||
| self.gauge('readiness.check', 0, tags=tags + ['status_code:none']) | ||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
nit: could be good to add the status_code when it's available (HTTP error).
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it is, isn't it? Or am I misunderstanding your suggestion? is_ready = response.status_code == 200
self.gauge(
'readiness.check',
1 if is_ready else 0,
tags=tags + [f'status_code:{response.status_code}'],
)
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm suggesting we set it on the failure path, inside the: except RequestException as e:
self.log.warning("Could not reach n8n readiness endpoint %s: %s", endpoint, e)
self.gauge('readiness.check', 0, tags=tags + ['status_code:none'])
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Aaah, ok. When there is a Any other error (non 2xx) goes through the other branch where we add the code in the tag. I checked, just in case the Wrapper was doing the The failure path here carries no
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah I see, I missed that!
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good, updated now. |
||||||||||
| return | ||||||||||
|
|
||||||||||
| is_ready = response.status_code == 200 | ||||||||||
| self.gauge( | ||||||||||
| 'readiness.check', | ||||||||||
| 1 if is_ready else 0, | ||||||||||
| tags=tags + [f'status_code:{response.status_code}'], | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| def check(self, instance: dict[str, Any]) -> None: | ||||||||||
| self._check_n8n_readiness() | ||||||||||
| super().check(instance) | ||||||||||
Uh oh!
There was an error while loading. Please reload this page.