Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions src/otaclient/grpc/api_v2/ecu_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,23 @@
ACTIVE_POLLING_INTERVAL = 1 # seconds


class LocalECUStatusNotReady(Exception):
"""Raised by export() during the startup race for an OTA-managed local ECU.

When the local ECU is listed in its own available_ecu_ids but the OTA core
has not yet written its first OTAClientStatus into shared memory, the
storage has no entry for my_ecu_id. Returning a StatusResponse in that
state would advertise my_ecu_id in available_ecu_ids while omitting it
from ecu_v2, which downstream agents can misinterpret as a healthy ECU
with no status. The gRPC layer maps this exception to UNAVAILABLE so the
caller will retry instead.

NOTE: this is *not* raised when the local ECU is configured as a pure
proxy/aggregator (i.e. my_ecu_id not in available_ecu_ids). In that
configuration, missing local status is the steady state.
"""


@dataclass
class ECUStatusState:
"""State container for ECUStatusStorage."""
Expand Down Expand Up @@ -369,10 +386,27 @@ async def export(self) -> api_types.StatusResponse:
NOTE: to align with preivous behavior that disconnected ECU should have no
entry in status API response, simulate this behavior by skipping
disconnected ECU's status report entry.

Raises:
LocalECUStatusNotReady: if the local ECU's status has not yet been
collected. The gRPC layer maps this to UNAVAILABLE so callers
retry rather than receive a response that omits the local ECU.
"""
res = api_types.StatusResponse()

async with self._writer_lock:
# Only gate on local status when this ECU is itself OTA-managed
# (listed in its own available_ecu_ids). When the local ECU is
# configured as a pure proxy/aggregator (not in available_ecu_ids),
# missing local status is the steady state, not a startup race.
if (
self.my_ecu_id in self._state.available_ecu_ids
and self.my_ecu_id not in self._state.all_ecus_status_v2
):
raise LocalECUStatusNotReady(
f"local ECU {self.my_ecu_id!r} status not yet collected"
)

res.available_ecu_ids.extend(self._state.available_ecu_ids)

# NOTE(20230802): export all reachable ECUs' status, no matter they are in
Expand Down
8 changes: 7 additions & 1 deletion src/otaclient_api/v2/api_stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

from typing import Any

import grpc
from otaclient_pb2.v2 import otaclient_v2_pb2 as pb2
from otaclient_pb2.v2 import otaclient_v2_pb2_grpc as pb2_grpc

from otaclient.grpc.api_v2.ecu_status import LocalECUStatusNotReady
from otaclient_api.v2 import _types


Expand Down Expand Up @@ -50,5 +52,9 @@ async def ClientUpdate(
return response.export_pb()

async def Status(self, request: pb2.StatusRequest, context) -> pb2.StatusResponse:
response = await self._stub.status(_types.StatusRequest.convert(request))
try:
response = await self._stub.status(_types.StatusRequest.convert(request))
except LocalECUStatusNotReady as e:
await context.abort(grpc.StatusCode.UNAVAILABLE, str(e))
raise # context.abort() never returns; satisfy type checkers
return response.export_pb()
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from otaclient.grpc.api_v2.ecu_status import (
ACTIVE_POLLING_INTERVAL,
IDLE_POLLING_INTERVAL,
LocalECUStatusNotReady,
)
from otaclient.grpc.api_v2.servicer import ECUStatusStorage
from otaclient_api.v2 import _types as api_types
Expand Down Expand Up @@ -195,3 +196,72 @@ async def test_polling_interval_idle_when_no_active_or_aborting(self):
await asyncio.sleep(self.SAFE_INTERVAL_FOR_PROPERTY_UPDATE)

assert self.ecu_storage.get_polling_interval() == IDLE_POLLING_INTERVAL


class TestECUStatusStorageExportLocalECUNotReady:
"""Tests for export() gating during the local ECU startup race."""

@pytest.fixture(autouse=True)
async def setup_test(self, mocker: MockerFixture, ecu_info_fixture: ECUInfo):
self.ecu_info = ecu_info = ecu_info_fixture
mocker.patch(f"{ECU_STATUS_MODULE}.ecu_info", ecu_info)

self.ecu_status_flags = ecu_status_flags = MultipleECUStatusFlags(
any_child_ecu_in_update=threading.Event(), # type: ignore[assignment]
any_requires_network=threading.Event(), # type: ignore[assignment]
all_success=threading.Event(), # type: ignore[assignment]
)
self.ecu_storage = ECUStatusStorage(ecu_status_flags=ecu_status_flags)

try:
yield
finally:
self.ecu_storage._debug_properties_update_shutdown_event.set()

async def test_export_raises_when_local_ecu_status_not_ready(self):
"""During the startup window before the local OTA core writes its first
status into shared memory, my_ecu_id is absent from all_ecus_status_v2.
export() must raise LocalECUStatusNotReady so the gRPC layer returns
UNAVAILABLE instead of a response that omits the local ECU from ecu_v2.
"""
# No update_from_local_ecu call: simulate the cold-start window.
# A child ECU's status arriving first must not unblock export().
await self.ecu_storage.update_from_child_ecu(
api_types.StatusResponse(
available_ecu_ids=["p1"],
ecu_v2=[
api_types.StatusResponseEcuV2(
ecu_id="p1",
ota_status=api_types.StatusOta.SUCCESS,
),
],
)
)

with pytest.raises(LocalECUStatusNotReady):
await self.ecu_storage.export()

async def test_export_proxy_ecu_without_local_status_does_not_raise(self):
"""When the local ECU is configured as a proxy/aggregator (not in its
own available_ecu_ids), missing local status is the steady state and
export() must return normally with the child ECUs' statuses.
"""
# Simulate proxy config: drop my_ecu_id from available_ecu_ids.
self.ecu_storage._state.available_ecu_ids.pop(self.ecu_storage.my_ecu_id, None)

await self.ecu_storage.update_from_child_ecu(
api_types.StatusResponse(
available_ecu_ids=["p1"],
ecu_v2=[
api_types.StatusResponseEcuV2(
ecu_id="p1",
ota_status=api_types.StatusOta.SUCCESS,
),
],
)
)

exported = await self.ecu_storage.export()
assert self.ecu_storage.my_ecu_id not in list(exported.available_ecu_ids)
assert exported.find_ecu_v2("p1") is not None
assert exported.find_ecu_v2(self.ecu_storage.my_ecu_id) is None
67 changes: 67 additions & 0 deletions test/unit/test_otaclient_api/test_v2/test_api_stub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright 2022 TIER IV, INC. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from __future__ import annotations

from unittest.mock import AsyncMock, MagicMock

import grpc
import pytest
from otaclient_pb2.v2 import otaclient_v2_pb2 as pb2

from otaclient.grpc.api_v2.ecu_status import LocalECUStatusNotReady
from otaclient_api.v2 import _types as api_types
from otaclient_api.v2.api_stub import OtaClientServiceV2


class _AbortError(Exception):
"""Stand-in for grpc.aio.AbortError used in tests."""


class TestOtaClientServiceV2Status:
@pytest.fixture
def context(self) -> AsyncMock:
ctx = AsyncMock()
ctx.abort.side_effect = _AbortError()
return ctx

async def test_status_returns_response(self, context: AsyncMock):
inner_stub = MagicMock()
inner_stub.status = AsyncMock(return_value=api_types.StatusResponse())

service = OtaClientServiceV2(inner_stub)
result = await service.Status(pb2.StatusRequest(), context)

assert isinstance(result, pb2.StatusResponse)
context.abort.assert_not_called()

async def test_status_local_ecu_not_ready_aborts_unavailable(
self, context: AsyncMock
):
inner_stub = MagicMock()
inner_stub.status = AsyncMock(
side_effect=LocalECUStatusNotReady(
"local ECU 'autoware' status not yet collected"
)
)

service = OtaClientServiceV2(inner_stub)
with pytest.raises(_AbortError):
await service.Status(pb2.StatusRequest(), context)

context.abort.assert_awaited_once()
code, details = context.abort.await_args.args
assert code == grpc.StatusCode.UNAVAILABLE
assert "autoware" in details
Loading