Skip to content

Commit a0443fc

Browse files
author
taras
committed
Use list for write queue
1 parent 837ef22 commit a0443fc

File tree

2 files changed

+64
-34
lines changed

2 files changed

+64
-34
lines changed

uvloop/sslproto.pxd

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ cdef class SSLProtocol:
3838

3939
object _extra
4040

41-
object _write_backlog
42-
size_t _write_buffer_size
41+
list _write_backlog
42+
Py_ssize_t _write_buffer_size
4343

4444
object _waiter
4545
Loop _loop
@@ -108,7 +108,9 @@ cdef class SSLProtocol:
108108

109109
# Outgoing flow
110110

111-
cdef _write_appdata(self, list_of_data, object context)
111+
cdef inline bint _is_protocol_ready(self) except? -1
112+
cdef inline _check_and_enqueue_appdata(self, data)
113+
cdef inline _flush_write_backlog(self, object context)
112114
cdef _do_write(self)
113115
cdef _process_outgoing(self)
114116

@@ -122,7 +124,7 @@ cdef class SSLProtocol:
122124
# Flow control for writes from APP socket
123125

124126
cdef _control_app_writing(self, object context=*)
125-
cdef size_t _get_write_buffer_size(self)
127+
cdef Py_ssize_t _get_write_buffer_size(self)
126128
cdef _set_write_buffer_limits(self, high=*, low=*)
127129

128130
# Flow control for reads to APP socket
@@ -134,5 +136,5 @@ cdef class SSLProtocol:
134136

135137
cdef _control_ssl_reading(self)
136138
cdef _set_read_buffer_limits(self, high=*, low=*)
137-
cdef size_t _get_read_buffer_size(self)
139+
cdef Py_ssize_t _get_read_buffer_size(self)
138140
cdef _fatal_error(self, exc, message=*)

uvloop/sslproto.pyx

Lines changed: 57 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -147,20 +147,34 @@ cdef class _SSLProtocolTransport:
147147
This does not block; it buffers the data and arranges for it
148148
to be sent out asynchronously.
149149
"""
150-
if not isinstance(data, (bytes, bytearray, memoryview)):
151-
raise TypeError(f"data: expecting a bytes-like instance, "
152-
f"got {type(data).__name__}")
153-
if not data:
150+
if not self._ssl_protocol._is_protocol_ready():
154151
return
155-
self._ssl_protocol._write_appdata((data,), self.context.copy())
152+
153+
self._ssl_protocol._check_and_enqueue_appdata(data)
154+
self._ssl_protocol._flush_write_backlog(self.context.copy())
156155

157156
def writelines(self, list_of_data):
158157
"""Write a list (or any iterable) of data bytes to the transport.
159158
160159
The default implementation concatenates the arguments and
161160
calls write() on the result.
162161
"""
163-
self._ssl_protocol._write_appdata(list_of_data, self.context.copy())
162+
if not self._ssl_protocol._is_protocol_ready():
163+
return
164+
165+
cdef Py_ssize_t backlog_len_before = len(self._ssl_protocol._write_backlog)
166+
cdef size_t backlog_size_before = self._ssl_protocol._write_buffer_size
167+
168+
try:
169+
for data in list_of_data:
170+
self._ssl_protocol._check_and_enqueue_appdata(data)
171+
except:
172+
# Remove already enqueued items on exception
173+
del self._ssl_protocol._write_backlog[backlog_len_before:]
174+
self._ssl_protocol._write_buffer_size = backlog_size_before
175+
raise
176+
177+
self._ssl_protocol._flush_write_backlog(self.context.copy())
164178

165179
def write_eof(self):
166180
"""Close the write end after flushing buffered data.
@@ -246,7 +260,7 @@ cdef class SSLProtocol:
246260
self._extra = dict(sslcontext=sslcontext)
247261

248262
# App data write buffering
249-
self._write_backlog = col_deque()
263+
self._write_backlog = []
250264
self._write_buffer_size = 0
251265

252266
self._waiter = waiter
@@ -652,19 +666,28 @@ cdef class SSLProtocol:
652666

653667
# Outgoing flow
654668

655-
cdef _write_appdata(self, list_of_data, object context):
669+
cdef bint _is_protocol_ready(self) except? -1:
656670
if self._state in (FLUSHING, SHUTDOWN, UNWRAPPED):
657671
if self._conn_lost >= LOG_THRESHOLD_FOR_CONNLOST_WRITES:
658672
aio_logger.warning('SSL connection is closed')
659673
self._conn_lost += 1
674+
return False
675+
else:
676+
return True
677+
678+
cdef _check_and_enqueue_appdata(self, data):
679+
if not isinstance(data, (bytes, bytearray, memoryview)):
680+
raise TypeError(f"data: expecting a bytes-like instance, "
681+
f"got {type(data).__name__}")
682+
if not data:
660683
return
661684

662-
for data in list_of_data:
663-
self._write_backlog.append(data)
664-
self._write_buffer_size += len(data)
685+
self._write_backlog.append(data)
686+
self._write_buffer_size += len(data)
665687

688+
cdef _flush_write_backlog(self, object context):
666689
try:
667-
if self._state == WRAPPED:
690+
if self._state == WRAPPED and self._write_buffer_size > 0:
668691
self._do_write()
669692
self._process_outgoing()
670693
self._control_app_writing(context)
@@ -674,22 +697,27 @@ cdef class SSLProtocol:
674697

675698
cdef _do_write(self):
676699
"""Do SSL write, consumes write backlog and fills outgoing BIO."""
677-
cdef size_t data_len, count
700+
cdef Py_ssize_t data_len, bytes_written
701+
cdef Py_ssize_t idx = 0
702+
678703
try:
679-
while self._write_backlog:
680-
data = self._write_backlog[0]
681-
count = self._sslobj_write(data)
704+
while idx < len(self._write_backlog):
705+
data = self._write_backlog[idx]
682706
data_len = len(data)
683-
if count < data_len:
684-
if not PyMemoryView_Check(data):
685-
data = PyMemoryView_FromObject(data)
686-
self._write_backlog[0] = data[count:]
687-
self._write_buffer_size -= count
688-
else:
689-
del self._write_backlog[0]
707+
bytes_written = self._sslobj_write(data)
708+
if bytes_written == data_len:
690709
self._write_buffer_size -= data_len
710+
idx += 1
711+
continue
712+
713+
if not PyMemoryView_Check(data):
714+
data = PyMemoryView_FromObject(data)
715+
self._write_backlog[0] = data[bytes_written:]
716+
self._write_buffer_size -= bytes_written
691717
except ssl_SSLAgainErrors as exc:
692718
pass
719+
finally:
720+
del self._write_backlog[:idx]
693721

694722
cdef _process_outgoing(self):
695723
"""Send bytes from the outgoing BIO."""
@@ -821,7 +849,7 @@ cdef class SSLProtocol:
821849
# Flow control for writes from APP socket
822850

823851
cdef _control_app_writing(self, object context=None):
824-
cdef size_t size = self._get_write_buffer_size()
852+
cdef Py_ssize_t size = self._get_write_buffer_size()
825853
if size >= self._outgoing_high_water and not self._app_writing_paused:
826854
self._app_writing_paused = True
827855
try:
@@ -861,8 +889,8 @@ cdef class SSLProtocol:
861889
'protocol': self,
862890
})
863891

864-
cdef size_t _get_write_buffer_size(self):
865-
return self._outgoing.pending + self._write_buffer_size
892+
cdef Py_ssize_t _get_write_buffer_size(self):
893+
return self._write_buffer_size + <Py_ssize_t>self._outgoing.pending
866894

867895
cdef _set_write_buffer_limits(self, high=None, low=None):
868896
high, low = add_flowcontrol_defaults(
@@ -889,7 +917,7 @@ cdef class SSLProtocol:
889917
# Flow control for reads from SSL socket
890918

891919
cdef _control_ssl_reading(self):
892-
cdef size_t size = self._get_read_buffer_size()
920+
cdef Py_ssize_t size = self._get_read_buffer_size()
893921
if size >= self._incoming_high_water and not self._ssl_reading_paused:
894922
self._ssl_reading_paused = True
895923
self._transport.pause_reading()
@@ -903,8 +931,8 @@ cdef class SSLProtocol:
903931
self._incoming_high_water = high
904932
self._incoming_low_water = low
905933

906-
cdef size_t _get_read_buffer_size(self):
907-
return self._incoming.pending
934+
cdef Py_ssize_t _get_read_buffer_size(self):
935+
return <Py_ssize_t>self._incoming.pending
908936

909937
# Flow control for writes to SSL socket
910938

0 commit comments

Comments
 (0)