Skip to content

Commit

Permalink
feat(robot-server): status bar responds to runs in progress (#12794)
Browse files Browse the repository at this point in the history
* add function to get status bar state

* OT-2 API incorrectly renamed the `state` parameter for `set_status_bar_state`

* Added dependency to ensure light task runs on the robot

* Added light control driver & task; added tests for light control driver
  • Loading branch information
fsinapi authored May 30, 2023
1 parent 8548494 commit 9eeb0b9
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 1 deletion.
6 changes: 5 additions & 1 deletion api/src/opentrons/hardware_control/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,14 @@ async def identify(self, duration_s: int = 5) -> None:
await asyncio.sleep(max(0, 0.25 - (now - then)))
await self.set_lights(button=True)

async def set_status_bar_state(self, _: StatusBarState) -> None:
async def set_status_bar_state(self, state: StatusBarState) -> None:
"""The status bar does not exist on OT-2!"""
return None

def get_status_bar_state(self) -> StatusBarState:
"""There is no status bar on OT-2, return IDLE at all times."""
return StatusBarState.IDLE

@ExecutionManagerProvider.wait_for_running
async def delay(self, duration_s: float) -> None:
"""Delay execution by pausing and sleeping."""
Expand Down
3 changes: 3 additions & 0 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,9 @@ async def identify(self, duration_s: int = 5) -> None:
async def set_status_bar_state(self, state: StatusBarState) -> None:
await self._status_bar_controller.set_status_bar_state(state)

def get_status_bar_state(self) -> StatusBarState:
return self._status_bar_controller.get_current_state()

@ExecutionManagerProvider.wait_for_running
async def delay(self, duration_s: float) -> None:
"""Delay execution by pausing and sleeping."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,9 @@ async def set_status_bar_state(self, state: StatusBarState) -> None:
and will implicitly revert back to the previous state after a short
action, while others"""
...

def get_status_bar_state(self) -> StatusBarState:
"""Get the current status bar state.
:returns: The current status bar state enumeration."""
...
20 changes: 20 additions & 0 deletions robot-server/robot_server/runs/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
from .engine_store import EngineStore
from .run_store import RunStore
from .run_data_manager import RunDataManager
from .light_control_task import LightController, run_light_task

_run_store_accessor = AppStateAccessor[RunStore]("run_store")
_engine_store_accessor = AppStateAccessor[EngineStore]("engine_store")
_light_control_accessor = AppStateAccessor[LightController]("light_controller")


async def get_run_store(
Expand Down Expand Up @@ -70,10 +72,28 @@ async def get_protocol_run_has_been_played(
return protocol_run_state.commands.has_been_played()


async def ensure_light_control_task(
app_state: AppState = Depends(get_app_state),
engine_store: EngineStore = Depends(get_engine_store),
task_runner: TaskRunner = Depends(get_task_runner),
api: HardwareControlAPI = Depends(get_hardware),
) -> None:
"""Ensure the light control task is running."""
light_controller = _light_control_accessor.get_from(app_state)

if light_controller is None:
light_controller = LightController(api=api, engine_store=engine_store)
task_runner.run(run_light_task, driver=light_controller)
_light_control_accessor.set_on(app_state, light_controller)

return None


async def get_run_data_manager(
task_runner: TaskRunner = Depends(get_task_runner),
engine_store: EngineStore = Depends(get_engine_store),
run_store: RunStore = Depends(get_run_store),
light_control: None = Depends(ensure_light_control_task),
) -> RunDataManager:
"""Get a run data manager to keep track of current/historical run data."""
return RunDataManager(
Expand Down
71 changes: 71 additions & 0 deletions robot-server/robot_server/runs/light_control_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""Background task to drive the status bar."""
from typing import Optional
from logging import getLogger
import asyncio

from .engine_store import EngineStore
from opentrons.hardware_control import HardwareControlAPI
from opentrons.protocol_engine.types import EngineStatus
from opentrons.hardware_control.types import StatusBarState

log = getLogger(__name__)


def _engine_status_to_status_bar(status: Optional[EngineStatus]) -> StatusBarState:
"""Convert an engine status into a status bar status."""
if status is None:
return StatusBarState.IDLE

return {
EngineStatus.IDLE: StatusBarState.IDLE,
EngineStatus.RUNNING: StatusBarState.RUNNING,
EngineStatus.PAUSED: StatusBarState.PAUSED,
EngineStatus.BLOCKED_BY_OPEN_DOOR: StatusBarState.PAUSED,
EngineStatus.STOP_REQUESTED: StatusBarState.RUNNING,
EngineStatus.STOPPED: StatusBarState.RUNNING,
EngineStatus.FINISHING: StatusBarState.RUNNING,
EngineStatus.FAILED: StatusBarState.HARDWARE_ERROR,
EngineStatus.SUCCEEDED: StatusBarState.RUN_COMPLETED,
}[status]


class LightController:
"""LightController sets the status bar to match the protocol status."""

def __init__(self, api: HardwareControlAPI, engine_store: EngineStore) -> None:
"""Create a new LightController."""
self._api = api
self._engine_store = engine_store

async def update(
self, prev_status: Optional[EngineStatus], new_status: Optional[EngineStatus]
) -> None:
"""Update the status bar if the current run status has changed."""
if prev_status == new_status:
# No change, don't try to set anything.
return

await self._api.set_status_bar_state(
state=_engine_status_to_status_bar(status=new_status)
)

def get_current_status(self) -> Optional[EngineStatus]:
"""Get the `status` value from the engine's active run engine."""
current_id = self._engine_store.current_run_id
if current_id is not None:
return self._engine_store.engine.state_view.commands.get_status()

return None


async def run_light_task(driver: LightController) -> None:
"""Run the light control task.
This is intended to be run as a background task once the EngineStore has been created.
"""
prev_status = driver.get_current_status()
while True:
await asyncio.sleep(0.1)
new_status = driver.get_current_status()
await driver.update(prev_status=prev_status, new_status=new_status)
prev_status = new_status
89 changes: 89 additions & 0 deletions robot-server/tests/runs/test_light_control_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""Unit tests for `runs.light_control_task`."""

import pytest
from typing import Optional
from decoy import Decoy

from opentrons.hardware_control import HardwareControlAPI
from opentrons.hardware_control.types import StatusBarState
from opentrons.protocol_engine.types import EngineStatus
from robot_server.runs.engine_store import EngineStore
from robot_server.runs.light_control_task import LightController


@pytest.fixture
def engine_store(decoy: Decoy) -> EngineStore:
"""Mock out the EngineStore."""
return decoy.mock(cls=EngineStore)


@pytest.fixture
def subject(
hardware_api: HardwareControlAPI, engine_store: EngineStore
) -> LightController:
"""Test subject - LightController."""
return LightController(api=hardware_api, engine_store=engine_store)


@pytest.mark.parametrize(
["active", "status"],
[
[False, EngineStatus.IDLE],
[True, EngineStatus.IDLE],
[True, EngineStatus.RUNNING],
[False, EngineStatus.FAILED],
],
)
async def test_get_current_status(
decoy: Decoy,
engine_store: EngineStore,
subject: LightController,
active: bool,
status: EngineStatus,
) -> None:
"""Test LightController.get_current_status."""
decoy.when(engine_store.current_run_id).then_return("fake_id" if active else None)
decoy.when(engine_store.engine.state_view.commands.get_status()).then_return(status)

expected = status if active else None

assert subject.get_current_status() == expected


@pytest.mark.parametrize(
["prev_state", "new_state", "expected"],
[
[None, None, StatusBarState.IDLE],
[EngineStatus.IDLE, None, StatusBarState.IDLE],
[EngineStatus.IDLE, EngineStatus.IDLE, None],
[None, EngineStatus.IDLE, StatusBarState.IDLE],
[None, EngineStatus.PAUSED, StatusBarState.PAUSED],
[
EngineStatus.RUNNING,
EngineStatus.BLOCKED_BY_OPEN_DOOR,
StatusBarState.PAUSED,
],
[EngineStatus.RUNNING, EngineStatus.FAILED, StatusBarState.HARDWARE_ERROR],
[EngineStatus.RUNNING, EngineStatus.SUCCEEDED, StatusBarState.RUN_COMPLETED],
],
)
async def test_light_controller_update(
decoy: Decoy,
hardware_api: HardwareControlAPI,
subject: LightController,
prev_state: Optional[EngineStatus],
new_state: Optional[EngineStatus],
expected: StatusBarState,
) -> None:
"""Test LightController.update.
Verifies that the status bar is NOT updated if the state is the same, and
checks that state mapping is correct.
"""
await subject.update(prev_status=prev_state, new_status=new_state)

call_count = 0 if prev_state == new_state else 1

decoy.verify(
await hardware_api.set_status_bar_state(state=expected), times=call_count
)

0 comments on commit 9eeb0b9

Please sign in to comment.