Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions uvloop/handles/stream.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ cdef class UVStream(UVBaseTransport):
Py_buffer _read_pybuf
bint _read_pybuf_acquired

cpdef write(self, object buf)

# All "inline" methods are final

cdef inline _init(self, Loop loop, object protocol, Server server,
Expand Down
41 changes: 33 additions & 8 deletions uvloop/handles/stream.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,9 @@ cdef class UVStream(UVBaseTransport):

UVBaseTransport._set_protocol(self, protocol)

if (hasattr(protocol, 'get_buffer') and
if isinstance(protocol, SSLProtocol):
self.__buffered = 1
elif (hasattr(protocol, 'get_buffer') and
not isinstance(protocol, aio_Protocol)):
try:
self._protocol_get_buffer = protocol.get_buffer
Expand Down Expand Up @@ -671,7 +673,7 @@ cdef class UVStream(UVBaseTransport):
self.__reading,
id(self))

def write(self, object buf):
cpdef write(self, object buf):
self._ensure_alive()

if self._eof:
Expand Down Expand Up @@ -921,9 +923,24 @@ cdef void __uv_stream_buffered_alloc(
"UVStream alloc buffer callback") == 0:
return

cdef UVStream sc = <UVStream>stream.data

# Fast pass for our own SSLProtocol
# avoid python calls, memoryviews, context enter/exit, etc
if isinstance(sc._protocol, SSLProtocol):
try:
(<SSLProtocol>sc._protocol).get_buffer_impl(
suggested_size, &uvbuf.base, &uvbuf.len)
return
except BaseException as exc:
# Can't call 'sc._fatal_error' or 'sc._close', libuv will SF.
# We'll do it later in __uv_stream_buffered_on_read when we
# receive UV_ENOBUFS.
uvbuf.len = 0
uvbuf.base = NULL
return

cdef:
UVStream sc = <UVStream>stream.data
Loop loop = sc._loop
Py_buffer* pybuf = &sc._read_pybuf
int got_buf = 0

Expand Down Expand Up @@ -984,7 +1001,7 @@ cdef void __uv_stream_buffered_on_read(
return

try:
if nread > 0 and not sc._read_pybuf_acquired:
if nread > 0 and not isinstance(sc._protocol, SSLProtocol) and not sc._read_pybuf_acquired:
# From libuv docs:
# nread is > 0 if there is data available or < 0 on error. When
# we’ve reached EOF, nread will be set to UV_EOF. When
Expand All @@ -1005,12 +1022,20 @@ cdef void __uv_stream_buffered_on_read(
if UVLOOP_DEBUG:
loop._debug_stream_read_cb_total += 1

run_in_context1(sc.context, sc._protocol_buffer_updated, nread)
if isinstance(sc._protocol, SSLProtocol):
Context_Enter(sc.context)
try:
(<SSLProtocol>sc._protocol).buffer_updated_impl(nread)
finally:
Context_Exit(sc.context)
else:
run_in_context1(sc.context, sc._protocol_buffer_updated, nread)
except BaseException as exc:
if UVLOOP_DEBUG:
loop._debug_stream_read_cb_errors_total += 1

sc._fatal_error(exc, False)
finally:
sc._read_pybuf_acquired = 0
PyBuffer_Release(pybuf)
if sc._read_pybuf_acquired:
sc._read_pybuf_acquired = 0
PyBuffer_Release(pybuf)
69 changes: 37 additions & 32 deletions uvloop/sslproto.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ cdef class SSLProtocol:
object _outgoing_read
char* _ssl_buffer
size_t _ssl_buffer_len
object _ssl_buffer_view
SSLProtocolState _state
size_t _conn_lost
AppProtocolState _app_state
Expand All @@ -84,55 +83,61 @@ cdef class SSLProtocol:
object _handshake_timeout_handle
object _shutdown_timeout_handle

cdef _set_app_protocol(self, app_protocol)
cdef _wakeup_waiter(self, exc=*)
cdef _get_extra_info(self, name, default=*)
cdef _set_state(self, SSLProtocolState new_state)
# Instead of doing python calls, c methods *_impl are called directly
# from stream.pyx

cdef inline get_buffer_impl(self, size_t n, char** buf, size_t* buf_size)
cdef inline buffer_updated_impl(self, size_t nbytes)

cdef inline _set_app_protocol(self, app_protocol)
cdef inline _wakeup_waiter(self, exc=*)
cdef inline _get_extra_info(self, name, default=*)
cdef inline _set_state(self, SSLProtocolState new_state)

# Handshake flow

cdef _start_handshake(self)
cdef _check_handshake_timeout(self)
cdef _do_handshake(self)
cdef _on_handshake_complete(self, handshake_exc)
cdef inline _start_handshake(self)
cdef inline _check_handshake_timeout(self)
cdef inline _do_handshake(self)
cdef inline _on_handshake_complete(self, handshake_exc)

# Shutdown flow

cdef _start_shutdown(self, object context=*)
cdef _check_shutdown_timeout(self)
cdef _do_read_into_void(self, object context)
cdef _do_flush(self, object context=*)
cdef _do_shutdown(self, object context=*)
cdef _on_shutdown_complete(self, shutdown_exc)
cdef _abort(self, exc)
cdef inline _start_shutdown(self, object context=*)
cdef inline _check_shutdown_timeout(self)
cdef inline _do_read_into_void(self, object context)
cdef inline _do_flush(self, object context=*)
cdef inline _do_shutdown(self, object context=*)
cdef inline _on_shutdown_complete(self, shutdown_exc)
cdef inline _abort(self, exc)

# Outgoing flow

cdef _write_appdata(self, list_of_data, object context)
cdef _do_write(self)
cdef _process_outgoing(self)
cdef inline _write_appdata(self, list_of_data, object context)
cdef inline _do_write(self)
cdef inline _process_outgoing(self)

# Incoming flow

cdef _do_read(self)
cdef _do_read__buffered(self)
cdef _do_read__copied(self)
cdef _call_eof_received(self, object context=*)
cdef inline _do_read(self)
cdef inline _do_read__buffered(self)
cdef inline _do_read__copied(self)
cdef inline _call_eof_received(self, object context=*)

# Flow control for writes from APP socket

cdef _control_app_writing(self, object context=*)
cdef size_t _get_write_buffer_size(self)
cdef _set_write_buffer_limits(self, high=*, low=*)
cdef inline _control_app_writing(self, object context=*)
cdef inline size_t _get_write_buffer_size(self)
cdef inline _set_write_buffer_limits(self, high=*, low=*)

# Flow control for reads to APP socket

cdef _pause_reading(self)
cdef _resume_reading(self, object context)
cdef inline _pause_reading(self)
cdef inline _resume_reading(self, object context)

# Flow control for reads from SSL socket

cdef _control_ssl_reading(self)
cdef _set_read_buffer_limits(self, high=*, low=*)
cdef size_t _get_read_buffer_size(self)
cdef _fatal_error(self, exc, message=*)
cdef inline _control_ssl_reading(self)
cdef inline _set_read_buffer_limits(self, high=*, low=*)
cdef inline size_t _get_read_buffer_size(self)
cdef inline _fatal_error(self, exc, message=*)
30 changes: 21 additions & 9 deletions uvloop/sslproto.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,8 @@ cdef class SSLProtocol:
self._ssl_buffer = <char*>PyMem_RawMalloc(self._ssl_buffer_len)
if not self._ssl_buffer:
raise MemoryError()
self._ssl_buffer_view = PyMemoryView_FromMemory(
self._ssl_buffer, self._ssl_buffer_len, PyBUF_WRITE)

def __dealloc__(self):
self._ssl_buffer_view = None
PyMem_RawFree(self._ssl_buffer)
self._ssl_buffer = NULL
self._ssl_buffer_len = 0
Expand Down Expand Up @@ -358,7 +355,7 @@ cdef class SSLProtocol:
self._handshake_timeout_handle.cancel()
self._handshake_timeout_handle = None

def get_buffer(self, n):
cdef get_buffer_impl(self, size_t n, char** buf, size_t* buf_size):
cdef size_t want = n
if want > SSL_READ_MAX_SIZE:
want = SSL_READ_MAX_SIZE
Expand All @@ -367,11 +364,11 @@ cdef class SSLProtocol:
if not self._ssl_buffer:
raise MemoryError()
self._ssl_buffer_len = want
self._ssl_buffer_view = PyMemoryView_FromMemory(
self._ssl_buffer, want, PyBUF_WRITE)
return self._ssl_buffer_view

def buffer_updated(self, nbytes):
buf[0] = self._ssl_buffer
buf_size[0] = self._ssl_buffer_len

cdef buffer_updated_impl(self, size_t nbytes):
self._incoming_write(PyMemoryView_FromMemory(
self._ssl_buffer, nbytes, PyBUF_WRITE))

Expand All @@ -387,6 +384,18 @@ cdef class SSLProtocol:
elif self._state == SHUTDOWN:
self._do_shutdown()

def get_buffer(self, size_t n):
# This pure python call is still used by some very peculiar test cases
cdef:
char* buf
size_t buf_size

self.get_buffer_impl(n, &buf, &buf_size)
return PyMemoryView_FromMemory(buf, buf_size, PyBUF_WRITE)

def buffer_updated(self, size_t nbytes):
self.buffer_updated_impl(nbytes)

def eof_received(self):
"""Called when the other end of the low-level stream
is half-closed.
Expand Down Expand Up @@ -696,7 +705,10 @@ cdef class SSLProtocol:
if not self._ssl_writing_paused:
data = self._outgoing_read()
if len(data):
self._transport.write(data)
if isinstance(self._transport, UVStream):
(<UVStream>self._transport).write(data)
else:
self._transport.write(data)

# Incoming flow

Expand Down