@@ -63,6 +63,7 @@ object ZmqWatcher {
6363 private case class PublishBlockHeight (current : BlockHeight ) extends Command
6464 private case class ProcessNewBlock (blockId : BlockId ) extends Command
6565 private case class ProcessNewTransaction (tx : Transaction ) extends Command
66+ private case class SetWatchHint (w : GenericWatch , hint : WatchHint ) extends Command
6667
6768 final case class ValidateRequest (replyTo : ActorRef [ValidateResult ], ann : ChannelAnnouncement ) extends Command
6869 final case class ValidateResult (c : ChannelAnnouncement , fundingTx : Either [Throwable , (Transaction , UtxoStatus )])
@@ -155,7 +156,8 @@ object ZmqWatcher {
155156 case class WatchFundingDeeplyBuried (replyTo : ActorRef [WatchFundingDeeplyBuriedTriggered ], txId : TxId , minDepth : Long ) extends WatchConfirmed [WatchFundingDeeplyBuriedTriggered ]
156157 case class WatchFundingDeeplyBuriedTriggered (blockHeight : BlockHeight , txIndex : Int , tx : Transaction ) extends WatchConfirmedTriggered
157158
158- case class WatchTxConfirmed (replyTo : ActorRef [WatchTxConfirmedTriggered ], txId : TxId , minDepth : Long ) extends WatchConfirmed [WatchTxConfirmedTriggered ]
159+ case class RelativeDelay (parentTxId : TxId , delay : Long )
160+ case class WatchTxConfirmed (replyTo : ActorRef [WatchTxConfirmedTriggered ], txId : TxId , minDepth : Long , delay_opt : Option [RelativeDelay ] = None ) extends WatchConfirmed [WatchTxConfirmedTriggered ]
159161 case class WatchTxConfirmedTriggered (blockHeight : BlockHeight , txIndex : Int , tx : Transaction ) extends WatchConfirmedTriggered
160162
161163 case class WatchParentTxConfirmed (replyTo : ActorRef [WatchParentTxConfirmedTriggered ], txId : TxId , minDepth : Long ) extends WatchConfirmed [WatchParentTxConfirmedTriggered ]
@@ -167,6 +169,13 @@ object ZmqWatcher {
167169 private sealed trait AddWatchResult
168170 private case object Keep extends AddWatchResult
169171 private case object Ignore extends AddWatchResult
172+
173+ sealed trait WatchHint
174+ /**
175+ * In some cases we don't need to check watches every time a block is found and only need to check again after we
176+ * reach a specific block height. This is for example the case for transactions with a CSV delay.
177+ */
178+ private case class CheckAfterBlock (blockHeight : BlockHeight ) extends WatchHint
170179 // @formatter:on
171180
172181 def apply (nodeParams : NodeParams , blockCount : AtomicLong , client : BitcoinCoreClient ): Behavior [Command ] =
@@ -178,7 +187,7 @@ object ZmqWatcher {
178187 timers.startSingleTimer(TickNewBlock , 1 second)
179188 // we start a timer in case we don't receive ZMQ block events
180189 timers.startSingleTimer(TickBlockTimeout , blockTimeout)
181- new ZmqWatcher (nodeParams, blockCount, client, context, timers).watching(Set .empty[GenericWatch ], Map .empty[OutPoint , Set [GenericWatch ]])
190+ new ZmqWatcher (nodeParams, blockCount, client, context, timers).watching(Map .empty[GenericWatch , Option [ WatchHint ] ], Map .empty[OutPoint , Set [GenericWatch ]])
182191 }
183192 }
184193
@@ -224,7 +233,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
224233
225234 private val watchdog = context.spawn(Behaviors .supervise(BlockchainWatchdog (nodeParams, 150 seconds)).onFailure(SupervisorStrategy .resume), " blockchain-watchdog" )
226235
227- private def watching (watches : Set [GenericWatch ], watchedUtxos : Map [OutPoint , Set [GenericWatch ]]): Behavior [Command ] = {
236+ private def watching (watches : Map [GenericWatch , Option [ WatchHint ] ], watchedUtxos : Map [OutPoint , Set [GenericWatch ]]): Behavior [Command ] = {
228237 Behaviors .receiveMessage {
229238 case ProcessNewTransaction (tx) =>
230239 log.debug(" analyzing txid={} tx={}" , tx.txid, tx)
@@ -239,7 +248,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
239248 case _ : WatchPublished => // nothing to do
240249 case _ : WatchConfirmed [_] => // nothing to do
241250 }
242- watches.collect {
251+ watches.keySet. collect {
243252 case w : WatchPublished if w.txId == tx.txid => context.self ! TriggerEvent (w.replyTo, w, WatchPublishedTriggered (tx))
244253 }
245254 Behaviors .same
@@ -279,21 +288,32 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
279288 case Failure (t) => GetBlockCountFailed (t)
280289 case Success (currentHeight) => PublishBlockHeight (currentHeight)
281290 }
282- // TODO: beware of the herd effect
283- KamonExt .timeFuture(Metrics .NewBlockCheckConfirmedDuration .withoutTags()) {
284- Future .sequence(watches.collect {
285- case w : WatchPublished => checkPublished(w)
286- case w : WatchConfirmed [_] => checkConfirmed(w)
287- })
288- }
289291 Behaviors .same
290292
291293 case PublishBlockHeight (currentHeight) =>
292294 log.debug(" setting blockHeight={}" , currentHeight)
293295 blockHeight.set(currentHeight.toLong)
294296 context.system.eventStream ! EventStream .Publish (CurrentBlockHeight (currentHeight))
297+ // TODO: should we try to mitigate the herd effect and not check all watches immediately?
298+ KamonExt .timeFuture(Metrics .NewBlockCheckConfirmedDuration .withoutTags()) {
299+ Future .sequence(watches.collect {
300+ case (w : WatchPublished , _) => checkPublished(w)
301+ case (w : WatchConfirmed [_], hint) =>
302+ hint match {
303+ case Some (CheckAfterBlock (delayUntilBlock)) if currentHeight < delayUntilBlock => Future .successful(())
304+ case _ => checkConfirmed(w, currentHeight)
305+ }
306+ })
307+ }
295308 Behaviors .same
296309
310+ case SetWatchHint (w, hint) =>
311+ val watches1 = watches.get(w) match {
312+ case Some (_) => watches + (w -> Some (hint))
313+ case None => watches
314+ }
315+ watching(watches1, watchedUtxos)
316+
297317 case TriggerEvent (replyTo, watch, event) =>
298318 if (watches.contains(watch)) {
299319 log.debug(" triggering {}" , watch)
@@ -323,7 +343,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
323343 checkSpent(w)
324344 Keep
325345 case w : WatchConfirmed [_] =>
326- checkConfirmed(w)
346+ checkConfirmed(w, BlockHeight (blockHeight.get()) )
327347 Keep
328348 case w : WatchPublished =>
329349 checkPublished(w)
@@ -333,14 +353,14 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
333353 case Keep =>
334354 log.debug(" adding watch {}" , w)
335355 context.watchWith(w.replyTo, StopWatching (w.replyTo))
336- watching(watches + w , addWatchedUtxos(watchedUtxos, w))
356+ watching(watches + (w -> None ) , addWatchedUtxos(watchedUtxos, w))
337357 case Ignore =>
338358 Behaviors .same
339359 }
340360
341361 case StopWatching (origin) =>
342- // we remove watches associated to dead actors
343- val deprecatedWatches = watches.filter(_.replyTo == origin)
362+ // We remove watches associated to dead actors.
363+ val deprecatedWatches = watches.keySet. filter(_.replyTo == origin)
344364 val watchedUtxos1 = deprecatedWatches.foldLeft(watchedUtxos) { case (m, w) => removeWatchedUtxos(m, w) }
345365 watching(watches -- deprecatedWatches, watchedUtxos1)
346366
@@ -353,7 +373,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
353373 Behaviors .same
354374
355375 case r : ListWatches =>
356- r.replyTo ! watches
376+ r.replyTo ! watches.keySet
357377 Behaviors .same
358378
359379 }
@@ -414,7 +434,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
414434 client.getTransaction(w.txId).map(tx => context.self ! TriggerEvent (w.replyTo, w, WatchPublishedTriggered (tx)))
415435 }
416436
417- private def checkConfirmed (w : WatchConfirmed [_ <: WatchConfirmedTriggered ]): Future [Unit ] = {
437+ private def checkConfirmed (w : WatchConfirmed [_ <: WatchConfirmedTriggered ], currentHeight : BlockHeight ): Future [Unit ] = {
418438 log.debug(" checking confirmations of txid={}" , w.txId)
419439 // NB: this is very inefficient since internally we call `getrawtransaction` three times, but it doesn't really
420440 // matter because this only happens once, when the watched transaction has reached min_depth
@@ -431,7 +451,33 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
431451 }
432452 }
433453 }
434- case _ => Future .successful((): Unit )
454+ case Some (confirmations) =>
455+ // Once the transaction is confirmed, we don't need to check again at every new block, we only need to check
456+ // again once we should have reached the minimum depth to verify that there hasn't been a reorg.
457+ context.self ! SetWatchHint (w, CheckAfterBlock (currentHeight + w.minDepth - confirmations))
458+ Future .successful(())
459+ case None =>
460+ w match {
461+ case WatchTxConfirmed (_, _, _, Some (relativeDelay)) =>
462+ log.debug(" txId={} has a relative delay of {} blocks, checking parentTxId={}" , w.txId, relativeDelay.delay, relativeDelay.parentTxId)
463+ // Note how we add one block to avoid an off-by-one:
464+ // - if the parent is confirmed at block P
465+ // - the CSV delay is D and the minimum depth is M
466+ // - the first block that can include the child is P + D
467+ // - the first block at which we can reach minimum depth is P + D + M
468+ // - if we are currently at block P + N, the parent has C = N + 1 confirmations
469+ // - we want to check at block P + N + D + M + 1 - C = P + N + D + M + 1 - (N + 1) = P + D + M
470+ val delay = relativeDelay.delay + w.minDepth + 1
471+ client.getTxConfirmations(relativeDelay.parentTxId).map(_.getOrElse(0 )).collect {
472+ case confirmations if confirmations < delay => context.self ! SetWatchHint (w, CheckAfterBlock (currentHeight + delay - confirmations))
473+ }
474+ case _ =>
475+ // The transaction is unconfirmed: we don't need to check again at every new block: we can check only once
476+ // every minDepth blocks, which is more efficient. If the transaction is included at the current height in
477+ // a reorg, we will trigger the watch one block later than expected, but this is fine.
478+ context.self ! SetWatchHint (w, CheckAfterBlock (currentHeight + w.minDepth))
479+ Future .successful(())
480+ }
435481 }
436482 }
437483
0 commit comments