Conversation
Move the expensive AOF write+fsync off the main thread when IO threads are available. This prevents the main thread from blocking on disk I/O when appendfsync is set to 'always'. Add a generic trySendJobToIOThreads() API to io_threads with round-robin distribution, and an aof IO flush state machine (IDLE/PENDING/DONE/ERR) with atomic coordination between main and IO threads. The adjustIOThreadsByEventLoad() function gains a has_background_work parameter to ensure IO threads stay active when AOF fsync work is pending, even during low-traffic periods. Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
Introduce a provider registry that allows multiple durability backends (AOF fsync, replicas, etc.) to register and contribute to a consensus offset. The overall durability consensus is the MIN (AND) of all enabled providers' acknowledged offsets. Include the built-in AOF provider that tracks fsynced_reploff_pending when appendfsync=always, and transparently passes through when not. Add pause/resume support for providers (used via DEBUG commands) to enable deterministic testing by freezing a provider's acknowledged offset at a point-in-time snapshot. Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
Add a task registry that defers side-effects (keyspace notifications, key invalidations, flush invalidations) until durability providers acknowledge the associated write offset. Each task type registers create/destroy/execute/onClientDestroy handlers. Tasks are created during command execution with a deferred offset, then moved to an official waiting list once the replication offset is known. When the consensus offset advances past a task's offset, the task is executed and freed. Key invalidation tasks track the originating client pointer and properly handle client disconnection before task execution. Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
Track which keys have been modified but not yet acknowledged by durability providers using a per-database hashtable. This enables rejecting reads of uncommitted keys to ensure clients only see durable data (zero-data-loss semantics). Each uncommitted key stores the replication offset at which it was last modified. Keys are purged when the durability consensus offset advances past their stored offset. Include incremental cleanup via serverCron that scans databases round-robin with a configurable time limit, plus immediate purging on read access (lazy cleanup). Also handle database-level modifications (FLUSHDB, FLUSHALL, SWAPDB) and function store dirty tracking for transactions. Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
Add the core orchestration layer that blocks client responses in the client output buffer (COB) until durability providers confirm the write offset, then unblocks and flushes responses to clients. reply_blocking.c/h contains: - durabilityInit/Cleanup/Reset lifecycle management - beforeCommandTrackReplOffset/afterCommandTrackReplOffset for tracking which replication offsets each command produces - preCommandExec: rejects commands accessing uncommitted keys - postCommandExec: blocks client responses until providers acknowledge - notifyDurabilityProgress: called from beforeSleep to unblock clients whose offsets have been acknowledged - blockClientOnReplOffset/unblockResponsesWithAckOffset - Function store dirty tracking for FUNCTION LOAD/DELETE - INFO durability stats generation Integration points across the server: - server.c: init/cleanup in server lifecycle, pre/post command hooks in call() and processCommand(), notifyDurabilityProgress in beforeSleep, uncommitted keys cleanup in serverCron, per-DB init, INFO section - server.h: durable_t in server struct, clientDurabilityInfo in client, uncommitted_keys/dirty_repl_offset in serverDb, new client flag - config.c: 'durability' bool config with dynamic update callback - db.c: durabilitySignalModifiedKey/durabilitySignalFlushedDb hooks - networking.c: client durability init/reset, COB reply limiting - notify.c: defer keyspace notifications when durability is enabled - script.c/module.c: pre-script checks for uncommitted data access - replication.c: clear durability state on primary change - debug.c: durability-provider-pause/resume DEBUG subcommands - object.c: getIntFromObject utility Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
Add reply_blocking.c, durable_task.c, durability_provider.c, and uncommitted_keys.c to the build system (both Makefile and CMake). Also fix a clang compatibility issue in unit test CMakeLists.txt: -fno-var-tracking-assignments is GCC-only, so guard it with a compiler ID check. Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
Add comprehensive gtest-based unit tests covering the reply blocking subsystem including: - Client output buffer blocking and unblocking mechanics - Offset tracking through command execution - Multi-command transaction (MULTI/EXEC) offset handling - Durability provider consensus calculations - Deferred task lifecycle (create, execute, cleanup) - Uncommitted key tracking and purging - Edge cases: client disconnection, provider pause/resume Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
Add Tcl-based integration tests (1,051 lines) covering end-to-end durability behavior including: - AOF-based response blocking with appendfsync=always - Provider pause/resume via DEBUG commands for deterministic testing - Uncommitted key rejection (reads return error for dirty keys) - MULTI/EXEC transaction durability semantics - Lua script and FCALL durability checks - Function store (FUNCTION LOAD/DELETE) durability blocking - Client disconnection during blocked state - Multiple concurrent clients with interleaved blocking/unblocking - INFO durability stats verification Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
Do you think we need a separate config for this? If you set up fsync always, can we imply that |
Should we factor in available memory before executing the command to avoid the over-buffering which may introduce OOM risk? |
Yeah, I went back and forth on this. I had the flag left over from the initial draft and figured it might be useful to not enable this since I wasn't sure whether we'd do a major version or minor with this change. I can remove |
Good point, we should have a mechanism for this. Let me think through the options -- a proactive one might be harder (as we need to estimate the output before execution) but we can probably track the ammount of pending responses (or pending writes to the durability providers) and start throttling (rejecting) writes after a certain threshold? |
|
Regarding the |
Yeah, proactive would be challenging but a reactive approach might be good enough. We could track the total consumed output buffer and initiate throttling once a predefined threshold is reached. Valkey’s existing In addition, pointing out that this client suspension should be conditioned on the ability to zero-copy responses (e.g., the requested key is not robj based). |
Yeah, makes sense to me. I will remove it in the next commit, along with other feedback! |
| (c->bufpos == 0 && n->disallowed_reply_block != NULL && listFirst(c->reply) == n->disallowed_reply_block)) { | ||
| // Both positions are pointing both at the initial 16KB buffer or the | ||
| // first reply block, compare the sentlen with the last allowed byte offset | ||
| return c->io_last_written.data_len < n->disallowed_byte_offset; |
There was a problem hiding this comment.
This is conditional is not compatible with copy avoidance. c->io_last_written.data_len represents total decoded data written to socket whereas n->disallowed_byte_offset could represent encoded buffer offset. This could lead to releasing the response before the ack. We should replace with c->io_last_written.bufpos instead
| task->argv[i] = va_arg(ap, void *); | ||
| } | ||
|
|
||
| // Increase reference count to avoid the key from being deleted |
There was a problem hiding this comment.
keyspace notifification called via modules makes task->argv[1] (event string) dangling pointer because of the fact that the module will manage the lifecycle of the event string.
We should copy the string instead of just storing the ptr.
There was a problem hiding this comment.
we can do something like
char *event = (char *)task->argv[1]; if (event) { task->argv[1] = strdup((char *)task->argv[1]); }
| lastblock = listLast(c->reply); | ||
| bufpos = c->bufpos; | ||
| } else { | ||
| lastblock = c->io_last_reply_block; |
There was a problem hiding this comment.
One thing is not clear to me. How does the write handler _writeToClient know till what point in the COB can be released to the client ? Shouldn't we override the bufpos & lastblock from the list of blocked_response we are tracking ?
|
Just regarding the title and description of the PR. This is not really changing the AOF durability. It just makes it possible to do fsync in the background so it gets faster. I think we can name the PR something like Write-behind log for async AOF durability. The main benefit for the future is that it introduces the WBL into the code base. Also it isn't doing anything to sync replication, other than preparing for introducing it in the future, so I don't think it should be highlighted that much. |
Ack! yeah, my brain is focused on the long term but you are correct. I will address some of the other feedback and change the description. |
murphyjacob4
left a comment
There was a problem hiding this comment.
Just some first pass comments, not a complete review
| * effectively making this provider a transparent pass-through that doesn't | ||
| * block consensus. When appendfsync is switched to "always", the provider | ||
| * immediately starts returning the actual fsynced offset. */ | ||
| if (server.aof_fsync != AOF_FSYNC_ALWAYS) { |
There was a problem hiding this comment.
Does it just make more sense to not register it unless AOF_FSYNC_ALWAYS is set? Then we don't have to deal with a durability provider returning a non-monotonic acked offset
| // Ensuring required_embedded_size < current_embedded_allocation_size * 3 / 4, which creates a new entry | ||
| size_t current_embedded_allocation_size = entryMemUsage(e9); | ||
| sds value10 = sdsnew("xxxxxxxxxxxxxxxxxxxxx"); | ||
| sds value10 = sdsnew("xxxxxx"); |
There was a problem hiding this comment.
Is this change intentional?
| @@ -0,0 +1,344 @@ | |||
| /* Do not modify this file, it's automatically generated from utils/generate-unit-test-header.py */ | |||
There was a problem hiding this comment.
Is generate-unit-test-header.py a thing? I don't see it in our repo
| (void *)(long)dbid); | ||
| } | ||
|
|
||
| // At this point (ZDL branch), we have notified modules, or queued a task. For clients, |
| /** | ||
| * Checks if we should reject a command that is accessing uncommitted data. | ||
| */ | ||
| bool shouldRejectCommandWithUncommittedData(client *c) { |
There was a problem hiding this comment.
Can you help me understand the intent here? It seems like on replicas, we will reject all reads for uncommitted data? Why wouldn't we follow the same process as on a primary (block until durable, then respond)?
Maybe you are deferring this work to a followup?
| * Below are the data structures used to buffer intermediate dirty keys/DBs for multi-command | ||
| * blocks including MULTI/EXEC and Lua script. As we execute the individual commands in the | ||
| * transaction, we don't know the final replication offset so we store the updated keys and DBs | ||
| * in afterCommandTrackReplOffset(), and process them in postCommandExec() after the entire transaction is | ||
| * propagated to the replication buffer. |
There was a problem hiding this comment.
The way I think about it, we have two concerns here:
- Is a key dirty or not? As the script or multi/exec runs, it is dirtying data. I feel like this is easiest to understand if it is at the moment it is dirtied rather than being deferred
- When can we mark a key as not dirty? This depends on when the underlying command would be considered committed
This file feels like it should just care about 1 - it is essentially an uncommitted key tracker - IMO not related at all to knowing when they may become committed.
For 2 ("when are keys committed") - it feels like a concern of the system that is dependent on the uncommitted key tracker. In this case, the durability system.
If you model it this way, then as we execute a script, it is marking keys as dirty at the moment they are dirtied in the uncommitted keys tracker. Once the script completes, it goes into the durability system and says "automatically unmark these keys as dirty once we have made the current offset durable".
With this setup - you wouldn't need this pending tracker, it is just updating the two systems at the respective points of time (i.e, when the key is dirtied, and when the transaction completes). It also opens up the design to later having other non-durability systems mark keys as uncommitted for their own purposes (not really a goal, but a nice side-effect).
I also feel like 2 is best modeled as a queue, then when we get a new committed offset, we just pop the head of the queue until the head is > the committed offset.
So something like this:
- Parse
MULTI/SET X Y/SET A B/EXEC - Execute
2.a. SET X Y -> mark X as dirty ->uncommitted_keys = {X}
2.b. SET A B -> mark A as dirty ->uncommitted_keys = {X, A} - After execution -> add X and A to the tail of the offset tracker with the current offset ->
offset_tracker = head -> {key=X, off=1000} -> {key=A, off=1000} -> tail - Read command comes in for X -> check
uncommitted_keys-> block - Offset 1000 durable -> pop X and A from the
offset_tracker->uncommitted_keys = {}-> unblock read command
WDYT? I guess it is less space efficient, but these are just pointers not copies of the key contents, so should be cheap?
AOF-based Durability (Sync Replication)
Summary
This PR adds an AOF-based durability mode where Valkey blocks client responses in the output buffer until the underlying durability provider (AOF fsync) acknowledges the write. It is milestone one in the durability plan (here)
When
durability yesandappendfsync alwaysare both enabled (looking for feedback on these configurations, and whether we want two, or not), a client writingSET foo barwon't receive+OKuntil the data is fsynced to disk — giving zero-data-loss guarantees without requiring application-level WAITAOF.The design is "provider-pluggable", as the same building block of reply tracking/blocking will be used to implement sync-replication w.r.t replicas. The durability code accepts multiple providers (AOF, replicas, etc.) and computes consensus as the MIN of all enabled providers' acknowledged offsets (AND semantics). This PR ships only the built-in AOF provider; replica-based providers will follow in other milestones.
Design decisions
appendfsync always, the write+fsync is offloaded to IO threads when available.How to Review
I've split the code into commits that can be more or less reviewed alone. Following the order is probably best. Reviewers are also welcome to review the whole thing at once if that is preferred.
aof: offload appendfsync=always flush+fsync to IO threadsaof.c, the generictrySendJobToIOThreads()inio_threads.c, and thehas_background_workparameter.durability: add pluggable durability provider interfacedurability_provider.hfor the interface, the consensus calculation (MIN/AND), and the built-in AOF provider.durability: add deferred task system for post-ack executiondurability: add uncommitted key tracking per databasedurability: add reply blocking and wire into server subsystemsreply_blocking.c, (b) the pre/post command hooks inserver.c, (c) integration points indb.c,networking.c,notify.c,script.c,module.c.build: add durability source files to Makefile and CMaketests: unit tests for reply blockingtests: integration tests for durabilityConfiguration
New INFO section
INFO durabilityreports blocking/unblocking stats, per-type counters (read/write/other), cumulative block times, and uncommitted key counts.New DEBUG commands
DEBUG durability-provider-pause <name>— Freeze a provider's offset (for testing)DEBUG durability-provider-resume <name>— Resume a frozen provider