diff --git a/.github/wordlist.txt b/.github/wordlist.txt index 3d5a7490..22facfc5 100644 --- a/.github/wordlist.txt +++ b/.github/wordlist.txt @@ -17,6 +17,7 @@ GraphCommands Grokzen's INCR IOError +IPv Instrumentations JSONCommands Jaeger @@ -91,6 +92,7 @@ fo genindex gmail hatchling +hostname html http https diff --git a/docs/advanced_features.rst b/docs/advanced_features.rst index a707019e..38503cc7 100644 --- a/docs/advanced_features.rst +++ b/docs/advanced_features.rst @@ -23,6 +23,55 @@ a separate connection pool) for each database. It is not safe to pass PubSub or Pipeline objects between threads. +Replica redirect capability +--------------------------- + +Valkey 8.0 adds ``CLIENT CAPA redirect`` for standalone replica connections. +When enabled, commands sent to a replica return a redirect response that points +to the primary. valkey-py exposes this as the opt-in +``client_capa_redirect`` argument and raises ``RedirectError`` when the server +responds with ``-REDIRECT``. + +This is useful when an application wants explicit redirect information instead +of relying on the replica's default behavior. + +.. code:: python + + >>> import valkey + >>> primary = valkey.Valkey(host='localhost', port=6379, decode_responses=True) + >>> replica = valkey.Valkey( + ... host='localhost', + ... port=6380, + ... decode_responses=True, + ... client_capa_redirect=True, + ... ) + >>> primary.set('example:key', 'value') + True + >>> try: + ... replica.get('example:key') + ... except valkey.RedirectError as exc: + ... print(exc.host, exc.port) + ... print(exc.node_addr) + 127.0.0.1 6379 + ('127.0.0.1', 6379) + +On the same connection, ``READONLY`` restores the ability to run read commands +against the replica: + +.. code:: python + + >>> replica.readonly() + True + >>> replica.get('example:key') + 'value' + +Write commands still redirect after ``READONLY`` because the connection remains +attached to a replica. + +The redirect target can be a hostname, IPv4 address, or IPv6 address. Catching +``RedirectError`` and using its ``host``, ``port``, or ``node_addr`` attributes +is more robust than parsing the error message manually. + Pipelines --------- diff --git a/docs/connections.rst b/docs/connections.rst index 4405f8e2..ec2e7e1c 100644 --- a/docs/connections.rst +++ b/docs/connections.rst @@ -7,6 +7,26 @@ Generic Client This is the client used to connect directly to a standard Valkey node. +Standalone replica redirect capability +===================================== + +Valkey 8.0 adds ``CLIENT CAPA redirect`` for standalone primary/replica +deployments. In valkey-py, this is exposed as the opt-in +``client_capa_redirect`` connection argument. + +.. code-block:: python + + >>> import valkey + >>> replica = valkey.Valkey(host="localhost", port=6380, + ... client_capa_redirect=True) + >>> replica = valkey.from_url( + ... "valkey://localhost:6380?client_capa_redirect=true" + ... ) + +When enabled, commands sent to a standalone replica raise +``valkey.exceptions.RedirectError`` until ``READONLY`` is enabled for that +connection. For more details, see :doc:`advanced_features`. + .. autoclass:: valkey.Valkey :members: diff --git a/tests/test_asyncio/test_commands.py b/tests/test_asyncio/test_commands.py index ed42c509..404feba8 100644 --- a/tests/test_asyncio/test_commands.py +++ b/tests/test_asyncio/test_commands.py @@ -10,6 +10,7 @@ import sys from string import ascii_letters from typing import Any, Dict, List +from unittest import mock import pytest import pytest_asyncio @@ -481,6 +482,58 @@ async def test_client_list_after_client_setname(self, r: valkey.Valkey): # we don't know which client ours will be assert "valkey_py_test" in [c["name"] for c in clients] + async def test_client_capa(self, r: valkey.Valkey): + r.execute_command = mock.AsyncMock(return_value=True) + assert await r.client_capa("redirect") + r.execute_command.assert_awaited_once_with("CLIENT CAPA", "redirect") + + @pytest.mark.onlynoncluster + @pytest.mark.replica + @skip_if_server_version_lt("8.0.0") + async def test_client_capa_redirect_on_replica(self, r: valkey.Valkey): + key = "capa:redirect:async" + value = "v1" + await r.set(key, value) + + replica = valkey.asyncio.Valkey( + port=6380, + decode_responses=True, + client_capa_redirect=True, + ) + try: + info = await replica.info("replication") + assert info["role"] in ("slave", "replica") + expected_host = info["master_host"] + expected_port = int(info["master_port"]) + + with pytest.raises(valkey.RedirectError) as exc_get: + await replica.get(key) + assert exc_get.value.host == expected_host + assert exc_get.value.port == expected_port + assert exc_get.value.node_addr == (expected_host, expected_port) + + with pytest.raises(valkey.RedirectError) as exc_set: + await replica.set(key, "v2") + assert exc_set.value.host == expected_host + assert exc_set.value.port == expected_port + assert exc_set.value.node_addr == (expected_host, expected_port) + + assert await replica.readonly() is True + assert await replica.get(key) == value + finally: + await replica.aclose() + + @pytest.mark.onlynoncluster + @pytest.mark.replica + @skip_if_server_version_lt("8.0.0") + async def test_client_capa_redirect_opt_in_on_replica(self): + replica = valkey.asyncio.Valkey(port=6380, decode_responses=True) + try: + with pytest.raises(valkey.ReadOnlyError): + await replica.set("capa:redirect:async:default", "v") + finally: + await replica.aclose() + @skip_if_server_version_lt("2.9.50") @pytest.mark.onlynoncluster async def test_client_pause(self, r: valkey.Valkey): diff --git a/tests/test_asyncio/test_connect.py b/tests/test_asyncio/test_connect.py index dc92b2f1..cfad747e 100644 --- a/tests/test_asyncio/test_connect.py +++ b/tests/test_asyncio/test_connect.py @@ -21,7 +21,10 @@ _CMD_SEP = b"\r\n" _SUCCESS_RESP = b"+OK" + _CMD_SEP _ERROR_RESP = b"-ERR" + _CMD_SEP -_SUPPORTED_CMDS = {f"CLIENT SETNAME {_CLIENT_NAME}": _SUCCESS_RESP} +_SUPPORTED_CMDS = { + f"CLIENT SETNAME {_CLIENT_NAME}": _SUCCESS_RESP, + "CLIENT CAPA redirect": _SUCCESS_RESP, +} @pytest.fixture @@ -42,6 +45,18 @@ async def test_tcp_connect(tcp_address): await _assert_connect(conn, tcp_address) +async def test_tcp_connect_with_client_capa_redirect(tcp_address): + host, port = tcp_address + conn = Connection( + host=host, + port=port, + client_name=_CLIENT_NAME, + client_capa_redirect=True, + socket_timeout=10, + ) + await _assert_connect(conn, tcp_address) + + async def test_uds_connect(uds_address): path = str(uds_address) conn = UnixDomainSocketConnection( diff --git a/tests/test_asyncio/test_connection_pool.py b/tests/test_asyncio/test_connection_pool.py index 2b7813fe..5e4d8472 100644 --- a/tests/test_asyncio/test_connection_pool.py +++ b/tests/test_asyncio/test_connection_pool.py @@ -6,7 +6,7 @@ import valkey.asyncio as valkey from tests.conftest import skip_if_server_version_lt from valkey._parsers.url_parser import to_bool -from valkey.asyncio.connection import Connection +from valkey.asyncio.connection import Connection, DefaultParser from valkey.utils import SSL_AVAILABLE from .compat import aclosing, mock @@ -640,6 +640,13 @@ async def test_read_only_error(self, r): with pytest.raises(valkey.ReadOnlyError): await r.execute_command("DEBUG", "ERROR", "READONLY blah blah") + async def test_redirect_error(self, r): + """REDIRECT errors get turned into RedirectError exceptions""" + err = DefaultParser.parse_error("REDIRECT 2001:db8::10:6380") + assert isinstance(err, valkey.RedirectError) + assert err.host == "2001:db8::10" + assert err.port == 6380 + async def test_oom_error(self, r): """OOM errors get turned into OutOfMemoryError exceptions""" with pytest.raises(valkey.OutOfMemoryError): diff --git a/tests/test_commands.py b/tests/test_commands.py index c7b6ae54..2ee59c3b 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -765,6 +765,11 @@ def test_client_getredir(self, r): assert isinstance(r.client_getredir(), int) assert r.client_getredir() == -1 + def test_client_capa(self, r): + r.execute_command = mock.MagicMock(return_value=True) + assert r.client_capa("redirect") + r.execute_command.assert_called_once_with("CLIENT CAPA", "redirect") + @skip_if_server_version_lt("6.0.0") def test_hello_notI_implemented(self, r): with pytest.raises(NotImplementedError): @@ -5131,6 +5136,48 @@ def test_psync(self, r): res = r2.psync(r2.client_id(), 1) assert b"FULLRESYNC" in res + @pytest.mark.onlynoncluster + @pytest.mark.replica + @skip_if_server_version_lt("8.0.0") + def test_client_capa_redirect_on_replica(self, r): + key = "capa:redirect:sync" + value = "v1" + r.set(key, value) + + replica = valkey.Valkey( + port=6380, + decode_responses=True, + client_capa_redirect=True, + ) + + info = replica.info("replication") + assert info["role"] in ("slave", "replica") + expected_host = info["master_host"] + expected_port = int(info["master_port"]) + + with pytest.raises(valkey.RedirectError) as exc_get: + replica.get(key) + assert exc_get.value.host == expected_host + assert exc_get.value.port == expected_port + assert exc_get.value.node_addr == (expected_host, expected_port) + + with pytest.raises(valkey.RedirectError) as exc_set: + replica.set(key, "v2") + assert exc_set.value.host == expected_host + assert exc_set.value.port == expected_port + assert exc_set.value.node_addr == (expected_host, expected_port) + + assert replica.readonly() is True + assert replica.get(key) == value + + @pytest.mark.onlynoncluster + @pytest.mark.replica + @skip_if_server_version_lt("8.0.0") + def test_client_capa_redirect_opt_in_on_replica(self): + replica = valkey.Valkey(port=6380, decode_responses=True) + with pytest.raises(valkey.ReadOnlyError): + replica.set("capa:redirect:sync:default", "v") + @pytest.mark.onlynoncluster def test_interrupted_command(self, r: valkey.Valkey): """ diff --git a/tests/test_connect.py b/tests/test_connect.py index cc580008..c6b2b824 100644 --- a/tests/test_connect.py +++ b/tests/test_connect.py @@ -18,7 +18,10 @@ _CMD_SEP = b"\r\n" _SUCCESS_RESP = b"+OK" + _CMD_SEP _ERROR_RESP = b"-ERR" + _CMD_SEP -_SUPPORTED_CMDS = {f"CLIENT SETNAME {_CLIENT_NAME}": _SUCCESS_RESP} +_SUPPORTED_CMDS = { + f"CLIENT SETNAME {_CLIENT_NAME}": _SUCCESS_RESP, + "CLIENT CAPA redirect": _SUCCESS_RESP, +} @pytest.fixture @@ -39,6 +42,18 @@ def test_tcp_connect(tcp_address): _assert_connect(conn, tcp_address) +def test_tcp_connect_with_client_capa_redirect(tcp_address): + host, port = tcp_address + conn = Connection( + host=host, + port=port, + client_name=_CLIENT_NAME, + client_capa_redirect=True, + socket_timeout=10, + ) + _assert_connect(conn, tcp_address) + + def test_uds_connect(uds_address): path = str(uds_address) conn = UnixDomainSocketConnection(path, client_name=_CLIENT_NAME, socket_timeout=10) diff --git a/valkey/__init__.py b/valkey/__init__.py index e74de088..f2740cef 100644 --- a/valkey/__init__.py +++ b/valkey/__init__.py @@ -21,6 +21,7 @@ OutOfMemoryError, PubSubError, ReadOnlyError, + RedirectError, ResponseError, TimeoutError, ValkeyError, @@ -65,6 +66,7 @@ def int_or_str(value): "InvalidResponse", "OutOfMemoryError", "PubSubError", + "RedirectError", "ReadOnlyError", "Redis", "RedisCluster", diff --git a/valkey/_parsers/base.py b/valkey/_parsers/base.py index f3af7ecc..69250398 100644 --- a/valkey/_parsers/base.py +++ b/valkey/_parsers/base.py @@ -19,6 +19,7 @@ NoScriptError, OutOfMemoryError, ReadOnlyError, + RedirectError, ResponseError, ValkeyError, ) @@ -26,9 +27,9 @@ from .encoders import Encoder from .socket import SERVER_CLOSED_CONNECTION_ERROR, SocketBuffer -MODULE_LOAD_ERROR = "Error loading the extension. " "Please check the server logs." +MODULE_LOAD_ERROR = "Error loading the extension. Please check the server logs." NO_SUCH_MODULE_ERROR = "Error unloading module: no such module with that name" -MODULE_UNLOAD_NOT_POSSIBLE_ERROR = "Error unloading module: operation not " "possible." +MODULE_UNLOAD_NOT_POSSIBLE_ERROR = "Error unloading module: operation not possible." MODULE_EXPORTS_DATA_TYPES_ERROR = ( "Error unloading module: the module " "exports one or more module-side data " @@ -70,6 +71,7 @@ class BaseParser(ABC): "LOADING": BusyLoadingError, "NOSCRIPT": NoScriptError, "READONLY": ReadOnlyError, + "REDIRECT": RedirectError, "NOAUTH": AuthenticationError, "NOPERM": NoPermissionError, } diff --git a/valkey/_parsers/url_parser.py b/valkey/_parsers/url_parser.py index 89fa491a..6061b275 100644 --- a/valkey/_parsers/url_parser.py +++ b/valkey/_parsers/url_parser.py @@ -32,6 +32,7 @@ def to_bool(value) -> Optional[bool]: "retry_on_timeout": to_bool, "max_connections": int, "health_check_interval": int, + "client_capa_redirect": to_bool, "ssl_check_hostname": to_bool, "timeout": float, } diff --git a/valkey/asyncio/__init__.py b/valkey/asyncio/__init__.py index 9d431542..b7118420 100644 --- a/valkey/asyncio/__init__.py +++ b/valkey/asyncio/__init__.py @@ -26,6 +26,7 @@ OutOfMemoryError, PubSubError, ReadOnlyError, + RedirectError, ResponseError, TimeoutError, ValkeyError, @@ -52,6 +53,7 @@ "InvalidResponse", "PubSubError", "OutOfMemoryError", + "RedirectError", "ReadOnlyError", "Valkey", "ValkeyCluster", diff --git a/valkey/asyncio/client.py b/valkey/asyncio/client.py index 32b23533..d30fde8e 100644 --- a/valkey/asyncio/client.py +++ b/valkey/asyncio/client.py @@ -237,6 +237,7 @@ def __init__( valkey_connect_func=None, credential_provider: Optional[CredentialProvider] = None, protocol: Optional[int] = 2, + client_capa_redirect: bool = False, cache_enabled: bool = False, client_cache: Optional[AbstractCache] = None, cache_max_size: int = 100, @@ -293,6 +294,7 @@ def __init__( "lib_version": lib_version, "valkey_connect_func": valkey_connect_func, "protocol": protocol, + "client_capa_redirect": client_capa_redirect, "cache_enabled": cache_enabled, "client_cache": client_cache, "cache_max_size": cache_max_size, diff --git a/valkey/asyncio/connection.py b/valkey/asyncio/connection.py index 5bc3e46c..18687c28 100644 --- a/valkey/asyncio/connection.py +++ b/valkey/asyncio/connection.py @@ -124,6 +124,7 @@ class AbstractConnection: "encoder", "ssl_context", "protocol", + "client_capa_redirect", "client_cache", "cache_deny_list", "cache_allow_list", @@ -161,6 +162,7 @@ def __init__( encoder_class: Type[Encoder] = Encoder, credential_provider: Optional[CredentialProvider] = None, protocol: Optional[int] = 2, + client_capa_redirect: bool = False, cache_enabled: bool = False, client_cache: Optional[AbstractCache] = None, cache_max_size: int = 10000, @@ -225,6 +227,7 @@ def __init__( if p < 2 or p > 3: raise ConnectionError("protocol must be either 2 or 3") self.protocol = protocol + self.client_capa_redirect = client_capa_redirect if cache_enabled: _cache = _LocalCache(cache_max_size, cache_ttl, cache_policy) else: @@ -429,6 +432,14 @@ async def on_connect(self) -> None: await self.send_command("CLIENT", "SETINFO", "LIB-NAME", self.lib_name) if self.lib_version: await self.send_command("CLIENT", "SETINFO", "LIB-VER", self.lib_version) + + if self.client_capa_redirect: + try: + await self.send_command("CLIENT", "CAPA", "redirect") + await self.read_response() + except ResponseError: + pass + # if a database is specified, switch to it. Also pipeline this if self.db: await self.send_command("SELECT", self.db) @@ -967,6 +978,7 @@ class ConnectKwargs(TypedDict, total=False): port: int db: int path: str + client_capa_redirect: bool _CP = TypeVar("_CP", bound="ConnectionPool") diff --git a/valkey/client.py b/valkey/client.py index df265ab4..08cf2abc 100755 --- a/valkey/client.py +++ b/valkey/client.py @@ -214,6 +214,7 @@ def __init__( valkey_connect_func=None, credential_provider: Optional[CredentialProvider] = None, protocol: Optional[int] = 2, + client_capa_redirect: bool = False, cache_enabled: bool = False, client_cache: Optional[AbstractCache] = None, cache_max_size: int = 10000, @@ -272,6 +273,7 @@ def __init__( "valkey_connect_func": valkey_connect_func, "credential_provider": credential_provider, "protocol": protocol, + "client_capa_redirect": client_capa_redirect, "cache_enabled": cache_enabled, "client_cache": client_cache, "cache_max_size": cache_max_size, @@ -1485,8 +1487,7 @@ def raise_first_error(self, commands, response): def annotate_exception(self, exception, number, command): cmd = " ".join(map(safe_str, command)) msg = ( - f"Command # {number} ({cmd}) of pipeline " - f"caused error: {exception.args[0]}" + f"Command # {number} ({cmd}) of pipeline caused error: {exception.args[0]}" ) exception.args = (msg,) + exception.args[1:] @@ -1535,7 +1536,6 @@ def _disconnect_raise_reset( conn.retry_on_error is None or isinstance(error, tuple(conn.retry_on_error)) is False ): - self.reset() raise error diff --git a/valkey/cluster.py b/valkey/cluster.py index fb508637..3acf721c 100644 --- a/valkey/cluster.py +++ b/valkey/cluster.py @@ -237,6 +237,7 @@ class AbstractValkeyCluster: "CLIENT SETINFO", "CLIENT SETNAME", "CLIENT GETNAME", + "CLIENT CAPA", "CONFIG SET", "CONFIG REWRITE", "CONFIG RESETSTAT", @@ -1616,7 +1617,7 @@ def initialize(self): if len(disagreements) > 5: raise ValkeyClusterException( f"startup_nodes could not agree on a valid " - f'slots cache: {", ".join(disagreements)}' + f"slots cache: {', '.join(disagreements)}" ) fully_covered = self.check_slots_coverage(tmp_slots) @@ -2036,8 +2037,7 @@ def annotate_exception(self, exception, number, command): """ cmd = " ".join(map(safe_str, command)) msg = ( - f"Command # {number} ({cmd}) of pipeline " - f"caused error: {exception.args[0]}" + f"Command # {number} ({cmd}) of pipeline caused error: {exception.args[0]}" ) exception.args = (msg,) + exception.args[1:] diff --git a/valkey/commands/core.py b/valkey/commands/core.py index aa56feb0..e10da248 100644 --- a/valkey/commands/core.py +++ b/valkey/commands/core.py @@ -550,6 +550,14 @@ def client_getredir(self, **kwargs) -> ResponseT: """ return self.execute_command("CLIENT GETREDIR", **kwargs) + def client_capa(self, *capabilities: str, **kwargs) -> ResponseT: + """ + Declare client capabilities for the current connection. + + See https://valkey.io/commands/client-capa + """ + return self.execute_command("CLIENT CAPA", *capabilities, **kwargs) + def client_reply( self, reply: Union[Literal["ON"], Literal["OFF"], Literal["SKIP"]], **kwargs ) -> ResponseT: diff --git a/valkey/connection.py b/valkey/connection.py index 699b3e51..dd868579 100644 --- a/valkey/connection.py +++ b/valkey/connection.py @@ -174,6 +174,7 @@ def __init__( valkey_connect_func: Optional[Callable[["AbstractConnection"], None]] = None, credential_provider: Optional[CredentialProvider] = None, protocol: Optional[Union[int, str]] = 2, + client_capa_redirect: bool = False, command_packer: Optional[Callable[[], None]] = None, cache_enabled: bool = False, client_cache: Optional[AbstractCache] = None, @@ -245,6 +246,7 @@ def __init__( # p = DEFAULT_RESP_VERSION self.protocol = p self._command_packer = self._construct_command_packer(command_packer) + self.client_capa_redirect = client_capa_redirect if cache_enabled: _cache = _LocalCache(cache_max_size, cache_ttl, cache_policy) else: @@ -441,6 +443,13 @@ def on_connect(self): except ResponseError: pass + if self.client_capa_redirect: + try: + self.send_command("CLIENT", "CAPA", "redirect") + self.read_response() + except ResponseError: + pass + # if a database is specified, switch to it if self.db: self.send_command("SELECT", self.db) @@ -569,9 +578,7 @@ def read_response( except OSError as e: if disconnect_on_error: self.disconnect() - raise ConnectionError( - f"Error while reading from {host_error}" f" : {e.args}" - ) + raise ConnectionError(f"Error while reading from {host_error} : {e.args}") except BaseException: # Also by default close in case of BaseException. A lot of code # relies on this behaviour when doing Command/Response pairs. diff --git a/valkey/exceptions.py b/valkey/exceptions.py index a43d9864..bd03de2a 100644 --- a/valkey/exceptions.py +++ b/valkey/exceptions.py @@ -69,6 +69,14 @@ class ReadOnlyError(ResponseError): pass +class RedirectError(ResponseError): + def __init__(self, resp): + self.args = (resp,) + self.message = resp + host, port = resp.rsplit(":", 1) + self.node_addr = self.host, self.port = host, int(port) + + class NoPermissionError(ResponseError): pass