Skip to content

Commit e5fd710

Browse files
committed
Fix the long-blocking read for Valkey RDMA.
During the valkey benchmark, when we set the data size as 16KB, the benchmark will be blocking on GET. The reason behind is that RDMA event is edge triggered and every incoming data has to be read totally instead of read partially. We modify the valkeyBufferRead to enable a total read over the RDMA buffer when the conneciton type is RDMA. To realize this logic we need to keep read the buffer until the read result data size is smaller than the attempt read size. In addition, we need to deal with a corner case for the logic above. If the message happened to be 16KB and the first read finish all the available data, the second read will be triggered and block there until return an error. To solve this problem, we make read non-blocking except for the first read. Signed-off-by: Ruihong Wang <ruihong@google.com>
1 parent af2e009 commit e5fd710

File tree

2 files changed

+44
-8
lines changed

2 files changed

+44
-8
lines changed

src/rdma.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,12 @@ static ssize_t valkeyRdmaRead(valkeyContext *c, char *buf, size_t bufcap) {
511511
return VALKEY_ERR;
512512
}
513513

514-
end = vk_msec_now() + timed;
514+
if (timed > 0) {
515+
end = vk_msec_now() + timed;
516+
} else {
517+
// end = -1 marks that we want a non-blocking read.
518+
end = -1;
519+
}
515520

516521
pollcq:
517522
/* try to poll a CQ first */
@@ -531,6 +536,9 @@ static ssize_t valkeyRdmaRead(valkeyContext *c, char *buf, size_t bufcap) {
531536
}
532537

533538
return toread;
539+
} else if (ctx->recv_offset == ctx->rx_offset && end == -1) {
540+
// non-blocking read over an empty buffer shall directly return here.
541+
return 0;
534542
}
535543

536544
if (valkeyRdmaPollCqCm(c, end) == VALKEY_OK) {

src/valkey.c

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,13 +1009,41 @@ int valkeyBufferRead(valkeyContext *c) {
10091009
if (c->err)
10101010
return VALKEY_ERR;
10111011

1012-
nread = c->funcs->read(c, buf, sizeof(buf));
1013-
if (nread < 0) {
1014-
return VALKEY_ERR;
1015-
}
1016-
if (nread > 0 && valkeyReaderFeed(c->reader, buf, nread) != VALKEY_OK) {
1017-
valkeySetError(c, c->reader->err, c->reader->errstr);
1018-
return VALKEY_ERR;
1012+
if (c->connection_type != VALKEY_CONN_RDMA) {
1013+
nread = c->funcs->read(c, buf, sizeof(buf));
1014+
if (nread < 0) {
1015+
return VALKEY_ERR;
1016+
}
1017+
if (nread > 0 && valkeyReaderFeed(c->reader, buf, nread) != VALKEY_OK) {
1018+
valkeySetError(c, c->reader->err, c->reader->errstr);
1019+
return VALKEY_ERR;
1020+
}
1021+
} else {
1022+
/* For Valkey over RDMA, as the ready-to-read signal only exists per arrived RDMA write.
1023+
* This funciton has to copy all the data from communication buffer to client buffer.*/
1024+
struct timeval *recorded_timeout = c->command_timeout;
1025+
struct timeval zero_timeout;
1026+
while (1) {
1027+
nread = c->funcs->read(c, buf, sizeof(buf));
1028+
if (nread < 0) {
1029+
return VALKEY_ERR;
1030+
}
1031+
if (nread > 0 && valkeyReaderFeed(c->reader, buf, nread) != VALKEY_OK) {
1032+
valkeySetError(c, c->reader->err, c->reader->errstr);
1033+
return VALKEY_ERR;
1034+
}
1035+
if ((uint32_t)nread < sizeof(buf))
1036+
break;
1037+
/* Only the first read is required to be blocking, the rest of reads should be non-blocking
1038+
* and keep read until there is no data in the communicaiton buffer.*/
1039+
zero_timeout.tv_sec = 0;
1040+
zero_timeout.tv_usec = 0;
1041+
c->command_timeout = &zero_timeout;
1042+
}
1043+
/*Recover the timeout back*/
1044+
if (c->command_timeout != recorded_timeout) {
1045+
c->command_timeout = recorded_timeout;
1046+
}
10191047
}
10201048
return VALKEY_OK;
10211049
}

0 commit comments

Comments
 (0)