Skip to content

[RFC, not for merge] Net: Websocket: introduce non-blocking receive frame API #4709

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Net/include/Poco/Net/SocketImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ class Net_API SocketImpl: public Poco::RefCountedObject
///
/// Always returns zero for platforms where not implemented.

virtual int receiveBytesNoBlock(void* buffer, int length, bool &shouldRetry, int flags = 0);

virtual int receiveBytes(void* buffer, int length, int flags = 0);
/// Receives data from the socket and stores it
/// in buffer. Up to length bytes are received.
Expand Down
2 changes: 2 additions & 0 deletions Net/include/Poco/Net/StreamSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ class Net_API StreamSocket: public Socket
/// The flags parameter can be used to pass system-defined flags
/// for send() like MSG_OOB.

int receiveBytesNoBlock(void* buffer, int length, bool &shouldRetry, int flags = 0);

int receiveBytes(void* buffer, int length, int flags = 0);
/// Receives data from the socket and stores it
/// in buffer. Up to length bytes are received.
Expand Down
2 changes: 2 additions & 0 deletions Net/include/Poco/Net/WebSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ class Net_API WebSocket: public StreamSocket
/// the received data is stored starting at the beginning of the
/// buffer.

int receiveFrameNoBlock(Poco::Buffer<char>& buffer, int& flags, bool &shouldRetry);

Mode mode() const;
/// Returns WS_SERVER if the WebSocket is a server-side
/// WebSocket, or WS_CLIENT otherwise.
Expand Down
9 changes: 9 additions & 0 deletions Net/include/Poco/Net/WebSocketImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class Net_API WebSocketImpl: public StreamSocketImpl
virtual int receiveBytes(void* buffer, int length, int flags);
/// Receives a WebSocket protocol frame.

int receiveFrameNoBlock(Poco::Buffer<char>& buffer, int& flags, bool &shouldRetry);
/// Receives a WebSocket protocol frame in a non-blocking fashion.

virtual int receiveBytes(Poco::Buffer<char>& buffer, int flags = 0, const Poco::Timespan& span = 0);
/// Receives a WebSocket protocol frame.

Expand Down Expand Up @@ -100,6 +103,12 @@ class Net_API WebSocketImpl: public StreamSocketImpl
int receivePayload(char *buffer, int payloadLength, char mask[4], bool useMask);
int receiveNBytes(void* buffer, int bytes);
int receiveSomeBytes(char* buffer, int bytes);

int receiveHeaderNoBlock(char mask[4], bool& useMask, size_t &totalRewind, bool &shouldRetry);
int receivePayloadNoBlock(char *buffer, int payloadLength, char mask[4], bool useMask, bool &shouldRetry);
int receiveNBytesNoBlock(void* buffer, int bytes, bool &shouldRetry);
int receiveSomeBytesNoBlock(char* buffer, int bytes, bool &shouldRetry);

virtual ~WebSocketImpl();

private:
Expand Down
12 changes: 12 additions & 0 deletions Net/src/SocketImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,18 @@ int SocketImpl::sendBytes(const SocketBufVec& buffers, int flags)
}


int SocketImpl::receiveBytesNoBlock(void* buffer, int length, bool &shouldRetry, int flags)
{
shouldRetry = true;
int ret = 0;

ret = receiveBytes(buffer, length, flags);
if (ret == 0)
shouldRetry = false;

return ret;
}

int SocketImpl::receiveBytes(void* buffer, int length, int flags)
{
checkBrokenTimeout(SELECT_READ);
Expand Down
4 changes: 4 additions & 0 deletions Net/src/StreamSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ int StreamSocket::sendBytes(FIFOBuffer& fifoBuf)
return ret;
}

int StreamSocket::receiveBytesNoBlock(void* buffer, int length, bool &shouldRetry, int flags)
{
return impl()->receiveBytesNoBlock(buffer, length, shouldRetry, flags);
}

int StreamSocket::receiveBytes(void* buffer, int length, int flags)
{
Expand Down
6 changes: 6 additions & 0 deletions Net/src/WebSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ int WebSocket::receiveFrame(Poco::Buffer<char>& buffer, int& flags)
return n;
}

int WebSocket::receiveFrameNoBlock(Poco::Buffer<char>& buffer, int& flags, bool &shouldRetry)
{
int n = static_cast<WebSocketImpl*>(impl())->receiveFrameNoBlock(buffer, flags, shouldRetry);
flags = static_cast<WebSocketImpl*>(impl())->frameFlags();
return n;
}

WebSocket::Mode WebSocket::mode() const
{
Expand Down
191 changes: 191 additions & 0 deletions Net/src/WebSocketImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,197 @@
return receivePayload(buffer.begin() + oldSize, payloadLength, mask, useMask);
}

int WebSocketImpl::receivePayloadNoBlock(char *buffer, int payloadLength, char mask[4], bool useMask, bool &shouldRetry)
{
int received = receiveNBytesNoBlock(reinterpret_cast<char*>(buffer), payloadLength, shouldRetry);
if (received <= 0)
// Don't care; Caller can grab complete payload once it's there
return received;

if (useMask)
{
for (int i = 0; i < received; i++)
{
buffer[i] ^= mask[i % 4];
}
}
return received;
}

int WebSocketImpl::receiveSomeBytesNoBlock(char* buffer, int bytes, bool &shouldRetry)
{
int n = static_cast<int>(_buffer.size()) - _bufferOffset;
if (n >= bytes)
{
if (bytes < n)
n = bytes;

std::memcpy(buffer, _buffer.begin() + _bufferOffset, n);
// Consume but don't rewind:
// we leverage the buffer control operations to the caller,
// which means if HEADER / MASK / PAYLOAD is being consumed,
// it's up to the caller to rewind buffer pointers / shift
// data etc.
return n;
}
else
{
int receivedBytes;

if (bytes < n)
n = bytes;

if (n == 0) {
receivedBytes = _pStreamSocketImpl->receiveBytesNoBlock(buffer, bytes, shouldRetry);
if (receivedBytes > 0) {
_buffer.append(buffer, receivedBytes);
} else if (receivedBytes == 0)
shouldRetry = false;
} else {
std::memcpy(buffer, _buffer.begin() + _bufferOffset, n);

receivedBytes = _pStreamSocketImpl->receiveBytesNoBlock(buffer + n, bytes - n, shouldRetry);
if (receivedBytes > 0) {
_buffer.append(buffer + n, receivedBytes);
receivedBytes += n;
} else if (receivedBytes == 0)
shouldRetry = false;
}

return receivedBytes;
}
}

int WebSocketImpl::receiveNBytesNoBlock(void* buffer, int bytes, bool &shouldRetry)
{
return receiveSomeBytesNoBlock(reinterpret_cast<char*>(buffer), bytes, shouldRetry);
}

int WebSocketImpl::receiveHeaderNoBlock(char mask[4], bool& useMask, size_t &totalRewind, bool &shouldRetry)
{
char header[MAX_HEADER_LENGTH];
int n = receiveNBytesNoBlock(header, 2, shouldRetry);
if (n <= 0)
{
_frameFlags = 0;
return n;
}

totalRewind += 2;
_bufferOffset += 2;

poco_assert (n == 2);
Poco::UInt8 flags = static_cast<Poco::UInt8>(header[0]);
_frameFlags = flags;
Poco::UInt8 lengthByte = static_cast<Poco::UInt8>(header[1]);
useMask = ((lengthByte & FRAME_FLAG_MASK) != 0);
int payloadLength;
lengthByte &= 0x7f;
if (lengthByte == 127)
{
n = receiveNBytesNoBlock(header + 2, 8, shouldRetry);
_bufferOffset += 8;
totalRewind += 8;

if (n <= 0)
{
_frameFlags = 0;
return n;
}

Poco::MemoryInputStream istr(header + 2, 8);
Poco::BinaryReader reader(istr, Poco::BinaryReader::NETWORK_BYTE_ORDER);
Poco::UInt64 l;
reader >> l;
if (l > _maxPayloadSize) throw WebSocketException("Payload too big", WebSocket::WS_ERR_PAYLOAD_TOO_BIG);
payloadLength = static_cast<int>(l);
}
else if (lengthByte == 126)
{
n = receiveNBytesNoBlock(header + 2, 2, shouldRetry);
totalRewind += 2;
_bufferOffset += 2;

if (n <= 0)
{
_frameFlags = 0;
return n;
}
Poco::MemoryInputStream istr(header + 2, 2);
Poco::BinaryReader reader(istr, Poco::BinaryReader::NETWORK_BYTE_ORDER);
Poco::UInt16 l;
reader >> l;
if (l > _maxPayloadSize) throw WebSocketException("Payload too big", WebSocket::WS_ERR_PAYLOAD_TOO_BIG);
payloadLength = static_cast<int>(l);
}
else
{
if (lengthByte > _maxPayloadSize) throw WebSocketException("Payload too big", WebSocket::WS_ERR_PAYLOAD_TOO_BIG);
payloadLength = lengthByte;
}

if (useMask)
{
n = receiveNBytesNoBlock(mask, 4, shouldRetry);
_bufferOffset += 4;
totalRewind += 4;

if (n <= 0)
{
_frameFlags = 0;
return n;
}
}

return payloadLength;
}

int WebSocketImpl::receiveFrameNoBlock(Poco::Buffer<char>& buffer, int &flags, bool &shouldRetry)
{
size_t totalRewind = 0;
int frameSize;
char mask[4];
bool useMask;
_frameFlags = 0;
int payloadLength = receiveHeaderNoBlock(mask, useMask, totalRewind, shouldRetry);

if (payloadLength <= 0)
return payloadLength;

std::size_t oldSize = buffer.size();
buffer.resize(oldSize + payloadLength);

frameSize = receivePayloadNoBlock(buffer.begin() + oldSize, payloadLength, mask, useMask, shouldRetry);
if (frameSize > 0) {
totalRewind += frameSize;
_bufferOffset += frameSize;
}

if (frameSize == 0 || frameSize != payloadLength) {
_bufferOffset -= totalRewind;
_frameFlags = 0;
return 0;
} else if (frameSize < 0) {

Check warning

Code scanning / CodeQL

Comparison result is always the same Warning

Comparison is always false because frameSize >= 1.
_bufferOffset -= totalRewind;
_frameFlags = 0;
return frameSize;
}

// clear internal buffer from received frame;

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.
size_t bytesToMove = totalRewind;
if (bytesToMove) {
size_t newSize = _buffer.size() - _bufferOffset;
if (newSize)
std::memcpy(_buffer.begin(),
_buffer.begin() + _bufferOffset,
_buffer.size() - _bufferOffset);

_buffer.resize(newSize);
_bufferOffset = 0;
}

return frameSize;
}

int WebSocketImpl::receiveNBytes(void* buffer, int bytes)
{
Expand Down
2 changes: 2 additions & 0 deletions NetSSL_OpenSSL/include/Poco/Net/SecureSocketImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ class NetSSL_API SecureSocketImpl
/// Returns the number of bytes sent, which may be
/// less than the number of bytes specified.

int receiveBytesNoBlock(void* buffer, int length, bool &shouldRetry, int flags = 0);

int receiveBytes(void* buffer, int length, int flags = 0);
/// Receives data from the socket and stores it
/// in buffer. Up to length bytes are received.
Expand Down
Loading