Skip to content

Commit 1597493

Browse files
authored
Add option to select a logical database (valkey-io#244)
Set the option `select_db` in `valkeyClusterOptions` to select a logical database after a successful connect. The option is supported in both the Synchronous and Asynchronous API, and the `SELECT` command is only sent when configured to a non-zero value. Additionally adds new flags to the clusterclients `[--select-db NUM]` and update tests. Signed-off-by: Björn Svensson <bjorn.a.svensson@est.tech>
1 parent ebece93 commit 1597493

File tree

6 files changed

+105
-8
lines changed

6 files changed

+105
-8
lines changed

include/valkey/cluster.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ typedef struct valkeyClusterContext {
9393
int max_retry_count; /* Allowed retry attempts */
9494
char *username; /* Authenticate using user */
9595
char *password; /* Authentication password */
96+
int select_db;
9697

9798
struct dict *nodes; /* Known valkeyClusterNode's */
9899
uint64_t route_version; /* Increased when the node lookup table changes */
@@ -163,6 +164,10 @@ typedef struct {
163164
const char *password; /* Authentication password. */
164165
int max_retry; /* Allowed retry attempts. */
165166

167+
/* Select a logical database after a successful connect.
168+
* Default 0, i.e. the SELECT command is not sent. */
169+
int select_db;
170+
166171
/* Common callbacks. */
167172

168173
/* A hook to get notified when certain events occur. The `event` is set to

src/cluster.c

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,25 @@ static int authenticate(valkeyClusterContext *cc, valkeyContext *c) {
358358
return VALKEY_ERR;
359359
}
360360

361+
/* Select a logical database by sending the SELECT command. */
362+
static int select_db(valkeyClusterContext *cc, valkeyContext *c) {
363+
if (cc->select_db == 0)
364+
return VALKEY_OK;
365+
366+
valkeyReply *reply = valkeyCommand(c, "SELECT %d", cc->select_db);
367+
if (reply == NULL) {
368+
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Failed to select logical database");
369+
return VALKEY_ERR;
370+
}
371+
if (reply->type == VALKEY_REPLY_ERROR) {
372+
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, reply->str);
373+
freeReplyObject(reply);
374+
return VALKEY_ERR;
375+
}
376+
freeReplyObject(reply);
377+
return VALKEY_OK;
378+
}
379+
361380
/**
362381
* Return a new node with the "cluster slots" command reply.
363382
*/
@@ -1189,6 +1208,9 @@ static int valkeyClusterContextInit(valkeyClusterContext *cc,
11891208
} else {
11901209
cc->max_retry_count = CLUSTER_DEFAULT_MAX_RETRY_COUNT;
11911210
}
1211+
if (options->select_db > 0) {
1212+
cc->select_db = options->select_db;
1213+
}
11921214
if (options->initial_nodes != NULL &&
11931215
valkeyClusterSetOptionAddNodes(cc, options->initial_nodes) != VALKEY_OK) {
11941216
return VALKEY_ERR; /* err and errstr already set. */
@@ -1563,8 +1585,10 @@ valkeyContext *valkeyClusterGetValkeyContext(valkeyClusterContext *cc,
15631585
if (cc->tls && cc->tls_init_fn(c, cc->tls) != VALKEY_OK) {
15641586
valkeyClusterSetError(cc, c->err, c->errstr);
15651587
}
1566-
1567-
authenticate(cc, c); // err and errstr handled in function
1588+
/* Authenticate and select a logical database when configured.
1589+
* cc->err and cc->errstr are set when failing. */
1590+
authenticate(cc, c);
1591+
select_db(cc, c);
15681592
}
15691593

15701594
return c;
@@ -1606,6 +1630,10 @@ valkeyContext *valkeyClusterGetValkeyContext(valkeyClusterContext *cc,
16061630
valkeyFree(c);
16071631
return NULL;
16081632
}
1633+
if (select_db(cc, c) != VALKEY_OK) {
1634+
valkeyFree(c);
1635+
return NULL;
1636+
}
16091637

16101638
node->con = c;
16111639

@@ -2582,6 +2610,18 @@ static void unlinkAsyncContextAndNode(void *data) {
25822610
}
25832611
}
25842612

2613+
/* Reply callback function for SELECT */
2614+
void selectReplyCallback(valkeyAsyncContext *ac, void *r, void *privdata) {
2615+
valkeyReply *reply = (valkeyReply *)r;
2616+
valkeyClusterAsyncContext *acc = (valkeyClusterAsyncContext *)privdata;
2617+
2618+
if (reply == NULL || reply->type == VALKEY_REPLY_ERROR) {
2619+
valkeyClusterAsyncSetError(acc, VALKEY_ERR_OTHER,
2620+
"Failed to select logical database");
2621+
valkeyAsyncDisconnect(ac);
2622+
}
2623+
}
2624+
25852625
valkeyAsyncContext *
25862626
valkeyClusterGetValkeyAsyncContext(valkeyClusterAsyncContext *acc,
25872627
valkeyClusterNode *node) {
@@ -2658,6 +2698,15 @@ valkeyClusterGetValkeyAsyncContext(valkeyClusterAsyncContext *acc,
26582698
return NULL;
26592699
}
26602700
}
2701+
// Select logical database when needed
2702+
if (acc->cc.select_db > 0) {
2703+
ret = valkeyAsyncCommand(ac, selectReplyCallback, acc, "SELECT %d", acc->cc.select_db);
2704+
if (ret != VALKEY_OK) {
2705+
valkeyClusterAsyncSetError(acc, ac->c.err, ac->c.errstr);
2706+
valkeyAsyncFree(ac);
2707+
return NULL;
2708+
}
2709+
}
26612710

26622711
if (acc->attach_fn) {
26632712
ret = acc->attach_fn(ac, acc->attach_data);

tests/clusterclient.c

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,27 +61,45 @@ void eventCallback(const valkeyClusterContext *cc, int event, void *privdata) {
6161
printf("Event: %s\n", e);
6262
}
6363

64+
void connectCallback(const valkeyContext *c, int status) {
65+
const char *s = "";
66+
if (status != VALKEY_OK)
67+
s = "failed to ";
68+
printf("Event: %sconnect to %s:%d\n", s, c->tcp.host, c->tcp.port);
69+
}
70+
6471
int main(int argc, char **argv) {
6572
int show_events = 0;
6673
int use_cluster_nodes = 0;
6774
int send_to_all = 0;
75+
int show_connection_events = 0;
76+
int select_db = 0;
6877

6978
int argindex;
7079
for (argindex = 1; argindex < argc && argv[argindex][0] == '-';
7180
argindex++) {
7281
if (strcmp(argv[argindex], "--events") == 0) {
7382
show_events = 1;
83+
} else if (strcmp(argv[argindex], "--connection-events") == 0) {
84+
show_connection_events = 1;
7485
} else if (strcmp(argv[argindex], "--use-cluster-nodes") == 0) {
7586
use_cluster_nodes = 1;
87+
} else if (strcmp(argv[argindex], "--select-db") == 0) {
88+
if (++argindex < argc) /* Need an additional argument */
89+
select_db = atoi(argv[argindex]);
90+
if (select_db == 0) {
91+
fprintf(stderr, "Missing or faulty argument for --select-db\n");
92+
exit(1);
93+
}
7694
} else {
7795
fprintf(stderr, "Unknown argument: '%s'\n", argv[argindex]);
7896
exit(1);
7997
}
8098
}
8199

82100
if (argindex >= argc) {
83-
fprintf(stderr, "Usage: clusterclient [--events] [--use-cluster-nodes] "
84-
"HOST:PORT\n");
101+
fprintf(stderr, "Usage: clusterclient [--events] [--connection-events] "
102+
"[--use-cluster-nodes] [--select-db NUM] HOST:PORT\n");
85103
exit(1);
86104
}
87105
const char *initnode = argv[argindex];
@@ -97,6 +115,12 @@ int main(int argc, char **argv) {
97115
if (show_events) {
98116
options.event_callback = eventCallback;
99117
}
118+
if (show_connection_events) {
119+
options.connect_callback = connectCallback;
120+
}
121+
if (select_db > 0) {
122+
options.select_db = select_db;
123+
}
100124

101125
valkeyClusterContext *cc = valkeyClusterConnectWithOptions(&options);
102126
if (cc == NULL || cc->err) {

tests/clusterclient_async.c

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ void disconnectCallback(const valkeyAsyncContext *ac, int status) {
242242
int main(int argc, char **argv) {
243243
int use_cluster_nodes = 0;
244244
int show_connection_events = 0;
245+
int select_db = 0;
245246

246247
int optind;
247248
for (optind = 1; optind < argc && argv[optind][0] == '-'; optind++) {
@@ -253,14 +254,22 @@ int main(int argc, char **argv) {
253254
show_connection_events = 1;
254255
} else if (strcmp(argv[optind], "--blocking-initial-update") == 0) {
255256
blocking_initial_update = 1;
257+
} else if (strcmp(argv[optind], "--select-db") == 0) {
258+
if (++optind < argc) /* Need an additional argument */
259+
select_db = atoi(argv[optind]);
260+
if (select_db == 0) {
261+
fprintf(stderr, "Missing or faulty argument for --select-db\n");
262+
exit(1);
263+
}
256264
} else {
257265
fprintf(stderr, "Unknown argument: '%s'\n", argv[optind]);
258266
}
259267
}
260268

261269
if (optind >= argc) {
262270
fprintf(stderr,
263-
"Usage: clusterclient_async [--use-cluster-nodes] HOST:PORT\n");
271+
"Usage: clusterclient_async [--events] [--connection-events] "
272+
"[--use-cluster-nodes] [--select-db NUM] HOST:PORT\n");
264273
exit(1);
265274
}
266275
const char *initnode = argv[optind];
@@ -283,6 +292,9 @@ int main(int argc, char **argv) {
283292
options.async_connect_callback = connectCallback;
284293
options.async_disconnect_callback = disconnectCallback;
285294
}
295+
if (select_db > 0) {
296+
options.select_db = select_db;
297+
}
286298
valkeyClusterOptionsUseLibevent(&options, base);
287299

288300
valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options);

tests/scripts/client-disconnect-test.sh

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,15 @@ syncpid1=$!;
1616
# Start simulated valkey node
1717
timeout 5s ./simulated-valkey.pl -p 7401 -d --sigcont $syncpid1 <<'EOF' &
1818
EXPECT CONNECT
19+
EXPECT ["SELECT", "2"]
20+
SEND +OK
1921
EXPECT ["CLUSTER", "SLOTS"]
2022
SEND [[0, 16383, ["127.0.0.1", 7401, "nodeid1"]]]
2123
EXPECT CLOSE
2224
2325
EXPECT CONNECT
26+
EXPECT ["SELECT", "2"]
27+
SEND +OK
2428
EXPECT ["SET", "foo", "initial"]
2529
SEND +OK
2630
@@ -35,7 +39,7 @@ server1=$!
3539
wait $syncpid1;
3640

3741
# Run client
38-
timeout 4s "$clientprog" --blocking-initial-update --connection-events 127.0.0.1:7401 > "$testname.out" <<'EOF'
42+
timeout 4s "$clientprog" --blocking-initial-update --connection-events --select-db 2 127.0.0.1:7401 > "$testname.out" <<'EOF'
3943
SET foo initial
4044
4145
# Send a command that is expected to be redirected just before

tests/scripts/slots-not-served-test.sh

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ syncpid1=$!;
1313
timeout 5s ./simulated-valkey.pl -p 7401 -d --sigcont $syncpid1 <<'EOF' &
1414
# The initial slotmap is not covering all slots.
1515
EXPECT CONNECT
16+
EXPECT ["SELECT", "5"]
17+
SEND +OK
1618
EXPECT ["CLUSTER", "SLOTS"]
1719
SEND [[0, 1, ["127.0.0.1", 7401, "nodeid7401"]]]
1820
@@ -36,7 +38,7 @@ server1=$!
3638
wait $syncpid1;
3739

3840
# Run client
39-
timeout 3s "$clientprog" --events 127.0.0.1:7401 > "$testname.out" <<'EOF'
41+
timeout 3s "$clientprog" --events --connection-events --select-db 5 127.0.0.1:7401 > "$testname.out" <<'EOF'
4042
GET foo1
4143
GET foo2
4244
EOF
@@ -56,7 +58,8 @@ if [ $clientexit -ne 0 ]; then
5658
fi
5759

5860
# Check the output from clusterclient
59-
expected="Event: slotmap-updated
61+
expected="Event: connect to 127.0.0.1:7401
62+
Event: slotmap-updated
6063
Event: ready
6164
Event: slotmap-updated
6265
error: slot not served by any node

0 commit comments

Comments
 (0)