-
Notifications
You must be signed in to change notification settings - Fork 600
Improve performance and reduce latency of Transport.write by attempting to send data immediately if all write buffers are empty #619
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 18 commits
f4d505c
20aa1bb
fde18ef
e360c3a
b79a840
cfa33ab
96dc94c
94cbc7f
4f66353
4e96818
f6f5094
ede3f38
b0ea7f6
3dd6c51
ae3f8d2
1e19cd5
786ee48
37af342
b56c375
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| cdef enum: | ||
| cdef enum: | ||
| __PREALLOCED_BUFS = 4 | ||
|
|
||
|
|
||
|
|
@@ -341,13 +341,15 @@ cdef class UVStream(UVBaseTransport): | |
| else: | ||
| self.__reading_stopped() | ||
|
|
||
| cdef inline _try_write(self, object data): | ||
| cdef inline Py_ssize_t _try_write(self, object data) except? -2: | ||
| # Returns number of bytes written. | ||
| # -1 - in case of fatal errors | ||
| cdef: | ||
| ssize_t written | ||
| Py_ssize_t written | ||
| bint used_buf = 0 | ||
| Py_buffer py_buf | ||
| void* buf | ||
| size_t blen | ||
| Py_ssize_t blen | ||
| int saved_errno | ||
| int fd | ||
|
|
||
|
|
@@ -368,6 +370,8 @@ cdef class UVStream(UVBaseTransport): | |
| blen = py_buf.len | ||
|
|
||
| if blen == 0: | ||
| if used_buf: | ||
| PyBuffer_Release(&py_buf) | ||
|
Comment on lines
+373
to
+374
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch!
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added PyBuffer_Release for consistency, but I think that the outer check if blen == 0: is not really necessary. We already filter out empty data objects twice. First in write(), second in _buffer_write(). I can do it, but I'd rather put it into a separate PR
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah let's merge this first. |
||
| # Empty data, do nothing. | ||
| return 0 | ||
|
|
||
|
|
@@ -392,24 +396,20 @@ cdef class UVStream(UVBaseTransport): | |
| PyBuffer_Release(&py_buf) | ||
|
|
||
| if written < 0: | ||
| if saved_errno == errno.EAGAIN or \ | ||
| saved_errno == system.EWOULDBLOCK: | ||
| return -1 | ||
| if saved_errno in (errno.EAGAIN, system.EWOULDBLOCK): | ||
| return 0 | ||
| else: | ||
| exc = convert_error(-saved_errno) | ||
| self._fatal_error(exc, True) | ||
| return | ||
| return -1 | ||
|
|
||
| if UVLOOP_DEBUG: | ||
| self._loop._debug_stream_write_tries += 1 | ||
|
|
||
| if <size_t>written == blen: | ||
| return 0 | ||
|
|
||
| return written | ||
|
|
||
| cdef inline _buffer_write(self, object data): | ||
| cdef int dlen | ||
| cdef Py_ssize_t dlen | ||
|
|
||
| if not PyBytes_CheckExact(data): | ||
| data = memoryview(data).cast('b') | ||
|
|
@@ -422,19 +422,19 @@ cdef class UVStream(UVBaseTransport): | |
| self._buffer.append(data) | ||
|
|
||
| cdef inline _initiate_write(self): | ||
| cdef bint all_sent | ||
|
|
||
| if (not self._protocol_paused and | ||
| (<uv.uv_stream_t*>self._handle).write_queue_size == 0 and | ||
| self._buffer_size > self._high_water): | ||
| (<uv.uv_stream_t*>self._handle).write_queue_size == 0): | ||
| # Fast-path. If: | ||
| # - the protocol isn't yet paused, | ||
| # - there is no data in libuv buffers for this stream, | ||
| # - the protocol will be paused if we continue to buffer data | ||
| # | ||
| # Then: | ||
| # - Try to write all buffered data right now. | ||
| all_sent = self._exec_write() | ||
| if UVLOOP_DEBUG: | ||
| if self._buffer_size != 0 or self._buffer != []: | ||
| if self._buffer_size != 0 or self._buffer: | ||
| raise RuntimeError( | ||
| '_buffer_size is not 0 after a successful _exec_write') | ||
|
|
||
|
|
@@ -450,20 +450,23 @@ cdef class UVStream(UVBaseTransport): | |
| self._maybe_pause_protocol() | ||
| self._loop._queue_write(self) | ||
|
|
||
| cdef inline _exec_write(self): | ||
| cdef inline bint _exec_write(self) except? -1: | ||
tarasko marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| # Returns True if all data from self._buffers has been sent, | ||
| # False - otherwise | ||
| cdef: | ||
| int err | ||
| int buf_len | ||
| Py_ssize_t buf_len | ||
| Py_ssize_t sent | ||
| _StreamWriteContext ctx = None | ||
|
|
||
| if self._closed: | ||
| # If the handle is closed, just return, it's too | ||
| # late to do anything. | ||
| return | ||
| return False | ||
|
|
||
| buf_len = len(self._buffer) | ||
| if not buf_len: | ||
| return | ||
| return True | ||
|
|
||
| if (<uv.uv_stream_t*>self._handle).write_queue_size == 0: | ||
| # libuv internal write buffers for this stream are empty. | ||
|
|
@@ -473,34 +476,16 @@ cdef class UVStream(UVBaseTransport): | |
| data = self._buffer[0] | ||
| sent = self._try_write(data) | ||
|
|
||
| if sent is None: | ||
| # A `self._fatal_error` was called. | ||
| # It might not raise an exception under some | ||
| # conditions. | ||
| self._buffer_size = 0 | ||
| self._buffer.clear() | ||
| if not self._closing: | ||
| # This should never happen. | ||
| raise RuntimeError( | ||
| 'stream is open after UVStream._try_write ' | ||
| 'returned None') | ||
| return | ||
|
|
||
| if sent == 0: | ||
| # All data was successfully written. | ||
| if sent == len(data): | ||
| # The most likely and latency sensitive outcome goes first, | ||
| # all data was successfully written. | ||
| self._buffer_size = 0 | ||
| self._buffer.clear() | ||
| # on_write will call "maybe_resume_protocol". | ||
| self._on_write() | ||
| return True | ||
|
|
||
| if sent > 0: | ||
| if UVLOOP_DEBUG: | ||
| if sent == len(data): | ||
| raise RuntimeError( | ||
| '_try_write sent all data and returned ' | ||
| 'non-zero') | ||
|
|
||
| elif sent > 0: | ||
| if PyBytes_CheckExact(data): | ||
| # Cast bytes to memoryview to avoid copying | ||
| # data that wasn't sent. | ||
|
|
@@ -510,6 +495,19 @@ cdef class UVStream(UVBaseTransport): | |
| self._buffer_size -= sent | ||
| self._buffer[0] = data | ||
|
|
||
| elif sent == -1: | ||
| # A `self._fatal_error` was called. | ||
| # It might not raise an exception under some | ||
| # conditions. | ||
| self._buffer_size = 0 | ||
| self._buffer.clear() | ||
| if not self._closing: | ||
| # This should never happen. | ||
| raise RuntimeError( | ||
| 'stream is open after UVStream._try_write ' | ||
| 'returned None') | ||
| return False | ||
|
|
||
| # At this point it's either data was sent partially, | ||
| # or an EAGAIN has happened. | ||
|
|
||
|
|
@@ -543,15 +541,15 @@ cdef class UVStream(UVBaseTransport): | |
| self._fatal_error(ex, True) | ||
| self._buffer.clear() | ||
| self._buffer_size = 0 | ||
| return | ||
| return False | ||
|
|
||
| elif err != uv.UV_EAGAIN: | ||
| ctx.close() | ||
| exc = convert_error(err) | ||
| self._fatal_error(exc, True) | ||
| self._buffer.clear() | ||
| self._buffer_size = 0 | ||
| return | ||
| return False | ||
|
|
||
| # fall through | ||
|
|
||
|
|
@@ -575,9 +573,10 @@ cdef class UVStream(UVBaseTransport): | |
|
|
||
| exc = convert_error(err) | ||
| self._fatal_error(exc, True) | ||
| return | ||
| return False | ||
|
|
||
| self._maybe_resume_protocol() | ||
| return False | ||
|
|
||
| cdef size_t _get_write_buffer_size(self): | ||
| if self._handle is NULL: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.