Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ GraphCommands
Grokzen's
INCR
IOError
IPv
Instrumentations
JSONCommands
Jaeger
Expand Down Expand Up @@ -91,6 +92,7 @@ fo
genindex
gmail
hatchling
hostname
html
http
https
Expand Down
49 changes: 49 additions & 0 deletions docs/advanced_features.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,55 @@ a separate connection pool) for each database.

It is not safe to pass PubSub or Pipeline objects between threads.

Replica redirect capability
---------------------------

Valkey 8.0 adds ``CLIENT CAPA redirect`` for standalone replica connections.
When enabled, commands sent to a replica return a redirect response that points
to the primary. valkey-py exposes this as the opt-in
``client_capa_redirect`` argument and raises ``RedirectError`` when the server
responds with ``-REDIRECT``.

This is useful when an application wants explicit redirect information instead
of relying on the replica's default behavior.

.. code:: python

>>> import valkey
>>> primary = valkey.Valkey(host='localhost', port=6379, decode_responses=True)
>>> replica = valkey.Valkey(
... host='localhost',
... port=6380,
... decode_responses=True,
... client_capa_redirect=True,
... )
>>> primary.set('example:key', 'value')
True
>>> try:
... replica.get('example:key')
... except valkey.RedirectError as exc:
... print(exc.host, exc.port)
... print(exc.node_addr)
127.0.0.1 6379
('127.0.0.1', 6379)

On the same connection, ``READONLY`` restores the ability to run read commands
against the replica:

.. code:: python

>>> replica.readonly()
True
>>> replica.get('example:key')
'value'

Write commands still redirect after ``READONLY`` because the connection remains
attached to a replica.

The redirect target can be a hostname, IPv4 address, or IPv6 address. Catching
``RedirectError`` and using its ``host``, ``port``, or ``node_addr`` attributes
is more robust than parsing the error message manually.

Pipelines
---------

Expand Down
20 changes: 20 additions & 0 deletions docs/connections.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,26 @@ Generic Client

This is the client used to connect directly to a standard Valkey node.

Standalone replica redirect capability
=====================================

Valkey 8.0 adds ``CLIENT CAPA redirect`` for standalone primary/replica
deployments. In valkey-py, this is exposed as the opt-in
``client_capa_redirect`` connection argument.

.. code-block:: python

>>> import valkey
>>> replica = valkey.Valkey(host="localhost", port=6380,
... client_capa_redirect=True)
>>> replica = valkey.from_url(
... "valkey://localhost:6380?client_capa_redirect=true"
... )

When enabled, commands sent to a standalone replica raise
``valkey.exceptions.RedirectError`` until ``READONLY`` is enabled for that
connection. For more details, see :doc:`advanced_features`.

.. autoclass:: valkey.Valkey
:members:

Expand Down
53 changes: 53 additions & 0 deletions tests/test_asyncio/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import sys
from string import ascii_letters
from typing import Any, Dict, List
from unittest import mock

import pytest
import pytest_asyncio
Expand Down Expand Up @@ -481,6 +482,58 @@ async def test_client_list_after_client_setname(self, r: valkey.Valkey):
# we don't know which client ours will be
assert "valkey_py_test" in [c["name"] for c in clients]

async def test_client_capa(self, r: valkey.Valkey):
r.execute_command = mock.AsyncMock(return_value=True)
assert await r.client_capa("redirect")
r.execute_command.assert_awaited_once_with("CLIENT CAPA", "redirect")

@pytest.mark.onlynoncluster
@pytest.mark.replica
@skip_if_server_version_lt("8.0.0")
async def test_client_capa_redirect_on_replica(self, r: valkey.Valkey):
key = "capa:redirect:async"
value = "v1"
await r.set(key, value)

replica = valkey.asyncio.Valkey(
port=6380,
decode_responses=True,
client_capa_redirect=True,
)
try:
info = await replica.info("replication")
assert info["role"] in ("slave", "replica")
expected_host = info["master_host"]
expected_port = int(info["master_port"])

with pytest.raises(valkey.RedirectError) as exc_get:
await replica.get(key)
assert exc_get.value.host == expected_host
assert exc_get.value.port == expected_port
assert exc_get.value.node_addr == (expected_host, expected_port)

with pytest.raises(valkey.RedirectError) as exc_set:
await replica.set(key, "v2")
assert exc_set.value.host == expected_host
assert exc_set.value.port == expected_port
assert exc_set.value.node_addr == (expected_host, expected_port)

assert await replica.readonly() is True
assert await replica.get(key) == value
finally:
await replica.aclose()

@pytest.mark.onlynoncluster
@pytest.mark.replica
@skip_if_server_version_lt("8.0.0")
async def test_client_capa_redirect_opt_in_on_replica(self):
replica = valkey.asyncio.Valkey(port=6380, decode_responses=True)
try:
with pytest.raises(valkey.ReadOnlyError):
await replica.set("capa:redirect:async:default", "v")
finally:
await replica.aclose()

@skip_if_server_version_lt("2.9.50")
@pytest.mark.onlynoncluster
async def test_client_pause(self, r: valkey.Valkey):
Expand Down
17 changes: 16 additions & 1 deletion tests/test_asyncio/test_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
_CMD_SEP = b"\r\n"
_SUCCESS_RESP = b"+OK" + _CMD_SEP
_ERROR_RESP = b"-ERR" + _CMD_SEP
_SUPPORTED_CMDS = {f"CLIENT SETNAME {_CLIENT_NAME}": _SUCCESS_RESP}
_SUPPORTED_CMDS = {
f"CLIENT SETNAME {_CLIENT_NAME}": _SUCCESS_RESP,
"CLIENT CAPA redirect": _SUCCESS_RESP,
}


@pytest.fixture
Expand All @@ -42,6 +45,18 @@ async def test_tcp_connect(tcp_address):
await _assert_connect(conn, tcp_address)


async def test_tcp_connect_with_client_capa_redirect(tcp_address):
host, port = tcp_address
conn = Connection(
host=host,
port=port,
client_name=_CLIENT_NAME,
client_capa_redirect=True,
socket_timeout=10,
)
await _assert_connect(conn, tcp_address)


async def test_uds_connect(uds_address):
path = str(uds_address)
conn = UnixDomainSocketConnection(
Expand Down
9 changes: 8 additions & 1 deletion tests/test_asyncio/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import valkey.asyncio as valkey
from tests.conftest import skip_if_server_version_lt
from valkey._parsers.url_parser import to_bool
from valkey.asyncio.connection import Connection
from valkey.asyncio.connection import Connection, DefaultParser
from valkey.utils import SSL_AVAILABLE

from .compat import aclosing, mock
Expand Down Expand Up @@ -640,6 +640,13 @@ async def test_read_only_error(self, r):
with pytest.raises(valkey.ReadOnlyError):
await r.execute_command("DEBUG", "ERROR", "READONLY blah blah")

async def test_redirect_error(self, r):
"""REDIRECT errors get turned into RedirectError exceptions"""
err = DefaultParser.parse_error("REDIRECT 2001:db8::10:6380")
assert isinstance(err, valkey.RedirectError)
assert err.host == "2001:db8::10"
assert err.port == 6380

async def test_oom_error(self, r):
"""OOM errors get turned into OutOfMemoryError exceptions"""
with pytest.raises(valkey.OutOfMemoryError):
Expand Down
47 changes: 47 additions & 0 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,11 @@ def test_client_getredir(self, r):
assert isinstance(r.client_getredir(), int)
assert r.client_getredir() == -1

def test_client_capa(self, r):
r.execute_command = mock.MagicMock(return_value=True)
assert r.client_capa("redirect")
r.execute_command.assert_called_once_with("CLIENT CAPA", "redirect")

@skip_if_server_version_lt("6.0.0")
def test_hello_notI_implemented(self, r):
with pytest.raises(NotImplementedError):
Expand Down Expand Up @@ -5131,6 +5136,48 @@ def test_psync(self, r):
res = r2.psync(r2.client_id(), 1)
assert b"FULLRESYNC" in res

@pytest.mark.onlynoncluster
@pytest.mark.replica
@skip_if_server_version_lt("8.0.0")
def test_client_capa_redirect_on_replica(self, r):
key = "capa:redirect:sync"
value = "v1"
r.set(key, value)

replica = valkey.Valkey(
port=6380,
decode_responses=True,
client_capa_redirect=True,
)

info = replica.info("replication")
assert info["role"] in ("slave", "replica")
expected_host = info["master_host"]
expected_port = int(info["master_port"])

with pytest.raises(valkey.RedirectError) as exc_get:
replica.get(key)
assert exc_get.value.host == expected_host
assert exc_get.value.port == expected_port
assert exc_get.value.node_addr == (expected_host, expected_port)

with pytest.raises(valkey.RedirectError) as exc_set:
replica.set(key, "v2")
assert exc_set.value.host == expected_host
assert exc_set.value.port == expected_port
assert exc_set.value.node_addr == (expected_host, expected_port)

assert replica.readonly() is True
assert replica.get(key) == value

@pytest.mark.onlynoncluster
@pytest.mark.replica
@skip_if_server_version_lt("8.0.0")
def test_client_capa_redirect_opt_in_on_replica(self):
replica = valkey.Valkey(port=6380, decode_responses=True)
with pytest.raises(valkey.ReadOnlyError):
replica.set("capa:redirect:sync:default", "v")

@pytest.mark.onlynoncluster
def test_interrupted_command(self, r: valkey.Valkey):
"""
Expand Down
17 changes: 16 additions & 1 deletion tests/test_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
_CMD_SEP = b"\r\n"
_SUCCESS_RESP = b"+OK" + _CMD_SEP
_ERROR_RESP = b"-ERR" + _CMD_SEP
_SUPPORTED_CMDS = {f"CLIENT SETNAME {_CLIENT_NAME}": _SUCCESS_RESP}
_SUPPORTED_CMDS = {
f"CLIENT SETNAME {_CLIENT_NAME}": _SUCCESS_RESP,
"CLIENT CAPA redirect": _SUCCESS_RESP,
}


@pytest.fixture
Expand All @@ -39,6 +42,18 @@ def test_tcp_connect(tcp_address):
_assert_connect(conn, tcp_address)


def test_tcp_connect_with_client_capa_redirect(tcp_address):
host, port = tcp_address
conn = Connection(
host=host,
port=port,
client_name=_CLIENT_NAME,
client_capa_redirect=True,
socket_timeout=10,
)
_assert_connect(conn, tcp_address)


def test_uds_connect(uds_address):
path = str(uds_address)
conn = UnixDomainSocketConnection(path, client_name=_CLIENT_NAME, socket_timeout=10)
Expand Down
2 changes: 2 additions & 0 deletions valkey/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
OutOfMemoryError,
PubSubError,
ReadOnlyError,
RedirectError,
ResponseError,
TimeoutError,
ValkeyError,
Expand Down Expand Up @@ -65,6 +66,7 @@ def int_or_str(value):
"InvalidResponse",
"OutOfMemoryError",
"PubSubError",
"RedirectError",
"ReadOnlyError",
"Redis",
"RedisCluster",
Expand Down
6 changes: 4 additions & 2 deletions valkey/_parsers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@
NoScriptError,
OutOfMemoryError,
ReadOnlyError,
RedirectError,
ResponseError,
ValkeyError,
)
from ..typing import EncodableT
from .encoders import Encoder
from .socket import SERVER_CLOSED_CONNECTION_ERROR, SocketBuffer

MODULE_LOAD_ERROR = "Error loading the extension. " "Please check the server logs."
MODULE_LOAD_ERROR = "Error loading the extension. Please check the server logs."
NO_SUCH_MODULE_ERROR = "Error unloading module: no such module with that name"
MODULE_UNLOAD_NOT_POSSIBLE_ERROR = "Error unloading module: operation not " "possible."
MODULE_UNLOAD_NOT_POSSIBLE_ERROR = "Error unloading module: operation not possible."
MODULE_EXPORTS_DATA_TYPES_ERROR = (
"Error unloading module: the module "
"exports one or more module-side data "
Expand Down Expand Up @@ -70,6 +71,7 @@ class BaseParser(ABC):
"LOADING": BusyLoadingError,
"NOSCRIPT": NoScriptError,
"READONLY": ReadOnlyError,
"REDIRECT": RedirectError,
"NOAUTH": AuthenticationError,
"NOPERM": NoPermissionError,
}
Expand Down
1 change: 1 addition & 0 deletions valkey/_parsers/url_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def to_bool(value) -> Optional[bool]:
"retry_on_timeout": to_bool,
"max_connections": int,
"health_check_interval": int,
"client_capa_redirect": to_bool,
"ssl_check_hostname": to_bool,
"timeout": float,
}
Expand Down
2 changes: 2 additions & 0 deletions valkey/asyncio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
OutOfMemoryError,
PubSubError,
ReadOnlyError,
RedirectError,
ResponseError,
TimeoutError,
ValkeyError,
Expand All @@ -52,6 +53,7 @@
"InvalidResponse",
"PubSubError",
"OutOfMemoryError",
"RedirectError",
"ReadOnlyError",
"Valkey",
"ValkeyCluster",
Expand Down
2 changes: 2 additions & 0 deletions valkey/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ def __init__(
valkey_connect_func=None,
credential_provider: Optional[CredentialProvider] = None,
protocol: Optional[int] = 2,
client_capa_redirect: bool = False,
cache_enabled: bool = False,
client_cache: Optional[AbstractCache] = None,
cache_max_size: int = 100,
Expand Down Expand Up @@ -293,6 +294,7 @@ def __init__(
"lib_version": lib_version,
"valkey_connect_func": valkey_connect_func,
"protocol": protocol,
"client_capa_redirect": client_capa_redirect,
"cache_enabled": cache_enabled,
"client_cache": client_cache,
"cache_max_size": cache_max_size,
Expand Down
Loading
Loading