Skip to content

Commit 3c243fb

Browse files
authored
Merge pull request #47 from tarasko/feature/43_send_reuse_external_bytearray
Add WSTransport.send_reuse_external_bytearray
2 parents 81d1819 + a04006c commit 3c243fb

File tree

5 files changed

+77
-10
lines changed

5 files changed

+77
-10
lines changed

docs/source/reference.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ Classes
171171

172172
.. attention::
173173

174-
Message's buffer should have at least 10 bytes in front of the message pointer available for writing.
174+
Message's buffer should have at least 14 bytes in front of the message pointer available for writing.
175175

176176
:param msg_type: Message type
177177
:param msg_ptr: Pointer to a message payload

picows/picows.pxd

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,9 @@ cdef class WSTransport:
119119
MemoryBuffer _write_buf
120120
int _socket
121121

122-
cdef send_reuse_external_buffer(self, WSMsgType msg_type, char* msg_ptr, size_t msg_size, bint fin=*, bint rsv1=*)
122+
cdef inline send_reuse_external_buffer(self, WSMsgType msg_type, char* msg_ptr, Py_ssize_t msg_size, bint fin=*, bint rsv1=*)
123123
cpdef send(self, WSMsgType msg_type, message, bint fin=*, bint rsv1=*)
124+
cpdef send_reuse_external_bytearray(self, WSMsgType msg_type, bytearray buffer, Py_ssize_t msg_offset, bint fin=*, bint rsv1=*)
124125
cpdef send_ping(self, message=*)
125126
cpdef send_pong(self, message=*)
126127
cpdef send_close(self, WSCloseCode close_code=*, close_message=*)

picows/picows.pyi

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,14 @@ class WSTransport:
9999
fin: bool = True,
100100
rsv1: bool = False,
101101
) -> None: ...
102+
def send_reuse_external_bytearray(
103+
self,
104+
msg_type: WSMsgType,
105+
buffer: bytearray,
106+
msg_offset: int,
107+
fin: bool = True,
108+
rsv1: bool = False
109+
) -> None: ...
102110
def send_ping(self, message: Optional[WSBuffer]=None) -> None: ...
103111
def send_pong(self, message: Optional[WSBuffer]=None) -> None: ...
104112
def send_close(self, close_code: WSCloseCode = ..., close_message: Optional[WSBuffer]=None) -> None: ...

picows/picows.pyx

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ from multidict import CIMultiDict
1616

1717
cimport cython
1818
from cpython.bytes cimport PyBytes_GET_SIZE, PyBytes_AS_STRING, PyBytes_FromStringAndSize, PyBytes_CheckExact
19-
from cpython.bytearray cimport PyByteArray_AS_STRING, PyByteArray_GET_SIZE, PyByteArray_CheckExact
19+
from cpython.bytearray cimport PyByteArray_AS_STRING, PyByteArray_GET_SIZE, PyByteArray_CheckExact, PyByteArray_FromStringAndSize
2020
from cpython.memoryview cimport PyMemoryView_FromMemory
2121
from cpython.mem cimport PyMem_Malloc, PyMem_Realloc, PyMem_Free
2222
from cpython.buffer cimport PyBUF_WRITE, PyBUF_READ, PyBUF_SIMPLE, PyObject_GetBuffer, PyBuffer_Release
@@ -211,20 +211,20 @@ cdef _raise_from_errno(int ec):
211211
raise exc(ec, reason)
212212

213213

214-
cdef void _mask_payload(uint8_t* input, size_t input_len, uint32_t mask) noexcept:
214+
cdef void _mask_payload(uint8_t* input, Py_ssize_t input_len, uint32_t mask) noexcept:
215215
# According to perf, _mask_payload is very fast and is not worth spending
216216
# any time optimizing it further.
217217
# But we could use here SIMD or AVX2 instruction to speed this up.
218218
# Also apply vector instructions only on aligned pointer
219219

220220
cdef:
221-
size_t i
221+
Py_ssize_t i
222222
# bit operations on signed integers are implementation-specific
223223
# cast everything to uint
224224
uint64_t mask64 = (<uint64_t>mask << 32) | <uint64_t>mask
225225
uint8_t* mask_buf = <uint8_t*> &mask64
226226

227-
if sizeof(size_t) >= 8:
227+
if sizeof(Py_ssize_t) >= 8:
228228
while input_len >= 8:
229229
(<uint64_t *> input)[0] ^= mask64
230230
input += 8
@@ -506,16 +506,23 @@ cdef class WSTransport:
506506
self._socket = underlying_transport.get_extra_info('socket').fileno()
507507

508508
cdef send_reuse_external_buffer(self, WSMsgType msg_type,
509-
char* msg_ptr, size_t msg_size,
509+
char* msg_ptr, Py_ssize_t msg_size,
510510
bint fin=True, bint rsv1=False):
511511
cdef:
512512
uint8_t* header_ptr = <uint8_t*>msg_ptr
513513
uint64_t extended_payload_length_64
514-
uint32_t mask = <uint32_t> rand() if self.is_client_side else 0
514+
uint32_t mask = 0
515515
uint16_t extended_payload_length_16
516516
uint8_t first_byte = <uint8_t>msg_type
517-
uint8_t second_byte = 0x80 if self.is_client_side else 0
518-
cdef Py_ssize_t total_size = msg_size
517+
uint8_t second_byte = 0
518+
Py_ssize_t total_size = msg_size
519+
520+
if self.is_client_side:
521+
mask = <uint32_t> rand()
522+
second_byte = 0x80
523+
total_size += 4
524+
header_ptr -= 4
525+
(<uint32_t*>header_ptr)[0] = mask
519526

520527
if fin:
521528
first_byte |= 0x80
@@ -551,6 +558,40 @@ cdef class WSTransport:
551558
else:
552559
self._try_native_write_then_transport_write(<char*>header_ptr, total_size)
553560

561+
cpdef send_reuse_external_bytearray(self, WSMsgType msg_type,
562+
bytearray buffer,
563+
Py_ssize_t msg_offset,
564+
bint fin=True, bint rsv1=False):
565+
"""
566+
Send a frame over websocket with a message as its payload.
567+
This function does not copy message to prepare websocket frames.
568+
It reuses bytearray's memory to append websocket frame header at the front.
569+
570+
:param msg_type: :any:`WSMsgType` enum value\n
571+
:param msg_offset: specifies where message begins in the bytearray.
572+
Must be at least 14 to let picows to insert websocket frame header in front of the message.
573+
:param buffer: bytearray that contains message and some extra space (at least 14 bytes) in the beginning.
574+
The len of the message is determined as `len(buffer) - msg_offset`
575+
:param fin: fin bit in websocket frame.
576+
Indicate that the frame is the last one in the message.
577+
:param rsv1: first reserved bit in websocket frame.
578+
Some protocol extensions use it to indicate that payload is compressed.
579+
"""
580+
assert buffer is not None, "buffer is None"
581+
assert msg_offset >= 14, "buffer must have at least 14 bytes available before message starts, check msg_offset parameter"
582+
583+
cdef:
584+
char* buffer_ptr = PyByteArray_AS_STRING(buffer)
585+
Py_ssize_t buffer_size = PyByteArray_GET_SIZE(buffer)
586+
587+
assert buffer_size >= msg_offset, "msg_offset points beyond buffer end, msg_offset > len(buffer)"
588+
589+
cdef:
590+
char* msg_ptr = buffer_ptr + msg_offset
591+
Py_ssize_t msg_size = buffer_size - msg_offset
592+
593+
self.send_reuse_external_buffer(msg_type, msg_ptr, msg_size, fin, rsv1)
594+
554595
cpdef send(self, WSMsgType msg_type, message, bint fin=True, bint rsv1=False):
555596
"""
556597
Send a frame over websocket with a message as its payload.

tests/test_basics.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,13 @@ async def test_echo(client_msg_queue, msg_size):
124124
assert not frame.fin
125125
assert not frame.rsv1
126126

127+
ba = bytearray(b"1234567890123456")
128+
ba += msg
129+
client_msg_queue.transport.send_reuse_external_bytearray(picows.WSMsgType.BINARY, ba, 16)
130+
frame = await client_msg_queue.get_message()
131+
assert frame.msg_type == picows.WSMsgType.BINARY
132+
assert frame.payload_as_bytes == msg
133+
127134
msg = base64.b64encode(msg)
128135
client_msg_queue.transport.send(picows.WSMsgType.TEXT, msg, True, True)
129136
frame = await client_msg_queue.get_message()
@@ -156,6 +163,16 @@ async def test_echo(client_msg_queue, msg_size):
156163
client_msg_queue.transport.send(picows.WSMsgType.BINARY, "hi")
157164

158165

166+
async def test_send_external_bytearray_asserts(client_msg_queue):
167+
with pytest.raises(AssertionError):
168+
# Check assertion for msg_len >= 0
169+
client_msg_queue.transport.send_reuse_external_bytearray(picows.WSMsgType.BINARY, bytearray(b"HELLO"), 16)
170+
171+
with pytest.raises(AssertionError):
172+
# Check assertion for offset to be at least 14
173+
client_msg_queue.transport.send_reuse_external_bytearray(picows.WSMsgType.BINARY, bytearray(b"1234567890123HELLO"), 13)
174+
175+
159176
async def test_max_frame_size_violation():
160177
msg = os.urandom(1024 * 1024)
161178
max_frame_size = 16 * 1024

0 commit comments

Comments
 (0)