Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 89 additions & 59 deletions erts/emulator/nifs/unix/unix_socket_syncio.c
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,12 @@ static ERL_NIF_TERM essio_sendfile_ok(ErlNifEnv* env,
size_t count);
#endif

static BOOLEAN_T recv_alloc_buf(size_t size,
ErlNifBinary *bufP);
static BOOLEAN_T recv_create_bin(ErlNifBinary *bufP,
size_t size,
ErlNifBinary *binP);

static BOOLEAN_T recv_check_entry(ErlNifEnv *env,
ESockDescriptor *descP,
ERL_NIF_TERM recvRef,
Expand Down Expand Up @@ -2725,7 +2731,7 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env,
int flags)
{
int saveErrno;
ErlNifBinary buf;
ErlNifBinary bin, *bufP;
ssize_t readResult;
size_t bufSz = (len != 0 ? len : descP->rBufSz); // 0 means default
ERL_NIF_TERM ret;
Expand All @@ -2746,16 +2752,8 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env,
return ret;
}

/* Allocate the receive buffer */
if (descP->buf.data == NULL) {
ESOCK_ASSERT( ALLOC_BIN(bufSz, &buf) );
}
else {
buf = descP->buf;
if (buf.size != bufSz) {
REALLOC_BIN(&buf, bufSz);
}
}
bufP = &descP->buf;
ESOCK_ASSERT( recv_alloc_buf(bufSz, bufP) );

SSDBG( descP, ("UNIX-ESSIO", "essio_recv {%d} -> try read (%lu)\r\n",
descP->sock, (unsigned long) len) );
Expand All @@ -2764,7 +2762,7 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env,
esock_atom_read_tries, &descP->readTries, 1);

/* recv() */
readResult = sock_recv(descP->sock, buf.data, buf.size, flags);
readResult = sock_recv(descP->sock, bufP->data, bufP->size, flags);
saveErrno = ESOCK_IS_ERROR(readResult) ? sock_errno() : 0;

SSDBG( descP, ("UNIX-ESSIO",
Expand All @@ -2775,40 +2773,25 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env,
if (! recv_check_result(env, descP, sockRef, recvRef,
readResult, saveErrno, &ret) ) {
/* Keep the buffer */
descP->buf = buf;
return ret;
}
/* readResult >= 0 */
ESOCK_ASSERT( readResult <= buf.size );

if (readResult < buf.size) {
ESOCK_ASSERT( recv_create_bin(bufP, readResult, &bin) );

if (bin.size < bufP->size) {

/* +++ We did not fill the buffer +++ */

SSDBG( descP,
("UNIX-ESSIO",
"essio_recv {%d} -> [%lu] "
"did not fill the buffer (%ld)\r\n",
descP->sock, (unsigned long) buf.size,
(long) readResult) );

if (// Less than 4K (1 page) wasted
readResult >= (buf.size & ~4095) ||
// Less than 25% wasted
readResult >= (buf.size >> 1) + (buf.size >> 2)) {
//
/* Reallocate and drop buffer */
descP->buf.data = NULL;
ESOCK_ASSERT( REALLOC_BIN(&buf, readResult) );
}
else {
/* Keep buffer, copy content to new binary*/
descP->buf = buf;
ESOCK_ASSERT( ALLOC_BIN(readResult, &buf) );
sys_memcpy(buf.data, descP->buf.data, buf.size);
}
descP->sock, (unsigned long) bufP->size,
(unsigned long) bin.size) );

/* Return {ok|timeout|select|select_read, Bin} */
return recv_check_partial(env, descP, sockRef, recvRef, len, &buf);
return recv_check_partial(env, descP, sockRef, recvRef, len, &bin);

} else {

Expand All @@ -2817,11 +2800,10 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env,
SSDBG( descP,
("UNIX-ESSIO",
"essio_recv {%d} -> [%lu] filled the buffer\r\n",
descP->sock, (unsigned long) buf.size) );
descP->sock, (unsigned long) bin.size) );

descP->buf.data = NULL; // Drop buffer
/* Return {more|ok|select_read, Bin} */
return recv_check_full(env, descP, sockRef, recvRef, len, &buf);
return recv_check_full(env, descP, sockRef, recvRef, len, &bin);
}
}

Expand All @@ -2844,7 +2826,7 @@ ERL_NIF_TERM essio_recvfrom(ErlNifEnv* env,
SOCKLEN_T fromAddrLen;
ssize_t readResult;
int saveErrno;
ErlNifBinary buf;
ErlNifBinary bin, *bufP;
size_t bufSz = (len != 0 ? len : descP->rBufSz); // 0 means default
ERL_NIF_TERM ret;

Expand All @@ -2861,8 +2843,8 @@ ERL_NIF_TERM essio_recvfrom(ErlNifEnv* env,
return ret;
}

/* Allocate the receive buffer */
ESOCK_ASSERT( ALLOC_BIN(bufSz, &buf) );
bufP = &descP->buf;
ESOCK_ASSERT( recv_alloc_buf(bufSz, bufP) );

ESOCK_CNT_INC(env, descP, sockRef,
esock_atom_read_tries, &descP->readTries, 1);
Expand All @@ -2871,18 +2853,19 @@ ERL_NIF_TERM essio_recvfrom(ErlNifEnv* env,
sys_memzero((char*) &fromAddr, fromAddrLen);

/* recvfrom() */
readResult = sock_recvfrom(descP->sock, buf.data, buf.size, flags,
readResult = sock_recvfrom(descP->sock, bufP->data, bufP->size, flags,
&fromAddr.sa, &fromAddrLen);
saveErrno = ESOCK_IS_ERROR(readResult) ? sock_errno() : 0;

/* Check for errors and end of stream */
if (! recv_check_result(env, descP, sockRef, recvRef,
readResult, saveErrno, &ret) ) {
FREE_BIN(&buf);
/* Keep the buffer */
return ret;
}
/* readResult >= 0 */
ESOCK_ASSERT( readResult <= buf.size );

ESOCK_ASSERT( recv_create_bin(bufP, readResult, &bin) );

/* The recvfrom function delivers one (1) message. If our buffer
* is too small, the message will be truncated. So, regardless
Expand All @@ -2892,18 +2875,14 @@ ERL_NIF_TERM essio_recvfrom(ErlNifEnv* env,
* Encode the message and source address
*/

if (readResult < buf.size) {
ESOCK_ASSERT( REALLOC_BIN(&buf, readResult) );
}

descP->rNumCnt = 0;

ESOCK_CNT_INC(env, descP, sockRef, esock_atom_read_pkg,
&descP->readPkgCnt, 1);
ESOCK_CNT_INC(env, descP, sockRef, esock_atom_read_byte,
&descP->readByteCnt, buf.size);
if (buf.size > descP->readPkgMax)
descP->readPkgMax = buf.size;
&descP->readByteCnt, bin.size);
if (bin.size > descP->readPkgMax)
descP->readPkgMax = bin.size;

esock_encode_sockaddr(env,
&fromAddr, fromAddrLen,
Expand All @@ -2913,7 +2892,7 @@ ERL_NIF_TERM essio_recvfrom(ErlNifEnv* env,
* erlang term in env (no need to free; it will be GC:ed).
*/
/* {FromAddr, Bin} */
ret = MKT2(env, ret, MKBIN(env, &buf));
ret = MKT2(env, ret, MKBIN(env, &bin));

if (descP->selectRead && (COMPARE(recvRef, esock_atom_zero) != 0)) {
/* Return {select_read, {FromAddr, Bin}} */
Expand Down Expand Up @@ -2950,8 +2929,7 @@ ERL_NIF_TERM essio_recvmsg(ErlNifEnv* env,
size_t ctrlSz = (ctrlLen != 0 ? ctrlLen : descP->rCtrlSz);
struct msghdr msgHdr;
SysIOVec iov[1]; // Shall we always use 1?
ErlNifBinary data[1]; // Shall we always use 1?
ErlNifBinary ctrl;
ErlNifBinary ctrl, bin, *bufP;
ERL_NIF_TERM ret;
ESockAddress addr;

Expand All @@ -2970,9 +2948,10 @@ ERL_NIF_TERM essio_recvmsg(ErlNifEnv* env,
return ret;
}

/* Allocate the (msg) data buffer:
/* Allocate the data buffer
*/
ESOCK_ASSERT( ALLOC_BIN(bufSz, &data[0]) );
bufP = &descP->buf;
ESOCK_ASSERT( recv_alloc_buf(bufSz, bufP) );

/* Allocate the ctrl (buffer):
*/
Expand All @@ -2985,8 +2964,8 @@ ERL_NIF_TERM essio_recvmsg(ErlNifEnv* env,
sys_memzero((char*) &addr, addrLen);
sys_memzero((char*) &msgHdr, sizeof(msgHdr));

iov[0].iov_base = data[0].data;
iov[0].iov_len = data[0].size;
iov[0].iov_base = bufP->data;
iov[0].iov_len = bufP->size;

msgHdr.msg_name = &addr;
msgHdr.msg_namelen = addrLen;
Expand All @@ -3002,12 +2981,14 @@ ERL_NIF_TERM essio_recvmsg(ErlNifEnv* env,
/* Check for errors and end of stream */
if (! recv_check_result(env, descP, sockRef, recvRef,
readResult, saveErrno, &ret) ) {
FREE_BIN(&data[0]);
/* Keep the data buffer */
FREE_BIN(&ctrl);
return ret;
}
/* readResult >= 0 */

ESOCK_ASSERT( recv_create_bin(bufP, readResult, &bin) );

/* The recvmsg function delivers one (1) message. If our buffer
* is to small, the message will be truncated. So, regardless
* if we filled the buffer or not, we have got what we are going
Expand Down Expand Up @@ -3038,7 +3019,7 @@ ERL_NIF_TERM essio_recvmsg(ErlNifEnv* env,
descP->readPkgMax = readResult;

encode_msg(env, descP,
readResult, &msgHdr, &data[0], &ctrl,
readResult, &msgHdr, &bin, &ctrl,
&ret);

if (descP->selectRead && (COMPARE(recvRef, esock_atom_zero) != 0)) {
Expand Down Expand Up @@ -6861,6 +6842,55 @@ void essio_down(ErlNifEnv* env,

/* *** Recv/recvfrom/recvmsg utility functions *** */

static
BOOLEAN_T recv_alloc_buf(size_t size,
ErlNifBinary *bufP)
{
if (bufP->data == NULL) {
return ALLOC_BIN(size, bufP);
}
else {
if (size != bufP->size)
return REALLOC_BIN(bufP, size);
else
return TRUE;
}
}

static
BOOLEAN_T recv_create_bin(ErlNifBinary *bufP, size_t size, ErlNifBinary *binP)
{
/* Don't touch bufP->size
*/
if (size >= bufP->size) {
/* Buffer full
* - use it as return binary and drop buffer
*/
ESOCK_ASSERT( bufP->size >= size );
*binP = *bufP;
bufP->data = NULL;
return TRUE;
}
else if (size >= (bufP->size & ~4095) ||
size >= (bufP->size >> 1) + (bufP->size >> 2)) {
/* Less than a 4 K page shrink or less than 25% shrink
* - reallocate and drop buffer
*/
*binP = *bufP;
bufP->data = NULL;
return REALLOC_BIN(binP, size);
}
else {
BOOLEAN_T ret;
/* Keep buffer, copy content to new allocated binary
*/
ret = ALLOC_BIN(size, binP);
if (ret)
sys_memcpy(binP->data, bufP->data, size);
return ret;
}
}

static
BOOLEAN_T recv_check_entry(ErlNifEnv *env,
ESockDescriptor *descP,
Expand Down
43 changes: 31 additions & 12 deletions lib/kernel/src/socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5427,20 +5427,38 @@ recv_deadline(SockRef, Length, Flags, Deadline, Buf) ->

%%
{select_read, Bin} -> %% All data, new recv operation in progress
%% The combination of select_read and recv with time-out
%% is contradictive since the return values has no place
%% for a continuation because neither a ref nor 'nowait'
%% was given, so we handle this as if there was no select_read
%% by cancelling the new recv operation
%%
_ = cancel(SockRef, recv, Handle),
{ok, condense_buffer(Bin, Buf)};
%%
Select %% select | {select, Bin} %% No data or incomplete
when Select =:= select;
tuple_size(Select) =:= 2, element(1, Select) =:= select ->
{Length_1, Buf_1} =
if
Select =:= select ->
{Length, Buf};
true ->
Bin = element(2, Select),
{Length - byte_size(Bin), [Bin | Buf]}
end,
select ->
%%
%% There is nothing just now, but we will be notified
%% with a select message when there is something to recv
Timeout = timeout(Deadline),
receive
?socket_msg(?socket(SockRef), select, Handle) ->
if
0 < Timeout ->
%% Retry
recv_deadline(
SockRef, Length, Flags, Deadline, Buf);
true ->
recv_error(timeout, Buf)
end;
?socket_msg(_Socket, abort, {Handle, Reason}) ->
recv_error(Reason, Buf)
after Timeout ->
_ = cancel(SockRef, recv, Handle),
recv_error(timeout, Buf)
end;
{select, Bin} ->
Buf_1 = [Bin | Buf],
%%
%% There is nothing just now, but we will be notified
%% with a select message when there is something to recv
Expand All @@ -5451,7 +5469,8 @@ recv_deadline(SockRef, Length, Flags, Deadline, Buf) ->
0 < Timeout ->
%% Retry
recv_deadline(
SockRef, Length_1, Flags, Deadline, Buf_1);
SockRef, Length - byte_size(Bin),
Flags, Deadline, Buf_1);
true ->
recv_error(timeout, Buf_1)
end;
Expand Down
Loading