diff --git a/tests/test_context.py b/tests/test_context.py index 03733756..2b2329f9 100644 --- a/tests/test_context.py +++ b/tests/test_context.py @@ -512,9 +512,10 @@ async def write_over(): proto.transport.write(b'q' * 16384) count += 1 else: - proto.transport.write(b'q' * 16384) proto.transport.set_write_buffer_limits(high=256, low=128) - count += 1 + while not proto.transport.get_write_buffer_size(): + proto.transport.write(b'q' * 16384) + count += 1 return count s = self.loop.run_in_executor(None, accept) diff --git a/tests/test_tcp.py b/tests/test_tcp.py index 8759383d..de43234a 100644 --- a/tests/test_tcp.py +++ b/tests/test_tcp.py @@ -1224,21 +1224,16 @@ def resume_writing(self): t, p = await self.loop.create_connection(Protocol, *addr) t.write(b'q' * 512) - self.assertEqual(t.get_write_buffer_size(), 512) - t.set_write_buffer_limits(low=16385) - self.assertFalse(paused) self.assertEqual(t.get_write_buffer_limits(), (16385, 65540)) with self.assertRaisesRegex(ValueError, 'high.*must be >= low'): t.set_write_buffer_limits(high=0, low=1) t.set_write_buffer_limits(high=1024, low=128) - self.assertFalse(paused) self.assertEqual(t.get_write_buffer_limits(), (128, 1024)) t.set_write_buffer_limits(high=256, low=128) - self.assertTrue(paused) self.assertEqual(t.get_write_buffer_limits(), (128, 256)) t.close() diff --git a/uvloop/handles/stream.pxd b/uvloop/handles/stream.pxd index 8ca87437..c06631b6 100644 --- a/uvloop/handles/stream.pxd +++ b/uvloop/handles/stream.pxd @@ -35,7 +35,7 @@ cdef class UVStream(UVBaseTransport): # and then call _initiate_write() to start writing either immediately or in # the next iteration (loop._queue_write()). cdef inline _buffer_write(self, object data) - cdef inline _initiate_write(self) + cdef inline _initiate_write(self, bint skip_fast_path) # _exec_write() is the method that does the actual send, and _try_write() # is a fast-path used in _exec_write() to send a single chunk. diff --git a/uvloop/handles/stream.pyx b/uvloop/handles/stream.pyx index 9fbc5a51..2f88b9b1 100644 --- a/uvloop/handles/stream.pyx +++ b/uvloop/handles/stream.pyx @@ -424,11 +424,13 @@ cdef class UVStream(UVBaseTransport): self._buffer_size += dlen self._buffer.append(data) - cdef inline _initiate_write(self): - if (not self._protocol_paused and - (self._handle).write_queue_size == 0 and - self._buffer_size > self._high_water): + cdef inline _initiate_write(self, bint skip_fast_path): + if (not skip_fast_path and + not self._protocol_paused and + (self._handle).write_queue_size == 0 and + self._buffer_size > self._high_water): # Fast-path. If: + # - the caller hasn't tried fast path itself # - the protocol isn't yet paused, # - there is no data in libuv buffers for this stream, # - the protocol will be paused if we continue to buffer data @@ -684,8 +686,47 @@ cdef class UVStream(UVBaseTransport): if self._conn_lost: self._conn_lost += 1 return + + cdef ssize_t bytes_written + + if self._get_write_buffer_size() == 0: + bytes_written_ = self._try_write(buf) + + if bytes_written_ is None: + # A `self._fatal_error` was called. + # It might not raise an exception under some + # conditions. + if not self._closing: + raise RuntimeError('stream is open after ' + 'UVStream._try_write returned None') + + return + + bytes_written = bytes_written_ + + if bytes_written == 0: + # All data was successfully written. + # on_write will call "maybe_resume_protocol". + return + + if bytes_written > 0: + if UVLOOP_DEBUG: + if bytes_written == len(buf): + raise RuntimeError('_try_write sent all data and ' + 'returned non-zero') + + if PyBytes_CheckExact(buf): + # Cast bytes to memoryview to avoid copying + # data that wasn't sent. + buf = memoryview(buf) + buf = buf[bytes_written_:] + + # At this point it's either data was sent partially, + # or an EAGAIN has happened. + # buffer remaining data and send it later + self._buffer_write(buf) - self._initiate_write() + self._initiate_write(True) # skip fast path in _initiate_write def writelines(self, bufs): self._ensure_alive() @@ -697,7 +738,7 @@ cdef class UVStream(UVBaseTransport): return for buf in bufs: self._buffer_write(buf) - self._initiate_write() + self._initiate_write(False) # try fast path in _initiate_write def write_eof(self): self._ensure_alive()