Skip to content

Commit 9efee49

Browse files
committed
Support learner change
Previously, learner is a static srv config, provide a cmd for use to flip the learner flag.
1 parent 2724344 commit 9efee49

File tree

5 files changed

+127
-1
lines changed

5 files changed

+127
-1
lines changed

include/libnuraft/raft_server.hxx

+12
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,18 @@ public:
260260
ptr< cmd_result< ptr<buffer> > >
261261
append_entries(const std::vector< ptr<buffer> >& logs);
262262

263+
/**
264+
* Flip learner flag of given server.
265+
* Learner will be excluded from the quorum.
266+
* Only leader will accept this operation.
267+
* This is also an asynchronous task.
268+
*
269+
* @param srv_id ID of the server to set as a learner.
270+
* @param to If `true`, set the server as a learner, otherwise, clear learner flag.
271+
* @return `ret->get_result_code()` will be OK on success.
272+
*/
273+
ptr<cmd_result<ptr<buffer>>> flip_learner_flag(int32 srv_id, bool to);
274+
263275
/**
264276
* Parameters for `req_ext_cb` callback function.
265277
*/

include/libnuraft/srv_config.hxx

+2
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ public:
7777

7878
bool is_learner() const { return learner_; }
7979

80+
void set_learner(bool to) { learner_ = to; }
81+
8082
bool is_new_joiner() const { return new_joiner_; }
8183

8284
void set_new_joiner(bool to) { new_joiner_ = to; }

src/handle_user_cmd.cxx

+58
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,64 @@ ptr< cmd_result< ptr<buffer> > > raft_server::append_entries_ext
109109
return send_msg_to_leader(req, ext_params);
110110
}
111111

112+
113+
ptr< cmd_result< ptr<buffer> > > raft_server::flip_learner_flag(int32 srv_id, bool to)
114+
{
115+
ptr<cmd_result<ptr<buffer>>> ret = cs_new<cmd_result<ptr<buffer>>>(nullptr);
116+
if (role_ != srv_role::leader || write_paused_) {
117+
p_er("this is not a leader, cannot handle flip_learner_flag");
118+
ret->set_result_code(cmd_result_code::NOT_LEADER);
119+
return ret;
120+
}
121+
if (config_changing_) {
122+
p_wn("previous config has not committed yet");
123+
ret->set_result_code(cmd_result_code::CONFIG_CHANGING);
124+
return ret;
125+
}
126+
127+
ptr<cluster_config> cur_conf = get_config();
128+
ptr<buffer> enc_conf_buf = cur_conf->serialize();
129+
ptr<cluster_config> new_conf = cluster_config::deserialize(*enc_conf_buf);
130+
new_conf->set_log_idx(log_store_->next_slot());
131+
132+
bool found = false;
133+
for (auto& ss: new_conf->get_servers()) {
134+
if (ss->get_id() != srv_id) {
135+
continue;
136+
}
137+
found = true;
138+
if (ss->is_learner() == to) {
139+
p_in("server %d already has learner flag set to %s\n",
140+
srv_id,
141+
to ? "true" : "false");
142+
ret->set_result_code(cmd_result_code::OK);
143+
return ret;
144+
}
145+
p_in("set learner flag to %s for server %d",
146+
to ? "true" : "false", srv_id);
147+
ss->set_learner(to);
148+
break;
149+
}
150+
if (!found) {
151+
p_er("server %d not found", srv_id);
152+
ret->set_result_code(SERVER_NOT_FOUND);
153+
return ret;
154+
}
155+
p_in("copy new conf buf");
156+
ptr<buffer> new_conf_buf(new_conf->serialize());
157+
ptr<log_entry> entry(cs_new<log_entry>(state_->get_term(),
158+
new_conf_buf,
159+
log_val_type::conf,
160+
timer_helper::get_timeofday_us()));
161+
store_log_entry(entry);
162+
config_changing_ = true;
163+
uncommitted_config_ = new_conf;
164+
p_in("request append entries");
165+
request_append_entries();
166+
ret->set_result_code(OK);
167+
return ret;
168+
}
169+
112170
ptr< cmd_result< ptr<buffer> > > raft_server::send_msg_to_leader
113171
( ptr<req_msg>& req,
114172
const req_ext_params& ext_params )

src/raft_server.cxx

-1
Original file line numberDiff line numberDiff line change
@@ -759,7 +759,6 @@ ptr<resp_msg> raft_server::process_req(req_msg& req,
759759

760760
} else if (req.get_type() == msg_type::priority_change_request) {
761761
resp = handle_priority_change_req(req);
762-
763762
} else {
764763
// extended requests
765764
resp = handle_ext_msg(req, guard);

tests/unit/asio_service_test.cxx

+55
Original file line numberDiff line numberDiff line change
@@ -2409,6 +2409,58 @@ int full_consensus_test() {
24092409
return 0;
24102410
}
24112411

2412+
2413+
int flip_learner_flag_test() {
2414+
reset_log_files();
2415+
2416+
std::string s1_addr = "tcp://127.0.0.1:20010";
2417+
std::string s2_addr = "tcp://127.0.0.1:20020";
2418+
std::string s3_addr = "tcp://127.0.0.1:20030";
2419+
2420+
RaftAsioPkg s1(1, s1_addr);
2421+
RaftAsioPkg s2(2, s2_addr);
2422+
RaftAsioPkg s3(3, s3_addr);
2423+
std::vector<RaftAsioPkg*> pkgs = {&s1, &s2, &s3};
2424+
2425+
_msg("launching asio-raft servers\n");
2426+
CHK_Z( launch_servers(pkgs, false) );
2427+
_msg("organizing raft group\n");
2428+
CHK_Z( make_group(pkgs) );
2429+
// Set async.
2430+
for (auto& entry: pkgs) {
2431+
RaftAsioPkg* pp = entry;
2432+
raft_params param = pp->raftServer->get_current_params();
2433+
param.return_method_ = raft_params::async_handler;
2434+
pp->raftServer->update_params(param);
2435+
}
2436+
// Set to learner.
2437+
ptr<cmd_result<ptr<buffer>>> result = s1.raftServer->flip_learner_flag(s3.myId, true);
2438+
CHK_EQ(cmd_result_code::OK, result->get_result_code());
2439+
TestSuite::sleep_ms(RaftAsioPkg::HEARTBEAT_MS * 5, "wait for replication");
2440+
for (auto& entry: pkgs) {
2441+
RaftAsioPkg* pp = entry;
2442+
ptr<cluster_config> conf = pp->raftServer->get_config();
2443+
CHK_TRUE(conf->get_server(s3.myId)->is_learner());
2444+
}
2445+
// Clear leaner.
2446+
result = s1.raftServer->flip_learner_flag(s3.myId, false);
2447+
CHK_EQ(cmd_result_code::OK, result->get_result_code());
2448+
TestSuite::sleep_ms(RaftAsioPkg::HEARTBEAT_MS * 5, "wait for replication");
2449+
for (auto& entry: pkgs) {
2450+
RaftAsioPkg* pp = entry;
2451+
ptr<cluster_config> conf = pp->raftServer->get_config();
2452+
CHK_FALSE(conf->get_server(s3.myId)->is_learner());
2453+
}
2454+
2455+
s1.raftServer->shutdown();
2456+
s2.raftServer->shutdown();
2457+
s3.raftServer->shutdown();
2458+
TestSuite::sleep_sec(1, "shutting down");
2459+
2460+
SimpleLogger::shutdown();
2461+
return 0;
2462+
}
2463+
24122464
int custom_commit_condition_test() {
24132465
reset_log_files();
24142466

@@ -2916,6 +2968,9 @@ int main(int argc, char** argv) {
29162968
ts.doTest( "full consensus test",
29172969
full_consensus_test );
29182970

2971+
ts.doTest("flip learner flag test",
2972+
flip_learner_flag_test);
2973+
29192974
ts.doTest( "custom commit condition test",
29202975
custom_commit_condition_test );
29212976

0 commit comments

Comments
 (0)