diff --git a/pyproject.toml b/pyproject.toml index fb5a4e77..1b84eb67 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,7 +76,8 @@ disable = [ "invalid-name", "no-else-return", # Bad rule IMO (- OB) "line-too-long", # Black takes care of line length. - "logging-fstring-interpolation" + "logging-fstring-interpolation", + "duplicate-code", ] extension-pkg-whitelist = "pydantic" diff --git a/sarc/alerts/usage_alerts/cluster_scraping.py b/sarc/alerts/usage_alerts/cluster_scraping.py new file mode 100644 index 00000000..06d4a955 --- /dev/null +++ b/sarc/alerts/usage_alerts/cluster_scraping.py @@ -0,0 +1,129 @@ +import logging +import sys +from datetime import datetime, timedelta +from typing import List, Optional + +import pandas + +from sarc.config import MTL +from sarc.jobs.series import compute_time_frames, load_job_series + +logger = logging.getLogger(__name__) + + +def check_nb_jobs_per_cluster_per_time( + time_interval: Optional[timedelta] = timedelta(days=7), + time_unit=timedelta(days=1), + cluster_names: Optional[List[str]] = None, + nb_stddev=2, + verbose=False, +): + """ + Check if we have scraped enough jobs per time unit per cluster on given time interval. + Log a warning for each cluster where number of jobs per time unit is lower than a limit + computed using mean and standard deviation statistics from this cluster. + + Parameters + ---------- + time_interval: timedelta + If given, only jobs which ran in [now - time_interval, time_interval] will be used for checking. + Default is last 7 days. + If None, all jobs are used. + time_unit: timedelta + Time unit in which we must check cluster usage through time_interval. Default is 1 day. + cluster_names: list + Optional list of clusters to check. + If empty (or not specified), use all clusters available among jobs retrieved with time_interval. + nb_stddev: int + Amount of standard deviation to remove from average statistics to compute checking threshold. + For each cluster, threshold is computed as: + max(0, average - nb_stddev * stddev) + verbose: bool + If True, print supplementary info about clusters statistics. + """ + + # Parse time_interval + start, end, clip_time = None, None, False + if time_interval is not None: + end = datetime.now(tz=MTL) + start = end - time_interval + clip_time = True + + # Get data frame + df = load_job_series(start=start, end=end, clip_time=clip_time) + + # Split data frame into time frames using `time_unit` + tf = compute_time_frames(df, frame_size=time_unit) + + # List all available timestamps. + # We will check each timestamp for each cluster. + timestamps = sorted(tf["timestamp"].unique()) + + # List clusters + if cluster_names: + cluster_names = sorted(cluster_names) + else: + cluster_names = sorted(df["cluster_name"].unique()) + + # Iter for each cluster. + for cluster_name in cluster_names: + # Select only jobs for current cluster, + # group jobs by timestamp, and count jobs for each timestamp. + f_stats = ( + tf[tf["cluster_name"] == cluster_name] + .groupby(["timestamp"])[["job_id"]] + .count() + ) + + # Create a dataframe with all available timestamps + # and associate each timestamp to 0 jobs by default. + c = ( + pandas.DataFrame({"timestamp": timestamps, "count": [0] * len(timestamps)}) + .groupby(["timestamp"])[["count"]] + .sum() + ) + # Set each timestamp valid for this cluster with real number of jobs scraped in this timestamp. + c.loc[f_stats.index, "count"] = f_stats["job_id"] + + # We now have number of jobs for each timestamp for this cluster, + # with count 0 for timestamps where no jobs run on cluster, + + # Compute average number of jobs per timestamp for this cluster + avg = c["count"].mean() + # Compute standard deviation of job count per timestamp for this cluster + stddev = c["count"].std() + # Compute threshold to use for warnings: - nb_stddev * + threshold = max(0, avg - nb_stddev * stddev) + + if verbose: + print(f"[{cluster_name}]", file=sys.stderr) + print(c, file=sys.stderr) + print(f"avg {avg}, stddev {stddev}, threshold {threshold}", file=sys.stderr) + print(file=sys.stderr) + + if threshold == 0: + # If threshold is zero, no check can be done, as jobs count will be always >= 0. + # Instead, we log a general warning. + msg = f"[{cluster_name}] threshold 0 ({avg} - {nb_stddev} * {stddev})." + if len(timestamps) == 1: + msg += ( + f" Only 1 timestamp found. Either time_interval ({time_interval}) is too short, " + f"or this cluster should not be currently checked" + ) + else: + msg += ( + f" Either nb_stddev is too high, time_interval ({time_interval}) is too short, " + f"or this cluster should not be currently checked" + ) + logger.warning(msg) + else: + # With a non-null threshold, we can check each timestamp. + for timestamp in timestamps: + nb_jobs = c.loc[timestamp]["count"] + if nb_jobs < threshold: + logger.warning( + f"[{cluster_name}][{timestamp}] " + f"insufficient cluster scraping: {nb_jobs} jobs / cluster / time unit; " + f"minimum required for this cluster: {threshold} ({avg} - {nb_stddev} * {stddev}); " + f"time unit: {time_unit}" + ) diff --git a/tests/functional/usage_alerts/test_alert_cluster_scraping.py b/tests/functional/usage_alerts/test_alert_cluster_scraping.py new file mode 100644 index 00000000..2d1313c0 --- /dev/null +++ b/tests/functional/usage_alerts/test_alert_cluster_scraping.py @@ -0,0 +1,58 @@ +import functools +import re + +import pytest + +from sarc.alerts.usage_alerts.cluster_scraping import check_nb_jobs_per_cluster_per_time + +from ..jobs.test_func_load_job_series import MOCK_TIME +from .common import _get_warnings + +get_warnings = functools.partial( + _get_warnings, + module="sarc.alerts.usage_alerts.cluster_scraping:cluster_scraping.py", +) + + +@pytest.mark.freeze_time(MOCK_TIME) +@pytest.mark.usefixtures("read_only_db", "tzlocal_is_mtl") +@pytest.mark.parametrize( + "params", + [ + # Check with default params. In last 7 days from now (mock time: 2023-11-22), + # there is only 2 jobs from 1 cluster in 1 timestamp. So, threshold will be 0. + dict(verbose=True), + # Check with no time interval (i.e. all jobs). + dict(time_interval=None, verbose=True), + # Check with a supplementary cluster `another_cluster` which is not in data frame. + dict( + time_interval=None, + cluster_names=[ + "fromage", + "mila", + "patate", + "raisin", + "another_cluster", + ], + verbose=True, + ), + # Check above case with 2 clusters ignored. + dict( + time_interval=None, + cluster_names=[ + "mila", + "raisin", + "another_cluster", + ], + ), + ], +) +def test_check_nb_jobs_per_cluster_per_time(params, capsys, caplog, file_regression): + check_nb_jobs_per_cluster_per_time(**params) + file_regression.check( + re.sub( + r"WARNING +sarc\.alerts\.usage_alerts\.cluster_scraping:cluster_scraping.py:[0-9]+ +", + "", + f"{capsys.readouterr().err}\n{caplog.text}", + ) + ) diff --git a/tests/functional/usage_alerts/test_alert_cluster_scraping/test_check_nb_jobs_per_cluster_per_time_params0_.txt b/tests/functional/usage_alerts/test_alert_cluster_scraping/test_check_nb_jobs_per_cluster_per_time_params0_.txt new file mode 100644 index 00000000..966b36e6 --- /dev/null +++ b/tests/functional/usage_alerts/test_alert_cluster_scraping/test_check_nb_jobs_per_cluster_per_time_params0_.txt @@ -0,0 +1,9 @@ + load job series: 0%| | 0/2 [00:00