From f3df06192adcc22b63bfe8d9efd5a3fbea3d59ce Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Mon, 17 Mar 2025 17:17:30 +0100 Subject: [PATCH] Keep underutilized buffer for all recv operations --- erts/emulator/nifs/unix/unix_socket_syncio.c | 148 +++++++++++-------- lib/kernel/src/socket.erl | 43 ++++-- 2 files changed, 120 insertions(+), 71 deletions(-) diff --git a/erts/emulator/nifs/unix/unix_socket_syncio.c b/erts/emulator/nifs/unix/unix_socket_syncio.c index 922b127ff9ea..1b6bfc1eb86f 100644 --- a/erts/emulator/nifs/unix/unix_socket_syncio.c +++ b/erts/emulator/nifs/unix/unix_socket_syncio.c @@ -325,6 +325,12 @@ static ERL_NIF_TERM essio_sendfile_ok(ErlNifEnv* env, size_t count); #endif +static BOOLEAN_T recv_alloc_buf(size_t size, + ErlNifBinary *bufP); +static BOOLEAN_T recv_create_bin(ErlNifBinary *bufP, + size_t size, + ErlNifBinary *binP); + static BOOLEAN_T recv_check_entry(ErlNifEnv *env, ESockDescriptor *descP, ERL_NIF_TERM recvRef, @@ -2725,7 +2731,7 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env, int flags) { int saveErrno; - ErlNifBinary buf; + ErlNifBinary bin, *bufP; ssize_t readResult; size_t bufSz = (len != 0 ? len : descP->rBufSz); // 0 means default ERL_NIF_TERM ret; @@ -2746,16 +2752,8 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env, return ret; } - /* Allocate the receive buffer */ - if (descP->buf.data == NULL) { - ESOCK_ASSERT( ALLOC_BIN(bufSz, &buf) ); - } - else { - buf = descP->buf; - if (buf.size != bufSz) { - REALLOC_BIN(&buf, bufSz); - } - } + bufP = &descP->buf; + ESOCK_ASSERT( recv_alloc_buf(bufSz, bufP) ); SSDBG( descP, ("UNIX-ESSIO", "essio_recv {%d} -> try read (%lu)\r\n", descP->sock, (unsigned long) len) ); @@ -2764,7 +2762,7 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env, esock_atom_read_tries, &descP->readTries, 1); /* recv() */ - readResult = sock_recv(descP->sock, buf.data, buf.size, flags); + readResult = sock_recv(descP->sock, bufP->data, bufP->size, flags); saveErrno = ESOCK_IS_ERROR(readResult) ? 
sock_errno() : 0; SSDBG( descP, ("UNIX-ESSIO", @@ -2775,13 +2773,13 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env, if (! recv_check_result(env, descP, sockRef, recvRef, readResult, saveErrno, &ret) ) { /* Keep the buffer */ - descP->buf = buf; return ret; } /* readResult >= 0 */ - ESOCK_ASSERT( readResult <= buf.size ); - if (readResult < buf.size) { + ESOCK_ASSERT( recv_create_bin(bufP, readResult, &bin) ); + + if (bin.size < bufP->size) { /* +++ We did not fill the buffer +++ */ @@ -2789,26 +2787,11 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env, ("UNIX-ESSIO", "essio_recv {%d} -> [%lu] " "did not fill the buffer (%ld)\r\n", - descP->sock, (unsigned long) buf.size, - (long) readResult) ); - - if (// Less than 4K (1 page) wasted - readResult >= (buf.size & ~4095) || - // Less than 25% wasted - readResult >= (buf.size >> 1) + (buf.size >> 2)) { - // - /* Reallocate and drop buffer */ - descP->buf.data = NULL; - ESOCK_ASSERT( REALLOC_BIN(&buf, readResult) ); - } - else { - /* Keep buffer, copy content to new binary*/ - descP->buf = buf; - ESOCK_ASSERT( ALLOC_BIN(readResult, &buf) ); - sys_memcpy(buf.data, descP->buf.data, buf.size); - } + descP->sock, (unsigned long) bufP->size, + (unsigned long) bin.size) ); + /* Return {ok|timeout|select|select_read, Bin} */ - return recv_check_partial(env, descP, sockRef, recvRef, len, &buf); + return recv_check_partial(env, descP, sockRef, recvRef, len, &bin); } else { @@ -2817,11 +2800,10 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env, SSDBG( descP, ("UNIX-ESSIO", "essio_recv {%d} -> [%lu] filled the buffer\r\n", - descP->sock, (unsigned long) buf.size) ); + descP->sock, (unsigned long) bin.size) ); - descP->buf.data = NULL; // Drop buffer /* Return {more|ok|select_read, Bin} */ - return recv_check_full(env, descP, sockRef, recvRef, len, &buf); + return recv_check_full(env, descP, sockRef, recvRef, len, &bin); } } @@ -2844,7 +2826,7 @@ ERL_NIF_TERM essio_recvfrom(ErlNifEnv* env, SOCKLEN_T fromAddrLen; ssize_t readResult; int saveErrno; - 
ErlNifBinary buf; + ErlNifBinary bin, *bufP; size_t bufSz = (len != 0 ? len : descP->rBufSz); // 0 means default ERL_NIF_TERM ret; @@ -2861,8 +2843,8 @@ ERL_NIF_TERM essio_recvfrom(ErlNifEnv* env, return ret; } - /* Allocate the receive buffer */ - ESOCK_ASSERT( ALLOC_BIN(bufSz, &buf) ); + bufP = &descP->buf; + ESOCK_ASSERT( recv_alloc_buf(bufSz, bufP) ); ESOCK_CNT_INC(env, descP, sockRef, esock_atom_read_tries, &descP->readTries, 1); @@ -2871,18 +2853,19 @@ ERL_NIF_TERM essio_recvfrom(ErlNifEnv* env, sys_memzero((char*) &fromAddr, fromAddrLen); /* recvfrom() */ - readResult = sock_recvfrom(descP->sock, buf.data, buf.size, flags, + readResult = sock_recvfrom(descP->sock, bufP->data, bufP->size, flags, &fromAddr.sa, &fromAddrLen); saveErrno = ESOCK_IS_ERROR(readResult) ? sock_errno() : 0; /* Check for errors and end of stream */ if (! recv_check_result(env, descP, sockRef, recvRef, readResult, saveErrno, &ret) ) { - FREE_BIN(&buf); + /* Keep the buffer */ return ret; } /* readResult >= 0 */ - ESOCK_ASSERT( readResult <= buf.size ); + + ESOCK_ASSERT( recv_create_bin(bufP, readResult, &bin) ); /* The recvfrom function delivers one (1) message. If our buffer * is too small, the message will be truncated. So, regardless @@ -2892,18 +2875,14 @@ ERL_NIF_TERM essio_recvfrom(ErlNifEnv* env, * Encode the message and source address */ - if (readResult < buf.size) { - ESOCK_ASSERT( REALLOC_BIN(&buf, readResult) ); - } - descP->rNumCnt = 0; ESOCK_CNT_INC(env, descP, sockRef, esock_atom_read_pkg, &descP->readPkgCnt, 1); ESOCK_CNT_INC(env, descP, sockRef, esock_atom_read_byte, - &descP->readByteCnt, buf.size); - if (buf.size > descP->readPkgMax) - descP->readPkgMax = buf.size; + &descP->readByteCnt, bin.size); + if (bin.size > descP->readPkgMax) + descP->readPkgMax = bin.size; esock_encode_sockaddr(env, &fromAddr, fromAddrLen, @@ -2913,7 +2892,7 @@ ERL_NIF_TERM essio_recvfrom(ErlNifEnv* env, * erlang term in env (no need to free; it will be GC:ed). 
*/ /* {FromAddr, Bin} */ - ret = MKT2(env, ret, MKBIN(env, &buf)); + ret = MKT2(env, ret, MKBIN(env, &bin)); if (descP->selectRead && (COMPARE(recvRef, esock_atom_zero) != 0)) { /* Return {select_read, {FromAddr, Bin}} */ @@ -2950,8 +2929,7 @@ ERL_NIF_TERM essio_recvmsg(ErlNifEnv* env, size_t ctrlSz = (ctrlLen != 0 ? ctrlLen : descP->rCtrlSz); struct msghdr msgHdr; SysIOVec iov[1]; // Shall we always use 1? - ErlNifBinary data[1]; // Shall we always use 1? - ErlNifBinary ctrl; + ErlNifBinary ctrl, bin, *bufP; ERL_NIF_TERM ret; ESockAddress addr; @@ -2970,9 +2948,10 @@ ERL_NIF_TERM essio_recvmsg(ErlNifEnv* env, return ret; } - /* Allocate the (msg) data buffer: + /* Allocate the data buffer */ - ESOCK_ASSERT( ALLOC_BIN(bufSz, &data[0]) ); + bufP = &descP->buf; + ESOCK_ASSERT( recv_alloc_buf(bufSz, bufP) ); /* Allocate the ctrl (buffer): */ @@ -2985,8 +2964,8 @@ ERL_NIF_TERM essio_recvmsg(ErlNifEnv* env, sys_memzero((char*) &addr, addrLen); sys_memzero((char*) &msgHdr, sizeof(msgHdr)); - iov[0].iov_base = data[0].data; - iov[0].iov_len = data[0].size; + iov[0].iov_base = bufP->data; + iov[0].iov_len = bufP->size; msgHdr.msg_name = &addr; msgHdr.msg_namelen = addrLen; @@ -3002,12 +2981,14 @@ ERL_NIF_TERM essio_recvmsg(ErlNifEnv* env, /* Check for errors and end of stream */ if (! recv_check_result(env, descP, sockRef, recvRef, readResult, saveErrno, &ret) ) { - FREE_BIN(&data[0]); + /* Keep the data buffer */ FREE_BIN(&ctrl); return ret; } /* readResult >= 0 */ + ESOCK_ASSERT( recv_create_bin(bufP, readResult, &bin) ); + /* The recvmsg function delivers one (1) message. If our buffer * is to small, the message will be truncated. 
So, regardless * if we filled the buffer or not, we have got what we are going @@ -3038,7 +3019,7 @@ ERL_NIF_TERM essio_recvmsg(ErlNifEnv* env, descP->readPkgMax = readResult; encode_msg(env, descP, - readResult, &msgHdr, &data[0], &ctrl, + readResult, &msgHdr, &bin, &ctrl, &ret); if (descP->selectRead && (COMPARE(recvRef, esock_atom_zero) != 0)) { @@ -6861,6 +6842,55 @@ void essio_down(ErlNifEnv* env, /* *** Recv/recvfrom/recvmsg utility functions *** */ +static +BOOLEAN_T recv_alloc_buf(size_t size, + ErlNifBinary *bufP) +{ + if (bufP->data == NULL) { + return ALLOC_BIN(size, bufP); + } + else { + if (size != bufP->size) + return REALLOC_BIN(bufP, size); + else + return TRUE; + } +} + +static +BOOLEAN_T recv_create_bin(ErlNifBinary *bufP, size_t size, ErlNifBinary *binP) +{ + /* Don't touch bufP->size + */ + if (size >= bufP->size) { + /* Buffer full + * - use it as return binary and drop buffer + */ + ESOCK_ASSERT( bufP->size >= size ); + *binP = *bufP; + bufP->data = NULL; + return TRUE; + } + else if (size >= (bufP->size & ~4095) || + size >= (bufP->size >> 1) + (bufP->size >> 2)) { + /* Less than a 4 K page shrink or less than 25% shrink + * - reallocate and drop buffer + */ + *binP = *bufP; + bufP->data = NULL; + return REALLOC_BIN(binP, size); + } + else { + BOOLEAN_T ret; + /* Keep buffer, copy content to newly allocated binary + */ + ret = ALLOC_BIN(size, binP); + if (ret) + sys_memcpy(binP->data, bufP->data, size); + return ret; + } +} + static BOOLEAN_T recv_check_entry(ErlNifEnv *env, ESockDescriptor *descP, diff --git a/lib/kernel/src/socket.erl b/lib/kernel/src/socket.erl index ca1271d6dc59..4ad7128033ab 100644 --- a/lib/kernel/src/socket.erl +++ b/lib/kernel/src/socket.erl @@ -5427,20 +5427,38 @@ recv_deadline(SockRef, Length, Flags, Deadline, Buf) -> %% {select_read, Bin} -> %% All data, new recv operation in progress + %% The combination of select_read and recv with time-out + %% is contradictory since the return value has no place + %% for a
continuation because neither a ref nor 'nowait' + %% was given, so we handle this as if there was no select_read + %% by cancelling the new recv operation + %% _ = cancel(SockRef, recv, Handle), {ok, condense_buffer(Bin, Buf)}; %% - Select %% select | {select, Bin} %% No data or incomplete - when Select =:= select; - tuple_size(Select) =:= 2, element(1, Select) =:= select -> - {Length_1, Buf_1} = - if - Select =:= select -> - {Length, Buf}; - true -> - Bin = element(2, Select), - {Length - byte_size(Bin), [Bin | Buf]} - end, + select -> + %% + %% There is nothing just now, but we will be notified + %% with a select message when there is something to recv + Timeout = timeout(Deadline), + receive + ?socket_msg(?socket(SockRef), select, Handle) -> + if + 0 < Timeout -> + %% Retry + recv_deadline( + SockRef, Length, Flags, Deadline, Buf); + true -> + recv_error(timeout, Buf) + end; + ?socket_msg(_Socket, abort, {Handle, Reason}) -> + recv_error(Reason, Buf) + after Timeout -> + _ = cancel(SockRef, recv, Handle), + recv_error(timeout, Buf) + end; + {select, Bin} -> + Buf_1 = [Bin | Buf], %% %% There is nothing just now, but we will be notified %% with a select message when there is something to recv @@ -5451,7 +5469,8 @@ recv_deadline(SockRef, Length, Flags, Deadline, Buf) -> 0 < Timeout -> %% Retry recv_deadline( - SockRef, Length_1, Flags, Deadline, Buf_1); + SockRef, Length - byte_size(Bin), + Flags, Deadline, Buf_1); true -> recv_error(timeout, Buf_1) end;