diff --git a/aiperf/common/constants.py b/aiperf/common/constants.py
index 97079cb51..5548a3521 100644
--- a/aiperf/common/constants.py
+++ b/aiperf/common/constants.py
@@ -91,5 +91,8 @@
 DEFAULT_WORKER_HEALTH_CHECK_INTERVAL = 2.0
 """Default interval in seconds between worker health check messages."""
 
+DEFAULT_ZMQ_CONTEXT_TERM_TIMEOUT = 10.0
+"""Default timeout in seconds for terminating the ZMQ context."""
+
 AIPERF_HTTP_CONNECTION_LIMIT = int(os.environ.get("AIPERF_HTTP_CONNECTION_LIMIT", 2500))
 """Maximum number of concurrent connections for HTTP clients."""
diff --git a/aiperf/controller/proxy_manager.py b/aiperf/controller/proxy_manager.py
index c2e142b64..7327f8163 100644
--- a/aiperf/controller/proxy_manager.py
+++ b/aiperf/controller/proxy_manager.py
@@ -1,8 +1,11 @@
 # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 # SPDX-License-Identifier: Apache-2.0
+import asyncio
+
 import zmq.asyncio
 
 from aiperf.common.config import ServiceConfig
+from aiperf.common.constants import DEFAULT_ZMQ_CONTEXT_TERM_TIMEOUT
 from aiperf.common.enums import ZMQProxyType
 from aiperf.common.factories import ZMQProxyFactory
 from aiperf.common.hooks import on_init, on_start, on_stop
@@ -47,5 +50,24 @@ async def _stop_proxies(self) -> None:
         self.debug("Stopping all proxies")
         for proxy in self.proxies:
             await proxy.stop()
-        zmq.asyncio.Context.instance().destroy()
         self.debug("All proxies stopped successfully")
+
+        try:
+            self.debug("Terminating ZMQ context")
+            # term() is run in a worker thread and bounded by a timeout so that
+            # a slow or blocking termination cannot stall the event loop.
+            await asyncio.wait_for(
+                asyncio.to_thread(zmq.asyncio.Context.instance().term),
+                timeout=DEFAULT_ZMQ_CONTEXT_TERM_TIMEOUT,
+            )
+            self.debug("ZMQ context terminated successfully")
+        except asyncio.TimeoutError:
+            # str(TimeoutError()) is empty, so log an explicit message instead.
+            self.warning(
+                "Timed out terminating ZMQ context after "
+                f"{DEFAULT_ZMQ_CONTEXT_TERM_TIMEOUT} seconds"
+            )
+        except Exception as e:
+            # Best-effort cleanup: log and continue. CancelledError and other
+            # BaseException subclasses are deliberately left to propagate.
+            self.warning(f"Error terminating ZMQ context: {e}")