Skip to content

Commit 68dabdd

Browse files
dipannita08 authored and copybara-github committed
Add a Step Deviation Query API
PiperOrigin-RevId: 720202588
1 parent 982bae7 commit 68dabdd

File tree

4 files changed

+185
-31
lines changed

4 files changed

+185
-31
lines changed

ml_goodput_measurement/src/goodput.py

Lines changed: 72 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@
1212
from cloud_goodput.ml_goodput_measurement.src.checkpoint_badput_calculator import CheckpointBadputCalculator
1313
from cloud_goodput.ml_goodput_measurement.src.checkpoint_badput_calculator import CheckpointLoggerOptions
1414
from cloud_goodput.ml_goodput_measurement.src.goodput_cache import GoodputCache
15-
from cloud_goodput.ml_goodput_measurement.src.goodput_utils import BadputType, GoodputInfo, get_timestamp_from_log_entry
16-
import numpy as np
17-
from scipy import stats
15+
from cloud_goodput.ml_goodput_measurement.src.goodput_utils import BadputType, GoodputInfo, StepInfo
16+
from cloud_goodput.ml_goodput_measurement.src.goodput_utils import compute_ideal_step_time, get_extra_time_from_anomalous_steps, get_timestamp_from_log_entry
1817

1918

2019
_JOB_NAME = 'job_name'
@@ -455,31 +454,6 @@ def _get_current_productive_and_unproductive_time(
455454
now based on the latest entries retrieved from Cloud Logging.
456455
"""
457456

458-
def get_extra_time_from_anomalous_steps(step_times: list[Any]) -> float:
459-
def get_anomalous_and_normal_step_times(
460-
step_times: list[Any],
461-
) -> tuple[list[Any], list[Any]]:
462-
mad = stats.median_abs_deviation(step_times)
463-
med = np.median(step_times)
464-
465-
anomalous_step_times = []
466-
normal_step_times = []
467-
for step_time in step_times:
468-
if step_time > (med + mad * 3):
469-
anomalous_step_times.append(step_time)
470-
else:
471-
normal_step_times.append(step_time)
472-
473-
return anomalous_step_times, normal_step_times
474-
475-
anomalous_step_times, normal_step_times = (
476-
get_anomalous_and_normal_step_times(step_times)
477-
)
478-
normal_step_mean = np.mean(normal_step_times)
479-
return sum(anomalous_step_times) - (
480-
len(anomalous_step_times) * normal_step_mean
481-
)
482-
483457
def get_segment_productive_and_unproductive_time(
484458
step_start_data: dict[int, float], curr_step: int
485459
) -> tuple[float, dict[BadputType, float]]:
@@ -1010,6 +984,76 @@ def get_job_goodput_interval(
1010984
self._number_of_interruptions,
1011985
)
1012986

987+
def _get_step_times(self):
  """Derives per-step durations from the current log entries.

  Walks `self._current_entries` in order and looks only at entries that carry
  a step start time. Whenever such an entry's step count is exactly one more
  than the previous such entry's, the difference between the two start times
  is recorded as the duration of the earlier step.

  Returns:
    A dictionary mapping a step count to the time between that step's start
    and the start of the step that immediately follows it.
  """
  durations = {}
  last_start = None
  last_step = None
  for entry in self._current_entries:
    # Entries without a step start time carry no step timing information.
    if _STEP_START_TIME not in entry:
      continue
    start = entry[_STEP_START_TIME]
    step = int(entry[_STEP_COUNT])
    # Only back-to-back steps give a meaningful duration; any other jump in
    # the step count is skipped.
    follows_previous = last_step is not None and step == last_step + 1
    if follows_previous and last_start is not None:
      durations[last_step] = start - last_start
    last_step, last_start = step, start
  return durations
1007+
1008+
def get_step_deviation(
    self, configured_ideal_step_time: Optional[float] = None
) -> dict[int, float]:
  """Returns how far each recorded step's time is from the ideal step time.

  Refreshes the log entries, derives the step times from them and measures
  every step time against an ideal step time. The ideal step time is
  `configured_ideal_step_time` when the caller provides one; otherwise it is
  computed from the observed step times together with the ideal step time
  stored in the goodput cache by an earlier call, if any. The ideal step time
  and the resulting deviations are then stored in the goodput cache.

  Args:
    configured_ideal_step_time: Optional user-defined ideal step time. When
      None, the ideal step time is computed from the observed step times.

  Returns:
    A dictionary mapping each step count to the absolute difference between
    that step's time and the ideal step time. Empty when no step times are
    available.
  """
  # Get the log entries and compute step times from them.
  self._update_log_entries()
  step_times = self._get_step_times()

  if not step_times and configured_ideal_step_time is None:
    # An ideal step time cannot be derived from an empty sample, and caching
    # a NaN result would be averaged into every later ideal step time. Leave
    # the cache untouched and report no deviations.
    return {}

  if configured_ideal_step_time is not None:
    ideal_step_time = configured_ideal_step_time
  else:
    # Get the previous ideal step time from the cache, if one was stored.
    cached_step_info = self._goodput_cache._step_info
    previous_ideal_step_time = (
        cached_step_info.ideal_step_time
        if cached_step_info and cached_step_info.ideal_step_time
        else None
    )
    ideal_step_time = compute_ideal_step_time(
        step_times=list(step_times.values()),
        previous_ideal_step_time=previous_ideal_step_time,
    )

  # Compute step deviation.
  step_deviations = {
      step_count: abs(step_time - ideal_step_time)
      for step_count, step_time in step_times.items()
  }
  # Update the step information in the cache.
  self._goodput_cache.update_step_info(
      StepInfo(
          ideal_step_time=ideal_step_time,
          step_deviations=step_deviations,
      )
  )
  return step_deviations
1056+
10131057
def _get_job_badput_breakdown(
10141058
self, total_productive_time, total_unproductive_time, total_job_time
10151059
):

ml_goodput_measurement/src/goodput_cache.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import datetime
44
from typing import Any, Dict, Optional
55

6-
from cloud_goodput.ml_goodput_measurement.src.goodput_utils import BadputType, GoodputInfo
6+
from cloud_goodput.ml_goodput_measurement.src.goodput_utils import BadputType, GoodputInfo, StepInfo
77

88
_TIME_ENTRY = 'time'
99
_JOB_START_TIME = 'job_start_time'
@@ -18,6 +18,11 @@ def __init__(self):
1818
self._last_entry_timestamp = None
1919
self._job_start_time = None
2020
self._job_end_time = None
21+
self._step_info = None
22+
23+
def update_step_info(self, step_info: StepInfo):
  """Stores `step_info` as the cached step information.

  Args:
    step_info: The step information to keep; it replaces whatever was stored
      before.
  """
  self._step_info = step_info
2126

2227
def update_cached_entries(self, entries: list[Any]):
2328
"""Updated the cached entries."""
@@ -75,4 +80,3 @@ def clear_cache(self):
7580
def is_cache_empty(self) -> bool:
  """Reports whether the cache currently holds no cached entries."""
  if self._cached_entries:
    return False
  return True
78-

ml_goodput_measurement/src/goodput_utils.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import datetime
44
import enum
55
from typing import Any, Optional
6+
import numpy as np
7+
from scipy import stats
68

79
_TIME_ENTRY = 'time'
810

@@ -36,6 +38,62 @@ def __init__(
3638
self.last_recorded_step = last_recorded_step
3739

3840

41+
class StepInfo:
  """Container for step timing information.

  Attributes:
    ideal_step_time: The ideal step time the deviations were measured against.
    step_deviations: Deviation from the ideal step time, keyed by step count.
  """

  def __init__(
      self,
      ideal_step_time: float,
      step_deviations: dict[int, float],
  ):
    """Stores the given ideal step time and per-step deviations as-is."""
    self.step_deviations = step_deviations
    self.ideal_step_time = ideal_step_time
51+
52+
53+
def compute_ideal_step_time(
    step_times: list[float], previous_ideal_step_time: Optional[float]
) -> float:
  """Computes the ideal step time from observed step times.

  Step times greater than `median + 3 * MAD` (median absolute deviation) are
  treated as anomalous and left out; the mean of the remaining step times is
  the newly observed ideal. When a usable previous ideal step time is given,
  the result is the mean of that value and the newly observed ideal.

  Args:
    step_times: Observed step times. May be empty.
    previous_ideal_step_time: Ideal step time from an earlier computation, or
      None. A NaN value is ignored, exactly like None, so that one bad result
      cannot contaminate all later ones.

  Returns:
    The ideal step time as a plain float. With empty `step_times` this is the
    usable previous ideal step time, or NaN when there is none.
  """
  has_previous = previous_ideal_step_time is not None and not np.isnan(
      previous_ideal_step_time
  )

  if len(step_times) == 0:
    # Median and mean of an empty sample are NaN (and make NumPy warn), so
    # fall back to what is already known instead of computing on nothing.
    return float(previous_ideal_step_time) if has_previous else float('nan')

  # Keep only the normal step times: those not above median + 3 * MAD.
  samples = np.asarray(step_times, dtype=float)
  mad = stats.median_abs_deviation(samples)
  med = np.median(samples)
  normal_step_times = samples[samples <= (med + mad * 3)]
  mean_normal_step_time = float(np.mean(normal_step_times))

  if has_previous:
    return float(np.mean([mean_normal_step_time, previous_ideal_step_time]))
  return mean_normal_step_time
68+
69+
70+
def get_anomalous_and_normal_step_times(
    step_times: list[Any],
) -> tuple[list[Any], list[Any]]:
  """Splits step times into anomalous ones and normal ones.

  A step time is anomalous when it is greater than the median of all step
  times plus three times their median absolute deviation; every other step
  time is normal.

  Args:
    step_times: The step times to classify.

  Returns:
    A tuple `(anomalous_step_times, normal_step_times)`, each keeping the
    order of `step_times`.
  """
  spread = stats.median_abs_deviation(step_times)
  center = np.median(step_times)
  cutoff = center + spread * 3

  slow_steps = [value for value in step_times if value > cutoff]
  regular_steps = [value for value in step_times if not value > cutoff]
  return slow_steps, regular_steps
85+
86+
87+
def get_extra_time_from_anomalous_steps(step_times: list[Any]) -> float:
  """Returns the time anomalous steps took beyond the normal step time.

  The step times are split into anomalous and normal ones by
  `get_anomalous_and_normal_step_times`. Each anomalous step is charged the
  difference between its own time and the mean of the normal step times.

  Args:
    step_times: The step times to inspect. May be empty.

  Returns:
    The summed extra time of all anomalous steps, or 0.0 when there are none.
  """
  anomalous_step_times, normal_step_times = get_anomalous_and_normal_step_times(
      step_times
  )
  if not anomalous_step_times:
    # Nothing was anomalous, so no extra time was spent. This also covers an
    # empty `step_times`, where the mean of the (empty) normal step times
    # would be NaN and turn the result into NaN.
    return 0.0
  normal_step_mean = np.mean(normal_step_times)
  return float(
      sum(anomalous_step_times) - len(anomalous_step_times) * normal_step_mean
  )
95+
96+
3997
def get_timestamp_from_log_entry(
4098
entry: dict[str, Any],
4199
) -> datetime.datetime | None:

ml_goodput_measurement/tests/goodput_test.py

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
import dataclasses
44
from dataclasses import asdict
55
import datetime
6+
import random
67
import time
78
from typing import Optional
89

910
from cloud_goodput.ml_goodput_measurement.src import goodput
10-
from cloud_goodput.ml_goodput_measurement.src.goodput_utils import BadputType, get_timestamp_from_log_entry
11+
from cloud_goodput.ml_goodput_measurement.src.goodput_utils import BadputType
12+
from cloud_goodput.ml_goodput_measurement.src.goodput_utils import compute_ideal_step_time, get_timestamp_from_log_entry
1113

1214
from google3.testing.pybase import googletest
1315

@@ -1595,6 +1597,52 @@ def test_goodput_badput_with_interval_query(self):
15951597
computed_badput_breakdown[BadputType.WASTED_PROGRESS_FROM_DISRUPTION], 0
15961598
)
15971599

1600+
def _generate_step_start_times(self, number_of_steps: int, start_time):
  """Builds a list of step start times spaced by random gaps.

  The list always begins with `start_time`; each further entry is the previous
  one plus a random whole number of seconds between 1 and 600.

  Args:
    number_of_steps: How many start times to produce. Values below 2 yield
      just `[start_time]`.
    start_time: The datetime of the first step start.

  Returns:
    The list of generated step start times.
  """
  max_gap_seconds = 600
  timestamps = [start_time]
  latest = start_time
  remaining = number_of_steps - 1
  while remaining > 0:
    gap_seconds = random.randint(1, max_gap_seconds)
    latest = latest + datetime.timedelta(seconds=gap_seconds)
    timestamps.append(latest)
    remaining -= 1
  return timestamps
1609+
1610+
def test_get_step_deviation(self):
  """Validates the deviations returned by get_step_deviation.

  Records a job with 100 steps at random intervals, queries the step
  deviations and checks them against deviations recomputed from the same step
  times and an ideal step time computed without a previous value.
  """
  job_start_time = datetime.datetime.utcnow()
  self.goodput_recorder.record_job_start_time(job_start_time)
  # Generate a list of 100 step start times with random step times.
  max_steps = 100
  test_step_start_times = self._generate_step_start_times(
      number_of_steps=max_steps, start_time=job_start_time
  )

  # Record step start times, numbering the steps from 0.
  for step_count, step_start_time in enumerate(test_step_start_times):
    self.goodput_recorder.record_step_start_time(step_count, step_start_time)

  job_end_time = test_step_start_times[-1] + datetime.timedelta(seconds=10)
  self.goodput_recorder.record_job_end_time(job_end_time)

  # Query first: get_step_deviation() refreshes the log entries before it
  # derives step times, so reading the step times afterwards guarantees the
  # expected values are built from the same entries.
  computed_step_deviations = self.goodput_calculator.get_step_deviation()
  step_times = self.goodput_calculator._get_step_times()
  # Guard against a vacuous pass: with no step times the loop below would
  # not check anything.
  self.assertTrue(step_times)

  ideal_step_time = compute_ideal_step_time(
      step_times=list(step_times.values()), previous_ideal_step_time=None
  )
  expected_step_deviations = {
      step_count: abs(step_time - ideal_step_time)
      for step_count, step_time in step_times.items()
  }
  # The API must report exactly the steps that have a step time.
  self.assertEqual(
      set(expected_step_deviations), set(computed_step_deviations)
  )
  for step_count, expected_deviation in expected_step_deviations.items():
    computed_deviation = computed_step_deviations[step_count]
    self.assertAlmostEqual(
        expected_deviation,
        computed_deviation,
        delta=0.1,
    )
1645+
15981646

15991647
if __name__ == '__main__':
16001648
googletest.main()

0 commit comments

Comments
 (0)