Skip to content
48 changes: 44 additions & 4 deletions redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,14 @@
ResponseError,
WatchError,
)
from redis.maint_notifications import MaintNotificationsConfig
from redis.observability.attributes import PubSubDirection
from redis.typing import ChannelT, EncodableT, KeyT, PubSubHandler, Subscription
from redis.utils import (
SENTINEL,
SSL_AVAILABLE,
_set_info_logger,
check_protocol_version,
deprecated_args,
deprecated_function,
safe_str,
Expand Down Expand Up @@ -287,6 +289,7 @@ def __init__(
protocol: int | None = None,
legacy_responses: bool = True,
event_dispatcher: EventDispatcher | None = None,
maint_notifications_config: MaintNotificationsConfig | None = None,
):
"""
Initialize a new Redis client.
Expand Down Expand Up @@ -322,6 +325,14 @@ def __init__(
options that are not available are skipped. Pass `None` or `{}` to
avoid setting additional TCP keepalive options. Argument is ignored
when connection_pool is provided.
maint_notifications_config:
configures the pool to support maintenance notifications - see
`redis.maint_notifications.MaintNotificationsConfig` for details.
Only supported with RESP3
If not provided and protocol is RESP3, the maintenance notifications
will be enabled by default (logic is included in the connection pool
initialization).
Argument is ignored when connection_pool is provided.
"""
kwargs: Dict[str, Any]
if event_dispatcher is None:
Expand Down Expand Up @@ -375,10 +386,21 @@ def __init__(
}
# based on input, setup appropriate connection args
if unix_socket_path is not None:
if (
maint_notifications_config
and maint_notifications_config.enabled is True
):
raise RedisError(
"Maintenance notifications are not supported with Unix "
"domain socket connections"
)
kwargs.update(
{
"path": unix_socket_path,
"connection_class": UnixDomainSocketConnection,
"maint_notifications_config": MaintNotificationsConfig(
enabled=False
),
}
)
else:
Expand Down Expand Up @@ -411,6 +433,19 @@ def __init__(
"ssl_password": ssl_password,
}
)
maint_notifications_enabled = (
maint_notifications_config and maint_notifications_config.enabled
)
if maint_notifications_enabled and not check_protocol_version(protocol, 3):
raise RedisError(
"Maintenance notifications handlers on connection are only supported with RESP version 3"
)
if maint_notifications_config:
kwargs.update(
{
"maint_notifications_config": maint_notifications_config,
}
)
# This arg only used if no pool is passed in
self.auto_close_connection_pool = auto_close_connection_pool
connection_pool = ConnectionPool(**kwargs)
Expand Down Expand Up @@ -864,10 +899,15 @@ def failure_callback(error, failure_count):
)
raise
finally:
if self.single_connection_client:
Comment thread
petyaslavova marked this conversation as resolved.
self._single_conn_lock.release()
if not self.connection:
await pool.release(conn)
try:
if self.single_connection_client and conn and conn.should_reconnect():
await self._close_connection(conn)
await conn.connect()
finally:
if self.single_connection_client:
self._single_conn_lock.release()
if not self.connection:
await pool.release(conn)

async def parse_response(
self, connection: Connection, command_name: Union[str, bytes], **options
Expand Down
Loading