@@ -128,7 +128,7 @@ async fn adkg_pairing_out_g2<'a, E, S, TBT>(
128128 adkg_scheme : S ,
129129 topic_transport : Arc < TBT > ,
130130 writer : Option < InMemoryWriter > ,
131- mut rng : impl AdkgRng + ' static ,
131+ rng : impl AdkgRng + ' static ,
132132) -> anyhow:: Result < ( ) >
133133where
134134 E : Pairing ,
@@ -171,7 +171,7 @@ where
171171 group_config,
172172 topic_transport,
173173 adkg_scheme,
174- & mut rng,
174+ rng,
175175 tx_adkg_out,
176176 )
177177 . await
@@ -315,7 +315,7 @@ async fn adkg_dxkr23<S, TBT>(
315315 group_config : GroupConfig ,
316316 topic_transport : Arc < TBT > ,
317317 adkg_scheme : S ,
318- rng : & mut impl AdkgRng ,
318+ rng : impl AdkgRng + ' static ,
319319 out : oneshot:: Sender < AdkgOutput < S :: Curve > > ,
320320) -> anyhow:: Result < ( ) >
321321where
@@ -325,9 +325,9 @@ where
325325 S :: ABAConfig : AbaConfig < ' static , PartyId , Input = AbaCrainInput < S :: Curve > > ,
326326 <S :: ACSSConfig as AcssConfig < ' static , S :: Curve , PartyId > >:: Output :
327327 Into < ShareWithPoly < S :: Curve > > ,
328- TBT : TopicBasedTransport < Identity = PartyId > ,
328+ TBT : TopicBasedTransport < Identity = PartyId > + Send + Sync + ' static ,
329329{
330- let mut adkg = adkg_scheme. new_adkg (
330+ let adkg = adkg_scheme. new_adkg (
331331 adkg_config. id ,
332332 group_config. n ,
333333 group_config. t ,
@@ -336,6 +336,10 @@ where
336336 pks. clone ( ) ,
337337 ) ?;
338338
339+ let ( adkg_start_tx, adkg_start_rx) = oneshot:: channel ( ) ;
340+ let ( adkg_stop_tx, adkg_stop_rx) = oneshot:: channel ( ) ;
341+ let adkg_out = adkg. run ( adkg_start_rx, adkg_stop_rx, rng, topic_transport) ;
342+
339343 // Calculate time to sleep before actively executing the adkg
340344 let sleep_duration = ( group_config. start_time - chrono:: Utc :: now ( ) )
341345 . to_std ( ) // TimeDelta to positive duration
@@ -353,11 +357,14 @@ where
353357 "Executing ADKG with a timeout of {}" ,
354358 humantime:: format_duration( adkg_config. timeout)
355359 ) ;
360+ if adkg_start_tx. send ( ( ) ) . is_err ( ) {
361+ anyhow:: bail!( "Failed to send ADKG start signal" ) ;
362+ }
356363
357364 let res = tokio:: select! {
358- output = adkg . start ( rng , topic_transport ) => {
359- let output = match output {
360- Ok ( adkg_out) => {
365+ output = adkg_out => {
366+ let output: anyhow :: Result <_> = match output {
367+ Some ( Ok ( adkg_out) ) => {
361368 tracing:: info!( used_sessions = ?adkg_out. used_sessions, "Successfully obtained secret key & output from ADKG" ) ;
362369 if out. send( adkg_out) . is_err( ) {
363370 // fails if the receiver side is dropped early
@@ -368,9 +375,13 @@ where
368375 tokio:: time:: sleep( adkg_config. grace_period) . await ;
369376 Ok ( ( ) )
370377 }
371- Err ( e) => {
378+ Some ( Err ( e) ) => {
372379 tracing:: error!( "failed to obtain output from ADKG: {e:?}" ) ;
373- Err ( e)
380+ Err ( e. into( ) )
381+ }
382+ None => {
383+ tracing:: error!( "failed to obtain output from ADKG: stopped before an output" ) ;
384+ Err ( anyhow!( "ADKG stopped before output" ) )
374385 }
375386 } ;
376387
@@ -384,9 +395,9 @@ where
384395 } ;
385396
386397 tracing:: warn!( "Stopping ADKG..." ) ;
387- adkg . stop ( ) . await ;
398+ let _ = adkg_stop_tx . send ( ( ) ) ;
388399
389- Ok ( res?? )
400+ res?
390401}
391402
392403/// Pairing-based DLEQ proof that there exists an s_j s.t. P_1 = [s_j] G_1 \land P_2 = [s_j] G_2,
0 commit comments