Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 5 additions & 17 deletions aiperf/common/enums/system_enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,12 @@ class SystemState(CaseInsensitiveStrEnum):
determine what actions to take when a signal is received.
"""

CREATED = "created"
INITIALIZING = "initializing"
"""The system is initializing. This is the initial state."""

CONFIGURING = "configuring"
"""The system is configuring services."""

READY = "ready"
"""The system is ready to start profiling. This is a temporary state that should be
followed by PROFILING."""

STARTING_SERVICES = "starting_services"
CONFIGURING_SERVICES = "configuring_services"
PROFILING = "profiling"
"""The system is running a profiling run."""

PROCESSING = "processing"
"""The system is processing results."""

PROCESSING_RESULTS = "processing_results"
EXPORTING_DATA = "exporting_data"
STOPPING = "stopping"
"""The system is stopping."""

SHUTDOWN = "shutdown"
"""The system is shutting down. This is the final state."""
6 changes: 6 additions & 0 deletions aiperf/controller/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
from aiperf.controller.system_mixins import (
SignalHandlerMixin,
)
from aiperf.controller.system_utils import (
display_command_errors,
extract_errors,
)

__all__ = [
"BaseServiceManager",
Expand All @@ -31,5 +35,7 @@
"ServiceKubernetesRunInfo",
"SignalHandlerMixin",
"SystemController",
"display_command_errors",
"extract_errors",
"main",
]
55 changes: 45 additions & 10 deletions aiperf/controller/system_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
ServiceRegistrationStatus,
ServiceType,
)
from aiperf.common.enums.system_enums import SystemState
from aiperf.common.factories import (
AIPerfUIFactory,
ServiceFactory,
Expand Down Expand Up @@ -51,6 +52,7 @@
from aiperf.common.types import ServiceTypeT
from aiperf.controller.proxy_manager import ProxyManager
from aiperf.controller.system_mixins import SignalHandlerMixin
from aiperf.controller.system_utils import display_command_errors, extract_errors
from aiperf.exporters.exporter_manager import ExporterManager


Expand All @@ -75,6 +77,7 @@ def __init__(
)
self.debug("Creating System Controller")
self._was_cancelled = False
self._system_state = SystemState.CREATED
# List of required service types, in no particular order
# These are services that must be running before the system controller can start profiling
self.required_services: dict[ServiceTypeT, int] = {
Expand Down Expand Up @@ -124,10 +127,24 @@ async def request_realtime_metrics(self) -> None:
)
)

@property
def system_state(self) -> SystemState:
    """Return the state currently stored in ``self._system_state``."""
    return self._system_state

@system_state.setter
def system_state(self, state: SystemState) -> None:
    """Record a new system state and log the transition.

    Assigning a state equal to the stored one is a no-op: nothing is logged
    and the stored value is left untouched.
    """
    unchanged = state == self._system_state
    if not unchanged:
        # Log first, then store, so the message is emitted exactly once per
        # actual transition.
        self.info(f"AIPerf System is {state.name}")
        self._system_state = state

async def initialize(self) -> None:
"""We need to override the initialize method to run the proxy manager before the base service initialize.
This is because the proxies need to be running before we can subscribe to the message bus.
"""
self.system_state = SystemState.INITIALIZING
self.debug("Running ZMQ Proxy Manager Before Initialize")
await self.proxy_manager.initialize_and_start()
# Once the proxies are running, call the original initialize method
Expand All @@ -151,27 +168,30 @@ async def _start_services(self) -> None:
- Start all required services
"""
self.debug("System Controller is bootstrapping services")
self.system_state = SystemState.STARTING_SERVICES
# Start all required services
await self.service_manager.start()
# Wait for all services to be registered
await self.service_manager.wait_for_all_services_registration(
stop_event=self._stop_requested_event,
)

self.info("AIPerf System is CONFIGURING")
await self._profile_configure_all_services()
self.info("AIPerf System is CONFIGURED")
await self._start_profiling_all_services()
self.info("AIPerf System is PROFILING")
self.system_state = SystemState.CONFIGURING_SERVICES
if not await self._profile_configure_all_services():
return
self.system_state = SystemState.PROFILING
if not await self._start_profiling_all_services():
return

async def _profile_configure_all_services(self) -> None:
async def _profile_configure_all_services(self) -> bool:
"""Configure all services to start profiling.

This is a blocking call that will wait for all services to be configured before returning. This way
we can ensure that all services are configured before we start profiling.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you update the comments to make it clear what the return values signify?

"""
self.info("Configuring all services to start profiling")
begin = time.perf_counter()
await self.send_command_and_wait_for_all_responses(
responses = await self.send_command_and_wait_for_all_responses(
ProfileConfigureCommand(
service_id=self.service_id,
config=self.user_config,
Expand All @@ -181,18 +201,30 @@ async def _profile_configure_all_services(self) -> None:
)
duration = time.perf_counter() - begin
self.info(f"All services configured in {duration:.2f} seconds")

async def _start_profiling_all_services(self) -> None:
errors = extract_errors(responses)
if errors:
display_command_errors("Failed to configure all services", errors)
Copy link

Copilot AI Aug 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Using asyncio.shield() prevents cancellation of the stop operation, but since this is in an error handling path, consider whether shielding is necessary. If the goal is to ensure cleanup always runs, this pattern is correct, but it should be documented why shielding is needed here.

Suggested change
display_command_errors("Failed to configure all services", errors)
display_command_errors("Failed to configure all services", errors)
# Shielding stop() to ensure cleanup always runs even if cancelled.

Copilot uses AI. Check for mistakes.
await asyncio.shield(self.stop())
return False
return True

async def _start_profiling_all_services(self) -> bool:
    """Tell all services to start profiling.

    Sends a ``ProfileStartCommand`` to every service id currently present in
    ``self.service_manager.service_id_map`` and waits for all responses.

    Returns:
        True when none of the responses is an error.
        False when at least one error was reported; in that case the errors
        have been displayed and ``self.stop()`` has been awaited before
        returning, so the caller should not continue the run.
    """
    self.debug("Sending PROFILE_START command to all services")
    responses = await self.send_command_and_wait_for_all_responses(
        ProfileStartCommand(
            service_id=self.service_id,
        ),
        list(self.service_manager.service_id_map.keys()),
        timeout=DEFAULT_PROFILE_START_TIMEOUT,
    )
    errors = extract_errors(responses)
    if errors:
        display_command_errors("Failed to start profiling all services", errors)
        # Shield stop() so that a cancellation of this coroutine does not
        # also cancel the shutdown that has just been started.
        await asyncio.shield(self.stop())
        return False
    # Report success only after the responses have been checked for errors.
    self.info("All services started profiling successfully")
    return True

@on_command(CommandType.REGISTER_SERVICE)
async def _handle_register_service_command(
Expand Down Expand Up @@ -267,6 +299,7 @@ async def _process_credits_complete_message(
"""
service_id = message.service_id
self.info(f"Received credits complete from {service_id}")
self.system_state = SystemState.PROCESSING_RESULTS

@on_message(MessageType.STATUS)
async def _process_status_message(self, message: StatusMessage) -> None:
Expand Down Expand Up @@ -353,6 +386,7 @@ async def _on_process_records_result_message(

# This data will also be displayed by the console error exporter
self.debug(lambda: f"Error summary: {message.results.results.error_summary}")
self.system_state = SystemState.EXPORTING_DATA

self._profile_results = message.results

Expand Down Expand Up @@ -400,6 +434,7 @@ async def _cancel_profiling(self) -> None:
@on_stop
async def _stop_system_controller(self) -> None:
"""Stop the system controller and all running services."""
self.system_state = SystemState.STOPPING
# Broadcast a shutdown command to all services
await self.publish(ShutdownCommand(service_id=self.service_id))

Expand Down
61 changes: 61 additions & 0 deletions aiperf/controller/system_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from rich.console import Console
from rich.panel import Panel
from rich.text import Text

from aiperf.common.messages import CommandErrorResponse
from aiperf.common.messages.command_messages import CommandResponse
from aiperf.common.models.error_models import ErrorDetails


def extract_errors(
    responses: list[CommandResponse | ErrorDetails],
) -> list[CommandErrorResponse | ErrorDetails]:
    """Return only the error entries of ``responses``, in their original order.

    An entry counts as an error when it is an instance of
    ``CommandErrorResponse`` or ``ErrorDetails``; every other entry is dropped.
    An empty list is returned when there are no errors.
    """
    failures: list[CommandErrorResponse | ErrorDetails] = []
    for item in responses:
        if isinstance(item, (CommandErrorResponse, ErrorDetails)):
            failures.append(item)
    return failures


def display_command_errors(
    title: str, errors: list[CommandErrorResponse | ErrorDetails]
) -> None:
    """Display command errors to the user in a red-bordered Rich panel.

    Each ``CommandErrorResponse`` is rendered as two lines (the service id and
    command, then the tab-indented error type and message); any other entry
    is rendered as a single "type: message" bullet line.

    Args:
        title: Text shown after "Error: " in the panel title.
        errors: The errors to display. Nothing is printed when it is empty.
    """
    if not errors:
        return
    # Build each line WITHOUT a trailing newline and join them afterwards, so
    # there is no final newline to strip and no need to touch Rich's private
    # Text internals.
    lines: list[Text] = []
    for error in errors:
        if isinstance(error, CommandErrorResponse):
            lines.append(
                Text.assemble(
                    Text("•", style="bold red"),
                    f" Service: {error.service_id}: Command: {error.command}",
                )
            )
            lines.append(
                Text.assemble(
                    Text(f"\t{error.error.type}:", style="bold red"),
                    f" {error.error.message}",
                )
            )
        else:
            lines.append(
                Text.assemble(
                    Text(f"• {error.type}:", style="bold red"), f" {error.message}"
                )
            )
    console = Console()
    console.print(
        Panel(
            Text("\n").join(lines),
            border_style="bold red",
            title=f"Error: {title}",
            title_align="left",
        )
    )
    # Flush so the panel is written out immediately.
    console.file.flush()
Loading