Skip to content

Commit 6136b90

Browse files
docs: Add task worker docstrings (#1186)
Closes #1179
1 parent d8ca24d commit 6136b90

File tree

1 file changed

+94
-0
lines changed

1 file changed

+94
-0
lines changed

src/blueapi/worker/task_worker.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,13 @@ def __init__(
145145

146146
@start_as_current_span(TRACER, "task_id")
147147
def clear_task(self, task_id: str) -> str:
148+
"""
149+
Remove a task from the worker
150+
Args:
151+
task_id: The ID of the task to be removed
152+
Returns:
153+
task_id of the removed task
154+
"""
148155
pending = self._pending_tasks.pop(task_id, None)
149156
task = pending or self._completed_tasks.pop(task_id)
150157
return task.task_id
@@ -155,6 +162,14 @@ def cancel_active_task(
155162
failure: bool = False,
156163
reason: str | None = None,
157164
) -> str:
165+
"""
166+
Remove the currently active task from the worker if there is one
167+
Args:
168+
failure: Flag cancellation as error
169+
reason: Reason for cancellation
170+
Returns:
171+
The task_id of the active task
172+
"""
158173
if self._current is None:
159174
# Persuades type checker that self._current is not None
160175
# We only allow this method to be called if a Plan is active
@@ -171,14 +186,36 @@ def cancel_active_task(
171186

172187
@start_as_current_span(TRACER)
173188
def get_tasks(self) -> list[TrackableTask]:
189+
"""
190+
Return a list of all tasks on the worker,
191+
any one of which can be triggered with begin_task.
192+
Returns:
193+
List[TrackableTask[T]]: List of task objects
194+
"""
174195
return list(self._pending_tasks.values()) + list(self._completed_tasks.values())
175196

176197
@start_as_current_span(TRACER, "task_id")
177198
def get_task_by_id(self, task_id: str) -> TrackableTask | None:
199+
"""
200+
Returns a task matching the task ID supplied,
201+
if the worker knows of it.
202+
Args:
203+
task_id: The ID of the task
204+
Returns:
205+
Optional[TrackableTask[T]]: The task matching the ID,
206+
None if the task ID is unknown to the worker.
207+
"""
178208
return self._pending_tasks.get(task_id, None) or self._completed_tasks[task_id]
179209

180210
@start_as_current_span(TRACER, "status")
181211
def get_tasks_by_status(self, status: TaskStatusEnum) -> list[TrackableTask]:
212+
"""
213+
Retrieve a list of tasks based on their status.
214+
Args:
215+
TaskStatusEnum: The status to filter tasks by.
216+
Returns:
217+
list[TrackableTask]: A list of tasks that match the given status.
218+
"""
182219
if status == TaskStatusEnum.RUNNING:
183220
return [
184221
task
@@ -193,13 +230,26 @@ def get_tasks_by_status(self, status: TaskStatusEnum) -> list[TrackableTask]:
193230

194231
@start_as_current_span(TRACER)
195232
def get_active_task(self) -> TrackableTask[Task] | None:
233+
"""
234+
Returns the task the worker is currently running
235+
Returns:
236+
Optional[TrackableTask[Task]]: The current task,
237+
None if the worker is idle.
238+
"""
196239
current = self._current
197240
if current is not None:
198241
add_span_attributes({"Active Task": current.task_id})
199242
return current
200243

201244
@start_as_current_span(TRACER, "task_id")
202245
def begin_task(self, task_id: str) -> None:
246+
"""
247+
Trigger a pending task. Will fail if the worker is busy.
248+
Args:
249+
task_id: The ID of the task to be triggered
250+
Throws:
251+
KeyError: If the task ID does not exist
252+
"""
203253
task = self._pending_tasks.get(task_id)
204254
if task is None:
205255
raise KeyError(f"No pending task with ID {task_id}")
@@ -208,6 +258,13 @@ def begin_task(self, task_id: str) -> None:
208258

209259
@start_as_current_span(TRACER, "task.name", "task.params")
210260
def submit_task(self, task: Task) -> str:
261+
"""
262+
Submit a task to be run on begin_task
263+
Args:
264+
task: A description of the task
265+
Returns:
266+
str: A unique ID to refer to this task
267+
"""
211268
task.prepare_params(self._ctx) # Will raise if parameters are invalid
212269
task_id: str = str(uuid.uuid4())
213270
add_span_attributes({"TaskId": task_id})
@@ -264,6 +321,10 @@ def mark_task_as_started(event: WorkerEvent, _: str | None) -> None:
264321

265322
@start_as_current_span(TRACER)
266323
def start(self) -> None:
324+
"""
325+
Start worker in a new thread. Does not block, configures the bluesky
326+
event loop in the new thread.
327+
"""
267328
if self._started.is_set():
268329
raise WorkerAlreadyStartedError("Worker is already running")
269330
self._wait_until_stopped()
@@ -272,6 +333,9 @@ def start(self) -> None:
272333

273334
@start_as_current_span(TRACER)
274335
def stop(self) -> None:
336+
"""
337+
Command the worker to gracefully stop. Blocks until it has shut down.
338+
"""
275339
LOGGER.info("Attempting to stop worker")
276340

277341
# If the worker has not yet started there is nothing to do.
@@ -299,10 +363,16 @@ def _wait_until_stopped(self) -> None:
299363

300364
@property
301365
def state(self) -> WorkerState:
366+
"""
367+
:return: state of the worker
368+
"""
302369
return self._state
303370

304371
@start_as_current_span(TRACER)
305372
def run(self) -> None:
373+
"""
374+
Run all tasks that are submitted to the worker. Blocks thread.
375+
"""
306376
LOGGER.info("Worker starting")
307377
self._ctx.run_engine.state_hook = self._on_state_change # type: ignore
308378
self._ctx.run_engine.subscribe(self._on_document)
@@ -319,11 +389,20 @@ def run(self) -> None:
319389

320390
@start_as_current_span(TRACER, "defer")
321391
def pause(self, defer=False):
392+
"""
393+
Command the worker to pause.
394+
395+
Args:
396+
defer: Optional, if true wait till next checkpoint
397+
"""
322398
LOGGER.info("Requesting to pause the worker")
323399
self._ctx.run_engine.request_pause(defer)
324400

325401
@start_as_current_span(TRACER)
326402
def resume(self):
403+
"""
404+
Command the worker to resume
405+
"""
327406
LOGGER.info("Requesting to resume the worker")
328407
self._ctx.run_engine.resume()
329408

@@ -378,14 +457,29 @@ def _cycle(self) -> None:
378457

379458
@property
380459
def worker_events(self) -> EventStream[WorkerEvent, int]:
460+
"""
461+
Events representing changes/errors in worker state
462+
Returns:
463+
EventStream[WorkerEvent, int]: Subscribable stream of events
464+
"""
381465
return self._worker_events
382466

383467
@property
384468
def progress_events(self) -> EventStream[ProgressEvent, int]:
469+
"""
470+
Events representing progress in running a task
471+
Returns:
472+
EventStream[ProgressEvent, int]: Subscribable stream of events
473+
"""
385474
return self._progress_events
386475

387476
@property
388477
def data_events(self) -> EventStream[DataEvent, int]:
478+
"""
479+
Events representing collection of data
480+
Returns:
481+
EventStream[DataEvent, int]: Subscribable stream of events
482+
"""
389483
return self._data_events
390484

391485
@start_as_current_span(TRACER, "raw_new_state", "raw_old_state")

0 commit comments

Comments
 (0)