Description
You can subscribe to a binary channel name like so:
await pubsub.subscribe(b'\xc3\x28')
while True:
    message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
    if message is not None:
        pass  # do stuff

Note that the channel name I picked is invalid UTF-8.
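
For illustration, a quick check in plain Python (no Redis needed) shows why this name cannot survive a decode. The helper name `is_valid_utf8` is made up for this sketch, and the `errors="replace"` case is only an assumption about how a lenient `encoding_errors` setting would behave:

```python
channel = b"\xc3\x28"  # the channel name from the example above


def is_valid_utf8(data: bytes) -> bool:
    """Return True if `data` decodes cleanly as UTF-8."""
    try:
        data.decode("utf-8")
    except UnicodeDecodeError:
        return False
    return True


print(is_valid_utf8(channel))   # the binary name
print(is_valid_utf8(b"news"))   # an ordinary ASCII name

# A lenient error handler avoids the exception, but the name no longer
# round-trips: re-encoding yields different bytes than the original.
lossy = channel.decode("utf-8", errors="replace").encode("utf-8")
print(lossy == channel)
```

So a strict decode fails outright, and a lenient one would silently resubscribe to a different channel.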
When the underlying connection pool is configured to retry on errors, the client reconnects after an error and eventually ends up in PubSub.on_connect.
This method attempts to decode all channel names using the connection's decoder so that it can pass them as kwargs to subscribe. That is guaranteed to work for every channel with a listener, because such a channel must have been registered via kwargs in the first place. However, if no listener is attached to a channel, its name may have been passed to subscribe as a positional argument, in which case it may not be valid in the configured encoding (and even if it is, the decode/encode round trip may alter it).
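
One possible direction, sketched here under assumptions rather than taken from redis-py: only names that have a handler need to become keyword arguments, so names without one could be passed back positionally as raw bytes. The function `split_for_resubscribe` is hypothetical, and the sketch assumes the stored mapping goes from encoded channel name to a handler or `None`:

```python
from typing import Callable, Dict, List, Optional, Tuple

Handler = Callable[[dict], None]


def split_for_resubscribe(
    channels: Dict[bytes, Optional[Handler]], encoding: str = "utf-8"
) -> Tuple[List[bytes], Dict[str, Handler]]:
    """Split stored channels into positional names and keyword handlers."""
    plain: List[bytes] = []
    with_handler: Dict[str, Handler] = {}
    for name, handler in channels.items():
        if handler is None:
            # No handler: keep the raw bytes untouched, no decode needed.
            plain.append(name)
        else:
            # A handler implies the name was originally given as a kwarg,
            # so decoding it back to str is expected to succeed.
            with_handler[name.decode(encoding)] = handler
    return plain, with_handler


def on_news(message: dict) -> None:
    """Placeholder handler used only for this demonstration."""


args, kwargs = split_for_resubscribe({b"\xc3\x28": None, b"news": on_news})
print(args, sorted(kwargs))
```

The result could then be handed to something like `subscribe(*args, **kwargs)`, so the binary name is never decoded.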
The synchronous code has the same bug:
Lines 896 to 919 in 334cb3b
    def on_connect(self, connection) -> None:
        "Re-subscribe to any channels and patterns previously subscribed to"
        # NOTE: for python3, we can't pass bytestrings as keyword arguments
        # so we need to decode channel/pattern names back to unicode strings
        # before passing them to [p]subscribe.
        self.pending_unsubscribe_channels.clear()
        self.pending_unsubscribe_patterns.clear()
        self.pending_unsubscribe_shard_channels.clear()
        if self.channels:
            channels = {
                self.encoder.decode(k, force=True): v for k, v in self.channels.items()
            }
            self.subscribe(**channels)
        if self.patterns:
            patterns = {
                self.encoder.decode(k, force=True): v for k, v in self.patterns.items()
            }
            self.psubscribe(**patterns)
        if self.shard_channels:
            shard_channels = {
                self.encoder.decode(k, force=True): v
                for k, v in self.shard_channels.items()
            }
            self.ssubscribe(**shard_channels)
This then leads to UnicodeDecodeError being thrown:
  File "/home/app/code/backend/my_event_bus/listener.py", line 53, in __aiter__
    message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/app/.cache/pypoetry/virtualenvs/my-app-venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 1111, in get_message
    response = await self.parse_response(block=(timeout is None), timeout=timeout)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/app/.cache/pypoetry/virtualenvs/my-app-venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 984, in parse_response
    response = await self._execute(
               ^^^^^^^^^^^^^^^^^^^^
  File "/home/app/.cache/pypoetry/virtualenvs/my-app-venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 964, in _execute
    return await conn.retry.call_with_retry(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/app/.cache/pypoetry/virtualenvs/my-app-venv/lib/python3.12/site-packages/redis/asyncio/retry.py", line 53, in call_with_retry
    await fail(error)
  File "/home/app/.cache/pypoetry/virtualenvs/my-app-venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 954, in _reconnect
    await conn.connect()
  File "/home/app/.cache/pypoetry/virtualenvs/my-app-venv/lib/python3.12/site-packages/redis/asyncio/connection.py", line 296, in connect
    await self.connect_check_health(check_health=True)
  File "/home/app/.cache/pypoetry/virtualenvs/my-app-venv/lib/python3.12/site-packages/redis/asyncio/connection.py", line 343, in connect_check_health
    await task
  File "/home/app/.cache/pypoetry/virtualenvs/my-app-venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 904, in on_connect
    channels[self.encoder.decode(k, force=True)] = v
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/app/.cache/pypoetry/virtualenvs/my-app-venv/lib/python3.12/site-packages/redis/_parsers/encoders.py", line 43, in decode
    value = value.decode(self.encoding, self.encoding_errors)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 8: invalid start byte
My workaround for now is to base64-encode my channels.
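
A minimal sketch of that workaround, using only the standard library. The helper names `encode_channel` and `decode_channel` are made up here, and publishers would have to apply the same encoding to the channel name:

```python
import base64
from typing import Union


def encode_channel(raw: bytes) -> str:
    """Turn an arbitrary binary channel name into an ASCII-only one."""
    return base64.b64encode(raw).decode("ascii")


def decode_channel(name: Union[str, bytes]) -> bytes:
    """Recover the original binary name from an encoded channel name."""
    return base64.b64decode(name)


safe_name = encode_channel(b"\xc3\x28")
print(safe_name)
print(decode_channel(safe_name) == b"\xc3\x28")
```

The encoded name is plain ASCII, so it decodes without error on reconnect. It would be used as `await pubsub.subscribe(encode_channel(b'\xc3\x28'))`, with `decode_channel` applied to the channel field of incoming messages where the original bytes are needed.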