Skip to content

Commit 98f2b96

Browse files
committed
use rdb to start appendonly after primary-replica full sync
Signed-off-by: Ray Cao <RayCao125@gmail.com>
1 parent da32443 commit 98f2b96

File tree

3 files changed

+103
-3
lines changed

3 files changed

+103
-3
lines changed

src/aof.c

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1005,6 +1005,104 @@ int startAppendOnly(void) {
10051005
return C_OK;
10061006
}
10071007

1008+
int tryRestartAppendOnlyWithRdb(void) {
1009+
serverAssert(server.aof_state == AOF_OFF);
1010+
1011+
int newfd = -1;
1012+
sds new_base_filename = NULL;
1013+
sds new_base_filepath = NULL;
1014+
sds new_incr_filename = NULL;
1015+
sds new_incr_filepath = NULL;
1016+
aofManifest *temp_am = NULL;
1017+
1018+
if (dirCreateIfMissing(server.aof_dirname) == -1) {
1019+
serverLog(LL_WARNING, "Can't open or create append-only dir %s: %s", server.aof_dirname, strerror(errno));
1020+
goto cleanup;
1021+
}
1022+
1023+
serverAssert(server.aof_manifest != NULL);
1024+
1025+
/* Dup a temporary aof_manifest for subsequent modifications. */
1026+
temp_am = aofManifestDup(server.aof_manifest);
1027+
1028+
/* Get a new BASE file name and mark the previous (if we have)
1029+
* as the HISTORY type. */
1030+
new_base_filename = getNewBaseFileNameAndMarkPreAsHistory(temp_am, server.aof_use_rdb_preamble);
1031+
serverAssert(new_base_filename != NULL);
1032+
new_base_filepath = makePath(server.aof_dirname, new_base_filename);
1033+
if (rename(server.rdb_filename, new_base_filepath) == -1) {
1034+
serverLog(LL_WARNING, "Error trying to rename the RDB file %s into %s: %s", server.rdb_filename,
1035+
new_base_filepath, strerror(errno));
1036+
goto cleanup;
1037+
}
1038+
sdsfree(new_base_filepath);
1039+
1040+
/* Get next new incr aof name. */
1041+
new_incr_filename = getNewIncrAofName(temp_am);
1042+
new_incr_filepath = makePath(server.aof_dirname, new_incr_filename);
1043+
newfd = open(new_incr_filepath, O_WRONLY | O_TRUNC | O_CREAT, 0644);
1044+
sdsfree(new_incr_filepath);
1045+
if (newfd == -1) {
1046+
serverLog(LL_WARNING, "Can't open the append-only file %s: %s", new_incr_filename, strerror(errno));
1047+
goto cleanup;
1048+
}
1049+
1050+
/* Change the AOF file type in 'incr_aof_list' from AOF_FILE_TYPE_INCR
1051+
* to AOF_FILE_TYPE_HIST, and move them to the 'history_aof_list'. */
1052+
markRewrittenIncrAofAsHistory(temp_am);
1053+
1054+
/* Persist AOF Manifest. */
1055+
if (persistAofManifest(temp_am) == C_ERR) {
1056+
serverLog(LL_WARNING, "Can't open the append-only file %s: %s", new_incr_filename, strerror(errno));
1057+
goto cleanup;
1058+
}
1059+
/* We can safely let `server.aof_manifest` point to 'temp_am' and free the previous one. */
1060+
aofManifestFreeAndUpdate(temp_am);
1061+
1062+
aofDelHistoryFiles();
1063+
1064+
atomic_store_explicit(&server.fsynced_reploff_pending, server.primary_repl_offset, memory_order_relaxed);
1065+
/* Update the fsynced replication offset that just now become valid. */
1066+
long long fsynced_reploff_pending =
1067+
atomic_load_explicit(&server.fsynced_reploff_pending, memory_order_relaxed);
1068+
server.fsynced_reploff = fsynced_reploff_pending;
1069+
1070+
/* If AOF fsync error in bio job, we just ignore it and log the event. */
1071+
int aof_bio_fsync_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed);
1072+
if (aof_bio_fsync_status == C_ERR) {
1073+
atomic_store_explicit(&server.aof_bio_fsync_status, C_OK, memory_order_relaxed);
1074+
}
1075+
1076+
/* If AOF was in error state, we just ignore it and log the event. */
1077+
if (server.aof_last_write_status == C_ERR) {
1078+
server.aof_last_write_status = C_OK;
1079+
}
1080+
1081+
server.aof_last_fsync = server.mstime;
1082+
/* Reset the aof_last_incr_size. */
1083+
server.aof_last_incr_size = 0;
1084+
/* Reset the aof_last_incr_fsync_offset. */
1085+
server.aof_last_incr_fsync_offset = 0;
1086+
1087+
server.aof_fd = newfd;
1088+
server.aof_rewrite_base_size = getAppendOnlyFileSize(new_base_filename, NULL);
1089+
server.aof_current_size = server.aof_rewrite_base_size;
1090+
server.aof_lastbgrewrite_status = C_OK;
1091+
server.stat_aofrw_consecutive_failures = 0;
1092+
server.aof_state = AOF_ON;
1093+
1094+
return C_OK;
1095+
1096+
cleanup:
1097+
if (temp_am) aofManifestFree(temp_am);
1098+
if (new_base_filename) sdsfree(new_base_filename);
1099+
if (new_base_filepath) sdsfree(new_base_filepath);
1100+
if (new_incr_filename) sdsfree(new_incr_filename);
1101+
if (new_incr_filepath) sdsfree(new_incr_filepath);
1102+
if (newfd != -1) close(newfd);
1103+
return C_ERR;
1104+
}
1105+
10081106
/* This is a wrapper to the write syscall in order to retry on short writes
10091107
* or if the syscall gets interrupted. It could look strange that we retry
10101108
* on short writes given that we are writing to a block device: normally if

src/replication.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2474,7 +2474,7 @@ void readSyncBulkPayload(connection *conn) {
24742474
}
24752475

24762476
/* Cleanup. */
2477-
if (server.rdb_del_sync_files && allPersistenceDisabled()) {
2477+
if (!(server.aof_enabled && server.aof_use_rdb_preamble) && server.rdb_del_sync_files && allPersistenceDisabled()) {
24782478
serverLog(LL_NOTICE, "Removing the RDB file obtained from "
24792479
"the primary. This replica has persistence "
24802480
"disabled");
@@ -2524,7 +2524,9 @@ void readSyncBulkPayload(connection *conn) {
25242524
/* Restart the AOF subsystem now that we finished the sync. This
25252525
* will trigger an AOF rewrite, and when done will start appending
25262526
* to the new file. */
2527-
if (server.aof_enabled) restartAOFAfterSYNC();
2527+
if (server.aof_enabled && server.aof_use_rdb_preamble) tryRestartAppendOnlyWithRdb();
2528+
2529+
if (server.aof_enabled && server.aof_state == AOF_OFF) restartAOFAfterSYNC();
25282530

25292531
/* In case of dual channel replication sync we want to close the RDB connection
25302532
* once the connection is established */
@@ -3872,7 +3874,6 @@ int connectWithPrimary(void) {
38723874
return C_ERR;
38733875
}
38743876

3875-
38763877
server.repl_transfer_lastio = server.unixtime;
38773878
server.repl_state = REPL_STATE_CONNECTING;
38783879
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync started");

src/server.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2984,6 +2984,7 @@ int rewriteAppendOnlyFileBackground(void);
29842984
int loadAppendOnlyFiles(aofManifest *am);
29852985
void stopAppendOnly(void);
29862986
int startAppendOnly(void);
2987+
int tryRestartAppendOnlyWithRdb(void);
29872988
void backgroundRewriteDoneHandler(int exitcode, int bysignal);
29882989
void killAppendOnlyChild(void);
29892990
void restartAOFAfterSYNC(void);

0 commit comments

Comments
 (0)