Skip to content

Commit 9f7cfc7

Browse files
authored
Refactor clusterMsg type casting for improve type safety (valkey-io#2986)
Introduced `clusterMsgHeader` and `toClusterMsg()` `toClusterMsgLight()` helpers to unify clusterMsg parsing and make casting explicit. switched packet read/validation/processing paths to use the common 16-byte header first, then cast to clusterMsg/clusterMsgLight only after determining message type. this is a refactor to improve readability and avoid accidental miscasts between full vs light headers. issue : valkey-io#2837 --------- Signed-off-by: Su Ko <rhtn1128@gmail.com>
1 parent 96c6cb8 commit 9f7cfc7

File tree

2 files changed

+82
-51
lines changed

2 files changed

+82
-51
lines changed

src/cluster_legacy.c

Lines changed: 66 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,18 @@ int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s);
137137
static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now);
138138
void clusterCommandFlushslot(client *c);
139139

140+
static inline clusterMsg *toClusterMsg(void *buf) {
141+
clusterMsgHeader *hdr = (clusterMsgHeader *)buf;
142+
serverAssert(!IS_LIGHT_MESSAGE(ntohs(hdr->type)));
143+
return (clusterMsg *)buf;
144+
}
145+
146+
static inline clusterMsgLight *toClusterMsgLight(void *buf) {
147+
clusterMsgHeader *hdr = (clusterMsgHeader *)buf;
148+
serverAssert(IS_LIGHT_MESSAGE(ntohs(hdr->type)));
149+
return (clusterMsgLight *)buf;
150+
}
151+
140152
/* Only primaries that own slots have voting rights.
141153
* Returns 1 if the node has voting rights, otherwise returns 0. */
142154
int clusterNodeIsVotingPrimary(clusterNode *n) {
@@ -3544,9 +3556,8 @@ static inline int messageTypeSupportsLightHdr(uint16_t type) {
35443556
return 0;
35453557
}
35463558

3547-
35483559
int clusterIsValidPacket(clusterLink *link) {
3549-
clusterMsg *hdr = (clusterMsg *)link->rcvbuf;
3560+
clusterMsgHeader *hdr = (clusterMsgHeader *)link->rcvbuf;
35503561
uint32_t totlen = ntohl(hdr->totlen);
35513562
int is_light = IS_LIGHT_MESSAGE(ntohs(hdr->type));
35523563
uint16_t type = ntohs(hdr->type) & ~CLUSTERMSG_MODIFIER_MASK;
@@ -3581,16 +3592,17 @@ int clusterIsValidPacket(clusterLink *link) {
35813592
uint32_t explen; /* expected length of this packet */
35823593

35833594
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) {
3584-
uint16_t extensions = ntohs(hdr->extensions);
3585-
uint16_t count = ntohs(hdr->count);
3595+
clusterMsg *msg = toClusterMsg(link->rcvbuf);
3596+
uint16_t extensions = ntohs(msg->extensions);
3597+
uint16_t count = ntohs(msg->count);
35863598

35873599
explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
35883600
explen += (sizeof(clusterMsgDataGossip) * count);
35893601

35903602
/* If there is extension data, which doesn't have a fixed length,
35913603
* loop through them and validate the length of it now. */
3592-
if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) {
3593-
clusterMsgPingExt *ext = getInitialPingExt(hdr, count);
3604+
if (msg->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) {
3605+
clusterMsgPingExt *ext = getInitialPingExt(msg, count);
35943606
while (extensions--) {
35953607
uint32_t extlen = getPingExtLength(ext);
35963608
if (extlen % 8 != 0) {
@@ -3615,11 +3627,12 @@ int clusterIsValidPacket(clusterLink *link) {
36153627
} else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
36163628
clusterMsgDataPublish *publish_data;
36173629
if (is_light) {
3618-
clusterMsgLight *hdr_light = (clusterMsgLight *)link->rcvbuf;
3619-
publish_data = &hdr_light->data.publish.msg;
3630+
clusterMsgLight *msg_light = toClusterMsgLight(link->rcvbuf);
3631+
publish_data = &msg_light->data.publish.msg;
36203632
explen = sizeof(clusterMsgLight);
36213633
} else {
3622-
publish_data = &hdr->data.publish.msg;
3634+
clusterMsg *msg = toClusterMsg(link->rcvbuf);
3635+
publish_data = &msg->data.publish.msg;
36233636
explen = sizeof(clusterMsg);
36243637
}
36253638
explen -= sizeof(union clusterMsgData);
@@ -3633,12 +3646,13 @@ int clusterIsValidPacket(clusterLink *link) {
36333646
explen += sizeof(clusterMsgDataUpdate);
36343647
} else if (type == CLUSTERMSG_TYPE_MODULE) {
36353648
if (is_light) {
3636-
clusterMsgLight *hdr_light = (clusterMsgLight *)link->rcvbuf;
3649+
clusterMsgLight *msg_light = toClusterMsgLight(link->rcvbuf);
36373650
explen = sizeof(clusterMsgLight) - sizeof(union clusterMsgData);
3638-
explen += sizeof(clusterMsgModule) - 3 + ntohl(hdr_light->data.module.msg.len);
3651+
explen += sizeof(clusterMsgModule) - 3 + ntohl(msg_light->data.module.msg.len);
36393652
} else {
3653+
clusterMsg *msg = toClusterMsg(link->rcvbuf);
36403654
explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
3641-
explen += sizeof(clusterMsgModule) - 3 + ntohl(hdr->data.module.msg.len);
3655+
explen += sizeof(clusterMsgModule) - 3 + ntohl(msg->data.module.msg.len);
36423656
}
36433657
} else {
36443658
/* We don't know this type of packet, so we assume it's well formed. */
@@ -3676,7 +3690,7 @@ static inline int clusterExtractSlotFromWord(uint64_t *slot_word, size_t slot_wo
36763690
int clusterProcessPacket(clusterLink *link) {
36773691
/* Validate that the packet is well-formed */
36783692
if (!clusterIsValidPacket(link)) {
3679-
clusterMsg *hdr = (clusterMsg *)link->rcvbuf;
3693+
clusterMsgHeader *hdr = (clusterMsgHeader *)link->rcvbuf;
36803694
uint16_t type = ntohs(hdr->type);
36813695
if (server.debug_cluster_close_link_on_packet_drop &&
36823696
(type == server.cluster_drop_packet_filter || server.cluster_drop_packet_filter == -2)) {
@@ -3687,7 +3701,7 @@ int clusterProcessPacket(clusterLink *link) {
36873701
return 1;
36883702
}
36893703

3690-
clusterMsg *hdr = (clusterMsg *)link->rcvbuf;
3704+
clusterMsgHeader *hdr = (clusterMsgHeader *)link->rcvbuf;
36913705
mstime_t now = mstime();
36923706
int is_light = IS_LIGHT_MESSAGE(ntohs(hdr->type));
36933707
uint16_t type = ntohs(hdr->type) & ~CLUSTERMSG_MODIFIER_MASK;
@@ -3709,16 +3723,17 @@ int clusterProcessPacket(clusterLink *link) {
37093723
return 1;
37103724
}
37113725

3712-
uint16_t flags = ntohs(hdr->flags);
3726+
clusterMsg *msg = toClusterMsg(link->rcvbuf);
3727+
uint16_t flags = ntohs(msg->flags);
37133728
uint64_t sender_claimed_current_epoch = 0, sender_claimed_config_epoch = 0;
3714-
clusterNode *sender = getNodeFromLinkAndMsg(link, hdr);
3715-
int sender_claims_to_be_primary = !memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, CLUSTER_NAMELEN);
3729+
clusterNode *sender = getNodeFromLinkAndMsg(link, msg);
3730+
int sender_claims_to_be_primary = !memcmp(msg->replicaof, CLUSTER_NODE_NULL_NAME, CLUSTER_NAMELEN);
37163731
int sender_last_reported_as_replica = sender && nodeIsReplica(sender);
37173732
int sender_last_reported_as_primary = sender && nodeIsPrimary(sender);
37183733

37193734
/* We store this information at the link layer so that we can send extensions
37203735
* during the handshake even if we don't know the sender. */
3721-
if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) {
3736+
if (msg->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) {
37223737
link->flags |= CLUSTER_LINK_EXTENSIONS_SUPPORTED;
37233738
}
37243739

@@ -3759,8 +3774,8 @@ int clusterProcessPacket(clusterLink *link) {
37593774

37603775
if (sender && !nodeInHandshake(sender)) {
37613776
/* Update our currentEpoch if we see a newer epoch in the cluster. */
3762-
sender_claimed_current_epoch = ntohu64(hdr->currentEpoch);
3763-
sender_claimed_config_epoch = ntohu64(hdr->configEpoch);
3777+
sender_claimed_current_epoch = ntohu64(msg->currentEpoch);
3778+
sender_claimed_config_epoch = ntohu64(msg->configEpoch);
37643779
if (sender_claimed_current_epoch > server.cluster->currentEpoch)
37653780
server.cluster->currentEpoch = sender_claimed_current_epoch;
37663781
/* Update the sender configEpoch if it is a primary publishing a newer one. */
@@ -3788,11 +3803,11 @@ int clusterProcessPacket(clusterLink *link) {
37883803
}
37893804
}
37903805
/* Update the replication offset info for this node. */
3791-
sender->repl_offset = ntohu64(hdr->offset);
3806+
sender->repl_offset = ntohu64(msg->offset);
37923807
/* If we are a replica performing a manual failover and our primary
37933808
* sent its offset while already paused, populate the MF state. */
37943809
if (server.cluster->mf_end && nodeIsReplica(myself) && myself->replicaof == sender &&
3795-
hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED && server.cluster->mf_primary_offset == -1) {
3810+
msg->mflags[0] & CLUSTERMSG_FLAG0_PAUSED && server.cluster->mf_primary_offset == -1) {
37963811
server.cluster->mf_primary_offset = sender->repl_offset;
37973812
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER);
37983813
serverLog(LL_NOTICE,
@@ -3829,7 +3844,7 @@ int clusterProcessPacket(clusterLink *link) {
38293844
if (!sender) {
38303845
if (!link->node) {
38313846
char ip[NET_IP_STR_LEN] = {0};
3832-
if (nodeIp2String(ip, link, hdr->myip) != C_OK) {
3847+
if (nodeIp2String(ip, link, msg->myip) != C_OK) {
38333848
/* Unable to retrieve the node's IP address from the connection. Without a
38343849
* valid IP, the node becomes unusable in the cluster. This failure might be
38353850
* due to the connection being closed. */
@@ -3849,8 +3864,8 @@ int clusterProcessPacket(clusterLink *link) {
38493864
* in the future packet. */
38503865
clusterNode *node = createClusterNode(NULL, CLUSTER_NODE_HANDSHAKE);
38513866
memcpy(node->ip, ip, sizeof(ip));
3852-
getClientPortFromClusterMsg(hdr, &node->tls_port, &node->tcp_port);
3853-
node->cport = ntohs(hdr->cport);
3867+
getClientPortFromClusterMsg(msg, &node->tls_port, &node->tcp_port);
3868+
node->cport = ntohs(msg->cport);
38543869
if (linkSupportsExtension(link)) {
38553870
node->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
38563871
}
@@ -3872,7 +3887,7 @@ int clusterProcessPacket(clusterLink *link) {
38723887
/* If this is a MEET packet from an unknown node, we still process
38733888
* the gossip section here since we have to trust the sender because
38743889
* of the message type. */
3875-
clusterProcessGossipSection(hdr, link);
3890+
clusterProcessGossipSection(msg, link);
38763891
} else if (sender->link && nodeExceedsHandshakeTimeout(sender, now)) {
38773892
/* The MEET packet is from a known node, after the handshake timeout, so the sender
38783893
* thinks that I do not know it.
@@ -3926,7 +3941,7 @@ int clusterProcessPacket(clusterLink *link) {
39263941
"Handshake: we already know node %.40s (%s), "
39273942
"updating the address if needed.",
39283943
sender->name, sender->human_nodename);
3929-
if (nodeUpdateAddressIfNeeded(sender, link, hdr)) {
3944+
if (nodeUpdateAddressIfNeeded(sender, link, msg)) {
39303945
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE);
39313946
}
39323947
/* Free this node as we already have it. This will
@@ -3937,12 +3952,12 @@ int clusterProcessPacket(clusterLink *link) {
39373952

39383953
/* First thing to do is replacing the random name with the
39393954
* right node name if this was a handshake stage. */
3940-
clusterRenameNode(link->node, hdr->sender);
3955+
clusterRenameNode(link->node, msg->sender);
39413956
serverLog(LL_DEBUG, "Handshake with node %.40s (%s) completed.", link->node->name, link->node->human_nodename);
39423957
link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
39433958
link->node->flags |= flags & (CLUSTER_NODE_PRIMARY | CLUSTER_NODE_REPLICA);
39443959
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
3945-
} else if (memcmp(link->node->name, hdr->sender, CLUSTER_NAMELEN) != 0) {
3960+
} else if (memcmp(link->node->name, msg->sender, CLUSTER_NAMELEN) != 0) {
39463961
/* If the reply has a non matching node ID we
39473962
* disconnect this node and set it as not having an associated
39483963
* address. This can happen if the node did CLUSTER RESET and changed
@@ -3989,7 +4004,7 @@ int clusterProcessPacket(clusterLink *link) {
39894004

39904005
/* Update the node address if it changed. */
39914006
if (sender && type == CLUSTERMSG_TYPE_PING && !nodeInHandshake(sender) &&
3992-
nodeUpdateAddressIfNeeded(sender, link, hdr)) {
4007+
nodeUpdateAddressIfNeeded(sender, link, msg)) {
39934008
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE);
39944009
}
39954010

@@ -4023,7 +4038,7 @@ int clusterProcessPacket(clusterLink *link) {
40234038
}
40244039
} else {
40254040
/* Node is a replica. */
4026-
clusterNode *sender_claimed_primary = clusterLookupNode(hdr->replicaof, CLUSTER_NAMELEN);
4041+
clusterNode *sender_claimed_primary = clusterLookupNode(msg->replicaof, CLUSTER_NAMELEN);
40274042

40284043
if (sender_last_reported_as_primary) {
40294044
serverLog(LL_DEBUG, "node %.40s (%s) announces that it is a %s in shard %.40s", sender->name,
@@ -4161,7 +4176,7 @@ int clusterProcessPacket(clusterLink *link) {
41614176
* this ASAP to avoid other computational expensive checks later.*/
41624177

41634178
if (sender && sender_claims_to_be_primary &&
4164-
(sender_last_reported_as_replica || memcmp(sender->slots, hdr->myslots, sizeof(hdr->myslots)))) {
4179+
(sender_last_reported_as_replica || memcmp(sender->slots, msg->myslots, sizeof(msg->myslots)))) {
41654180
/* Make sure CLUSTER_NODE_PRIMARY has already been set by now on sender */
41664181
serverAssert(nodeIsPrimary(sender));
41674182

@@ -4171,7 +4186,7 @@ int clusterProcessPacket(clusterLink *link) {
41714186
/* 1) If the sender of the message is a primary, and we detected that
41724187
* the set of slots it claims changed, scan the slots to see if we
41734188
* need to update our configuration. */
4174-
clusterUpdateSlotsConfigWith(sender, sender_claimed_config_epoch, hdr->myslots);
4189+
clusterUpdateSlotsConfigWith(sender, sender_claimed_config_epoch, msg->myslots);
41754190

41764191
/* 2) We also check for the reverse condition, that is, the sender
41774192
* claims to serve slots we know are served by a primary with a
@@ -4194,7 +4209,7 @@ int clusterProcessPacket(clusterLink *link) {
41944209
bool found_new_owner = false;
41954210
for (size_t w = 0; w < CLUSTER_SLOT_WORDS && !found_new_owner; w++) {
41964211
uint64_t word;
4197-
memcpy(&word, hdr->myslots + SLOT_WORD_OFFSET(w), sizeof(word));
4212+
memcpy(&word, msg->myslots + SLOT_WORD_OFFSET(w), sizeof(word));
41984213
while (word) {
41994214
const int slot = clusterExtractSlotFromWord(&word, w);
42004215

@@ -4227,29 +4242,29 @@ int clusterProcessPacket(clusterLink *link) {
42274242

42284243
/* Get info from the gossip section */
42294244
if (sender) {
4230-
clusterProcessGossipSection(hdr, link);
4231-
clusterProcessPingExtensions(hdr, link);
4245+
clusterProcessGossipSection(msg, link);
4246+
clusterProcessPingExtensions(msg, link);
42324247
}
42334248
} else if (type == CLUSTERMSG_TYPE_FAIL) {
42344249
clusterNode *failing;
42354250

42364251
if (sender) {
4237-
failing = clusterLookupNode(hdr->data.fail.about.nodename, CLUSTER_NAMELEN);
4252+
failing = clusterLookupNode(msg->data.fail.about.nodename, CLUSTER_NAMELEN);
42384253
if (failing && !(failing->flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_MYSELF))) {
4239-
serverLog(LL_NOTICE, "FAIL message received from %.40s (%s) about %.40s (%s)", hdr->sender,
4240-
sender->human_nodename, hdr->data.fail.about.nodename, failing->human_nodename);
4254+
serverLog(LL_NOTICE, "FAIL message received from %.40s (%s) about %.40s (%s)", msg->sender,
4255+
sender->human_nodename, msg->data.fail.about.nodename, failing->human_nodename);
42414256
markNodeAsFailing(failing);
42424257
}
42434258
} else {
4244-
serverLog(LL_NOTICE, "Ignoring FAIL message from unknown node %.40s about %.40s", hdr->sender,
4245-
hdr->data.fail.about.nodename);
4259+
serverLog(LL_NOTICE, "Ignoring FAIL message from unknown node %.40s about %.40s", msg->sender,
4260+
msg->data.fail.about.nodename);
42464261
}
42474262
} else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
42484263
if (!sender) return 1; /* We don't know that node. */
4249-
clusterProcessPublishPacket(&hdr->data.publish.msg, type);
4264+
clusterProcessPublishPacket(&msg->data.publish.msg, type);
42504265
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
42514266
if (!sender) return 1; /* We don't know that node. */
4252-
clusterSendFailoverAuthIfNeeded(sender, hdr);
4267+
clusterSendFailoverAuthIfNeeded(sender, msg);
42534268
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
42544269
if (!sender) return 1; /* We don't know that node. */
42554270
/* We consider this vote only if the sender is a primary serving
@@ -4281,10 +4296,10 @@ int clusterProcessPacket(clusterLink *link) {
42814296
clusterSendPing(link, CLUSTERMSG_TYPE_PING);
42824297
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
42834298
clusterNode *n; /* The node the update is about. */
4284-
uint64_t reportedConfigEpoch = ntohu64(hdr->data.update.nodecfg.configEpoch);
4299+
uint64_t reportedConfigEpoch = ntohu64(msg->data.update.nodecfg.configEpoch);
42854300

42864301
if (!sender) return 1; /* We don't know the sender. */
4287-
n = clusterLookupNode(hdr->data.update.nodecfg.nodename, CLUSTER_NAMELEN);
4302+
n = clusterLookupNode(msg->data.update.nodecfg.nodename, CLUSTER_NAMELEN);
42884303
if (!n) return 1; /* We don't know the reported node. */
42894304
if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */
42904305

@@ -4301,9 +4316,9 @@ int clusterProcessPacket(clusterLink *link) {
43014316

43024317
/* Check the bitmap of served slots and update our
43034318
* config accordingly. */
4304-
clusterUpdateSlotsConfigWith(n, reportedConfigEpoch, hdr->data.update.nodecfg.slots);
4319+
clusterUpdateSlotsConfigWith(n, reportedConfigEpoch, msg->data.update.nodecfg.slots);
43054320
} else if (type == CLUSTERMSG_TYPE_MODULE) {
4306-
clusterProcessModulePacket(&hdr->data.module.msg, sender);
4321+
clusterProcessModulePacket(&msg->data.module.msg, sender);
43074322
} else {
43084323
serverLog(LL_WARNING, "Received unknown packet type: %d", type);
43094324
}
@@ -4409,7 +4424,7 @@ void clusterLinkConnectHandler(connection *conn) {
44094424
}
44104425

44114426
/* Performs sanity check on the message signature and length depending on the type. */
4412-
static inline int isClusterMsgSignatureAndLengthValid(clusterMsg *hdr) {
4427+
static inline int isClusterMsgSignatureAndLengthValid(clusterMsgHeader *hdr) {
44134428
if (memcmp(hdr->sig, "RCmb", 4) != 0) return 0;
44144429
uint16_t type = ntohs(hdr->type);
44154430
uint32_t totlen = ntohl(hdr->totlen);
@@ -4424,7 +4439,7 @@ static inline int isClusterMsgSignatureAndLengthValid(clusterMsg *hdr) {
44244439
void clusterReadHandler(connection *conn) {
44254440
clusterMsg buf[1];
44264441
ssize_t nread;
4427-
clusterMsg *hdr;
4442+
clusterMsgHeader *hdr;
44284443
clusterLink *link = connGetPrivateData(conn);
44294444
unsigned int readlen, rcvbuflen;
44304445

@@ -4436,7 +4451,7 @@ void clusterReadHandler(connection *conn) {
44364451
readlen = RCVBUF_MIN_READ_LEN - rcvbuflen;
44374452
} else {
44384453
/* Finally read the full message. */
4439-
hdr = (clusterMsg *)link->rcvbuf;
4454+
hdr = (clusterMsgHeader *)link->rcvbuf;
44404455
if (rcvbuflen == RCVBUF_MIN_READ_LEN) {
44414456
/* Perform some sanity check on the message signature
44424457
* and length. */
@@ -4485,7 +4500,7 @@ void clusterReadHandler(connection *conn) {
44854500
}
44864501
memcpy(link->rcvbuf + link->rcvbuf_len, buf, nread);
44874502
link->rcvbuf_len += nread;
4488-
hdr = (clusterMsg *)link->rcvbuf;
4503+
hdr = (clusterMsgHeader *)link->rcvbuf;
44894504
rcvbuflen += nread;
44904505
}
44914506

0 commit comments

Comments
 (0)