3 changes: 2 additions & 1 deletion pyproject.toml
@@ -76,7 +76,8 @@ disable = [
"invalid-name",
"no-else-return", # Bad rule IMO (- OB)
"line-too-long", # Black takes care of line length.
"logging-fstring-interpolation"
"logging-fstring-interpolation",
"duplicate-code",
]
extension-pkg-whitelist = "pydantic"

129 changes: 129 additions & 0 deletions sarc/alerts/usage_alerts/cluster_scraping.py
@@ -0,0 +1,129 @@
import logging
import sys
from datetime import datetime, timedelta
from typing import List, Optional

import pandas

from sarc.config import MTL
from sarc.jobs.series import compute_time_frames, load_job_series

logger = logging.getLogger(__name__)


def check_nb_jobs_per_cluster_per_time(
    time_interval: Optional[timedelta] = timedelta(days=7),
    time_unit=timedelta(days=1),
    cluster_names: Optional[List[str]] = None,
    nb_stddev=2,
    verbose=False,
):
"""
Check if we have scraped enough jobs per time unit per cluster on given time interval.
Log a warning for each cluster where number of jobs per time unit is lower than a limit
computed using mean and standard deviation statistics from this cluster.

Parameters
----------
time_interval: timedelta
If given, only jobs which ran in [now - time_interval, time_interval] will be used for checking.
Default is last 7 days.
If None, all jobs are used.
time_unit: timedelta
Time unit in which we must check cluster usage through time_interval. Default is 1 day.
cluster_names: list
Optional list of clusters to check.
If empty (or not specified), use all clusters available among jobs retrieved with time_interval.
nb_stddev: int
Amount of standard deviation to remove from average statistics to compute checking threshold.
For each cluster, threshold is computed as:
max(0, average - nb_stddev * stddev)
verbose: bool
If True, print supplementary info about clusters statistics.
"""

    # Parse time_interval
    start, end, clip_time = None, None, False
    if time_interval is not None:
        end = datetime.now(tz=MTL)
        start = end - time_interval
        clip_time = True

    # Get data frame
    df = load_job_series(start=start, end=end, clip_time=clip_time)

    # Split data frame into time frames using `time_unit`
    tf = compute_time_frames(df, frame_size=time_unit)

    # List all available timestamps.
    # We will check each timestamp for each cluster.
    timestamps = sorted(tf["timestamp"].unique())

    # List clusters
    if cluster_names:
        cluster_names = sorted(cluster_names)
    else:
        cluster_names = sorted(df["cluster_name"].unique())

    # Iterate over each cluster.
    for cluster_name in cluster_names:
        # Select only jobs for the current cluster,
        # group jobs by timestamp, and count jobs for each timestamp.
        f_stats = (
            tf[tf["cluster_name"] == cluster_name]
            .groupby(["timestamp"])[["job_id"]]
            .count()
        )

        # Create a dataframe with all available timestamps
        # and associate each timestamp to 0 jobs by default.
        c = (
            pandas.DataFrame({"timestamp": timestamps, "count": [0] * len(timestamps)})
            .groupby(["timestamp"])[["count"]]
            .sum()
        )
        # For each timestamp present in this cluster's stats, set the real number of jobs scraped.
        c.loc[f_stats.index, "count"] = f_stats["job_id"]

        # We now have the number of jobs for each timestamp for this cluster,
        # with count 0 for timestamps where no job ran on the cluster.

        # Compute the average number of jobs per timestamp for this cluster
        avg = c["count"].mean()
        # Compute the standard deviation of the job count per timestamp for this cluster
        stddev = c["count"].std()
        # Compute the threshold to use for warnings: <average> - nb_stddev * <standard deviation>
        threshold = max(0, avg - nb_stddev * stddev)

        if verbose:
            print(f"[{cluster_name}]", file=sys.stderr)
            print(c, file=sys.stderr)
            print(f"avg {avg}, stddev {stddev}, threshold {threshold}", file=sys.stderr)
            print(file=sys.stderr)

        if threshold == 0:
            # If threshold is zero, no check can be done, as the job count is always >= 0.
            # Instead, we log a general warning.
            msg = f"[{cluster_name}] threshold 0 ({avg} - {nb_stddev} * {stddev})."
            if len(timestamps) == 1:
                msg += (
                    f" Only 1 timestamp found. Either time_interval ({time_interval}) is too short, "
                    f"or this cluster should not be currently checked"
                )
            else:
                msg += (
                    f" Either nb_stddev is too high, time_interval ({time_interval}) is too short, "
                    f"or this cluster should not be currently checked"
                )
            logger.warning(msg)
        else:
            # With a non-null threshold, we can check each timestamp.
            for timestamp in timestamps:
                nb_jobs = c.loc[timestamp]["count"]
                if nb_jobs < threshold:
                    logger.warning(
                        f"[{cluster_name}][{timestamp}] "
                        f"insufficient cluster scraping: {nb_jobs} jobs / cluster / time unit; "
                        f"minimum required for this cluster: {threshold} ({avg} - {nb_stddev} * {stddev}); "
                        f"time unit: {time_unit}"
                    )
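
For context, a minimal sketch of how this check might be invoked, assuming a configured SARC environment (the function loads jobs through `load_job_series`, so a reachable database is required); the logging setup and parameter values below are illustrative, not part of this PR:

import logging
from datetime import timedelta

from sarc.alerts.usage_alerts.cluster_scraping import check_nb_jobs_per_cluster_per_time

# Hypothetical setup: route the check's warnings to stderr.
logging.basicConfig(level=logging.WARNING)

# Check the last 14 days, split into 1-day frames, with a stricter 1-stddev threshold.
check_nb_jobs_per_cluster_per_time(
    time_interval=timedelta(days=14),
    time_unit=timedelta(days=1),
    nb_stddev=1,
    verbose=False,
)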
58 changes: 58 additions & 0 deletions tests/functional/usage_alerts/test_alert_cluster_scraping.py
@@ -0,0 +1,58 @@
import functools
import re

import pytest

from sarc.alerts.usage_alerts.cluster_scraping import check_nb_jobs_per_cluster_per_time

from ..jobs.test_func_load_job_series import MOCK_TIME
from .common import _get_warnings

get_warnings = functools.partial(
    _get_warnings,
    module="sarc.alerts.usage_alerts.cluster_scraping:cluster_scraping.py",
)


@pytest.mark.freeze_time(MOCK_TIME)
@pytest.mark.usefixtures("read_only_db", "tzlocal_is_mtl")
@pytest.mark.parametrize(
    "params",
    [
        # Check with default params. In the last 7 days from now (mock time: 2023-11-22),
        # there are only 2 jobs from 1 cluster in 1 timestamp. So, the threshold will be 0.
        dict(verbose=True),
        # Check with no time interval (i.e. all jobs).
        dict(time_interval=None, verbose=True),
        # Check with a supplementary cluster `another_cluster` which is not in the data frame.
        dict(
            time_interval=None,
            cluster_names=[
                "fromage",
                "mila",
                "patate",
                "raisin",
                "another_cluster",
            ],
            verbose=True,
        ),
        # Check the above case with 2 clusters ignored.
        dict(
            time_interval=None,
            cluster_names=[
                "mila",
                "raisin",
                "another_cluster",
            ],
        ),
    ],
)
def test_check_nb_jobs_per_cluster_per_time(params, capsys, caplog, file_regression):
    check_nb_jobs_per_cluster_per_time(**params)
    file_regression.check(
        re.sub(
            r"WARNING +sarc\.alerts\.usage_alerts\.cluster_scraping:cluster_scraping.py:[0-9]+ +",
            "",
            f"{capsys.readouterr().err}\n{caplog.text}",
        )
    )
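
The `re.sub` call above strips pytest's caplog prefixes (`WARNING <module>:<file>:<lineno>`) so that regression files stay stable when source line numbers shift. A small illustration of that normalization, using a made-up line number (142) and a message taken from the regression output below:

import re

line = (
    "WARNING  sarc.alerts.usage_alerts.cluster_scraping:cluster_scraping.py:142 "
    "[raisin] threshold 0 (2.0 - 2 * nan)."
)
print(re.sub(
    r"WARNING +sarc\.alerts\.usage_alerts\.cluster_scraping:cluster_scraping.py:[0-9]+ +",
    "",
    line,
))
# -> [raisin] threshold 0 (2.0 - 2 * nan).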
@@ -0,0 +1,9 @@
load job series: 0%| | 0/2 [00:00<?, ?it/s]load job series: 100%|██████████| 2/2 [00:00<?, ?it/s]
[raisin]
count
timestamp
2023-11-21 07:00:00-05:00 2
avg 2.0, stddev nan, threshold 0


[raisin] threshold 0 (2.0 - 2 * nan). Only 1 timestamp found. Either time_interval (7 days, 0:00:00) is too short, or this cluster should not be currently checked
@@ -0,0 +1,58 @@
load job series: 0%| | 0/24 [00:00<?, ?it/s]load job series: 100%|██████████| 24/24 [00:00<?, ?it/s]
[fromage]
count
timestamp
2023-02-14 00:01:00-05:00 0
2023-02-15 00:01:00-05:00 0
2023-02-16 00:01:00-05:00 0
2023-02-17 00:01:00-05:00 1
2023-02-18 00:01:00-05:00 0
2023-02-19 00:01:00-05:00 0
2023-02-20 00:01:00-05:00 0
2023-11-21 00:01:00-05:00 0
avg 0.125, stddev 0.3535533905932738, threshold 0

[mila]
count
timestamp
2023-02-14 00:01:00-05:00 0
2023-02-15 00:01:00-05:00 0
2023-02-16 00:01:00-05:00 0
2023-02-17 00:01:00-05:00 0
2023-02-18 00:01:00-05:00 1
2023-02-19 00:01:00-05:00 1
2023-02-20 00:01:00-05:00 1
2023-11-21 00:01:00-05:00 0
avg 0.375, stddev 0.5175491695067657, threshold 0

[patate]
count
timestamp
2023-02-14 00:01:00-05:00 0
2023-02-15 00:01:00-05:00 0
2023-02-16 00:01:00-05:00 0
2023-02-17 00:01:00-05:00 1
2023-02-18 00:01:00-05:00 1
2023-02-19 00:01:00-05:00 0
2023-02-20 00:01:00-05:00 0
2023-11-21 00:01:00-05:00 0
avg 0.25, stddev 0.4629100498862757, threshold 0

[raisin]
count
timestamp
2023-02-14 00:01:00-05:00 4
2023-02-15 00:01:00-05:00 3
2023-02-16 00:01:00-05:00 4
2023-02-17 00:01:00-05:00 3
2023-02-18 00:01:00-05:00 3
2023-02-19 00:01:00-05:00 4
2023-02-20 00:01:00-05:00 0
2023-11-21 00:01:00-05:00 2
avg 2.875, stddev 1.3562026818605375, threshold 0.162594636278925


[fromage] threshold 0 (0.125 - 2 * 0.3535533905932738). Either nb_stddev is too high, time_interval (None) is too short, or this cluster should not be currently checked
[mila] threshold 0 (0.375 - 2 * 0.5175491695067657). Either nb_stddev is too high, time_interval (None) is too short, or this cluster should not be currently checked
[patate] threshold 0 (0.25 - 2 * 0.4629100498862757). Either nb_stddev is too high, time_interval (None) is too short, or this cluster should not be currently checked
[raisin][2023-02-20 00:01:00-05:00] insufficient cluster scraping: 0 jobs / cluster / time unit; minimum required for this cluster: 0.162594636278925 (2.875 - 2 * 1.3562026818605375); time unit: 1 day, 0:00:00
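
The `raisin` numbers in this output can be reproduced by hand. Note that pandas' `Series.std()` computes the sample standard deviation (ddof=1), which is also why the single observation in the default-parameters output above yields `stddev nan` (and hence threshold 0). A quick sanity check of the threshold, using the daily counts from this file:

import pandas

counts = pandas.Series([4, 3, 4, 3, 3, 4, 0, 2])  # raisin jobs per day, from above
avg = counts.mean()  # 2.875
stddev = counts.std()  # 1.3562026818605375 (sample standard deviation, ddof=1)
threshold = max(0, avg - 2 * stddev)  # 0.162594636278925
print(avg, stddev, threshold)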
@@ -0,0 +1,72 @@
load job series: 0%| | 0/24 [00:00<?, ?it/s]load job series: 100%|██████████| 24/24 [00:00<?, ?it/s]
[another_cluster]
count
timestamp
2023-02-14 00:01:00-05:00 0
2023-02-15 00:01:00-05:00 0
2023-02-16 00:01:00-05:00 0
2023-02-17 00:01:00-05:00 0
2023-02-18 00:01:00-05:00 0
2023-02-19 00:01:00-05:00 0
2023-02-20 00:01:00-05:00 0
2023-11-21 00:01:00-05:00 0
avg 0.0, stddev 0.0, threshold 0

[fromage]
count
timestamp
2023-02-14 00:01:00-05:00 0
2023-02-15 00:01:00-05:00 0
2023-02-16 00:01:00-05:00 0
2023-02-17 00:01:00-05:00 1
2023-02-18 00:01:00-05:00 0
2023-02-19 00:01:00-05:00 0
2023-02-20 00:01:00-05:00 0
2023-11-21 00:01:00-05:00 0
avg 0.125, stddev 0.3535533905932738, threshold 0

[mila]
count
timestamp
2023-02-14 00:01:00-05:00 0
2023-02-15 00:01:00-05:00 0
2023-02-16 00:01:00-05:00 0
2023-02-17 00:01:00-05:00 0
2023-02-18 00:01:00-05:00 1
2023-02-19 00:01:00-05:00 1
2023-02-20 00:01:00-05:00 1
2023-11-21 00:01:00-05:00 0
avg 0.375, stddev 0.5175491695067657, threshold 0

[patate]
count
timestamp
2023-02-14 00:01:00-05:00 0
2023-02-15 00:01:00-05:00 0
2023-02-16 00:01:00-05:00 0
2023-02-17 00:01:00-05:00 1
2023-02-18 00:01:00-05:00 1
2023-02-19 00:01:00-05:00 0
2023-02-20 00:01:00-05:00 0
2023-11-21 00:01:00-05:00 0
avg 0.25, stddev 0.4629100498862757, threshold 0

[raisin]
count
timestamp
2023-02-14 00:01:00-05:00 4
2023-02-15 00:01:00-05:00 3
2023-02-16 00:01:00-05:00 4
2023-02-17 00:01:00-05:00 3
2023-02-18 00:01:00-05:00 3
2023-02-19 00:01:00-05:00 4
2023-02-20 00:01:00-05:00 0
2023-11-21 00:01:00-05:00 2
avg 2.875, stddev 1.3562026818605375, threshold 0.162594636278925


[another_cluster] threshold 0 (0.0 - 2 * 0.0). Either nb_stddev is too high, time_interval (None) is too short, or this cluster should not be currently checked
[fromage] threshold 0 (0.125 - 2 * 0.3535533905932738). Either nb_stddev is too high, time_interval (None) is too short, or this cluster should not be currently checked
[mila] threshold 0 (0.375 - 2 * 0.5175491695067657). Either nb_stddev is too high, time_interval (None) is too short, or this cluster should not be currently checked
[patate] threshold 0 (0.25 - 2 * 0.4629100498862757). Either nb_stddev is too high, time_interval (None) is too short, or this cluster should not be currently checked
[raisin][2023-02-20 00:01:00-05:00] insufficient cluster scraping: 0 jobs / cluster / time unit; minimum required for this cluster: 0.162594636278925 (2.875 - 2 * 1.3562026818605375); time unit: 1 day, 0:00:00
@@ -0,0 +1,5 @@
load job series: 0%| | 0/24 [00:00<?, ?it/s]load job series: 100%|██████████| 24/24 [00:00<?, ?it/s]

[another_cluster] threshold 0 (0.0 - 2 * 0.0). Either nb_stddev is too high, time_interval (None) is too short, or this cluster should not be currently checked
[mila] threshold 0 (0.375 - 2 * 0.5175491695067657). Either nb_stddev is too high, time_interval (None) is too short, or this cluster should not be currently checked
[raisin][2023-02-20 00:01:00-05:00] insufficient cluster scraping: 0 jobs / cluster / time unit; minimum required for this cluster: 0.162594636278925 (2.875 - 2 * 1.3562026818605375); time unit: 1 day, 0:00:00