Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/valkey/read.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ typedef struct valkeyReader {
LIBVALKEY_API valkeyReader *valkeyReaderCreateWithFunctions(valkeyReplyObjectFunctions *fn);
LIBVALKEY_API void valkeyReaderFree(valkeyReader *r);
LIBVALKEY_API int valkeyReaderFeed(valkeyReader *r, const char *buf, size_t len);
LIBVALKEY_API int valkeyReaderGetReadBuf(valkeyReader *r, char **buf, size_t *cap, size_t minbytes);
LIBVALKEY_API void valkeyReaderCommitRead(valkeyReader *r, size_t nread);
LIBVALKEY_API int valkeyReaderGetReply(valkeyReader *r, void **reply);

#define valkeyReaderSetPrivdata(_r, _p) (int)(((valkeyReader *)(_r))->privdata = (_p))
Expand Down
86 changes: 61 additions & 25 deletions src/read.c
Original file line number Diff line number Diff line change
Expand Up @@ -747,38 +747,85 @@ void valkeyReaderFree(valkeyReader *r) {
}

int valkeyReaderFeed(valkeyReader *r, const char *buf, size_t len) {
sds newbuf;

/* Return early when this reader is in an erroneous state. */
if (r->err)
return VALKEY_ERR;

/* Copy the provided buffer. */
if (buf != NULL && len >= 1) {
/* Destroy internal buffer when it is empty and is quite large. */
if (r->len == 0 && r->maxbuf != 0 && sdsavail(r->buf) > r->maxbuf) {
sdsfree(r->buf);
r->buf = sdsempty();
if (r->buf == 0)
goto oom;

r->pos = 0;
char *dest;
size_t cap;
if (valkeyReaderGetReadBuf(r, &dest, &cap, len) != VALKEY_OK)
return VALKEY_ERR;
memcpy(dest, buf, len);
valkeyReaderCommitRead(r, len);
}

return VALKEY_OK;
}

/* Prepare the reader's internal buffer for a direct read. This compacts
* consumed data, ensures at least 'minbytes' of writable space, and returns
* a pointer and available capacity. The caller can then read() directly into
* *buf and call valkeyReaderCommitRead() with the number of bytes read.
* Returns VALKEY_OK on success, VALKEY_ERR on allocation failure. */
int valkeyReaderGetReadBuf(valkeyReader *r, char **buf, size_t *cap, size_t minbytes) {
if (r->err)
return VALKEY_ERR;

if (minbytes > SSIZE_MAX) {
valkeyReaderSetError(r, VALKEY_ERR_PROTOCOL,
"Requested buffer size too large");
return VALKEY_ERR;
}

/* Destroy internal buffer when it is empty and is quite large. */
if (r->len == 0 && r->maxbuf != 0 && sdsavail(r->buf) > r->maxbuf) {
sdsfree(r->buf);
r->buf = sdsempty();
if (r->buf == NULL)
goto oom;
r->pos = 0;
}

/* Compact consumed data. */
if (r->pos > 0) {
if (sdslen(r->buf) > SSIZE_MAX) {
valkeyReaderSetError(r, VALKEY_ERR_PROTOCOL,
"Reader buffer is too large");
return VALKEY_ERR;
}
sdsrange(r->buf, r->pos, -1);
r->pos = 0;
r->len = sdslen(r->buf);
}

newbuf = sdscatlen(r->buf, buf, len);
/* Ensure enough writable space. */
if (sdsavail(r->buf) < minbytes) {
sds newbuf = sdsMakeRoomFor(r->buf, minbytes);
if (newbuf == NULL)
goto oom;

r->buf = newbuf;
r->len = sdslen(r->buf);
}

*buf = r->buf + sdslen(r->buf);
*cap = sdsavail(r->buf);
if (*cap > SSIZE_MAX)
*cap = SSIZE_MAX;
return VALKEY_OK;

oom:
valkeyReaderSetErrorOOM(r);
return VALKEY_ERR;
}

/* Commit bytes that were written directly into the buffer obtained from
* valkeyReaderGetReadBuf(). */
void valkeyReaderCommitRead(valkeyReader *r, size_t nread) {
assert(nread <= SSIZE_MAX);
sdsIncrLen(r->buf, (ssize_t)nread);
r->len = sdslen(r->buf);
}

int valkeyReaderGetReply(valkeyReader *r, void **reply) {
/* Default target pointer to NULL. */
if (reply != NULL)
Expand Down Expand Up @@ -812,17 +859,6 @@ int valkeyReaderGetReply(valkeyReader *r, void **reply) {
if (r->err)
return VALKEY_ERR;

/* Discard part of the buffer when we've consumed at least 1k, to avoid
* doing unnecessary calls to memmove() in sds.c. */
if (r->pos >= 1024) {
/* No length check in Valkeys sdsrange() */
if (sdslen(r->buf) > SSIZE_MAX)
return VALKEY_ERR;
sdsrange(r->buf, r->pos, -1);
r->pos = 0;
r->len = sdslen(r->buf);
}

/* Emit a reply when there is one. */
if (r->ridx == -1) {
if (reply != NULL) {
Expand Down
2 changes: 1 addition & 1 deletion src/sds.c
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ void *sdsAllocPtr(sds s) {
* ... check for nread <= 0 and handle it ...
* sdsIncrLen(s, nread);
*/
void sdsIncrLen(sds s, int incr) {
void sdsIncrLen(sds s, ssize_t incr) {
unsigned char flags = s[-1];
size_t len;
switch (flags & SDS_TYPE_MASK) {
Expand Down
2 changes: 1 addition & 1 deletion src/sds.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ sds sdsjoinsds(sds *argv, int argc, const char *sep, size_t seplen);

/* Low level functions exposed to the user API */
sds sdsMakeRoomFor(sds s, size_t addlen);
void sdsIncrLen(sds s, int incr);
void sdsIncrLen(sds s, ssize_t incr);
sds sdsRemoveFreeSpace(sds s);
size_t sdsAllocSize(sds s);
void *sdsAllocPtr(sds s);
Expand Down
18 changes: 12 additions & 6 deletions src/valkey.c
Original file line number Diff line number Diff line change
Expand Up @@ -1002,8 +1002,7 @@ valkeyPushFn *valkeySetPushCallback(valkeyContext *c, valkeyPushFn *fn) {
* After this function is called, you may use valkeyGetReplyFromReader to
* see if there is a reply available. */
int valkeyBufferRead(valkeyContext *c) {
char buf[1024 * 16];
int nread;
ssize_t nread;

/* Return early when the context has seen an error. */
if (c->err)
Expand All @@ -1021,14 +1020,21 @@ int valkeyBufferRead(valkeyContext *c) {
}
return c->funcs->read_zc_done(c);
}
nread = c->funcs->read(c, buf, sizeof(buf));
if (nread < 0) {

/* Read directly into the reader's buffer to avoid a memcpy. */
char *buf;
size_t cap;
if (valkeyReaderGetReadBuf(c->reader, &buf, &cap, 1024 * 16) != VALKEY_OK) {
valkeySetError(c, c->reader->err, c->reader->errstr);
return VALKEY_ERR;
}
if (nread > 0 && valkeyReaderFeed(c->reader, buf, nread) != VALKEY_OK) {
valkeySetError(c, c->reader->err, c->reader->errstr);
nread = c->funcs->read(c, buf, cap);
if (nread < 0) {
return VALKEY_ERR;
}
if (nread > 0) {
valkeyReaderCommitRead(c->reader, nread);
}
return VALKEY_OK;
}

Expand Down
Loading
Loading