diff --git a/.gitignore b/.gitignore index dc38d40b..6b18d83c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,14 @@ *.o *.gcno *~ -libcraft.so -libcraft.a +*.so +*.a +*.gcda +/tests.c +__pycache__ +.hypothesis +*.gcov +CLinkedListQueue/ +tests/main_test.c +tests_main + diff --git a/Makefile b/Makefile index c51b3e95..ac737df7 100644 --- a/Makefile +++ b/Makefile @@ -3,10 +3,10 @@ TEST_DIR = ./tests LLQUEUE_DIR = $(CONTRIB_DIR)/CLinkedListQueue VPATH = src -GCOV_OUTPUT = *.gcda *.gcno *.gcov +GCOV_OUTPUT = *.gcda *.gcno *.gcov src/*.gcda src/*.gcno src/*.gcov GCOV_CCFLAGS = -fprofile-arcs -ftest-coverage SHELL = /bin/bash -CFLAGS += -Iinclude -Werror -Werror=return-type -Werror=uninitialized -Wcast-align \ +override CFLAGS += -Iinclude -Werror -Werror=return-type -Werror=uninitialized -Wcast-align \ -Wno-pointer-sign -fno-omit-frame-pointer -fno-common -fsigned-char \ -Wunused-variable \ $(GCOV_CCFLAGS) -I$(LLQUEUE_DIR) -Iinclude -g -O2 -fPIC @@ -18,10 +18,10 @@ ASANFLAGS = -fsanitize=address SHAREDFLAGS = -dynamiclib SHAREDEXT = dylib # We need to include the El Capitan specific /usr/includes, aargh -CFLAGS += -I/Applications/Xcode.app/Contents/Developer/Platforms/MacOSX.platform/Developer/SDKs/MacOSX10.11.sdk/usr/include/ -CFLAGS += -I/Applications/Xcode.app/Contents/Developer/Platforms/MacOSX.platform/Developer/SDKs/MacOSX10.12.sdk/usr/include -CFLAGS += $(ASANFLAGS) -CFLAGS += -Wno-nullability-completeness +override CFLAGS += -I/Applications/Xcode.app/Contents/Developer/Platforms/MacOSX.platform/Developer/SDKs/MacOSX10.11.sdk/usr/include/ +override CFLAGS += -I/Applications/Xcode.app/Contents/Developer/Platforms/MacOSX.platform/Developer/SDKs/MacOSX10.12.sdk/usr/include +override CFLAGS += $(ASANFLAGS) +override CFLAGS += -Wno-nullability-completeness else SHAREDFLAGS = -shared SHAREDEXT = so @@ -96,4 +96,9 @@ clean: @rm -f $(TEST_DIR)/main_test.c src/*.o $(GCOV_OUTPUT); \ if [ -f "libraft.$(SHAREDEXT)" ]; then rm libraft.$(SHAREDEXT); fi;\ if [ -f libraft.a 
]; then rm libraft.a; fi;\ - if [ -f tests_main ]; then rm tests_main; fi; + if [ -f tests_main ]; then rm tests_main; fi;\ + if [ -f tests.c ]; then rm tests.c; fi;\ + if [ -f tests.o ]; then rm tests.o; fi;\ + if [ -f tests.cpython* ]; then rm tests.cpython*; fi;\ + if [ -d CLinkedListQueue ]; then rm -rf CLinkedListQueue; fi;\ + if [ -d .hypothesis ]; then rm -rf .hypothesis; fi; diff --git a/src/raft_server.c b/src/raft_server.c index aefa2184..ed95163a 100644 --- a/src/raft_server.c +++ b/src/raft_server.c @@ -314,7 +314,7 @@ int raft_recv_appendentries_response(raft_server_t* me_, raft_index_t next_idx = raft_node_get_next_idx(node); assert(0 < next_idx); /* Stale response -- ignore */ - if (r->current_idx < match_idx) + if (match_idx == next_idx - 1) return 0; if (r->current_idx < next_idx - 1) raft_node_set_next_idx(node, min(r->current_idx + 1, raft_get_current_idx(me_))); @@ -416,7 +416,7 @@ int raft_recv_appendentries( } else if (ae->term < me->current_term) { - /* 1. Reply false if term < currentTerm (§5.1) */ + /* Reply false if term < currentTerm (§5.1) */ __log(me_, node, "AE term %d is less than current term %d", ae->term, me->current_term); goto out; @@ -427,52 +427,51 @@ int raft_recv_appendentries( me->timeout_elapsed = 0; - /* Not the first appendentries we've received */ - /* NOTE: the log starts at 1 */ - if (0 < ae->prev_log_idx) + if (ae->prev_log_idx == me->snapshot_last_idx && me->snapshot_last_term != ae->prev_log_term) { - raft_entry_t* ety = raft_get_entry_from_idx(me_, ae->prev_log_idx); + /* Should never happen; something is seriously wrong! */ + __log(me_, node, "Snapshot AE prev conflicts with committed entry"); + e = RAFT_ERR_SHUTDOWN; + goto out; + } - /* Is a snapshot */ - if (ae->prev_log_idx == me->snapshot_last_idx) - { - if (me->snapshot_last_term != ae->prev_log_term) - { - /* Should never happen; something is seriously wrong! 
*/ - __log(me_, node, "Snapshot AE prev conflicts with committed entry"); - e = RAFT_ERR_SHUTDOWN; - goto out; - } - } - /* 2. Reply false if log doesn't contain an entry at prevLogIndex - whose term matches prevLogTerm (§5.3) */ - else if (!ety) - { - __log(me_, node, "AE no log at prev_idx %d", ae->prev_log_idx); - goto out; - } - else if (ety->term != ae->prev_log_term) + /* Reject entries that would leave a gap */ + if (ae->prev_log_idx > raft_get_current_idx(me_)) { + __log(me_, node, "AE prev_idx is greater than current idx pli:%d ci:%d", + ae->prev_log_idx, raft_get_current_idx(me_)); + goto out; + } + + /* NOTE: the log starts at 1 */ + raft_entry_t* ety = raft_get_entry_from_idx(me_, ae->prev_log_idx); + + /* ety is NULL when: + * 1. prev_log_idx is 0 + * 2. log at prev_log_idx has been compacted + * 3. prev_log_idx is greater than current index (already replied false) + */ + if (ety && ety->term != ae->prev_log_term) + { + /* log mismatch */ + __log(me_, node, "AE term doesn't match prev_term (ie. %d vs %d) ci:%d comi:%d lcomi:%d pli:%d", + ety->term, ae->prev_log_term, raft_get_current_idx(me_), + raft_get_commit_idx(me_), ae->leader_commit, ae->prev_log_idx); + if (ae->prev_log_idx <= raft_get_commit_idx(me_)) { - __log(me_, node, "AE term doesn't match prev_term (ie. %d vs %d) ci:%d comi:%d lcomi:%d pli:%d", - ety->term, ae->prev_log_term, raft_get_current_idx(me_), - raft_get_commit_idx(me_), ae->leader_commit, ae->prev_log_idx); - if (ae->prev_log_idx <= raft_get_commit_idx(me_)) - { - /* Should never happen; something is seriously wrong! */ - __log(me_, node, "AE prev conflicts with committed entry"); - e = RAFT_ERR_SHUTDOWN; - goto out; - } - /* Delete all the following log entries because they don't match */ - e = raft_delete_entry_from_idx(me_, ae->prev_log_idx); + /* Should never happen; something is seriously wrong! 
*/ + __log(me_, node, "AE prev conflicts with committed entry"); + e = RAFT_ERR_SHUTDOWN; goto out; } + /* Delete all the following log entries because they don't match */ + e = raft_delete_entry_from_idx(me_, ae->prev_log_idx); + goto out; } r->success = 1; r->current_idx = ae->prev_log_idx; - /* 3. If an existing entry conflicts with a new one (same index + /* If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it (§5.3) */ int i; @@ -481,7 +480,15 @@ int raft_recv_appendentries( raft_entry_t* ety = &ae->entries[i]; raft_index_t ety_index = ae->prev_log_idx + 1 + i; raft_entry_t* existing_ety = raft_get_entry_from_idx(me_, ety_index); - if (existing_ety && existing_ety->term != ety->term) + if (ety_index <= log_get_base(((raft_server_private_t*)me_)->log)) + { + /* Already snapshotted */ + } + else if (!existing_ety) + { + break; + } + else if (existing_ety->term != ety->term) { if (ety_index <= raft_get_commit_idx(me_)) { @@ -497,8 +504,6 @@ int raft_recv_appendentries( goto out; break; } - else if (!existing_ety) - break; r->current_idx = ety_index; } @@ -898,7 +903,7 @@ int raft_send_appendentries(raft_server_t* me_, raft_node_t* node) raft_index_t next_idx = raft_node_get_next_idx(node); /* figure out if the client needs a snapshot sent */ - if (0 < me->snapshot_last_idx && next_idx < me->snapshot_last_idx) + if (next_idx <= me->snapshot_last_idx) { if (me->cb.send_snapshot) me->cb.send_snapshot(me_, me->udata, node); @@ -948,7 +953,7 @@ int raft_send_appendentries_all(raft_server_t* me_) continue; e = raft_send_appendentries(me_, me->nodes[i]); - if (0 != e) + if (0 != e && RAFT_ERR_NEEDS_SNAPSHOT != e) return e; } @@ -1346,7 +1351,7 @@ int raft_end_snapshot(raft_server_t *me_) raft_index_t next_idx = raft_node_get_next_idx(node); /* figure out if the client needs a snapshot sent */ - if (0 < me->snapshot_last_idx && next_idx < me->snapshot_last_idx) + if (next_idx <= 
me->snapshot_last_idx) { if (me->cb.send_snapshot) me->cb.send_snapshot(me_, me->udata, node); @@ -1373,15 +1378,59 @@ int raft_begin_load_snapshot( if (last_included_index < me->last_applied_idx) return -1; - /* snapshot was unnecessary */ - if (last_included_index < raft_get_current_idx(me_)) - return -1; + if (last_included_index <= raft_get_current_idx(me_)) { + raft_entry_t* ety = raft_get_entry_from_idx(me_, last_included_index); + /* ety is NULL when: + * 1. last_included_index is 0 (already returned false) + * 2. log at last_included_index has been compacted (stale msg or already loaded snapshot) + * 3. last_included_index is greater than current index (not in the current condition) + */ - if (last_included_term == me->snapshot_last_term && last_included_index == me->snapshot_last_idx) - return RAFT_ERR_SNAPSHOT_ALREADY_LOADED; + if (last_included_index == me->snapshot_last_idx) + { + /* already loaded */ + if (me->snapshot_last_term != last_included_term) + { + /* Should never happen; something is seriously wrong! */ + __log(me_, NULL, "Load snapshot last_included_index conflicts with committed entry"); + return RAFT_ERR_SHUTDOWN; + } + return RAFT_ERR_SNAPSHOT_ALREADY_LOADED; + } + else if (!ety) + { + /* stale msg */ + __log(me_, NULL, "Load snapshot log at last_included_index was compacted lii:%d", last_included_index); + return -1; + } + else if (ety->term != last_included_term) + { + /* log mismatch */ + __log(me_, NULL, "Load snapshot term doesn't match last_included_term (ie. %d vs %d) ci:%d comi:%d lii:%d", + ety->term, last_included_term, raft_get_current_idx(me_), + raft_get_commit_idx(me_), last_included_index); + if (last_included_index <= raft_get_commit_idx(me_)) + { + /* Should never happen; something is seriously wrong! 
*/ + __log(me_, NULL, "Load snapshot last_included_index conflicts with committed entry"); + return RAFT_ERR_SHUTDOWN; + } + /* Snapshot is necessary */ + /* No need to delete mismatch log entries because loading snapshot clears all */ + } + else { + /* Snapshot was unnecessary, reply success to inform the Leader to update next idx and match idx */ + return RAFT_ERR_SNAPSHOT_ALREADY_LOADED; + /* Another choice: can also load the snapshot. However, matching logs should be retained. + * Since Follower itself can call raft_begin_snapshot, there is no need to load the snapshot */ + } + } - me->current_term = last_included_term; - me->voted_for = -1; + if (me->current_term < last_included_term) { + int e = raft_set_current_term(me_, last_included_term); + if (0 != e) + return e; + } raft_set_state((raft_server_t*)me, RAFT_STATE_FOLLOWER); me->current_leader = NULL; @@ -1399,8 +1448,10 @@ int raft_begin_load_snapshot( { if (raft_get_nodeid(me_) == raft_node_get_id(me->nodes[i])) my_node_by_idx = i; - else - raft_node_set_active(me->nodes[i], 0); + else { + raft_node_free(me->nodes[i]); + me->nodes[i] = NULL; + } } /* this will be realloc'd by a raft_add_node */ diff --git a/src/raft_server_properties.c b/src/raft_server_properties.c index 0a85b1a9..d61b3444 100644 --- a/src/raft_server_properties.c +++ b/src/raft_server_properties.c @@ -221,6 +221,16 @@ raft_term_t raft_get_last_log_term(raft_server_t* me_) raft_entry_t* ety = raft_get_entry_from_idx(me_, current_idx); if (ety) return ety->term; + else + { + /* ety is NULL when: + * 1. current_idx is 0 + * 2. current_idx is compacted (must equals to snapshot_last_idx) + * 3. 
current_idx is greater than current_idx (never) + */ + assert(current_idx == raft_get_snapshot_last_idx(me_)); + return raft_get_snapshot_last_term(me_); + } } return 0; } diff --git a/tests/test_cluster.c b/tests/test_cluster.c new file mode 100644 index 00000000..ee7efd54 --- /dev/null +++ b/tests/test_cluster.c @@ -0,0 +1,295 @@ +// This file is not generated by code + +#include "test_cluster.h" + +static void common_trace_snapshot_2_entries(CuTest *tc) { + // init 3 servers, S0 becomes Leader + info(1, "init 3 servers"); rc_init_cluster(3, tc); + info(2, "S0 becomes Candidate"); rc_election_timeout(0); + info(2, "S0 sends RV 1 2 to S1 and S2"); assert_msg_seq_type(1, MSG_REQUESTVOTE); assert_msg_seq_type(2, MSG_REQUESTVOTE); + info(3, "S1 receives RV 1"); rc_deliver(1, MSG_REQUESTVOTE); + info(3, "S1 sends RVR 3 (granted) to S0"); assert_msg_seq_type(3, MSG_REQUESTVOTE_RESPONSE); + info(4, "S0 receives RVR 3 and become Leader"); rc_deliver(3, MSG_REQUESTVOTE_RESPONSE); + info(4, "S0 sends AE 4 5 to S1 and S2"); assert_msg_seq_type(4, MSG_APPENDENTRIES); assert_msg_seq_type(5, MSG_APPENDENTRIES); + info(5, "S1 S2 receives AE 4 5"); rc_deliver(4, MSG_APPENDENTRIES); rc_deliver(5, MSG_APPENDENTRIES); + info(5, "S1 S2 sends AER 6 7"); assert_msg_seq_type(6, MSG_APPENDENTRIES_RESPONSE); assert_msg_seq_type(7, MSG_APPENDENTRIES_RESPONSE); + info(6, "S0 receives AER 6 7"); rc_deliver(6, MSG_APPENDENTRIES_RESPONSE); rc_deliver(7, MSG_APPENDENTRIES_RESPONSE); + // Client issues 2 entries, Leader S0 replicates those 2 entries to S1 and advances commit index to 2. + // While S2's network is slow and S2 is lagged behind + // Why 2 entries? 
Because making a snapshot needs at least 2 entries log count + info(7, "Client sends CMD1 to S0"); rc_client_operation(0, CMD1); + info(7, "S0 sends AE 8 9 to S1 and S2"); assert_msg_seq_type(8, MSG_APPENDENTRIES); assert_msg_seq_type(9, MSG_APPENDENTRIES); + info(8, "S1 receives AE 8"); rc_deliver(8, MSG_APPENDENTRIES); + info(8, "S1 sends AER 10"); assert_msg_seq_type(10, MSG_APPENDENTRIES_RESPONSE); + info(9, "Client sends CMD1 to S0"); rc_client_operation(0, CMD1); + info(9, "S0 will not send AE"); assert_last_msg_type(MSG_APPENDENTRIES_RESPONSE); + info(10, "S0 receives AER 10"); rc_deliver(10, MSG_APPENDENTRIES_RESPONSE); + info(10, "S0 aggressively sends AE 11"); assert_msg_seq_type(11, MSG_APPENDENTRIES); + info(11, "S1 receives AE 11"); rc_deliver(11, MSG_APPENDENTRIES); + info(11, "S1 sends AER 12"); assert_msg_seq_type(12, MSG_APPENDENTRIES_RESPONSE); + info(12, "S0 receives AER 12"); rc_deliver(12, MSG_APPENDENTRIES_RESPONSE); + info(12, "S0 advance commit index to 2"); CuAssertIntEquals(tc, 2, raft_get_commit_idx(sv[0].server)); + // S2's network becomes good now + info(13, "S2 receives RV 2"); rc_deliver(2, MSG_REQUESTVOTE); + info(13, "S2 sends RVR 13 (granted) to S0"); assert_msg_seq_type(13, MSG_REQUESTVOTE_RESPONSE); + info(14, "S2 receives AE 9"); rc_deliver(9, MSG_APPENDENTRIES); + info(14, "S2 sends AER 14 to S0"); assert_msg_seq_type(14, MSG_APPENDENTRIES_RESPONSE); + info(15, "S0 receives RVR 13"); rc_deliver(13, MSG_REQUESTVOTE_RESPONSE); + // Leader request times out and broadcasts heartbeats (sends two entries to S2) + info(16, "S0 request times out"); rc_request_timeout(0); + info(16, "S0 sends AE 15 16 to S1 and S2"); assert_msg_seq_type(15, MSG_APPENDENTRIES); assert_msg_seq_type(16, MSG_APPENDENTRIES); + info(17, "S2 receives AE 16"); rc_deliver(16, MSG_APPENDENTRIES); + info(17, "S2 sends AER 17"); assert_msg_seq_type(17, MSG_APPENDENTRIES_RESPONSE); + info(18, "S2 has commit index 2"); CuAssertIntEquals(tc, 2, 
raft_get_commit_idx(sv[2].server)); + info(18, "S2 makes a snapshot"); rc_exec_snapshot(2); +} + +// Trigger condition: Snapshot + No leadership change + No node failure + No network failure +// Run this testcase with CFLAGS="-DTEST_CLUSTER_PRINT_LOG", check STATE 18,19,20 and see what happens +void TestCluster_follower_log_no_more_than_leader_after_ae_success(CuTest *tc) { + common_trace_snapshot_2_entries(tc); + + // Now S2's log base is 2, log count is 0. Leader request times out and broadcasts heartbeats (sends two entries to S2 again) + info(19, "S0 request times out"); rc_request_timeout(0); + info(19, "S0 sends AE 18 19 to S1 and S2"); assert_msg_seq_type(18, MSG_APPENDENTRIES); assert_msg_seq_type(19, MSG_APPENDENTRIES); + + // Invariant is violated. + info(20, "S2 receives AE 19. Guess S2's log length?"); + assert_inv4(19); +} + +// Trigger condition: Snapshot + No leadership change + No node failure + No network failure +// Run this testcase with CFLAGS="-DTEST_CLUSTER_PRINT_LOG", check STATE 22 and see what happens +void TestCluster_next_idx_greater_than_match_idx(CuTest *tc) { + common_trace_snapshot_2_entries(tc); + + // Now S2's log base is 2, log count is 0. + info(19, "S0 receives AER 14"); rc_deliver(14, MSG_APPENDENTRIES_RESPONSE); + info(19, "S0 aggressively sends AE 18"); assert_msg_seq_type(18, MSG_APPENDENTRIES); + info(20, "S2 receives AE 18"); rc_deliver(18, MSG_APPENDENTRIES); + info(20, "S2 sends AER 19 to S0"); assert_msg_seq_type(19, MSG_APPENDENTRIES_RESPONSE); + info(21, "S0 receives AER (true) 17"); rc_deliver(17, MSG_APPENDENTRIES_RESPONSE); + info(21, "S0 advance next idx to 3"); CuAssertIntEquals(tc, 3, raft_node_get_next_idx(raft_get_node(sv[0].server, 2))); + info(21, "S0 advance match idx to 2"); CuAssertIntEquals(tc, 2, raft_node_get_match_idx(raft_get_node(sv[0].server, 2))); + + // AER 19 should be a stale msg. 
However, Leader thinks it is log mismatch and sends a retry + info(22, "S0 receives AER 19"); rc_deliver(19, MSG_APPENDENTRIES_RESPONSE); + info(22, "Guess next idx and match idx?"); + assert_inv11(); +} + +// Trigger condition: Snapshot + No leadership change + No node failure + No network failure +// Run this testcase with CFLAGS="-DTEST_CLUSTER_PRINT_LOG", check STATE 22 and see what happens +void TestCluster_ae_should_contain_entries_if_not_synchronized(CuTest *tc) { + common_trace_snapshot_2_entries(tc); + + info(19, "S0 makes a snapshot"); rc_exec_snapshot(0); + info(19, "S0 snapshot last included idx is 2"); CuAssertIntEquals(tc, 2, raft_get_snapshot_last_idx(sv[0].server)); + info(19, "S0 sends SS 18 to S2"); assert_msg_seq_type(18, MSG_SNAPSHOT); + + // S0 has one uncommitted entries + info(20, "Client sends CMD1 to S0"); rc_client_operation(0, CMD1); + info(20, "S0 sends AE 19 to S1"); assert_msg_seq_type(19, MSG_APPENDENTRIES); + // S0 will not aggressively send ae because next idx 2 is snapshotted + info(21, "S0 receives AER 14"); rc_deliver(14, MSG_APPENDENTRIES_RESPONSE); + info(21, "S0 advances next idx to 2"); CuAssertIntEquals(tc, 2, raft_node_get_next_idx(raft_get_node(sv[0].server, 2))); + + info(22, "S0 request times out. 
Guess how many entries sent to S2?"); + assert_inv12(0); +} + +// Trigger condition: Snapshot + Leadership change + No node failure + Network unordered message +// Run this testcase with CFLAGS="-DTEST_CLUSTER_PRINT_LOG", check STATE 33 and see what happens +void TestCluster_current_term_is_monotonic(CuTest *tc) { + common_trace_snapshot_2_entries(tc); + + // S0 gets more committed logs + info(19, "Client sends CMD1 x2 to S0"); rc_client_operation(0, CMD1); rc_client_operation(0, CMD1); + info(19, "S0 sends AE 18 to S1"); assert_msg_seq_type(18, MSG_APPENDENTRIES); + info(20, "S1 receives AE 18"); rc_deliver(18, MSG_APPENDENTRIES); + info(20, "S1 sends AER 19 to S0"); assert_msg_seq_type(19, MSG_APPENDENTRIES_RESPONSE); + info(21, "S0 receives AER 19"); rc_deliver(19, MSG_APPENDENTRIES_RESPONSE); + info(21, "S0 aggressively sends AE 20 to S1"); assert_msg_seq_type(20, MSG_APPENDENTRIES); + info(22, "S1 receives AE 20"); rc_deliver(20, MSG_APPENDENTRIES); + info(22, "S1 sends AER 21 to S0"); assert_msg_seq_type(21, MSG_APPENDENTRIES_RESPONSE); + info(23, "S0 receives AER 21"); rc_deliver(21, MSG_APPENDENTRIES_RESPONSE); + info(23, "S0 advances commit idx to 4"); CuAssertIntEquals(tc, 4, raft_get_commit_idx(sv[0].server)); + info(24, "S0 request times out"); rc_request_timeout(0); + info(24, "S0 sends AE 22 23 to S1 and S2"); assert_msg_seq_type(22, MSG_APPENDENTRIES); assert_msg_seq_type(23, MSG_APPENDENTRIES); + info(25, "S1 receives AE 22"); rc_deliver(22, MSG_APPENDENTRIES); + info(25, "S1 sends AER 24 to S0"); assert_msg_seq_type(24, MSG_APPENDENTRIES_RESPONSE); + info(25, "S1 commit idx is 4"); CuAssertIntEquals(tc, 4, raft_get_commit_idx(sv[1].server)); + + // S1 becomes Leader + info(26, "S1 becomes Candidate"); rc_election_timeout(1); + info(26, "S1 sends RV 25 26 to S0 and S2"); assert_msg_seq_type(25, MSG_REQUESTVOTE); assert_msg_seq_type(26, MSG_REQUESTVOTE); + info(27, "S2 receives RV 26"); rc_deliver(26, MSG_REQUESTVOTE); + info(27, "S2 sends RVR 27 
(granted) to S1"); assert_msg_seq_type(27, MSG_REQUESTVOTE_RESPONSE); + info(28, "S1 receives RVR 27"); rc_deliver(27, MSG_REQUESTVOTE_RESPONSE); + info(28, "S1 becomes Leader"); CuAssertTrue(tc, raft_is_leader(sv[1].server)); + // S1 sends snapshot to S2 + info(29, "S1 sends AE 28 29 to S0 and S2"); assert_msg_seq_type(28, MSG_APPENDENTRIES); assert_msg_seq_type(29, MSG_APPENDENTRIES); + info(30, "S2 receives AE 29"); rc_deliver(29, MSG_APPENDENTRIES); + info(30, "S2 sends AER 30 to S1"); assert_msg_seq_type(30, MSG_APPENDENTRIES_RESPONSE); + info(31, "S1 receives AER 30"); rc_deliver(30, MSG_APPENDENTRIES_RESPONSE); + info(31, "S1 aggressively sends AE 31"); assert_msg_seq_type(31, MSG_APPENDENTRIES); + info(32, "S1 makes a snapshot"); rc_exec_snapshot(1); + info(32, "S1 snapshot last included idx is 4"); CuAssertIntEquals(tc, 4, raft_get_snapshot_last_idx(sv[1].server)); + info(32, "S1 sends SS 32 to S2"); assert_msg_seq_type(32, MSG_SNAPSHOT); + + info(33, "S2 receives SS 32. Guess S2 current term?"); + assert_inv5_before_action(); + rc_deliver(32, MSG_SNAPSHOT); + assert_inv5_after_action(); +} + +// Trigger condition: Snapshot + Leadership change + Node restart + No Network failure +// Run this testcase with CFLAGS="-DTEST_CLUSTER_PRINT_LOG", check STATE 25/34 and see what happens +void TestCluster_will_exist_leader(CuTest *tc) { + common_trace_snapshot_2_entries(tc); + + info(19, "S0 makes a snapshot"); rc_exec_snapshot(0); + info(19, "S0 sends SS 18 to S2"); assert_msg_seq_type(18, MSG_SNAPSHOT); + info(20, "S1 receives AE 15"); rc_deliver(15, MSG_APPENDENTRIES); + info(20, "S1 sends AER 19 to S0"); assert_msg_seq_type(19, MSG_APPENDENTRIES_RESPONSE); + info(20, "S1 makes a snapshot"); rc_exec_snapshot(1); + + // Fault injection: S0 restart + info(21, "S0 restarts and becomes Follower"); rc_restart(0); + + // S0 becomes Candidate + info(22, "S0 becomes Candidate"); rc_election_timeout(0); + info(22, "S0 sends RV 20 21 to S1 and S2"); assert_msg_seq_type(20,
MSG_REQUESTVOTE); assert_msg_seq_type(21, MSG_REQUESTVOTE); + info(23, "S1 receives RV 20"); rc_deliver(20, MSG_REQUESTVOTE); + info(23, "S1 sends RVR 22 to S0"); assert_msg_seq_type(22, MSG_REQUESTVOTE_RESPONSE); + info(24, "S2 receives RV 21"); rc_deliver(21, MSG_REQUESTVOTE); + info(24, "S2 sends RVR 23 to S0"); assert_msg_seq_type(23, MSG_REQUESTVOTE_RESPONSE); + info(25, "S0 receives RVR 22"); rc_deliver(22, MSG_REQUESTVOTE_RESPONSE); + info(25, "S0 receives RVR 23"); rc_deliver(23, MSG_REQUESTVOTE_RESPONSE); + if (raft_is_leader(sv[0].server)) { + info(25, "S0 is Leader"); + return; + } else { + info(25, "S0 is not Leader"); + } + + // S1 becomes Candidate + info(26, "S1 becomes Candidate"); rc_election_timeout(1); + info(26, "S1 sends RV 24 25 to S0 and S2"); assert_msg_seq_type(24, MSG_REQUESTVOTE); assert_msg_seq_type(25, MSG_REQUESTVOTE); + info(27, "S0 receives RV 24"); rc_deliver(24, MSG_REQUESTVOTE); + info(27, "S0 sends RVR 26 to S1"); assert_msg_seq_type(26, MSG_REQUESTVOTE_RESPONSE); + info(28, "S2 receives RV 25"); rc_deliver(25, MSG_REQUESTVOTE); + info(28, "S2 sends RVR 27 to S1"); assert_msg_seq_type(27, MSG_REQUESTVOTE_RESPONSE); + info(29, "S1 receives RVR 26"); rc_deliver(26, MSG_REQUESTVOTE_RESPONSE); + info(29, "S1 receives RVR 27"); rc_deliver(27, MSG_REQUESTVOTE_RESPONSE); + if (raft_is_leader(sv[1].server)) { + info(29, "S1 is Leader"); + return; + } else { + info(29, "S1 is not Leader"); + } + + // S2 becomes Candidate + info(30, "S2 becomes Candidate"); rc_election_timeout(2); + info(30, "S2 sends RV 28 29 to S0 and S1"); assert_msg_seq_type(28, MSG_REQUESTVOTE); assert_msg_seq_type(29, MSG_REQUESTVOTE); + info(31, "S0 receives RV 28"); rc_deliver(28, MSG_REQUESTVOTE); + info(31, "S0 sends RVR 30 to S2"); assert_msg_seq_type(30, MSG_REQUESTVOTE_RESPONSE); + info(32, "S1 receives RV 29"); rc_deliver(29, MSG_REQUESTVOTE); + info(32, "S1 sends RVR 31 to S2"); assert_msg_seq_type(31, MSG_REQUESTVOTE_RESPONSE); + info(33, "S2 receives RVR 
30"); rc_deliver(30, MSG_REQUESTVOTE_RESPONSE); + info(33, "S2 receives RVR 31"); rc_deliver(31, MSG_REQUESTVOTE_RESPONSE); + if (raft_is_leader(sv[2].server)) { /* S2 ran this election round, so check S2 (not S0) */ + info(33, "S2 is Leader"); + return; + } else { + info(33, "S2 is not Leader"); + } + + info(34, "Guess how many Leaders?"); + assert_inv13(); +} + +// Trigger condition: Snapshot + No leadership change + No node restart + Network unordered message +// Run this testcase with CFLAGS="-DTEST_CLUSTER_PRINT_LOG", check STATE 22 and see what happens +void TestCluster_ae_retry_once(CuTest *tc) { + common_trace_snapshot_2_entries(tc); + + info(20, "S0 receives AER 14"); rc_deliver(14, MSG_APPENDENTRIES_RESPONSE); + info(20, "S0 aggressively sends AE 18 to S2"); assert_msg_seq_type(18, MSG_APPENDENTRIES); + info(21, "S2 receives AE 18"); rc_deliver(18, MSG_APPENDENTRIES); + info(21, "S2 sends AER 19 to S0"); assert_msg_seq_type(19, MSG_APPENDENTRIES_RESPONSE); + info(22, "S0 receives AER 19"); rc_deliver(19, MSG_APPENDENTRIES_RESPONSE); + #ifdef TEST_CLUSTER_PRINT_LOG + printf(" ?
S0 should not aggressively send AE, current seq: %ld\n", get_seq()); + #endif + CuAssertTrue(tc, get_seq() == 19); +} + +// Trigger condition: Snapshot + No leadership change + No node restart + Network unordered message + Network partition +// Run this testcase with CFLAGS="-DTEST_CLUSTER_PRINT_LOG", check STATE 24 and see what happens +void TestCluster_handle_snapshot_make_progress(CuTest *tc) { + // init 3 servers, S0 becomes Leader + info(1, "init 3 servers"); rc_init_cluster(3, tc); + info(2, "S0 becomes Candidate"); rc_election_timeout(0); + info(2, "S0 sends RV 1 2 to S1 and S2"); assert_msg_seq_type(1, MSG_REQUESTVOTE); assert_msg_seq_type(2, MSG_REQUESTVOTE); + info(3, "S1 receives RV 1"); rc_deliver(1, MSG_REQUESTVOTE); + info(3, "S1 sends RVR 3 (granted) to S0"); assert_msg_seq_type(3, MSG_REQUESTVOTE_RESPONSE); + info(4, "S0 receives RVR 3 and become Leader"); rc_deliver(3, MSG_REQUESTVOTE_RESPONSE); + info(4, "S0 sends AE 4 5 to S1 and S2"); assert_msg_seq_type(4, MSG_APPENDENTRIES); assert_msg_seq_type(5, MSG_APPENDENTRIES); + info(5, "S1 S2 receives AE 4 5"); rc_deliver(4, MSG_APPENDENTRIES); rc_deliver(5, MSG_APPENDENTRIES); + info(5, "S1 S2 sends AER 6 7"); assert_msg_seq_type(6, MSG_APPENDENTRIES_RESPONSE); assert_msg_seq_type(7, MSG_APPENDENTRIES_RESPONSE); + info(6, "S0 receives AER 6 7"); rc_deliver(6, MSG_APPENDENTRIES_RESPONSE); rc_deliver(7, MSG_APPENDENTRIES_RESPONSE); + // Fault injection: network partition + info(7, "Network partition add S0"); add_to_network_partition(0); + info(8, "Client sends CMD1 x3 to S0"); rc_client_operation(0, CMD1); rc_client_operation(0, CMD1); rc_client_operation(0, CMD1); + // S1 becomes Leader + info(9, "S1 becomes Candidate"); rc_election_timeout(1); + info(9, "S1 sends RV 8 to S2"); assert_msg_seq_type(8, MSG_REQUESTVOTE); + info(10, "S2 receives RV 8"); rc_deliver(8, MSG_REQUESTVOTE); + info(10, "S2 sends RVR 9 (granted) to S1"); assert_msg_seq_type(9, MSG_REQUESTVOTE_RESPONSE); + info(11, "S1 receives 
RVR 9 and become Leader");rc_deliver(9, MSG_REQUESTVOTE_RESPONSE); + info(11, "S1 sends AE 10 to S2"); assert_msg_seq_type(10, MSG_APPENDENTRIES); + info(12, "S2 receives AE 10"); rc_deliver(10, MSG_APPENDENTRIES); + info(12, "S2 sends AER 11"); assert_msg_seq_type(11, MSG_APPENDENTRIES_RESPONSE); + info(13, "S1 receives AER 11"); rc_deliver(11, MSG_APPENDENTRIES_RESPONSE); + // S1 commits 2 entries and makes a snapshot + info(14, "Client sends CMD1 x2 to S1"); rc_client_operation(1, CMD1); rc_client_operation(1, CMD1); + info(14, "S1 sends AE 12 to S2"); assert_msg_seq_type(12, MSG_APPENDENTRIES); + info(15, "S2 receives AE 12"); rc_deliver(12, MSG_APPENDENTRIES); + info(15, "S2 sends AER 13 to S1"); assert_msg_seq_type(13, MSG_APPENDENTRIES_RESPONSE); + info(16, "S1 receives AER 13"); rc_deliver(13, MSG_APPENDENTRIES_RESPONSE); + info(16, "S1 aggressively sends AE 14 to S2"); assert_msg_seq_type(14, MSG_APPENDENTRIES); + info(17, "S2 receives AE 14"); rc_deliver(14, MSG_APPENDENTRIES); + info(17, "S2 sends AER 15"); assert_msg_seq_type(15, MSG_APPENDENTRIES_RESPONSE); + info(18, "S1 receives AER 15"); rc_deliver(15, MSG_APPENDENTRIES_RESPONSE); + info(18, "S1 commit idx is 2"); CuAssertIntEquals(tc, 2, raft_get_commit_idx(sv[1].server)); + info(19, "S1 makes a snapshot"); rc_exec_snapshot(1); + info(19, "S1 snapshot last included idx is 2"); CuAssertIntEquals(tc, 2, raft_get_snapshot_last_idx(sv[1].server)); + // Network recover + info(20, "Network recover"); clear_network_partition(); + info(21, "S1 request times out"); rc_request_timeout(1); + info(21, "S1 sends SS 16 to S0"); assert_msg_seq_type(16, MSG_SNAPSHOT); + int seq; + if (msg_hook.type == MSG_APPENDENTRIES) { + // bug fixed: S1 sends SS won't cancel subsequent sendings + info(21, "S1 sends AE 17 to S2"); assert_msg_seq_type(17, MSG_APPENDENTRIES); + info(22, "S0 receives SS 16"); rc_deliver(16, MSG_SNAPSHOT); + info(22, "S0 sends AER 18 to S1"); assert_msg_seq_type(18, MSG_APPENDENTRIES_RESPONSE); +
info(23, "S1 receives AER 18"); rc_deliver(18, MSG_APPENDENTRIES_RESPONSE); + info(24, "S1 request times out"); rc_request_timeout(1); + seq = 19; + } + else { + info(22, "S0 receives SS 16"); rc_deliver(16, MSG_SNAPSHOT); + info(22, "S0 sends AER 17 to S1"); assert_msg_seq_type(17, MSG_APPENDENTRIES_RESPONSE); + info(23, "S1 receives AER 17"); rc_deliver(17, MSG_APPENDENTRIES_RESPONSE); + info(24, "S1 request times out"); rc_request_timeout(1); + seq = 18; + } + info(24, "Guess what type of msg S1 sent to S0?"); + #ifdef TEST_CLUSTER_PRINT_LOG + printf(" ? S1 should send AE, really sent: %s\n", get_msg_type_str(get_msg(seq)->type)); + #endif + CuAssertTrue(tc, get_msg(seq)->type == MSG_APPENDENTRIES); +} + diff --git a/tests/test_cluster.h b/tests/test_cluster.h new file mode 100644 index 00000000..ab50be10 --- /dev/null +++ b/tests/test_cluster.h @@ -0,0 +1,803 @@ +#ifndef __TEST_CLUSTER__ +#define __TEST_CLUSTER__ + +#include +#include +#include +#include +#include +#include +#include "CuTest.h" + +#include "raft.h" +#include "raft_log.h" +#include "raft_private.h" + +// check code assertions +// #define TEST_CLUSTER_CODE_ASSERTION + +// for debug use +// #define TEST_CLUSTER_PRINT_LOG +static void info(int state, const char *info) { + #ifdef TEST_CLUSTER_PRINT_LOG + static int last_print_state = 0; + if (state != last_print_state) { + printf("\n[STATE %d]\n", state); + last_print_state = state; + } + printf("+ %s\n", info); + #endif +} + +static CuTest *tc; + +// data structure size +#define MAX_SERVERS 10 +#define MAX_TERM 10 +#define MAX_LOG_ENTRIES 20 +#define MAX_EACH_MSGS 100 +#define MSG_TYPES 5 +#define MAX_MSGS (MAX_EACH_MSGS * MSG_TYPES) +#define MAX_MSG_ENTRIES (MAX_EACH_MSGS * MAX_LOG_ENTRIES) + +// fake time out settings +#define ELECTION_TIMEOUT 1000 +#define REQUEST_TIMEOUT 200 + +// server data structure +typedef struct { + int id; + raft_server_t* server; +} server_t; + +static server_t sv[MAX_SERVERS]; +static int num_server = 0; + +// msg 
types +typedef enum { + MSG_NIL, + MSG_REQUESTVOTE = 1, + MSG_REQUESTVOTE_RESPONSE = 2, + MSG_APPENDENTRIES = 4, + MSG_APPENDENTRIES_RESPONSE = 8, + MSG_SNAPSHOT = 16 +} msg_type_e; + +#ifdef TEST_CLUSTER_PRINT_LOG +static const char *get_msg_type_str(msg_type_e type) { + switch (type) + { + case MSG_NIL: + return "(nil)"; + case MSG_REQUESTVOTE: + return "RV"; + case MSG_REQUESTVOTE_RESPONSE: + return "RVR"; + case MSG_APPENDENTRIES: + return "AE"; + case MSG_APPENDENTRIES_RESPONSE: + return "AER"; + case MSG_SNAPSHOT: + return "SS"; + default: + CuAssertTrue(tc, 0); + } + return ""; +} +#endif + +// client cmd types +typedef enum { + CMD1, + CMD2 +} cmd_entry_e; + +// snapshot msg type +typedef struct { + raft_term_t term; + raft_index_t last_included_idx; + raft_term_t last_included_term; +} msg_snapshot_t; + +// msg type +typedef struct { + msg_type_e type; + int from; + int to; + size_t len; + void *msg; +} msg_t; + +// network msg buf +static msg_t msgs[MAX_MSGS + 1]; // index 0 is never used +static msg_entry_t msg_entry[MAX_MSG_ENTRIES]; +static msg_entry_response_t msg_entry_response[MAX_EACH_MSGS]; +static msg_requestvote_t msg_requestvote[MAX_EACH_MSGS]; +static msg_requestvote_response_t msg_requestvote_response[MAX_EACH_MSGS]; +static msg_appendentries_t msg_appendentries[MAX_EACH_MSGS]; +static msg_appendentries_response_t msg_appendentries_response[MAX_EACH_MSGS]; +static msg_snapshot_t msg_snapshot[MAX_EACH_MSGS]; + +// next msg pointer +static msg_t *next_msg; +static msg_entry_t *next_msg_entry; +static msg_entry_response_t *next_msg_entry_response; +static msg_requestvote_t *next_msg_requestvote; +static msg_requestvote_response_t *next_msg_requestvote_response; +static msg_appendentries_t *next_msg_appendentries; +static msg_appendentries_response_t *next_msg_appendentries_response; +static msg_snapshot_t *next_msg_snapshot; + +// log entry +static raft_entry_data_t entry1 = {.buf = (void*)"1", .len = 1}; +static raft_entry_data_t entry2 = {.buf
= (void*)"2", .len = 1}; + +// bit-wise network partition data structure +static int network_partition = 0; + +// network partition functions +static void add_to_network_partition(int node_id) { + #ifdef TEST_CLUSTER_PRINT_LOG + printf(" - Network partition add server id: S%d\n", node_id); + #endif + network_partition |= 1 << node_id; +} +static void clear_network_partition() { + #ifdef TEST_CLUSTER_PRINT_LOG + printf(" - Network clear partition\n"); + #endif + network_partition = 0; +} +static void delete_from_network_partition(int node_id) { + #ifdef TEST_CLUSTER_PRINT_LOG + printf(" - Network partition delete server id: S%d\n", node_id); + #endif + network_partition &= ~(1 << node_id); +} +static int is_partitioned_node(int node_id) { + return network_partition & (1 << node_id); +} + +static msg_t *set_msg(void *msg, msg_type_e type, int from, int to) { + static msg_t m; + m.type = type; + m.from = from; + m.to = to; + m.msg = msg; + switch (type) + { + case MSG_REQUESTVOTE: + m.len = sizeof(msg_requestvote_t); + break; + case MSG_REQUESTVOTE_RESPONSE: + m.len = sizeof(msg_requestvote_response_t); + break; + case MSG_APPENDENTRIES: + m.len = sizeof(msg_appendentries_t); + break; + case MSG_APPENDENTRIES_RESPONSE: + m.len = sizeof(msg_appendentries_response_t); + break; + case MSG_SNAPSHOT: + m.len = sizeof(msg_snapshot_t); + break; + default: + CuAssertTrue(tc, 0); + } + return &m; +} + +static long get_seq() { + return (long)(next_msg - &msgs[1]); +} + +// get a copy of the msg to be added so that we can check it simply +static msg_t msg_hook; + +static void do_add_msg(msg_t *m) { + if (!m || !m->msg) { + CuAssertTrue(tc, 0); + } + memcpy(&msg_hook, m, sizeof(*m)); + if (is_partitioned_node(m->from) || is_partitioned_node(m->to)) { + #ifdef TEST_CLUSTER_PRINT_LOG + printf(" - Network drop unreachable msg, type: %s, direction S%d -> S%d\n", + get_msg_type_str(m->type), m->from, m->to); + #endif + return; + } + memcpy(next_msg++, m, sizeof(*m)); + #ifdef 
TEST_CLUSTER_PRINT_LOG + printf(" - Network add seq: %ld, type: %s, direction S%d -> S%d\n", + get_seq(), get_msg_type_str(m->type), m->from, m->to); + #endif +} + +// add msg to network +static void add_msg(void* msg, msg_type_e type, raft_server_t* from, raft_node_t* to) { + int from_id = raft_get_nodeid(from); + int to_id = raft_node_get_id(to); + msg_t *m = set_msg(msg, type, from_id, to_id); + do_add_msg(m); +} + +// get the msg +static msg_t *get_msg(int seq) { + return &msgs[seq]; +} + +// assert msg matches type +static void assert_msg_seq_type(int seq, msg_type_e type) { + msg_t *m = get_msg(seq); + CuAssertTrue(tc, m != NULL); + if (type != MSG_NIL) { + assert(m->type & type); + CuAssertTrue(tc, m->type & type); + } +} + +// assert last msg matches type +static void assert_last_msg_type(msg_type_e type) { + if (type != MSG_NIL) { + CuAssertTrue(tc, msg_hook.type & type); + } +} + +// duplicate the msg. seq starts from 1 +static void dup_msg(int seq) { + msg_t *m = get_msg(seq); + memcpy(next_msg++, m, sizeof(*m)); + #ifdef TEST_CLUSTER_PRINT_LOG + printf(" - Network duplicate seq: %d -> %ld, type: %s, direction S%d -> S%d\n", + seq, get_seq(), get_msg_type_str(m->type), m->from, m->to); + #endif +} + +// delete the msg after delivering +static void delete_msg(int seq) { + msg_t *m = get_msg(seq); + CuAssertTrue(tc, m != NULL); + #ifdef TEST_CLUSTER_PRINT_LOG + printf(" - Network delete seq: %d, type: %s, direction S%d -> S%d\n", seq, get_msg_type_str(m->type), m->from, m->to); + #endif + memset(m, 0, sizeof(*m)); +} + +// drop the msg +static void drop_msg(int seq) { + msg_t *m = get_msg(seq); + CuAssertTrue(tc, m != NULL); + #ifdef TEST_CLUSTER_PRINT_LOG + printf(" - Network drop seq: %d, type: %s, direction S%d -> S%d\n", seq, get_msg_type_str(m->type), m->from, m->to); + #endif + memset(m, 0, sizeof(*m)); +} + +// callback send functions +static int raft_cbs_send_requestvote(raft_server_t* raft, void* udata, raft_node_t* node, msg_requestvote_t* msg) { 
+ memcpy(next_msg_requestvote, msg, sizeof(*msg)); + add_msg(next_msg_requestvote++, MSG_REQUESTVOTE, raft, node); + return 0; +} + +static int raft_cbs_send_appendentries(raft_server_t* raft, void* udata, raft_node_t* node, msg_appendentries_t* msg) { + memcpy(next_msg_appendentries, msg, sizeof(*msg)); + next_msg_appendentries->entries = next_msg_entry; + for (int i = 0; i < msg->n_entries; i++) { + memcpy(next_msg_entry++, &(msg->entries[i]), sizeof(*next_msg_entry)); + } + add_msg(next_msg_appendentries++, MSG_APPENDENTRIES, raft, node); + return 0; +} + +static int raft_cbs_send_snapshot(raft_server_t* raft, void *user_data, raft_node_t* node) { + next_msg_snapshot->term = raft_get_current_term(raft); + next_msg_snapshot->last_included_idx = raft_get_snapshot_last_idx(raft); + next_msg_snapshot->last_included_term = raft_get_snapshot_last_term(raft); + add_msg(next_msg_snapshot++, MSG_SNAPSHOT, raft, node); + return 0; +} + +// do nothing functions +static int raft_cbs_persist_vote (raft_server_t* r, void *u, raft_node_id_t v) { return 0; } +static int raft_cbs_persist_term (raft_server_t* r, void *u, raft_term_t t, raft_node_id_t v) { return 0; } +static int raft_cbs_applylog (raft_server_t* r, void *u, raft_entry_t *e, raft_index_t i) { return 0; } +static int raft_cbs_logentry_offer(raft_server_t* r, void *u, raft_entry_t *e, raft_index_t i) { return 0; } +static int raft_cbs_logentry_pop (raft_server_t* r, void *u, raft_entry_t *e, raft_index_t i) { return 0; } + +// log when macro TEST_CLUSTER_PRINT_LOG is defined +static void raft_cbs_log(raft_server_t* raft, raft_node_t* node, void *udata, const char *buf) { + #ifdef TEST_CLUSTER_PRINT_LOG + printf(" * Server id: %d, peer id: %d, info: %s", raft_get_nodeid(raft), (node ? 
raft_node_get_id(node) : -1), buf); + size_t len = strlen(buf); + if (buf[len - 1] != '\n') + printf("\n"); + #endif +} + +// callback functions setting +static raft_cbs_t raft_cbs_funcs = { + .send_requestvote = raft_cbs_send_requestvote, + .send_appendentries = raft_cbs_send_appendentries, + .send_snapshot = raft_cbs_send_snapshot, + .applylog = raft_cbs_applylog, + .persist_vote = raft_cbs_persist_vote, + .persist_term = raft_cbs_persist_term, + .log_offer = raft_cbs_logentry_offer, + .log_pop = raft_cbs_logentry_pop, + .log = raft_cbs_log +}; + +// init data structure functions +static void init_client_entry() { + for (size_t i = 0; i < MAX_MSG_ENTRIES; i++) { + msg_entry[i].id = i; + msg_entry[i].type = RAFT_LOGTYPE_NORMAL; + } + next_msg_entry = &msg_entry[0]; +} + +static void init_network_pointers() { + next_msg = &msgs[1]; // index 1 is never used + next_msg_entry_response = &msg_entry_response[0]; + next_msg_requestvote = &msg_requestvote[0]; + next_msg_requestvote_response = &msg_requestvote_response[0]; + next_msg_appendentries = &msg_appendentries[0]; + next_msg_appendentries_response = &msg_appendentries_response[0]; + next_msg_snapshot = &msg_snapshot[0]; + +} + +static void init_network() { + init_network_pointers(); + for (int i = 0; i < MAX_MSGS; i++) + memset(next_msg++, 0, sizeof(*next_msg)); + for (int i = 0; i < MAX_EACH_MSGS; i++) { + memset(next_msg_entry_response++, 0, sizeof(*next_msg_entry_response)); + memset(next_msg_requestvote++, 0, sizeof(*next_msg_requestvote)); + memset(next_msg_requestvote_response++, 0, sizeof(*next_msg_requestvote_response)); + memset(next_msg_appendentries++, 0, sizeof(*next_msg_appendentries)); + memset(next_msg_appendentries_response++, 0, sizeof(*next_msg_appendentries_response)); + memset(next_msg_snapshot++, 0, sizeof(*next_msg_snapshot)); + } + init_network_pointers(); + network_partition = 0; +} + +// init cluster +static void rc_init_cluster(int n_server, CuTest *_tc) { + CuAssertTrue(_tc, n_server <= 
MAX_SERVERS); + + // free exist servers + for (int i = 0; i < num_server; i++) { + raft_free(sv[i].server); + sv[i].server = NULL; + sv[i].id = 0; + } + + tc = _tc; + num_server = n_server; + + // create servers. + for (int i = 0; i < num_server; i++) { + sv[i].id = i; + sv[i].server = raft_new(); + CuAssertTrue(tc, sv[i].server != NULL); + raft_set_callbacks(sv[i].server, &raft_cbs_funcs, &sv[i]); + raft_set_election_timeout(sv[i].server, ELECTION_TIMEOUT); + + // we set rand timeout greater than election timeout + while (((raft_server_private_t*)sv[i].server)->election_timeout_rand == ELECTION_TIMEOUT) + raft_set_election_timeout(sv[i].server, ELECTION_TIMEOUT); + raft_set_request_timeout(sv[i].server, REQUEST_TIMEOUT); + raft_randomize_election_timeout(sv[i].server); + } + + // connect servers. + for (int i = 0; i < num_server; i++) { + for (int j = 0; j < num_server; j++) { + int is_self = sv[i].id == sv[j].id; + raft_node_t* node = raft_add_node(sv[i].server, &sv[j], sv[j].id, is_self); + CuAssertTrue(tc, node != NULL); + raft_node_set_voting_committed(node, true); + raft_node_set_active(node, true); + } + } + + // init client entry + init_client_entry(); + + // init network + init_network(); +} + +static void rc_election_timeout(int server_id) { + CuAssertTrue(tc, !raft_is_leader(sv[server_id].server)); + CuAssertTrue(tc, raft_periodic(sv[server_id].server, ELECTION_TIMEOUT * 2) != RAFT_ERR_SHUTDOWN); +} + +static void rc_request_timeout(int server_id) { + CuAssertTrue(tc, raft_is_leader(sv[server_id].server)); + CuAssertTrue(tc, raft_periodic(sv[server_id].server, REQUEST_TIMEOUT) != RAFT_ERR_SHUTDOWN); +} + +// simulate restart +static void rc_restart(int server_id) { + raft_server_private_t* me = (raft_server_private_t*)sv[server_id].server; + me->state = RAFT_STATE_FOLLOWER; + me->commit_idx = me->snapshot_last_idx; + me->current_leader = NULL; + me->timeout_elapsed = 0; + me->last_applied_idx = me->commit_idx; + 
raft_randomize_election_timeout((raft_server_t*)me); +} + +// deliver msg functions +static void rc_deliver_requestvote( + raft_server_t* me, raft_node_t* sender, msg_requestvote_t* m, msg_requestvote_response_t *r) +{ + CuAssertTrue(tc, raft_recv_requestvote(me, sender, m, r) != RAFT_ERR_SHUTDOWN); +} +static void rc_deliver_requestvote_response( + raft_server_t* me, raft_node_t* sender, msg_requestvote_response_t* m) +{ + CuAssertTrue(tc, raft_recv_requestvote_response(me, sender, m) != RAFT_ERR_SHUTDOWN); +} +static void rc_deliver_appendentries( + raft_server_t* me, raft_node_t* sender, msg_appendentries_t* m, msg_appendentries_response_t *r) +{ + CuAssertTrue(tc, raft_recv_appendentries(me, sender, m, r) != RAFT_ERR_SHUTDOWN); +} +static void rc_deliver_appendentries_response( + raft_server_t* me, raft_node_t* sender, msg_appendentries_response_t *m) +{ + CuAssertTrue(tc, raft_recv_appendentries_response(me, sender, m) != RAFT_ERR_SHUTDOWN); +} + +// deliver snapshot msg +static void rc_deliver_snapshot_request( + raft_server_t* me, raft_node_t* sender, msg_snapshot_t* m, msg_appendentries_response_t *r) +{ + r->success = false; + r->current_idx = raft_get_current_idx(me); + r->first_idx = r->current_idx; + r->term = raft_get_current_term(me); + + // according to raft paper + if (m->term < raft_get_current_term(me)) + return; + + int e = raft_begin_load_snapshot(me, m->last_included_term, m->last_included_idx); + if (e != 0) { + if (e == RAFT_ERR_SNAPSHOT_ALREADY_LOADED) { + // to inform the Leader to update next idx and match idx if needed + goto out; + } + else if (e == -1) { + return; + } + CuAssertTrue(tc, 0); // maybe RAFT_ERR_SHUTDOWN + } + + // re-add other servers since servers have been removed after load snapshot. 
+ for (int j = 0, i = raft_get_nodeid(me); j < num_server; j++) { + int is_self = sv[i].id == sv[j].id; + raft_node_t* node = raft_add_node(sv[i].server, &sv[j], sv[j].id, is_self); + if (!node) + continue; + raft_node_set_voting_committed(node, true); + raft_node_set_active(node, true); + } + raft_end_load_snapshot(me); + +out: + r->success = true; + r->current_idx = m->last_included_idx; + r->first_idx = r->current_idx; + r->term = raft_get_current_term(me); +} + +// deliver a msg by seq +static void rc_deliver(int seq, msg_type_e type) { + msg_t *m = get_msg(seq); + #ifdef TEST_CLUSTER_PRINT_LOG + printf(" - Network deliver seq: %d, type: %s, direction S%d <- S%d\n", seq, get_msg_type_str(m->type), m->to, m->from); + #endif + assert_msg_seq_type(seq, type); + void *msg = m->msg; + raft_server_t *me = sv[m->to].server; + raft_node_t *sender = raft_get_node(me, m->from); + msg_t *r = NULL; + switch (m->type) + { + case MSG_APPENDENTRIES: + // ensure message type and len match + CuAssertTrue(tc, m->len == sizeof(msg_appendentries_t)); + r = set_msg(next_msg_appendentries_response++, MSG_APPENDENTRIES_RESPONSE, m->to, m->from); + rc_deliver_appendentries(me, sender, (msg_appendentries_t*)msg, (msg_appendentries_response_t*)r->msg); + break; + case MSG_APPENDENTRIES_RESPONSE: + CuAssertTrue(tc, m->len == sizeof(msg_appendentries_response_t)); + rc_deliver_appendentries_response(me, sender, (msg_appendentries_response_t*)msg); + break; + case MSG_REQUESTVOTE: + CuAssertTrue(tc, m->len == sizeof(msg_requestvote_t)); + // raft servers only vote for candidate after election timeout, + // advance timeout_elapsed to make the server votable. 
+ if (((raft_server_private_t*)me)->state != RAFT_STATE_LEADER + && raft_get_timeout_elapsed(me) < ELECTION_TIMEOUT) + { + CuAssertTrue(tc, raft_periodic(me, ELECTION_TIMEOUT) != RAFT_ERR_SHUTDOWN); + } + r = set_msg(next_msg_requestvote_response++, MSG_REQUESTVOTE_RESPONSE, m->to, m->from); + rc_deliver_requestvote(me, sender, (msg_requestvote_t*)msg, (msg_requestvote_response_t*)r->msg); + break; + case MSG_REQUESTVOTE_RESPONSE: + CuAssertTrue(tc, m->len == sizeof(msg_requestvote_response_t)); + rc_deliver_requestvote_response(me, sender, (msg_requestvote_response_t*)msg); + break; + case MSG_SNAPSHOT: + CuAssertTrue(tc, m->len == sizeof(msg_snapshot_t)); + r = set_msg(next_msg_appendentries_response++, MSG_APPENDENTRIES_RESPONSE, m->to, m->from); + rc_deliver_snapshot_request(me, sender, (msg_snapshot_t*)msg, (msg_appendentries_response_t*)r->msg); + break; + default: + CuAssertTrue(tc, 0); + } + delete_msg(seq); + if (r) + do_add_msg(r); +} + +// server do snapshot +static void rc_exec_snapshot(int server_id) { + CuAssertTrue(tc, raft_begin_snapshot(sv[server_id].server, 0) != RAFT_ERR_SHUTDOWN); + // nothing to do here + CuAssertTrue(tc, raft_end_snapshot(sv[server_id].server) != RAFT_ERR_SHUTDOWN); +} + +// client append log to leader +static void rc_client_operation(int leader, cmd_entry_e cmd) { + CuAssertTrue(tc, raft_is_leader(sv[leader].server)); + if (cmd == CMD1) + next_msg_entry->data = entry1; + else if (cmd == CMD2) + next_msg_entry->data = entry2; + else + CuAssertTrue(tc, 0); + int ok = raft_recv_entry(sv[leader].server, next_msg_entry++, next_msg_entry_response++) != RAFT_ERR_SHUTDOWN; + CuAssertTrue(tc, ok); +} + +// inv 1: at most one Leader per term +static void assert_inv1() { + static int leader_term[MAX_TERM]; + for (int i = 0; i < MAX_TERM; i++) { + leader_term[i] = -1; + } + for (int i = 0; i < num_server; i++) { + if (raft_is_leader(sv[i].server)) { + int term = raft_get_current_term(sv[i].server); + CuAssertTrue(tc, term < MAX_TERM); + 
CuAssertTrue(tc, leader_term[term] == -1); + leader_term[term] = i; + } + } +} + +// inv 2: committed log replicated majority +static void assert_inv2() { + for (int i = 0; i < num_server; i++) { + raft_server_t *me = sv[i].server; + raft_server_private_t *_me = (raft_server_private_t *)sv[i].server; + int commit_idx = raft_get_commit_idx(me); + if (!raft_is_leader(me) || commit_idx == 0) + continue; + int n = 0; + raft_entry_t *entries = log_get_from_idx((log_t *)_me->log, log_get_base((log_t *)_me->log) + 1, &n); + if (!entries || !n) + continue; + int part = raft_get_num_snapshottable_logs(me); + CuAssertTrue(tc, part <= n); + int n_replicated = 1; + for (int j = 0; j < num_server; j++) { + if (i == j) + continue; + raft_server_t *other = sv[j].server; + raft_server_private_t *_other = (raft_server_private_t *)sv[j].server; + if (raft_get_current_idx(other) < commit_idx || raft_get_snapshot_last_idx(other) >= commit_idx) { + #ifdef TEST_CLUSTER_PRINT_LOG + printf(" ? INV2: committed log (idx: %d) is not replicated to: S%d\n", commit_idx, j); + #endif + continue; + } + int n2 = 0; + raft_entry_t *other_entries = log_get_from_idx((log_t*)_other->log, log_get_base((log_t *)_me->log) + 1, &n2); + if (!entries || !n2) { + #ifdef TEST_CLUSTER_PRINT_LOG + printf(" ? INV2: committed log (idx: %d) is not replicated to: S%d\n", commit_idx, j); + #endif + continue; + } + int other_part = commit_idx - log_get_base((log_t *)_me->log); + CuAssertTrue(tc, other_part <= n2); + int min_part = (part < other_part) ? part : other_part; + int ok = 1; + for (int k = 0; k < min_part; k++) { + int a_idx = part - 1 - k, b_idx = other_part - 1 - k; + ok &= ((char*)(entries[a_idx].data.buf))[0] == ((char*)(other_entries[b_idx].data.buf))[0]; + ok &= entries[a_idx].id == other_entries[b_idx].id; + ok &= entries[a_idx].term == other_entries[b_idx].term; + } + if (ok) { + #ifdef TEST_CLUSTER_PRINT_LOG + printf(" ? 
INV2: committed log (idx: %d) replicated to: S%d\n", commit_idx, j); + #endif + n_replicated++; + } + #ifdef TEST_CLUSTER_PRINT_LOG + else { + printf(" ? INV2: committed log (idx: %d) is not replicated to: S%d\n", commit_idx, j); + } + #endif + } + CuAssertTrue(tc, n_replicated * 2 > num_server); + } +} + +// inv 3: committed log is durable (won't be rolled back) +// It is violated in TLA+, however it won't be violated in c code +// since raft_server.c detects "AE prev conflicts with committed entry" +// and return RAFT_ERR_SHUTDOWN which cause another assertion +static struct { + int base; + int snapshottable; + int commit_idx; + raft_entry_t entries[MAX_LOG_ENTRIES]; +} inv3_entries[MAX_SERVERS]; +static void assert_inv3_before_action() { + for (int i = 0; i < num_server; i++) { + raft_server_t *me = sv[i].server; + raft_server_private_t *_me = (raft_server_private_t *)sv[i].server; + int n = 0; + int base = log_get_base((log_t *)_me->log); + inv3_entries[i].base = base; + inv3_entries[i].snapshottable = raft_get_num_snapshottable_logs(me); + inv3_entries[i].commit_idx = raft_get_commit_idx(me); + raft_entry_t *entries = log_get_from_idx((log_t *)_me->log, base + 1, &n); + if (!entries || !n) { + continue; + } + for (int k = 0; k < n; k++) { + memcpy(&(inv3_entries[i].entries[k]), &entries[k], sizeof(raft_entry_t)); + } + } +} +static void assert_inv3_after_action() { + for (int i = 0; i < num_server; i++) { + raft_server_t *me = sv[i].server; + raft_server_private_t *_me = (raft_server_private_t *)sv[i].server; + int n = 0; + int base = log_get_base((log_t *)_me->log); + raft_entry_t *entries = log_get_from_idx((log_t *)_me->log, base + 1, &n); + if (!entries || !n) { + continue; + } + int commit_idx = raft_get_commit_idx(me); + int snapshottable = raft_get_num_snapshottable_logs(me); + int idx = (commit_idx < inv3_entries[i].commit_idx) ? commit_idx : inv3_entries[i].commit_idx; + int len = (snapshottable < inv3_entries[i].snapshottable) ? 
snapshottable: inv3_entries[i].snapshottable; + int ok = 1; + for (int k = 0; k < len; k++) { + int a_idx = idx - base - 1 - k, b_idx = idx - inv3_entries[i].base - 1 - k; + ok &= ((char*)(entries[a_idx].data.buf))[0] == ((char*)(inv3_entries[i].entries[b_idx].data.buf))[0]; + // ok &= entries[a_idx].id == inv3_entries[i].entries[b_idx].id; + ok &= entries[a_idx].term == inv3_entries[i].entries[b_idx].term; + } + CuAssertTrue(tc, ok); + } +} + +// inv 4: Follower's log is less than or equal to leader's log after receiving AE and changing its log +static void assert_inv4(int seq) { + msg_t *m = get_msg(seq); + CuAssertTrue(tc, m != NULL); + CuAssertTrue(tc, m->type == MSG_APPENDENTRIES); + raft_server_t *me = sv[m->to].server; + raft_server_t *leader = sv[m->from].server; + int follower_log_len_before = raft_get_current_idx(me); + int server_term = ((msg_appendentries_t*)m->msg)->term; + rc_deliver(seq, MSG_APPENDENTRIES); + int follower_log_len_after = raft_get_current_idx(me); + int leader_log_len = raft_get_current_idx(leader); + #ifdef TEST_CLUSTER_PRINT_LOG + printf(" ? INV4: follower log length: before: %d after: %d, leader log length: %d\n", + follower_log_len_before, follower_log_len_after, leader_log_len); + #endif + if (follower_log_len_before != follower_log_len_after && server_term == raft_get_current_term(leader)) { + CuAssertTrue(tc, follower_log_len_after <= leader_log_len); + } +} + +// inv 5: monotonic current term +static int inv5_terms[MAX_SERVERS]; +static void assert_inv5_before_action() { + for (int i = 0; i < num_server; i++) + inv5_terms[i] = raft_get_current_term(sv[i].server); +} +static void assert_inv5_after_action() { + int inv5_terms_after[MAX_SERVERS]; + for (int i = 0; i < num_server; i++) + inv5_terms_after[i] = raft_get_current_term(sv[i].server); + // assert(0); + for (int i = 0; i < num_server; i++) { + #ifdef TEST_CLUSTER_PRINT_LOG + printf(" ? 
INV5: for S%d: current term before: %d, current term after: %d\n", + i, inv5_terms[i], inv5_terms_after[i]); + #endif + CuAssertTrue(tc, inv5_terms[i] <= inv5_terms_after[i]); + } +} + +// Did not found these invariants would be violated +// inv 6: monotonic commit index +// inv 7: monotonic match index +// inv 8: (internal) +// inv 9: snapshot last index is less than or equal to commit index +// inv 10: next index is greater than zero + +// inv 11: next index is greater than match index +// (it seems that this bug does not cause any disasters) +static void assert_inv11() { + for (int i = 0; i < num_server; i++) { + raft_server_t *me = sv[i].server; + if (!raft_is_leader(me)) + continue; + for (int j = 0; j < num_server; j++) { + raft_node_t *node = raft_get_node_from_idx(me, j); + #ifdef TEST_CLUSTER_PRINT_LOG + printf(" ? INV11: for S%d: next idx: %ld, match idx: %ld\n", + j, raft_node_get_next_idx(node), raft_node_get_match_idx(node)); + #endif + CuAssertTrue(tc, raft_node_get_match_idx(node) < raft_node_get_next_idx(node)); + } + } +} + +// inv 12: AE msg should contain at least one entry +static void assert_inv12(int server_id) { + clear_network_partition(); + rc_request_timeout(server_id); + raft_server_t *me = sv[server_id].server; + for (int i = num_server - 1; i > 0; i--) { + msg_t *m = next_msg - i; + CuAssertTrue(tc, m != NULL); + if (m->type == MSG_SNAPSHOT) + continue; + CuAssertTrue(tc, m->type == MSG_APPENDENTRIES); + int next_idx = raft_node_get_next_idx(raft_get_node(me, (raft_node_id_t)m->to)); + int num_entries_should_send = raft_get_current_idx(me) + 1 - next_idx; + int num_entries_ae = ((msg_appendentries_t *)m->msg)->n_entries; + #ifdef TEST_CLUSTER_PRINT_LOG + printf(" ? 
INV12: for S%d: should send: %d, really sent: %d\n", + m->to, num_entries_should_send, num_entries_ae); + #endif + CuAssertTrue(tc, num_entries_ae == num_entries_should_send); + } +} + +// inv 13: will exist a Leader +static void assert_inv13() { + int num_leader = 0; + for (int i = 0; i < num_server; i++) { + #ifdef TEST_CLUSTER_PRINT_LOG + printf(" ? INV13: for S%d: is leader: %d\n", i, raft_is_leader(sv[i].server)); + #endif + if (raft_is_leader(sv[i].server)) + num_leader++; + } + CuAssertTrue(tc, num_leader != 0); +} + +#endif //__TEST_CLUSTER__ + diff --git a/tests/test_cluster_more.c b/tests/test_cluster_more.c new file mode 100644 index 00000000..de692fe8 --- /dev/null +++ b/tests/test_cluster_more.c @@ -0,0 +1,190 @@ +// Generated by generate_runnable_testcase.py on 2021-08-24 13:49:33.522659 +// Trace file: trace_0_81007.inv_2 + +#include "test_cluster.h" + +// Note: +// Server id starts from 0, i.e. S0's id is 0 +// Msg seq starts from 1 + +// Abbreviation: +// rc_: Raft Controller +// RV: Request Vote msg +// RVR: Request Vote Response msg +// AE: Append Entries msg +// AER: Append Entries Response msg +// SS: SnapShot msg + +void TestCluster_trace_0_81007_inv_2_committed_log_replicated_majority(CuTest * tc) { + info( 1, "Init 3 servers "); rc_init_cluster(3, tc); + info( 2, "Network partition add S0 "); add_to_network_partition(0); + info( 3, "S1 becomes candidate "); rc_election_timeout(1); + info( 4, "Network not add unreachable RV S1 -> S0"); + info( 5, "S1 sends RV 1 to S2 "); assert_msg_seq_type(1, MSG_REQUESTVOTE); + info( 6, "S2 receives RV 1 from S1 "); rc_deliver(1, MSG_REQUESTVOTE); + info( 6, "S2 sends RVR 2 to S1 "); assert_msg_seq_type(2, MSG_REQUESTVOTE_RESPONSE); + info( 7, "S1 receives RVR 2 from S2 "); rc_deliver(2, MSG_REQUESTVOTE_RESPONSE); + info( 8, "Network not add unreachable AE S1 -> S0"); + info( 9, "S1 sends AE 3 to S2 "); assert_msg_seq_type(3, MSG_APPENDENTRIES); + info(10, "Client appends CMD2 to S1 "); 
rc_client_operation(1, CMD2); + info(11, "Network not add unreachable AE S1 -> S0"); + info(12, "S1 sends AE 4 to S2 "); assert_msg_seq_type(4, MSG_APPENDENTRIES); + info(13, "S1 sends heartbeat "); rc_request_timeout(1); + info(14, "Network not add unreachable AE S1 -> S0"); + info(15, "S1 sends AE 5 to S2 "); assert_msg_seq_type(5, MSG_APPENDENTRIES); + info(16, "Client appends CMD2 to S1 "); rc_client_operation(1, CMD2); + info(17, "S1 sends heartbeat "); rc_request_timeout(1); + info(18, "Network not add unreachable AE S1 -> S0"); + info(19, "S1 sends AE 6 to S2 "); assert_msg_seq_type(6, MSG_APPENDENTRIES); + info(20, "Network drop seq 3 "); drop_msg(3); + info(21, "S2 receives AE 4 from S1 "); rc_deliver(4, MSG_APPENDENTRIES); + info(21, "S2 sends AER 7 to S1 "); assert_msg_seq_type(7, MSG_APPENDENTRIES_RESPONSE); + info(22, "S1 receives AER 7 from S2 "); rc_deliver(7, MSG_APPENDENTRIES_RESPONSE); + info(22, "S1 sends AE|SS 8 to S2 "); assert_msg_seq_type(8, MSG_APPENDENTRIES | MSG_SNAPSHOT); + info(23, "Network drop seq 6 "); drop_msg(6); + info(24, "S2 receives unordered AE 8 from S1 "); rc_deliver(8, MSG_APPENDENTRIES); + info(24, "S2 sends AER 9 to S1 "); assert_msg_seq_type(9, MSG_APPENDENTRIES_RESPONSE); + info(25, "Client appends CMD1 to S1 "); rc_client_operation(1, CMD1); + info(26, "S1 receives AER 9 from S2 "); rc_deliver(9, MSG_APPENDENTRIES_RESPONSE); + info(26, "S1 sends AE|SS 10 to S2 "); assert_msg_seq_type(10, MSG_APPENDENTRIES | MSG_SNAPSHOT); + info(27, "Network duplicate seq 10 -> 11 "); dup_msg(10); + info(28, "Client appends CMD2 to S1 "); rc_client_operation(1, CMD2); + info(29, "S1 executes snapshot "); rc_exec_snapshot(1); + info(30, "Network not add unreachable SS S1 -> S0"); + info(31, "Network partition recovered "); clear_network_partition(); + info(32, "S2 executes snapshot "); rc_exec_snapshot(2); + info(33, "Network duplicate seq 5 -> 12 "); dup_msg(5); + info(34, "S2 receives AE 5 from S1 "); rc_deliver(5, MSG_APPENDENTRIES); 
+ info(34, "S2 sends AER 13 to S1 "); assert_msg_seq_type(13, MSG_APPENDENTRIES_RESPONSE); + info(35, "S2 receives unordered AE 11 from S1 "); rc_deliver(11, MSG_APPENDENTRIES); + info(35, "S2 sends AER 14 to S1 "); assert_msg_seq_type(14, MSG_APPENDENTRIES_RESPONSE); + info(36, "S1 receives unordered AER 14 from S2 "); rc_deliver(14, MSG_APPENDENTRIES_RESPONSE); + info(36, "S1 sends AE|SS 15 to S2 "); assert_msg_seq_type(15, MSG_APPENDENTRIES | MSG_SNAPSHOT); + + info(36, "Guess committed logs replicated to how many servers?"); + assert_inv2(); +} + +void TestCluster_trace_4_63114_inv_3_committed_log_is_durable(CuTest * tc) { + info( 1, "Init 3 servers "); rc_init_cluster(3, tc); + info( 2, "S0 becomes candidate "); rc_election_timeout(0); + info( 3, "S0 sends RV 1 to S1 "); assert_msg_seq_type(1, MSG_REQUESTVOTE); + info( 4, "S0 sends RV 2 to S2 "); assert_msg_seq_type(2, MSG_REQUESTVOTE); + info( 5, "S1 receives RV 1 from S0 "); rc_deliver(1, MSG_REQUESTVOTE); + info( 5, "S1 sends RVR 3 to S0 "); assert_msg_seq_type(3, MSG_REQUESTVOTE_RESPONSE); + info( 6, "S2 becomes candidate "); rc_election_timeout(2); + info( 7, "S2 sends RV 4 to S0 "); assert_msg_seq_type(4, MSG_REQUESTVOTE); + info( 8, "S2 sends RV 5 to S1 "); assert_msg_seq_type(5, MSG_REQUESTVOTE); + info( 9, "Network drop seq 4 "); drop_msg(4); + info(10, "Network drop seq 2 "); drop_msg(2); + info(11, "Network duplicate seq 3 -> 6 "); dup_msg(3); + info(12, "S2 becomes candidate "); rc_election_timeout(2); + info(13, "S2 sends RV 7 to S0 "); assert_msg_seq_type(7, MSG_REQUESTVOTE); + info(14, "S2 sends RV 8 to S1 "); assert_msg_seq_type(8, MSG_REQUESTVOTE); + info(15, "S1 receives RV 5 from S2 "); rc_deliver(5, MSG_REQUESTVOTE); + info(15, "S1 sends RVR 9 to S2 "); assert_msg_seq_type(9, MSG_REQUESTVOTE_RESPONSE); + info(16, "Network partition add S0 "); add_to_network_partition(0); + info(17, "S0 receives RVR 3 from S1 "); rc_deliver(3, MSG_REQUESTVOTE_RESPONSE); + info(18, "Network not add 
unreachable AE S0 -> S1"); + info(19, "Network not add unreachable AE S0 -> S2"); + info(20, "Network not dup unreachable seq 7 "); + info(21, "Client appends CMD1 to S0 "); rc_client_operation(0, CMD1); + info(22, "Network not add unreachable AE S0 -> S1"); + info(23, "Network not add unreachable AE S0 -> S2"); + info(24, "Network partition recovered "); clear_network_partition(); + info(25, "S1 receives RV 8 from S2 "); rc_deliver(8, MSG_REQUESTVOTE); + info(25, "S1 sends RVR 10 to S2 "); assert_msg_seq_type(10, MSG_REQUESTVOTE_RESPONSE); + info(26, "S2 receives unordered RVR 10 from S1 "); rc_deliver(10, MSG_REQUESTVOTE_RESPONSE); + info(27, "S2 sends AE 11 to S0 "); assert_msg_seq_type(11, MSG_APPENDENTRIES); + info(28, "S2 sends AE 12 to S1 "); assert_msg_seq_type(12, MSG_APPENDENTRIES); + info(29, "S1 receives AE 12 from S2 "); rc_deliver(12, MSG_APPENDENTRIES); + info(29, "S1 sends AER 13 to S2 "); assert_msg_seq_type(13, MSG_APPENDENTRIES_RESPONSE); + info(30, "S0 receives unordered AE 11 from S2 "); rc_deliver(11, MSG_APPENDENTRIES); + info(30, "S0 sends AER 14 to S2 "); assert_msg_seq_type(14, MSG_APPENDENTRIES_RESPONSE); + info(31, "S0 receives unordered RV 7 from S2 "); rc_deliver(7, MSG_REQUESTVOTE); + info(31, "S0 sends RVR 15 to S2 "); assert_msg_seq_type(15, MSG_REQUESTVOTE_RESPONSE); + info(32, "S2 receives RVR 9 from S1 "); rc_deliver(9, MSG_REQUESTVOTE_RESPONSE); + info(33, "S2 receives AER 13 from S1 "); rc_deliver(13, MSG_APPENDENTRIES_RESPONSE); + info(34, "S2 receives AER 14 from S0 "); rc_deliver(14, MSG_APPENDENTRIES_RESPONSE); + info(35, "Client appends CMD1 to S2 "); rc_client_operation(2, CMD1); + info(36, "S2 sends AE 16 to S0 "); assert_msg_seq_type(16, MSG_APPENDENTRIES); + info(37, "S2 sends AE 17 to S1 "); assert_msg_seq_type(17, MSG_APPENDENTRIES); + info(38, "S2 receives RVR 15 from S0 "); rc_deliver(15, MSG_REQUESTVOTE_RESPONSE); + info(39, "Client appends CMD2 to S2 "); rc_client_operation(2, CMD2); + info(40, "S1 receives AE 17 
from S2 "); rc_deliver(17, MSG_APPENDENTRIES); + info(40, "S1 sends AER 18 to S2 "); assert_msg_seq_type(18, MSG_APPENDENTRIES_RESPONSE); + info(41, "S2 receives AER 18 from S1 "); rc_deliver(18, MSG_APPENDENTRIES_RESPONSE); + info(41, "S2 sends AE|SS 19 to S1 "); assert_msg_seq_type(19, MSG_APPENDENTRIES | MSG_SNAPSHOT); + info(42, "S2 sends heartbeat "); rc_request_timeout(2); + info(43, "S2 sends AE 20 to S0 "); assert_msg_seq_type(20, MSG_APPENDENTRIES); + info(44, "S2 sends AE 21 to S1 "); assert_msg_seq_type(21, MSG_APPENDENTRIES); + info(45, "S0 receives RVR 6 from S1 "); rc_deliver(6, MSG_REQUESTVOTE_RESPONSE); + info(46, "Client appends CMD1 to S2 "); rc_client_operation(2, CMD1); + info(47, "S1 receives AE 19 from S2 "); rc_deliver(19, MSG_APPENDENTRIES); + info(47, "S1 sends AER 22 to S2 "); assert_msg_seq_type(22, MSG_APPENDENTRIES_RESPONSE); + info(48, "S1 receives AE 21 from S2 "); rc_deliver(21, MSG_APPENDENTRIES); + info(48, "S1 sends AER 23 to S2 "); assert_msg_seq_type(23, MSG_APPENDENTRIES_RESPONSE); + info(49, "S1 executes snapshot "); rc_exec_snapshot(1); + add_to_network_partition(0); + info(50, "S2 executes snapshot "); rc_exec_snapshot(2); + clear_network_partition(); + info(51, "S2 receives AER 22 from S1 "); rc_deliver(22, MSG_APPENDENTRIES_RESPONSE); + info(51, "S2 sends AE|SS 24 to S1 "); assert_msg_seq_type(24, MSG_APPENDENTRIES | MSG_SNAPSHOT); + info(52, "S2 sends heartbeat "); rc_request_timeout(2); + // info(53, "S2 sends AE 25 to S0 "); assert_msg_seq_type(25, MSG_APPENDENTRIES); + info(54, "S2 sends AE 26 to S1 "); assert_msg_seq_type(26, MSG_APPENDENTRIES); + info(55, "S0 receives unordered AE 25 from S2 "); rc_deliver(25, MSG_NIL); + // info(55, "S0 sends AER 27 to S2 "); assert_msg_seq_type(27, MSG_APPENDENTRIES_RESPONSE); + info(56, "S2 receives unordered AER 27 from S0 "); rc_deliver(27, MSG_APPENDENTRIES_RESPONSE); + info(57, "S1 receives AE 24 from S2 "); rc_deliver(24, MSG_APPENDENTRIES); + // info(57, "S1 sends AER 28 to 
S2 "); assert_msg_seq_type(28, MSG_APPENDENTRIES_RESPONSE); + info(58, "S0 receives AE 16 from S2 "); rc_deliver(16, MSG_APPENDENTRIES); + info(58, "S0 sends AER 29 to S2 "); assert_msg_seq_type(29, MSG_APPENDENTRIES_RESPONSE); +} + +void TestCluster_trace_3_99878_assertion_set_commit_idx_le_current_idx(CuTest * tc) { +#ifdef TEST_CLUSTER_CODE_ASSERTION + info( 1, "Init 3 servers "); rc_init_cluster(3, tc); + info( 2, "S0 becomes candidate "); rc_election_timeout(0); + info( 3, "S0 sends RV 1 to S1 "); assert_msg_seq_type(1, MSG_REQUESTVOTE); + info( 4, "S0 sends RV 2 to S2 "); assert_msg_seq_type(2, MSG_REQUESTVOTE); + info( 5, "Network partition add S0 "); add_to_network_partition(0); + info( 6, "Network not dup unreachable seq 2 "); + info( 7, "Network not dup unreachable seq 2 "); + info( 8, "S2 becomes candidate "); rc_election_timeout(2); + info( 9, "Network not add unreachable RV S2 -> S0"); + info(10, "S2 sends RV 3 to S1 "); assert_msg_seq_type(3, MSG_REQUESTVOTE); + info(11, "Network partition recovered "); clear_network_partition(); + info(12, "Network drop seq 2 "); drop_msg(2); + info(13, "Network drop seq 3 "); drop_msg(3); + info(14, "S1 restarts "); rc_restart(1); + info(15, "S1 receives RV 1 from S0 "); rc_deliver(1, MSG_REQUESTVOTE); + info(15, "S1 sends RVR 4 to S0 "); assert_msg_seq_type(4, MSG_REQUESTVOTE_RESPONSE); + info(16, "S0 receives RVR 4 from S1 "); rc_deliver(4, MSG_REQUESTVOTE_RESPONSE); + info(17, "S0 sends AE 5 to S1 "); assert_msg_seq_type(5, MSG_APPENDENTRIES); + info(18, "S0 sends AE 6 to S2 "); assert_msg_seq_type(6, MSG_APPENDENTRIES); + info(19, "Client appends CMD2 to S0 "); rc_client_operation(0, CMD2); + info(20, "S0 sends AE 7 to S1 "); assert_msg_seq_type(7, MSG_APPENDENTRIES); + info(21, "S0 sends AE 8 to S2 "); assert_msg_seq_type(8, MSG_APPENDENTRIES); + info(22, "S2 receives unordered AE 8 from S0 "); rc_deliver(8, MSG_APPENDENTRIES); + info(22, "S2 sends AER 9 to S0 "); assert_msg_seq_type(9, 
MSG_APPENDENTRIES_RESPONSE); + info(23, "Client appends CMD2 to S0 "); rc_client_operation(0, CMD2); + info(24, "S2 restarts "); rc_restart(2); + info(25, "S1 receives AE 5 from S0 "); rc_deliver(5, MSG_APPENDENTRIES); + info(25, "S1 sends AER 10 to S0 "); assert_msg_seq_type(10, MSG_APPENDENTRIES_RESPONSE); + info(26, "S0 receives AER 9 from S2 "); rc_deliver(9, MSG_APPENDENTRIES_RESPONSE); + info(26, "S0 sends AE|SS 11 to S2 "); assert_msg_seq_type(11, MSG_APPENDENTRIES | MSG_SNAPSHOT); + info(27, "S0 executes snapshot "); rc_exec_snapshot(0); + info(28, "Client appends CMD2 to S0 "); rc_client_operation(0, CMD2); + info(29, "Client appends CMD2 to S0 "); rc_client_operation(0, CMD2); + info(30, "S0 sends heartbeat "); rc_request_timeout(0); + info(31, "S0 sends AE 12 to S1 "); assert_msg_seq_type(12, MSG_NIL); + info(32, "S0 sends AE 13 to S2 "); assert_msg_seq_type(13, MSG_NIL); + info(33, "S1 receives unordered AE 12 from S0 "); rc_deliver(12, MSG_NIL); + info(33, "S1 sends AER 14 to S0 "); assert_msg_seq_type(14, MSG_NIL); + info(34, "S1 receives AE 7 from S0 "); rc_deliver(7, MSG_APPENDENTRIES); + info(34, "S1 sends AER 15 to S0 "); assert_msg_seq_type(15, MSG_APPENDENTRIES_RESPONSE); + info(35, "S1 becomes candidate "); rc_election_timeout(1); +#endif +} + diff --git a/tests/test_server.c b/tests/test_server.c index 25d8dd59..4ee66549 100644 --- a/tests/test_server.c +++ b/tests/test_server.c @@ -434,7 +434,7 @@ void TestRaft_server_wont_apply_entry_if_there_isnt_a_majority(CuTest* tc) } /* If commitidx > lastApplied: increment lastApplied, apply log[lastApplied] - * to state machine (�5.3) */ + * to state machine (§5.3) */ void TestRaft_server_increment_lastApplied_when_lastApplied_lt_commitidx( CuTest* tc) { @@ -811,7 +811,7 @@ void TestRaft_server_recv_requestvote_response_must_be_candidate_to_receive( CuAssertTrue(tc, 0 == raft_get_nvotes_for_me(r)); } -/* Reply false if term < currentTerm (�5.1) */ +/* Reply false if term < currentTerm (§5.1) */ void 
TestRaft_server_recv_requestvote_reply_false_if_term_less_than_current_term( CuTest * tc ) @@ -869,7 +869,7 @@ void TestRaft_leader_recv_requestvote_does_not_step_down( CuAssertIntEquals(tc, 1, raft_get_current_leader(r)); } -/* Reply true if term >= currentTerm (�5.1) */ +/* Reply true if term >= currentTerm (§5.1) */ void TestRaft_server_recv_requestvote_reply_true_if_term_greater_than_or_equal_to_current_term( CuTest * tc ) @@ -993,7 +993,7 @@ void TestRaft_server_recv_requestvote_depends_on_candidate_id( } /* If votedFor is null or candidateId, and candidate's log is at - * least as up-to-date as local log, grant vote (�5.2, �5.4) */ + * least as up-to-date as local log, grant vote (§5.2, §5.4) */ void TestRaft_server_recv_requestvote_dont_grant_vote_if_we_didnt_vote_for_this_candidate( CuTest * tc ) @@ -1031,7 +1031,7 @@ void TestRaft_server_recv_requestvote_dont_grant_vote_if_we_didnt_vote_for_this_ /* If requestvote is received within the minimum election timeout of * hearing from a current leader, it does not update its term or grant its - * vote (�6). + * vote (§6). 
*/ void TestRaft_server_recv_requestvote_ignore_if_master_is_fresh(CuTest * tc) { @@ -1238,7 +1238,7 @@ void TestRaft_follower_recv_appendentries_increases_log(CuTest * tc) /* receive an appendentry with commit */ memset(&ae, 0, sizeof(msg_appendentries_t)); ae.term = 3; - ae.prev_log_term = 1; + ae.prev_log_term = 0; /* first appendentries msg */ ae.prev_log_idx = 0; ae.leader_commit = 5; @@ -1506,7 +1506,7 @@ void TestRaft_follower_recv_appendentries_add_new_entries_not_already_in_log( memset(&ae, 0, sizeof(msg_appendentries_t)); ae.term = 1; ae.prev_log_idx = 0; - ae.prev_log_term = 1; + ae.prev_log_term = 0; /* include entries */ msg_entry_t e[2]; memset(&e, 0, sizeof(msg_entry_t) * 2); @@ -1540,7 +1540,7 @@ void TestRaft_follower_recv_appendentries_does_not_add_dupe_entries_already_in_l memset(&ae, 0, sizeof(msg_appendentries_t)); ae.term = 1; ae.prev_log_idx = 0; - ae.prev_log_term = 1; + ae.prev_log_term = 0; /* include 1 entry */ msg_entry_t e[2]; memset(&e, 0, sizeof(msg_entry_t) * 2); @@ -1706,7 +1706,7 @@ void TestRaft_follower_recv_appendentries_set_commitidx_to_prevLogIdx( memset(&ae, 0, sizeof(msg_appendentries_t)); ae.term = 1; ae.prev_log_idx = 0; - ae.prev_log_term = 1; + ae.prev_log_term = 0; /* include entries */ msg_entry_t e[4]; memset(&e, 0, sizeof(msg_entry_t) * 4); @@ -1755,7 +1755,7 @@ void TestRaft_follower_recv_appendentries_set_commitidx_to_LeaderCommit( memset(&ae, 0, sizeof(msg_appendentries_t)); ae.term = 1; ae.prev_log_idx = 0; - ae.prev_log_term = 1; + ae.prev_log_term = 0; /* include entries */ msg_entry_t e[4]; memset(&e, 0, sizeof(msg_entry_t) * 4); @@ -1927,7 +1927,7 @@ void TestRaft_follower_recv_appendentries_heartbeat_does_not_overwrite_logs( memset(&ae, 0, sizeof(msg_appendentries_t)); ae.term = 1; ae.prev_log_idx = 0; - ae.prev_log_term = 1; + ae.prev_log_term = 0; /* include entries */ msg_entry_t e[4]; memset(&e, 0, sizeof(msg_entry_t) * 4); @@ -1991,7 +1991,7 @@ void 
TestRaft_follower_recv_appendentries_does_not_deleted_commited_entries( memset(&ae, 0, sizeof(msg_appendentries_t)); ae.term = 1; ae.prev_log_idx = 0; - ae.prev_log_term = 1; + ae.prev_log_term = 0; /* include entries */ msg_entry_t e[5]; memset(&e, 0, sizeof(msg_entry_t) * 4); @@ -2771,7 +2771,7 @@ void TestRaft_leader_retries_appendentries_with_decremented_NextIdx_log_inconsis /* * If there exists an N such that N > commitidx, a majority * of matchidx[i] = N, and log[N].term == currentTerm: - * set commitidx = N (�5.2, �5.4). */ + * set commitidx = N (§5.2, §5.4). */ void TestRaft_leader_append_entry_to_log_increases_idxno(CuTest * tc) { raft_cbs_t funcs = { diff --git a/tests/test_snapshotting.c b/tests/test_snapshotting.c index 25997b40..2b671deb 100644 --- a/tests/test_snapshotting.c +++ b/tests/test_snapshotting.c @@ -379,13 +379,13 @@ void TestRaft_follower_load_from_snapshot(CuTest * tc) CuAssertIntEquals(tc, 0, raft_periodic(r, 1000)); - /* current idx means snapshot was unnecessary */ ety.id = 2; raft_append_entry(r, &ety); ety.id = 3; raft_append_entry(r, &ety); raft_set_commit_idx(r, 7); - CuAssertIntEquals(tc, -1, raft_begin_load_snapshot(r, 6, 5)); + /* last_included_index conflicts with committed entry */ + CuAssertIntEquals(tc, RAFT_ERR_SHUTDOWN, raft_begin_load_snapshot(r, 6, 5)); CuAssertIntEquals(tc, 7, raft_get_commit_idx(r)); } @@ -467,7 +467,8 @@ void TestRaft_follower_load_from_snapshot_does_not_break_cluster_safety(CuTest * ety.data.len = strlen("entry"); raft_append_entry(r, &ety); - CuAssertIntEquals(tc, -1, raft_begin_load_snapshot(r, 2, 2)); + /* log mismatch */ + CuAssertIntEquals(tc, 0, raft_begin_load_snapshot(r, 2, 2)); } void TestRaft_follower_load_from_snapshot_fails_if_log_is_newer(CuTest * tc) @@ -495,7 +496,7 @@ void TestRaft_follower_load_from_snapshot_fails_if_log_is_newer(CuTest * tc) CuAssertIntEquals(tc, -1, raft_begin_load_snapshot(r, 2, 2)); } -void 
TestRaft_leader_sends_appendentries_when_node_next_index_was_compacted(CuTest* tc) +void TestRaft_leader_sends_snapshot_when_node_next_index_was_compacted(CuTest* tc) { raft_cbs_t funcs = { .send_appendentries = __raft_send_appendentries_capture, @@ -545,10 +546,10 @@ void TestRaft_leader_sends_appendentries_when_node_next_index_was_compacted(CuTe raft_set_state(r, RAFT_STATE_LEADER); raft_set_current_term(r, 2); - CuAssertIntEquals(tc, 0, raft_send_appendentries(r, node)); - CuAssertIntEquals(tc, 2, ae.term); - CuAssertIntEquals(tc, 3, ae.prev_log_idx); - CuAssertIntEquals(tc, 2, ae.prev_log_term); + CuAssertIntEquals(tc, RAFT_ERR_NEEDS_SNAPSHOT, raft_send_appendentries(r, node)); + // CuAssertIntEquals(tc, 2, ae.term); + // CuAssertIntEquals(tc, 3, ae.prev_log_idx); + // CuAssertIntEquals(tc, 2, ae.prev_log_term); } void TestRaft_recv_entry_fails_if_snapshot_in_progress(CuTest* tc) @@ -662,7 +663,7 @@ void TestRaft_follower_recv_appendentries_is_successful_when_previous_log_idx_eq CuAssertIntEquals(tc, 1, aer.success); } -void TestRaft_leader_sends_appendentries_with_correct_prev_log_idx_when_snapshotted( +void TestRaft_leader_not_send_appendentries_when_snapshotted( CuTest * tc) { raft_cbs_t funcs = { @@ -682,6 +683,7 @@ void TestRaft_leader_sends_appendentries_with_correct_prev_log_idx_when_snapshot /* i'm leader */ raft_set_state(r, RAFT_STATE_LEADER); + CuAssertTrue(tc, NULL != raft_add_node(r, NULL, 2, 0)); raft_node_t* p = raft_get_node_from_idx(r, 1); CuAssertTrue(tc, NULL != p); raft_node_set_next_idx(p, 4); @@ -689,9 +691,9 @@ void TestRaft_leader_sends_appendentries_with_correct_prev_log_idx_when_snapshot /* receive appendentries messages */ raft_send_appendentries(r, p); msg_appendentries_t* ae = sender_poll_msg_data(sender); - CuAssertTrue(tc, NULL != ae); - CuAssertIntEquals(tc, 2, ae->prev_log_term); - CuAssertIntEquals(tc, 4, ae->prev_log_idx); + CuAssertTrue(tc, NULL == ae); + // CuAssertIntEquals(tc, 2, ae->prev_log_term); + // 
CuAssertIntEquals(tc, 4, ae->prev_log_idx); } void TestRaft_cancel_snapshot_restores_state(CuTest* tc) @@ -812,6 +814,9 @@ void TestRaft_leader_sends_snapshot_if_log_was_compacted(CuTest* tc) aer.first_idx = 4; raft_recv_appendentries_response(r, node, &aer); - CuAssertIntEquals(tc, 1, send_snapshot_count); -} + CuAssertIntEquals(tc, 0, send_snapshot_count); + /* It is ok to send snapshot when request timeout */ + raft_periodic(r, ((raft_server_private_t*)r)->request_timeout); + CuAssertIntEquals(tc, 2, send_snapshot_count); +} diff --git a/tests/virtraft2.py b/tests/virtraft2.py index e875d93e..45feb86e 100644 --- a/tests/virtraft2.py +++ b/tests/virtraft2.py @@ -149,7 +149,7 @@ def __init__(self): self.members = [] -SnapshotMember = collections.namedtuple('SnapshotMember', ['id', 'voting'], verbose=False) +SnapshotMember = collections.namedtuple('SnapshotMember', ['id', 'voting']) def raft_send_requestvote(raft, udata, node, msg):