Skip to content

Commit aa4f817

Browse files
authored
admin: monitor worker version (#12463)
* root: include version in celery ping
  Signed-off-by: Jens Langhammer <[email protected]>
* check version in worker endpoint
  Signed-off-by: Jens Langhammer <[email protected]>
* include worker version in prom metrics
  Signed-off-by: Jens Langhammer <[email protected]>
* format
  Signed-off-by: Jens Langhammer <[email protected]>
* fix tests
  Signed-off-by: Jens Langhammer <[email protected]>
---------
Signed-off-by: Jens Langhammer <[email protected]>
1 parent c3aefd5 commit aa4f817

File tree

8 files changed

+100
-26
lines changed

8 files changed

+100
-26
lines changed

authentik/admin/api/workers.py

+36-5
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
"""authentik administration overview"""
22

3+
from socket import gethostname
4+
35
from django.conf import settings
46
from drf_spectacular.utils import extend_schema, inline_serializer
5-
from rest_framework.fields import IntegerField
7+
from packaging.version import parse
8+
from rest_framework.fields import BooleanField, CharField
69
from rest_framework.request import Request
710
from rest_framework.response import Response
811
from rest_framework.views import APIView
912

13+
from authentik import get_full_version
1014
from authentik.rbac.permissions import HasPermission
1115
from authentik.root.celery import CELERY_APP
1216

@@ -16,11 +20,38 @@ class WorkerView(APIView):
1620

1721
permission_classes = [HasPermission("authentik_rbac.view_system_info")]
1822

19-
@extend_schema(responses=inline_serializer("Workers", fields={"count": IntegerField()}))
23+
@extend_schema(
24+
responses=inline_serializer(
25+
"Worker",
26+
fields={
27+
"worker_id": CharField(),
28+
"version": CharField(),
29+
"version_matching": BooleanField(),
30+
},
31+
many=True,
32+
)
33+
)
2034
def get(self, request: Request) -> Response:
2135
"""Get currently connected worker count."""
22-
count = len(CELERY_APP.control.ping(timeout=0.5))
36+
raw: list[dict[str, dict]] = CELERY_APP.control.ping(timeout=0.5)
37+
our_version = parse(get_full_version())
38+
response = []
39+
for worker in raw:
40+
key = list(worker.keys())[0]
41+
version = worker[key].get("version")
42+
version_matching = False
43+
if version:
44+
version_matching = parse(version) == our_version
45+
response.append(
46+
{"worker_id": key, "version": version, "version_matching": version_matching}
47+
)
2348
# In debug we run with `task_always_eager`, so tasks are run on the main process
2449
if settings.DEBUG: # pragma: no cover
25-
count += 1
26-
return Response({"count": count})
50+
response.append(
51+
{
52+
"worker_id": f"authentik-debug@{gethostname()}",
53+
"version": get_full_version(),
54+
"version_matching": True,
55+
}
56+
)
57+
return Response(response)

authentik/admin/apps.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
"""authentik admin app config"""
22

3-
from prometheus_client import Gauge, Info
3+
from prometheus_client import Info
44

55
from authentik.blueprints.apps import ManagedAppConfig
66

77
PROM_INFO = Info("authentik_version", "Currently running authentik version")
8-
GAUGE_WORKERS = Gauge("authentik_admin_workers", "Currently connected workers")
98

109

1110
class AuthentikAdminConfig(ManagedAppConfig):

authentik/admin/signals.py

+24-3
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,35 @@
11
"""admin signals"""
22

33
from django.dispatch import receiver
4+
from packaging.version import parse
5+
from prometheus_client import Gauge
46

5-
from authentik.admin.apps import GAUGE_WORKERS
7+
from authentik import get_full_version
68
from authentik.root.celery import CELERY_APP
79
from authentik.root.monitoring import monitoring_set
810

11+
GAUGE_WORKERS = Gauge(
12+
"authentik_admin_workers",
13+
"Currently connected workers, their versions and if they are the same version as authentik",
14+
["version", "version_matched"],
15+
)
16+
17+
18+
_version = parse(get_full_version())
19+
920

1021
@receiver(monitoring_set)
1122
def monitoring_set_workers(sender, **kwargs):
1223
"""Set worker gauge"""
13-
count = len(CELERY_APP.control.ping(timeout=0.5))
14-
GAUGE_WORKERS.set(count)
24+
raw: list[dict[str, dict]] = CELERY_APP.control.ping(timeout=0.5)
25+
worker_version_count = {}
26+
for worker in raw:
27+
key = list(worker.keys())[0]
28+
version = worker[key].get("version")
29+
version_matching = False
30+
if version:
31+
version_matching = parse(version) == _version
32+
worker_version_count.setdefault(version, {"count": 0, "matching": version_matching})
33+
worker_version_count[version]["count"] += 1
34+
for version, stats in worker_version_count.items():
35+
GAUGE_WORKERS.labels(version, stats["matching"]).set(stats["count"])

authentik/admin/tests/test_api.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def test_workers(self):
3434
response = self.client.get(reverse("authentik_api:admin_workers"))
3535
self.assertEqual(response.status_code, 200)
3636
body = loads(response.content)
37-
self.assertEqual(body["count"], 0)
37+
self.assertEqual(len(body), 0)
3838

3939
def test_metrics(self):
4040
"""Test metrics API"""

authentik/root/celery.py

+8
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
task_prerun,
1919
worker_ready,
2020
)
21+
from celery.worker.control import inspect_command
2122
from django.conf import settings
2223
from django.db import ProgrammingError
2324
from django_tenants.utils import get_public_schema_name
2425
from structlog.contextvars import STRUCTLOG_KEY_PREFIX
2526
from structlog.stdlib import get_logger
2627
from tenant_schemas_celery.app import CeleryApp as TenantAwareCeleryApp
2728

29+
from authentik import get_full_version
2830
from authentik.lib.sentry import before_send
2931
from authentik.lib.utils.errors import exception_to_string
3032

@@ -159,6 +161,12 @@ def update_heartbeat_file(self, worker: Worker):
159161
HEARTBEAT_FILE.touch()
160162

161163

164+
@inspect_command(default_timeout=0.2)
165+
def ping(state, **kwargs):
166+
"""Ping worker(s)."""
167+
return {"ok": "pong", "version": get_full_version()}
168+
169+
162170
CELERY_APP.config_from_object(settings.CELERY)
163171

164172
# Load task modules from all registered Django app configs.

blueprints/schema.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -4159,7 +4159,7 @@
41594159
"re_evaluate_policies": {
41604160
"type": "boolean",
41614161
"title": "Re evaluate policies",
4162-
"description": "Evaluate policies when the Stage is present to the user."
4162+
"description": "Evaluate policies when the Stage is presented to the user."
41634163
},
41644164
"order": {
41654165
"type": "integer",

schema.yml

+14-6
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ paths:
349349
description: ''
350350
/admin/workers/:
351351
get:
352-
operationId: admin_workers_retrieve
352+
operationId: admin_workers_list
353353
description: Get currently connected worker count.
354354
tags:
355355
- admin
@@ -360,7 +360,9 @@ paths:
360360
content:
361361
application/json:
362362
schema:
363-
$ref: '#/components/schemas/Workers'
363+
type: array
364+
items:
365+
$ref: '#/components/schemas/Worker'
364366
description: ''
365367
'400':
366368
content:
@@ -56987,13 +56989,19 @@ components:
5698756989
required:
5698856990
- aaguid
5698956991
- description
56990-
Workers:
56992+
Worker:
5699156993
type: object
5699256994
properties:
56993-
count:
56994-
type: integer
56995+
worker_id:
56996+
type: string
56997+
version:
56998+
type: string
56999+
version_matching:
57000+
type: boolean
5699557001
required:
56996-
- count
57002+
- version
57003+
- version_matching
57004+
- worker_id
5699757005
modelRequest:
5699857006
oneOf:
5699957007
- $ref: '#/components/schemas/GoogleWorkspaceProviderRequest'

web/src/admin/admin-overview/cards/WorkerStatusCard.ts

+15-8
Original file line numberDiff line numberDiff line change
@@ -8,34 +8,41 @@ import { msg } from "@lit/localize";
88
import { TemplateResult, html } from "lit";
99
import { customElement } from "lit/decorators.js";
1010

11-
import { AdminApi } from "@goauthentik/api";
11+
import { AdminApi, Worker } from "@goauthentik/api";
1212

1313
@customElement("ak-admin-status-card-workers")
14-
export class WorkersStatusCard extends AdminStatusCard<number> {
14+
export class WorkersStatusCard extends AdminStatusCard<Worker[]> {
1515
icon = "pf-icon pf-icon-server";
1616

17-
getPrimaryValue(): Promise<number> {
18-
return new AdminApi(DEFAULT_CONFIG).adminWorkersRetrieve().then((workers) => {
19-
return workers.count;
20-
});
17+
getPrimaryValue(): Promise<Worker[]> {
18+
return new AdminApi(DEFAULT_CONFIG).adminWorkersList();
2119
}
2220

2321
renderHeader(): TemplateResult {
2422
return html`${msg("Workers")}`;
2523
}
2624

27-
getStatus(value: number): Promise<AdminStatus> {
28-
if (value < 1) {
25+
getStatus(value: Worker[]): Promise<AdminStatus> {
26+
if (value.length < 1) {
2927
return Promise.resolve<AdminStatus>({
3028
icon: "fa fa-times-circle pf-m-danger",
3129
message: html`${msg("No workers connected. Background tasks will not run.")}`,
3230
});
31+
} else if (value.filter((w) => !w.versionMatching).length > 0) {
32+
return Promise.resolve<AdminStatus>({
33+
icon: "fa fa-times-circle pf-m-danger",
34+
message: html`${msg("Worker with incorrect version connected.")}`,
35+
});
3336
} else {
3437
return Promise.resolve<AdminStatus>({
3538
icon: "fa fa-check-circle pf-m-success",
3639
});
3740
}
3841
}
42+
43+
renderValue() {
44+
return html`${this.value?.length}`;
45+
}
3946
}
4047

4148
declare global {

0 commit comments

Comments
 (0)