1
1
use {
2
- crate :: state:: { ProposalNumber , State } ,
2
+ crate :: state:: { self , ProposalNumber } ,
3
3
hyper:: {
4
4
header:: CONTENT_TYPE ,
5
5
server:: conn:: AddrStream ,
@@ -29,7 +29,7 @@ pub const CHOOSE_ENDPOINT: &str = "/choose";
29
29
#[ derive( Clone , Deserialize , Serialize ) ]
30
30
#[ serde( deny_unknown_fields) ]
31
31
pub struct PrepareRequest {
32
- pub proposal_number : ProposalNumber ,
32
+ pub proposal_number : Option < ProposalNumber > ,
33
33
}
34
34
35
35
// Response type for the "prepare" endpoint
@@ -40,25 +40,30 @@ pub struct PrepareResponse {
40
40
}
41
41
42
42
// Logic for the "prepare" endpoint
43
- fn prepare ( request : & PrepareRequest , state : & mut State ) -> PrepareResponse {
43
+ fn prepare (
44
+ request : & PrepareRequest ,
45
+ state : & mut ( state:: Durable , state:: Volatile ) ,
46
+ ) -> PrepareResponse {
44
47
debug ! (
45
48
"Received prepare request:\n {}" ,
46
49
serde_yaml:: to_string( request) . unwrap( ) , // Serialization is safe.
47
50
) ;
48
51
49
- match & state. min_proposal_number {
50
- Some ( proposal_number) => {
51
- if request. proposal_number > * proposal_number {
52
- state. min_proposal_number = Some ( request. proposal_number ) ;
52
+ if let Some ( requested_proposal_number) = request. proposal_number {
53
+ match & state. 0 . min_proposal_number {
54
+ Some ( proposal_number) => {
55
+ if requested_proposal_number > * proposal_number {
56
+ state. 0 . min_proposal_number = Some ( requested_proposal_number) ;
57
+ }
58
+ }
59
+ None => {
60
+ state. 0 . min_proposal_number = Some ( requested_proposal_number) ;
53
61
}
54
- }
55
- None => {
56
- state. min_proposal_number = Some ( request. proposal_number ) ;
57
62
}
58
63
}
59
64
60
65
PrepareResponse {
61
- accepted_proposal : state. accepted_proposal . clone ( ) ,
66
+ accepted_proposal : state. 0 . accepted_proposal . clone ( ) ,
62
67
}
63
68
}
64
69
@@ -77,25 +82,30 @@ pub struct AcceptResponse {
77
82
}
78
83
79
84
// Logic for the "accept" endpoint
80
- fn accept ( request : & AcceptRequest , state : & mut State ) -> AcceptResponse {
85
+ fn accept (
86
+ request : & AcceptRequest ,
87
+ state : & mut ( state:: Durable , state:: Volatile ) ,
88
+ ) -> AcceptResponse {
81
89
debug ! (
82
90
"Received accept request:\n {}" ,
83
91
serde_yaml:: to_string( request) . unwrap( ) , // Serialization is safe.
84
92
) ;
93
+
85
94
if state
95
+ . 0
86
96
. min_proposal_number
87
97
. as_ref ( )
88
98
. map_or ( true , |proposal_number| {
89
99
request. proposal . 0 >= * proposal_number
90
100
} )
91
101
{
92
- state. min_proposal_number = Some ( request. proposal . 0 ) ;
93
- state. accepted_proposal = Some ( request. proposal . clone ( ) ) ;
102
+ state. 0 . min_proposal_number = Some ( request. proposal . 0 ) ;
103
+ state. 0 . accepted_proposal = Some ( request. proposal . clone ( ) ) ;
94
104
}
95
105
96
106
AcceptResponse {
97
107
// The `unwrap` is safe since accepts must follow at least one prepare.
98
- min_proposal_number : state. min_proposal_number . unwrap ( ) ,
108
+ min_proposal_number : state. 0 . min_proposal_number . unwrap ( ) ,
99
109
}
100
110
}
101
111
@@ -112,20 +122,23 @@ pub struct ChooseRequest {
112
122
pub struct ChooseResponse ;
113
123
114
124
// Logic for the "choose" endpoint
115
- fn choose ( request : & ChooseRequest , state : & mut State ) -> ChooseResponse {
116
- if state. chosen_value . is_none ( ) {
117
- info ! ( "Consensus was achieved." ) ;
125
+ fn choose (
126
+ request : & ChooseRequest ,
127
+ state : & mut ( state:: Durable , state:: Volatile ) ,
128
+ ) -> ChooseResponse {
129
+ if state. 1 . chosen_value . is_none ( ) {
130
+ info ! ( "Consensus achieved." ) ;
118
131
println ! ( "{}" , request. value) ;
119
132
io:: stdout ( ) . flush ( ) . unwrap_or ( ( ) ) ;
120
- state. chosen_value = Some ( request. value . clone ( ) ) ;
133
+ state. 1 . chosen_value = Some ( request. value . clone ( ) ) ;
121
134
}
122
135
ChooseResponse { }
123
136
}
124
137
125
138
// Context for each service instance
126
139
#[ derive( Clone ) ]
127
140
struct Context {
128
- state : Arc < RwLock < State > > ,
141
+ state : Arc < RwLock < ( state :: Durable , state :: Volatile ) > > ,
129
142
data_file_path : PathBuf ,
130
143
}
131
144
@@ -158,7 +171,7 @@ async fn handle_request(
158
171
// Handle the request.
159
172
let mut guard = context. state. write( ) . await ;
160
173
let response = $endpoint( & payload, & mut guard) ;
161
- crate :: state:: write( & guard, & context. data_file_path) . await ?;
174
+ crate :: state:: write( & guard. 0 , & context. data_file_path) . await ?;
162
175
163
176
// Serialize the response.
164
177
Ok ( Response :: new( Body :: from(
@@ -181,12 +194,15 @@ async fn handle_request(
181
194
182
195
// Summary of the program state
183
196
( & Method :: GET , "/" ) => {
184
- // Respond with a representation of the program state. The `unwrap`
185
- // is safe because serialization should never fail.
186
- let state_repr = serde_yaml:: to_string ( & * context. state . read ( ) . await ) . unwrap ( ) ;
197
+ // Respond with a representation of the program state. The `unwrap`s
198
+ // are safe because serialization should never fail.
199
+ let state = context. state . read ( ) . await ;
200
+ let durable_state_repr = serde_yaml:: to_string ( & state. 0 ) . unwrap ( ) ;
201
+ let volatile_state_repr = serde_yaml:: to_string ( & state. 1 ) . unwrap ( ) ;
187
202
Ok ( Response :: new ( Body :: from ( format ! (
188
- "System operational.\n \n {}" ,
189
- state_repr,
203
+ "System operational.\n \n Durable state:\n \n {}\n \n Volatile state:\n \n {}" ,
204
+ durable_state_repr,
205
+ volatile_state_repr,
190
206
) ) ) )
191
207
}
192
208
@@ -216,7 +232,7 @@ async fn handle_request(
216
232
217
233
// Entrypoint for the acceptor
218
234
pub async fn acceptor (
219
- state : Arc < RwLock < State > > ,
235
+ state : Arc < RwLock < ( state :: Durable , state :: Volatile ) > > ,
220
236
data_file_path : & Path ,
221
237
address : SocketAddr ,
222
238
) -> Result < ( ) , io:: Error > {
@@ -257,49 +273,49 @@ mod tests {
257
273
fn prepare_initializes_min_proposal_number ( ) {
258
274
let mut state = initial ( ) ;
259
275
let request = PrepareRequest {
260
- proposal_number : ProposalNumber {
276
+ proposal_number : Some ( ProposalNumber {
261
277
round : 0 ,
262
278
proposer_address : SocketAddr :: new ( IpAddr :: V4 ( Ipv4Addr :: new ( 127 , 0 , 0 , 1 ) ) , 8080 ) ,
263
- } ,
279
+ } ) ,
264
280
} ;
265
281
let response = prepare ( & request, & mut state) ;
266
- assert_eq ! ( state. min_proposal_number, Some ( request. proposal_number) ) ;
282
+ assert_eq ! ( state. 0 . min_proposal_number, request. proposal_number) ;
267
283
assert_eq ! ( response. accepted_proposal, None ) ;
268
284
}
269
285
270
286
#[ test]
271
287
fn prepare_increases_min_proposal_number ( ) {
272
288
let mut state = initial ( ) ;
273
- state. min_proposal_number = Some ( ProposalNumber {
289
+ state. 0 . min_proposal_number = Some ( ProposalNumber {
274
290
round : 0 ,
275
291
proposer_address : SocketAddr :: new ( IpAddr :: V4 ( Ipv4Addr :: new ( 127 , 0 , 0 , 1 ) ) , 8080 ) ,
276
292
} ) ;
277
293
let request = PrepareRequest {
278
- proposal_number : ProposalNumber {
294
+ proposal_number : Some ( ProposalNumber {
279
295
round : 1 ,
280
296
proposer_address : SocketAddr :: new ( IpAddr :: V4 ( Ipv4Addr :: new ( 127 , 0 , 0 , 1 ) ) , 8080 ) ,
281
- } ,
297
+ } ) ,
282
298
} ;
283
299
let response = prepare ( & request, & mut state) ;
284
- assert_eq ! ( state. min_proposal_number, Some ( request. proposal_number) ) ;
300
+ assert_eq ! ( state. 0 . min_proposal_number, request. proposal_number) ;
285
301
assert_eq ! ( response. accepted_proposal, None ) ;
286
302
}
287
303
288
304
#[ test]
289
305
fn prepare_does_not_decrease_min_proposal_number ( ) {
290
306
let mut state = initial ( ) ;
291
- state. min_proposal_number = Some ( ProposalNumber {
307
+ state. 0 . min_proposal_number = Some ( ProposalNumber {
292
308
round : 1 ,
293
309
proposer_address : SocketAddr :: new ( IpAddr :: V4 ( Ipv4Addr :: new ( 127 , 0 , 0 , 1 ) ) , 8080 ) ,
294
310
} ) ;
295
311
let request = PrepareRequest {
296
- proposal_number : ProposalNumber {
312
+ proposal_number : Some ( ProposalNumber {
297
313
round : 0 ,
298
314
proposer_address : SocketAddr :: new ( IpAddr :: V4 ( Ipv4Addr :: new ( 127 , 0 , 0 , 1 ) ) , 8080 ) ,
299
- } ,
315
+ } ) ,
300
316
} ;
301
317
let response = prepare ( & request, & mut state) ;
302
- assert_ne ! ( state. min_proposal_number, Some ( request. proposal_number) ) ;
318
+ assert_ne ! ( state. 0 . min_proposal_number, request. proposal_number) ;
303
319
assert_eq ! ( response. accepted_proposal, None ) ;
304
320
}
305
321
@@ -313,13 +329,13 @@ mod tests {
313
329
} ,
314
330
"foo" . to_string ( ) ,
315
331
) ;
316
- state. min_proposal_number = Some ( accepted_proposal. 0 ) ;
317
- state. accepted_proposal = Some ( accepted_proposal. clone ( ) ) ;
332
+ state. 0 . min_proposal_number = Some ( accepted_proposal. 0 ) ;
333
+ state. 0 . accepted_proposal = Some ( accepted_proposal. clone ( ) ) ;
318
334
let request = PrepareRequest {
319
- proposal_number : ProposalNumber {
335
+ proposal_number : Some ( ProposalNumber {
320
336
round : 1 ,
321
337
proposer_address : SocketAddr :: new ( IpAddr :: V4 ( Ipv4Addr :: new ( 127 , 0 , 0 , 1 ) ) , 8080 ) ,
322
- } ,
338
+ } ) ,
323
339
} ;
324
340
let response = prepare ( & request, & mut state) ;
325
341
assert_eq ! ( response. accepted_proposal, Some ( accepted_proposal) ) ;
@@ -337,7 +353,7 @@ mod tests {
337
353
) ;
338
354
339
355
let prepare_request = PrepareRequest {
340
- proposal_number : proposal. 0 ,
356
+ proposal_number : Some ( proposal. 0 ) ,
341
357
} ;
342
358
prepare ( & prepare_request, & mut state) ;
343
359
@@ -346,9 +362,9 @@ mod tests {
346
362
} ;
347
363
let accept_response = accept ( & accept_request, & mut state) ;
348
364
349
- assert_eq ! ( state. accepted_proposal, Some ( proposal. clone( ) ) ) ;
365
+ assert_eq ! ( state. 0 . accepted_proposal, Some ( proposal. clone( ) ) ) ;
350
366
assert_eq ! ( accept_response. min_proposal_number, proposal. 0 ) ;
351
- assert_eq ! ( state. min_proposal_number, Some ( proposal. 0 ) ) ;
367
+ assert_eq ! ( state. 0 . min_proposal_number, Some ( proposal. 0 ) ) ;
352
368
}
353
369
354
370
#[ test]
@@ -371,12 +387,12 @@ mod tests {
371
387
) ;
372
388
373
389
let prepare_request1 = PrepareRequest {
374
- proposal_number : proposal0. 0 ,
390
+ proposal_number : Some ( proposal0. 0 ) ,
375
391
} ;
376
392
prepare ( & prepare_request1, & mut state) ;
377
393
378
394
let prepare_request2 = PrepareRequest {
379
- proposal_number : proposal1. 0 ,
395
+ proposal_number : Some ( proposal1. 0 ) ,
380
396
} ;
381
397
prepare ( & prepare_request2, & mut state) ;
382
398
@@ -385,9 +401,9 @@ mod tests {
385
401
} ;
386
402
let accept_response = accept ( & accept_request, & mut state) ;
387
403
388
- assert_eq ! ( state. accepted_proposal, None ) ;
404
+ assert_eq ! ( state. 0 . accepted_proposal, None ) ;
389
405
assert_eq ! ( accept_response. min_proposal_number, proposal1. 0 ) ;
390
- assert_eq ! ( state. min_proposal_number, Some ( proposal1. 0 ) ) ;
406
+ assert_eq ! ( state. 0 . min_proposal_number, Some ( proposal1. 0 ) ) ;
391
407
}
392
408
393
409
#[ test]
@@ -397,6 +413,6 @@ mod tests {
397
413
value : "foo" . to_string ( ) ,
398
414
} ;
399
415
choose ( & request, & mut state) ;
400
- assert_eq ! ( state. chosen_value, Some ( request. value) ) ;
416
+ assert_eq ! ( state. 1 . chosen_value, Some ( request. value) ) ;
401
417
}
402
418
}
0 commit comments