WSS receive_json/string/binary API error handling #8581

Open
@arcivanov

Describe the solution you'd like

The current API provides a bizarre, effectively absent, error handling mechanism for web socket receive operations.

Specifically these are the current implementations for receiving string, bytes and json:

    async def receive_str(self, *, timeout: Optional[float] = None) -> str:
        msg = await self.receive(timeout)
        if msg.type is not WSMsgType.TEXT:
            raise TypeError(f"Received message {msg.type}:{msg.data!r} is not str")
        return cast(str, msg.data)

    async def receive_bytes(self, *, timeout: Optional[float] = None) -> bytes:
        msg = await self.receive(timeout)
        if msg.type is not WSMsgType.BINARY:
            raise TypeError(f"Received message {msg.type}:{msg.data!r} is not bytes")
        return cast(bytes, msg.data)

    async def receive_json(
        self,
        *,
        loads: JSONDecoder = DEFAULT_JSON_DECODER,
        timeout: Optional[float] = None,
    ) -> Any:
        data = await self.receive_str(timeout=timeout)
        return loads(data)

    def __aiter__(self) -> "ClientWebSocketResponse":
        return self

As you can see, any message that is not of type TEXT (for receive_str and receive_json) or BINARY (for receive_bytes) results in a TypeError being raised. However, if we inspect the underlying receive, we see that a whole bunch of error and state conditions produce messages that are neither TEXT nor BINARY, including WS_CLOSED_MESSAGE, WSMsgType.CLOSED and WSMsgType.ERROR (this last one is an internal API implementation detail and should never be exposed to a user anyway) [emphasis added on the relevant return statements]:

    async def receive(self, timeout: Optional[float] = None) -> WSMessage:
        while True:
            if self._waiting:
                raise RuntimeError("Concurrent call to receive() is not allowed")

            if self._closed:
                return WS_CLOSED_MESSAGE  # <-- emphasis added
            elif self._closing:
                await self.close()
                return WS_CLOSED_MESSAGE  # <-- emphasis added

            try:
                self._waiting = True
                try:
                    async with async_timeout.timeout(timeout or self._receive_timeout):
                        msg = await self._reader.read()
                    self._reset_heartbeat()
                finally:
                    self._waiting = False
                    if self._close_wait:
                        set_result(self._close_wait, None)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                raise
            except EofStream:
                self._close_code = WSCloseCode.OK
                await self.close()
                return WSMessage(WSMsgType.CLOSED, None, None)  # <-- emphasis added
            except ClientError:
                self._closed = True
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                return WS_CLOSED_MESSAGE  # <-- emphasis added
            except WebSocketError as exc:
                self._close_code = exc.code
                await self.close(code=exc.code)
                return WSMessage(WSMsgType.ERROR, exc, None)  # <-- emphasis added
            except Exception as exc:
                self._exception = exc
                self._closing = True
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                await self.close()
                return WSMessage(WSMsgType.ERROR, exc, None)  # <-- emphasis added

            if msg.type is WSMsgType.CLOSE:
                self._closing = True
                self._close_code = msg.data
                if not self._closed and self._autoclose:
                    await self.close()
            elif msg.type is WSMsgType.CLOSING:
                self._closing = True
            elif msg.type is WSMsgType.PING and self._autoping:
                await self.pong(msg.data)
                continue
            elif msg.type is WSMsgType.PONG and self._autoping:
                continue

            return msg

So, what happens when you call receive_str on a closed web socket? TypeError.
Server-initiated socket closing? TypeError.
TCP connection reset and disconnected? TypeError.

TypeError is a builtin whose only payload is a string, so information about the connection state or the underlying exception is entirely lost, except for whatever happens to be encoded in the error string. This is clearly not desirable API behavior.
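To make the loss of information concrete, here is a minimal sketch that applies the same type check to two very different conditions. `MsgType`, `Message`, `check_str` and `describe_failure` are hypothetical stand-ins written for this illustration (the check is synchronous for brevity); they are not aiohttp's own classes.

```python
from enum import Enum
from typing import Any, NamedTuple


class MsgType(Enum):  # hypothetical stand-in for aiohttp's WSMsgType
    TEXT = 1
    CLOSED = 2
    ERROR = 3


class Message(NamedTuple):  # hypothetical stand-in for aiohttp's WSMessage
    type: MsgType
    data: Any
    extra: Any


def check_str(msg: Message) -> str:
    # Same check as in the receive_str quoted above, applied to a ready-made message.
    if msg.type is not MsgType.TEXT:
        raise TypeError(f"Received message {msg.type}:{msg.data!r} is not str")
    return msg.data


def describe_failure(msg: Message) -> str:
    # Report what a caller can learn from the raised exception object itself.
    try:
        check_str(msg)
    except TypeError as exc:
        return f"{type(exc).__name__}, cause={exc.__cause__!r}"
    return "no error"


# A clean close and a connection reset look identical to the caller.
closed = describe_failure(Message(MsgType.CLOSED, None, None))
reset = describe_failure(
    Message(MsgType.ERROR, ConnectionResetError("peer reset"), None)
)
```

In both cases the caller sees a bare TypeError with no chained cause; the original ConnectionResetError survives only as text inside the message string.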

In my application I had to implement my own wrapper around receive to ensure that:

  1. If the WSS is closed normally, None is returned to indicate EOF.
  2. If we receive an unsolicited server closure, a ServerDisconnectedError is raised.
  3. If an exception occurs during a receive, the exception is unwrapped from the WSMessage(WSMsgType.ERROR, exc, None) and re-raised.

The code is as follows, and I believe the API should change along similar lines:

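    from typing import Any, cast

    from aiohttp import ClientWebSocketResponse, ServerDisconnectedError, WSMsgType
    from aiohttp.typedefs import DEFAULT_JSON_DECODER, JSONDecoder

    # Method of the application's own client class.
    async def ws_receive(self, ws_conn: ClientWebSocketResponse):
        async def receive_str(timeout: float | None = None) -> str | None:
            msg = await ws_conn.receive(timeout)
            # Unsolicited server closure.
            if msg.type == WSMsgType.CLOSE:
                raise ServerDisconnectedError()
            # Normal closure: signal EOF with None.
            if msg.type in (WSMsgType.CLOSING, WSMsgType.CLOSED):
                return None
            # Unwrap and re-raise the exception carried by the ERROR message.
            if msg.type == WSMsgType.ERROR:
                raise msg.data
            if msg.type != WSMsgType.TEXT:
                raise TypeError(f"Received message {msg.type}:{msg.data!r} is not str")
            return cast(str, msg.data)

        async def receive_json(
            loads: JSONDecoder = DEFAULT_JSON_DECODER,
            timeout: float | None = None,
        ) -> Any:
            data = await receive_str(timeout=timeout)
            if data is None:
                return None
            return loads(data)

        return await receive_json()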
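For illustration, a caller could consume such a wrapper in a simple loop that stops on None. The sketch below is self-contained and does not use aiohttp: `MsgType`, `Message`, `FakeConn` and `drain` are hypothetical names made up for this example, and ConnectionError stands in for ServerDisconnectedError.

```python
import asyncio
import json
from enum import Enum
from typing import Any, List, NamedTuple, Optional


class MsgType(Enum):  # hypothetical stand-in for aiohttp's WSMsgType
    TEXT = 1
    CLOSE = 2
    CLOSED = 3
    ERROR = 4


class Message(NamedTuple):  # hypothetical stand-in for aiohttp's WSMessage
    type: MsgType
    data: Any


class FakeConn:
    """Hypothetical scripted connection: replays messages, then reports CLOSED."""

    def __init__(self, script: List[Message]) -> None:
        self._script = list(script)

    async def receive(self, timeout: Optional[float] = None) -> Message:
        if self._script:
            return self._script.pop(0)
        return Message(MsgType.CLOSED, None)


async def receive_json(conn: FakeConn, timeout: Optional[float] = None) -> Any:
    msg = await conn.receive(timeout)
    if msg.type is MsgType.CLOSE:
        # Stand-in for raising ServerDisconnectedError.
        raise ConnectionError("server closed the socket")
    if msg.type is MsgType.CLOSED:
        return None  # normal closure: EOF
    if msg.type is MsgType.ERROR:
        raise msg.data  # unwrap and re-raise the carried exception
    if msg.type is not MsgType.TEXT:
        raise TypeError(f"Received message {msg.type}:{msg.data!r} is not str")
    return json.loads(msg.data)


async def drain(conn: FakeConn) -> List[Any]:
    # Collect decoded payloads until the wrapper signals EOF with None.
    items = []
    while (item := await receive_json(conn)) is not None:
        items.append(item)
    return items


script = [Message(MsgType.TEXT, '{"n": 1}'), Message(MsgType.TEXT, '{"n": 2}')]
events = asyncio.run(drain(FakeConn(script)))
```

One trade-off of using None as the EOF marker: a JSON payload of `null` also decodes to None, so a loop like this would treat it as end of stream. A dedicated sentinel object or exception might avoid that ambiguity.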
