Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,4 @@ cmake-build-debug/
cmake-build-release/
__pycache__
src/unit/.flags
.DS_Store
4 changes: 4 additions & 0 deletions cmake/Modules/SourceFiles.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ set(VALKEY_SERVER_SRCS
${CMAKE_SOURCE_DIR}/src/ae.c
${CMAKE_SOURCE_DIR}/src/anet.c
${CMAKE_SOURCE_DIR}/src/dict.c
${CMAKE_SOURCE_DIR}/src/reply_blocking.c
${CMAKE_SOURCE_DIR}/src/durable_task.c
${CMAKE_SOURCE_DIR}/src/durability_provider.c
${CMAKE_SOURCE_DIR}/src/uncommitted_keys.c
${CMAKE_SOURCE_DIR}/src/hashtable.c
${CMAKE_SOURCE_DIR}/src/kvstore.c
${CMAKE_SOURCE_DIR}/src/sds.c
Expand Down
4 changes: 4 additions & 0 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,10 @@ ENGINE_SERVER_OBJ = \
rdma.o \
release.o \
replication.o \
reply_blocking.o \
durable_task.o \
durability_provider.o \
uncommitted_keys.o \
resp_parser.o \
rio.o \
script.o \
Expand Down
145 changes: 145 additions & 0 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "server.h"
#include "bio.h"
#include "io_threads.h"
#include "rio.h"
#include "functions.h"
#include "module.h"
Expand All @@ -51,6 +52,23 @@ aofManifest *aofLoadManifestFromFile(sds am_filepath);
void aofManifestFreeAndUpdate(aofManifest *am);
void aof_background_fsync_and_close(int fd);

/* States of an AOF flush handed off to an IO thread, as kept in
 * server.aof_io_flush_state. */
enum {
    AOF_IO_FLUSH_IDLE = 0,    /* No offloaded flush is outstanding. */
    AOF_IO_FLUSH_PENDING = 1, /* A job was handed off; its result is not published yet. */
    AOF_IO_FLUSH_DONE = 2,    /* The job wrote and fsynced the buffer successfully. */
    AOF_IO_FLUSH_ERR = 3,     /* The job failed while writing or fsyncing. */
};

/* Work item describing one AOF buffer handed to an IO thread.
 * It is filled in by tryOffloadAofAlwaysFlushToIOThreads() and consumed
 * (and freed, together with 'buf') by aofIOThreadFlushJobHandler(). */
typedef struct aofIOFlushJob {
/* File descriptor the buffer is written to and then fsynced. */
int fd;
/* The AOF buffer taken over from server.aof_buf. */
sds buf;
/* Number of bytes of 'buf' to write (sdslen() of 'buf' when the job was created). */
size_t len;
/* server.primary_repl_offset captured when the job was created; stored into
 * server.fsynced_reploff_pending once write and fsync both succeed. */
long long reploff;
} aofIOFlushJob;

static void processAofIOThreadFlushResult(void);
static int tryOffloadAofAlwaysFlushToIOThreads(void);

/* ----------------------------------------------------------------------------
* AOF Manifest file implementation.
*
Expand Down Expand Up @@ -952,6 +970,9 @@ void stopAppendOnly(void) {
server.aof_last_incr_size = 0;
server.aof_last_incr_fsync_offset = 0;
server.fsynced_reploff = -1;
atomic_store_explicit(&server.aof_io_flush_state, AOF_IO_FLUSH_IDLE, memory_order_relaxed);
atomic_store_explicit(&server.aof_io_flush_errno, 0, memory_order_relaxed);
atomic_store_explicit(&server.aof_io_flush_size, 0, memory_order_relaxed);
atomic_store_explicit(&server.fsynced_reploff_pending, 0, memory_order_relaxed);
killAppendOnlyChild();
sdsfree(server.aof_buf);
Expand Down Expand Up @@ -1002,6 +1023,9 @@ int startAppendOnly(void) {
serverLog(LL_WARNING, "AOF reopen, just ignore the last error.");
server.aof_last_write_status = C_OK;
}
atomic_store_explicit(&server.aof_io_flush_state, AOF_IO_FLUSH_IDLE, memory_order_relaxed);
atomic_store_explicit(&server.aof_io_flush_errno, 0, memory_order_relaxed);
atomic_store_explicit(&server.aof_io_flush_size, 0, memory_order_relaxed);
return C_OK;
}

Expand Down Expand Up @@ -1156,6 +1180,113 @@ ssize_t aofWrite(int fd, const char *buf, size_t len) {
return totwritten;
}

/* IO-thread job: write the handed-off AOF buffer to job->fd, fsync it, and
 * publish the outcome through the server.aof_io_flush_* atomics.
 *
 * 'data' is an aofIOFlushJob. Both the job and its buffer are released here,
 * whether the flush succeeded or not.
 *
 * The result values (size and replication offset on success, errno on
 * failure) are stored before the state itself; the state store uses release
 * ordering so a reader that acquires the state also sees those values. */
static void aofIOThreadFlushJobHandler(void *data) {
    aofIOFlushJob *job = data;
    int failed = 0;
    int saved_errno = 0;

    ssize_t written = aofWrite(job->fd, job->buf, job->len);
    if (written != (ssize_t)job->len) {
        /* aofWrite() returned -1: use errno. Any other mismatch is a short
         * write, which is reported as ENOSPC. */
        failed = 1;
        saved_errno = (written == -1) ? errno : ENOSPC;
    } else if (valkey_fsync(job->fd) == -1) {
        failed = 1;
        saved_errno = errno;
    }

    if (failed) {
        atomic_store_explicit(&server.aof_io_flush_errno, saved_errno, memory_order_relaxed);
        atomic_store_explicit(&server.aof_io_flush_state, AOF_IO_FLUSH_ERR, memory_order_release);
    } else {
        atomic_store_explicit(&server.fsynced_reploff_pending, job->reploff, memory_order_relaxed);
        atomic_store_explicit(&server.aof_io_flush_size, job->len, memory_order_relaxed);
        atomic_store_explicit(&server.aof_io_flush_state, AOF_IO_FLUSH_DONE, memory_order_release);
    }

    /* The job owns the buffer: release both once the result is published. */
    sdsfree(job->buf);
    zfree(job);
}

/* Return non-zero while server.aof_io_flush_state is AOF_IO_FLUSH_PENDING,
 * i.e. a flush was handed off and its result has not been published yet.
 * The state is read with acquire ordering. */
int aofIOFlushInProgress(void) {
    int current = atomic_load_explicit(&server.aof_io_flush_state, memory_order_acquire);
    return current == AOF_IO_FLUSH_PENDING;
}

/* Consume the result of an offloaded AOF flush, if one has been published.
 *
 * - IDLE / PENDING: nothing to consume, return immediately.
 * - DONE: fold the flushed size into the AOF size accounting, record the
 *   fsync time, clear a previous write-error status, reset the state to IDLE
 *   and call notifyDurabilityProgress().
 * - anything else (error): record the errno, reset the state to IDLE, then
 *   either exit the process (fsync policy 'always') or flag the write status
 *   as failed. */
static void processAofIOThreadFlushResult(void) {
    switch (atomic_load_explicit(&server.aof_io_flush_state, memory_order_acquire)) {
    case AOF_IO_FLUSH_IDLE:
    case AOF_IO_FLUSH_PENDING:
        return;

    case AOF_IO_FLUSH_DONE: {
        off_t flushed = atomic_load_explicit(&server.aof_io_flush_size, memory_order_relaxed);
        server.aof_current_size += flushed;
        server.aof_last_incr_size += flushed;
        /* The job fsynced everything it wrote, so the fsync offset catches up. */
        server.aof_last_incr_fsync_offset = server.aof_last_incr_size;
        server.aof_last_fsync = server.mstime;
        if (server.aof_last_write_status == C_ERR) {
            serverLog(LL_NOTICE, "AOF write error looks solved. The server can write again.");
            server.aof_last_write_status = C_OK;
        }
        atomic_store_explicit(&server.aof_io_flush_state, AOF_IO_FLUSH_IDLE, memory_order_release);

        /* Notify sync replication that AOF fsync completed so blocked clients can be unblocked */
        notifyDurabilityProgress();
        return;
    }

    default:
        break;
    }

    /* Error result. */
    int flush_errno = atomic_load_explicit(&server.aof_io_flush_errno, memory_order_relaxed);
    server.aof_last_write_errno = flush_errno;
    atomic_store_explicit(&server.aof_io_flush_state, AOF_IO_FLUSH_IDLE, memory_order_release);

    if (server.aof_fsync != AOF_FSYNC_ALWAYS) {
        /* NOTE(review): the handler has already freed the job's buffer at this
         * point, so the failed data is not retried - confirm this is intended
         * when the policy changed while a job was in flight. */
        server.aof_last_write_status = C_ERR;
        return;
    }

    serverLog(LL_WARNING,
              "Can't persist AOF from IO thread when the "
              "AOF fsync policy is 'always': %s. Exiting...",
              strerror(flush_errno));
    exit(1);
}

static int tryOffloadAofAlwaysFlushToIOThreads(void) {
if (server.aof_fsync != AOF_FSYNC_ALWAYS || sdslen(server.aof_buf) == 0 || aofIOFlushInProgress()) {
return C_ERR;
}

/* If IO threads are configured but not active, we can't offload.
* Note: Thread activation based on AOF workload is handled by
* adjustIOThreadsByEventLoad() via the has_background_work parameter. */
if (server.io_threads_num <= 1 || server.active_io_threads_num <= 1) {
return C_ERR;
}

/* NOTE: With sync replication enabled, we still want to offload fsync to
* IO threads to avoid blocking the main thread. The notifyDurabilityProgress()
* callback will be invoked in beforeSleep() when we check for completed IO thread
* jobs, which will then unblock waiting clients. This adds at most one
* event loop iteration of latency but keeps the main thread responsive. */

aofIOFlushJob *job = zmalloc(sizeof(*job));
job->fd = server.aof_fd;
job->buf = server.aof_buf;
job->len = sdslen(job->buf);
job->reploff = server.primary_repl_offset;

server.aof_buf = sdsempty();
atomic_store_explicit(&server.aof_io_flush_errno, 0, memory_order_relaxed);
atomic_store_explicit(&server.aof_io_flush_size, 0, memory_order_relaxed);
atomic_store_explicit(&server.aof_io_flush_state, AOF_IO_FLUSH_PENDING, memory_order_release);
if (trySendJobToIOThreads(aofIOThreadFlushJobHandler, job) == C_OK) {
server.aof_flush_postponed_start = 0;
return C_OK;
}

atomic_store_explicit(&server.aof_io_flush_state, AOF_IO_FLUSH_IDLE, memory_order_release);
sdsfree(server.aof_buf);
server.aof_buf = job->buf;
zfree(job);
return C_ERR;
}

/* Write the append only file buffer on disk.
*
* Since we are required to write the AOF before replying to the client,
Expand All @@ -1180,6 +1311,15 @@ void flushAppendOnlyFile(int force) {
int sync_in_progress = 0;
mstime_t latency;

processAofIOThreadFlushResult();
if (aofIOFlushInProgress()) {
if (!force) return;
while (aofIOFlushInProgress()) {
usleep(100);
processAofIOThreadFlushResult();
}
}

if (sdslen(server.aof_buf) == 0) {
/* Check if we need to do fsync even the aof buffer is empty,
* because previously in AOF_FSYNC_EVERYSEC mode, fsync is
Expand Down Expand Up @@ -1234,6 +1374,11 @@ void flushAppendOnlyFile(int force) {
"without waiting for fsync to complete, this may slow down the server.");
}
}

if (server.aof_fsync == AOF_FSYNC_ALWAYS && !force && tryOffloadAofAlwaysFlushToIOThreads() == C_OK) {
return;
}

/* We want to perform a single write. This should be guaranteed atomic
* at least if the filesystem we are writing is a real physical one.
* While this will save us against the server being killed I don't think
Expand Down
7 changes: 7 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2697,6 +2697,12 @@ int updateAppendFsync(const char **err) {
return 1;
}

/* Callback registered with the "durability" bool config: calls
 * durabilityReset() and always returns 1. 'err' is not used. */
static int updateDurabilityEnabled(const char **err) {
    durabilityReset();
    UNUSED(err);
    return 1;
}

/* applyBind affects both TCP and TLS (if enabled) together */
static int applyBind(const char **err) {
connListener *tcp_listener = listenerByType(CONN_TYPE_SOCKET);
Expand Down Expand Up @@ -3293,6 +3299,7 @@ standardConfig static_configs[] = {
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
createBoolConfig("lua-enable-insecure-api", "lua-enable-deprecated-api", MODIFIABLE_CONFIG | HIDDEN_CONFIG | PROTECTED_CONFIG, server.lua_enable_insecure_api, 0, NULL, updateLuaEnableInsecureApi),
createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL),
createBoolConfig("durability", "sync-replication", MODIFIABLE_CONFIG, server.durability.enabled, 0, NULL, updateDurabilityEnabled),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down
7 changes: 7 additions & 0 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,9 @@ long long dbTotalServerKeyCount(void) {
* a context of a client. */
void signalModifiedKey(client *c, serverDb *db, robj *key) {
    touchWatchedKey(db, key);

    /* Client-side tracking invalidation is only issued here when
     * durabilitySignalModifiedKey() returns zero. */
    if (!durabilitySignalModifiedKey(c, db, key)) {
        trackingInvalidateKey(c, key, 1);
    }
}

Expand All @@ -770,6 +773,10 @@ void signalFlushedDb(int dbid, int async) {
touchAllWatchedKeysInDb(server.db[j], NULL);
}

if (durabilitySignalFlushedDb(dbid)) {
return;
}

trackingInvalidateKeysOnFlush(async);

/* Changes in this method may take place in swapMainDbWithTempDb as well,
Expand Down
13 changes: 13 additions & 0 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "io_threads.h"
#include "sds.h"
#include "module.h"
#include "durability_provider.h"

#include <arpa/inet.h>
#include <signal.h>
Expand Down Expand Up @@ -1066,6 +1067,18 @@ void debugCommand(client *c) {
} else if (!strcasecmp(objectGetVal(c->argv[1]), "client-enforce-reply-list") && c->argc == 3) {
server.debug_client_enforce_reply_list = atoi(objectGetVal(c->argv[2]));
addReply(c, shared.ok);
} else if (!strcasecmp(objectGetVal(c->argv[1]), "durability-provider-pause") && c->argc == 3) {
if (pauseDurabilityProvider(objectGetVal(c->argv[2]))) {
addReply(c, shared.ok);
} else {
addReplyError(c, "No such durability provider");
}
} else if (!strcasecmp(objectGetVal(c->argv[1]), "durability-provider-resume") && c->argc == 3) {
if (resumeDurabilityProvider(objectGetVal(c->argv[2]))) {
addReply(c, shared.ok);
} else {
addReplyError(c, "No such durability provider");
}
} else if (!handleDebugClusterCommand(c)) {
addReplySubcommandSyntaxError(c);
return;
Expand Down
Loading
Loading