@@ -21,6 +21,113 @@ const EXPONENTIAL_BACKOFF_MULTIPLIER: u32 = 2;
21
21
const RESTART_DELAY_MIN : Duration = Duration :: from_millis ( 0 ) ;
22
22
const RESTART_DELAY_MAX : Duration = Duration :: from_millis ( 100 ) ;
23
23
24
+ // Propose a value to the cluster.
25
+ pub async fn propose (
26
+ state : Arc < RwLock < State > > ,
27
+ data_file_path : & Path ,
28
+ nodes : & [ SocketAddr ] ,
29
+ node_index : usize ,
30
+ original_value : & str ,
31
+ ) -> Result < ( ) , io:: Error > {
32
+ // Retry until the protocol succeeds.
33
+ loop {
34
+ // Generate a new proposal number.
35
+ let proposal_number = {
36
+ // The `unwrap` is safe since it can only fail if a panic already happened.
37
+ let mut guard = state. write ( ) . await ;
38
+ let proposal_number = generate_proposal_number ( nodes, node_index, & mut guard) ;
39
+ crate :: state:: write ( & guard, data_file_path) . await ?;
40
+ proposal_number
41
+ } ;
42
+
43
+ // Create an HTTP client.
44
+ let client = Client :: new ( ) ;
45
+
46
+ // Send a prepare message to all the nodes.
47
+ info ! (
48
+ "Preparing value `{}` with proposal number:\n {}" ,
49
+ original_value,
50
+ // Serialization is safe.
51
+ serde_yaml:: to_string( & proposal_number) . unwrap( ) ,
52
+ ) ;
53
+ let prepare_responses = broadcast_quorum :: < PrepareResponse > (
54
+ & client,
55
+ nodes,
56
+ PREPARE_ENDPOINT ,
57
+ & PrepareRequest { proposal_number } ,
58
+ )
59
+ . await ;
60
+
61
+ // Determine which value to propose.
62
+ let new_value = if let Some ( accepted_proposal) = prepare_responses
63
+ . iter ( )
64
+ . filter_map ( |response| response. accepted_proposal . clone ( ) )
65
+ . max_by_key ( |accepted_proposal| accepted_proposal. 0 )
66
+ {
67
+ // There was an accepted proposal. Use that.
68
+ info ! (
69
+ "Discovered existing value from cluster: {}" ,
70
+ accepted_proposal. 1 ,
71
+ ) ;
72
+ accepted_proposal. 1
73
+ } else {
74
+ // Propose the given value.
75
+ info ! ( "Quorum replied with no existing value." ) ;
76
+ original_value. to_owned ( )
77
+ } ;
78
+
79
+ // Send an accept message to all the nodes.
80
+ info ! (
81
+ "Requesting acceptance of value `{}` with proposal number:\n {}" ,
82
+ new_value,
83
+ // The `unwrap` is safe because serialization should never fail.
84
+ serde_yaml:: to_string( & proposal_number) . unwrap( ) ,
85
+ ) ;
86
+ let accept_responses = broadcast_quorum :: < AcceptResponse > (
87
+ & client,
88
+ nodes,
89
+ ACCEPT_ENDPOINT ,
90
+ & AcceptRequest {
91
+ proposal : ( proposal_number, new_value. clone ( ) ) ,
92
+ } ,
93
+ )
94
+ . await ;
95
+
96
+ // Determine if the proposed value was chosen.
97
+ let mut value_chosen = true ;
98
+ for response in accept_responses {
99
+ if response. min_proposal_number > proposal_number {
100
+ value_chosen = false ;
101
+ }
102
+
103
+ // Update the `next_round`, if applicable. The `unwrap` is safe
104
+ // since it can only fail if a panic already happened.
105
+ let mut guard = state. write ( ) . await ;
106
+ if guard. next_round <= response. min_proposal_number . round {
107
+ guard. next_round = response. min_proposal_number . round + 1 ;
108
+ crate :: state:: write ( & guard, data_file_path) . await ?;
109
+ }
110
+ }
111
+ if value_chosen {
112
+ // The protocol succeeded. Notify all the nodes and return.
113
+ info ! ( "Consensus achieved. Notifying all the nodes." ) ;
114
+ broadcast_all :: < ChooseResponse > (
115
+ & client,
116
+ nodes,
117
+ CHOOSE_ENDPOINT ,
118
+ & ChooseRequest { value : new_value } ,
119
+ )
120
+ . await ;
121
+ info ! ( "All nodes notified." ) ;
122
+ return Ok ( ( ) ) ;
123
+ }
124
+
125
+ // The protocol failed. Sleep for a random duration before starting over.
126
+ info ! ( "Failed to reach consensus. Starting over." ) ;
127
+ sleep ( thread_rng ( ) . gen_range ( RESTART_DELAY_MIN ..RESTART_DELAY_MAX ) ) . await ;
128
+ }
129
+ }
130
+
24
131
// Generate a new proposal number.
25
132
fn generate_proposal_number (
26
133
nodes : & [ SocketAddr ] ,
@@ -124,107 +231,6 @@ async fn broadcast_all<T: DeserializeOwned>(
124
231
. await
125
232
}
126
233
127
- // Propose a value to the cluster.
128
- pub async fn propose (
129
- state : Arc < RwLock < State > > ,
130
- data_file_path : & Path ,
131
- nodes : & [ SocketAddr ] ,
132
- node_index : usize ,
133
- original_value : & str ,
134
- ) -> Result < ( ) , io:: Error > {
135
- // Retry until the protocol succeeds.
136
- loop {
137
- // Generate a new proposal number.
138
- let proposal_number = {
139
- // The `unwrap` is safe since it can only fail if a panic already happened.
140
- let mut guard = state. write ( ) . await ;
141
- generate_proposal_number ( nodes, node_index, & mut guard)
142
- } ;
143
-
144
- // Persist the state.
145
- {
146
- // The `unwrap` is safe since it can only fail if a panic already happened.
147
- let guard = state. read ( ) . await ;
148
- crate :: state:: write ( & guard, data_file_path) . await ?;
149
- }
150
-
151
- // Create an HTTP client.
152
- let client = Client :: new ( ) ;
153
-
154
- // Send a prepare message to all the nodes.
155
- info ! (
156
- "Preparing value `{}` with proposal number:\n {}" ,
157
- original_value,
158
- // Serialization is safe.
159
- serde_yaml:: to_string( & proposal_number) . unwrap( ) ,
160
- ) ;
161
- let prepare_responses = broadcast_quorum :: < PrepareResponse > (
162
- & client,
163
- nodes,
164
- PREPARE_ENDPOINT ,
165
- & PrepareRequest { proposal_number } ,
166
- )
167
- . await ;
168
-
169
- // Determine which value to propose.
170
- let new_value = if let Some ( accepted_proposal) = prepare_responses
171
- . iter ( )
172
- . filter_map ( |response| response. accepted_proposal . clone ( ) )
173
- . max_by_key ( |accepted_proposal| accepted_proposal. 0 )
174
- {
175
- // There was an accepted proposal. Use that.
176
- info ! (
177
- "Discovered existing value from cluster: {}" ,
178
- accepted_proposal. 1 ,
179
- ) ;
180
- accepted_proposal. 1
181
- } else {
182
- // Propose the given value.
183
- info ! ( "Quorum replied with no existing value." ) ;
184
- original_value. to_owned ( )
185
- } ;
186
-
187
- // Send an accept message to all the nodes.
188
- info ! (
189
- "Requesting acceptance of value `{}` with proposal number:\n {}" ,
190
- new_value,
191
- // The `unwrap` is safe because serialization should never fail.
192
- serde_yaml:: to_string( & proposal_number) . unwrap( ) ,
193
- ) ;
194
- let accept_responses = broadcast_quorum :: < AcceptResponse > (
195
- & client,
196
- nodes,
197
- ACCEPT_ENDPOINT ,
198
- & AcceptRequest {
199
- proposal : ( proposal_number, new_value. clone ( ) ) ,
200
- } ,
201
- )
202
- . await ;
203
-
204
- // Was the proposed value chosen?
205
- if accept_responses
206
- . iter ( )
207
- . all ( |response| response. min_proposal_number == proposal_number)
208
- {
209
- // The protocol succeeded. Notify all the nodes and return.
210
- info ! ( "Consensus achieved. Notifying all the nodes." ) ;
211
- broadcast_all :: < ChooseResponse > (
212
- & client,
213
- nodes,
214
- CHOOSE_ENDPOINT ,
215
- & ChooseRequest { value : new_value } ,
216
- )
217
- . await ;
218
- info ! ( "All nodes notified." ) ;
219
- return Ok ( ( ) ) ;
220
- }
221
-
222
- // The protocol failed. Sleep for a random duration before starting over.
223
- info ! ( "Failed to reach consensus. Starting over." ) ;
224
- sleep ( thread_rng ( ) . gen_range ( RESTART_DELAY_MIN ..RESTART_DELAY_MAX ) ) . await ;
225
- }
226
- }
227
-
228
234
#[ cfg( test) ]
229
235
mod tests {
230
236
use {
0 commit comments