Skip to content

Commit 5fbb4a9

Browse files
committed
Delay considering a channel closed when seeing an on-chain spend
Issue #2437 When external channel is spent, add it to the spentChannels list instead of immediately removing it from the graph. Remove spent channels after 12 blocks. When a newly added channel is validated, if it spends the shared output of a recently spent channel then it is a splice. A splice updates the graph edges to preserve balance estimate information in the graph.
1 parent 96d0c9a commit 5fbb4a9

File tree

8 files changed

+226
-12
lines changed

8 files changed

+226
-12
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/router/BalanceEstimate.scala

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import fr.acinq.bitcoin.scalacompat.{Satoshi, SatoshiLong}
2121
import fr.acinq.eclair.router.Graph.GraphStructure.{DirectedGraph, GraphEdge}
2222
import fr.acinq.eclair.router.Router.{ChannelDesc, ChannelHop, Route}
2323
import fr.acinq.eclair.wire.protocol.NodeAnnouncement
24-
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, ShortChannelId, TimestampSecond, TimestampSecondLong, ToMilliSatoshiConversion}
24+
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, RealShortChannelId, ShortChannelId, TimestampSecond, TimestampSecondLong, ToMilliSatoshiConversion}
2525

2626
import scala.concurrent.duration.{DurationInt, FiniteDuration}
2727

@@ -194,6 +194,18 @@ case class BalanceEstimate private(low: MilliSatoshi,
194194
)
195195
}
196196

197+
def updateEdge(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): BalanceEstimate = {
198+
val newCapacities = capacities - desc.shortChannelId + (newShortChannelId -> newCapacity)
199+
val capacityDelta = (newCapacity - capacities.getOrElse(desc.shortChannelId, newCapacity)).toMilliSatoshi
200+
copy(
201+
// a capacity decrease will decrease the low bound, but not below 0
202+
low = (low + capacityDelta.min(0 msat)).max(0 msat),
203+
// a capacity increase will increase the high bound, but not above the capacity of the largest channel
204+
high = (high + capacityDelta.max(0 msat)).min(newCapacities.values.maxOption.getOrElse(0 sat).toMilliSatoshi),
205+
capacities = newCapacities
206+
)
207+
}
208+
197209
/**
198210
* Estimate the probability that we can successfully send `amount` through the channel
199211
*
@@ -256,6 +268,14 @@ case class BalancesEstimates(balances: Map[(PublicKey, PublicKey), BalanceEstima
256268
defaultHalfLife
257269
)
258270

271+
def updateEdge(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): BalancesEstimates = BalancesEstimates(
272+
balances.updatedWith((desc.a, desc.b)) {
273+
case None => None
274+
case Some(balance) => Some(balance.updateEdge(desc, newShortChannelId, newCapacity))
275+
},
276+
defaultHalfLife
277+
)
278+
259279
def channelCouldSend(hop: ChannelHop, amount: MilliSatoshi): BalancesEstimates = {
260280
get(hop.nodeId, hop.nextNodeId).foreach { balance =>
261281
val estimatedProbability = balance.canSend(amount)
@@ -298,6 +318,13 @@ case class GraphWithBalanceEstimates(graph: DirectedGraph, private val balances:
298318
descList.foldLeft(balances)((acc, edge) => acc.removeEdge(edge).removeEdge(edge.reversed)),
299319
)
300320

321+
def updateChannel(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): GraphWithBalanceEstimates = {
322+
GraphWithBalanceEstimates(
323+
graph.updateChannel(desc, newShortChannelId, newCapacity),
324+
balances.updateEdge(desc, newShortChannelId, newCapacity).updateEdge(desc.reversed, newShortChannelId, newCapacity)
325+
)
326+
}
327+
301328
def routeCouldRelay(route: Route): GraphWithBalanceEstimates = {
302329
val (balances1, _) = route.hops.foldRight((balances, route.amount)) {
303330
case (hop, (balances, amount)) =>

eclair-core/src/main/scala/fr/acinq/eclair/router/Graph.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -678,6 +678,26 @@ object Graph {
678678
descList.foldLeft(this)((acc, edge) => acc.removeChannel(edge))
679679
}
680680

681+
/**
682+
* Update the shortChannelId and capacity of edges corresponding to the given channel-desc,
683+
* both edges (corresponding to both directions) are updated.
684+
*
685+
* @param desc the channel description for the channel to update
686+
* @param newShortChannelId the new shortChannelId for this channel
687+
* @param newCapacity the new capacity of the channel
688+
* @return a new graph with updated vertexes
689+
*/
690+
def updateChannel(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): DirectedGraph = {
691+
val newDesc = desc.copy(shortChannelId = newShortChannelId)
692+
val updatedVertices =
693+
vertices
694+
.updatedWith(desc.b)(_.map(vertexB => vertexB.copy(incomingEdges = vertexB.incomingEdges - desc +
695+
(newDesc -> vertexB.incomingEdges(desc).copy(desc = newDesc, capacity = newCapacity)))))
696+
.updatedWith(desc.a)(_.map(vertexA => vertexA.copy(incomingEdges = vertexA.incomingEdges - desc.reversed +
697+
(newDesc.reversed -> vertexA.incomingEdges(desc.reversed).copy(desc = newDesc.reversed, capacity = newCapacity)))))
698+
DirectedGraph(updatedVertices)
699+
}
700+
681701
/**
682702
* @return For edges to be considered equal they must have the same in/out vertices AND same shortChannelId
683703
*/

eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
2525
import fr.acinq.bitcoin.scalacompat.{BlockHash, ByteVector32, Satoshi, TxId}
2626
import fr.acinq.eclair.Logs.LogCategory
2727
import fr.acinq.eclair._
28+
import fr.acinq.eclair.blockchain.CurrentBlockHeight
2829
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
2930
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{ValidateResult, WatchExternalChannelSpent, WatchExternalChannelSpentTriggered}
3031
import fr.acinq.eclair.channel._
@@ -64,6 +65,7 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
6465
context.system.eventStream.subscribe(self, classOf[LocalChannelUpdate])
6566
context.system.eventStream.subscribe(self, classOf[LocalChannelDown])
6667
context.system.eventStream.subscribe(self, classOf[AvailableBalanceChanged])
68+
context.system.eventStream.subscribe(self, classOf[CurrentBlockHeight])
6769
context.system.eventStream.publish(SubscriptionsComplete(this.getClass))
6870

6971
startTimerWithFixedDelay(TickBroadcast.toString, TickBroadcast, nodeParams.routerConf.routerBroadcastInterval)
@@ -113,7 +115,8 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
113115
scid2PrivateChannels = Map.empty,
114116
excludedChannels = Map.empty,
115117
graphWithBalances = GraphWithBalanceEstimates(graph, nodeParams.routerConf.balanceEstimateHalfLife),
116-
sync = Map.empty)
118+
sync = Map.empty,
119+
spentChannels = Map.empty)
117120
startWith(NORMAL, data)
118121
}
119122

@@ -260,6 +263,18 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
260263
stay() using Validation.handleChannelValidationResponse(d, nodeParams, watcher, r)
261264

262265
case Event(WatchExternalChannelSpentTriggered(shortChannelId), d) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) =>
266+
log.info("funding tx of channelId={} has been spent - delay removing it from the graph for 12 blocks", shortChannelId)
267+
stay() using d.copy(spentChannels = d.spentChannels + (shortChannelId -> nodeParams.currentBlockHeight))
268+
269+
case Event(c: CurrentBlockHeight, d) =>
270+
val spentChannels1 = d.spentChannels.filter {
271+
// spent channels may be confirmed as a splice; wait 12 blocks before removing them from the graph
272+
case (_, blockHeight) if blockHeight >= c.blockHeight + 12 => true
273+
case (shortChannelId, _) => self ! HandleChannelSpent(shortChannelId); false
274+
}
275+
stay() using d.copy(spentChannels = spentChannels1)
276+
277+
case Event(HandleChannelSpent(shortChannelId), d: Data) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) =>
263278
stay() using Validation.handleChannelSpent(d, nodeParams.db.network, shortChannelId)
264279

265280
case Event(n: NodeAnnouncement, d: Data) =>
@@ -757,7 +772,8 @@ object Router {
757772
scid2PrivateChannels: Map[Long, ByteVector32], // real scid or alias to channel_id, only to be used for private channels
758773
excludedChannels: Map[ChannelDesc, ExcludedChannelStatus], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure
759774
graphWithBalances: GraphWithBalanceEstimates,
760-
sync: Map[PublicKey, Syncing] // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
775+
sync: Map[PublicKey, Syncing], // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
776+
spentChannels: Map[RealShortChannelId, BlockHeight],
761777
) {
762778
def resolve(scid: ShortChannelId): Option[KnownChannel] = {
763779
// let's assume this is a real scid
@@ -797,4 +813,8 @@ object Router {
797813

798814
/** We have tried to relay this amount from this channel and it failed. */
799815
case class ChannelCouldNotRelay(amount: MilliSatoshi, hop: ChannelHop)
816+
817+
/** Funding Tx of the channel id has been spent and not updated with a splice within 12 blocks. */
818+
private case class HandleChannelSpent(shortChannelId: RealShortChannelId)
819+
800820
}

eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,18 @@ object Validation {
111111
None
112112
} else {
113113
log.debug("validation successful for shortChannelId={}", c.shortChannelId)
114+
val sharedInputTxId_opt = tx.txIn.find(_.signatureScript == fundingOutputScript).map(_.outPoint.txid)
114115
remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c)))
115116
val capacity = tx.txOut(outputIndex).amount
116-
Some(addPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, None))
117+
// if a channel spends the shared output of a recently spent channel, then it is a splice
118+
sharedInputTxId_opt match {
119+
case None => Some(addPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, None))
120+
case Some(sharedInputTxId) =>
121+
d0.spentChannels.find(spent => d0.channels.get(spent._1).exists(_.fundingTxId == sharedInputTxId)) match {
122+
case Some((parentScid, _)) => Some(splicePublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, d0.channels(parentScid)))
123+
case None => log.error("channel shortChannelId={} is a splice, but not matching channel found!", c.shortChannelId); None
124+
}
125+
}
117126
}
118127
case ValidateResult(c, Right((tx, fundingTxStatus: UtxoStatus.Spent))) =>
119128
if (fundingTxStatus.spendingTxConfirmed) {
@@ -156,6 +165,37 @@ object Validation {
156165
}
157166
}
158167

168+
private def splicePublicChannel(d: Data, nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], ann: ChannelAnnouncement, fundingTxId: TxId, capacity: Satoshi, parentChannel: PublicChannel)(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Data = {
169+
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
170+
val fundingOutputIndex = outputIndex(ann.shortChannelId)
171+
watcher ! WatchExternalChannelSpent(ctx.self, fundingTxId, fundingOutputIndex, ann.shortChannelId)
172+
ctx.system.eventStream.publish(ChannelsDiscovered(SingleChannelDiscovered(ann, capacity, None, None) :: Nil))
173+
nodeParams.db.network.addChannel(ann, fundingTxId, capacity)
174+
nodeParams.db.network.removeChannel(parentChannel.shortChannelId)
175+
val pubChan = PublicChannel(
176+
ann = ann,
177+
fundingTxId = fundingTxId,
178+
capacity = capacity,
179+
update_1_opt = parentChannel.update_1_opt,
180+
update_2_opt = parentChannel.update_2_opt,
181+
meta_opt = parentChannel.meta_opt
182+
)
183+
log.debug("replacing parent channel scid={} with splice channel scid={}; splice channel={}", parentChannel.shortChannelId, ann.shortChannelId, pubChan)
184+
// we need to update the graph because the edge identifiers and capacity change from the parent scid to the new splice scid
185+
log.debug("updating the graph for shortChannelId={}", pubChan.shortChannelId)
186+
val graph1 = d.graphWithBalances.updateChannel(ChannelDesc(parentChannel.shortChannelId, parentChannel.nodeId1, parentChannel.nodeId2), ann.shortChannelId, capacity)
187+
d.copy(
188+
// we also add the splice scid -> channelId and remove the parent scid -> channelId mappings
189+
channels = d.channels + (pubChan.shortChannelId -> pubChan) - parentChannel.shortChannelId,
190+
// we also add the newly validated channels to the rebroadcast queue
191+
rebroadcast = d.rebroadcast.copy(
192+
// we rebroadcast the splice channel to our peers
193+
channels = d.rebroadcast.channels + (pubChan.ann -> d.awaiting.getOrElse(pubChan.ann, if (pubChan.nodeId1 == nodeParams.nodeId || pubChan.nodeId2 == nodeParams.nodeId) Seq(LocalGossip) else Nil).toSet),
194+
),
195+
graphWithBalances = graph1
196+
)
197+
}
198+
159199
private def addPublicChannel(d: Data, nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], ann: ChannelAnnouncement, fundingTxId: TxId, capacity: Satoshi, privChan_opt: Option[PrivateChannel])(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Data = {
160200
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
161201
val fundingOutputIndex = outputIndex(ann.shortChannelId)

eclair-core/src/test/scala/fr/acinq/eclair/EclairImplSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ class EclairImplSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with I
376376
shortIds4.real.toOption.get.toLong -> channelId4,
377377
)
378378
val g = GraphWithBalanceEstimates(DirectedGraph(Nil), 1 hour)
379-
val routerData = Router.Data(Map.empty, publicChannels, SortedMap.empty, Router.Stash(Map.empty, Map.empty), Router.Rebroadcast(Map.empty, Map.empty, Map.empty), Map.empty, privateChannels, scidMapping, Map.empty, g, Map.empty)
379+
val routerData = Router.Data(Map.empty, publicChannels, SortedMap.empty, Router.Stash(Map.empty, Map.empty), Router.Rebroadcast(Map.empty, Map.empty, Map.empty), Map.empty, privateChannels, scidMapping, Map.empty, g, Map.empty, Map.empty)
380380

381381
eclair.findRoute(c, 250_000 msat, None)
382382
val routeRequest1 = router.expectMsgType[RouteRequest]

eclair-core/src/test/scala/fr/acinq/eclair/router/BalanceEstimateSpec.scala

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import fr.acinq.bitcoin.scalacompat.{Satoshi, SatoshiLong}
2121
import fr.acinq.eclair.payment.Invoice
2222
import fr.acinq.eclair.router.Graph.GraphStructure.{DirectedGraph, GraphEdge}
2323
import fr.acinq.eclair.router.Router.{ChannelDesc, HopRelayParams}
24-
import fr.acinq.eclair.{CltvExpiryDelta, MilliSatoshiLong, ShortChannelId, TimestampSecond, randomKey}
24+
import fr.acinq.eclair.{CltvExpiryDelta, MilliSatoshiLong, RealShortChannelId, ShortChannelId, TimestampSecond, randomKey}
2525
import org.scalactic.Tolerance.convertNumericToPlusOrMinusWrapper
2626
import org.scalatest.funsuite.AnyFunSuite
2727

@@ -84,6 +84,68 @@ class BalanceEstimateSpec extends AnyFunSuite {
8484
assert(balance2.capacities.isEmpty)
8585
}
8686

87+
test("update channels after a splice") {
88+
val a = makeEdge(0, 200 sat)
89+
val b = makeEdge(1, 100 sat)
90+
val unknownDesc = ChannelDesc(ShortChannelId(3), randomKey().publicKey, randomKey().publicKey)
91+
val balance = BalanceEstimate.empty(1 day)
92+
.addEdge(a)
93+
.addEdge(b)
94+
.couldNotSend(140_000 msat, TimestampSecond.now())
95+
.couldSend(60_000 msat, TimestampSecond.now())
96+
97+
// a splice-in that increases channel capacity increases high but not low bounds
98+
val balance1 = balance
99+
.updateEdge(a.desc, RealShortChannelId(5), 250 sat)
100+
assert(balance1.maxCapacity == 250.sat)
101+
assert(balance1.low == 60_000.msat)
102+
assert(balance1.high == 190_000.msat)
103+
104+
// a splice-in that increases channel capacity of smaller channel does not increase high more than max capacity
105+
val balance2 = balance
106+
.updateEdge(b.desc, RealShortChannelId(5), 300 sat)
107+
assert(balance2.maxCapacity == 300.sat)
108+
assert(balance2.low == 60_000.msat)
109+
assert(balance2.high == 300_000.msat)
110+
111+
// a splice-out that decreases channel capacity decreases low bounds but not high bounds
112+
val balance3 = balance
113+
.updateEdge(a.desc, RealShortChannelId(5), 150 sat)
114+
assert(balance3.maxCapacity == 150.sat)
115+
assert(balance3.low == 10_000.msat)
116+
assert(balance3.high == 140_000.msat)
117+
118+
// a splice-out that decreases channel capacity of largest channel does not decrease low bounds below zero
119+
val balance4 = balance
120+
.updateEdge(a.desc, RealShortChannelId(5), 50 sat)
121+
assert(balance4.maxCapacity == 100.sat)
122+
assert(balance4.low == 0.msat)
123+
assert(balance4.high == 100_000.msat)
124+
125+
// a splice-out that does not decrease the largest channel only decreases low bounds
126+
val balance5 = balance
127+
.updateEdge(b.desc, RealShortChannelId(5), 50 sat)
128+
assert(balance5.maxCapacity == 200.sat)
129+
assert(balance5.low == 10_000.msat)
130+
assert(balance5.high == 140_000.msat)
131+
132+
// a splice of an unknown channel that increases max capacity does not change the low/high bounds
133+
val balance6 = balance
134+
.updateEdge(unknownDesc, RealShortChannelId(5), 900 sat)
135+
assert(isValid(balance6))
136+
assert(balance6.maxCapacity == 900.sat)
137+
assert(balance6.low == 60_000.msat)
138+
assert(balance6.high == 140_000.msat)
139+
140+
// a splice of an unknown channel below max capacity does not change max capacity or low/high bounds
141+
val balance7 = balance
142+
.updateEdge(unknownDesc, RealShortChannelId(5), 150 sat)
143+
assert(isValid(balance7))
144+
assert(balance7.maxCapacity == 200.sat)
145+
assert(balance7.low == 60_000.msat)
146+
assert(balance7.high == 140_000.msat)
147+
}
148+
87149
test("update bounds based on what could then could not be sent (increasing amounts)") {
88150
val now = TimestampSecond.now()
89151
val balance = BalanceEstimate.empty(1 day)

eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package fr.acinq.eclair.router
22

33
import akka.actor.ActorSystem
44
import akka.testkit.{TestFSMRef, TestProbe}
5+
import fr.acinq.eclair.blockchain.CurrentBlockHeight
56
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{WatchExternalChannelSpent, WatchExternalChannelSpentTriggered, WatchFundingDeeplyBuriedTriggered}
67
import fr.acinq.eclair.channel.states.{ChannelStateTestsBase, ChannelStateTestsTags}
78
import fr.acinq.eclair.channel.{CMD_CLOSE, DATA_NORMAL}
@@ -21,7 +22,7 @@ import scala.concurrent.duration.DurationInt
2122
*/
2223
class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with ChannelStateTestsBase {
2324

24-
case class FixtureParam(router: TestFSMRef[Router.State, Router.Data, Router], rebroadcastListener: TestProbe, channels: SetupFixture, testTags: Set[String]) {
25+
case class FixtureParam(router: TestFSMRef[Router.State, Router.Data, Router], channels: SetupFixture, testTags: Set[String]) {
2526
//@formatter:off
2627
/** there is only one channel here */
2728
def privateChannel: PrivateChannel = router.stateData.privateChannels.values.head
@@ -33,14 +34,12 @@ class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSu
3334

3435
override def withFixture(test: OneArgTest): Outcome = {
3536
val channels = init(tags = test.tags)
36-
val rebroadcastListener = TestProbe()
3737
val router: TestFSMRef[Router.State, Router.Data, Router] = {
3838
// we use alice's actor system so we share the same event stream
3939
implicit val system: ActorSystem = channels.alice.underlying.system
40-
system.eventStream.subscribe(rebroadcastListener.ref, classOf[Router.Rebroadcast])
4140
TestFSMRef(new Router(channels.alice.underlyingActor.nodeParams, channels.alice.underlyingActor.blockchain, initialized = None))
4241
}
43-
withFixture(test.toNoArgTest(FixtureParam(router, rebroadcastListener, channels, test.tags)))
42+
withFixture(test.toNoArgTest(FixtureParam(router, channels, test.tags)))
4443
}
4544

4645
private def internalTest(f: FixtureParam): Unit = {
@@ -178,6 +177,7 @@ class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSu
178177
// if the channel was public, the router asked the watcher to watch the funding tx and will be notified
179178
val watchSpentBasic = channels.alice2blockchain.expectMsgType[WatchExternalChannelSpent]
180179
watchSpentBasic.replyTo ! WatchExternalChannelSpentTriggered(watchSpentBasic.shortChannelId)
180+
channels.alice.underlying.system.eventStream.publish(CurrentBlockHeight(BlockHeight(400012)))
181181
}
182182
// router cleans up the channel
183183
awaitAssert {

0 commit comments

Comments
 (0)