Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions lib/gruvi/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from . import logging, compat
from .hub import switchpoint
from .util import delegate_method, docfrom
from .transports import TransportError
from .protocols import MessageProtocol, ProtocolError
from .stream import Stream
from .endpoints import Client, Server
Expand Down Expand Up @@ -1033,6 +1034,7 @@ def on_message_complete(parser):
if self._header_value:
self._save_header()
self._message.body.buffer.feed_eof()
self._message = None
self._maybe_pause_transport()
return 0

Expand Down Expand Up @@ -1077,11 +1079,17 @@ def connection_lost(self, exc):
if nbytes != 0:
msg = b2s(ffi.string(lib.http_errno_name(lib.http_errno(self._parser))))
self._log.debug('http_parser_execute(): {}', msg)
if exc is None:
exc = HttpError('parse error: {}'.format(msg))
if self._message:
self._message.body.buffer.feed_error(exc)
self._error = exc
self._error = HttpError('parse error: {}'.format(msg))
elif self._error is None:
self._error = exc if exc is not None else TransportError('connection lost')
# If a message header was read then the error is passed as an IO error
# to the corresponding read() method. If not, for client protocols we
# report the error via the queue, where e.g. getresponse() will raise
# an exception.
if self._message:
self._message.body.buffer.feed_error(self._error)
elif not self._server_side:
self._queue.put_nowait(self._error)

@property
def writer(self):
Expand Down