Skip to content

Commit e580a8e

Browse files
committed
Handle pending IO fastsync
1 parent bb1780a commit e580a8e

5 files changed

+21
-9
lines changed

src/SnapshotPayloadParseState.cpp

-5
Original file line numberDiff line numberDiff line change
@@ -233,11 +233,6 @@ void SnapshotPayloadParseState::trimState() {
233233

234234
if (stackParse.empty()) {
235235
flushQueuedKeys();
236-
while (*insertsInFlight > 0) {
237-
// TODO: ProcessEventsWhileBlocked
238-
aeReleaseLock();
239-
aeAcquireLock();
240-
}
241236
}
242237
}
243238

src/SnapshotPayloadParseState.h

+1
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,5 @@ class SnapshotPayloadParseState {
6363
void pushValue(const char *rgch, long long cch);
6464
void pushValue(long long value);
6565
bool shouldThrottle() const { return *insertsInFlight > (cserver.cthreads*4); }
66+
bool hasIOInFlight() const { return *insertsInFlight > 0; }
6667
};

src/config.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -3201,7 +3201,7 @@ standardConfig configs[] = {
32013201
createIntConfig("overload-protect-tenacity", NULL, MODIFIABLE_CONFIG, 0, 100, g_pserver->overload_protect_tenacity, 10, INTEGER_CONFIG, NULL, NULL),
32023202
createIntConfig("force-eviction-percent", NULL, MODIFIABLE_CONFIG, 0, 100, g_pserver->force_eviction_percent, 0, INTEGER_CONFIG, NULL, NULL),
32033203
createBoolConfig("enable-async-rehash", NULL, MODIFIABLE_CONFIG, g_pserver->enable_async_rehash, 1, NULL, NULL),
3204-
createBoolConfig("enable-keydb-fastsync", NULL, MODIFIABLE_CONFIG, g_pserver->fEnableFastSync, 0, NULL, NULL),
3204+
createBoolConfig("enable-keydb-fastsync", NULL, MODIFIABLE_CONFIG, g_pserver->fEnableFastSync, 1, NULL, NULL),
32053205

32063206
#ifdef USE_OPENSSL
32073207
createIntConfig("tls-port", NULL, MODIFIABLE_CONFIG, 0, 65535, g_pserver->tls_port, 0, INTEGER_CONFIG, NULL, updateTLSPort), /* TCP port. */

src/replication.cpp

+18-3
Original file line numberDiff line numberDiff line change
@@ -1059,7 +1059,7 @@ class replicationBuffer {
10591059
while (checkClientOutputBufferLimits(replica)
10601060
&& (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) == 0) {
10611061
ul.unlock();
1062-
usleep(0);
1062+
usleep(1000); // give 1ms for the I/O before we poll again
10631063
ul.lock();
10641064
}
10651065
}
@@ -2521,7 +2521,7 @@ size_t parseCount(const char *rgch, size_t cch, long long *pvalue) {
25212521
return cchNumeral + 3;
25222522
}
25232523

2524-
bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi) {
2524+
bool readFastSyncBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi) {
25252525
int fUpdate = g_pserver->fActiveReplica || g_pserver->enable_multimaster;
25262526
serverAssert(GlobalLocksAcquired());
25272527
serverAssert(mi->master == nullptr);
@@ -2546,6 +2546,10 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi
25462546
}
25472547
}
25482548

2549+
if (mi->repl_state == REPL_STATE_WAIT_STORAGE_IO) {
2550+
goto LWaitIO;
2551+
}
2552+
25492553
serverAssert(mi->parseState != nullptr);
25502554
for (int iter = 0; iter < 10; ++iter) {
25512555
if (mi->parseState->shouldThrottle())
@@ -2663,7 +2667,14 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi
26632667
if (!fFinished)
26642668
return false;
26652669

2670+
LWaitIO:
2671+
if (mi->parseState->hasIOInFlight()) {
2672+
mi->repl_state = REPL_STATE_WAIT_STORAGE_IO;
2673+
return false;
2674+
}
2675+
26662676
serverLog(LL_NOTICE, "Fast sync complete");
2677+
serverAssert(!mi->parseState->hasIOInFlight());
26672678
delete mi->parseState;
26682679
mi->parseState = nullptr;
26692680
return true;
@@ -3040,7 +3051,7 @@ void readSyncBulkPayload(connection *conn) {
30403051
}
30413052

30423053
if (mi->isKeydbFastsync) {
3043-
if (!readSnapshotBulkPayload(conn, mi, rsi))
3054+
if (!readFastSyncBulkPayload(conn, mi, rsi))
30443055
return;
30453056
} else {
30463057
if (!readSyncBulkPayloadRdb(conn, mi, rsi, usemark))
@@ -4807,6 +4818,10 @@ void replicationCron(void) {
48074818
{
48084819
redisMaster *mi = (redisMaster*)listNodeValue(lnMaster);
48094820

4821+
if (mi->repl_state == REPL_STATE_WAIT_STORAGE_IO && !mi->parseState->hasIOInFlight()) {
4822+
readSyncBulkPayload(mi->repl_transfer_s);
4823+
}
4824+
48104825
std::unique_lock<decltype(mi->master->lock)> ulock;
48114826
if (mi->master != nullptr)
48124827
ulock = decltype(ulock)(mi->master->lock);

src/server.h

+1
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,7 @@ typedef enum {
612612
REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */
613613
/* --- End of handshake states --- */
614614
REPL_STATE_TRANSFER, /* Receiving .rdb from master */
615+
REPL_STATE_WAIT_STORAGE_IO,
615616
REPL_STATE_CONNECTED, /* Connected to master */
616617
} repl_state;
617618

0 commit comments

Comments
 (0)