From 46d5afc44f3c848fcd919a73d813ecc4bedb25d8 Mon Sep 17 00:00:00 2001 From: Brian McCaffrey Date: Fri, 20 Feb 2026 12:50:48 -0600 Subject: [PATCH] Add in min_connections to available arguments Signed-off-by: Brian McCaffrey --- tests/test_asyncio/test_cluster.py | 18 +++++++ tests/test_asyncio/test_connection_pool.py | 45 +++++++++++++++++ tests/test_connection_pool.py | 56 ++++++++++++++++++++++ valkey/asyncio/client.py | 10 ++++ valkey/asyncio/cluster.py | 32 +++++++++++++ valkey/asyncio/connection.py | 38 +++++++++++++++ valkey/client.py | 7 +++ valkey/cluster.py | 4 +- valkey/connection.py | 41 ++++++++++++++-- 9 files changed, 245 insertions(+), 6 deletions(-) diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index 01c515a0..925e0e21 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -468,6 +468,24 @@ async def read_response_mocked(*args: Any, **kwargs: Any) -> None: await rc.aclose() + async def test_min_connections( + self, create_valkey: Callable[..., ValkeyCluster] + ) -> None: + rc = await create_valkey(cls=ValkeyCluster, min_connections=5) + for node in rc.get_nodes(): + assert node.min_connections == 5 + assert len(node._connections) == 5 + assert len(node._free) == 5 + await rc.aclose() + + async def test_min_connections_greater_than_max( + self, create_valkey: Callable[..., ValkeyCluster] + ) -> None: + with pytest.raises(ValkeyClusterException): + await create_valkey( + cls=ValkeyCluster, min_connections=20, max_connections=10 + ) + async def test_execute_command_errors(self, r: ValkeyCluster) -> None: """ Test that if no key is provided then exception should be raised. diff --git a/tests/test_asyncio/test_connection_pool.py b/tests/test_asyncio/test_connection_pool.py index 2b7813fe..803a16b5 100644 --- a/tests/test_asyncio/test_connection_pool.py +++ b/tests/test_asyncio/test_connection_pool.py @@ -196,6 +196,51 @@ async def test_repr_contains_db_info_unix(self): expected = "path=/abc,db=1,client_name=test-client" assert expected in repr(pool) + async def test_min_connections(self): + pool = valkey.ConnectionPool( + connection_class=DummyConnection, + min_connections=5, + ) + assert pool.min_connections == 5 + assert len(pool._available_connections) == 5 + await pool.disconnect(inuse_connections=True) + + async def test_min_connections_default(self): + pool = valkey.ConnectionPool( + connection_class=DummyConnection, + ) + assert pool.min_connections == 0 + assert len(pool._available_connections) == 0 + await pool.disconnect(inuse_connections=True) + + async def test_min_connections_greater_than_max_raises(self): + with pytest.raises(ValueError): + valkey.ConnectionPool( + connection_class=DummyConnection, + min_connections=20, + max_connections=10, + ) + + async def test_min_connections_negative_raises(self): + with pytest.raises(ValueError): + valkey.ConnectionPool( + connection_class=DummyConnection, + min_connections=-1, + ) + + async def test_min_connections_initialize(self): + pool = valkey.ConnectionPool( + connection_class=DummyConnection, + min_connections=3, + ) + assert not pool._initialized + await pool.initialize() + assert pool._initialized + # Calling initialize again is a no-op + await pool.initialize() + assert pool._initialized + await pool.disconnect(inuse_connections=True) + class TestBlockingConnectionPool: @asynccontextmanager diff --git a/tests/test_connection_pool.py b/tests/test_connection_pool.py index 7111e894..b912f4ee 100644 --- a/tests/test_connection_pool.py +++ b/tests/test_connection_pool.py @@ -107,6 +107,37 @@ def test_repr_contains_db_info_unix(self): expected = "path=/abc,db=1,client_name=test-client" assert expected in repr(pool) + def test_min_connections(self): + pool = valkey.ConnectionPool( + connection_class=DummyConnection, + min_connections=5, + ) + assert pool.min_connections == 5 + assert len(pool._available_connections) == 5 + assert pool._created_connections == 5 + + def test_min_connections_default(self): + pool = valkey.ConnectionPool( + connection_class=DummyConnection, + ) + assert pool.min_connections == 0 + assert len(pool._available_connections) == 0 + + def test_min_connections_greater_than_max_raises(self): + with pytest.raises(ValueError): + valkey.ConnectionPool( + connection_class=DummyConnection, + min_connections=20, + max_connections=10, + ) + + def test_min_connections_negative_raises(self): + with pytest.raises(ValueError): + valkey.ConnectionPool( + connection_class=DummyConnection, + min_connections=-1, + ) + class TestBlockingConnectionPool: def get_pool(self, connection_kwargs=None, max_connections=10, timeout=20): @@ -196,6 +227,31 @@ def test_repr_contains_db_info_unix(self): expected = "path=abc,db=0,client_name=test-client" assert expected in repr(pool) + def test_min_connections(self): + pool = valkey.BlockingConnectionPool( + connection_class=DummyConnection, + max_connections=10, + min_connections=5, + ) + assert pool.min_connections == 5 + assert len(pool._connections) == 5 + + def test_min_connections_default(self): + pool = valkey.BlockingConnectionPool( + connection_class=DummyConnection, + max_connections=10, + ) + assert pool.min_connections == 0 + assert len(pool._connections) == 0 + + def test_min_connections_greater_than_max_raises(self): + with pytest.raises(ValueError): + valkey.BlockingConnectionPool( + connection_class=DummyConnection, + min_connections=20, + max_connections=10, + ) + class TestConnectionPoolURLParsing: def test_hostname(self): diff --git a/valkey/asyncio/client.py b/valkey/asyncio/client.py index 851b2cc1..6635a0f2 100644 --- a/valkey/asyncio/client.py +++ b/valkey/asyncio/client.py @@ -226,6 +226,7 @@ def __init__( ssl_min_version: Optional[ssl.TLSVersion] = None, ssl_ciphers: Optional[str] = None, max_connections: Optional[int] = None, + min_connections: int = 0, single_connection_client: bool = False, health_check_interval: int = 0, client_name: Optional[str] = None, @@ -251,6 +252,13 @@ def __init__( `retry_on_error` to a list of the error/s to retry on, then set `retry` to a valid `Retry` object. To retry on TimeoutError, `retry_on_timeout` can also be set to `True`. + + Args: + max_connections: The maximum number of connections for the pool. + min_connections: The minimum number of connections to pre-create + when the pool is initialized. These connections are eagerly + connected when the client is first awaited or used. + Defaults to 0. """ kwargs: Dict[str, Any] # auto_close_connection_pool only has an effect if connection_pool is @@ -287,6 +295,7 @@ def __init__( "retry_on_error": retry_on_error, "retry": copy.deepcopy(retry), "max_connections": max_connections, + "min_connections": min_connections, "health_check_interval": health_check_interval, "client_name": client_name, "lib_name": lib_name, @@ -368,6 +377,7 @@ def __await__(self): return self.initialize().__await__() async def initialize(self: _ValkeyT) -> _ValkeyT: + await self.connection_pool.initialize() if self.single_connection_client: async with self._single_conn_lock: if self.connection is None: diff --git a/valkey/asyncio/cluster.py b/valkey/asyncio/cluster.py index 172fdfb3..a6833166 100644 --- a/valkey/asyncio/cluster.py +++ b/valkey/asyncio/cluster.py @@ -174,6 +174,11 @@ class ValkeyCluster(AbstractValkey, AbstractValkeyCluster, AsyncValkeyClusterCom maximum number of connections are already created, a :class:`~.MaxConnectionsError` is raised. This error may be retried as defined by :attr:`connection_error_retry_attempts` + :param min_connections: + | Minimum number of connections per node to pre-create when the cluster is + initialized. These connections are eagerly connected during cluster setup, + reducing latency on the first requests. Must be less than or equal to + ``max_connections``. Defaults to 0. :param address_remap: | An optional callable which, when provided with an internal network address of a node, e.g. a `(host, port)` tuple, will return the address @@ -255,6 +260,7 @@ def __init__( cluster_error_retry_attempts: int = 3, connection_error_retry_attempts: int = 3, max_connections: int = 2**31, + min_connections: int = 0, # Client related kwargs db: Union[str, int] = 0, path: Optional[str] = None, @@ -317,6 +323,7 @@ def __init__( kwargs: Dict[str, Any] = { "max_connections": max_connections, + "min_connections": min_connections, "connection_class": Connection, "parser_class": ClusterParser, # Client related kwargs @@ -979,6 +986,7 @@ class ClusterNode: "connection_kwargs", "host", "max_connections", + "min_connections", "name", "port", "response_callbacks", @@ -992,12 +1000,18 @@ def __init__( server_type: Optional[str] = None, *, max_connections: int = 2**31, + min_connections: int = 0, connection_class: Type[Connection] = Connection, **connection_kwargs: Any, ) -> None: if host == "localhost": host = socket.gethostbyname(host) + if min_connections > max_connections: + raise ValkeyClusterException( + '"min_connections" must be less than or equal to "max_connections"' + ) + connection_kwargs["host"] = host connection_kwargs["port"] = port self.host = host @@ -1006,6 +1020,7 @@ def __init__( self.server_type = server_type self.max_connections = max_connections + self.min_connections = min_connections self.connection_class = connection_class self.connection_kwargs = connection_kwargs self.response_callbacks = connection_kwargs.pop("response_callbacks", {}) @@ -1013,6 +1028,20 @@ def __init__( self._connections: List[Connection] = [] self._free: Deque[Connection] = collections.deque(maxlen=self.max_connections) + # Pre-create min_connections connection objects + for _ in range(self.min_connections): + connection = self.connection_class(**self.connection_kwargs) + self._connections.append(connection) + self._free.append(connection) + + async def initialize(self) -> None: + """Connect all pre-created connections from min_connections.""" + if not self._connections: + return + await asyncio.gather( + *(connection.connect() for connection in self._connections) + ) + def __repr__(self) -> str: return ( f"[host={self.host}, port={self.port}, " @@ -1415,6 +1444,9 @@ async def initialize(self) -> None: # If initialize was called after a MovedError, clear it self._moved_exception = None + # Eagerly connect min_connections for each node + await asyncio.gather(*(node.initialize() for node in self.nodes_cache.values())) + async def aclose(self, attr: str = "nodes_cache") -> None: self.default_node = None await asyncio.gather( diff --git a/valkey/asyncio/connection.py b/valkey/asyncio/connection.py index 5bc3e46c..303b6806 100644 --- a/valkey/asyncio/connection.py +++ b/valkey/asyncio/connection.py @@ -984,6 +984,16 @@ class ConnectionPool: Any additional keyword arguments are passed to the constructor of ``connection_class``. + + Args: + connection_class: The class to use for creating connections. + Defaults to :py:class:`~valkey.asyncio.connection.Connection`. + max_connections: The maximum number of connections to create. + If not set, there is no limit. + min_connections: The minimum number of connections to pre-create + when the pool is initialized. Call :meth:`initialize` to eagerly + connect them, or they will be connected lazily on first use. + Must be less than or equal to ``max_connections``. Defaults to 0. """ @classmethod @@ -1036,20 +1046,43 @@ def __init__( self, connection_class: Type[AbstractConnection] = Connection, max_connections: Optional[int] = None, + min_connections: int = 0, **connection_kwargs, ): max_connections = max_connections or 2**31 if not isinstance(max_connections, int) or max_connections < 0: raise ValueError('"max_connections" must be a positive integer') + if not isinstance(min_connections, int) or min_connections < 0: + raise ValueError('"min_connections" must be a non-negative integer') + if min_connections > max_connections: + raise ValueError( + '"min_connections" must be less than or equal to "max_connections"' + ) self.connection_class = connection_class self.connection_kwargs = connection_kwargs self.max_connections = max_connections + self.min_connections = min_connections self._available_connections: List[AbstractConnection] = [] self._in_use_connections: Set[AbstractConnection] = set() self.encoder_class = self.connection_kwargs.get("encoder_class", Encoder) + self._initialized = False + + # Pre-create min_connections connection objects (connected lazily, + # or call initialize() to connect them eagerly) + for _ in range(self.min_connections): + self._available_connections.append(self.make_connection()) + + async def initialize(self): + """Connect all pre-created connections from min_connections.""" + if self._initialized: + return + for connection in self._available_connections: + await connection.connect() + self._initialized = True + def __repr__(self): return ( f"<{self.__class__.__module__}.{self.__class__.__name__}" @@ -1059,6 +1092,11 @@ def __repr__(self): def reset(self): self._available_connections = [] self._in_use_connections = weakref.WeakSet() + self._initialized = False + + # Pre-create min_connections connection objects + for _ in range(self.min_connections): + self._available_connections.append(self.make_connection()) def can_get_connection(self) -> bool: """Return True if a connection can be retrieved from the pool.""" diff --git a/valkey/client.py b/valkey/client.py index df265ab4..0c081700 100755 --- a/valkey/client.py +++ b/valkey/client.py @@ -204,6 +204,7 @@ def __init__( ssl_min_version=None, ssl_ciphers=None, max_connections=None, + min_connections=0, single_connection_client=False, health_check_interval=0, client_name=None, @@ -231,6 +232,11 @@ def __init__( Args: + max_connections: + The maximum number of connections for the pool. + min_connections: + The minimum number of connections to pre-create and connect + when the pool is initialized. Defaults to 0. single_connection_client: if `True`, connection pool is not used. In that case `Valkey` instance use is not thread safe. @@ -265,6 +271,7 @@ def __init__( "retry_on_error": retry_on_error, "retry": copy.deepcopy(retry), "max_connections": max_connections, + "min_connections": min_connections, "health_check_interval": health_check_interval, "client_name": client_name, "lib_name": lib_name, diff --git a/valkey/cluster.py b/valkey/cluster.py index fb508637..d16061f8 100644 --- a/valkey/cluster.py +++ b/valkey/cluster.py @@ -146,6 +146,7 @@ def parse_cluster_myshardid(resp, **options): "lib_name", "lib_version", "max_connections", + "min_connections", "nodes_flag", "valkey_connect_func", "password", @@ -562,7 +563,8 @@ def __init__( :**kwargs: Extra arguments that will be sent into Valkey instance when created - (See Official valkey-py doc for supported kwargs) + (See Official valkey-py doc for supported kwargs + e.g. ``max_connections``, ``min_connections``) Some kwargs are not supported and will raise a ValkeyClusterException: - db (Valkey do not support database SELECT in cluster mode) diff --git a/valkey/connection.py b/valkey/connection.py index 699b3e51..d5b7d6b8 100644 --- a/valkey/connection.py +++ b/valkey/connection.py @@ -966,6 +966,16 @@ class ConnectionPool: Any additional keyword arguments are passed to the constructor of ``connection_class``. + + Args: + connection_class: The class to use for creating connections. + Defaults to :py:class:`~valkey.Connection`. + max_connections: The maximum number of connections to create. + If not set, there is no limit. + min_connections: The minimum number of connections to pre-create + and connect when the pool is initialized. These connections are + immediately available, reducing latency on the first requests. + Must be less than or equal to ``max_connections``. Defaults to 0. """ @classmethod @@ -1019,15 +1029,23 @@ def __init__( self, connection_class=Connection, max_connections: Optional[int] = None, + min_connections: int = 0, **connection_kwargs, ): max_connections = max_connections or 2**31 if not isinstance(max_connections, int) or max_connections < 0: raise ValueError('"max_connections" must be a positive integer') + if not isinstance(min_connections, int) or min_connections < 0: + raise ValueError('"min_connections" must be a non-negative integer') + if min_connections > max_connections: + raise ValueError( + '"min_connections" must be less than or equal to "max_connections"' + ) self.connection_class = connection_class self.connection_kwargs = connection_kwargs self.max_connections = max_connections + self.min_connections = min_connections # a lock to protect the critical section in _checkpid(). # this lock is acquired when the process id changes, such as @@ -1052,6 +1070,12 @@ def reset(self) -> None: self._available_connections: list[Connection] = [] self._in_use_connections: set[Connection] = set() + # Pre-create and connect min_connections connections + for _ in range(self.min_connections): + connection = self.make_connection() + connection.connect() + self._available_connections.append(connection) + # this must be the last operation in this method. while reset() is # called when holding _fork_lock, other threads in this process # can call _checkpid() which compares self.pid and os.getpid() without @@ -1292,16 +1316,23 @@ def __init__( def reset(self): # Create and fill up a thread safe queue with ``None`` values. self.pool = self.queue_class(self.max_connections) - while True: - try: - self.pool.put_nowait(None) - except Full: - break # Keep a list of actual connection instances so that we can # disconnect them later. self._connections = [] + # Fill slots beyond min_connections with None placeholders first, + # so that pre-created connections end up on top of the LIFO queue + # and are used first. + for _ in range(self.max_connections - self.min_connections): + self.pool.put_nowait(None) + + # Pre-create and connect min_connections connections + for _ in range(self.min_connections): + connection = self.make_connection() + connection.connect() + self.pool.put_nowait(connection) + # this must be the last operation in this method. while reset() is # called when holding _fork_lock, other threads in this process # can call _checkpid() which compares self.pid and os.getpid() without