@@ -40,7 +40,10 @@ void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, c
40
40
break ;
41
41
}
42
42
}
43
+ }
43
44
45
+ void ReplicationStateMachine::notify_committed_lsn (int64_t lsn) {
46
+ LOGD (" got committed lsn notification , lsn={}" , lsn);
44
47
// handle no_space_left error if we have any
45
48
const auto [target_lsn, chunk_id] = get_no_space_left_error_info ();
46
49
if (std::numeric_limits< homestore::repl_lsn_t >::max () == target_lsn) {
@@ -64,8 +67,7 @@ void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, c
64
67
}
65
68
66
69
if (target_lsn == lsn) {
67
- // check if there is pending no_space_left error to be handled. only follower will handle this
68
- LOGD (" handle no_space_left_error_info, lsn={}, chunk_id={}" , lsn, chunk_id);
70
+ LOGD (" match no_space_left_error_info, lsn={}, chunk_id={}" , lsn, chunk_id);
69
71
handle_no_space_left (lsn, chunk_id);
70
72
reset_no_space_left_error_info ();
71
73
}
@@ -134,13 +136,29 @@ void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header,
134
136
}
135
137
}
136
138
137
- // cancel no_space_left_error_info if matches
138
139
const auto [target_lsn, chunk_id] = get_no_space_left_error_info ();
139
- if (target_lsn >= lsn) {
140
- LOGD (" cancel no_space_left_error_info, wait_commit_lsn={}, chunk_id={}, currrent lsn={}" , target_lsn, chunk_id,
141
- lsn);
142
- reset_no_space_left_error_info ();
143
- }
140
+
141
+ RELEASE_ASSERT (
142
+ target_lsn >= lsn,
143
+ " wait_commit_lsn should be bigger than rollbacked lsn wait_commit_lsn={}, chunk_id={}, currrent lsn={}" ,
144
+ target_lsn, chunk_id, lsn);
145
+
146
+ // if target_lsn is int64_max, it is also ok to reset_no_space_left_error_info
147
+ reset_no_space_left_error_info ();
148
+ }
149
+
150
+ void ReplicationStateMachine::on_config_rollback (int64_t lsn) {
151
+ LOGD (" rollback config at lsn={}" , lsn);
152
+
153
+ const auto [target_lsn, chunk_id] = get_no_space_left_error_info ();
154
+
155
+ RELEASE_ASSERT (
156
+ target_lsn >= lsn,
157
+ " wait_commit_lsn should be bigger than rollbacked lsn wait_commit_lsn={}, chunk_id={}, currrent lsn={}" ,
158
+ target_lsn, chunk_id, lsn);
159
+
160
+ // if target_lsn is int64_max, it is also ok to reset_no_space_left_error_info
161
+ reset_no_space_left_error_info ();
144
162
}
145
163
146
164
void ReplicationStateMachine::on_restart () { LOGD (" ReplicationStateMachine::on_restart" ); }
@@ -807,61 +825,23 @@ void HSHomeObject::on_snp_ctx_meta_blk_recover_completed(bool success) {
807
825
LOGI (" Snapshot context meta blk recover completed" );
808
826
}
809
827
810
- // on_no_space_left will be called in raft channel, which will blocking new logs to come in.
811
- // here, we need to check if all the appended logs have been already committed.
812
-
813
- // if yes, we can guarantee all the index entry of all the logs have already been written to pg index table(which is
814
- // used by gc thread to identify valid blobs), so we can handle the no_space_left error directly.
815
-
816
- // if not, we need to set an no_space_left error info and wait for the commit thread to handle this error after the
817
- // lsn-1 is committed.
818
-
819
828
void ReplicationStateMachine::on_no_space_left (homestore::repl_lsn_t lsn, homestore::chunk_num_t chunk_id) {
820
829
LOGD (" got no_space_left error at lsn={}, chunk_id={}" , lsn, chunk_id);
821
830
822
- // we need to pause statemachin here before we check and compare last_append_lsn and last_commit_lsn, since there is
823
- // very corner case that when we do this, the last_append_lsn is being committed. for example, last_append_lsn is 10
824
- // and last_commit_lsn is 9, so we will set an erro info and wait for the commit thread to handle it after lsn 10
825
- // is committed. but lsn 10 is probably being committed now, if the commit is completed after the comparation but
826
- // before we set the error info, then commit thread will never handle the error info, since all the appended log
827
- // have been committed and new more new logs can be appended, so on_commit will never be trigger again.
828
- repl_dev ()->pause_statemachine ();
829
-
830
- // basically, there are three cases:
831
- // 1. if there is some appended logs that have not been committed, we need to set an error info and delegate
832
- // handle_no_space_left to on_commit.
833
- // 2. if all the appended logs have been committed, but the lsn, which no_space_left is triggered, is exactly the
834
- // last_append_lsn + 1, we must handle_no_space_left inline.
835
- // 3. if all the appended logs have been committed, but the lsn, which no_space_left is triggered, is larger than
836
- // last_append_lsn + 1, we can select anyone of both, handle_no_space_left inline or set an error info or delegate
837
- // it to on_commit.
838
-
839
- // last_append_lsn here means the raft_lsn of the last log in logstore.
840
- const auto last_append_lsn = repl_dev ()->get_last_append_lsn ();
841
- const auto last_commit_lsn = repl_dev ()->get_last_commit_lsn ();
842
- LOGD (" last_append_lsn={}, last_commit_lsn={}" , last_append_lsn, last_commit_lsn);
843
- if (last_append_lsn == last_commit_lsn) {
844
- // if last_append_lsn == last_commit_lsn, it means all the logs have been committed, so we have to handle it
845
- // here, since on_commit will never be triggered from now on.
846
- LOGD (" all the committed logs have been committed, handle no_space_left directly!" );
847
- handle_no_space_left (lsn, chunk_id);
848
- } else {
849
- const auto [error_lsn, error_chunk_id] = get_no_space_left_error_info ();
850
- if (lsn - 1 < error_lsn) {
851
- /* set a new error info or overwrite an existing error info*/
852
- // we wait for the commit thread to handle this error after lsn - 1 is committed.
853
- LOGD (" wait for the commit thread to handle no_space_left error after lsn {} is committed, "
854
- " last_append_lsn {}, last_commit_lsn {}" ,
855
- lsn - 1 , last_append_lsn, last_commit_lsn);
856
- set_no_space_left_error_info (lsn - 1 , error_chunk_id);
857
- } else {
858
- LOGD (" got no_space_left error but my expected lsn {} is larger than existing error info lsn {}, "
859
- " ignore it!" ,
860
- lsn - 1 , error_lsn);
861
- }
862
- }
831
+ const auto [target_lsn, error_chunk_id] = get_no_space_left_error_info ();
832
+
833
+ RELEASE_ASSERT (lsn - 1 <= target_lsn,
834
+ " new target lsn should be less than or equal to the existing target "
835
+ " lsn, new_target_lsn={}, existing_target_lsn={}" ,
836
+ lsn - 1 , target_lsn);
837
+
838
+ // set a new error info or overwrite an existing error info, postpone handling this error until lsn - 1 is
839
+ // committed.
840
+ LOGD (" set no_space_left error info with lsn={}, chunk_id={}, existing error info: lsn={}, chunk_id={}" , lsn - 1 ,
841
+ chunk_id, target_lsn, error_chunk_id);
863
842
864
- repl_dev ()->resume_statemachine ();
843
+ // setting the same error info is ok.
844
+ set_no_space_left_error_info (lsn - 1 , chunk_id);
865
845
}
866
846
867
847
void ReplicationStateMachine::set_no_space_left_error_info (homestore::repl_lsn_t lsn, homestore::chunk_num_t chunk_id) {
0 commit comments