Add in min_connections to available arguments #266

Masses wants to merge 1 commit into valkey-io:main from
Conversation
mkmkme
left a comment
Hey! Thanks for your contribution! At a quick glance it looks good. I have one comment at this point; I'll get back to it later for a proper review. I will also ask Copilot to review the changes properly.
valkey/asyncio/client.py (Outdated)

        return self.initialize().__await__()

    async def initialize(self: _ValkeyT) -> _ValkeyT:
        if hasattr(self.connection_pool, "initialize"):
As far as I understand, you're adding initialize to connection_pool with the same commit, so this if statement should always be true? Is there actually a need for this condition then?
Codecov Report

❌ Patch coverage is

Additional details and impacted files

    @@           Coverage Diff            @@
    ##             main     #266    +/-   ##
    =========================================
    + Coverage   76.40%   76.45%   +0.04%
    =========================================
      Files         129      129
      Lines       34040    34141     +101
    =========================================
    + Hits        26009    26102      +93
    - Misses       8031     8039       +8

☔ View full report in Codecov by Sentry.
Pull request overview
Adds support for a min_connections parameter across Valkey connection pools/clients to allow pre-warming a minimum number of connections (reducing first-request latency spikes), and wires the option through cluster/client entry points with accompanying tests.
Changes:

- Add min_connections to sync + asyncio ConnectionPool construction and propagate through Valkey/ValkeyCluster kwargs.
- Implement eager (sync) / optionally eager (async) connection pre-initialization behaviors.
- Add/extend tests covering min_connections behavior for pools and asyncio cluster.
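The pre-warming idea behind these changes can be illustrated with a minimal, self-contained sketch. Note that `ToyConnection` and `ToyPool` are illustrative stand-ins, not the actual valkey-py classes:

```python
# Minimal sketch of eager pool pre-warming; ToyConnection/ToyPool are
# hypothetical stand-ins, not the real valkey-py classes.
class ToyConnection:
    def __init__(self):
        self.connected = False

    def connect(self):
        # Real code would open a socket to the server here.
        self.connected = True


class ToyPool:
    def __init__(self, min_connections=0):
        self.min_connections = min_connections
        self._available_connections = []
        # Eagerly create and connect min_connections connections so the
        # first requests do not pay connection-setup latency.
        for _ in range(self.min_connections):
            conn = ToyConnection()
            conn.connect()
            self._available_connections.append(conn)


pool = ToyPool(min_connections=3)
print(len(pool._available_connections))  # → 3
```

With `min_connections=0` (the default) the sketch degrades to the existing fully lazy behavior.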
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| valkey/connection.py | Adds min_connections to sync pools and pre-creates connections during reset (including BlockingConnectionPool). |
| valkey/client.py | Exposes min_connections on the sync client and forwards it into the pool kwargs. |
| valkey/cluster.py | Allows min_connections through cluster kwargs filtering and updates doc text. |
| valkey/asyncio/connection.py | Adds min_connections handling and an async initialize() hook for eager connects. |
| valkey/asyncio/client.py | Forwards min_connections and triggers pool initialization during client initialization. |
| valkey/asyncio/cluster.py | Plumbs min_connections through cluster/node creation and eagerly connects during cluster setup. |
| tests/test_connection_pool.py | Adds sync tests for min_connections on ConnectionPool + BlockingConnectionPool. |
| tests/test_asyncio/test_connection_pool.py | Adds asyncio tests for min_connections and initialize(). |
| tests/test_asyncio/test_cluster.py | Adds asyncio cluster coverage for min_connections propagation and pre-creation. |
    VALKEY_ALLOWED_KEYS = (
        "charset",
        "connection_class",
        "connection_pool",
        "connection_pool_class",
        "client_name",
        "credential_provider",
        "db",
        "decode_responses",
        "encoding",
        "encoding_errors",
        "errors",
        "host",
        "lib_name",
        "lib_version",
        "max_connections",
        "min_connections",
        "nodes_flag",
VALKEY_ALLOWED_KEYS now permits min_connections for cluster clients, but there are no sync cluster tests exercising this path (e.g., passing min_connections to ValkeyCluster and asserting node pools are pre-warmed). Since tests/test_cluster.py exists for the sync cluster client, adding a small regression test would help prevent future breakage of this new argument.
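The shape of such a regression test can be sketched with toy stand-ins (`FakePool` and `FakeNode` are hypothetical; a real test would construct a `ValkeyCluster` with `min_connections` and iterate its actual nodes):

```python
# Toy stand-ins illustrating the assertions a sync-cluster regression test
# could make; FakePool/FakeNode are hypothetical, not valkey-py classes.
class FakePool:
    def __init__(self, min_connections=0):
        self.min_connections = min_connections
        # Simulate connections pre-created at pool construction.
        self._available_connections = [object() for _ in range(min_connections)]


class FakeNode:
    def __init__(self, min_connections):
        self.connection_pool = FakePool(min_connections)


def assert_pools_prewarmed(nodes, expected):
    # A real test would pass min_connections=expected to ValkeyCluster
    # and iterate its nodes instead of these fakes.
    for node in nodes:
        pool = node.connection_pool
        assert pool.min_connections == expected
        assert len(pool._available_connections) >= expected


nodes = [FakeNode(2), FakeNode(2)]
assert_pools_prewarmed(nodes, expected=2)
```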
    # Pre-create and connect min_connections connections
    for _ in range(self.min_connections):
        connection = self.make_connection()
        connection.connect()
        self._available_connections.append(connection)
ConnectionPool.reset() now performs network I/O by calling connection.connect() while reset() can be executed under _fork_lock (via _checkpid after a fork). This can make other threads time out on _fork_lock.acquire(timeout=5) and raise ChildDeadlockedError even though there is no deadlock (just a slow/blocked connect). Consider avoiding any socket connect inside reset() when it may be called under _fork_lock; e.g., only pre-create connection objects here and perform eager connects outside the fork lock (or provide a separate warmup method called after initialization).
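One way to structure the split this comment suggests, sketched with a toy pool (the class and method names are illustrative, not valkey-py's): reset() only creates connection objects, and a separate warmup() performs the actual connects outside the fork lock.

```python
import threading


class ToyConnection:
    def __init__(self):
        self.connected = False

    def connect(self):
        self.connected = True  # real code would do socket I/O here


class ToyPool:
    """Sketch: keep reset() free of network I/O so it stays safe under a fork lock."""

    def __init__(self, min_connections=0):
        self.min_connections = min_connections
        self._fork_lock = threading.Lock()
        self._available_connections = []

    def reset(self):
        # May run while holding _fork_lock: only create objects, never connect.
        self._available_connections = [
            ToyConnection() for _ in range(self.min_connections)
        ]

    def warmup(self):
        # Called after _fork_lock is released; a slow connect no longer
        # blocks other threads waiting on the lock.
        for conn in self._available_connections:
            if not conn.connected:
                conn.connect()


pool = ToyPool(min_connections=2)
with pool._fork_lock:
    pool.reset()   # cheap, no I/O under the lock
pool.warmup()      # network work happens here, lock already released
```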
    # Pre-create and connect min_connections connections
    for _ in range(self.min_connections):
        connection = self.make_connection()
        connection.connect()
BlockingConnectionPool.reset() now calls connection.connect() while reset() may run under ConnectionPool._fork_lock (via _checkpid). Because _checkpid uses a 5-second timeout when acquiring _fork_lock, a slow connect can cause other threads to raise ChildDeadlockedError spuriously. To keep fork-safety guarantees, avoid performing connect operations inside reset() when it can be called under _fork_lock; defer connecting until after releasing the fork lock (or connect lazily).
Suggested change:

    - # Pre-create and connect min_connections connections
    - for _ in range(self.min_connections):
    -     connection = self.make_connection()
    -     connection.connect()
    + # Pre-create min_connections connections (they will connect lazily)
    + for _ in range(self.min_connections):
    +     connection = self.make_connection()
    +     self._connections.append(connection)
    if self._initialized:
        return
ConnectionPool.initialize() returns early when _initialized is True, but disconnect() does not reset _initialized. If a caller disconnects the pool and then calls initialize() expecting to eagerly reconnect the pre-created connections, initialize() will be a no-op and the connections remain disconnected until first use. Consider resetting _initialized in disconnect(), or making initialize() attempt to connect regardless and only skip work if all connections are already connected.
Suggested change:

    - if self._initialized:
    -     return
valkey/asyncio/client.py (Outdated)
    if hasattr(self.connection_pool, "initialize"):
        await self.connection_pool.initialize()
Awaiting connection_pool.initialize() based solely on hasattr() can break custom connection pool implementations that already define an initialize attribute/method that is not async (awaiting a non-coroutine will raise TypeError). To avoid an accidental breaking change for custom pools, consider checking that initialize is awaitable (e.g., inspect.iscoroutinefunction / inspect.isawaitable on the result) before awaiting, or use a more specific hook name.
Suggested change:

    - if hasattr(self.connection_pool, "initialize"):
    -     await self.connection_pool.initialize()
    + initialize = getattr(self.connection_pool, "initialize", None)
    + if initialize is not None:
    +     if inspect.iscoroutinefunction(initialize):
    +         await initialize()
    +     else:
    +         result = initialize()
    +         if inspect.isawaitable(result):
    +             await result
    if min_connections > max_connections:
        raise ValkeyClusterException(
            '"min_connections" must be less than or equal to "max_connections"'
        )
ClusterNode validates min_connections only against max_connections, but does not reject negative or non-int values. This can silently accept invalid input (e.g., min_connections=-1 results in no pre-created connections) and is inconsistent with ConnectionPool validation. Consider adding the same type/range checks here and raising ValkeyClusterException for invalid values.
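The extra checks the comment asks for might look roughly like this (a sketch; the real code would raise ValkeyClusterException, and the helper name here is made up):

```python
def validate_min_connections(min_connections, max_connections):
    # Sketch of fuller validation; real cluster code would raise
    # ValkeyClusterException rather than ValueError.
    if not isinstance(min_connections, int) or isinstance(min_connections, bool):
        raise ValueError('"min_connections" must be an integer')
    if min_connections < 0:
        raise ValueError('"min_connections" must be >= 0')
    if min_connections > max_connections:
        raise ValueError(
            '"min_connections" must be less than or equal to "max_connections"'
        )


validate_min_connections(0, 10)   # ok
validate_min_connections(5, 10)   # ok
try:
    validate_min_connections(-1, 10)
except ValueError as exc:
    print(exc)  # prints: "min_connections" must be >= 0
```

The explicit bool check matters because `isinstance(True, int)` is `True` in Python, so `min_connections=True` would otherwise slip through as 1.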
    max_connections:
        The maximum number of connections for the pool.
    min_connections:
        The minimum number of connections to pre-create and connect
        when the pool is initialized. Defaults to 0.
    single_connection_client:
        if `True`, connection pool is not used. In that case `Valkey`
        instance use is not thread safe.
The new docstring "Args:" block is not formatted consistently (there’s an empty Args section header and the parameter names aren’t indented as part of the Args list). This can render poorly in generated docs. Consider formatting the args consistently with the rest of this module’s docstring style (e.g., properly indented Google-style Args or Sphinx ":param" fields).
Suggested change:

    - max_connections:
    -     The maximum number of connections for the pool.
    - min_connections:
    -     The minimum number of connections to pre-create and connect
    -     when the pool is initialized. Defaults to 0.
    - single_connection_client:
    -     if `True`, connection pool is not used. In that case `Valkey`
    -     instance use is not thread safe.
    + max_connections: The maximum number of connections for the pool.
    + min_connections: The minimum number of connections to pre-create and
    +     connect when the pool is initialized. Defaults to 0.
    + single_connection_client: if `True`, connection pool is not used. In
    +     that case `Valkey` instance use is not thread safe.
Signed-off-by: Brian McCaffrey <brian@musubilabs.ai>
Pull Request check-list
Description of change
This change allows setting a minimum number of connections that the client should initialize and keep alive, similar to functionality other language libraries provide (e.g., https://github.com/valkey-io/valkey-java). This mostly helps systems exposed to sudden changes in load, such as when behind a load balancer without a gradual warm-up.
I have tested this in our dev environment by running load tests with and without this change. Without the change, p99 latency of valkey commands (for this load test, an FT.SEARCH) jumps to over 200 ms (as measured by the application) until load subsides. With the change, p99 latency is higher than with a gradual warm-up but remains under 20 ms (as measured by the application). These load tests were run on the same hardware as the application, against a single shard with 2 replicas (all replica CPU utilization stayed below 10% for the entire load test).
First time contributing, so let me know if I did anything wrong! I also have a stronger background on the JVM, so if I'm missing Python idioms, let me know.