Skip to content

Commit 96eaf5a

Browse files
committed
use rdb to startAppendonly after primary-replica full sync
Signed-off-by: Ray Cao <zisong.cw@alibaba-inc.com>
1 parent da32443 commit 96eaf5a

File tree

3 files changed

+111
-3
lines changed

3 files changed

+111
-3
lines changed

src/aof.c

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1005,6 +1005,112 @@ int startAppendOnly(void) {
10051005
return C_OK;
10061006
}
10071007

1008+
/* try to Restart AOF after a successful primary-replica full SYNC with
1009+
* existing RDB file. */
1010+
int tryRestartAOFAfterSYNCWithRdb(void) {
1011+
serverAssert(server.aof_state == AOF_OFF);
1012+
1013+
int newfd = -1;
1014+
sds new_base_filename = NULL;
1015+
sds new_base_filepath = NULL;
1016+
sds new_incr_filename = NULL;
1017+
sds new_incr_filepath = NULL;
1018+
aofManifest *temp_am = NULL;
1019+
1020+
/* Make sure the AOF directory exists */
1021+
if (dirCreateIfMissing(server.aof_dirname) == -1) {
1022+
serverLog(LL_WARNING, "Can't open or create append-only dir %s: %s", server.aof_dirname, strerror(errno));
1023+
goto cleanup;
1024+
}
1025+
1026+
serverAssert(server.aof_manifest != NULL);
1027+
1028+
/* Create a temporary copy of the manifest for modifications */
1029+
temp_am = aofManifestDup(server.aof_manifest);
1030+
1031+
/* Generate a new base AOF filename and mark the previous base file (if any) as history */
1032+
new_base_filename = getNewBaseFileNameAndMarkPreAsHistory(temp_am, server.aof_use_rdb_preamble);
1033+
serverAssert(new_base_filename != NULL);
1034+
new_base_filepath = makePath(server.aof_dirname, new_base_filename);
1035+
1036+
/* Rename the RDB file to be the new base AOF file */
1037+
if (rename(server.rdb_filename, new_base_filepath) == -1) {
1038+
serverLog(LL_WARNING, "Error trying to rename the RDB file %s into %s: %s", server.rdb_filename,
1039+
new_base_filepath, strerror(errno));
1040+
goto cleanup;
1041+
}
1042+
sdsfree(new_base_filepath);
1043+
1044+
/* Create a new incr AOF file */
1045+
new_incr_filename = getNewIncrAofName(temp_am);
1046+
new_incr_filepath = makePath(server.aof_dirname, new_incr_filename);
1047+
newfd = open(new_incr_filepath, O_WRONLY | O_TRUNC | O_CREAT, 0644);
1048+
sdsfree(new_incr_filepath);
1049+
if (newfd == -1) {
1050+
serverLog(LL_WARNING, "Can't open the append-only file %s: %s", new_incr_filename, strerror(errno));
1051+
goto cleanup;
1052+
}
1053+
1054+
/* Change the AOF file type in 'incr_aof_list' from AOF_FILE_TYPE_INCR
1055+
* to AOF_FILE_TYPE_HIST, and move them to the 'history_aof_list'. */
1056+
markRewrittenIncrAofAsHistory(temp_am);
1057+
1058+
/* Persist AOF Manifest. */
1059+
if (persistAofManifest(temp_am) == C_ERR) {
1060+
serverLog(LL_WARNING, "Can't open the append-only file %s: %s", new_incr_filename, strerror(errno));
1061+
goto cleanup;
1062+
}
1063+
1064+
/* Now, we can safely Update the server's manifest with our modified one */
1065+
aofManifestFreeAndUpdate(temp_am);
1066+
1067+
aofDelHistoryFiles();
1068+
1069+
/* Set the initial repl_offset, which will be applied to fsynced_reploff */
1070+
atomic_store_explicit(&server.fsynced_reploff_pending, server.primary_repl_offset, memory_order_relaxed);
1071+
/* Update the fsynced replication offset that just now become valid. */
1072+
long long fsynced_reploff_pending =
1073+
atomic_load_explicit(&server.fsynced_reploff_pending, memory_order_relaxed);
1074+
server.fsynced_reploff = fsynced_reploff_pending;
1075+
1076+
/* If AOF fsync error in bio job, we just ignore it and log the event. */
1077+
int aof_bio_fsync_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed);
1078+
if (aof_bio_fsync_status == C_ERR) {
1079+
serverLog(LL_WARNING, "AOF reopen, just ignore the AOF fsync error in bio job");
1080+
atomic_store_explicit(&server.aof_bio_fsync_status, C_OK, memory_order_relaxed);
1081+
}
1082+
1083+
/* If AOF was in error state, we just ignore it and log the event. */
1084+
if (server.aof_last_write_status == C_ERR) {
1085+
serverLog(LL_WARNING, "AOF reopen, just ignore the last error.");
1086+
server.aof_last_write_status = C_OK;
1087+
}
1088+
1089+
/* resume the bgrewrite has been done successfully */
1090+
server.aof_lastbgrewrite_status = C_OK;
1091+
server.stat_aofrw_consecutive_failures = 0;
1092+
1093+
server.aof_last_fsync = server.mstime;
1094+
server.aof_last_incr_size = 0;
1095+
server.aof_last_incr_fsync_offset = 0;
1096+
server.aof_rewrite_base_size = getAppendOnlyFileSize(new_base_filename, NULL);
1097+
server.aof_current_size = server.aof_rewrite_base_size;
1098+
1099+
server.aof_fd = newfd;
1100+
server.aof_state = AOF_ON;
1101+
1102+
return C_OK;
1103+
1104+
cleanup:
1105+
if (temp_am) aofManifestFree(temp_am);
1106+
if (new_base_filename) sdsfree(new_base_filename);
1107+
if (new_base_filepath) sdsfree(new_base_filepath);
1108+
if (new_incr_filename) sdsfree(new_incr_filename);
1109+
if (new_incr_filepath) sdsfree(new_incr_filepath);
1110+
if (newfd != -1) close(newfd);
1111+
return C_ERR;
1112+
}
1113+
10081114
/* This is a wrapper to the write syscall in order to retry on short writes
10091115
* or if the syscall gets interrupted. It could look strange that we retry
10101116
* on short writes given that we are writing to a block device: normally if

src/replication.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2474,7 +2474,7 @@ void readSyncBulkPayload(connection *conn) {
24742474
}
24752475

24762476
/* Cleanup. */
2477-
if (server.rdb_del_sync_files && allPersistenceDisabled()) {
2477+
if (!(server.aof_enabled && server.aof_use_rdb_preamble) && server.rdb_del_sync_files && allPersistenceDisabled()) {
24782478
serverLog(LL_NOTICE, "Removing the RDB file obtained from "
24792479
"the primary. This replica has persistence "
24802480
"disabled");
@@ -2524,7 +2524,9 @@ void readSyncBulkPayload(connection *conn) {
25242524
/* Restart the AOF subsystem now that we finished the sync. This
25252525
* will trigger an AOF rewrite, and when done will start appending
25262526
* to the new file. */
2527-
if (server.aof_enabled) restartAOFAfterSYNC();
2527+
if (server.aof_enabled && server.aof_use_rdb_preamble) tryRestartAOFAfterSYNCWithRdb();
2528+
2529+
if (server.aof_enabled && server.aof_state == AOF_OFF) restartAOFAfterSYNC();
25282530

25292531
/* In case of dual channel replication sync we want to close the RDB connection
25302532
* once the connection is established */
@@ -3872,7 +3874,6 @@ int connectWithPrimary(void) {
38723874
return C_ERR;
38733875
}
38743876

3875-
38763877
server.repl_transfer_lastio = server.unixtime;
38773878
server.repl_state = REPL_STATE_CONNECTING;
38783879
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync started");

src/server.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2984,6 +2984,7 @@ int rewriteAppendOnlyFileBackground(void);
29842984
int loadAppendOnlyFiles(aofManifest *am);
29852985
void stopAppendOnly(void);
29862986
int startAppendOnly(void);
2987+
int tryRestartAOFAfterSYNCWithRdb(void);
29872988
void backgroundRewriteDoneHandler(int exitcode, int bysignal);
29882989
void killAppendOnlyChild(void);
29892990
void restartAOFAfterSYNC(void);

0 commit comments

Comments
 (0)