diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala index 347b92aa76..766750e76d 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala @@ -104,20 +104,6 @@ object ZmqWatcher { def hints: Set[TxId] } - /** - * Watch for the first transaction spending the given outpoint. We assume that txid is already confirmed or in the - * mempool (i.e. the outpoint exists). - * - * NB: an event will be triggered only once when we see a transaction that spends the given outpoint. If you want to - * react to the transaction spending the outpoint, you should use [[WatchSpent]] instead. - */ - sealed trait WatchSpentBasic[T <: WatchSpentBasicTriggered] extends Watch[T] { - /** TxId of the outpoint to watch. */ - def txId: TxId - /** Index of the outpoint to watch. */ - def outputIndex: Int - } - /** This event is sent when a [[WatchConfirmed]] condition is met. */ sealed trait WatchConfirmedTriggered extends WatchTriggered { /** Block in which the transaction was confirmed. */ @@ -134,11 +120,9 @@ object ZmqWatcher { def spendingTx: Transaction } - /** This event is sent when a [[WatchSpentBasic]] condition is met. */ - sealed trait WatchSpentBasicTriggered extends WatchTriggered - - case class WatchExternalChannelSpent(replyTo: ActorRef[WatchExternalChannelSpentTriggered], txId: TxId, outputIndex: Int, shortChannelId: RealShortChannelId) extends WatchSpentBasic[WatchExternalChannelSpentTriggered] - case class WatchExternalChannelSpentTriggered(shortChannelId: RealShortChannelId) extends WatchSpentBasicTriggered + case class WatchExternalChannelSpent(replyTo: ActorRef[WatchExternalChannelSpentTriggered], txId: TxId, outputIndex: Int, shortChannelId: RealShortChannelId) extends WatchSpent[WatchExternalChannelSpentTriggered] { override def hints: Set[TxId] = Set.empty } + case class WatchExternalChannelSpentTriggered(shortChannelId: RealShortChannelId, spendingTx: Transaction) extends WatchSpentTriggered + case class UnwatchExternalChannelSpent(txId: TxId, outputIndex: Int) extends Command case class WatchFundingSpent(replyTo: ActorRef[WatchFundingSpentTriggered], txId: TxId, outputIndex: Int, hints: Set[TxId]) extends WatchSpent[WatchFundingSpentTriggered] case class WatchFundingSpentTriggered(spendingTx: Transaction) extends WatchSpentTriggered @@ -197,7 +181,6 @@ object ZmqWatcher { private def utxo(w: GenericWatch): Option[OutPoint] = { w match { case w: WatchSpent[_] => Some(OutPoint(w.txId, w.outputIndex)) - case w: WatchSpentBasic[_] => Some(OutPoint(w.txId, w.outputIndex)) case _ => None } } @@ -245,7 +228,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client .flatMap(watchedUtxos.get) .flatten .foreach { - case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId)) + case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId, tx)) case w: WatchFundingSpent => context.self ! TriggerEvent(w.replyTo, w, WatchFundingSpentTriggered(tx)) case w: WatchOutputSpent => context.self ! TriggerEvent(w.replyTo, w, WatchOutputSpentTriggered(tx)) case _: WatchPublished => // nothing to do @@ -339,9 +322,6 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client val result = w match { case _ if watches.contains(w) => Ignore // we ignore duplicates - case w: WatchSpentBasic[_] => - checkSpentBasic(w) - Keep case w: WatchSpent[_] => checkSpent(w) Keep @@ -375,6 +355,11 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client } watching(watches -- deprecatedWatches, watchedUtxos) + case UnwatchExternalChannelSpent(txId, outputIndex) => + val deprecatedWatches = watches.keySet.collect { case w: WatchExternalChannelSpent if w.txId == txId && w.outputIndex == outputIndex => w } + val watchedUtxos1 = deprecatedWatches.foldLeft(watchedUtxos) { case (m, w) => removeWatchedUtxos(m, w) } + watching(watches -- deprecatedWatches, watchedUtxos1) + case ValidateRequest(replyTo, ann) => client.validate(ann).map(replyTo ! _) Behaviors.same @@ -390,17 +375,6 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client } } - private def checkSpentBasic(w: WatchSpentBasic[_ <: WatchSpentBasicTriggered]): Future[Unit] = { - // NB: we assume parent tx was published, we just need to make sure this particular output has not been spent - client.isTransactionOutputSpendable(w.txId, w.outputIndex, includeMempool = true).collect { - case false => - log.info(s"output=${w.txId}:${w.outputIndex} has already been spent") - w match { - case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId)) - } - } - } - private def checkSpent(w: WatchSpent[_ <: WatchSpentTriggered]): Future[Unit] = { // first let's see if the parent tx was published or not client.getTxConfirmations(w.txId).collect { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala index 2cbe214432..0ad044adc0 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala @@ -805,6 +805,8 @@ case class Commitments(params: ChannelParams, val remoteCommitIndex = active.head.remoteCommit.index val nextRemoteCommitIndex = remoteCommitIndex + 1 + // While we have multiple active commitments, we use the most restrictive one. + val capacity = active.map(_.capacity).min lazy val availableBalanceForSend: MilliSatoshi = active.map(_.availableBalanceForSend(params, changes)).min lazy val availableBalanceForReceive: MilliSatoshi = active.map(_.availableBalanceForReceive(params, changes)).min diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/BalanceEstimate.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/BalanceEstimate.scala index 06a6fcd55d..b53300eec2 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/BalanceEstimate.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/BalanceEstimate.scala @@ -22,7 +22,7 @@ import fr.acinq.bitcoin.scalacompat.{Satoshi, SatoshiLong} import fr.acinq.eclair.router.Graph.GraphStructure.{DirectedGraph, GraphEdge} import fr.acinq.eclair.router.Router.{ChannelDesc, ChannelHop, Route} import fr.acinq.eclair.wire.protocol.NodeAnnouncement -import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, ShortChannelId, TimestampSecond, TimestampSecondLong, ToMilliSatoshiConversion} +import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, RealShortChannelId, ShortChannelId, TimestampSecond, TimestampSecondLong, ToMilliSatoshiConversion} import scala.concurrent.duration.{DurationInt, FiniteDuration} @@ -195,6 +195,18 @@ case class BalanceEstimate private(low: MilliSatoshi, ) } + def updateEdge(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): BalanceEstimate = { + val newCapacities = capacities - desc.shortChannelId + (newShortChannelId -> newCapacity) + val capacityDelta = (newCapacity - capacities.getOrElse(desc.shortChannelId, newCapacity)).toMilliSatoshi + copy( + // a capacity decrease will decrease the low bound, but not below 0 + low = (low + capacityDelta.min(0 msat)).max(0 msat), + // a capacity increase will increase the high bound, but not above the capacity of the largest channel + high = (high + capacityDelta.max(0 msat)).min(newCapacities.values.maxOption.getOrElse(0 sat).toMilliSatoshi), + capacities = newCapacities + ) + } + /** * Estimate the probability that we can successfully send `amount` through the channel * @@ -263,6 +275,14 @@ case class BalancesEstimates(balances: Map[(PublicKey, PublicKey), BalanceEstima defaultHalfLife ) + def updateEdge(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): BalancesEstimates = BalancesEstimates( + balances.updatedWith((desc.a, desc.b)) { + case None => None + case Some(balance) => Some(balance.updateEdge(desc, newShortChannelId, newCapacity)) + }, + defaultHalfLife + ) + def channelCouldSend(hop: ChannelHop, amount: MilliSatoshi)(implicit log: LoggingAdapter): BalancesEstimates = { get(hop.nodeId, hop.nextNodeId).foreach { balance => val estimatedProbability = balance.canSend(amount, TimestampSecond.now()) @@ -305,6 +325,13 @@ case class GraphWithBalanceEstimates(graph: DirectedGraph, private val balances: descList.foldLeft(balances)((acc, edge) => acc.removeEdge(edge).removeEdge(edge.reversed)), ) + def updateChannel(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): GraphWithBalanceEstimates = { + GraphWithBalanceEstimates( + graph.updateChannel(desc, newShortChannelId, newCapacity), + balances.updateEdge(desc, newShortChannelId, newCapacity).updateEdge(desc.reversed, newShortChannelId, newCapacity) + ) + } + def routeCouldRelay(route: Route)(implicit log: LoggingAdapter): GraphWithBalanceEstimates = { val (balances1, _) = route.hops.foldRight((balances, route.amount)) { case (hop, (balances, amount)) => diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Graph.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Graph.scala index 89af943807..f3e7c9f282 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Graph.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Graph.scala @@ -624,7 +624,15 @@ object Graph { } } - case class Vertex(features: Features[NodeFeature], incomingEdges: Map[ChannelDesc, GraphEdge]) + case class Vertex(features: Features[NodeFeature], incomingEdges: Map[ChannelDesc, GraphEdge]) { + def update(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): Vertex = + incomingEdges.get(desc) match { + case None => this + case Some(edge) => + val updatedEdge = edge.copy(desc = desc.copy(shortChannelId = newShortChannelId), capacity = newCapacity) + copy(incomingEdges = incomingEdges - desc + (desc.copy(shortChannelId = newShortChannelId) -> updatedEdge)) + } + } /** A graph data structure that uses an adjacency list, stores the incoming edges of the neighbors */ case class DirectedGraph(private val vertices: Map[PublicKey, Vertex]) { @@ -678,6 +686,22 @@ object Graph { descList.foldLeft(this)((acc, edge) => acc.removeChannel(edge)) } + /** + * Update the shortChannelId and capacity of edges corresponding to the given channel-desc, + * both edges (corresponding to both directions) are updated. + * + * @param desc the channel description for the channel to update + * @param newShortChannelId the new shortChannelId for this channel + * @param newCapacity the new capacity of the channel + * @return a new graph with updated vertexes + */ + def updateChannel(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): DirectedGraph = { + DirectedGraph(vertices + .updatedWith(desc.b)(_.map(_.update(desc, newShortChannelId, newCapacity))) + .updatedWith(desc.a)(_.map(_.update(desc.reversed, newShortChannelId, newCapacity))) + ) + } + /** * @return For edges to be considered equal they must have the same in/out vertices AND same shortChannelId */ diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index 4974931a35..6dc18eb625 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -26,8 +26,9 @@ import fr.acinq.bitcoin.scalacompat.{BlockHash, ByteVector32, Satoshi, TxId} import fr.acinq.eclair.Logs.LogCategory import fr.acinq.eclair._ import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{ValidateResult, WatchExternalChannelSpent, WatchExternalChannelSpentTriggered} +import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._ import fr.acinq.eclair.channel._ +import fr.acinq.eclair.channel.fsm.Channel.ANNOUNCEMENTS_MINCONF import fr.acinq.eclair.crypto.TransportHandler import fr.acinq.eclair.db.NetworkDb import fr.acinq.eclair.io.Peer.PeerRoutingMessage @@ -113,7 +114,8 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm scid2PrivateChannels = Map.empty, excludedChannels = Map.empty, graphWithBalances = GraphWithBalanceEstimates(graph, nodeParams.routerConf.balanceEstimateHalfLife), - sync = Map.empty) + sync = Map.empty, + spentChannels = Map.empty) startWith(NORMAL, data) } @@ -259,8 +261,17 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm case Event(r: ValidateResult, d) => stay() using Validation.handleChannelValidationResponse(d, nodeParams, watcher, r) - case Event(WatchExternalChannelSpentTriggered(shortChannelId), d) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) => - stay() using Validation.handleChannelSpent(d, nodeParams.db.network, shortChannelId) + case Event(WatchExternalChannelSpentTriggered(shortChannelId, spendingTx), d) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) => + val fundingTxId = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get.fundingTxId + log.info("funding tx txId={} of channelId={} has been spent by txId={}: waiting for the spending tx to have enough confirmations before removing the channel from the graph", fundingTxId, shortChannelId, spendingTx.txid) + watcher ! WatchTxConfirmed(self, spendingTx.txid, ANNOUNCEMENTS_MINCONF * 2) + stay() using d.copy(spentChannels = d.spentChannels + (spendingTx.txid -> shortChannelId)) + + case Event(WatchTxConfirmedTriggered(_, _, spendingTx), d) => + d.spentChannels.get(spendingTx.txid) match { + case Some(shortChannelId) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, spendingTx.txid, shortChannelId) + case None => stay() + } case Event(n: NodeAnnouncement, d: Data) => stay() using Validation.handleNodeAnnouncement(d, nodeParams.db.network, Set(LocalGossip), n) @@ -409,9 +420,9 @@ object Router { def getBalanceSameSideAs(u: ChannelUpdate): Option[MilliSatoshi] = if (u.channelFlags.isNode1) meta_opt.map(_.balance1) else meta_opt.map(_.balance2) def updateChannelUpdateSameSideAs(u: ChannelUpdate): PublicChannel = if (u.channelFlags.isNode1) copy(update_1_opt = Some(u)) else copy(update_2_opt = Some(u)) def updateBalances(commitments: Commitments): PublicChannel = if (commitments.localNodeId == ann.nodeId1) { - copy(meta_opt = Some(ChannelMeta(commitments.availableBalanceForSend, commitments.availableBalanceForReceive))) + copy(capacity = commitments.capacity, meta_opt = Some(ChannelMeta(commitments.availableBalanceForSend, commitments.availableBalanceForReceive))) } else { - copy(meta_opt = Some(ChannelMeta(commitments.availableBalanceForReceive, commitments.availableBalanceForSend))) + copy(capacity = commitments.capacity, meta_opt = Some(ChannelMeta(commitments.availableBalanceForReceive, commitments.availableBalanceForSend))) } def applyChannelUpdate(update: Either[LocalChannelUpdate, RemoteChannelUpdate]): PublicChannel = update match { case Left(lcu) => updateChannelUpdateSameSideAs(lcu.channelUpdate).updateBalances(lcu.commitments) @@ -573,7 +584,6 @@ object Router { def +(ignoreNode: PublicKey): Ignore = copy(nodes = nodes + ignoreNode) def ++(ignoreNodes: Set[PublicKey]): Ignore = copy(nodes = nodes ++ ignoreNodes) def +(ignoreChannel: ChannelDesc): Ignore = copy(channels = channels + ignoreChannel) - def emptyNodes(): Ignore = copy(nodes = Set.empty) def emptyChannels(): Ignore = copy(channels = Set.empty) // @formatter:on } @@ -622,12 +632,6 @@ object Router { /** Full route including the final hop, if any. */ val fullRoute: Seq[Hop] = hops ++ finalHop_opt.toSeq - /** - * Fee paid for the trampoline hop, if any. - * Note that when using MPP to reach the trampoline node, the trampoline fee must be counted only once. - */ - val trampolineFee: MilliSatoshi = finalHop_opt.collect { case hop: NodeHop => hop.fee(amount) }.getOrElse(0 msat) - /** * Fee paid for the blinded route, if any. * Note that when we are the introduction node for the blinded route, we cannot easily compute the fee without the @@ -757,7 +761,8 @@ object Router { scid2PrivateChannels: Map[Long, ByteVector32], // real scid or alias to channel_id, only to be used for private channels excludedChannels: Map[ChannelDesc, ExcludedChannelStatus], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure graphWithBalances: GraphWithBalanceEstimates, - sync: Map[PublicKey, Syncing] // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message + sync: Map[PublicKey, Syncing], // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message + spentChannels: Map[TxId, RealShortChannelId], // transactions that spend funding txs that are not yet deeply buried ) { def resolve(scid: ShortChannelId): Option[KnownChannel] = { // let's assume this is a real scid diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala index f9f4e2df1e..fc36c0cf6f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala @@ -24,7 +24,7 @@ import fr.acinq.bitcoin.scalacompat.Script.{pay2wsh, write} import fr.acinq.bitcoin.scalacompat.{Satoshi, TxId} import fr.acinq.eclair.ShortChannelId.outputIndex import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{UtxoStatus, ValidateRequest, ValidateResult, WatchExternalChannelSpent} +import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._ import fr.acinq.eclair.channel._ import fr.acinq.eclair.crypto.TransportHandler import fr.acinq.eclair.db.NetworkDb @@ -113,7 +113,20 @@ object Validation { log.debug("validation successful for shortChannelId={}", c.shortChannelId) remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c))) val capacity = tx.txOut(outputIndex).amount - Some(addPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, None)) + d0.spentChannels.get(tx.txid) match { + case Some(parentScid) => + d0.channels.get(parentScid) match { + case Some(parentChannel) => + Some(updateSplicedPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, parentChannel)) + case None => + log.error("spent parent channel shortChannelId={} not found for splice shortChannelId={}", parentScid, c.shortChannelId) + val spendingTxs = d0.spentChannels.filter(_._2 == parentScid).keySet + spendingTxs.foreach(txId => watcher ! UnwatchTxConfirmed(txId)) + val d1 = d0.copy(spentChannels = d0.spentChannels -- spendingTxs) + Some(addPublicChannel(d1, nodeParams, watcher, c, tx.txid, capacity, None)) + } + case None => Some(addPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, None)) + } } case ValidateResult(c, Right((tx, fundingTxStatus: UtxoStatus.Spent))) => if (fundingTxStatus.spendingTxConfirmed) { @@ -156,6 +169,46 @@ object Validation { } } + private def updateSplicedPublicChannel(d: Data, nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], ann: ChannelAnnouncement, spliceTxId: TxId, capacity: Satoshi, parentChannel: PublicChannel)(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Data = { + implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors + val fundingOutputIndex = outputIndex(ann.shortChannelId) + watcher ! WatchExternalChannelSpent(ctx.self, spliceTxId, fundingOutputIndex, ann.shortChannelId) + watcher ! UnwatchExternalChannelSpent(parentChannel.fundingTxId, outputIndex(parentChannel.ann.shortChannelId)) + // we notify front nodes that the channel has been replaced + ctx.system.eventStream.publish(ChannelsDiscovered(SingleChannelDiscovered(ann, capacity, None, None) :: Nil)) + ctx.system.eventStream.publish(ChannelLost(parentChannel.shortChannelId)) + nodeParams.db.network.addChannel(ann, spliceTxId, capacity) + nodeParams.db.network.removeChannel(parentChannel.shortChannelId) + val newPubChan = parentChannel.copy( + ann = ann, + fundingTxId = spliceTxId, + capacity = capacity, + // we keep the previous channel updates to ensure that the channel is still used until we receive the new ones + update_1_opt = parentChannel.update_1_opt, + update_2_opt = parentChannel.update_2_opt, + ) + log.debug("replacing parent channel scid={} with splice channel scid={}; splice channel={}", parentChannel.shortChannelId, ann.shortChannelId, newPubChan) + // we need to update the graph because the edge identifiers and capacity change from the parent scid to the new splice scid + log.debug("updating the graph for shortChannelId={}", newPubChan.shortChannelId) + val graph1 = d.graphWithBalances.updateChannel(ChannelDesc(parentChannel.shortChannelId, parentChannel.nodeId1, parentChannel.nodeId2), ann.shortChannelId, capacity) + val spendingTxs = d.spentChannels.filter(_._2 == parentChannel.shortChannelId).keySet + spendingTxs.foreach(txId => watcher ! UnwatchTxConfirmed(txId)) + val spentChannels1 = d.spentChannels -- spendingTxs + d.copy( + // we also add the splice scid -> channelId and remove the parent scid -> channelId mappings + channels = d.channels + (newPubChan.shortChannelId -> newPubChan) - parentChannel.shortChannelId, + // remove the parent channel from the pruned channels + prunedChannels = d.prunedChannels - parentChannel.shortChannelId, + // we also add the newly validated channels to the rebroadcast queue + rebroadcast = d.rebroadcast.copy( + // we rebroadcast the splice channel to our peers + channels = d.rebroadcast.channels + (newPubChan.ann -> d.awaiting.getOrElse(newPubChan.ann, if (isRelatedTo(ann, nodeParams.nodeId)) Seq(LocalGossip) else Nil).toSet), + ), + graphWithBalances = graph1, + spentChannels = spentChannels1 + ) + } + private def addPublicChannel(d: Data, nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], ann: ChannelAnnouncement, fundingTxId: TxId, capacity: Satoshi, privChan_opt: Option[PrivateChannel])(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Data = { implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors val fundingOutputIndex = outputIndex(ann.shortChannelId) @@ -214,10 +267,10 @@ object Validation { } else d1 } - def handleChannelSpent(d: Data, db: NetworkDb, shortChannelId: RealShortChannelId)(implicit ctx: ActorContext, log: LoggingAdapter): Data = { + def handleChannelSpent(d: Data, watcher: typed.ActorRef[ZmqWatcher.Command], db: NetworkDb, spendingTxId: TxId, shortChannelId: RealShortChannelId)(implicit ctx: ActorContext, log: LoggingAdapter): Data = { implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors - val lostChannel = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get.ann - log.info("funding tx of channelId={} has been spent", shortChannelId) + val lostChannel = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get + log.info("funding tx for channelId={} was spent", shortChannelId) // we need to remove nodes that aren't tied to any channels anymore val channels1 = d.channels - shortChannelId val prunedChannels1 = d.prunedChannels - shortChannelId @@ -236,7 +289,15 @@ object Validation { db.removeNode(nodeId) ctx.system.eventStream.publish(NodeLost(nodeId)) } - d.copy(nodes = d.nodes -- lostNodes, channels = channels1, prunedChannels = prunedChannels1, graphWithBalances = graphWithBalances1) + // we no longer need to track this or alternative transactions that spent the parent channel + // either this channel was really closed, or it was spliced and the announcement was not received in time + // we will re-add a spliced channel as a new channel later when we receive the announcement + watcher ! UnwatchExternalChannelSpent(lostChannel.fundingTxId, outputIndex(lostChannel.ann.shortChannelId)) + val spendingTxs = d.spentChannels.filter(_._2 == shortChannelId).keySet + // stop watching the spending txs that will never confirm, we already got confirmations for this spending tx + (spendingTxs - spendingTxId).foreach(txId => watcher ! UnwatchTxConfirmed(txId)) + val spentChannels1 = d.spentChannels -- spendingTxs + d.copy(nodes = d.nodes -- lostNodes, channels = channels1, prunedChannels = prunedChannels1, graphWithBalances = graphWithBalances1, spentChannels = spentChannels1) } def handleNodeAnnouncement(d: Data, db: NetworkDb, origins: Set[GossipOrigin], n: NodeAnnouncement, wasStashed: Boolean = false)(implicit ctx: ActorContext, log: LoggingAdapter): Data = { @@ -545,7 +606,7 @@ object Validation { val scid2PrivateChannels1 = d.scid2PrivateChannels - lcd.shortIds.localAlias.toLong -- lcd.shortIds.real.toOption.map(_.toLong) // a local channel has permanently gone down if (lcd.shortIds.real.toOption.exists(d.channels.contains)) { - // the channel was public, we will receive (or have already received) a WatchEventSpentBasic event, that will trigger a clean up of the channel + // the channel was public, we will receive (or have already received) a WatchSpent event, that will trigger a clean up of the channel // so let's not do anything here d.copy(scid2PrivateChannels = scid2PrivateChannels1) } else if (d.privateChannels.contains(lcd.channelId)) { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/EclairImplSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/EclairImplSpec.scala index 934c5d6365..790f3bd33d 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/EclairImplSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/EclairImplSpec.scala @@ -375,7 +375,7 @@ class EclairImplSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with I shortIds4.real.toOption.get.toLong -> channelId4, ) val g = GraphWithBalanceEstimates(DirectedGraph(Nil), 1 hour) - val routerData = Router.Data(Map.empty, publicChannels, SortedMap.empty, Router.Stash(Map.empty, Map.empty), Router.Rebroadcast(Map.empty, Map.empty, Map.empty), Map.empty, privateChannels, scidMapping, Map.empty, g, Map.empty) + val routerData = Router.Data(Map.empty, publicChannels, SortedMap.empty, Router.Stash(Map.empty, Map.empty), Router.Rebroadcast(Map.empty, Map.empty, Map.empty), Map.empty, privateChannels, scidMapping, Map.empty, g, Map.empty, Map.empty) eclair.findRoute(c, 250_000 msat, None) val routeRequest1 = router.expectMsgType[RouteRequest] diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala index 2e6a842225..18adb2d1e0 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala @@ -227,7 +227,7 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind watcher ! ListWatches(probe.ref) assert(probe.expectMsgType[Set[Watch[_]]].isEmpty) - // If we try to watch a transaction that has already been confirmed, we should immediately receive a WatchEventConfirmed. + // If we try to watch a transaction that has already been confirmed, we should immediately receive a WatchConfirmedTriggered event. watcher ! WatchFundingConfirmed(probe.ref, tx1.txid, 1) assert(probe.expectMsgType[WatchFundingConfirmedTriggered].tx.txid == tx1.txid) watcher ! WatchFundingConfirmed(probe.ref, tx2.txid, 2) @@ -270,7 +270,7 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind generateBlocks(3) assert(probe.expectMsgType[WatchTxConfirmedTriggered].tx.txid == tx.txid) - // If we watch the transaction when it's already confirmed, we immediately receive the WatchEventConfirmed. + // If we watch the transaction when it's already confirmed, we immediately receive the WatchConfirmedTriggered event. watcher ! WatchTxConfirmed(probe.ref, tx.txid, 3, Some(delay.copy(delay = 720))) assert(probe.expectMsgType[WatchTxConfirmedTriggered].tx.txid == tx.txid) }) @@ -293,23 +293,25 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind assert(probe.expectMsgType[Set[Watch[_]]].size == 2) bitcoinClient.publishTransaction(tx1) - // tx and tx1 aren't confirmed yet, but we trigger the WatchEventSpent when we see tx1 in the mempool. + // tx and tx1 aren't confirmed yet, but we trigger the WatchSpentTriggered event when we see tx1 in the mempool. probe.expectMsgAllOf( - WatchExternalChannelSpentTriggered(RealShortChannelId(5)), + WatchExternalChannelSpentTriggered(RealShortChannelId(5), tx1), WatchFundingSpentTriggered(tx1) ) - // Let's confirm tx and tx1: seeing tx1 in a block should trigger WatchEventSpent again, but not WatchEventSpentBasic - // (which only triggers once). + // Let's confirm tx and tx1: seeing tx1 in a block should trigger both WatchSpentTriggered events again. bitcoinClient.getBlockHeight().pipeTo(probe.ref) val initialBlockHeight = probe.expectMsgType[BlockHeight] generateBlocks(1) - probe.expectMsg(WatchFundingSpentTriggered(tx1)) + probe.expectMsgAllOf( + WatchExternalChannelSpentTriggered(RealShortChannelId(5), tx1), + WatchFundingSpentTriggered(tx1) + ) probe.expectNoMessage(100 millis) watcher ! ListWatches(probe.ref) val watches1 = probe.expectMsgType[Set[Watch[_]]] - assert(watches1.size == 1) - assert(watches1.forall(_.isInstanceOf[WatchFundingSpent])) + assert(watches1.size == 2) + assert(watches1.forall(_.isInstanceOf[WatchSpent[_]])) // Let's submit tx2, and set a watch after it has been confirmed this time. bitcoinClient.publishTransaction(tx2) @@ -321,8 +323,8 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind watcher ! ListWatches(probe.ref) val watches2 = probe.expectMsgType[Set[Watch[_]]] - assert(watches2.size == 1) - assert(watches2.forall(_.isInstanceOf[WatchFundingSpent])) + assert(watches2.size == 2) + assert(watches2.forall(_.isInstanceOf[WatchSpent[_]])) watcher ! StopWatching(probe.ref) // We use hints and see if we can find tx2 @@ -341,12 +343,61 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind watcher ! StopWatching(probe.ref) watcher ! WatchExternalChannelSpent(probe.ref, tx1.txid, 0, RealShortChannelId(1)) - probe.expectMsg(WatchExternalChannelSpentTriggered(RealShortChannelId(1))) + probe.expectMsg(WatchExternalChannelSpentTriggered(RealShortChannelId(1), tx2)) + watcher ! StopWatching(probe.ref) watcher ! WatchFundingSpent(probe.ref, tx1.txid, 0, Set.empty) probe.expectMsg(WatchFundingSpentTriggered(tx2)) }) } + test("unwatch external channel") { + withWatcher(f => { + import f._ + + val (priv, address) = createExternalAddress() + val tx1 = sendToAddress(address, 250_000 sat, probe) + val outputIndex1 = tx1.txOut.indexWhere(_.publicKeyScript == Script.write(Script.pay2wpkh(priv.publicKey))) + val spendingTx1 = createSpendP2WPKH(tx1, priv, priv.publicKey, 500 sat, 0, 0) + val tx2 = sendToAddress(address, 200_000 sat, probe) + val outputIndex2 = tx2.txOut.indexWhere(_.publicKeyScript == Script.write(Script.pay2wpkh(priv.publicKey))) + val spendingTx2 = createSpendP2WPKH(tx2, priv, priv.publicKey, 500 sat, 0, 0) + + watcher ! WatchExternalChannelSpent(probe.ref, tx1.txid, outputIndex1, RealShortChannelId(3)) + watcher ! WatchExternalChannelSpent(probe.ref, tx2.txid, outputIndex2, RealShortChannelId(5)) + watcher ! UnwatchExternalChannelSpent(tx1.txid, outputIndex1 + 1) // ignored + watcher ! UnwatchExternalChannelSpent(randomTxId(), outputIndex1) // ignored + + // When publishing the transaction, the watch triggers immediately. + bitcoinClient.publishTransaction(spendingTx1) + probe.expectMsg(WatchExternalChannelSpentTriggered(RealShortChannelId(3), spendingTx1)) + probe.expectNoMessage(100 millis) + + // If we unwatch the transaction, we will ignore when it's published. + watcher ! UnwatchExternalChannelSpent(tx2.txid, outputIndex2) + bitcoinClient.publishTransaction(spendingTx2) + probe.expectNoMessage(100 millis) + + // If we watch again, this will trigger immediately because the transaction is in the mempool. + watcher ! WatchExternalChannelSpent(probe.ref, tx2.txid, outputIndex2, RealShortChannelId(5)) + probe.expectMsg(WatchExternalChannelSpentTriggered(RealShortChannelId(5), spendingTx2)) + probe.expectNoMessage(100 millis) + + // We make the transactions confirm while we're not watching. + watcher ! UnwatchExternalChannelSpent(tx1.txid, outputIndex1) + watcher ! UnwatchExternalChannelSpent(tx2.txid, outputIndex2) + bitcoinClient.getBlockHeight().pipeTo(probe.ref) + val initialBlockHeight = probe.expectMsgType[BlockHeight] + system.eventStream.subscribe(probe.ref, classOf[CurrentBlockHeight]) + generateBlocks(1) + awaitCond(probe.expectMsgType[CurrentBlockHeight].blockHeight >= initialBlockHeight + 1) + + // If we watch again after confirmation, the watch instantly triggers. + watcher ! WatchExternalChannelSpent(probe.ref, tx1.txid, outputIndex1, RealShortChannelId(3)) + probe.expectMsg(WatchExternalChannelSpentTriggered(RealShortChannelId(3), spendingTx1)) + probe.expectNoMessage(100 millis) + }) + } + test("watch for unknown spent transactions") { withWatcher(f => { import f._ @@ -373,7 +424,7 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind probe.expectMsgAllOf(tx2.txid, WatchFundingSpentTriggered(tx2)) probe.expectNoMessage(100 millis) generateBlocks(1) - probe.expectMsg(WatchFundingSpentTriggered(tx2)) // tx2 is confirmed which triggers WatchEventSpent again + probe.expectMsg(WatchFundingSpentTriggered(tx2)) // tx2 is confirmed which triggers a WatchSpentTriggered event again generateBlocks(1) assert(probe.expectMsgType[WatchFundingConfirmedTriggered].tx == tx1) // tx1 now has 3 confirmations }) @@ -393,7 +444,7 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind // It may happen that transactions get included in a block without getting into our mempool first (e.g. a miner could // try to hide a revoked commit tx from the network until it gets confirmed, in an attempt to steal funds). // When we receive that block, we must send an event for every transaction inside it to analyze them and potentially - // trigger `WatchSpent` / `WatchSpentBasic`. + // trigger `WatchSpent`. generateBlocks(1) val txs = Seq( listener.fishForMessage() { case m: NewTransaction => Set(tx1.txid, tx2.txid).contains(m.tx.txid) }, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/integration/ChannelIntegrationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/integration/ChannelIntegrationSpec.scala index 5a959adeb4..4da80ab858 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/integration/ChannelIntegrationSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/integration/ChannelIntegrationSpec.scala @@ -194,8 +194,8 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec { val receivedByF = listReceivedByAddress(finalAddressF) (receivedByF diff previouslyReceivedByF).size == expectedTxCountF && (receivedByC diff previouslyReceivedByC).size == expectedTxCountC }, max = 30 seconds, interval = 1 second) - // we generate blocks to make txs confirm - generateBlocks(2, Some(minerAddress)) + // we generate enough blocks for the channel to be deeply confirmed + generateBlocks(12, Some(minerAddress)) // and we wait for the channel to close awaitCond(stateListenerC.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds) awaitCond(stateListenerF.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds) @@ -235,8 +235,8 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec { val receivedByF = listReceivedByAddress(finalAddressF, sender) (receivedByF diff previouslyReceivedByF).size == expectedTxCountF && (receivedByC diff previouslyReceivedByC).size == expectedTxCountC }, max = 30 seconds, interval = 1 second) - // we generate blocks to make txs confirm - generateBlocks(2, Some(minerAddress)) + // we generate enough blocks for the channel to be deeply confirmed + generateBlocks(12, Some(minerAddress)) // and we wait for the channel to close awaitCond(stateListenerC.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds) awaitCond(stateListenerF.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds) @@ -288,8 +288,8 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec { val receivedByF = listReceivedByAddress(finalAddressF, sender) (receivedByF diff previouslyReceivedByF).size == expectedTxCountF && (receivedByC diff previouslyReceivedByC).size == expectedTxCountC }, max = 30 seconds, interval = 1 second) - // we generate blocks to make txs confirm - generateBlocks(2, Some(minerAddress)) + // we generate enough blocks for the channel to be deeply confirmed + generateBlocks(12, Some(minerAddress)) // and we wait for the channel to close awaitCond(stateListenerC.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds) awaitCond(stateListenerF.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds) @@ -344,8 +344,8 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec { val receivedByF = listReceivedByAddress(finalAddressF, sender) (receivedByF diff previouslyReceivedByF).size == expectedTxCountF && (receivedByC diff previouslyReceivedByC).size == expectedTxCountC }, max = 30 seconds, interval = 1 second) - // we generate blocks to make tx confirm - generateBlocks(2, Some(minerAddress)) + // we generate enough blocks for the channel to be deeply confirmed + generateBlocks(12, Some(minerAddress)) // and we wait for the channel to close awaitCond(stateListenerC.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds) awaitCond(stateListenerF.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds) @@ -587,13 +587,13 @@ class StandardChannelIntegrationSpec extends ChannelIntegrationSpec { bitcoinClient.getMempool().pipeTo(sender.ref) sender.expectMsgType[Seq[Transaction]].exists(_.txIn.head.outPoint.txid == fundingOutpoint.txid) }, max = 20 seconds, interval = 1 second) - generateBlocks(3) + // we generate enough blocks for the channel to be deeply confirmed + generateBlocks(12) awaitCond(stateListener.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds) - bitcoinClient.lookForSpendingTx(None, fundingOutpoint.txid, fundingOutpoint.index.toInt, limit = 10).pipeTo(sender.ref) + bitcoinClient.lookForSpendingTx(None, fundingOutpoint.txid, fundingOutpoint.index.toInt, limit = 12).pipeTo(sender.ref) val closingTx = sender.expectMsgType[Transaction] assert(closingTx.txOut.map(_.publicKeyScript).toSet == Set(finalPubKeyScriptC, finalPubKeyScriptF)) - awaitAnnouncements(1) } @@ -641,8 +641,8 @@ class StandardChannelIntegrationSpec extends ChannelIntegrationSpec { val receivedByC = listReceivedByAddress(finalAddressC, sender) (receivedByC diff previouslyReceivedByC).size == 5 }, max = 30 seconds, interval = 1 second) - // we generate blocks to make txs confirm - generateBlocks(2) + // we generate enough blocks for the channel to be deeply confirmed + generateBlocks(12) // and we wait for C's channel to close awaitCond(stateListenerC.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds) awaitAnnouncements(1) @@ -766,7 +766,7 @@ abstract class AnchorChannelIntegrationSpec extends ChannelIntegrationSpec { }, max = 20 seconds, interval = 1 second) // get the claim-remote-output confirmed, then the channel can go to the CLOSED state - generateBlocks(2) + generateBlocks(12) awaitCond(stateListener.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds) awaitAnnouncements(1) } @@ -793,8 +793,8 @@ abstract class AnchorChannelIntegrationSpec extends ChannelIntegrationSpec { val receivedByC = listReceivedByAddress(finalAddressC, sender) (receivedByC diff previouslyReceivedByC).size == 6 }, max = 30 seconds, interval = 1 second) - // we generate blocks to make txs confirm - generateBlocks(2) + // we generate enough blocks for the channel to be deeply confirmed + generateBlocks(12) // and we wait for C's channel to close awaitCond(stateListenerC.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds) awaitAnnouncements(1) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/channel/GossipIntegrationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/channel/GossipIntegrationSpec.scala new file mode 100644 index 0000000000..25a044d4dc --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/channel/GossipIntegrationSpec.scala @@ -0,0 +1,82 @@ +package fr.acinq.eclair.integration.basic.channel + +import fr.acinq.bitcoin.scalacompat.{ByteVector32, SatoshiLong} +import fr.acinq.eclair.channel.states.ChannelStateTestsTags +import fr.acinq.eclair.channel.{DATA_NORMAL, RES_SPLICE, RealScidStatus} +import fr.acinq.eclair.integration.basic.ThreeNodesIntegrationSpec +import fr.acinq.eclair.integration.basic.fixtures.MinimalNodeFixture.{getChannelData, getPeerChannels, spliceIn} +import fr.acinq.eclair.integration.basic.fixtures.composite.ThreeNodesFixture +import fr.acinq.eclair.{FeatureSupport, Features} +import org.scalatest.{Tag, TestData} +import scodec.bits.HexStringSyntax + +/** + * These test checks the gossip sent between nodes by the Router + */ +class GossipIntegrationSpec extends ThreeNodesIntegrationSpec { + + import fr.acinq.eclair.integration.basic.fixtures.MinimalNodeFixture.{connect, getRouterData, knownFundingTxs, nodeParamsFor, openChannel, watcherAutopilot} + + override def createFixture(testData: TestData): FixtureParam = { + // seeds have been chosen so that node ids start with 02aaaa for alice, 02bbbb for bob, etc. + val aliceParams = nodeParamsFor("alice", ByteVector32(hex"b4acd47335b25ab7b84b8c020997b12018592bb4631b868762154d77fa8b93a3")) + val aliceParams1 = aliceParams.copy( + features = aliceParams.features.add(Features.SplicePrototype, FeatureSupport.Optional) + ) + val bobParams = nodeParamsFor("bob", ByteVector32(hex"7620226fec887b0b2ebe76492e5a3fd3eb0e47cd3773263f6a81b59a704dc492")) + val bobParams1 = bobParams.copy( + features = bobParams.features.add(Features.SplicePrototype, FeatureSupport.Optional) + ) + val carolParams = nodeParamsFor("carol", ByteVector32(hex"ebd5a5d3abfb3ef73731eb3418d918f247445183180522674666db98a66411cc")) + ThreeNodesFixture(aliceParams1, bobParams1, carolParams, testData.name) + } + + override def cleanupFixture(fixture: ThreeNodesFixture): Unit = { + fixture.cleanup() + } + + test("send gossip when alice->bob channel is spliced", Tag(ChannelStateTestsTags.DualFunding)) { f => + import f._ + connect(alice, bob) + connect(bob, carol) + + // we put watchers on auto pilot to confirm funding txs + alice.watcher.setAutoPilot(watcherAutopilot(knownFundingTxs(alice, bob, carol))) + bob.watcher.setAutoPilot(watcherAutopilot(knownFundingTxs(alice, bob, carol))) + carol.watcher.setAutoPilot(watcherAutopilot(knownFundingTxs(alice, bob, carol))) + + val channelId_ab = openChannel(alice, bob, 100_000 sat).channelId + val channelId_bc = openChannel(bob, carol, 100_000 sat).channelId + val channels = getPeerChannels(alice, bob.nodeId) ++ getPeerChannels(bob, carol.nodeId) + assert(channels.map(_.data.channelId).toSet == Set(channelId_ab, channelId_bc)) + + // channels confirm deeply + eventually { + assert(getChannelData(alice, channelId_ab).asInstanceOf[DATA_NORMAL].shortIds.real.isInstanceOf[RealScidStatus.Final]) + assert(getChannelData(bob, channelId_bc).asInstanceOf[DATA_NORMAL].shortIds.real.isInstanceOf[RealScidStatus.Final]) + } + val scid_ab = getChannelData(alice, channelId_ab).asInstanceOf[DATA_NORMAL].shortIds.real.asInstanceOf[RealScidStatus.Final].realScid + val scid_bc = getChannelData(bob, channelId_bc).asInstanceOf[DATA_NORMAL].shortIds.real.asInstanceOf[RealScidStatus.Final].realScid + + // splice in to increase capacity of alice->bob channel + spliceIn(alice, channelId_ab, 100_000 sat, None).asInstanceOf[RES_SPLICE].fundingTxId + + // verify that the new capacity and scid are correctly propagated + eventually { + val channelData_alice1 = getChannelData(alice, channelId_ab).asInstanceOf[DATA_NORMAL] + val channelData_bob1 = getChannelData(bob, channelId_ab).asInstanceOf[DATA_NORMAL] + assert(channelData_alice1.commitments.latest.capacity == 200_000.sat) + assert(channelData_bob1.commitments.latest.capacity == 200_000.sat) + assert(channelData_alice1.shortIds.real.toOption.get == channelData_bob1.shortIds.real.toOption.get) + val scid_ab1 = getChannelData(alice, channelId_ab).asInstanceOf[DATA_NORMAL].shortIds.real.asInstanceOf[RealScidStatus.Final].realScid + val ann_splice = getRouterData(alice).channels(scid_ab1) + assert(ann_splice.capacity == 200_000.sat) + assert(getRouterData(bob).channels(scid_ab1) == ann_splice) + // TODO: after PR 2941, the slice ChannelAnnouncement will have a new scid and not be ignore by carol + assert(getRouterData(carol).spentChannels.exists(_._2 == ann_splice.shortChannelId)) + // assert(scid_ab != scid_ab1) + // assert(getRouterData(carol).channels(scid_ab1).capacity == 200_000.sat) + } + + } +} \ No newline at end of file diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala b/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala index abbfb5e3f2..5d3b554529 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala @@ -8,7 +8,7 @@ import akka.testkit.{TestActor, TestProbe} import com.softwaremill.quicklens.ModifyPimp import com.typesafe.config.ConfigFactory import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey -import fr.acinq.bitcoin.scalacompat.{Block, ByteVector32, Satoshi, SatoshiLong, Transaction, TxId} +import fr.acinq.bitcoin.scalacompat.{Block, ByteVector32, OutPoint, Satoshi, SatoshiLong, Transaction, TxId} import fr.acinq.eclair.ShortChannelId.txIndex import fr.acinq.eclair.blockchain.SingleKeyOnChainWallet import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher @@ -30,7 +30,7 @@ import fr.acinq.eclair.payment.relay.{ChannelRelayer, PostRestartHtlcCleaner, Re import fr.acinq.eclair.payment.send.PaymentInitiator import fr.acinq.eclair.router.Router import fr.acinq.eclair.wire.protocol.IPAddress -import fr.acinq.eclair.{BlockHeight, MilliSatoshi, NodeParams, SubscriptionsComplete, TestBitcoinCoreClient, TestDatabases} +import fr.acinq.eclair.{BlockHeight, MilliSatoshi, MilliSatoshiLong, NodeParams, SubscriptionsComplete, TestBitcoinCoreClient, TestDatabases} import org.scalatest.concurrent.{Eventually, IntegrationPatience} import org.scalatest.{Assertions, EitherValues} @@ -185,6 +185,14 @@ object MinimalNodeFixture extends Assertions with Eventually with IntegrationPat sender.expectMsgType[OpenChannelResponse.Created] } + def spliceIn(node1: MinimalNodeFixture, channelId: ByteVector32, amountIn: Satoshi, pushAmount_opt: Option[MilliSatoshi])(implicit system: ActorSystem): CommandResponse[CMD_SPLICE] = { + val sender = TestProbe("sender") + val spliceIn = SpliceIn(additionalLocalFunding = amountIn, pushAmount = pushAmount_opt.getOrElse(0.msat)) + val cmd = CMD_SPLICE(sender.ref.toTyped, spliceIn_opt = Some(spliceIn), spliceOut_opt = None, requestFunding_opt = None) + sender.send(node1.register, Register.Forward(sender.ref.toTyped, channelId, cmd)) + sender.expectMsgType[CommandResponse[CMD_SPLICE]] + } + def confirmChannel(node1: MinimalNodeFixture, node2: MinimalNodeFixture, channelId: ByteVector32, blockHeight: BlockHeight, txIndex: Int)(implicit system: ActorSystem): Option[RealScidStatus.Temporary] = { val fundingTx = getChannelData(node1, channelId) match { case d: DATA_WAIT_FOR_DUAL_FUNDING_SIGNED => d.signingSession.fundingTx.tx.buildUnsignedTx() @@ -316,6 +324,13 @@ object MinimalNodeFixture extends Assertions with Eventually with IntegrationPat case None => timers.startSingleTimer(watch, 10 millis) } Behaviors.same + case watch: ZmqWatcher.WatchExternalChannelSpent => + knownFundingTxs().find(_.txIn.exists(_.outPoint == OutPoint(watch.txId, watch.outputIndex))) match { + case Some(nextFundingTx) => + watch.replyTo ! ZmqWatcher.WatchExternalChannelSpentTriggered(watch.shortChannelId, nextFundingTx) + case None => timers.startSingleTimer(watch, 10 millis) + } + Behaviors.same case _ => Behaviors.same } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/BalanceEstimateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/BalanceEstimateSpec.scala index 2f99ed3832..15cc094900 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/BalanceEstimateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/BalanceEstimateSpec.scala @@ -21,7 +21,7 @@ import fr.acinq.bitcoin.scalacompat.{Satoshi, SatoshiLong} import fr.acinq.eclair.payment.Invoice import fr.acinq.eclair.router.Graph.GraphStructure.{DirectedGraph, GraphEdge} import fr.acinq.eclair.router.Router.{ChannelDesc, HopRelayParams} -import fr.acinq.eclair.{CltvExpiryDelta, MilliSatoshiLong, ShortChannelId, TimestampSecond, randomKey} +import fr.acinq.eclair.{CltvExpiryDelta, MilliSatoshiLong, RealShortChannelId, ShortChannelId, TimestampSecond, randomKey} import org.scalactic.Tolerance.convertNumericToPlusOrMinusWrapper import org.scalatest.funsuite.AnyFunSuite @@ -87,6 +87,67 @@ class BalanceEstimateSpec extends AnyFunSuite { assert(balance2.capacities.isEmpty) } + test("update channels after a splice") { + val a = makeEdge(0, 200 sat) + val b = makeEdge(1, 100 sat) + val unknownDesc = ChannelDesc(ShortChannelId(3), randomKey().publicKey, randomKey().publicKey) + val balance = BalanceEstimate.empty(1 day) + .addEdge(a) + .addEdge(b) + .couldNotSend(140_000 msat, TimestampSecond.now()) + .couldSend(60_000 msat, TimestampSecond.now()) + + // a splice-in that increases channel capacity increases high but not low bounds + val balance1 = balance.updateEdge(a.desc, RealShortChannelId(5), 250 sat) + assert(balance1.maxCapacity == 250.sat) + assert(balance1.low == 60_000.msat) + assert(balance1.high == 190_000.msat) + + // a splice-in that increases channel capacity of smaller channel does not increase high more than max capacity + val balance2 = balance + .updateEdge(b.desc, RealShortChannelId(5), 300 sat) + assert(balance2.maxCapacity == 300.sat) + assert(balance2.low == 60_000.msat) + assert(balance2.high == 300_000.msat) + + // a splice-out that decreases channel capacity decreases low bounds but not high bounds + val balance3 = balance + .updateEdge(a.desc, RealShortChannelId(5), 150 sat) + assert(balance3.maxCapacity == 150.sat) + assert(balance3.low == 10_000.msat) + assert(balance3.high == 140_000.msat) + + // a splice-out that decreases channel capacity of largest channel does not decrease low bounds below zero + val balance4 = balance + .updateEdge(a.desc, RealShortChannelId(5), 50 sat) + assert(balance4.maxCapacity == 100.sat) + assert(balance4.low == 0.msat) + assert(balance4.high == 100_000.msat) + + // a splice-out that does not decrease the largest channel only decreases low bounds + val balance5 = balance + .updateEdge(b.desc, RealShortChannelId(5), 50 sat) + assert(balance5.maxCapacity == 200.sat) + assert(balance5.low == 10_000.msat) + assert(balance5.high == 140_000.msat) + + // a splice of an unknown channel that increases max capacity does not change the low/high bounds + val balance6 = balance + .updateEdge(unknownDesc, RealShortChannelId(5), 900 sat) + assert(isValid(balance6)) + assert(balance6.maxCapacity == 900.sat) + assert(balance6.low == 60_000.msat) + assert(balance6.high == 140_000.msat) + + // a splice of an unknown channel below max capacity does not change max capacity or low/high bounds + val balance7 = balance + .updateEdge(unknownDesc, RealShortChannelId(5), 150 sat) + assert(isValid(balance7)) + assert(balance7.maxCapacity == 200.sat) + assert(balance7.low == 60_000.msat) + assert(balance7.high == 140_000.msat) + } + test("update bounds based on what could then could not be sent (increasing amounts)") { val now = TimestampSecond.now() val balance = BalanceEstimate.empty(1 day) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala index f39b64c75e..3258db0f80 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala @@ -2,7 +2,9 @@ package fr.acinq.eclair.router import akka.actor.ActorSystem import akka.testkit.{TestFSMRef, TestProbe} -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{WatchExternalChannelSpent, WatchExternalChannelSpentTriggered, WatchFundingDeeplyBuriedTriggered} +import fr.acinq.bitcoin.scalacompat.Transaction +import fr.acinq.eclair.blockchain.CurrentBlockHeight +import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{WatchExternalChannelSpent, WatchExternalChannelSpentTriggered, WatchFundingDeeplyBuriedTriggered, WatchTxConfirmed, WatchTxConfirmedTriggered} import fr.acinq.eclair.channel.states.{ChannelStateTestsBase, ChannelStateTestsTags} import fr.acinq.eclair.channel.{CMD_CLOSE, DATA_NORMAL} import fr.acinq.eclair.io.Peer.PeerRoutingMessage @@ -21,7 +23,7 @@ import scala.concurrent.duration.DurationInt */ class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with ChannelStateTestsBase { - case class FixtureParam(router: TestFSMRef[Router.State, Router.Data, Router], rebroadcastListener: TestProbe, channels: SetupFixture, testTags: Set[String]) { + case class FixtureParam(router: TestFSMRef[Router.State, Router.Data, Router], channels: SetupFixture, testTags: Set[String]) { //@formatter:off /** there is only one channel here */ def privateChannel: PrivateChannel = router.stateData.privateChannels.values.head @@ -33,14 +35,12 @@ class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSu override def withFixture(test: OneArgTest): Outcome = { val channels = init(tags = test.tags) - val rebroadcastListener = TestProbe() val router: TestFSMRef[Router.State, Router.Data, Router] = { // we use alice's actor system so we share the same event stream implicit val system: ActorSystem = channels.alice.underlying.system - system.eventStream.subscribe(rebroadcastListener.ref, classOf[Router.Rebroadcast]) TestFSMRef(new Router(channels.alice.underlyingActor.nodeParams, channels.alice.underlyingActor.blockchain, initialized = None)) } - withFixture(test.toNoArgTest(FixtureParam(router, rebroadcastListener, channels, test.tags))) + withFixture(test.toNoArgTest(FixtureParam(router, channels, test.tags))) } private def internalTest(f: FixtureParam): Unit = { @@ -175,9 +175,11 @@ class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSu channels.bob2alice.expectMsgType[Shutdown] channels.bob2alice.forward(channels.alice) if (testTags.contains(ChannelStateTestsTags.ChannelsPublic)) { - // if the channel was public, the router asked the watcher to watch the funding tx and will be notified - val watchSpentBasic = channels.alice2blockchain.expectMsgType[WatchExternalChannelSpent] - watchSpentBasic.replyTo ! WatchExternalChannelSpentTriggered(watchSpentBasic.shortChannelId) + // if the channel was public, the router asked the watcher to watch the funding tx and will be notified when it confirms + val watchSpent = channels.alice2blockchain.expectMsgType[WatchExternalChannelSpent] + watchSpent.replyTo ! WatchExternalChannelSpentTriggered(watchSpent.shortChannelId, Transaction(0, Nil, Nil, 0)) + val watchConfirmed = channels.alice2blockchain.expectMsgType[WatchTxConfirmed] + watchConfirmed.replyTo ! WatchTxConfirmedTriggered(BlockHeight(400000), 42, Transaction(0, Nil, Nil, 0)) } // router cleans up the channel awaitAssert { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/GraphSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/GraphSpec.scala index a5ce621282..8d286c33d1 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/GraphSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/GraphSpec.scala @@ -461,4 +461,25 @@ class GraphSpec extends AnyFunSuite { assert(MessagePath.dijkstraMessagePath(graph, a, f, Set.empty, boundaries, BlockHeight(793397), wr).isEmpty) } } + + test("a channel update only changes the scid and capacity of one edge") { + // A --> B has two edges with different short channel ids. + val edge = makeEdge(7, a, b, 1 msat, 1) + val g = makeTestGraph().addEdge(edge) + + val g1 = g.updateChannel(ChannelDesc(ShortChannelId(7), a, b), RealShortChannelId(10), 99 sat) + val edge1 = g1.getEdge(ChannelDesc(ShortChannelId(10), a, b)).get + assert(edge1.capacity == 99.sat) + assert(g1.getEdge(ChannelDesc(ShortChannelId(7), a, b)).isEmpty) + + // Only the scid and capacity of one edge changes. + assert(g1 == makeTestGraph().addEdge(edge1)) + + // Updates are symmetric. + assert(g1 == g.updateChannel(ChannelDesc(ShortChannelId(7), b, a), RealShortChannelId(10), 99 sat)) + + // Updates to an unknown channel do not change the graph. + assert(g == g.updateChannel(ChannelDesc(ShortChannelId(1), randomKey().publicKey, b), RealShortChannelId(10), 99 sat)) + } + } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala index 2cc58b6ca9..0dc65b2fb1 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala @@ -22,7 +22,7 @@ import akka.actor.typed.scaladsl.adapter.ClassicActorRefOps import akka.testkit.TestProbe import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.bitcoin.scalacompat.Script.{pay2wsh, write} -import fr.acinq.bitcoin.scalacompat.{Block, ByteVector32, SatoshiLong, Transaction, TxOut} +import fr.acinq.bitcoin.scalacompat.{Block, ByteVector32, OutPoint, Satoshi, SatoshiLong, Transaction, TxIn, TxOut} import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._ import fr.acinq.eclair.channel.{AvailableBalanceChanged, CommitmentsSpec, LocalChannelUpdate} import fr.acinq.eclair.crypto.TransportHandler @@ -38,7 +38,8 @@ import fr.acinq.eclair.router.RouteCalculationSpec.{DEFAULT_AMOUNT_MSAT, DEFAULT import fr.acinq.eclair.router.Router._ import fr.acinq.eclair.transactions.Scripts import fr.acinq.eclair.wire.protocol._ -import fr.acinq.eclair.{BlockHeight, CltvExpiryDelta, Features, MilliSatoshi, MilliSatoshiLong, RealShortChannelId, ShortChannelId, TestConstants, TimestampSecond, randomBytes32, randomKey} +import fr.acinq.eclair.{BlockHeight, CltvExpiryDelta, Features, MilliSatoshiLong, RealShortChannelId, ShortChannelId, TestConstants, TimestampSecond, randomBytes32, randomKey} +import org.scalatest.Inside.inside import scodec.bits._ import scala.concurrent.duration._ @@ -318,12 +319,31 @@ class RouterSpec extends BaseRouterSpec { probe.expectMsg(PublicNode(node_b, 2, publicChannelCapacity * 2)) } + def fundingTx(node1: PublicKey, node2: PublicKey, capacity: Satoshi = publicChannelCapacity): Transaction = { + val fundingScript = write(pay2wsh(Scripts.multiSig2of2(node1, node2))) + val fundingTx = Transaction(version = 0, txIn = Nil, txOut = TxOut(capacity, fundingScript) :: Nil, lockTime = 0) + fundingTx + } + + def spendingTx(node1: PublicKey, node2: PublicKey, capacity: Satoshi = publicChannelCapacity): Transaction = { + val fundingScript = write(pay2wsh(Scripts.multiSig2of2(node1, node2))) + val nextFundingTx = Transaction(version = 0, txIn = TxIn(OutPoint(fundingTx(node1, node2, capacity), 0), fundingScript, 0) :: Nil, txOut = TxOut(capacity, fundingScript) :: Nil, lockTime = 0) + nextFundingTx + } + test("properly announce lost channels and nodes") { fixture => import fixture._ val eventListener = TestProbe() system.eventStream.subscribe(eventListener.ref, classOf[NetworkEvent]) - router ! WatchExternalChannelSpentTriggered(scid_ab) + val probe = TestProbe() + probe.send(router, GetRouterData) + val channels = probe.expectMsgType[Data].channels + + router ! WatchExternalChannelSpentTriggered(scid_ab, spendingTx(funding_a, funding_b)) + watcher.expectMsgType[WatchTxConfirmed] + router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_a, funding_b)) + watcher.expectMsg(UnwatchExternalChannelSpent(channels(scid_ab).fundingTxId, ShortChannelId.outputIndex(scid_ab))) eventListener.expectMsg(ChannelLost(scid_ab)) assert(nodeParams.db.network.getChannel(scid_ab).isEmpty) // a doesn't have any channels, b still has one with c @@ -332,7 +352,10 @@ class RouterSpec extends BaseRouterSpec { assert(nodeParams.db.network.getNode(b).nonEmpty) eventListener.expectNoMessage(200 milliseconds) - router ! WatchExternalChannelSpentTriggered(scid_cd) + router ! WatchExternalChannelSpentTriggered(scid_cd, spendingTx(funding_c, funding_d)) + watcher.expectMsgType[WatchTxConfirmed] + router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_c, funding_d)) + watcher.expectMsg(UnwatchExternalChannelSpent(channels(scid_cd).fundingTxId, ShortChannelId.outputIndex(scid_cd))) eventListener.expectMsg(ChannelLost(scid_cd)) assert(nodeParams.db.network.getChannel(scid_cd).isEmpty) // d doesn't have any channels, c still has one with b @@ -341,7 +364,10 @@ class RouterSpec extends BaseRouterSpec { assert(nodeParams.db.network.getNode(c).nonEmpty) eventListener.expectNoMessage(200 milliseconds) - router ! WatchExternalChannelSpentTriggered(scid_bc) + router ! WatchExternalChannelSpentTriggered(scid_bc, spendingTx(funding_b, funding_c)) + watcher.expectMsgType[WatchTxConfirmed] + router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_b, funding_c)) + watcher.expectMsg(UnwatchExternalChannelSpent(channels(scid_bc).fundingTxId, ShortChannelId.outputIndex(scid_bc))) eventListener.expectMsg(ChannelLost(scid_bc)) assert(nodeParams.db.network.getChannel(scid_bc).isEmpty) // now b and c do not have any channels @@ -360,13 +386,18 @@ class RouterSpec extends BaseRouterSpec { val priv_funding_u = randomKey() val scid_au = RealShortChannelId(fixture.nodeParams.currentBlockHeight - 5000, 5, 0) val ann = channelAnnouncement(scid_au, priv_a, priv_u, priv_funding_a, priv_funding_u) - val fundingTx = Transaction(2, Nil, Seq(TxOut(500_000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_a, priv_funding_u.publicKey))))), 0) + val fundingTx_au = fundingTx(funding_a, priv_funding_u.publicKey, 500_000 sat) router ! PeerRoutingMessage(TestProbe().ref, remoteNodeId, ann) watcher.expectMsgType[ValidateRequest] - watcher.send(router, ValidateResult(ann, Right((fundingTx, UtxoStatus.Unspent)))) + watcher.send(router, ValidateResult(ann, Right((fundingTx_au, UtxoStatus.Unspent)))) + watcher.expectMsgType[WatchExternalChannelSpent] eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(ann, 500_000 sat, None, None) :: Nil)) awaitAssert(assert(nodeParams.db.network.getChannel(scid_au).nonEmpty)) + val probe = TestProbe() + probe.send(router, GetRouterData) + val channels = probe.expectMsgType[Data].channels + // The channel is pruned: we keep it in the DB until it is spent. router ! TickPruneStaleChannels eventListener.expectMsg(ChannelLost(scid_au)) @@ -374,7 +405,10 @@ class RouterSpec extends BaseRouterSpec { awaitAssert(assert(nodeParams.db.network.getChannel(scid_au).nonEmpty)) // The channel is closed, now we can remove it from the DB. - router ! WatchExternalChannelSpentTriggered(scid_au) + router ! WatchExternalChannelSpentTriggered(scid_au, spendingTx(funding_a, priv_funding_u.publicKey)) + assert(watcher.expectMsgType[WatchTxConfirmed].txId == spendingTx(funding_a, priv_funding_u.publicKey).txid) + router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_a, priv_funding_u.publicKey)) + watcher.expectMsg(UnwatchExternalChannelSpent(channels(scid_au).fundingTxId, ShortChannelId.outputIndex(scid_au))) eventListener.expectMsg(ChannelLost(scid_au)) eventListener.expectMsg(NodeLost(priv_u.publicKey)) awaitAssert(assert(nodeParams.db.network.getChannel(scid_au).isEmpty)) @@ -888,12 +922,12 @@ class RouterSpec extends BaseRouterSpec { val scid = RealShortChannelId(fixture.nodeParams.currentBlockHeight - 5000, 5, 0) val capacity = 1_000_000.sat val ann = channelAnnouncement(scid, priv_a, priv_c, priv_funding_a, priv_funding_c) - val fundingTx = Transaction(2, Nil, Seq(TxOut(capacity, write(pay2wsh(Scripts.multiSig2of2(funding_a, funding_c))))), 0) val peerConnection = TestProbe() + val fundingTx_ac = fundingTx(funding_a, funding_c, capacity) peerConnection.ignoreMsg { case _: TransportHandler.ReadAck => true } peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, ann)) watcher.expectMsgType[ValidateRequest] - watcher.send(router, ValidateResult(ann, Right((fundingTx, UtxoStatus.Unspent)))) + watcher.send(router, ValidateResult(ann, Right((fundingTx_ac, UtxoStatus.Unspent)))) peerConnection.expectMsg(GossipDecision.Accepted(ann)) probe.send(router, GetChannels) assert(probe.expectMsgType[Iterable[ChannelAnnouncement]].exists(_.shortChannelId == scid)) @@ -911,7 +945,7 @@ class RouterSpec extends BaseRouterSpec { val staleUpdate = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, c, scid, CltvExpiryDelta(72), 1 msat, 10 msat, 100, htlcMaximum, timestamp = staleTimestamp) peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, staleUpdate)) peerConnection.expectMsg(GossipDecision.Stale(staleUpdate)) - assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx.txid, capacity, None, None, None))) + assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx_ac.txid, capacity, None, None, None))) // We receive a non-stale channel update for one side of the channel. val update_ac_1 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, c, scid, CltvExpiryDelta(72), 1 msat, 10 msat, 100, htlcMaximum, timestamp = TimestampSecond.now() - 3.days) @@ -919,9 +953,9 @@ class RouterSpec extends BaseRouterSpec { peerConnection.expectMsg(GossipDecision.RelatedChannelPruned(update_ac_1)) peerConnection.expectNoMessage(100 millis) if (update_ac_1.channelFlags.isNode1) { - assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx.txid, capacity, Some(update_ac_1), None, None))) + assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx_ac.txid, capacity, Some(update_ac_1), None, None))) } else { - assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx.txid, capacity, None, Some(update_ac_1), None))) + assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx_ac.txid, capacity, None, Some(update_ac_1), None))) } probe.send(router, GetRouterData) val routerData1 = probe.expectMsgType[Data] @@ -936,9 +970,9 @@ class RouterSpec extends BaseRouterSpec { peerConnection.expectMsg(GossipDecision.RelatedChannelPruned(update_ac_2)) peerConnection.expectNoMessage(100 millis) if (update_ac_2.channelFlags.isNode1) { - assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx.txid, capacity, Some(update_ac_2), None, None))) + assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx_ac.txid, capacity, Some(update_ac_2), None, None))) } else { - assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx.txid, capacity, None, Some(update_ac_2), None))) + assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx_ac.txid, capacity, None, Some(update_ac_2), None))) } probe.send(router, GetRouterData) val routerData2 = probe.expectMsgType[Data] @@ -955,9 +989,9 @@ class RouterSpec extends BaseRouterSpec { assert(routerData3.channels.contains(scid)) assert(!routerData3.prunedChannels.contains(scid)) if (update_ac_2.channelFlags.isNode1) { - assert(routerData3.channels.get(scid).contains(PublicChannel(ann, fundingTx.txid, capacity, Some(update_ac_2), Some(update_ca), None))) + assert(routerData3.channels.get(scid).contains(PublicChannel(ann, fundingTx_ac.txid, capacity, Some(update_ac_2), Some(update_ca), None))) } else { - assert(routerData3.channels.get(scid).contains(PublicChannel(ann, fundingTx.txid, capacity, Some(update_ca), Some(update_ac_2), None))) + assert(routerData3.channels.get(scid).contains(PublicChannel(ann, fundingTx_ac.txid, capacity, Some(update_ca), Some(update_ac_2), None))) } assert(routerData3.graphWithBalances.graph.containsEdge(ChannelDesc(update_ac_2, ann))) assert(routerData3.graphWithBalances.graph.containsEdge(ChannelDesc(update_ca, ann))) @@ -973,19 +1007,19 @@ class RouterSpec extends BaseRouterSpec { { // When the local channel comes back online, it will send a LocalChannelUpdate to the router. - val balances = Set[Option[MilliSatoshi]](Some(10000 msat), Some(15000 msat)) - val commitments = CommitmentsSpec.makeCommitments(10000 msat, 15000 msat, a, b, announceChannel = true) + val commitments = CommitmentsSpec.makeCommitments(channel_ab.capacity - 50_000_000.msat, 50_000_000 msat, a, b, announceChannel = true) + val balances = Set(commitments.availableBalanceForSend, commitments.availableBalanceForReceive) sender.send(router, LocalChannelUpdate(sender.ref, null, scids_ab, b, Some(chan_ab), update_ab, commitments)) sender.send(router, GetRoutingState) - val channel_ab = sender.expectMsgType[RoutingState].channels.find(_.ann == chan_ab).get - assert(Set(channel_ab.meta_opt.map(_.balance1), channel_ab.meta_opt.map(_.balance2)) == balances) + val channel_ab1 = sender.expectMsgType[RoutingState].channels.find(_.ann == chan_ab).get + assert(Set(channel_ab1.meta_opt.map(_.balance1), channel_ab1.meta_opt.map(_.balance2)).flatten == balances) // And the graph should be updated too. sender.send(router, Router.GetRouterData) val g = sender.expectMsgType[Data].graphWithBalances.graph val edge_ab = g.getEdge(ChannelDesc(scid_ab, a, b)).get val edge_ba = g.getEdge(ChannelDesc(scid_ab, b, a)).get - assert(edge_ab.capacity == channel_ab.capacity && edge_ba.capacity == channel_ab.capacity) - assert(balances.contains(edge_ab.balance_opt)) + assert(edge_ab.capacity == channel_ab1.capacity && edge_ba.capacity == channel_ab1.capacity) + assert(edge_ab.balance_opt.contains(commitments.availableBalanceForSend)) assert(edge_ba.balance_opt.isEmpty) } @@ -996,53 +1030,54 @@ class RouterSpec extends BaseRouterSpec { assert(sender.expectMsgType[Data].rebroadcast.updates.isEmpty) // Then we update the balance without changing the contents of the channel update; the graph should still be updated. - val balances = Set[Option[MilliSatoshi]](Some(11000 msat), Some(14000 msat)) - val commitments = CommitmentsSpec.makeCommitments(11000 msat, 14000 msat, a, b, announceChannel = true) + val commitments = CommitmentsSpec.makeCommitments(channel_ab.capacity - 40_000_000.msat, 40_000_000 msat, a, b, announceChannel = true) + val balances = Set(commitments.availableBalanceForSend, commitments.availableBalanceForReceive) sender.send(router, LocalChannelUpdate(sender.ref, null, scids_ab, b, Some(chan_ab), update_ab, commitments)) sender.send(router, GetRoutingState) - val channel_ab = sender.expectMsgType[RoutingState].channels.find(_.ann == chan_ab).get - assert(Set(channel_ab.meta_opt.map(_.balance1), channel_ab.meta_opt.map(_.balance2)) == balances) + val channel_ab1 = sender.expectMsgType[RoutingState].channels.find(_.ann == chan_ab).get + assert(Set(channel_ab1.meta_opt.map(_.balance1), channel_ab1.meta_opt.map(_.balance2)).flatten == balances) // And the graph should be updated too. sender.send(router, Router.GetRouterData) val g = sender.expectMsgType[Data].graphWithBalances.graph val edge_ab = g.getEdge(ChannelDesc(scid_ab, a, b)).get val edge_ba = g.getEdge(ChannelDesc(scid_ab, b, a)).get - assert(edge_ab.capacity == channel_ab.capacity && edge_ba.capacity == channel_ab.capacity) - assert(balances.contains(edge_ab.balance_opt)) + assert(edge_ab.capacity == channel_ab1.capacity && edge_ba.capacity == channel_ab1.capacity) + assert(edge_ab.balance_opt.contains(commitments.availableBalanceForSend)) assert(edge_ba.balance_opt.isEmpty) } { // When HTLCs are relayed through the channel, balance changes are sent to the router. - val balances = Set[Option[MilliSatoshi]](Some(12000 msat), Some(13000 msat)) - val commitments = CommitmentsSpec.makeCommitments(12000 msat, 13000 msat, a, b, announceChannel = true) + val commitments = CommitmentsSpec.makeCommitments(channel_ab.capacity - 55_000_000.msat, 55_000_000 msat, a, b, announceChannel = true) + val balances = Set(commitments.availableBalanceForSend, commitments.availableBalanceForReceive) sender.send(router, AvailableBalanceChanged(sender.ref, null, scids_ab, commitments)) sender.send(router, GetRoutingState) - val channel_ab = sender.expectMsgType[RoutingState].channels.find(_.ann == chan_ab).get - assert(Set(channel_ab.meta_opt.map(_.balance1), channel_ab.meta_opt.map(_.balance2)) == balances) + val channel_ab1 = sender.expectMsgType[RoutingState].channels.find(_.ann == chan_ab).get + assert(Set(channel_ab1.meta_opt.map(_.balance1), channel_ab1.meta_opt.map(_.balance2)).flatten == balances) // And the graph should be updated too. sender.send(router, Router.GetRouterData) val g = sender.expectMsgType[Data].graphWithBalances.graph val edge_ab = g.getEdge(ChannelDesc(scid_ab, a, b)).get val edge_ba = g.getEdge(ChannelDesc(scid_ab, b, a)).get - assert(edge_ab.capacity == channel_ab.capacity && edge_ba.capacity == channel_ab.capacity) - assert(balances.contains(edge_ab.balance_opt)) + assert(edge_ab.capacity == channel_ab1.capacity && edge_ba.capacity == channel_ab1.capacity) + assert(edge_ab.balance_opt.contains(commitments.availableBalanceForSend)) assert(edge_ba.balance_opt.isEmpty) } { // Private channels should also update the graph when HTLCs are relayed through them. - val balances = Set(4620000 msat, 32620000 msat) - val commitments = CommitmentsSpec.makeCommitments(33000000 msat, 5000000 msat, a, g, announceChannel = false) + sender.send(router, GetRouterData) + val channel_ag = sender.expectMsgType[Data].privateChannels(channelId_ag_private) + val commitments = CommitmentsSpec.makeCommitments(channel_ag.meta.balance1 + 10_000_000.msat, channel_ag.meta.balance2 - 10_000_000.msat, a, g, announceChannel = false) sender.send(router, AvailableBalanceChanged(sender.ref, channelId_ag_private, scids_ab, commitments)) sender.send(router, Router.GetRouterData) val data = sender.expectMsgType[Data] - val channel_ag = data.privateChannels(channelId_ag_private) - assert(Set(channel_ag.meta.balance1, channel_ag.meta.balance2) == balances) + val channel_ag1 = data.privateChannels(channelId_ag_private) + assert(Set(channel_ag1.meta.balance1, channel_ag1.meta.balance2) == Set(commitments.availableBalanceForSend, commitments.availableBalanceForReceive)) // And the graph should be updated too. val edge_ag = data.graphWithBalances.graph.getEdge(ChannelDesc(alias_ag_private, a, g)).get - assert(edge_ag.capacity == channel_ag.capacity) - assert(edge_ag.balance_opt.contains(balances.last)) + assert(edge_ag.capacity == channel_ag1.capacity) + assert(edge_ag.balance_opt.contains(commitments.availableBalanceForSend)) } } @@ -1084,4 +1119,69 @@ class RouterSpec extends BaseRouterSpec { } } + test("update an existing channel after a splice") { fixture => + import fixture._ + + val eventListener = TestProbe() + system.eventStream.subscribe(eventListener.ref, classOf[NetworkEvent]) + val peerConnection = TestProbe() + + // Channel ab is spent by a splice tx. + val capacity1 = publicChannelCapacity - 100_000.sat + val spliceTx1 = spendingTx(funding_a, funding_b, capacity1) + router ! WatchExternalChannelSpentTriggered(scid_ab, spliceTx1) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == spliceTx1.txid) + assert(w.minDepth == 12) + } + eventListener.expectNoMessage(100 millis) + + // Channel ab is spent and confirmed by an RBF of splice tx. + val capacity2 = publicChannelCapacity - 100_000.sat - 1000.sat + val spliceTx2 = spendingTx(funding_a, funding_b, capacity2) + router ! WatchExternalChannelSpentTriggered(scid_ab, spliceTx2) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == spliceTx2.txid) + assert(w.minDepth == 12) + } + eventListener.expectNoMessage(100 millis) + + // The splice of channel ab is announced. + val spliceScid = RealShortChannelId(BlockHeight(450000), 1, 0) + val spliceAnn = channelAnnouncement(spliceScid, priv_a, priv_b, priv_funding_a, priv_funding_b) + peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, spliceAnn)) + peerConnection.expectNoMessage(100 millis) + assert(watcher.expectMsgType[ValidateRequest].ann == spliceAnn) + watcher.send(router, ValidateResult(spliceAnn, Right(spliceTx2, UtxoStatus.Unspent))) + peerConnection.expectMsg(TransportHandler.ReadAck(spliceAnn)) + peerConnection.expectMsg(GossipDecision.Accepted(spliceAnn)) + assert(peerConnection.sender() == router) + + // And the graph should be updated too. + val sender = TestProbe() + sender.send(router, Router.GetRouterData) + val g = sender.expectMsgType[Data].graphWithBalances.graph + val edge_ab = g.getEdge(ChannelDesc(spliceScid, a, b)).get + val edge_ba = g.getEdge(ChannelDesc(spliceScid, b, a)).get + assert(g.getEdge(ChannelDesc(scid_ab, a, b)).isEmpty) + assert(g.getEdge(ChannelDesc(scid_ab, b, a)).isEmpty) + assert(edge_ab.capacity == capacity2 && edge_ba.capacity == capacity2) + + // The channel update for the splice is confirmed and the channel is not removed. + router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_a, funding_b)) + eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(spliceAnn, capacity2, None, None) :: Nil)) + eventListener.expectMsg(ChannelLost(scid_ab)) + peerConnection.expectNoMessage(100 millis) + eventListener.expectNoMessage(100 millis) + + // The router no longer tracks the parent scid. + val probe = TestProbe() + awaitAssert({ + probe.send(router, GetRouterData) + val routerData = probe.expectMsgType[Data] + assert(routerData.spentChannels.isEmpty) + assert(!routerData.channels.contains(scid_ab)) + }) + } + }