-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmaintenance_tracker.py
189 lines (151 loc) · 6.58 KB
/
maintenance_tracker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import logging
from core import *
from enum import Enum
from datetime import datetime, timedelta, UTC
logger = logging.getLogger(__name__)
class ActionRecordResults(Enum):
SUCCESS = 1 # all good, and task of this action matches the task already registered
TASK_MISMATCH = 2 # action was recorded, but the task referenced in the action is not equals to the one already registered
FAILURE = (
0 # for some reason we couldn't add the task, but did not raise an exception
)
class MaintenanceTracker:
"""maintenance_tracker sets up lists of tasks and actions related to those tasks."""
task_list: TaskLister
action_list: ActionLister
task_list_saver: TaskListPersister
action_list_saver: ActionListPersister
def __init__(
self, load=False, save_dir=None, save_actions_file=None, save_task_file=None
):
self.task_list = TaskLister([])
self.action_list = ActionLister([])
logger.debug(
f"Initializing tracker: {load =}, {save_dir = }, {save_actions_file = }, {save_task_file = }"
)
self.task_list_saver = TaskListPersister(
self.task_list, save_dir, save_task_file
)
self.action_list_saver = ActionListPersister(
self.action_list, save_dir, save_actions_file
)
if load:
self.task_list_saver.load()
self.action_list_saver.load()
logger.debug(
f"loaded tasks from : {Path(self.task_list_saver.dirname) / Path(self.task_list_saver.filename)}"
)
logger.debug(
f"num tasks: {len(self.task_list)}, num actions: {len(self.action_list)}"
)
def register_task(self, new_task: Task) -> None:
self.task_list.append(new_task)
def record_run(self, new_action: Action) -> ActionRecordResults:
"""Records that an action was performed,
Ensures the referenced task is in the task_list, and checks if the task in the
task_list matches the one passed in the action
Args:
new_action (action): new action to be recorded
Returns:
ActionRecordResults: code with information about the registration result
- FAILURE if could not add the action
- TASK_MISMATCH if the action was added, but the task referenced in the
action is not equals to the one in the task_list
- SUCCESS if the action was added successfully with a matching task
"""
ret_code = ActionRecordResults.FAILURE
# check if we've seen this task before, if not, register it
if self.task_list._check_task_name_available(new_action.ref_task.name):
logger.info(
f"adding an action to a task that did not exist before - {new_action.ref_task.name}"
)
self.register_task(new_action.ref_task)
else:
self.action_list.append(new_action)
# warn the user if ref_task is different from the one in the list
registered_task = self.task_list.get_task_by_name(new_action.ref_task.name)
if registered_task == new_action.ref_task:
ret_code = ActionRecordResults.SUCCESS
else:
ret_code = ActionRecordResults.TASK_MISMATCH
return ret_code
def get_actions_for_task(
self, target_task: Task, ordered: Ordering | bool = False
) -> ActionLister:
"""gets a list of actions for a given task (based on the task name)
Args:
target_task (task): the task you want to list actions for
ordered (Ordering | bool, optional): should the returned action list be ordered based on the action date time? values: Ordering.ASC, Ordering.DESC or bool True is the same as Ordering.ASC. Defaults to False.
Returns:
action_lister: a list of actions for the desired task
"""
result_list = [
action
for action in self.action_list
if action.ref_task.name == target_task.name
]
if ordered:
result_list = sorted(
result_list,
key=lambda a: a.timestamp,
reverse=(ordered == Ordering.DESC),
)
return ActionLister(result_list)
def get_latest_task_run(
self, tgt_task: Task, when: datetime | None = None
) -> Action | None:
"""Returns the most recent run of a task
Args:
tgt_task (task): the task we are looking for
when (datetime): the time considered as "now" for this return
Returns:
action: the latest action of the the reference task
"""
if when is None:
when = datetime.now(UTC)
actions = self.get_actions_for_task(tgt_task, ordered=Ordering.DESC)
if len(actions) > 0:
for a in actions:
if a.timestamp > when:
continue
return a
return None
def check_overdue(self, task: Task, when: datetime | None = None) -> bool:
"""Checks if a task is overdue.
A task is overdue if we have not had an action run between the latest time the
task was programmed before today
Args:
task (Task): the task to check for
Returns:
bool: whether the task is overdue
"""
if when is None:
when = datetime.now(UTC)
last_programmed_time = task.get_programmed_time(-1, when)
if last_programmed_time is None:
# tasks without programmed time are never overdue
return False
last_run = self.get_latest_task_run(task, when)
if last_run is None:
return last_programmed_time < when
return last_programmed_time < when and last_run.timestamp < last_programmed_time
def time_since_last_exec(
self, task: Task, when: datetime | None = None
) -> timedelta | None:
"""Gets a timedelta between "when" and the last action for this task
Args:
task (Task): target task
when (datetime | None, optional): a timestamp to be used as now. Defaults to None.
Returns:
timedelta | None: timedelta since the last recorded task run. Returns None if no run has happened
"""
if when is None:
when = datetime.now(UTC)
last_run = self.get_latest_task_run(task)
if last_run is None:
return None
else:
return when - last_run.timestamp
def save(self) -> None:
self.task_list_saver.save()
self.action_list_saver.save()