@@ -135,7 +135,13 @@ impl LnVerificationService {
135135 tx. commit ( ) . await ?;
136136
137137 tracing:: info!( "Verification {} finalised" , verification. id) ;
138- let _ = self . completion_tx . send ( verification. id . clone ( ) ) ;
138+ if let Err ( e) = self . completion_tx . send ( verification. id . clone ( ) ) {
139+ tracing:: warn!(
140+ "Failed to broadcast verification completion for {}: {}" ,
141+ verification. id,
142+ e
143+ ) ;
144+ }
139145
140146 Ok ( Some ( verification) )
141147 }
@@ -202,8 +208,9 @@ impl LnVerificationService {
202208 }
203209
204210 /// Get a verification and wait for it to finalize if needed.
205- /// - First gets and syncs the verification
206- /// - If not finalized, waits up to `timeout` for payment
211+ /// - Subscribes to broadcast channel FIRST to avoid race condition
212+ /// - Then gets and syncs the verification
213+ /// - If not finalized, waits up to `timeout` for payment using pre-subscribed receiver
207214 ///
208215 /// # Returns
209216 /// * `Ok(Some(VerificationResponse::Success))` - Verification finalized (already or within timeout)
@@ -215,21 +222,24 @@ impl LnVerificationService {
215222 id : & VerificationId ,
216223 timeout : Duration ,
217224 ) -> Result < Option < VerificationResponse > , LnVerificationError > {
218- let mut verification = match self . get_and_sync_verification ( id) . await ? {
225+ let receiver = self . subscribe ( ) ;
226+
227+ let verification = match self . get_and_sync_verification ( id) . await ? {
219228 Some ( v) => v,
220229 None => return Ok ( None ) ,
221230 } ;
222231
223- if !verification. is_finalised ( ) {
224- match self . wait_for_payment ( id, timeout) . await ? {
225- Some ( v) => {
226- verification = v;
227- Ok ( Some ( VerificationResponse :: Success ( verification) ) )
228- }
229- None => Ok ( Some ( VerificationResponse :: TimedOut ) ) ,
230- }
231- } else {
232- Ok ( Some ( VerificationResponse :: Success ( verification) ) )
232+ if verification. is_finalised ( ) {
233+ // Already finalized
234+ return Ok ( Some ( VerificationResponse :: Success ( verification) ) ) ;
235+ }
236+
237+ match self
238+ . wait_for_payment_with_receiver ( id, timeout, receiver)
239+ . await ?
240+ {
241+ Some ( v) => Ok ( Some ( VerificationResponse :: Success ( v) ) ) ,
242+ None => Ok ( Some ( VerificationResponse :: TimedOut ) ) ,
233243 }
234244 }
235245
@@ -239,21 +249,27 @@ impl LnVerificationService {
239249 self . completion_tx . subscribe ( )
240250 }
241251
242- /// Wait for a verification to be finalized, with timeout .
252+ /// Wait for a verification to be finalized using a pre-subscribed receiver .
243253 /// Returns Ok(Some(verification)) if finalized within timeout, Ok(None) if timeout exceeded.
244- async fn wait_for_payment (
254+ async fn wait_for_payment_with_receiver (
245255 & self ,
246256 id : & VerificationId ,
247257 timeout : Duration ,
258+ mut receiver : broadcast:: Receiver < VerificationId > ,
248259 ) -> Result < Option < LightningVerificationEntity > , LnVerificationError > {
249260 let future = async {
250- let mut receiver = self . subscribe ( ) ;
251261 loop {
252262 match receiver. recv ( ) . await {
253263 Ok ( finalized_id) if finalized_id == * id => break ,
254264 Ok ( _) => continue , // Different verification ID, keep waiting
255265 Err ( broadcast:: error:: RecvError :: Lagged ( n) ) => {
256266 tracing:: warn!( "Broadcast receiver lagged by {} messages" , n) ;
267+ // Check if our verification was in the missed messages
268+ if let Ok ( Some ( v) ) = self . get_verification ( id) . await
269+ && v. is_finalised ( )
270+ {
271+ break ;
272+ }
257273 continue ;
258274 }
259275 Err ( broadcast:: error:: RecvError :: Closed ) => {
0 commit comments