@@ -16,10 +16,12 @@ use tokio::sync::Mutex;
16
16
use tokio:: task:: JoinSet ;
17
17
use tokio:: time:: sleep;
18
18
use tonic:: transport:: server:: TcpIncoming ;
19
+ use tonic:: transport:: Channel ;
19
20
use tonic:: transport:: Server ;
20
- use tonic:: transport:: { Channel , Endpoint } ;
21
21
use tonic:: { Request , Response , Status } ;
22
22
23
+ use crate :: net:: connect;
24
+ use crate :: timeout:: try_parse_grpc_timeout;
23
25
use crate :: torchftpb:: lighthouse_service_client:: LighthouseServiceClient ;
24
26
use crate :: torchftpb:: manager_service_client:: ManagerServiceClient ;
25
27
use crate :: torchftpb:: {
@@ -57,18 +59,11 @@ pub struct Manager {
57
59
heartbeat_interval : Duration ,
58
60
}
59
61
60
- pub async fn manager_client_new (
61
- addr : String ,
62
- timeout : Duration ,
63
- ) -> Result < ManagerServiceClient < Channel > > {
62
+ pub async fn manager_client_new ( addr : String ) -> Result < ManagerServiceClient < Channel > > {
64
63
// TODO add retries + backoff so other nodes can start before the rank0 comes up
65
64
66
65
info ! ( "ManagerClient: establishing connection to {}" , & addr) ;
67
- let conn = Endpoint :: new ( addr. clone ( ) ) ?
68
- . timeout ( timeout)
69
- . connect_timeout ( Duration :: from_secs ( 60 ) )
70
- . connect ( )
71
- . await ?;
66
+ let conn = connect ( addr. clone ( ) ) . await ?;
72
67
Ok ( ManagerServiceClient :: new ( conn) )
73
68
}
74
69
@@ -163,12 +158,57 @@ impl Manager {
163
158
& self . lighthouse_addr
164
159
) ;
165
160
166
- let conn = Endpoint :: new ( self . lighthouse_addr . clone ( ) ) ?
167
- . connect_timeout ( Duration :: from_secs ( 60 ) )
168
- . connect ( )
169
- . await ?;
161
+ let conn = connect ( self . lighthouse_addr . clone ( ) ) . await ?;
170
162
Ok ( LighthouseServiceClient :: new ( conn) )
171
163
}
164
+
165
+ async fn _run_quorum (
166
+ & self ,
167
+ state : & mut ManagerState ,
168
+ requester : QuorumMember ,
169
+ timeout : Duration ,
170
+ ) -> Result < ( ) , Status > {
171
+ if ( state. participants . len ( ) as u64 ) < self . world_size {
172
+ return Ok ( ( ) ) ;
173
+ }
174
+
175
+ state. participants . clear ( ) ;
176
+ info ! ( "all workers joined -- starting quorum" ) ;
177
+
178
+ // TODO: don't hold the lock during quorum
179
+
180
+ let mut client = self
181
+ . lighthouse_client_new ( )
182
+ . await
183
+ . map_err ( |e| Status :: from_error ( e. into ( ) ) ) ?;
184
+
185
+ let mut lighthouse_request = tonic:: Request :: new ( LighthouseQuorumRequest {
186
+ requester : Some ( requester) ,
187
+ } ) ;
188
+ lighthouse_request. set_timeout ( timeout) ;
189
+
190
+ let response = tokio:: time:: timeout ( timeout, client. quorum ( lighthouse_request) )
191
+ . await
192
+ . unwrap_or_else ( |e| {
193
+ Err ( Status :: cancelled ( format ! (
194
+ "lighthouse quorum timed out: {}" ,
195
+ e. to_string( )
196
+ ) ) )
197
+ } ) ?;
198
+ let resp = response. into_inner ( ) ;
199
+
200
+ info ! ( "got lighthouse quorum {:?}" , resp) ;
201
+
202
+ state
203
+ . channel
204
+ . send (
205
+ resp. quorum
206
+ . ok_or_else ( || Status :: internal ( "missing quorum" ) ) ?,
207
+ )
208
+ . map_err ( |e| Status :: from_error ( e. into ( ) ) ) ?;
209
+
210
+ Ok ( ( ) )
211
+ }
172
212
}
173
213
174
214
#[ tonic:: async_trait]
@@ -182,6 +222,15 @@ impl ManagerService for Arc<Manager> {
182
222
183
223
info ! ( "got quorum request for rank {}" , rank) ;
184
224
225
+ let timeout = try_parse_grpc_timeout ( & request. metadata ( ) )
226
+ . map_err ( |e| {
227
+ Status :: invalid_argument ( format ! (
228
+ "invalid timeout {}" ,
229
+ e. to_str( ) . unwrap_or( "invalid" )
230
+ ) )
231
+ } ) ?
232
+ . ok_or_else ( || Status :: invalid_argument ( "missing timeout" ) ) ?;
233
+
185
234
let mut rx = {
186
235
let mut state = self . state . lock ( ) . await ;
187
236
@@ -195,50 +244,19 @@ impl ManagerService for Arc<Manager> {
195
244
state. participants . insert ( rank) ;
196
245
let rx = state. channel . subscribe ( ) ;
197
246
198
- if state. participants . len ( ) as u64 >= self . world_size {
199
- state. participants . clear ( ) ;
200
- info ! ( "all workers joined -- starting quorum" ) ;
201
-
202
- // TODO: don't hold the lock during quorum
203
-
204
- let mut client = self
205
- . lighthouse_client_new ( )
206
- . await
207
- . map_err ( |e| Status :: from_error ( e. into ( ) ) ) ?;
208
-
209
- let mut lighthouse_request = tonic:: Request :: new ( LighthouseQuorumRequest {
210
- requester : Some ( QuorumMember {
211
- replica_id : self . replica_id . clone ( ) ,
212
- address : self . address ( ) ,
213
- store_address : self . store_address . clone ( ) ,
214
- step : req. step ,
215
- world_size : self . world_size ,
216
- shrink_only : req. shrink_only ,
217
- } ) ,
218
- } ) ;
219
-
220
- // propagate timeout from request to lighthouse
221
- let timeout = request
222
- . metadata ( )
223
- . get ( "grpc-timeout" )
224
- . ok_or_else ( || Status :: internal ( "grpc-timeout not set" ) ) ?;
225
- lighthouse_request
226
- . metadata_mut ( )
227
- . insert ( "grpc-timeout" , timeout. clone ( ) ) ;
228
-
229
- let response = client. quorum ( lighthouse_request) . await . unwrap ( ) ;
230
- let resp = response. into_inner ( ) ;
231
-
232
- info ! ( "got lighthouse quorum {:?}" , resp) ;
233
-
234
- state
235
- . channel
236
- . send (
237
- resp. quorum
238
- . ok_or_else ( || Status :: internal ( "missing quorum" ) ) ?,
239
- )
240
- . map_err ( |e| Status :: from_error ( e. into ( ) ) ) ?;
241
- }
247
+ self . _run_quorum (
248
+ & mut state,
249
+ QuorumMember {
250
+ replica_id : self . replica_id . clone ( ) ,
251
+ address : self . address ( ) ,
252
+ store_address : self . store_address . clone ( ) ,
253
+ step : req. step ,
254
+ world_size : self . world_size ,
255
+ shrink_only : req. shrink_only ,
256
+ } ,
257
+ timeout,
258
+ )
259
+ . await ?;
242
260
243
261
rx
244
262
} ;
@@ -395,11 +413,7 @@ mod tests {
395
413
use crate :: lighthouse:: { Lighthouse , LighthouseOpt } ;
396
414
397
415
async fn should_commit ( rank : i64 , should_commit : bool ) -> Result < ShouldCommitResponse > {
398
- let mut client = manager_client_new (
399
- "http://localhost:29531" . to_string ( ) ,
400
- Duration :: from_secs ( 10 ) ,
401
- )
402
- . await ?;
416
+ let mut client = manager_client_new ( "http://localhost:29531" . to_string ( ) ) . await ?;
403
417
404
418
let request = tonic:: Request :: new ( ShouldCommitRequest {
405
419
rank : rank,
@@ -470,7 +484,7 @@ mod tests {
470
484
. await ?;
471
485
let manager_fut = tokio:: spawn ( manager. clone ( ) . run ( ) ) ;
472
486
473
- let mut client = manager_client_new ( manager. address ( ) , Duration :: from_secs ( 10 ) ) . await ?;
487
+ let mut client = manager_client_new ( manager. address ( ) ) . await ?;
474
488
475
489
let mut request = tonic:: Request :: new ( ManagerQuorumRequest {
476
490
rank : 0 ,
@@ -527,8 +541,7 @@ mod tests {
527
541
. await ?;
528
542
let manager_fut = tokio:: spawn ( manager. clone ( ) . run ( ) ) ;
529
543
530
- let mut client =
531
- manager_client_new ( manager. address ( ) , Duration :: from_secs ( 10 ) ) . await ?;
544
+ let mut client = manager_client_new ( manager. address ( ) ) . await ?;
532
545
533
546
let mut request = tonic:: Request :: new ( ManagerQuorumRequest {
534
547
rank : 0 ,
0 commit comments