Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions tests/test_asyncio/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,24 @@ async def read_response_mocked(*args: Any, **kwargs: Any) -> None:

await rc.aclose()

async def test_min_connections(
self, create_valkey: Callable[..., ValkeyCluster]
) -> None:
rc = await create_valkey(cls=ValkeyCluster, min_connections=5)
for node in rc.get_nodes():
assert node.min_connections == 5
assert len(node._connections) == 5
assert len(node._free) == 5
await rc.aclose()

async def test_min_connections_greater_than_max(
self, create_valkey: Callable[..., ValkeyCluster]
) -> None:
with pytest.raises(ValkeyClusterException):
await create_valkey(
cls=ValkeyCluster, min_connections=20, max_connections=10
)

async def test_execute_command_errors(self, r: ValkeyCluster) -> None:
"""
Test that if no key is provided then exception should be raised.
Expand Down
45 changes: 45 additions & 0 deletions tests/test_asyncio/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,51 @@ async def test_repr_contains_db_info_unix(self):
expected = "path=/abc,db=1,client_name=test-client"
assert expected in repr(pool)

async def test_min_connections(self):
pool = valkey.ConnectionPool(
connection_class=DummyConnection,
min_connections=5,
)
assert pool.min_connections == 5
assert len(pool._available_connections) == 5
await pool.disconnect(inuse_connections=True)

async def test_min_connections_default(self):
pool = valkey.ConnectionPool(
connection_class=DummyConnection,
)
assert pool.min_connections == 0
assert len(pool._available_connections) == 0
await pool.disconnect(inuse_connections=True)

async def test_min_connections_greater_than_max_raises(self):
with pytest.raises(ValueError):
valkey.ConnectionPool(
connection_class=DummyConnection,
min_connections=20,
max_connections=10,
)

async def test_min_connections_negative_raises(self):
with pytest.raises(ValueError):
valkey.ConnectionPool(
connection_class=DummyConnection,
min_connections=-1,
)

async def test_min_connections_initialize(self):
pool = valkey.ConnectionPool(
connection_class=DummyConnection,
min_connections=3,
)
assert not pool._initialized
await pool.initialize()
assert pool._initialized
# Calling initialize again is a no-op
await pool.initialize()
assert pool._initialized
await pool.disconnect(inuse_connections=True)


class TestBlockingConnectionPool:
@asynccontextmanager
Expand Down
56 changes: 56 additions & 0 deletions tests/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,37 @@ def test_repr_contains_db_info_unix(self):
expected = "path=/abc,db=1,client_name=test-client"
assert expected in repr(pool)

def test_min_connections(self):
pool = valkey.ConnectionPool(
connection_class=DummyConnection,
min_connections=5,
)
assert pool.min_connections == 5
assert len(pool._available_connections) == 5
assert pool._created_connections == 5

def test_min_connections_default(self):
pool = valkey.ConnectionPool(
connection_class=DummyConnection,
)
assert pool.min_connections == 0
assert len(pool._available_connections) == 0

def test_min_connections_greater_than_max_raises(self):
with pytest.raises(ValueError):
valkey.ConnectionPool(
connection_class=DummyConnection,
min_connections=20,
max_connections=10,
)

def test_min_connections_negative_raises(self):
with pytest.raises(ValueError):
valkey.ConnectionPool(
connection_class=DummyConnection,
min_connections=-1,
)


class TestBlockingConnectionPool:
def get_pool(self, connection_kwargs=None, max_connections=10, timeout=20):
Expand Down Expand Up @@ -196,6 +227,31 @@ def test_repr_contains_db_info_unix(self):
expected = "path=abc,db=0,client_name=test-client"
assert expected in repr(pool)

def test_min_connections(self):
pool = valkey.BlockingConnectionPool(
connection_class=DummyConnection,
max_connections=10,
min_connections=5,
)
assert pool.min_connections == 5
assert len(pool._connections) == 5

def test_min_connections_default(self):
pool = valkey.BlockingConnectionPool(
connection_class=DummyConnection,
max_connections=10,
)
assert pool.min_connections == 0
assert len(pool._connections) == 0

def test_min_connections_greater_than_max_raises(self):
with pytest.raises(ValueError):
valkey.BlockingConnectionPool(
connection_class=DummyConnection,
min_connections=20,
max_connections=10,
)


class TestConnectionPoolURLParsing:
def test_hostname(self):
Expand Down
10 changes: 10 additions & 0 deletions valkey/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ def __init__(
ssl_min_version: Optional[ssl.TLSVersion] = None,
ssl_ciphers: Optional[str] = None,
max_connections: Optional[int] = None,
min_connections: int = 0,
single_connection_client: bool = False,
health_check_interval: int = 0,
client_name: Optional[str] = None,
Expand All @@ -251,6 +252,13 @@ def __init__(
`retry_on_error` to a list of the error/s to retry on, then set
`retry` to a valid `Retry` object.
To retry on TimeoutError, `retry_on_timeout` can also be set to `True`.

Args:
max_connections: The maximum number of connections for the pool.
min_connections: The minimum number of connections to pre-create
when the pool is initialized. These connections are eagerly
connected when the client is first awaited or used.
Defaults to 0.
"""
kwargs: Dict[str, Any]
# auto_close_connection_pool only has an effect if connection_pool is
Expand Down Expand Up @@ -287,6 +295,7 @@ def __init__(
"retry_on_error": retry_on_error,
"retry": copy.deepcopy(retry),
"max_connections": max_connections,
"min_connections": min_connections,
"health_check_interval": health_check_interval,
"client_name": client_name,
"lib_name": lib_name,
Expand Down Expand Up @@ -368,6 +377,7 @@ def __await__(self):
return self.initialize().__await__()

async def initialize(self: _ValkeyT) -> _ValkeyT:
await self.connection_pool.initialize()
if self.single_connection_client:
async with self._single_conn_lock:
if self.connection is None:
Expand Down
32 changes: 32 additions & 0 deletions valkey/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ class ValkeyCluster(AbstractValkey, AbstractValkeyCluster, AsyncValkeyClusterCom
maximum number of connections are already created, a
:class:`~.MaxConnectionsError` is raised. This error may be retried as defined
by :attr:`connection_error_retry_attempts`
:param min_connections:
| Minimum number of connections per node to pre-create when the cluster is
initialized. These connections are eagerly connected during cluster setup,
reducing latency on the first requests. Must be less than or equal to
``max_connections``. Defaults to 0.
:param address_remap:
| An optional callable which, when provided with an internal network
address of a node, e.g. a `(host, port)` tuple, will return the address
Expand Down Expand Up @@ -255,6 +260,7 @@ def __init__(
cluster_error_retry_attempts: int = 3,
connection_error_retry_attempts: int = 3,
max_connections: int = 2**31,
min_connections: int = 0,
# Client related kwargs
db: Union[str, int] = 0,
path: Optional[str] = None,
Expand Down Expand Up @@ -317,6 +323,7 @@ def __init__(

kwargs: Dict[str, Any] = {
"max_connections": max_connections,
"min_connections": min_connections,
"connection_class": Connection,
"parser_class": ClusterParser,
# Client related kwargs
Expand Down Expand Up @@ -979,6 +986,7 @@ class ClusterNode:
"connection_kwargs",
"host",
"max_connections",
"min_connections",
"name",
"port",
"response_callbacks",
Expand All @@ -992,12 +1000,18 @@ def __init__(
server_type: Optional[str] = None,
*,
max_connections: int = 2**31,
min_connections: int = 0,
connection_class: Type[Connection] = Connection,
**connection_kwargs: Any,
) -> None:
if host == "localhost":
host = socket.gethostbyname(host)

if min_connections > max_connections:
raise ValkeyClusterException(
'"min_connections" must be less than or equal to "max_connections"'
)
Comment on lines +1010 to +1013
Copy link

Copilot AI Feb 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ClusterNode validates min_connections only against max_connections, but does not reject negative or non-int values. This can silently accept invalid input (e.g., min_connections=-1 results in no pre-created connections) and is inconsistent with ConnectionPool validation. Consider adding the same type/range checks here and raising ValkeyClusterException for invalid values.

Copilot uses AI. Check for mistakes.

connection_kwargs["host"] = host
connection_kwargs["port"] = port
self.host = host
Expand All @@ -1006,13 +1020,28 @@ def __init__(
self.server_type = server_type

self.max_connections = max_connections
self.min_connections = min_connections
self.connection_class = connection_class
self.connection_kwargs = connection_kwargs
self.response_callbacks = connection_kwargs.pop("response_callbacks", {})

self._connections: List[Connection] = []
self._free: Deque[Connection] = collections.deque(maxlen=self.max_connections)

# Pre-create min_connections connection objects
for _ in range(self.min_connections):
connection = self.connection_class(**self.connection_kwargs)
self._connections.append(connection)
self._free.append(connection)

async def initialize(self) -> None:
"""Connect all pre-created connections from min_connections."""
if not self._connections:
return
await asyncio.gather(
*(connection.connect() for connection in self._connections)
)

def __repr__(self) -> str:
return (
f"[host={self.host}, port={self.port}, "
Expand Down Expand Up @@ -1415,6 +1444,9 @@ async def initialize(self) -> None:
# If initialize was called after a MovedError, clear it
self._moved_exception = None

# Eagerly connect min_connections for each node
await asyncio.gather(*(node.initialize() for node in self.nodes_cache.values()))

async def aclose(self, attr: str = "nodes_cache") -> None:
self.default_node = None
await asyncio.gather(
Expand Down
38 changes: 38 additions & 0 deletions valkey/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,16 @@ class ConnectionPool:

Any additional keyword arguments are passed to the constructor of
``connection_class``.

Args:
connection_class: The class to use for creating connections.
Defaults to :py:class:`~valkey.asyncio.connection.Connection`.
max_connections: The maximum number of connections to create.
If not set, there is no limit.
min_connections: The minimum number of connections to pre-create
when the pool is initialized. Call :meth:`initialize` to eagerly
connect them, or they will be connected lazily on first use.
Must be less than or equal to ``max_connections``. Defaults to 0.
"""

@classmethod
Expand Down Expand Up @@ -1036,20 +1046,43 @@ def __init__(
self,
connection_class: Type[AbstractConnection] = Connection,
max_connections: Optional[int] = None,
min_connections: int = 0,
**connection_kwargs,
):
max_connections = max_connections or 2**31
if not isinstance(max_connections, int) or max_connections < 0:
raise ValueError('"max_connections" must be a positive integer')
if not isinstance(min_connections, int) or min_connections < 0:
raise ValueError('"min_connections" must be a non-negative integer')
if min_connections > max_connections:
raise ValueError(
'"min_connections" must be less than or equal to "max_connections"'
)

self.connection_class = connection_class
self.connection_kwargs = connection_kwargs
self.max_connections = max_connections
self.min_connections = min_connections

self._available_connections: List[AbstractConnection] = []
self._in_use_connections: Set[AbstractConnection] = set()
self.encoder_class = self.connection_kwargs.get("encoder_class", Encoder)

self._initialized = False

# Pre-create min_connections connection objects (connected lazily,
# or call initialize() to connect them eagerly)
for _ in range(self.min_connections):
self._available_connections.append(self.make_connection())

async def initialize(self):
"""Connect all pre-created connections from min_connections."""
if self._initialized:
return
Comment on lines +1080 to +1081
Copy link

Copilot AI Feb 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConnectionPool.initialize() returns early when _initialized is True, but disconnect() does not reset _initialized. If a caller disconnects the pool and then calls initialize() expecting to eagerly reconnect the pre-created connections, initialize() will be a no-op and the connections remain disconnected until first use. Consider resetting _initialized in disconnect(), or making initialize() attempt to connect regardless and only skip work if all connections are already connected.

Suggested change
if self._initialized:
return

Copilot uses AI. Check for mistakes.
for connection in self._available_connections:
await connection.connect()
self._initialized = True

def __repr__(self):
return (
f"<{self.__class__.__module__}.{self.__class__.__name__}"
Expand All @@ -1059,6 +1092,11 @@ def __repr__(self):
def reset(self):
self._available_connections = []
self._in_use_connections = weakref.WeakSet()
self._initialized = False

# Pre-create min_connections connection objects
for _ in range(self.min_connections):
self._available_connections.append(self.make_connection())

def can_get_connection(self) -> bool:
"""Return True if a connection can be retrieved from the pool."""
Expand Down
7 changes: 7 additions & 0 deletions valkey/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ def __init__(
ssl_min_version=None,
ssl_ciphers=None,
max_connections=None,
min_connections=0,
single_connection_client=False,
health_check_interval=0,
client_name=None,
Expand Down Expand Up @@ -231,6 +232,11 @@ def __init__(

Args:

max_connections:
The maximum number of connections for the pool.
min_connections:
The minimum number of connections to pre-create and connect
when the pool is initialized. Defaults to 0.
single_connection_client:
if `True`, connection pool is not used. In that case `Valkey`
instance use is not thread safe.
Comment on lines 234 to 242
Copy link

Copilot AI Feb 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new docstring "Args:" block is not formatted consistently (there’s an empty Args section header and the parameter names aren’t indented as part of the Args list). This can render poorly in generated docs. Consider formatting the args consistently with the rest of this module’s docstring style (e.g., properly indented Google-style Args or Sphinx ":param" fields).

Suggested change
max_connections:
The maximum number of connections for the pool.
min_connections:
The minimum number of connections to pre-create and connect
when the pool is initialized. Defaults to 0.
single_connection_client:
if `True`, connection pool is not used. In that case `Valkey`
instance use is not thread safe.
max_connections: The maximum number of connections for the pool.
min_connections: The minimum number of connections to pre-create and
connect when the pool is initialized. Defaults to 0.
single_connection_client: if `True`, connection pool is not used. In
that case `Valkey` instance use is not thread safe.

Copilot uses AI. Check for mistakes.
Expand Down Expand Up @@ -265,6 +271,7 @@ def __init__(
"retry_on_error": retry_on_error,
"retry": copy.deepcopy(retry),
"max_connections": max_connections,
"min_connections": min_connections,
"health_check_interval": health_check_interval,
"client_name": client_name,
"lib_name": lib_name,
Expand Down
4 changes: 3 additions & 1 deletion valkey/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ def parse_cluster_myshardid(resp, **options):
"lib_name",
"lib_version",
"max_connections",
"min_connections",
"nodes_flag",
"valkey_connect_func",
"password",
Expand Down Expand Up @@ -562,7 +563,8 @@ def __init__(

:**kwargs:
Extra arguments that will be sent into Valkey instance when created
(See Official valkey-py doc for supported kwargs)
(See Official valkey-py doc for supported kwargs
e.g. ``max_connections``, ``min_connections``)
Some kwargs are not supported and will raise a
ValkeyClusterException:
- db (Valkey do not support database SELECT in cluster mode)
Expand Down
Loading