fix(connection): retry entire connect+handshake to fix PubSub reconnect #304
makubo-aws wants to merge 1 commit into valkey-io:main from
Previously, only the socket-level _connect() was wrapped in the retry block. The on_connect() handshake (auth, protocol negotiation, etc.) ran outside the retry scope, so a ConnectionError raised during reconnection (e.g. 'Connection reset by peer' in PubSub) would propagate directly to the caller instead of being retried.

This is the root cause of the PubSub retry bug reported in issue valkey-io#169 and the idle-to-burst ConnectionError bursts on ElastiCache Serverless reported in issue valkey-io#225.

Fix: extract _connect_with_handshake() that combines the socket connect and on_connect() into a single retryable unit. The retry block now covers the full connection establishment flow. On failure, disconnect() is called as the error handler to ensure a clean state before the next attempt.

The same fix is applied to both the sync (valkey/connection.py) and async (valkey/asyncio/connection.py) AbstractConnection classes.

This mirrors the fix applied to redis-py in PR #3863.

Fixes valkey-io#169
Related: valkey-io#225
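The difference between the old and new retry scope can be illustrated with a toy model. This is only a sketch: `SimpleRetry` and `ToyConnection` are hypothetical stand-ins written for this example (they are not valkey's `Retry` or `AbstractConnection` classes), the built-in `ConnectionError` stands in for the library's own exception, and the handshake is simulated to fail a fixed number of times.

```python
import time


class SimpleRetry:
    """Hypothetical retry policy for illustration; not valkey's Retry class."""

    def __init__(self, retries, delay=0.0):
        self.retries = retries
        self.delay = delay

    def call_with_retry(self, do, fail):
        failures = 0
        while True:
            try:
                return do()
            except ConnectionError as error:
                failures += 1
                fail(error)  # error handler runs after every failed attempt
                if failures > self.retries:
                    raise
                time.sleep(self.delay)


class ToyConnection:
    """Toy connection whose handshake fails a fixed number of times."""

    def __init__(self, retry, handshake_failures):
        self.retry = retry
        self.handshake_failures = handshake_failures
        self.connected = False
        self.disconnects = 0

    def _connect(self):
        self.connected = True  # pretend the socket opened

    def on_connect(self):
        # Simulated handshake: fails until the "server" is ready.
        if self.handshake_failures > 0:
            self.handshake_failures -= 1
            raise ConnectionError("Connection reset by peer")

    def disconnect(self, *args):
        self.connected = False
        self.disconnects += 1

    def _connect_with_handshake(self):
        self._connect()
        self.on_connect()

    def connect_old(self):
        # Before: only the socket connect is inside the retry scope.
        self.retry.call_with_retry(self._connect, self.disconnect)
        self.on_connect()  # a ConnectionError here escapes the retry policy

    def connect_new(self):
        # After: socket connect and handshake are retried as one unit.
        self.retry.call_with_retry(self._connect_with_handshake, self.disconnect)


old = ToyConnection(SimpleRetry(retries=3), handshake_failures=2)
try:
    old.connect_old()
    old_error = None
except ConnectionError as exc:
    old_error = exc

new = ToyConnection(SimpleRetry(retries=3), handshake_failures=2)
new.connect_new()
```

In this model, `connect_old()` surfaces the first handshake error to the caller even though three retries are configured, while `connect_new()` absorbs both simulated failures and calls `disconnect()` once per failed attempt before succeeding.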
vibe coded based on existing fix for this in redis-py
Codecov Report
❌ Patch coverage is
Additional details and impacted files

@@             Coverage Diff             @@
##             main     #304      +/-    ##
===========================================
- Coverage   76.72%   62.23%   -14.50%
===========================================
  Files         129      129
  Lines       34146    34150        +4
===========================================
- Hits        26199    21253     -4946
- Misses       7947    12897     +4950
Pull request overview
This PR expands connection retry coverage to include the full connect + handshake sequence (auth/protocol negotiation/client setup), addressing PubSub reconnect failures where handshake-time ConnectionErrors previously escaped the retry policy.
Changes:
- Wrap socket connect and on_connect() handshake into a single retryable unit via a new _connect_with_handshake() helper (sync + async).
- Update connect() to retry _connect_with_handshake() and run connect callbacks after a successful connection.
- Ensure cleanup between attempts by calling disconnect() on failures.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| valkey/connection.py | Wrap sync connect + handshake inside retry; add _connect_with_handshake() and move callbacks post-connect. |
| valkey/asyncio/connection.py | Mirror the same retry + handshake restructuring for asyncio connections. |
      await self.retry.call_with_retry(
    -     lambda: self._connect(), lambda error: self.disconnect()
    +     lambda: self._connect_with_handshake(),
    +     lambda error: self.disconnect(),
      )

      self.retry.call_with_retry(
          lambda: self._connect_with_handshake(),
          lambda error: self.disconnect(error),
      )

      # clean up after any error in on_connect so that the next
      # retry attempt starts from a clean state
      self.disconnect()

      self.retry.call_with_retry(
          lambda: self._connect_with_handshake(),
          lambda error: self.disconnect(error),
      )

      await self.retry.call_with_retry(
    -     lambda: self._connect(), lambda error: self.disconnect()
    +     lambda: self._connect_with_handshake(),
    +     lambda error: self.disconnect(),
      )

      except ValkeyError:
    -     # clean up after any error in on_connect
    +     # clean up after any error in on_connect so that the next
    +     # retry attempt starts from a clean state
          await self.disconnect()
          raise

Could you have a look at:
Thanks!
Summary
Fixes #169
Related: #225

Problem
The retry logic in AbstractConnection.connect() only wrapped the socket-level _connect() call. The on_connect() handshake (authentication, protocol negotiation, CLIENT SETNAME, etc.) ran outside the retry scope.

When a PubSub connection drops and the client tries to reconnect, on_connect() can raise a ConnectionError (e.g. Connection reset by peer, errno 104) if the server isn't fully ready yet. Because this error escapes the retry block, the configured retry policy is never applied and the PubSub thread crashes instead of recovering.

This is also the root cause of the idle-to-burst ConnectionError bursts seen on ElastiCache Serverless (issue #225).

Fix
Extract a _connect_with_handshake() helper that combines the socket connect and on_connect() handshake into a single retryable unit. The retry block in connect() now covers the full connection establishment flow. On failure, disconnect() is called as the error handler to ensure a clean state before the next attempt.

The same fix is applied to both:
- valkey/connection.py (sync)
- valkey/asyncio/connection.py (async)

Prior art
This mirrors the fix applied to redis-py in PR #3863, which resolved the equivalent issues in that library.

Changes
- valkey/connection.py: connect() now calls _connect_with_handshake() inside the retry block; new _connect_with_handshake() method combines socket connect + handshake
- valkey/asyncio/connection.py: same changes for the async path
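
For the async path, the same shape can be sketched as follows. This is an illustrative model only: `AsyncSimpleRetry` and `ToyAsyncConnection` are hypothetical names invented here, the built-in `ConnectionError` stands in for the library's exception, and the retry loop is assumed to await both the operation and the error handler, which is why the lambdas can return coroutines.

```python
import asyncio


class AsyncSimpleRetry:
    """Hypothetical async retry policy; not valkey's asyncio Retry class."""

    def __init__(self, retries):
        self.retries = retries

    async def call_with_retry(self, do, fail):
        failures = 0
        while True:
            try:
                return await do()
            except ConnectionError as error:
                failures += 1
                await fail(error)  # clean up before the next attempt
                if failures > self.retries:
                    raise


class ToyAsyncConnection:
    """Toy async connection whose handshake fails a fixed number of times."""

    def __init__(self, retry, handshake_failures):
        self.retry = retry
        self.handshake_failures = handshake_failures
        self.connected = False
        self.disconnects = 0

    async def _connect(self):
        self.connected = True  # pretend the socket opened

    async def on_connect(self):
        # Simulated handshake failure while the "server" is not ready.
        if self.handshake_failures > 0:
            self.handshake_failures -= 1
            raise ConnectionError("Connection reset by peer")

    async def disconnect(self):
        self.connected = False
        self.disconnects += 1

    async def _connect_with_handshake(self):
        await self._connect()
        await self.on_connect()

    async def connect(self):
        # Both lambdas return coroutines that the retry loop awaits.
        await self.retry.call_with_retry(
            lambda: self._connect_with_handshake(),
            lambda error: self.disconnect(),
        )


conn = ToyAsyncConnection(AsyncSimpleRetry(retries=3), handshake_failures=2)
asyncio.run(conn.connect())

# With too few retries configured, the error still reaches the caller.
exhausted = ToyAsyncConnection(AsyncSimpleRetry(retries=1), handshake_failures=2)
try:
    asyncio.run(exhausted.connect())
    exhausted_error = None
except ConnectionError as exc:
    exhausted_error = exc
```

The second case shows that widening the retry scope does not hide persistent failures: once the configured number of retries is used up, the `ConnectionError` propagates as before, with the connection left in a disconnected state.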