-
Notifications
You must be signed in to change notification settings - Fork 804
Allow clients to subscribe to slot migrations #298
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -307,6 +307,10 @@ typedef struct { | |
clusterMsg msg; | ||
} clusterMsgSendBlock; | ||
|
||
/* Special values for server.cluster->moved_slot_since_sleep.
 *
 * A non-negative value of that field is a single slot number recorded by
 * clusterAddSlot(); clusterBeforeSleep() only publishes a notification when
 * the value is >= 0. The sentinels below are negative so they can never be
 * mistaken for a slot number.
 *
 * CLUSTER_MOVED_SLOT_NONE:     no slot has been recorded since the field was
 *                              last reset.
 * CLUSTER_MOVED_SLOT_MULTIPLE: more than one slot has been recorded since the
 *                              last reset, so no single-slot message is sent.
 *
 * The values are parenthesized so the macros expand safely inside any
 * expression. */
#define CLUSTER_MOVED_SLOT_NONE (-2)
#define CLUSTER_MOVED_SLOT_MULTIPLE (-3)
|
||
/* ----------------------------------------------------------------------------- | ||
* Initialization | ||
* -------------------------------------------------------------------------- */ | ||
|
@@ -962,6 +966,9 @@ void clusterInit(void) { | |
server.cluster->state = CLUSTER_FAIL; | ||
server.cluster->size = 0; | ||
server.cluster->todo_before_sleep = 0; | ||
server.cluster->moved_slot_since_sleep = CLUSTER_MOVED_SLOT_NONE; | ||
server.cluster->moved_slot_channel = | ||
createObject(OBJ_STRING, sdsnew("__cluster__:moved")); | ||
server.cluster->nodes = dictCreate(&clusterNodesDictType); | ||
server.cluster->shards = dictCreate(&clusterSdsToListType); | ||
server.cluster->nodes_black_list = | ||
|
@@ -4844,17 +4851,43 @@ void clusterCron(void) { | |
clusterUpdateState(); | ||
} | ||
|
||
/* Notify clients subscribed to slot moved events. */ | ||
void clusterNotifyMovedSlot(int moved_slot, list *clients) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It feels like this should be cluster.h, it seems like all cluster implementations would want to send this type of notification. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The function is using clusterNode which is only in This separation of cluster and cluster_legacy is quite arbitrary. I don't mind that you fix it or we can just merge the two again. Then I'll rebase this PR. :) Do you have a better idea? |
||
clusterNode *n = server.cluster->slots[moved_slot]; | ||
/* As for -MOVED redirects, the port in the message depends on whether the | ||
* client is using TLS or not. */ | ||
robj *messages[2] = {NULL, NULL}; /* Created lazily. */ | ||
listNode *ln; | ||
listIter li; | ||
listRewind(clients, &li); | ||
while ((ln = listNext(&li)) != NULL) { | ||
client *c = ln->value; | ||
/* TLS judgement is like shouldReturnTlsInfo() but for c instead of current_client. */ | ||
int use_tls_port = c->conn ? connIsTLS(server.current_client->conn) : server.tls_cluster; | ||
if (messages[use_tls_port] == NULL) { | ||
sds s = clusterFormatRedirect("MOVED", moved_slot, n, use_tls_port); | ||
messages[use_tls_port] = createObject(OBJ_STRING, s); | ||
} | ||
addReplyPubsubMessage(c, server.cluster->moved_slot_channel, messages[use_tls_port], shared.messagebulk); | ||
updateClientMemUsageAndBucket(c); | ||
} | ||
if (messages[0]) decrRefCount(messages[0]); | ||
if (messages[1]) decrRefCount(messages[1]); | ||
} | ||
|
||
/* This function is called before the event handler returns to sleep for | ||
* events. It is useful to perform operations that must be done ASAP in | ||
* reaction to events fired but that are not safe to perform inside event | ||
* handlers, or to perform potentially expensive tasks that we need to do | ||
* a single time before replying to clients. */ | ||
void clusterBeforeSleep(void) { | ||
int flags = server.cluster->todo_before_sleep; | ||
int moved_slot = server.cluster->moved_slot_since_sleep; | ||
|
||
/* Reset our flags (not strictly needed since every single function | ||
* called for flags set should be able to clear its flag). */ | ||
server.cluster->todo_before_sleep = 0; | ||
server.cluster->moved_slot_since_sleep = CLUSTER_MOVED_SLOT_NONE; | ||
|
||
if (flags & CLUSTER_TODO_HANDLE_MANUALFAILOVER) { | ||
/* Handle manual failover as soon as possible so that won't have a 100ms | ||
|
@@ -4874,6 +4907,16 @@ void clusterBeforeSleep(void) { | |
if (flags & CLUSTER_TODO_UPDATE_STATE) | ||
clusterUpdateState(); | ||
|
||
/* Notify clients subscribed to moved slot events. To avoid flooding the | ||
* clients, we only publish the moved slot message if exactly one slot has | ||
* been migrated. In cases like failover, clients will receive -MOVED | ||
* redirects and will need to refresh the full slot mapping with nodes | ||
* including replicas. */ | ||
if (moved_slot >= 0) { | ||
list *clients = pubsubGetSubscribers(server.cluster->moved_slot_channel); | ||
if (clients) clusterNotifyMovedSlot(moved_slot, clients); | ||
} | ||
|
||
/* Save the config, possibly using fsync. */ | ||
if (flags & CLUSTER_TODO_SAVE_CONFIG) { | ||
int fsync = flags & CLUSTER_TODO_FSYNC_CONFIG; | ||
|
@@ -4976,6 +5019,10 @@ int clusterAddSlot(clusterNode *n, int slot) { | |
if (server.cluster->slots[slot]) return C_ERR; | ||
clusterNodeSetSlotBit(n,slot); | ||
server.cluster->slots[slot] = n; | ||
if (server.cluster->moved_slot_since_sleep == CLUSTER_MOVED_SLOT_NONE) | ||
server.cluster->moved_slot_since_sleep = slot; | ||
else | ||
server.cluster->moved_slot_since_sleep = CLUSTER_MOVED_SLOT_MULTIPLE; | ||
return C_OK; | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there any clients that primarily store a map of NodeID -> Nodes as opposed to endpoint:port -> Nodes? I ask because I'm wondering whether it would be useful to return the NodeID here as well. I know the Python client doesn't, and I'm less familiar with the other clients, but if there are clients whose main node map isn't keyed off the endpoints, then including the NodeID might make this easier for them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know, but since redirects use the host:port form, clients need to be able to identify them by this.