diff --git a/src/otaclient/grpc/api_v2/ecu_status.py b/src/otaclient/grpc/api_v2/ecu_status.py index ca396c804..d01b015cb 100644 --- a/src/otaclient/grpc/api_v2/ecu_status.py +++ b/src/otaclient/grpc/api_v2/ecu_status.py @@ -61,6 +61,23 @@ ACTIVE_POLLING_INTERVAL = 1 # seconds +class LocalECUStatusNotReady(Exception): + """Raised by export() during the startup race for an OTA-managed local ECU. + + When the local ECU is listed in its own available_ecu_ids but the OTA core + has not yet written its first OTAClientStatus into shared memory, the + storage has no entry for my_ecu_id. Returning a StatusResponse in that + state would advertise my_ecu_id in available_ecu_ids while omitting it + from ecu_v2, which downstream agents can misinterpret as a healthy ECU + with no status. The gRPC layer maps this exception to UNAVAILABLE so the + caller will retry instead. + + NOTE: this is *not* raised when the local ECU is configured as a pure + proxy/aggregator (i.e. my_ecu_id not in available_ecu_ids). In that + configuration, missing local status is the steady state. + """ + + @dataclass class ECUStatusState: """State container for ECUStatusStorage.""" @@ -369,10 +386,27 @@ async def export(self) -> api_types.StatusResponse: NOTE: to align with preivous behavior that disconnected ECU should have no entry in status API response, simulate this behavior by skipping disconnected ECU's status report entry. + + Raises: + LocalECUStatusNotReady: if the local ECU's status has not yet been + collected. The gRPC layer maps this to UNAVAILABLE so callers + retry rather than receive a response that omits the local ECU. """ res = api_types.StatusResponse() async with self._writer_lock: + # Only gate on local status when this ECU is itself OTA-managed + # (listed in its own available_ecu_ids). When the local ECU is + # configured as a pure proxy/aggregator (not in available_ecu_ids), + # missing local status is the steady state, not a startup race. + if ( + self.my_ecu_id in self._state.available_ecu_ids + and self.my_ecu_id not in self._state.all_ecus_status_v2 + ): + raise LocalECUStatusNotReady( + f"local ECU {self.my_ecu_id!r} status not yet collected" + ) + res.available_ecu_ids.extend(self._state.available_ecu_ids) # NOTE(20230802): export all reachable ECUs' status, no matter they are in diff --git a/src/otaclient_api/v2/api_stub.py b/src/otaclient_api/v2/api_stub.py index 2cd18a022..62abbd8e3 100644 --- a/src/otaclient_api/v2/api_stub.py +++ b/src/otaclient_api/v2/api_stub.py @@ -17,9 +17,11 @@ from typing import Any +import grpc from otaclient_pb2.v2 import otaclient_v2_pb2 as pb2 from otaclient_pb2.v2 import otaclient_v2_pb2_grpc as pb2_grpc +from otaclient.grpc.api_v2.ecu_status import LocalECUStatusNotReady from otaclient_api.v2 import _types @@ -50,5 +52,9 @@ async def ClientUpdate( return response.export_pb() async def Status(self, request: pb2.StatusRequest, context) -> pb2.StatusResponse: - response = await self._stub.status(_types.StatusRequest.convert(request)) + try: + response = await self._stub.status(_types.StatusRequest.convert(request)) + except LocalECUStatusNotReady as e: + await context.abort(grpc.StatusCode.UNAVAILABLE, str(e)) + raise # context.abort() never returns; satisfy type checkers return response.export_pb() diff --git a/test/unit/test_otaclient/test_grpc/test_api_v2/test_ecu_status.py b/test/unit/test_otaclient/test_grpc/test_api_v2/test_ecu_status.py index 9589c488e..457fdb836 100644 --- a/test/unit/test_otaclient/test_grpc/test_api_v2/test_ecu_status.py +++ b/test/unit/test_otaclient/test_grpc/test_api_v2/test_ecu_status.py @@ -29,6 +29,7 @@ from otaclient.grpc.api_v2.ecu_status import ( ACTIVE_POLLING_INTERVAL, IDLE_POLLING_INTERVAL, + LocalECUStatusNotReady, ) from otaclient.grpc.api_v2.servicer import ECUStatusStorage from otaclient_api.v2 import _types as api_types @@ -195,3 +196,72 @@ async def test_polling_interval_idle_when_no_active_or_aborting(self): await asyncio.sleep(self.SAFE_INTERVAL_FOR_PROPERTY_UPDATE) assert self.ecu_storage.get_polling_interval() == IDLE_POLLING_INTERVAL + + +class TestECUStatusStorageExportLocalECUNotReady: + """Tests for export() gating during the local ECU startup race.""" + + @pytest.fixture(autouse=True) + async def setup_test(self, mocker: MockerFixture, ecu_info_fixture: ECUInfo): + self.ecu_info = ecu_info = ecu_info_fixture + mocker.patch(f"{ECU_STATUS_MODULE}.ecu_info", ecu_info) + + self.ecu_status_flags = ecu_status_flags = MultipleECUStatusFlags( + any_child_ecu_in_update=threading.Event(), # type: ignore[assignment] + any_requires_network=threading.Event(), # type: ignore[assignment] + all_success=threading.Event(), # type: ignore[assignment] + ) + self.ecu_storage = ECUStatusStorage(ecu_status_flags=ecu_status_flags) + + try: + yield + finally: + self.ecu_storage._debug_properties_update_shutdown_event.set() + + async def test_export_raises_when_local_ecu_status_not_ready(self): + """During the startup window before the local OTA core writes its first + status into shared memory, my_ecu_id is absent from all_ecus_status_v2. + export() must raise LocalECUStatusNotReady so the gRPC layer returns + UNAVAILABLE instead of a response that omits the local ECU from ecu_v2. + """ + # No update_from_local_ecu call: simulate the cold-start window. + # A child ECU's status arriving first must not unblock export(). + await self.ecu_storage.update_from_child_ecu( + api_types.StatusResponse( + available_ecu_ids=["p1"], + ecu_v2=[ + api_types.StatusResponseEcuV2( + ecu_id="p1", + ota_status=api_types.StatusOta.SUCCESS, + ), + ], + ) + ) + + with pytest.raises(LocalECUStatusNotReady): + await self.ecu_storage.export() + + async def test_export_proxy_ecu_without_local_status_does_not_raise(self): + """When the local ECU is configured as a proxy/aggregator (not in its + own available_ecu_ids), missing local status is the steady state and + export() must return normally with the child ECUs' statuses. + """ + # Simulate proxy config: drop my_ecu_id from available_ecu_ids. + self.ecu_storage._state.available_ecu_ids.pop(self.ecu_storage.my_ecu_id, None) + + await self.ecu_storage.update_from_child_ecu( + api_types.StatusResponse( + available_ecu_ids=["p1"], + ecu_v2=[ + api_types.StatusResponseEcuV2( + ecu_id="p1", + ota_status=api_types.StatusOta.SUCCESS, + ), + ], + ) + ) + + exported = await self.ecu_storage.export() + assert self.ecu_storage.my_ecu_id not in list(exported.available_ecu_ids) + assert exported.find_ecu_v2("p1") is not None + assert exported.find_ecu_v2(self.ecu_storage.my_ecu_id) is None diff --git a/test/unit/test_otaclient_api/test_v2/test_api_stub.py b/test/unit/test_otaclient_api/test_v2/test_api_stub.py new file mode 100644 index 000000000..e6b801b7d --- /dev/null +++ b/test/unit/test_otaclient_api/test_v2/test_api_stub.py @@ -0,0 +1,67 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock + +import grpc +import pytest +from otaclient_pb2.v2 import otaclient_v2_pb2 as pb2 + +from otaclient.grpc.api_v2.ecu_status import LocalECUStatusNotReady +from otaclient_api.v2 import _types as api_types +from otaclient_api.v2.api_stub import OtaClientServiceV2 + + +class _AbortError(Exception): + """Stand-in for grpc.aio.AbortError used in tests.""" + + +class TestOtaClientServiceV2Status: + @pytest.fixture + def context(self) -> AsyncMock: + ctx = AsyncMock() + ctx.abort.side_effect = _AbortError() + return ctx + + async def test_status_returns_response(self, context: AsyncMock): + inner_stub = MagicMock() + inner_stub.status = AsyncMock(return_value=api_types.StatusResponse()) + + service = OtaClientServiceV2(inner_stub) + result = await service.Status(pb2.StatusRequest(), context) + + assert isinstance(result, pb2.StatusResponse) + context.abort.assert_not_called() + + async def test_status_local_ecu_not_ready_aborts_unavailable( + self, context: AsyncMock + ): + inner_stub = MagicMock() + inner_stub.status = AsyncMock( + side_effect=LocalECUStatusNotReady( + "local ECU 'autoware' status not yet collected" + ) + ) + + service = OtaClientServiceV2(inner_stub) + with pytest.raises(_AbortError): + await service.Status(pb2.StatusRequest(), context) + + context.abort.assert_awaited_once() + code, details = context.abort.await_args.args + assert code == grpc.StatusCode.UNAVAILABLE + assert "autoware" in details