Skip to content

Commit 1d9b6e0

Browse files
authored
Improve performance/latency of Transport.write (#619)
1 parent 837ef22 commit 1d9b6e0

File tree

4 files changed

+49
-54
lines changed

4 files changed

+49
-54
lines changed

tests/test_context.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -512,9 +512,10 @@ async def write_over():
512512
proto.transport.write(b'q' * 16384)
513513
count += 1
514514
else:
515-
proto.transport.write(b'q' * 16384)
516515
proto.transport.set_write_buffer_limits(high=256, low=128)
517-
count += 1
516+
while not proto.transport.get_write_buffer_size():
517+
proto.transport.write(b'q' * 16384)
518+
count += 1
518519
return count
519520

520521
s = self.loop.run_in_executor(None, accept)

tests/test_tcp.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1226,21 +1226,16 @@ def resume_writing(self):
12261226
t, p = await self.loop.create_connection(Protocol, *addr)
12271227

12281228
t.write(b'q' * 512)
1229-
self.assertEqual(t.get_write_buffer_size(), 512)
1230-
12311229
t.set_write_buffer_limits(low=16385)
1232-
self.assertFalse(paused)
12331230
self.assertEqual(t.get_write_buffer_limits(), (16385, 65540))
12341231

12351232
with self.assertRaisesRegex(ValueError, 'high.*must be >= low'):
12361233
t.set_write_buffer_limits(high=0, low=1)
12371234

12381235
t.set_write_buffer_limits(high=1024, low=128)
1239-
self.assertFalse(paused)
12401236
self.assertEqual(t.get_write_buffer_limits(), (128, 1024))
12411237

12421238
t.set_write_buffer_limits(high=256, low=128)
1243-
self.assertTrue(paused)
12441239
self.assertEqual(t.get_write_buffer_limits(), (128, 256))
12451240

12461241
t.close()

uvloop/handles/stream.pxd

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ cdef class UVStream(UVBaseTransport):
3939

4040
# _exec_write() is the method that does the actual send, and _try_write()
4141
# is a fast-path used in _exec_write() to send a single chunk.
42-
cdef inline _exec_write(self)
43-
cdef inline _try_write(self, object data)
42+
cdef inline bint _exec_write(self) except -1
43+
cdef inline Py_ssize_t _try_write(self, object data) except -2
4444

4545
cdef _close(self)
4646

uvloop/handles/stream.pyx

Lines changed: 44 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
cdef enum:
1+
cdef enum:
22
__PREALLOCED_BUFS = 4
33

44

@@ -341,13 +341,15 @@ cdef class UVStream(UVBaseTransport):
341341
else:
342342
self.__reading_stopped()
343343

344-
cdef inline _try_write(self, object data):
344+
cdef inline Py_ssize_t _try_write(self, object data) except -2:
345+
# Returns number of bytes written.
346+
# -1 - in case of fatal errors
345347
cdef:
346-
ssize_t written
348+
Py_ssize_t written
347349
bint used_buf = 0
348350
Py_buffer py_buf
349351
void* buf
350-
size_t blen
352+
Py_ssize_t blen
351353
int saved_errno
352354
int fd
353355

@@ -368,6 +370,8 @@ cdef class UVStream(UVBaseTransport):
368370
blen = py_buf.len
369371

370372
if blen == 0:
373+
if used_buf:
374+
PyBuffer_Release(&py_buf)
371375
# Empty data, do nothing.
372376
return 0
373377

@@ -392,24 +396,20 @@ cdef class UVStream(UVBaseTransport):
392396
PyBuffer_Release(&py_buf)
393397

394398
if written < 0:
395-
if saved_errno == errno.EAGAIN or \
396-
saved_errno == system.EWOULDBLOCK:
397-
return -1
399+
if saved_errno in (errno.EAGAIN, system.EWOULDBLOCK):
400+
return 0
398401
else:
399402
exc = convert_error(-saved_errno)
400403
self._fatal_error(exc, True)
401-
return
404+
return -1
402405

403406
if UVLOOP_DEBUG:
404407
self._loop._debug_stream_write_tries += 1
405408

406-
if <size_t>written == blen:
407-
return 0
408-
409409
return written
410410

411411
cdef inline _buffer_write(self, object data):
412-
cdef int dlen
412+
cdef Py_ssize_t dlen
413413

414414
if not PyBytes_CheckExact(data):
415415
data = memoryview(data).cast('b')
@@ -422,19 +422,19 @@ cdef class UVStream(UVBaseTransport):
422422
self._buffer.append(data)
423423

424424
cdef inline _initiate_write(self):
425+
cdef bint all_sent
426+
425427
if (not self._protocol_paused and
426-
(<uv.uv_stream_t*>self._handle).write_queue_size == 0 and
427-
self._buffer_size > self._high_water):
428+
(<uv.uv_stream_t*>self._handle).write_queue_size == 0):
428429
# Fast-path. If:
429430
# - the protocol isn't yet paused,
430431
# - there is no data in libuv buffers for this stream,
431-
# - the protocol will be paused if we continue to buffer data
432432
#
433433
# Then:
434434
# - Try to write all buffered data right now.
435435
all_sent = self._exec_write()
436436
if UVLOOP_DEBUG:
437-
if self._buffer_size != 0 or self._buffer != []:
437+
if self._buffer_size != 0 or self._buffer:
438438
raise RuntimeError(
439439
'_buffer_size is not 0 after a successful _exec_write')
440440

@@ -450,20 +450,23 @@ cdef class UVStream(UVBaseTransport):
450450
self._maybe_pause_protocol()
451451
self._loop._queue_write(self)
452452

453-
cdef inline _exec_write(self):
453+
cdef inline bint _exec_write(self) except -1:
454+
# Returns True if all data from self._buffers has been sent,
455+
# False - otherwise
454456
cdef:
455457
int err
456-
int buf_len
458+
Py_ssize_t buf_len
459+
Py_ssize_t sent
457460
_StreamWriteContext ctx = None
458461

459462
if self._closed:
460463
# If the handle is closed, just return, it's too
461464
# late to do anything.
462-
return
465+
return False
463466

464467
buf_len = len(self._buffer)
465468
if not buf_len:
466-
return
469+
return True
467470

468471
if (<uv.uv_stream_t*>self._handle).write_queue_size == 0:
469472
# libuv internal write buffers for this stream are empty.
@@ -473,34 +476,16 @@ cdef class UVStream(UVBaseTransport):
473476
data = self._buffer[0]
474477
sent = self._try_write(data)
475478

476-
if sent is None:
477-
# A `self._fatal_error` was called.
478-
# It might not raise an exception under some
479-
# conditions.
480-
self._buffer_size = 0
481-
self._buffer.clear()
482-
if not self._closing:
483-
# This should never happen.
484-
raise RuntimeError(
485-
'stream is open after UVStream._try_write '
486-
'returned None')
487-
return
488-
489-
if sent == 0:
490-
# All data was successfully written.
479+
if sent == len(data):
480+
# The most likely and latency sensitive outcome goes first,
481+
# all data was successfully written.
491482
self._buffer_size = 0
492483
self._buffer.clear()
493484
# on_write will call "maybe_resume_protocol".
494485
self._on_write()
495486
return True
496487

497-
if sent > 0:
498-
if UVLOOP_DEBUG:
499-
if sent == len(data):
500-
raise RuntimeError(
501-
'_try_write sent all data and returned '
502-
'non-zero')
503-
488+
elif sent > 0:
504489
if PyBytes_CheckExact(data):
505490
# Cast bytes to memoryview to avoid copying
506491
# data that wasn't sent.
@@ -510,6 +495,19 @@ cdef class UVStream(UVBaseTransport):
510495
self._buffer_size -= sent
511496
self._buffer[0] = data
512497

498+
elif sent == -1:
499+
# A `self._fatal_error` was called.
500+
# It might not raise an exception under some
501+
# conditions.
502+
self._buffer_size = 0
503+
self._buffer.clear()
504+
if not self._closing:
505+
# This should never happen.
506+
raise RuntimeError(
507+
'stream is open after UVStream._try_write '
508+
'returned None')
509+
return False
510+
513511
# At this point it's either data was sent partially,
514512
# or an EAGAIN has happened.
515513

@@ -543,15 +541,15 @@ cdef class UVStream(UVBaseTransport):
543541
self._fatal_error(ex, True)
544542
self._buffer.clear()
545543
self._buffer_size = 0
546-
return
544+
return False
547545

548546
elif err != uv.UV_EAGAIN:
549547
ctx.close()
550548
exc = convert_error(err)
551549
self._fatal_error(exc, True)
552550
self._buffer.clear()
553551
self._buffer_size = 0
554-
return
552+
return False
555553

556554
# fall through
557555

@@ -575,9 +573,10 @@ cdef class UVStream(UVBaseTransport):
575573

576574
exc = convert_error(err)
577575
self._fatal_error(exc, True)
578-
return
576+
return False
579577

580578
self._maybe_resume_protocol()
579+
return False
581580

582581
cdef size_t _get_write_buffer_size(self):
583582
if self._handle is NULL:

0 commit comments

Comments
 (0)