[n8n] Fix metric mappings and add full v2 metric coverage #23635
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| Improve the n8n metric coverage: | ||
|
|
||
| - Correct missing or incorrect metrics. | ||
| - Add metrics introduced in n8n 2.x (workflow execution duration, audit events, authentication, workflow and user statistics, expression engine, and process memory). | ||
| - Track n8n's dynamic events (workflow cancellations, audit activity, AI nodes, user and credential changes, package and variable changes). | ||
| - Add support for monitoring n8n worker processes alongside the main process. |
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,58 +2,55 @@ | |||||||||
| # All rights reserved | ||||||||||
| # Licensed under a 3-clause BSD style license (see LICENSE) | ||||||||||
|
|
||||||||||
| from urllib.parse import urljoin | ||||||||||
| from functools import cached_property | ||||||||||
| from typing import Any | ||||||||||
| from urllib.parse import urljoin, urlparse | ||||||||||
|
|
||||||||||
| from requests.exceptions import RequestException | ||||||||||
|
|
||||||||||
| from datadog_checks.base import OpenMetricsBaseCheckV2 | ||||||||||
| from datadog_checks.n8n.metrics import METRIC_MAP, RENAME_LABELS_MAP | ||||||||||
|
|
||||||||||
| from .config_models import ConfigMixin | ||||||||||
|
|
||||||||||
| DEFAULT_READY_ENDPOINT = '/healthz/readiness' | ||||||||||
| DEFAULT_READY_PATH = '/healthz/readiness' | ||||||||||
|
|
||||||||||
|
|
||||||||||
| class N8nCheck(OpenMetricsBaseCheckV2, ConfigMixin): | ||||||||||
| __NAMESPACE__ = 'n8n' | ||||||||||
| DEFAULT_METRIC_LIMIT = 0 | ||||||||||
|
|
||||||||||
| def __init__(self, name, init_config, instances=None): | ||||||||||
| super(N8nCheck, self).__init__( | ||||||||||
| name, | ||||||||||
| init_config, | ||||||||||
| instances, | ||||||||||
| ) | ||||||||||
| self.openmetrics_endpoint = self.instance["openmetrics_endpoint"] | ||||||||||
| self.tags = self.instance.get('tags', []) | ||||||||||
| self._ready_endpoint = DEFAULT_READY_ENDPOINT | ||||||||||
|
|
||||||||||
| def get_default_config(self): | ||||||||||
| def get_default_config(self) -> dict[str, Any]: | ||||||||||
| return { | ||||||||||
| 'metrics': [METRIC_MAP], | ||||||||||
| 'rename_labels': RENAME_LABELS_MAP, | ||||||||||
| 'raw_metric_prefix': 'n8n_', | ||||||||||
| } | ||||||||||
|
|
||||||||||
| def _check_n8n_readiness(self): | ||||||||||
| endpoint = urljoin(self.openmetrics_endpoint, self._ready_endpoint) | ||||||||||
| response = self.http.get(endpoint) | ||||||||||
|
|
||||||||||
| # Determine metric value and status_code tag | ||||||||||
| if response.status_code is None: | ||||||||||
| self.log.warning("The readiness endpoint did not return a status code") | ||||||||||
| metric_value = 0 | ||||||||||
| metric_tags = self.tags + ['status_code:null'] | ||||||||||
| elif response.status_code == 200: | ||||||||||
| # Ready - submit 1 | ||||||||||
| metric_value = 1 | ||||||||||
| metric_tags = self.tags + [f'status_code:{response.status_code}'] | ||||||||||
| else: | ||||||||||
| # Not ready - submit 0 | ||||||||||
| metric_value = 0 | ||||||||||
| metric_tags = self.tags + [f'status_code:{response.status_code}'] | ||||||||||
|
|
||||||||||
| # Submit metric with appropriate value and status_code tag | ||||||||||
| self.gauge('readiness.check', metric_value, tags=metric_tags) | ||||||||||
|
|
||||||||||
| def check(self, instance): | ||||||||||
| super().check(instance) | ||||||||||
| @cached_property | ||||||||||
| def _readiness_endpoint(self) -> str: | ||||||||||
| parsed = urlparse(self.config.openmetrics_endpoint) | ||||||||||
| base = f'{parsed.scheme}://{parsed.netloc}' | ||||||||||
| return urljoin(base, DEFAULT_READY_PATH) | ||||||||||
|
|
||||||||||
| def _check_n8n_readiness(self) -> None: | ||||||||||
| endpoint = self._readiness_endpoint | ||||||||||
| tags = list(self.config.tags or ()) | ||||||||||
|
|
||||||||||
| try: | ||||||||||
| response = self.http.get(endpoint) | ||||||||||
| except RequestException as e: | ||||||||||
| self.log.warning("Could not reach n8n readiness endpoint %s: %s", endpoint, e) | ||||||||||
| self.gauge('readiness.check', 0, tags=tags + ['status_code:none']) | ||||||||||
|
Member
nit: could be good to add the status_code when it's available (HTTP error).
Collaborator
Author
it is, isn't it? Or am I misunderstanding your suggestion?
is_ready = response.status_code == 200
self.gauge(
'readiness.check',
1 if is_ready else 0,
tags=tags + [f'status_code:{response.status_code}'],
)
Member
I'm suggesting we set it on the failure path, inside the:
except RequestException as e:
self.log.warning("Could not reach n8n readiness endpoint %s: %s", endpoint, e)
self.gauge('readiness.check', 0, tags=tags + ['status_code:none'])
Collaborator
Author
Aaah, ok. When there is a [...] Any other error (non 2xx) goes through the other branch where we add the code in the tag. I checked, just in case the Wrapper was doing the [...] The failure path here carries no [...]
Member
Ah I see, I missed that!
Collaborator
Author
Sounds good, updated now. |
||||||||||
| return | ||||||||||
|
|
||||||||||
| is_ready = 200 <= response.status_code < 300 | ||||||||||
| self.gauge( | ||||||||||
| 'readiness.check', | ||||||||||
| 1 if is_ready else 0, | ||||||||||
| tags=tags + [f'status_code:{response.status_code}'], | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| def check(self, instance: dict[str, Any]) -> None: | ||||||||||
| self._check_n8n_readiness() | ||||||||||
| super().check(instance) | ||||||||||
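
The readiness logic in the new version of the check can be tried outside the Agent with a small standalone sketch. This is not the integration's code: the function names `readiness_endpoint` and `readiness_sample` and the example URL are hypothetical, and the HTTP call itself is left out so that only the URL derivation and the status-to-gauge mapping are shown. A `status_code` of `None` stands in for the branch where the request raised before any response arrived.

```python
from typing import Optional
from urllib.parse import urljoin, urlparse

DEFAULT_READY_PATH = '/healthz/readiness'


def readiness_endpoint(openmetrics_endpoint: str) -> str:
    """Build the readiness URL from the scheme and host of the metrics URL.

    Hypothetical helper mirroring the `_readiness_endpoint` property in the diff.
    """
    parsed = urlparse(openmetrics_endpoint)
    base = f'{parsed.scheme}://{parsed.netloc}'
    return urljoin(base, DEFAULT_READY_PATH)


def readiness_sample(status_code: Optional[int], tags: list[str]) -> tuple[int, list[str]]:
    """Return (gauge value, tags) for one readiness probe.

    Hypothetical helper: `status_code` is None when no response was received.
    """
    if status_code is None:
        # Request failed before a response arrived: report "not ready".
        return 0, tags + ['status_code:none']
    # Any 2xx response counts as ready, as in the updated check.
    is_ready = 200 <= status_code < 300
    return (1 if is_ready else 0), tags + [f'status_code:{status_code}']
```

For example, `readiness_endpoint('http://localhost:5678/metrics')` yields `http://localhost:5678/healthz/readiness`, and `readiness_sample(503, ['env:dev'])` yields a gauge value of 0 with the tag `status_code:503` appended.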