Skip to content

Commit e71b82b

Browse files
authored
feat: Component Status updates (#152)
# Summary Closes #148. # Changes * Add a new `Status` enum to `plugboard.schemas`. * Add a `status` property to the `Component`. * Removes from flakiness from the tuner tests.
1 parent 23bfa8f commit e71b82b

8 files changed

Lines changed: 122 additions & 16 deletions

File tree

plugboard/component/component.py

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
UnrecognisedEventError,
1919
ValidationError,
2020
)
21+
from plugboard.schemas.state import Status
2122
from plugboard.state import StateBackend
2223
from plugboard.utils import DI, ClassRegistry, ExportMixin, is_on_ray_worker
2324

@@ -73,6 +74,8 @@ def __init__(
7374
namespace=self.name,
7475
component=self,
7576
)
77+
self.status = Status.CREATED
78+
self._is_running = False
7679
self._field_inputs: dict[str, _t.Any] = {}
7780
self._field_inputs_ready: bool = False
7881

@@ -88,6 +91,12 @@ def __init_subclass__(cls, *args: _t.Any, **kwargs: _t.Any) -> None:
8891
# Configure IO last in case it fails in case of components with dynamic io args
8992
cls._configure_io()
9093

94+
async def _set_status(self, status: Status, publish: bool = True) -> None:
95+
"""Sets the status of the component and optionaly publishes it to the state backend."""
96+
self.status = status
97+
if publish and self._state and self._state_is_connected:
98+
await self._state.upsert_component(self)
99+
91100
@classmethod
92101
def _configure_io(cls) -> None:
93102
# Get all parent classes that are Component subclasses
@@ -203,8 +212,7 @@ def _handle_init_wrapper(self) -> _t.Callable:
203212
async def _wrapper() -> None:
204213
with self._job_id_ctx():
205214
await self._init()
206-
if self._state is not None and self._state_is_connected:
207-
await self._state.upsert_component(self)
215+
await self._set_status(Status.INIT)
208216

209217
return _wrapper
210218

@@ -233,14 +241,21 @@ def _handle_step_wrapper(self) -> _t.Callable:
233241
@wraps(self.step)
234242
async def _wrapper() -> None:
235243
with self._job_id_ctx():
244+
await self._set_status(Status.RUNNING, publish=not self._is_running)
236245
await self.io.read()
237246
await self._handle_events()
238247
self._bind_inputs()
239248
if self._can_step:
240-
await self._step()
249+
try:
250+
await self._step()
251+
except Exception as e:
252+
await self._set_status(Status.FAILED)
253+
self._logger.exception("Component step failed")
254+
raise e
241255
self._bind_outputs()
242256
await self.io.write()
243257
self._field_inputs_ready = False
258+
await self._set_status(Status.WAITING, publish=not self._is_running)
244259

245260
return _wrapper
246261

@@ -304,14 +319,21 @@ async def _stop_event_handler(self, event: StopEvent) -> None:
304319
await self.io.close()
305320
except IOStreamClosedError:
306321
pass
322+
await self._set_status(Status.STOPPED)
307323

308324
async def run(self) -> None:
309325
"""Executes component logic for all steps to completion."""
310-
while True:
311-
try:
312-
await self.step()
313-
except IOStreamClosedError:
314-
break
326+
self._is_running = True
327+
await self._set_status(Status.RUNNING)
328+
try:
329+
while True:
330+
try:
331+
await self.step()
332+
except IOStreamClosedError:
333+
break
334+
await self._set_status(Status.COMPLETED)
335+
finally:
336+
self._is_running = False
315337

316338
async def destroy(self) -> None:
317339
"""Performs tear-down actions for `Component`."""
@@ -327,6 +349,7 @@ def dict(self) -> dict[str, _t.Any]: # noqa: D102
327349
return {
328350
"id": self.id,
329351
"name": self.name,
352+
"status": self.status,
330353
**field_data,
331354
"exports": {name: getattr(self, name, None) for name in self.exports or []},
332355
}

plugboard/process/ray_process.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ async def _update_component_attributes(self) -> None:
7676
**state[str(IODirection.INPUT)],
7777
**state[str(IODirection.OUTPUT)],
7878
**state["exports"],
79+
"status": state["status"],
7980
}
8081
)
8182

plugboard/schemas/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from .entities import Entity
2020
from .io import IODirection
2121
from .process import ProcessArgsDict, ProcessArgsSpec, ProcessSpec
22-
from .state import StateBackendArgsDict, StateBackendArgsSpec, StateBackendSpec
22+
from .state import StateBackendArgsDict, StateBackendArgsSpec, StateBackendSpec, Status
2323
from .tune import (
2424
Direction,
2525
ObjectiveSpec,
@@ -55,6 +55,7 @@
5555
"StateBackendSpec",
5656
"StateBackendArgsDict",
5757
"StateBackendArgsSpec",
58+
"Status",
5859
"TuneArgsDict",
5960
"TuneArgsSpec",
6061
"TuneSpec",

plugboard/schemas/state.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Provides `StateBackendSpec` class."""
22

33
from datetime import datetime, timezone
4+
from enum import StrEnum
45
import typing as _t
56

67
from pydantic import Field
@@ -12,6 +13,33 @@
1213
DEFAULT_STATE_BACKEND_CLS_PATH: str = "plugboard.state.DictStateBackend"
1314

1415

16+
class Status(StrEnum):
17+
"""`Status` describes the status of either a `Component` or a `Process`.
18+
19+
Attributes:
20+
CREATED: The `Component` or `Process` has been created but not yet started.
21+
INIT: The `Component` or `Process` has been initialised but has not started running.
22+
RUNNING: The `Component` or `Process` is currently running.
23+
WAITING: The `Component` or `Process` is waiting for input.
24+
COMPLETED: The `Component` or `Process` has completed successfully.
25+
FAILED: The `Component` or `Process` has failed.
26+
STOPPED: The `Component` or `Process` has been cancelled or stopped.
27+
"""
28+
29+
CREATED = "created"
30+
INIT = "init"
31+
RUNNING = "running"
32+
WAITING = "waiting"
33+
COMPLETED = "completed"
34+
FAILED = "failed"
35+
STOPPED = "stopped"
36+
37+
@property
38+
def is_terminal(self) -> bool:
39+
"""Returns whether the status is terminal."""
40+
return self in {self.COMPLETED, self.FAILED, self.STOPPED}
41+
42+
1543
class StateBackendArgsDict(_t.TypedDict):
1644
"""`TypedDict` of the [`StateBackend`][plugboard.state.StateBackend] constructor arguments."""
1745

tests/integration/test_process_stop_event.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from plugboard.connector import AsyncioConnector, Connector, ConnectorBuilder, RabbitMQConnector
1212
from plugboard.events import EventConnectorBuilder, StopEvent
1313
from plugboard.process import LocalProcess, Process, RayProcess
14-
from plugboard.schemas import ConnectorSpec
14+
from plugboard.schemas import ConnectorSpec, Status
1515
from tests.conftest import ComponentTestHelper, zmq_connector_cls
1616

1717

@@ -98,6 +98,7 @@ async def stop_after() -> None:
9898

9999
for c in components:
100100
assert c.is_initialised
101+
assert c.status == Status.INIT
101102

102103
async with asyncio.TaskGroup() as tg:
103104
tg.create_task(process.run())
@@ -116,6 +117,7 @@ async def stop_after() -> None:
116117
for c in [comp_b1, comp_b2, comp_b3, comp_b4, comp_b5]:
117118
assert c.is_finished
118119
assert c.step_count == pytest.approx(iters_before_stop, abs=stop_tolerance)
120+
assert c.status == Status.STOPPED
119121

120122
# A performs n+1 full steps and is interrupted on step n+2 before a final update of out_1,
121123
# hence n+2.

tests/integration/test_state_backend.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from plugboard.component import Component, IOController
99
from plugboard.connector import AsyncioConnector, Connector
1010
from plugboard.process import LocalProcess
11-
from plugboard.schemas import ConnectorSpec
11+
from plugboard.schemas import ConnectorSpec, Status
1212
from plugboard.state import StateBackend
1313
from tests.conftest import ComponentTestHelper
1414
from tests.integration.conftest import (
@@ -208,4 +208,8 @@ async def test_state_backend_process_init(
208208
assert await state_backend.get_connector(conn_1.id) == conn_1.dict()
209209
assert await state_backend.get_connector(conn_2.id) == conn_2.dict()
210210

211+
# All components must report their INIT status to the StateBackend
212+
for c in B_components:
213+
assert (await state_backend.get_component(c.id))["status"] == Status.INIT
214+
211215
await process.destroy()

tests/integration/test_tuner.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ async def test_multi_objective_tune(config: dict, ray_ctx: None) -> None:
116116
assert not [t for t in result if t.error]
117117
# Results must contain two objectives and correct optimimum must be found
118118
# The best result must be a list of two results
119-
assert best_result[0].config["a.iters"] == 2
120-
assert best_result[1].config["b.factor"] == -1
121-
assert best_result[0].metrics["c.in_1"] == 1
122-
assert best_result[1].metrics["b.out_1"] == -1
119+
assert len(best_result) == 2
120+
assert all(r.config["a.iters"] == 2 for r in best_result)
121+
assert -1 in set(r.config["b.factor"] for r in best_result)
122+
assert -1 in set(r.metrics["b.out_1"] for r in best_result)
123+
assert 1 in set(r.metrics["c.in_1"] for r in best_result)

tests/unit/test_component.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from plugboard.component import Component, IOController as IO
99
from plugboard.connector import AsyncioConnector
10-
from plugboard.schemas import ConnectorSpec
10+
from plugboard.schemas import ComponentArgsDict, ConnectorSpec, Status
1111

1212

1313
class A(Component):
@@ -17,6 +17,23 @@ async def step(self) -> None:
1717
self.c = {"a": self.a, "b": self.b}
1818

1919

20+
class S(Component):
21+
io = IO(inputs=[], outputs=["component_status"])
22+
23+
def __init__(self, max_iters: int = 5, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
24+
super().__init__(**kwargs)
25+
self._remaining_iters = max_iters
26+
self.raise_exception = False
27+
28+
async def step(self) -> None:
29+
self._remaining_iters -= 1
30+
if self.raise_exception:
31+
raise ValueError("This is a test exception.")
32+
self.component_status = str(self.status)
33+
if self._remaining_iters <= 0:
34+
await self.io.close()
35+
36+
2037
@pytest.mark.asyncio
2138
@pytest.mark.parametrize(
2239
"initial_values", [{"a": [-1], "b": [-2]}, {"a": [-2]}, {"a": [-2, -1]}, {}]
@@ -49,3 +66,32 @@ async def test_component_initial_values(initial_values: dict[str, _t.Iterable])
4966
assert component.c.get(field) == input_idx - n_init[field]
5067

5168
await component.io.close()
69+
70+
71+
@pytest.mark.asyncio
72+
async def test_component_status() -> None:
73+
"""Tests the status of a `Component` across its lifecycle."""
74+
component = S(name="status-component")
75+
assert component.status == Status.CREATED, "Component should start with CREATED status"
76+
77+
await component.init()
78+
assert component.status == Status.INIT, "Component should be INIT after init"
79+
80+
component.raise_exception = False
81+
await component.step()
82+
assert component.status == Status.WAITING, "Component should be WAITING after step"
83+
assert component.component_status == "running", "Status should be running during step"
84+
85+
component.raise_exception = True
86+
try:
87+
await component.step()
88+
except ValueError:
89+
pass
90+
assert component.status == Status.FAILED, "Component should be FAILED after exception"
91+
92+
run_component = S(name="run-status-component")
93+
await run_component.init()
94+
95+
run_component.raise_exception = False
96+
await run_component.run()
97+
assert run_component.status == Status.COMPLETED, "Component should be COMPLETED after run"

0 commit comments

Comments
 (0)