Skip to content

Commit 0be6cd0

Browse files
authored
Use prometheus instead of kube (ansible#198)
* Use prometheus instead of kube * Create unit tests * Fix kubernetes tests * Simplify generated code * SonarCloud fixes * use integer for the test * Add details in the log * Get the data for 59m59s and not 1h * Update test because 59m:59s collection rather than 1h * test current_hour_start * Rename METRICS_UTILITY_USAGE_BASED_BILLING_ENABLED * Add timeline in logs * Fix return value * As per comments * Add ca_cert_path as a prometheus_client attribute * Remove commented code --------- Signed-off-by: itdove <dvernier@redhat.com>
1 parent c0644e3 commit 0be6cd0

10 files changed

Lines changed: 1338 additions & 294 deletions

File tree

Makefile

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
CONTAINER_ENGINE ?= docker
2+
13
help:
24
@echo help sync test coverage lint fix compose clean psql
35

@@ -19,12 +21,12 @@ fix:
1921
uv run ruff format
2022

2123
compose:
22-
docker compose -f tools/docker/docker-compose.yaml up
24+
${CONTAINER_ENGINE} compose -f tools/docker/docker-compose.yaml up
2325

2426
clean:
25-
docker compose -f tools/docker/docker-compose.yaml down -v
27+
${CONTAINER_ENGINE} compose -f tools/docker/docker-compose.yaml down -v
2628

2729
psql:
28-
docker compose -f tools/docker/docker-compose.yaml exec postgres psql -U awx
30+
${CONTAINER_ENGINE} compose -f tools/docker/docker-compose.yaml exec postgres psql -U awx
2931

3032
.PHONY: help sync test coverage lint fix compose clean psql

metrics_utility/automation_controller_billing/collectors.py

Lines changed: 88 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import platform
55

66
from datetime import datetime, timezone
7+
from typing import Tuple
78

89
import distro
910

@@ -14,14 +15,14 @@
1415
from django.db.utils import ProgrammingError
1516
from django.utils.timezone import now, timedelta
1617
from django.utils.translation import gettext_lazy as _
17-
from kubernetes import client
18-
from kubernetes import config as kube_config
1918

2019
from metrics_utility.base import CsvFileSplitter, register
2120
from metrics_utility.base.utils import get_max_gather_period_days, get_optional_collectors
2221
from metrics_utility.exceptions import MetricsException, MissingRequiredEnvVar
2322
from metrics_utility.logger import logger, logger_info_level
2423

24+
from .prometheus_client import PrometheusClient
25+
2526

2627
"""
2728
This module is used to define metrics collected by
@@ -551,10 +552,17 @@ def total_workers_vcpu(since, full_path, until, **kwargs):
551552
raise MissingRequiredEnvVar('environment variable METRICS_UTILITY_CLUSTER_NAME is not set')
552553

553554
now = datetime.now(timezone.utc)
554-
555-
info = {'cluster_name': cluster_name, 'timestamp': now.isoformat(), 'nodes': []}
556-
# If METRICS_UTILITY_USAGE_BASED_BILLING_ENABLED is not set or set to false then it returns 1
557-
usage_based_billing_enabled_str = os.getenv('METRICS_UTILITY_USAGE_BASED_BILLING_ENABLED')
555+
current_ts = now.timestamp()
556+
prev_hour_start, prev_hour_end = get_hour_boundaries(current_ts)
557+
558+
info = {
559+
'cluster_name': cluster_name,
560+
'collection_timestamp': datetime.fromtimestamp(current_ts).isoformat(),
561+
'start_timestamp': datetime.fromtimestamp(prev_hour_start).isoformat(),
562+
'end_timestamp': datetime.fromtimestamp(prev_hour_end).isoformat(),
563+
}
564+
# If METRICS_UTILITY_USAGE_BASED_METERING_ENABLED is not set or set to false then it returns 1
565+
usage_based_billing_enabled_str = os.getenv('METRICS_UTILITY_USAGE_BASED_METERING_ENABLED')
558566
usage_based_billing_enabled = False
559567
if usage_based_billing_enabled_str and (usage_based_billing_enabled_str.lower() == 'true'):
560568
usage_based_billing_enabled = True
@@ -563,42 +571,89 @@ def total_workers_vcpu(since, full_path, until, **kwargs):
563571
info['total_workers_vcpu'] = 1
564572
# This message must always appear in the log regardless of the log level.
565573
logger_info_level.info(json.dumps(info, indent=2))
566-
return {'cluster_name': info['cluster_name'], 'total_workers_vcpu': info['total_workers_vcpu']}
574+
return {'timestamp': info['end_timestamp'], 'cluster_name': info['cluster_name'], 'total_workers_vcpu': info['total_workers_vcpu']}
575+
576+
url = os.getenv('METRICS_UTILITY_PROMETHEUS_URL')
577+
if not url:
578+
prometheus_default_url = 'https://prometheus-k8s.openshift-monitoring.svc.cluster.local:9091'
579+
logger.info(
580+
f'environment variable METRICS_UTILITY_PROMETHEUS_URL is not set, \
581+
default {prometheus_default_url} will be assigned'
582+
)
583+
url = prometheus_default_url
567584

568585
try:
569-
kube_config.load_incluster_config()
570-
except kube_config.ConfigException:
571-
try:
572-
kube_config.load_kube_config()
573-
except kube_config.ConfigException as e:
574-
logger.error(f'Could not configure Kubernetes Python client ERROR: {e}')
575-
raise MetricsException(f'Could not configure Kubernetes Python client ERROR: {e}')
576-
577-
# Create a CoreV1Api client
578-
api_instance = client.CoreV1Api()
579-
if not api_instance:
580-
raise MetricsException('Could not get a Kube CoreV1Api client')
586+
prom = PrometheusClient(url=url)
587+
except Exception as e:
588+
raise MetricsException(f'Can not create a prometheus api client ERROR: {e}')
581589

582590
try:
583-
nodes = api_instance.list_node()
584-
except Exception as e:
591+
total_workers_vcpu, promql_query = get_total_workers_cpu(prom, prev_hour_start)
592+
timeline = get_cpu_timeline(prom, prev_hour_start, prev_hour_end)
593+
except MetricsException as e:
585594
raise MetricsException(f'Unexpected error when retrieving nodes: {e}')
586595

587-
if nodes is None:
588-
raise MetricsException('No nodes found')
596+
info['promql_query'] = promql_query
597+
info['timeline'] = timeline
598+
599+
logger.debug(f'total_workers_vcpu: {total_workers_vcpu}')
589600

590-
total_workers_vcpu = 0
591-
# In SaaS case we have only Worker nodes and so we don't need to filter out the control plan.
592-
# If it used for other environement, we might need to implement the filtering.
593-
for node_info in nodes.items:
594-
for resource, value in node_info.status.capacity.items():
595-
if resource == 'cpu':
596-
info['nodes'].append({node_info.metadata.name: int(value)})
597-
total_workers_vcpu += int(value)
601+
# This can happen when the prev_hour_start doesn't have data, it could be when the cluster just started or
602+
# if for some reason prometheus lost some data.
603+
if total_workers_vcpu is None:
604+
logger.warning('No data availble yet, the cluster is probably running for less than an hour')
605+
raise MetricsException('No data availble yet, the cluster is probably running for less than an hour')
598606

599-
info['total_workers_vcpu'] = total_workers_vcpu
607+
info['total_workers_vcpu'] = int(total_workers_vcpu)
600608

601609
# This message must always appear in the log regardless of the log level.
602610
logger_info_level.info(json.dumps(info, indent=2))
603611

604-
return {'cluster_name': info['cluster_name'], 'total_workers_vcpu': info['total_workers_vcpu']}
612+
return {'timestamp': info['end_timestamp'], 'cluster_name': info['cluster_name'], 'total_workers_vcpu': info['total_workers_vcpu']}
613+
614+
615+
def get_hour_boundaries(current_timestamp: float) -> Tuple[float, float]:
    """Return the (start, end) Unix timestamps of the hour preceding *current_timestamp*.

    The end boundary is the last whole second of the previous hour
    (one second before the current hour starts), which matches the
    59m59s collection window used by the billing query.

    Args:
        current_timestamp: Current time as a Unix timestamp (seconds).

    Returns:
        Tuple of (previous_hour_start, previous_hour_end) Unix timestamps.
        Note: the return annotation previously claimed a 3-tuple; the
        function has always returned exactly two values.
    """
    # Floor to the top of the current hour.
    current_hour_start = (current_timestamp // 3600) * 3600
    previous_hour_start = current_hour_start - 3600
    # Last second of the previous hour, not the exact hour boundary.
    previous_hour_end = current_hour_start - 1
    return previous_hour_start, previous_hour_end
620+
621+
622+
def get_total_workers_cpu(prom: PrometheusClient, base_timestamp: float) -> Tuple[float, str]:
    """Query Prometheus for the peak total worker vCPU count in the hour before *base_timestamp*.

    Uses a 59m59s subquery window (rather than a full 1h) so the sample at
    the exact hour boundary is not double-counted across adjacent windows.

    Args:
        prom: Prometheus API client.
        base_timestamp: Unix timestamp anchoring the query (`@` modifier).

    Returns:
        Tuple of (total vCPU count — ``None`` when Prometheus has no data
        for the window — and the PromQL query string that was executed).

    Raises:
        MetricsException: If the Prometheus query fails.
    """
    promql_query = f'max_over_time(sum(machine_cpu_cores)[59m59s:5m] @ {base_timestamp})'

    try:
        total_workers_vcpu = prom.get_current_value(promql_query)
    except Exception as e:
        # The previous message said "retrieving nodes" — a leftover from the
        # removed Kubernetes implementation; name the actual data source.
        raise MetricsException(f'Unexpected error when querying prometheus: {e}') from e

    return total_workers_vcpu, promql_query
631+
632+
633+
def get_cpu_timeline(prom: PrometheusClient, previous_hour_start: float, previous_hour_end: float) -> list:
    """
    Get array of timestamp/CPU pairs for the hour leading up to previous_hour_end.

    Args:
        prom: Prometheus API client.
        previous_hour_start: Range start (Unix timestamp).
        previous_hour_end: Range end (Unix timestamp).

    Returns:
        List of dicts with 'timestamp' (ISO format, UTC) and 'cpu_sum' keys,
        sorted chronologically. Empty when Prometheus returned no data.

    Raises:
        MetricsException: If the range query or result parsing fails.
    """
    # Use instant query - query_range will handle the time range
    query = 'sum(machine_cpu_cores)'

    try:
        response = prom.query_range(query=query, start_time=previous_hour_start, end_time=previous_hour_end, step='5m')

        result = []
        if response and 'data' in response and 'result' in response['data']:
            for series in response['data']['result']:
                if 'values' in series:
                    for timestamp_val, cpu_val in series['values']:
                        result.append(
                            {'timestamp': datetime.fromtimestamp(float(timestamp_val), timezone.utc).isoformat(), 'cpu_sum': float(cpu_val)}
                        )

        # ISO-8601 strings in the same timezone sort chronologically.
        result.sort(key=lambda x: x['timestamp'])
        return result

    except Exception as e:
        raise MetricsException(f'Error querying CPU timeline: {e}') from e
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import os
2+
3+
from metrics_utility.exceptions import MetricsException
4+
from metrics_utility.logger import logger
5+
6+
7+
TOKEN_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/token'
8+
CA_CERT_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt'
9+
10+
11+
class KubernetesClient:
    """
    Simplified Kubernetes client for service account token operations.

    This class assumes running in a Kubernetes pod with standard service account
    files mounted at /var/run/secrets/kubernetes.io/serviceaccount/
    """

    def __init__(self):
        """Initialize the client and validate service account files are available."""
        self._validate_service_account_files()

    def _validate_service_account_files(self):
        """Validate that required service account files exist.

        Raises:
            MetricsException: If the service account token file is missing.
        """
        if not os.path.exists(TOKEN_PATH):
            # Build the message from the module constant so it cannot drift
            # from the path actually being checked.
            raise MetricsException(f'Service account token not found at {TOKEN_PATH}')

        logger.info('Service account files validated')

    def get_current_token(self) -> str:
        """
        Get the current pod's service account token from the mounted file.

        Returns:
            Current service account token

        Raises:
            MetricsException: If token cannot be read
        """
        try:
            with open(TOKEN_PATH, 'r') as f:
                token = f.read().strip()
            logger.info("Retrieved current pod's mounted token")
            # Log only the length, never the token value itself.
            logger.info(f' Token Length: {len(token)} characters')
            return token
        except Exception as e:
            raise MetricsException(f'Error reading token: {e}') from e

    def get_ca_cert_path(self) -> str:
        """
        Get the current pod's service account ca_cert from the mounted file.

        Returns:
            Current service account ca_cert
        """
        return CA_CERT_PATH
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
import os
2+
3+
from typing import Optional
4+
5+
import requests
6+
7+
from metrics_utility.exceptions import MetricsException
8+
from metrics_utility.logger import logger
9+
10+
from .kubernetes_client import KubernetesClient
11+
12+
13+
class PrometheusClient:
    """
    Prometheus client with Kubernetes service account authentication support.

    This class handles:
    - Service account token retrieval from Kubernetes
    - Prometheus connection management
    - Query execution with proper error handling
    """

    def __init__(self, url: str, timeout: int = 30):
        """
        Initialize Prometheus client.

        Args:
            url: Prometheus server URL
            timeout: Request timeout in seconds (default: 30)

        Raises:
            MetricsException: If the service account token or CA certificate
                cannot be obtained.
        """
        self.url = url.rstrip('/')  # Remove trailing slash
        self.timeout = timeout
        self.token = None
        self.ca_cert_path = None
        self.session = requests.Session()

        kube_client = KubernetesClient()
        self.token = kube_client.get_current_token()
        if not self.token:
            raise MetricsException('Unable to retrieve the token for the current service account')

        self.ca_cert_path = kube_client.get_ca_cert_path()

        # Setup session
        self._setup_session()

    def _setup_session(self):
        """Setup HTTP session with authentication headers and CA certificate"""
        if self.token:
            logger.info('Creating authenticated Prometheus client')
            logger.info(f' URL: {self.url}')

            self.session.headers.update({'Authorization': f'Bearer {self.token}', 'Content-Type': 'application/x-www-form-urlencoded'})
        else:
            logger.info('Creating unauthenticated Prometheus client')
            logger.info(f' URL: {self.url}')

        # Use service CA certificate for SSL verification
        if os.path.exists(self.ca_cert_path):
            self.session.verify = self.ca_cert_path
            logger.info(f'Using service CA certificate: {self.ca_cert_path}')
        else:
            raise MetricsException(f'CA_CERT not found at {self.ca_cert_path}')

    def query(self, query: str, time_param: Optional[float] = None) -> Optional[list]:
        """
        Execute instant PromQL query.

        Args:
            query: PromQL query string
            time_param: Optional timestamp for the query

        Returns:
            Query results as list, or raise MetricsException if failed
        """
        try:
            url = f'{self.url}/api/v1/query'
            params = {'query': query}

            # `is not None` so an explicit epoch timestamp of 0 is honored
            # (a bare truthiness check silently dropped it).
            if time_param is not None:
                params['time'] = time_param

            response = self.session.get(url, params=params, timeout=self.timeout)

            logger.debug(f'response: {response}')
            if response.status_code == 200:
                data = response.json()
                logger.debug(f'data: {data}')
                if data.get('status') == 'success':
                    return data.get('data', {}).get('result', [])
                else:
                    raise MetricsException(f'Prometheus API error: {data.get("error", "Unknown error")}')
            else:
                raise MetricsException(f'HTTP error {response.status_code}: {response.text}')

        except MetricsException:
            # Already descriptive; re-raise without wrapping it a second
            # time as "Query failed: ...".
            raise
        except Exception as e:
            raise MetricsException(f'Query failed: {e}') from e

    def get_current_value(self, query: str) -> Optional[float]:
        """
        Get current value from an instant query.

        Args:
            query: PromQL query string

        Returns:
            Current value as float, or None if result is empty
        """
        result = self.query(query)
        if result and len(result) > 0:
            # Prometheus instant vectors are [timestamp, value-as-string].
            return float(result[0]['value'][1])
        return None

    def query_range(self, query: str, start_time: float, end_time: float, step: str = '5m') -> Optional[dict]:
        """
        Execute a range query against Prometheus.

        Args:
            query: PromQL instant query (not range query)
            start_time: Start time (Unix timestamp)
            end_time: End time (Unix timestamp)
            step: Query resolution step (e.g., '1m', '5m')

        Returns:
            The full Prometheus response dict on success, or None when the
            API reports a query failure.

        Raises:
            MetricsException: If the HTTP request fails.
        """
        params = {'query': query, 'start': start_time, 'end': end_time, 'step': step}

        try:
            url = f'{self.url}/api/v1/query_range'
            logger.debug(f'Range query URL: {url}')
            logger.debug(f'Range query params: {params}')

            response = self.session.get(url, params=params, timeout=self.timeout)
            response.raise_for_status()

            data = response.json()
            if data.get('status') == 'success':
                return data
            else:
                logger.error(f'Prometheus range query failed: {data.get("error", "Unknown error")}')
                return None

        except Exception as e:
            logger.error(f'Range query failed: {e}')
            # Wrap with a message, consistent with the other methods,
            # instead of passing the raw exception object.
            raise MetricsException(f'Range query failed: {e}') from e

metrics_utility/base/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ An example can be found in [Test package](tests/classes/package.py)
9292
### Environment variables for total_workers_vcpu collector:
9393

9494
- `METRICS_UTILITY_CLUSTER_NAME`: Contains the cluster name which is part of the collection payload.
95-
- `METRICS_UTILITY_USAGE_BASED_BILLING_ENABLED`: [true/false] In case of true, the payload will contain the actual number of total vcpu accross all workers otherwise the total will be set to 1.
95+
- `METRICS_UTILITY_USAGE_BASED_METERING_ENABLED`: [true/false] In case of true, the payload will contain the actual number of total vcpu across all workers, otherwise the total will be set to 1.
9696

9797
N.B.: The SaaS solution runs on ROSA HCP so all nodes are workers, if this collector is used for another solution then the filtering must be implemented.
9898

0 commit comments

Comments
 (0)