4
4
AcceptRequest , AcceptResponse , ChooseRequest , ChooseResponse , PrepareRequest ,
5
5
PrepareResponse , ACCEPT_ENDPOINT , CHOOSE_ENDPOINT , PREPARE_ENDPOINT ,
6
6
} ,
7
+ rpc:: { broadcast_all, broadcast_quorum} ,
7
8
state:: { ProposalNumber , State } ,
8
9
} ,
9
- futures:: { stream:: FuturesUnordered , StreamExt } ,
10
- hyper:: { client:: HttpConnector , Body , Client , Method , Request } ,
10
+ hyper:: Client ,
11
11
rand:: { thread_rng, Rng } ,
12
- serde:: { de:: DeserializeOwned , Serialize } ,
13
- std:: { cmp:: min, io, net:: SocketAddr , path:: Path , sync:: Arc , time:: Duration } ,
12
+ std:: { io, net:: SocketAddr , path:: Path , sync:: Arc , time:: Duration } ,
14
13
tokio:: { sync:: RwLock , time:: sleep} ,
15
14
} ;
16
15
17
16
// Duration constants
18
- const EXPONENTIAL_BACKOFF_MIN : Duration = Duration :: from_millis ( 100 ) ;
19
- const EXPONENTIAL_BACKOFF_MAX : Duration = Duration :: from_secs ( 2 ) ;
20
- const EXPONENTIAL_BACKOFF_MULTIPLIER : u32 = 2 ;
21
17
const RESTART_DELAY_MIN : Duration = Duration :: from_millis ( 0 ) ;
22
18
const RESTART_DELAY_MAX : Duration = Duration :: from_millis ( 100 ) ;
23
19
20
+ // Generate a new proposal number.
21
+ fn generate_proposal_number (
22
+ nodes : & [ SocketAddr ] ,
23
+ node_index : usize ,
24
+ state : & mut State ,
25
+ ) -> ProposalNumber {
26
+ let proposal_number = ProposalNumber {
27
+ round : state. next_round ,
28
+ proposer_address : nodes[ node_index] ,
29
+ } ;
30
+ state. next_round += 1 ;
31
+ proposal_number
32
+ }
33
+
24
34
// Propose a value to the cluster.
25
35
pub async fn propose (
26
36
state : Arc < RwLock < State > > ,
@@ -31,6 +41,9 @@ pub async fn propose(
31
41
) -> Result < ( ) , io:: Error > {
32
42
// Retry until the protocol succeeds.
33
43
loop {
44
+ // Create an HTTP client.
45
+ let client = Client :: new ( ) ;
46
+
34
47
// Generate a new proposal number.
35
48
let proposal_number = {
36
49
// The `unwrap` is safe since it can only fail if a panic already happened.
@@ -40,11 +53,8 @@ pub async fn propose(
40
53
proposal_number
41
54
} ;
42
55
43
- // Create an HTTP client.
44
- let client = Client :: new ( ) ;
45
-
46
56
// Send a prepare message to all the nodes.
47
- info ! (
57
+ debug ! (
48
58
"Preparing value `{}` with proposal number:\n {}" ,
49
59
original_value,
50
60
// Serialization is safe.
@@ -65,19 +75,19 @@ pub async fn propose(
65
75
. max_by_key ( |accepted_proposal| accepted_proposal. 0 )
66
76
{
67
77
// There was an accepted proposal. Use that.
68
- info ! (
78
+ debug ! (
69
79
"Discovered existing value from cluster: {}" ,
70
80
accepted_proposal. 1 ,
71
81
) ;
72
82
accepted_proposal. 1
73
83
} else {
74
84
// Propose the given value.
75
- info ! ( "Quorum replied with no existing value." ) ;
85
+ debug ! ( "Quorum replied with no existing value." ) ;
76
86
original_value. to_owned ( )
77
87
} ;
78
88
79
89
// Send an accept message to all the nodes.
80
- info ! (
90
+ debug ! (
81
91
"Requesting acceptance of value `{}` with proposal number:\n {}" ,
82
92
new_value,
83
93
// The `unwrap` is safe because serialization should never fail.
@@ -110,127 +120,24 @@ pub async fn propose(
110
120
}
111
121
if value_chosen {
112
122
// The protocol succeeded. Notify all the nodes and return.
113
- info ! ( "Consensus achieved. Notifying all the nodes." ) ;
123
+ debug ! ( "Consensus achieved. Notifying all the nodes." ) ;
114
124
broadcast_all :: < ChooseResponse > (
115
125
& client,
116
126
nodes,
117
127
CHOOSE_ENDPOINT ,
118
128
& ChooseRequest { value : new_value } ,
119
129
)
120
130
. await ;
121
- info ! ( "All nodes notified ." ) ;
131
+ info ! ( "Proposer finished ." ) ;
122
132
return Ok ( ( ) ) ;
123
133
}
124
134
125
135
// The protocol failed. Sleep for a random duration before starting over.
126
- info ! ( "Failed to reach consensus. Starting over." ) ;
136
+ debug ! ( "Failed to reach consensus. Starting over." ) ;
127
137
sleep ( thread_rng ( ) . gen_range ( RESTART_DELAY_MIN ..RESTART_DELAY_MAX ) ) . await ;
128
138
}
129
139
}
130
140
131
- // Generate a new proposal number.
132
- fn generate_proposal_number (
133
- nodes : & [ SocketAddr ] ,
134
- node_index : usize ,
135
- state : & mut State ,
136
- ) -> ProposalNumber {
137
- let proposal_number = ProposalNumber {
138
- round : state. next_round ,
139
- proposer_address : nodes[ node_index] ,
140
- } ;
141
- state. next_round += 1 ;
142
- proposal_number
143
- }
144
-
145
- // Send a request without retries.
146
- async fn try_to_send < T : DeserializeOwned > (
147
- client : & Client < HttpConnector , Body > ,
148
- node : SocketAddr ,
149
- endpoint : & str ,
150
- payload : & impl Serialize ,
151
- ) -> Result < T , hyper:: Error > {
152
- Ok ( bincode:: deserialize (
153
- & hyper:: body:: to_bytes (
154
- client
155
- . request (
156
- Request :: builder ( )
157
- . method ( Method :: POST )
158
- . uri ( format ! ( "http://{}{}" , node, endpoint) )
159
- // The `unwrap` is safe because serialization should never fail.
160
- . body ( Body :: from ( bincode:: serialize ( & payload) . unwrap ( ) ) )
161
- . unwrap ( ) , // Safe since we constructed a well-formed request
162
- )
163
- . await ?
164
- . into_body ( ) ,
165
- )
166
- . await ?,
167
- )
168
- . unwrap ( ) ) // Safe under non-Byzantine conditions
169
- }
170
-
171
- // Send a request, retrying with exponential backoff until it succeeds.
172
- async fn send < T : DeserializeOwned > (
173
- client : & Client < HttpConnector , Body > ,
174
- node : SocketAddr ,
175
- endpoint : & str ,
176
- payload : & impl Serialize ,
177
- ) -> T {
178
- // The delay between requests
179
- let mut delay = EXPONENTIAL_BACKOFF_MIN ;
180
-
181
- // Retry until the request succeeds.
182
- loop {
183
- // Send the request.
184
- match try_to_send ( client, node, endpoint, payload) . await {
185
- Ok ( response) => {
186
- return response;
187
- }
188
- Err ( error) => {
189
- // Log the error.
190
- error ! ( "Received error: {}" , error) ;
191
- }
192
- }
193
-
194
- // Sleep before retrying.
195
- sleep ( delay) . await ;
196
- delay = min (
197
- delay * EXPONENTIAL_BACKOFF_MULTIPLIER ,
198
- EXPONENTIAL_BACKOFF_MAX ,
199
- ) ;
200
- }
201
- }
202
-
203
- // Send a request to all nodes. Return once a majority of responses come in.
204
- async fn broadcast_quorum < T : DeserializeOwned > (
205
- client : & Client < HttpConnector , Body > ,
206
- nodes : & [ SocketAddr ] ,
207
- endpoint : & str ,
208
- payload : & impl Serialize ,
209
- ) -> Vec < T > {
210
- nodes
211
- . iter ( )
212
- . map ( |node| send ( client, * node, endpoint, payload) )
213
- . collect :: < FuturesUnordered < _ > > ( )
214
- . take ( nodes. len ( ) / 2 + 1 )
215
- . collect ( )
216
- . await
217
- }
218
-
219
- // Send a request to all nodes. Return once all responses come in.
220
- async fn broadcast_all < T : DeserializeOwned > (
221
- client : & Client < HttpConnector , Body > ,
222
- nodes : & [ SocketAddr ] ,
223
- endpoint : & str ,
224
- payload : & impl Serialize ,
225
- ) -> Vec < T > {
226
- nodes
227
- . iter ( )
228
- . map ( |node| send ( client, * node, endpoint, payload) )
229
- . collect :: < FuturesUnordered < _ > > ( )
230
- . collect ( )
231
- . await
232
- }
233
-
234
141
#[ cfg( test) ]
235
142
mod tests {
236
143
use {
0 commit comments