Skip to content

Commit c390560

Browse files
remyerst-bast
andauthored
Delay considering a channel closed when seeing an on-chain spend (#2936)
Fixes Issue #2437 When an external channel is spent, add it to the `spentChannels` list instead of immediately removing it from the graph. RBF attempts can produce multiple spending txs in the mempool for the same channel. The `spendChannels` list maps the txid of the spending tx to the scid of the spent channel. When a channel announcement is validated with a funding tx on the `spentChannels` list, consider the new channel a splice of the corresponding spent channel. A splice updates the graph edges to preserve balance estimate information in the graph. If a spending tx from the `spentChannels` list is deeply buried before appearing in a valid channel announcement, remove the corresponding spent channel from the graph. The integration test demonstrates that local channels update their capacity, but we can not test the remote node (carol) because the ChannelAnnouncements are ignored because it has a duplicate scid. After PR #2941 we can fix this test. --------- Co-authored-by: t-bast <[email protected]>
1 parent 61af10a commit c390560

File tree

15 files changed

+566
-141
lines changed

15 files changed

+566
-141
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala

Lines changed: 9 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -104,20 +104,6 @@ object ZmqWatcher {
104104
def hints: Set[TxId]
105105
}
106106

107-
/**
108-
* Watch for the first transaction spending the given outpoint. We assume that txid is already confirmed or in the
109-
* mempool (i.e. the outpoint exists).
110-
*
111-
* NB: an event will be triggered only once when we see a transaction that spends the given outpoint. If you want to
112-
* react to the transaction spending the outpoint, you should use [[WatchSpent]] instead.
113-
*/
114-
sealed trait WatchSpentBasic[T <: WatchSpentBasicTriggered] extends Watch[T] {
115-
/** TxId of the outpoint to watch. */
116-
def txId: TxId
117-
/** Index of the outpoint to watch. */
118-
def outputIndex: Int
119-
}
120-
121107
/** This event is sent when a [[WatchConfirmed]] condition is met. */
122108
sealed trait WatchConfirmedTriggered extends WatchTriggered {
123109
/** Block in which the transaction was confirmed. */
@@ -134,11 +120,9 @@ object ZmqWatcher {
134120
def spendingTx: Transaction
135121
}
136122

137-
/** This event is sent when a [[WatchSpentBasic]] condition is met. */
138-
sealed trait WatchSpentBasicTriggered extends WatchTriggered
139-
140-
case class WatchExternalChannelSpent(replyTo: ActorRef[WatchExternalChannelSpentTriggered], txId: TxId, outputIndex: Int, shortChannelId: RealShortChannelId) extends WatchSpentBasic[WatchExternalChannelSpentTriggered]
141-
case class WatchExternalChannelSpentTriggered(shortChannelId: RealShortChannelId) extends WatchSpentBasicTriggered
123+
case class WatchExternalChannelSpent(replyTo: ActorRef[WatchExternalChannelSpentTriggered], txId: TxId, outputIndex: Int, shortChannelId: RealShortChannelId) extends WatchSpent[WatchExternalChannelSpentTriggered] { override def hints: Set[TxId] = Set.empty }
124+
case class WatchExternalChannelSpentTriggered(shortChannelId: RealShortChannelId, spendingTx: Transaction) extends WatchSpentTriggered
125+
case class UnwatchExternalChannelSpent(txId: TxId, outputIndex: Int) extends Command
142126

143127
case class WatchFundingSpent(replyTo: ActorRef[WatchFundingSpentTriggered], txId: TxId, outputIndex: Int, hints: Set[TxId]) extends WatchSpent[WatchFundingSpentTriggered]
144128
case class WatchFundingSpentTriggered(spendingTx: Transaction) extends WatchSpentTriggered
@@ -197,7 +181,6 @@ object ZmqWatcher {
197181
private def utxo(w: GenericWatch): Option[OutPoint] = {
198182
w match {
199183
case w: WatchSpent[_] => Some(OutPoint(w.txId, w.outputIndex))
200-
case w: WatchSpentBasic[_] => Some(OutPoint(w.txId, w.outputIndex))
201184
case _ => None
202185
}
203186
}
@@ -245,7 +228,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
245228
.flatMap(watchedUtxos.get)
246229
.flatten
247230
.foreach {
248-
case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId))
231+
case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId, tx))
249232
case w: WatchFundingSpent => context.self ! TriggerEvent(w.replyTo, w, WatchFundingSpentTriggered(tx))
250233
case w: WatchOutputSpent => context.self ! TriggerEvent(w.replyTo, w, WatchOutputSpentTriggered(tx))
251234
case _: WatchPublished => // nothing to do
@@ -339,9 +322,6 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
339322
val result = w match {
340323
case _ if watches.contains(w) =>
341324
Ignore // we ignore duplicates
342-
case w: WatchSpentBasic[_] =>
343-
checkSpentBasic(w)
344-
Keep
345325
case w: WatchSpent[_] =>
346326
checkSpent(w)
347327
Keep
@@ -375,6 +355,11 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
375355
}
376356
watching(watches -- deprecatedWatches, watchedUtxos)
377357

358+
case UnwatchExternalChannelSpent(txId, outputIndex) =>
359+
val deprecatedWatches = watches.keySet.collect { case w: WatchExternalChannelSpent if w.txId == txId && w.outputIndex == outputIndex => w }
360+
val watchedUtxos1 = deprecatedWatches.foldLeft(watchedUtxos) { case (m, w) => removeWatchedUtxos(m, w) }
361+
watching(watches -- deprecatedWatches, watchedUtxos1)
362+
378363
case ValidateRequest(replyTo, ann) =>
379364
client.validate(ann).map(replyTo ! _)
380365
Behaviors.same
@@ -390,17 +375,6 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
390375
}
391376
}
392377

393-
private def checkSpentBasic(w: WatchSpentBasic[_ <: WatchSpentBasicTriggered]): Future[Unit] = {
394-
// NB: we assume parent tx was published, we just need to make sure this particular output has not been spent
395-
client.isTransactionOutputSpendable(w.txId, w.outputIndex, includeMempool = true).collect {
396-
case false =>
397-
log.info(s"output=${w.txId}:${w.outputIndex} has already been spent")
398-
w match {
399-
case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId))
400-
}
401-
}
402-
}
403-
404378
private def checkSpent(w: WatchSpent[_ <: WatchSpentTriggered]): Future[Unit] = {
405379
// first let's see if the parent tx was published or not
406380
client.getTxConfirmations(w.txId).collect {

eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -805,6 +805,8 @@ case class Commitments(params: ChannelParams,
805805
val remoteCommitIndex = active.head.remoteCommit.index
806806
val nextRemoteCommitIndex = remoteCommitIndex + 1
807807

808+
// While we have multiple active commitments, we use the most restrictive one.
809+
val capacity = active.map(_.capacity).min
808810
lazy val availableBalanceForSend: MilliSatoshi = active.map(_.availableBalanceForSend(params, changes)).min
809811
lazy val availableBalanceForReceive: MilliSatoshi = active.map(_.availableBalanceForReceive(params, changes)).min
810812

eclair-core/src/main/scala/fr/acinq/eclair/router/BalanceEstimate.scala

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import fr.acinq.bitcoin.scalacompat.{Satoshi, SatoshiLong}
2222
import fr.acinq.eclair.router.Graph.GraphStructure.{DirectedGraph, GraphEdge}
2323
import fr.acinq.eclair.router.Router.{ChannelDesc, ChannelHop, Route}
2424
import fr.acinq.eclair.wire.protocol.NodeAnnouncement
25-
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, ShortChannelId, TimestampSecond, TimestampSecondLong, ToMilliSatoshiConversion}
25+
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, RealShortChannelId, ShortChannelId, TimestampSecond, TimestampSecondLong, ToMilliSatoshiConversion}
2626

2727
import scala.concurrent.duration.{DurationInt, FiniteDuration}
2828

@@ -195,6 +195,18 @@ case class BalanceEstimate private(low: MilliSatoshi,
195195
)
196196
}
197197

198+
def updateEdge(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): BalanceEstimate = {
199+
val newCapacities = capacities - desc.shortChannelId + (newShortChannelId -> newCapacity)
200+
val capacityDelta = (newCapacity - capacities.getOrElse(desc.shortChannelId, newCapacity)).toMilliSatoshi
201+
copy(
202+
// a capacity decrease will decrease the low bound, but not below 0
203+
low = (low + capacityDelta.min(0 msat)).max(0 msat),
204+
// a capacity increase will increase the high bound, but not above the capacity of the largest channel
205+
high = (high + capacityDelta.max(0 msat)).min(newCapacities.values.maxOption.getOrElse(0 sat).toMilliSatoshi),
206+
capacities = newCapacities
207+
)
208+
}
209+
198210
/**
199211
* Estimate the probability that we can successfully send `amount` through the channel
200212
*
@@ -263,6 +275,14 @@ case class BalancesEstimates(balances: Map[(PublicKey, PublicKey), BalanceEstima
263275
defaultHalfLife
264276
)
265277

278+
def updateEdge(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): BalancesEstimates = BalancesEstimates(
279+
balances.updatedWith((desc.a, desc.b)) {
280+
case None => None
281+
case Some(balance) => Some(balance.updateEdge(desc, newShortChannelId, newCapacity))
282+
},
283+
defaultHalfLife
284+
)
285+
266286
def channelCouldSend(hop: ChannelHop, amount: MilliSatoshi)(implicit log: LoggingAdapter): BalancesEstimates = {
267287
get(hop.nodeId, hop.nextNodeId).foreach { balance =>
268288
val estimatedProbability = balance.canSend(amount, TimestampSecond.now())
@@ -305,6 +325,13 @@ case class GraphWithBalanceEstimates(graph: DirectedGraph, private val balances:
305325
descList.foldLeft(balances)((acc, edge) => acc.removeEdge(edge).removeEdge(edge.reversed)),
306326
)
307327

328+
def updateChannel(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): GraphWithBalanceEstimates = {
329+
GraphWithBalanceEstimates(
330+
graph.updateChannel(desc, newShortChannelId, newCapacity),
331+
balances.updateEdge(desc, newShortChannelId, newCapacity).updateEdge(desc.reversed, newShortChannelId, newCapacity)
332+
)
333+
}
334+
308335
def routeCouldRelay(route: Route)(implicit log: LoggingAdapter): GraphWithBalanceEstimates = {
309336
val (balances1, _) = route.hops.foldRight((balances, route.amount)) {
310337
case (hop, (balances, amount)) =>

eclair-core/src/main/scala/fr/acinq/eclair/router/Graph.scala

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -624,7 +624,15 @@ object Graph {
624624
}
625625
}
626626

627-
case class Vertex(features: Features[NodeFeature], incomingEdges: Map[ChannelDesc, GraphEdge])
627+
case class Vertex(features: Features[NodeFeature], incomingEdges: Map[ChannelDesc, GraphEdge]) {
628+
def update(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): Vertex =
629+
incomingEdges.get(desc) match {
630+
case None => this
631+
case Some(edge) =>
632+
val updatedEdge = edge.copy(desc = desc.copy(shortChannelId = newShortChannelId), capacity = newCapacity)
633+
copy(incomingEdges = incomingEdges - desc + (desc.copy(shortChannelId = newShortChannelId) -> updatedEdge))
634+
}
635+
}
628636

629637
/** A graph data structure that uses an adjacency list, stores the incoming edges of the neighbors */
630638
case class DirectedGraph(private val vertices: Map[PublicKey, Vertex]) {
@@ -678,6 +686,22 @@ object Graph {
678686
descList.foldLeft(this)((acc, edge) => acc.removeChannel(edge))
679687
}
680688

689+
/**
690+
* Update the shortChannelId and capacity of edges corresponding to the given channel-desc,
691+
* both edges (corresponding to both directions) are updated.
692+
*
693+
* @param desc the channel description for the channel to update
694+
* @param newShortChannelId the new shortChannelId for this channel
695+
* @param newCapacity the new capacity of the channel
696+
* @return a new graph with updated vertexes
697+
*/
698+
def updateChannel(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): DirectedGraph = {
699+
DirectedGraph(vertices
700+
.updatedWith(desc.b)(_.map(_.update(desc, newShortChannelId, newCapacity)))
701+
.updatedWith(desc.a)(_.map(_.update(desc.reversed, newShortChannelId, newCapacity)))
702+
)
703+
}
704+
681705
/**
682706
* @return For edges to be considered equal they must have the same in/out vertices AND same shortChannelId
683707
*/

eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ import fr.acinq.bitcoin.scalacompat.{BlockHash, ByteVector32, Satoshi, TxId}
2626
import fr.acinq.eclair.Logs.LogCategory
2727
import fr.acinq.eclair._
2828
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
29-
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{ValidateResult, WatchExternalChannelSpent, WatchExternalChannelSpentTriggered}
29+
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._
3030
import fr.acinq.eclair.channel._
31+
import fr.acinq.eclair.channel.fsm.Channel.ANNOUNCEMENTS_MINCONF
3132
import fr.acinq.eclair.crypto.TransportHandler
3233
import fr.acinq.eclair.db.NetworkDb
3334
import fr.acinq.eclair.io.Peer.PeerRoutingMessage
@@ -113,7 +114,8 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
113114
scid2PrivateChannels = Map.empty,
114115
excludedChannels = Map.empty,
115116
graphWithBalances = GraphWithBalanceEstimates(graph, nodeParams.routerConf.balanceEstimateHalfLife),
116-
sync = Map.empty)
117+
sync = Map.empty,
118+
spentChannels = Map.empty)
117119
startWith(NORMAL, data)
118120
}
119121

@@ -259,8 +261,17 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
259261
case Event(r: ValidateResult, d) =>
260262
stay() using Validation.handleChannelValidationResponse(d, nodeParams, watcher, r)
261263

262-
case Event(WatchExternalChannelSpentTriggered(shortChannelId), d) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) =>
263-
stay() using Validation.handleChannelSpent(d, nodeParams.db.network, shortChannelId)
264+
case Event(WatchExternalChannelSpentTriggered(shortChannelId, spendingTx), d) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) =>
265+
val fundingTxId = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get.fundingTxId
266+
log.info("funding tx txId={} of channelId={} has been spent by txId={}: waiting for the spending tx to have enough confirmations before removing the channel from the graph", fundingTxId, shortChannelId, spendingTx.txid)
267+
watcher ! WatchTxConfirmed(self, spendingTx.txid, ANNOUNCEMENTS_MINCONF * 2)
268+
stay() using d.copy(spentChannels = d.spentChannels + (spendingTx.txid -> shortChannelId))
269+
270+
case Event(WatchTxConfirmedTriggered(_, _, spendingTx), d) =>
271+
d.spentChannels.get(spendingTx.txid) match {
272+
case Some(shortChannelId) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, spendingTx.txid, shortChannelId)
273+
case None => stay()
274+
}
264275

265276
case Event(n: NodeAnnouncement, d: Data) =>
266277
stay() using Validation.handleNodeAnnouncement(d, nodeParams.db.network, Set(LocalGossip), n)
@@ -409,9 +420,9 @@ object Router {
409420
def getBalanceSameSideAs(u: ChannelUpdate): Option[MilliSatoshi] = if (u.channelFlags.isNode1) meta_opt.map(_.balance1) else meta_opt.map(_.balance2)
410421
def updateChannelUpdateSameSideAs(u: ChannelUpdate): PublicChannel = if (u.channelFlags.isNode1) copy(update_1_opt = Some(u)) else copy(update_2_opt = Some(u))
411422
def updateBalances(commitments: Commitments): PublicChannel = if (commitments.localNodeId == ann.nodeId1) {
412-
copy(meta_opt = Some(ChannelMeta(commitments.availableBalanceForSend, commitments.availableBalanceForReceive)))
423+
copy(capacity = commitments.capacity, meta_opt = Some(ChannelMeta(commitments.availableBalanceForSend, commitments.availableBalanceForReceive)))
413424
} else {
414-
copy(meta_opt = Some(ChannelMeta(commitments.availableBalanceForReceive, commitments.availableBalanceForSend)))
425+
copy(capacity = commitments.capacity, meta_opt = Some(ChannelMeta(commitments.availableBalanceForReceive, commitments.availableBalanceForSend)))
415426
}
416427
def applyChannelUpdate(update: Either[LocalChannelUpdate, RemoteChannelUpdate]): PublicChannel = update match {
417428
case Left(lcu) => updateChannelUpdateSameSideAs(lcu.channelUpdate).updateBalances(lcu.commitments)
@@ -573,7 +584,6 @@ object Router {
573584
def +(ignoreNode: PublicKey): Ignore = copy(nodes = nodes + ignoreNode)
574585
def ++(ignoreNodes: Set[PublicKey]): Ignore = copy(nodes = nodes ++ ignoreNodes)
575586
def +(ignoreChannel: ChannelDesc): Ignore = copy(channels = channels + ignoreChannel)
576-
def emptyNodes(): Ignore = copy(nodes = Set.empty)
577587
def emptyChannels(): Ignore = copy(channels = Set.empty)
578588
// @formatter:on
579589
}
@@ -622,12 +632,6 @@ object Router {
622632
/** Full route including the final hop, if any. */
623633
val fullRoute: Seq[Hop] = hops ++ finalHop_opt.toSeq
624634

625-
/**
626-
* Fee paid for the trampoline hop, if any.
627-
* Note that when using MPP to reach the trampoline node, the trampoline fee must be counted only once.
628-
*/
629-
val trampolineFee: MilliSatoshi = finalHop_opt.collect { case hop: NodeHop => hop.fee(amount) }.getOrElse(0 msat)
630-
631635
/**
632636
* Fee paid for the blinded route, if any.
633637
* Note that when we are the introduction node for the blinded route, we cannot easily compute the fee without the
@@ -757,7 +761,8 @@ object Router {
757761
scid2PrivateChannels: Map[Long, ByteVector32], // real scid or alias to channel_id, only to be used for private channels
758762
excludedChannels: Map[ChannelDesc, ExcludedChannelStatus], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure
759763
graphWithBalances: GraphWithBalanceEstimates,
760-
sync: Map[PublicKey, Syncing] // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
764+
sync: Map[PublicKey, Syncing], // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
765+
spentChannels: Map[TxId, RealShortChannelId], // transactions that spend funding txs that are not yet deeply buried
761766
) {
762767
def resolve(scid: ShortChannelId): Option[KnownChannel] = {
763768
// let's assume this is a real scid

0 commit comments

Comments
 (0)