Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions aiperf/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,6 @@

DEFAULT_WORKER_HEALTH_CHECK_INTERVAL = 2.0
"""Default interval in seconds between worker health check messages."""

DEFAULT_ZMQ_CONTEXT_TERM_TIMEOUT = 10.0
"""Default timeout for terminating the ZMQ context in seconds."""
14 changes: 13 additions & 1 deletion aiperf/controller/proxy_manager.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import asyncio

import zmq.asyncio

from aiperf.common.config import ServiceConfig
from aiperf.common.constants import DEFAULT_ZMQ_CONTEXT_TERM_TIMEOUT
from aiperf.common.enums import ZMQProxyType
from aiperf.common.factories import ZMQProxyFactory
from aiperf.common.hooks import on_init, on_start, on_stop
Expand Down Expand Up @@ -47,5 +50,14 @@ async def _stop_proxies(self) -> None:
self.debug("Stopping all proxies")
for proxy in self.proxies:
await proxy.stop()
zmq.asyncio.Context.instance().destroy()
self.debug("All proxies stopped successfully")

try:
self.debug("Terminating ZMQ context")
await asyncio.wait_for(
asyncio.to_thread(zmq.asyncio.Context.instance().term),
timeout=DEFAULT_ZMQ_CONTEXT_TERM_TIMEOUT,
)
self.debug("ZMQ context terminated successfully")
except BaseException as e:
self.warning(f"Error terminating ZMQ context: {e}")