85 changes: 85 additions & 0 deletions src/cluster.c
@@ -1089,6 +1089,10 @@ void clusterCommand(client *c) {
addReplyBulkCString(c,ni);
sdsfree(ni);
}
} else if (!strcasecmp(c->argv[1]->ptr, "migration")) {
clusterMigrationCommand(c);
} else if (!strcasecmp(c->argv[1]->ptr,"syncslots") && c->argc >= 3) {
clusterSyncSlotsCommand(c);
} else if(!clusterCommandSpecial(c)) {
addReplySubcommandSyntaxError(c);
return;
@@ -2127,3 +2131,84 @@ void resetClusterStats(void) {

clusterSlotStatResetAll();
}

/* This function is called at server startup in order to initialize cluster data
* structures that are shared between the different cluster implementations. */
void clusterCommonInit(void) {
server.cluster_slot_stats = malloc(CLUSTER_SLOTS*sizeof(clusterSlotStat));
resetClusterStats();
asmInit();
}
Comment on lines +2137 to +2141
⚠️ Potential issue | 🔴 Critical

Use zcalloc instead of deprecated malloc — this is causing the build failure.

The pipeline fails with -Werror=deprecated-declarations because malloc is deprecated in this codebase. Redis uses zmalloc/zcalloc for memory allocation. Using zcalloc also zero-initializes the memory, which is appropriate before resetClusterStats().

🔧 Proposed fix
 void clusterCommonInit(void) {
-    server.cluster_slot_stats = malloc(CLUSTER_SLOTS*sizeof(clusterSlotStat));
+    server.cluster_slot_stats = zcalloc(CLUSTER_SLOTS*sizeof(clusterSlotStat));
     resetClusterStats();
     asmInit();
 }
🧰 Tools
🪛 GitHub Actions: Codecov

[error] 2138-2138: malloc is deprecated. Function call triggers -Werror=deprecated-declarations; build failure during lcov step due to deprecated malloc usage in clusterCommonInit.

🪛 GitHub Actions: External Server Tests

[error] 2138-2138: ClusterCommonInit: malloc is deprecated (-Werror=deprecated-declarations). Use calloc or ensure proper usage to avoid deprecation warning treated as an error.

🤖 Prompt for AI Agents
In `@src/cluster.c` around lines 2137 - 2141, Replace the deprecated malloc call
in clusterCommonInit by allocating zeroed memory with zcalloc: allocate
server.cluster_slot_stats using zcalloc(CLUSTER_SLOTS*sizeof(clusterSlotStat)),
as in the proposed fix, instead of malloc(...), keeping the subsequent
resetClusterStats() and asmInit() calls unchanged; update any necessary
includes/headers if zcalloc isn’t already available in this compilation unit.
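
For illustration only, here is a minimal standalone sketch of the zeroed-allocation idea. It uses the standard library's calloc() as a stand-in for zcalloc(), and demoSlotStat with its fields is a hypothetical placeholder, since the real clusterSlotStat layout is not shown in this diff.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical slot count for the demo; stands in for CLUSTER_SLOTS. */
#define DEMO_SLOTS 16384

/* Hypothetical stand-in for clusterSlotStat (real fields not shown in the diff). */
typedef struct demoSlotStat {
    uint64_t cpu_usec;
    uint64_t network_bytes_in;
    uint64_t network_bytes_out;
} demoSlotStat;

/* Allocate one zero-initialized stat entry per slot.
 * calloc() is used here as an assumption-level substitute for zcalloc();
 * unlike this sketch, the caller in the PR does not check for NULL. */
demoSlotStat *demoSlotStatsCreate(void) {
    return calloc(DEMO_SLOTS, sizeof(demoSlotStat));
}

/* Return 1 if every counter of every slot is zero, 0 otherwise. */
int demoSlotStatsAllZero(const demoSlotStat *stats) {
    for (size_t i = 0; i < DEMO_SLOTS; i++) {
        if (stats[i].cpu_usec || stats[i].network_bytes_in ||
            stats[i].network_bytes_out)
        {
            return 0;
        }
    }
    return 1;
}
```

With a zeroed buffer, the counters are already in a defined state before any reset routine runs, which is the property the comment above relies on.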


/* This function is called after the node startup in order to check if there
* are any slots that we have keys for, but are not assigned to us. If so,
* we delete the keys. */
void clusterDeleteKeysInUnownedSlots(void) {
if (clusterNodeIsSlave(getMyClusterNode())) return;

/* Check that all the slots we have keys for are assigned to us. Otherwise,
* delete the keys. */
for (int i = 0; i < CLUSTER_SLOTS; i++) {
/* Skip if: no keys in the slot, it's our slot, or we are importing it. */
if (!countKeysInSlot(i) ||
clusterIsMySlot(i) ||
getImportingSlotSource(i))
{
continue;
}

serverLog(LL_NOTICE, "I have keys for slot %d, but the slot is "
"assigned to another node. "
"Deleting keys in the slot.", i);
/* With atomic slot migration, it is safe to drop keys from slots
* that are not owned. This will not result in data loss under the
* legacy slot migration approach either, since the importing state
* has already been persisted in node.conf. */
clusterDelKeysInSlot(i, 0);
}
}


/* This function is called after the node startup in order to verify that data
* loaded from disk is in agreement with the cluster configuration:
*
* 1) If we find keys about hash slots we have no responsibility for, the
* following happens:
* A) If no other node is in charge according to the current cluster
* configuration, we add these slots to our node.
* B) If according to our config other nodes are already in charge for
* this slots, we set the slots as IMPORTING from our point of view
* in order to justify we have those slots, and in order to make
* redis-cli aware of the issue, so that it can try to fix it.
* 2) If we find data in a DB different than DB0 we return C_ERR to
* signal the caller it should quit the server with an error message
* or take other actions.
*
* The function always returns C_OK even if it will try to correct
* the error described in "1". However if data is found in DB different
* from DB0, C_ERR is returned.
*
* The function also uses the logging facility in order to warn the user
* about desynchronizations between the data we have in memory and the
* cluster configuration. */
int verifyClusterConfigWithData(void) {
/* Return ASAP if a module disabled cluster redirections. In that case
* every master can store keys about every possible hash slot. */
if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
return C_OK;

/* If this node is a slave, don't perform the check at all as we
* completely depend on the replication stream. */
if (clusterNodeIsSlave(getMyClusterNode())) return C_OK;

/* Make sure we only have keys in DB0. */
for (int i = 1; i < server.dbnum; i++) {
if (kvstoreSize(server.db[i].keys)) return C_ERR;
}

/* Delete keys in unowned slots */
clusterDeleteKeysInUnownedSlots();
/* Take over slots that we have keys for, but are assigned to no one. */
clusterClaimUnassignedSlots();
return C_OK;
}
Comment on lines +2171 to +2214
⚠️ Potential issue | 🟡 Minor

Function comment at lines 2179-2182 is inconsistent with implementation.

The comment states that if other nodes are in charge of slots, "we set the slots as IMPORTING from our point of view." However, the implementation at line 2210 calls clusterDeleteKeysInUnownedSlots(), which deletes keys in those slots rather than setting IMPORTING state.

The comment in clusterDeleteKeysInUnownedSlots() explains this is safe with atomic slot migration, but the parent function's documentation should be updated to reflect this behavior change.

📝 Suggested comment update
 /* This function is called after the node startup in order to verify that data
  * loaded from disk is in agreement with the cluster configuration:
  *
  * 1) If we find keys about hash slots we have no responsibility for, the
  *    following happens:
  *    A) If no other node is in charge according to the current cluster
  *       configuration, we add these slots to our node.
- *    B) If according to our config other nodes are already in charge for
- *       this slots, we set the slots as IMPORTING from our point of view
- *       in order to justify we have those slots, and in order to make
- *       redis-cli aware of the issue, so that it can try to fix it.
+ *    B) If according to our config other nodes are already in charge for
+ *       these slots, we delete the keys in those slots. With atomic slot
+ *       migration, it is safe to drop keys from slots that are not owned.
  * 2) If we find data in a DB different than DB0 we return C_ERR to
  *    signal the caller it should quit the server with an error message
  *    or take other actions.
🤖 Prompt for AI Agents
In `@src/cluster.c` around lines 2171 - 2214, The function comment for
verifyClusterConfigWithData is inaccurate about handling keys for slots owned by
other nodes: instead of marking such slots IMPORTING, the implementation calls
clusterDeleteKeysInUnownedSlots() which removes those keys (safe with atomic
slot migration), then calls clusterClaimUnassignedSlots() to take truly
unassigned slots; update the comment to reflect that behavior change by
describing deletion via clusterDeleteKeysInUnownedSlots() and claiming via
clusterClaimUnassignedSlots() rather than setting IMPORTING.
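
To make the described behavior concrete, here is a toy model of the per-slot decision (keep, claim, or delete). It is a sketch under stated assumptions, not the Redis implementation: demoCluster, DEMO_SLOTS, DEMO_NO_OWNER and all demo* functions are hypothetical names, and node identity is reduced to an integer id.

```c
#include <assert.h>

#define DEMO_SLOTS 8          /* hypothetical, tiny slot space for the demo */
#define DEMO_NO_OWNER (-1)    /* hypothetical marker for an unassigned slot */

/* Hypothetical, simplified view of one node's cluster state. */
typedef struct demoCluster {
    int self;                   /* our node id */
    int owner[DEMO_SLOTS];      /* owning node id per slot, or DEMO_NO_OWNER */
    int importing[DEMO_SLOTS];  /* non-zero if we are importing the slot */
    int keys[DEMO_SLOTS];       /* number of keys we hold per slot */
} demoCluster;

typedef enum { DEMO_KEEP, DEMO_CLAIM, DEMO_DELETE } demoAction;

/* Decide what to do with one slot after loading data from disk. */
demoAction demoClassifySlot(const demoCluster *c, int slot) {
    if (c->keys[slot] == 0) return DEMO_KEEP;               /* nothing stored */
    if (c->owner[slot] == c->self) return DEMO_KEEP;        /* our slot */
    if (c->importing[slot]) return DEMO_KEEP;               /* import in progress */
    if (c->owner[slot] == DEMO_NO_OWNER) return DEMO_CLAIM; /* nobody owns it */
    return DEMO_DELETE;                                     /* another node owns it */
}

/* Apply the decision to every slot. Returns the number of slots claimed,
 * which in the real code is what triggers saving the config. */
int demoReconcile(demoCluster *c) {
    int claimed = 0;
    for (int i = 0; i < DEMO_SLOTS; i++) {
        switch (demoClassifySlot(c, i)) {
        case DEMO_CLAIM:  c->owner[i] = c->self; claimed++; break;
        case DEMO_DELETE: c->keys[i] = 0; break;
        case DEMO_KEEP:   break;
        }
    }
    return claimed;
}
```

In this model both outcomes are decided per slot in a single pass. If they are split into two passes, the delete pass has to leave unassigned slots alone (or run second) for the claim pass to find any keys.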

2 changes: 2 additions & 0 deletions src/cluster.h
@@ -86,8 +86,10 @@ static inline unsigned int keyHashSlot(const char *key, int keylen) {
/* functions requiring mechanism specific implementations */
void clusterInit(void);
void clusterInitLast(void);
void clusterCommonInit(void);
void clusterCron(void);
void clusterBeforeSleep(void);
void clusterClaimUnassignedSlots(void);
int verifyClusterConfigWithData(void);

int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, const char *payload, uint32_t len);
114 changes: 27 additions & 87 deletions src/cluster_legacy.c
@@ -1036,10 +1036,8 @@ void clusterInit(void) {
clusterUpdateMyselfIp();
clusterUpdateMyselfHostname();
clusterUpdateMyselfHumanNodename();
resetClusterStats();

getRandomHexChars(server.cluster->internal_secret, CLUSTER_INTERNALSECRETLEN);
asmInit();
}

void clusterInitLast(void) {
@@ -4934,9 +4932,6 @@ void clusterCron(void) {

if (update_state || server.cluster->state == CLUSTER_FAIL)
clusterUpdateState();

/* Atomic slot migration cron */
asmCron();
}

/* This function is called before the event handler returns to sleep for
@@ -4978,8 +4973,6 @@ void clusterBeforeSleep(void) {
/* Broadcast a PONG to all the nodes. */
if (flags & CLUSTER_TODO_BROADCAST_PONG)
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

asmBeforeSleep();
}

void clusterDoBeforeSleep(int flags) {
@@ -5252,82 +5245,6 @@ void clusterUpdateState(void) {
}
}

/* This function is called after the node startup in order to verify that data
* loaded from disk is in agreement with the cluster configuration:
*
* 1) If we find keys about hash slots we have no responsibility for, the
* following happens:
* A) If no other node is in charge according to the current cluster
* configuration, we add these slots to our node.
* B) If according to our config other nodes are already in charge for
* this slots, we set the slots as IMPORTING from our point of view
* in order to justify we have those slots, and in order to make
* redis-cli aware of the issue, so that it can try to fix it.
* 2) If we find data in a DB different than DB0 we return C_ERR to
* signal the caller it should quit the server with an error message
* or take other actions.
*
* The function always returns C_OK even if it will try to correct
* the error described in "1". However if data is found in DB different
* from DB0, C_ERR is returned.
*
* The function also uses the logging facility in order to warn the user
* about desynchronizations between the data we have in memory and the
* cluster configuration. */
int verifyClusterConfigWithData(void) {
int j;
int update_config = 0;

/* Return ASAP if a module disabled cluster redirections. In that case
* every master can store keys about every possible hash slot. */
if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
return C_OK;

/* If this node is a slave, don't perform the check at all as we
* completely depend on the replication stream. */
if (nodeIsSlave(myself)) return C_OK;

/* Make sure we only have keys in DB0. */
for (j = 1; j < server.dbnum; j++) {
if (kvstoreSize(server.db[j].keys)) return C_ERR;
}

/* Check that all the slots we see populated memory have a corresponding
* entry in the cluster table. Otherwise fix the table. */
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (!countKeysInSlot(j)) continue; /* No keys in this slot. */
/* Check if we are assigned to this slot or if we are importing it.
* In both cases check the next slot as the configuration makes
* sense. */
if (server.cluster->slots[j] == myself ||
server.cluster->importing_slots_from[j] != NULL) continue;

/* If we are here data and cluster config don't agree, and we have
* slot 'j' populated even if we are not importing it, nor we are
* assigned to this slot. Fix this condition. */

update_config++;
/* Case A: slot is unassigned. Take responsibility for it. */
if (server.cluster->slots[j] == NULL) {
serverLog(LL_NOTICE, "I have keys for unassigned slot %d. "
"Taking responsibility for it.",j);
clusterAddSlot(myself,j);
} else {
serverLog(LL_NOTICE, "I have keys for slot %d, but the slot is "
"assigned to another node. "
"Deleting keys in the slot.", j);

/* With atomic slot migration, it is safe to drop keys from slots
* that are not owned. This will not result in data loss under the
* legacy slot migration approach either, since the importing state
* has already been persisted in node.conf. */
clusterDelKeysInSlot(j, 0);
}
}
if (update_config) clusterSaveConfigOrDie(1);
return C_OK;
}

/* Remove all the shard channel related information not owned by the current shard. */
static inline void removeAllNotOwnedShardChannelSubscriptions(void) {
if (!kvstoreSize(server.pubsubshard_channels)) return;
@@ -5339,6 +5256,33 @@ static inline void removeAllNotOwnedShardChannelSubscriptions(void) {
}
}

/* This function is called after the node startup in order to check if there
* are any slots that we have keys for, but are assigned to no one. If so,
* we take ownership of them. */
void clusterClaimUnassignedSlots(void) {
if (nodeIsSlave(myself)) return;

int update_config = 0;
for (int i = 0; i < CLUSTER_SLOTS; i++) {
/* Skip if: no keys, already has an owner, or we are importing it. */
if (!countKeysInSlot(i) ||
server.cluster->slots[i] != NULL ||
server.cluster->importing_slots_from[i] != NULL)
{
continue;
}

/* If we are here data and cluster config don't agree, and we have
* slot 'i' populated even if we are not importing it, nor anyone else
* is assigned to it. Fix this condition by taking ownership. */
update_config++;
serverLog(LL_NOTICE, "I have keys for unassigned slot %d. "
"Taking responsibility for it.", i);
clusterAddSlot(myself, i);
}
if (update_config) clusterSaveConfigOrDie(1);
}
Comment on lines +5259 to +5284
⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

rg -n "clusterClaimUnassignedSlots" -C 5

Repository: qodo-benchmark/redis-combined-coderabbit

Length of output: 1730


🏁 Script executed:

# Get more context around the call site to see which function it's in
sed -n '2190,2220p' src/cluster.c | cat -n

Repository: qodo-benchmark/redis-combined-coderabbit

Length of output: 1266


🏁 Script executed:

# Find where verifyClusterConfigWithData is called
rg -n "verifyClusterConfigWithData" -C 3

Repository: qodo-benchmark/redis-combined-coderabbit

Length of output: 1356


🏁 Script executed:

# Get context around line 7768 to identify the function
sed -n '7700,7780p' src/server.c | cat -n

Repository: qodo-benchmark/redis-combined-coderabbit

Length of output: 3748


🏁 Script executed:

# Search for when state is broadcast after config changes in cluster mode
rg -n "clusterBeforeSleep|replicateCommandPropagate" -A 2 -B 2 | head -50

Repository: qodo-benchmark/redis-combined-coderabbit

Length of output: 1344


🏁 Script executed:

# Find clusterSaveConfigOrDie implementation
rg -n "clusterSaveConfigOrDie" -A 10 | head -40

Repository: qodo-benchmark/redis-combined-coderabbit

Length of output: 2648


🏁 Script executed:

# Check if clusterSaveConfigOrDie sets any broadcast flags
rg -n "todo_before_sleep" -B 2 -A 2 | grep -E "(todo_before_sleep|clusterSaveConfig)" | head -20

Repository: qodo-benchmark/redis-combined-coderabbit

Length of output: 919


🏁 Script executed:

# Check clusterAddSlot and surrounding broadcast logic
rg -n "clusterAddSlot" -B 3 -A 8 | head -60

Repository: qodo-benchmark/redis-combined-coderabbit

Length of output: 3994


🏁 Script executed:

# Check what happens in the caller after verifyClusterConfigWithData
sed -n '7760,7790p' src/server.c | cat -n

Repository: qodo-benchmark/redis-combined-coderabbit

Length of output: 1584


🏁 Script executed:

# Check clusterBeforeSleep implementation for todo_before_sleep handling
sed -n '4942,5000p' src/cluster_legacy.c | cat -n

Repository: qodo-benchmark/redis-combined-coderabbit

Length of output: 2538


🏁 Script executed:

# Search for what sets CLUSTER_TODO_SAVE_CONFIG or related flags
rg -n "CLUSTER_TODO" -B 1 | grep -E "(CLUSTER_TODO_|clusterDoBeforeSleep|clusterSaveConfig)" | head -30

Repository: qodo-benchmark/redis-combined-coderabbit

Length of output: 2641


🏁 Script executed:

# Check if clusterClaimUnassignedSlots sets any todo flags
sed -n '5259,5284p' src/cluster_legacy.c | cat -n

Repository: qodo-benchmark/redis-combined-coderabbit

Length of output: 1346


🏁 Script executed:

# Compare with similar slot claim/add patterns - look at line 2419-2426 and 4268-4279
sed -n '2415,2430p' src/cluster_legacy.c | cat -n

Repository: qodo-benchmark/redis-combined-coderabbit

Length of output: 985


Add missing state and broadcast updates when claiming slots.

The function claims slots and saves config but lacks the state/broadcast updates that similar code performs. Compare with lines 2423–2425 and 4268–4279: after calling clusterAddSlot(), the code invokes clusterDoBeforeSleep() with CLUSTER_TODO_UPDATE_STATE and/or CLUSTER_TODO_BROADCAST_PONG flags. This ensures the cluster state is updated and peers are notified of the claim. Without these, peers remain unaware of the new slot assignments until the next periodic update.

🤖 Prompt for AI Agents
In `@src/cluster_legacy.c` around lines 5259 - 5284, The
clusterClaimUnassignedSlots function currently calls clusterAddSlot(myself, i)
and clusterSaveConfigOrDie but does not request the usual state/broadcast
updates; modify it so that after taking a slot you mark the need to update state
and notify peers by invoking clusterDoBeforeSleep() with
CLUSTER_TODO_UPDATE_STATE and CLUSTER_TODO_BROADCAST_PONG (or accumulate those
flags in a separate flags variable and call clusterDoBeforeSleep() once after
the loop), using clusterAddSlot, clusterDoBeforeSleep,
CLUSTER_TODO_UPDATE_STATE, and CLUSTER_TODO_BROADCAST_PONG as in the analogous
code paths so peers learn of the new slot ownership promptly.
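
As a rough illustration of the deferred-flags pattern this comment refers to, the sketch below accumulates "to do" bits while slots are claimed and acts on them once before sleeping. All names (demoNode, DEMO_TODO_*, demo* functions) and the flag values are hypothetical; they only mirror the shape of clusterDoBeforeSleep() and clusterBeforeSleep(), not their actual contents.

```c
#include <assert.h>

/* Hypothetical flag bits; the real CLUSTER_TODO_* values are not shown here. */
#define DEMO_TODO_UPDATE_STATE   (1 << 0)
#define DEMO_TODO_BROADCAST_PONG (1 << 1)

/* Hypothetical node state: pending flags plus counters for observation. */
typedef struct demoNode {
    int todo_before_sleep;  /* bitmask of deferred work */
    int state_updates;      /* how many times state was recomputed */
    int pongs_sent;         /* how many broadcasts were sent */
} demoNode;

/* Request deferred work; repeated requests for the same flag coalesce. */
void demoDoBeforeSleep(demoNode *n, int flags) {
    n->todo_before_sleep |= flags;
}

/* Run once per event-loop iteration: perform each requested action
 * exactly once, then clear the mask. */
void demoBeforeSleep(demoNode *n) {
    int flags = n->todo_before_sleep;
    n->todo_before_sleep = 0;
    if (flags & DEMO_TODO_UPDATE_STATE) n->state_updates++;
    if (flags & DEMO_TODO_BROADCAST_PONG) n->pongs_sent++;
}

/* Claim 'count' slots (slot bookkeeping omitted) and request the follow-up
 * work for each claim. Returns the number of slots claimed. */
int demoClaimSlots(demoNode *n, int count) {
    for (int i = 0; i < count; i++) {
        demoDoBeforeSleep(n, DEMO_TODO_UPDATE_STATE | DEMO_TODO_BROADCAST_PONG);
    }
    return count;
}
```

Because the flags are OR-ed into a mask, claiming several slots in one loop still results in a single state update and a single broadcast on the next pass, so calling the request function inside the loop or once after it are equivalent in this model.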


/* -----------------------------------------------------------------------------
* SLAVE nodes handling
* -------------------------------------------------------------------------- */
@@ -6481,10 +6425,6 @@ int clusterCommandSpecial(client *c) {
} else if (!strcasecmp(c->argv[1]->ptr,"links") && c->argc == 2) {
/* CLUSTER LINKS */
addReplyClusterLinksDescription(c);
} else if (!strcasecmp(c->argv[1]->ptr, "migration")) {
clusterMigrationCommand(c);
} else if (!strcasecmp(c->argv[1]->ptr,"syncslots") && c->argc >= 3) {
clusterSyncSlotsCommand(c);
} else {
return 0;
}
13 changes: 10 additions & 3 deletions src/server.c
@@ -1654,7 +1654,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {

/* Run the Redis Cluster cron. */
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
if (server.cluster_enabled) {
asmCron();
clusterCron();
}
}

/* Run the Sentinel timer if we are in sentinel mode. */
@@ -1835,7 +1838,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* may change the state of Redis Cluster (from ok to fail or vice versa),
* so it's a good idea to call it before serving the unblocked clients
* later in this function, must be done before blockedBeforeSleep. */
if (server.cluster_enabled) clusterBeforeSleep();
if (server.cluster_enabled) {
clusterBeforeSleep();
asmBeforeSleep();
}

/* Handle blocked clients.
* must be done before flushAppendOnlyFile, in case of appendfsync=always,
@@ -7730,7 +7736,8 @@ int main(int argc, char **argv) {
redisAsciiArt();
checkTcpBacklogSettings();
if (server.cluster_enabled) {
server.cluster_slot_stats = zmalloc(CLUSTER_SLOTS*sizeof(clusterSlotStat));
/* clusterCommonInit() initializes slot-stats required by clusterInit() */
clusterCommonInit();
clusterInit();
}
if (!server.sentinel_mode) {