Skip to content

Commit d1109fa

Browse files
committed
Delay considering a channel closed when seeing an on-chain spend
Issue #2437 When external channel is spent, add it to the spentChannels list instead of immediately removing it from the graph. Remove spent channels after 12 blocks. When a newly added channel is validated, if it spends the shared output of a recently spent channel then it is a splice. A splice updates the graph edges to preserve balance estimate information in the graph. Channels are immediately removed from the db so they will not added to the graph if a restart occurs before 12-blocks elapse.
1 parent 96d0c9a commit d1109fa

File tree

9 files changed

+243
-15
lines changed

9 files changed

+243
-15
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/router/BalanceEstimate.scala

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import fr.acinq.bitcoin.scalacompat.{Satoshi, SatoshiLong}
2121
import fr.acinq.eclair.router.Graph.GraphStructure.{DirectedGraph, GraphEdge}
2222
import fr.acinq.eclair.router.Router.{ChannelDesc, ChannelHop, Route}
2323
import fr.acinq.eclair.wire.protocol.NodeAnnouncement
24-
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, ShortChannelId, TimestampSecond, TimestampSecondLong, ToMilliSatoshiConversion}
24+
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, RealShortChannelId, ShortChannelId, TimestampSecond, TimestampSecondLong, ToMilliSatoshiConversion}
2525

2626
import scala.concurrent.duration.{DurationInt, FiniteDuration}
2727

@@ -194,6 +194,18 @@ case class BalanceEstimate private(low: MilliSatoshi,
194194
)
195195
}
196196

197+
def updateEdge(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): BalanceEstimate = {
198+
val newCapacities = capacities - desc.shortChannelId + (newShortChannelId -> newCapacity)
199+
val capacityDelta = (newCapacity - capacities.getOrElse(desc.shortChannelId, newCapacity)).toMilliSatoshi
200+
copy(
201+
// a capacity decrease will decrease the low bound, but not below 0
202+
low = (low + capacityDelta.min(0 msat)).max(0 msat),
203+
// a capacity increase will increase the high bound, but not above the capacity of the largest channel
204+
high = (high + capacityDelta.max(0 msat)).min(newCapacities.values.maxOption.getOrElse(0 sat).toMilliSatoshi),
205+
capacities = newCapacities
206+
)
207+
}
208+
197209
/**
198210
* Estimate the probability that we can successfully send `amount` through the channel
199211
*
@@ -256,6 +268,14 @@ case class BalancesEstimates(balances: Map[(PublicKey, PublicKey), BalanceEstima
256268
defaultHalfLife
257269
)
258270

271+
def updateEdge(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): BalancesEstimates = BalancesEstimates(
272+
balances.updatedWith((desc.a, desc.b)) {
273+
case None => None
274+
case Some(balance) => Some(balance.updateEdge(desc, newShortChannelId, newCapacity))
275+
},
276+
defaultHalfLife
277+
)
278+
259279
def channelCouldSend(hop: ChannelHop, amount: MilliSatoshi): BalancesEstimates = {
260280
get(hop.nodeId, hop.nextNodeId).foreach { balance =>
261281
val estimatedProbability = balance.canSend(amount)
@@ -298,6 +318,13 @@ case class GraphWithBalanceEstimates(graph: DirectedGraph, private val balances:
298318
descList.foldLeft(balances)((acc, edge) => acc.removeEdge(edge).removeEdge(edge.reversed)),
299319
)
300320

321+
def updateChannel(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): GraphWithBalanceEstimates = {
322+
GraphWithBalanceEstimates(
323+
graph.updateChannel(desc, newShortChannelId, newCapacity),
324+
balances.updateEdge(desc, newShortChannelId, newCapacity).updateEdge(desc.reversed, newShortChannelId, newCapacity)
325+
)
326+
}
327+
301328
def routeCouldRelay(route: Route): GraphWithBalanceEstimates = {
302329
val (balances1, _) = route.hops.foldRight((balances, route.amount)) {
303330
case (hop, (balances, amount)) =>

eclair-core/src/main/scala/fr/acinq/eclair/router/Graph.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -678,6 +678,26 @@ object Graph {
678678
descList.foldLeft(this)((acc, edge) => acc.removeChannel(edge))
679679
}
680680

681+
/**
682+
* Update the shortChannelId and capacity of edges corresponding to the given channel-desc,
683+
* both edges (corresponding to both directions) are updated.
684+
*
685+
* @param desc the channel description for the channel to update
686+
* @param newShortChannelId the new shortChannelId for this channel
687+
* @param newCapacity the new capacity of the channel
688+
* @return a new graph with updated vertexes
689+
*/
690+
def updateChannel(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): DirectedGraph = {
691+
val newDesc = desc.copy(shortChannelId = newShortChannelId)
692+
val updatedVertices =
693+
vertices
694+
.updatedWith(desc.b)(_.map(vertexB => vertexB.copy(incomingEdges = vertexB.incomingEdges - desc +
695+
(newDesc -> vertexB.incomingEdges(desc).copy(desc = newDesc, capacity = newCapacity)))))
696+
.updatedWith(desc.a)(_.map(vertexA => vertexA.copy(incomingEdges = vertexA.incomingEdges - desc.reversed +
697+
(newDesc.reversed -> vertexA.incomingEdges(desc.reversed).copy(desc = newDesc.reversed, capacity = newCapacity)))))
698+
DirectedGraph(updatedVertices)
699+
}
700+
681701
/**
682702
* @return For edges to be considered equal they must have the same in/out vertices AND same shortChannelId
683703
*/

eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
2525
import fr.acinq.bitcoin.scalacompat.{BlockHash, ByteVector32, Satoshi, TxId}
2626
import fr.acinq.eclair.Logs.LogCategory
2727
import fr.acinq.eclair._
28+
import fr.acinq.eclair.blockchain.CurrentBlockHeight
2829
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
2930
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{ValidateResult, WatchExternalChannelSpent, WatchExternalChannelSpentTriggered}
3031
import fr.acinq.eclair.channel._
@@ -64,6 +65,7 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
6465
context.system.eventStream.subscribe(self, classOf[LocalChannelUpdate])
6566
context.system.eventStream.subscribe(self, classOf[LocalChannelDown])
6667
context.system.eventStream.subscribe(self, classOf[AvailableBalanceChanged])
68+
context.system.eventStream.subscribe(self, classOf[CurrentBlockHeight])
6769
context.system.eventStream.publish(SubscriptionsComplete(this.getClass))
6870

6971
startTimerWithFixedDelay(TickBroadcast.toString, TickBroadcast, nodeParams.routerConf.routerBroadcastInterval)
@@ -74,7 +76,10 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
7476
{
7577
log.info("loading network announcements from db...")
7678
val (pruned, channels) = db.listChannels().partition { case (_, pc) => pc.isStale(nodeParams.currentBlockHeight) }
77-
val nodes = db.listNodes()
79+
val nodeIds = (pruned.values ++ channels.values).flatMap(pc => pc.ann.nodeId1 :: pc.ann.nodeId2 :: Nil).toSet
80+
val (isolatedNodes, nodes) = db.listNodes().partition(n => !nodeIds.contains(n.nodeId))
81+
log.info("removed {} isolated nodes from db", isolatedNodes.size)
82+
isolatedNodes.foreach(n => db.removeNode(n.nodeId))
7883
Metrics.Nodes.withoutTags().update(nodes.size)
7984
Metrics.Channels.withoutTags().update(channels.size)
8085
log.info("loaded from db: channels={} nodes={}", channels.size, nodes.size)
@@ -113,7 +118,8 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
113118
scid2PrivateChannels = Map.empty,
114119
excludedChannels = Map.empty,
115120
graphWithBalances = GraphWithBalanceEstimates(graph, nodeParams.routerConf.balanceEstimateHalfLife),
116-
sync = Map.empty)
121+
sync = Map.empty,
122+
spentChannels = Map.empty)
117123
startWith(NORMAL, data)
118124
}
119125

@@ -260,6 +266,20 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
260266
stay() using Validation.handleChannelValidationResponse(d, nodeParams, watcher, r)
261267

262268
case Event(WatchExternalChannelSpentTriggered(shortChannelId), d) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) =>
269+
log.info("funding tx of channelId={} has been spent - delay removing it from the graph for 12 blocks", shortChannelId)
270+
// remove the channel from the db so it will not be added to the graph if a restart occurs before 12 blocks elapse
271+
db.removeChannel(shortChannelId)
272+
stay() using d.copy(spentChannels = d.spentChannels + (shortChannelId -> nodeParams.currentBlockHeight))
273+
274+
case Event(c: CurrentBlockHeight, d) =>
275+
val spentChannels1 = d.spentChannels.filter {
276+
// spent channels may be confirmed as a splice; wait 12 blocks before removing them from the graph
277+
case (_, blockHeight) if blockHeight >= c.blockHeight + 12 => true
278+
case (shortChannelId, _) => self ! HandleChannelSpent(shortChannelId); false
279+
}
280+
stay() using d.copy(spentChannels = spentChannels1)
281+
282+
case Event(HandleChannelSpent(shortChannelId), d: Data) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) =>
263283
stay() using Validation.handleChannelSpent(d, nodeParams.db.network, shortChannelId)
264284

265285
case Event(n: NodeAnnouncement, d: Data) =>
@@ -757,7 +777,8 @@ object Router {
757777
scid2PrivateChannels: Map[Long, ByteVector32], // real scid or alias to channel_id, only to be used for private channels
758778
excludedChannels: Map[ChannelDesc, ExcludedChannelStatus], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure
759779
graphWithBalances: GraphWithBalanceEstimates,
760-
sync: Map[PublicKey, Syncing] // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
780+
sync: Map[PublicKey, Syncing], // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
781+
spentChannels: Map[RealShortChannelId, BlockHeight], // channels with funding txs spent less than 12 blocks ago
761782
) {
762783
def resolve(scid: ShortChannelId): Option[KnownChannel] = {
763784
// let's assume this is a real scid
@@ -797,4 +818,8 @@ object Router {
797818

798819
/** We have tried to relay this amount from this channel and it failed. */
799820
case class ChannelCouldNotRelay(amount: MilliSatoshi, hop: ChannelHop)
821+
822+
/** Funding Tx of the channel id has been spent and not updated with a splice within 12 blocks. */
823+
private case class HandleChannelSpent(shortChannelId: RealShortChannelId)
824+
800825
}

eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,18 @@ object Validation {
111111
None
112112
} else {
113113
log.debug("validation successful for shortChannelId={}", c.shortChannelId)
114+
val sharedInputTxId_opt = tx.txIn.find(_.signatureScript == fundingOutputScript).map(_.outPoint.txid)
114115
remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c)))
115116
val capacity = tx.txOut(outputIndex).amount
116-
Some(addPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, None))
117+
// if a channel spends the shared output of a recently spent channel, then it is a splice
118+
sharedInputTxId_opt match {
119+
case None => Some(addPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, None))
120+
case Some(sharedInputTxId) =>
121+
d0.spentChannels.find(spent => d0.channels.get(spent._1).exists(_.fundingTxId == sharedInputTxId)) match {
122+
case Some((parentScid, _)) => Some(splicePublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, d0.channels(parentScid)))
123+
case None => log.error("channel shortChannelId={} is a splice, but not matching channel found!", c.shortChannelId); None
124+
}
125+
}
117126
}
118127
case ValidateResult(c, Right((tx, fundingTxStatus: UtxoStatus.Spent))) =>
119128
if (fundingTxStatus.spendingTxConfirmed) {
@@ -156,6 +165,37 @@ object Validation {
156165
}
157166
}
158167

168+
private def splicePublicChannel(d: Data, nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], ann: ChannelAnnouncement, fundingTxId: TxId, capacity: Satoshi, parentChannel: PublicChannel)(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Data = {
169+
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
170+
val fundingOutputIndex = outputIndex(ann.shortChannelId)
171+
watcher ! WatchExternalChannelSpent(ctx.self, fundingTxId, fundingOutputIndex, ann.shortChannelId)
172+
ctx.system.eventStream.publish(ChannelsDiscovered(SingleChannelDiscovered(ann, capacity, None, None) :: Nil))
173+
nodeParams.db.network.addChannel(ann, fundingTxId, capacity)
174+
nodeParams.db.network.removeChannel(parentChannel.shortChannelId)
175+
val pubChan = PublicChannel(
176+
ann = ann,
177+
fundingTxId = fundingTxId,
178+
capacity = capacity,
179+
update_1_opt = parentChannel.update_1_opt,
180+
update_2_opt = parentChannel.update_2_opt,
181+
meta_opt = parentChannel.meta_opt
182+
)
183+
log.debug("replacing parent channel scid={} with splice channel scid={}; splice channel={}", parentChannel.shortChannelId, ann.shortChannelId, pubChan)
184+
// we need to update the graph because the edge identifiers and capacity change from the parent scid to the new splice scid
185+
log.debug("updating the graph for shortChannelId={}", pubChan.shortChannelId)
186+
val graph1 = d.graphWithBalances.updateChannel(ChannelDesc(parentChannel.shortChannelId, parentChannel.nodeId1, parentChannel.nodeId2), ann.shortChannelId, capacity)
187+
d.copy(
188+
// we also add the splice scid -> channelId and remove the parent scid -> channelId mappings
189+
channels = d.channels + (pubChan.shortChannelId -> pubChan) - parentChannel.shortChannelId,
190+
// we also add the newly validated channels to the rebroadcast queue
191+
rebroadcast = d.rebroadcast.copy(
192+
// we rebroadcast the splice channel to our peers
193+
channels = d.rebroadcast.channels + (pubChan.ann -> d.awaiting.getOrElse(pubChan.ann, if (pubChan.nodeId1 == nodeParams.nodeId || pubChan.nodeId2 == nodeParams.nodeId) Seq(LocalGossip) else Nil).toSet),
194+
),
195+
graphWithBalances = graph1
196+
)
197+
}
198+
159199
private def addPublicChannel(d: Data, nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], ann: ChannelAnnouncement, fundingTxId: TxId, capacity: Satoshi, privChan_opt: Option[PrivateChannel])(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Data = {
160200
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
161201
val fundingOutputIndex = outputIndex(ann.shortChannelId)

eclair-core/src/test/scala/fr/acinq/eclair/EclairImplSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ class EclairImplSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with I
376376
shortIds4.real.toOption.get.toLong -> channelId4,
377377
)
378378
val g = GraphWithBalanceEstimates(DirectedGraph(Nil), 1 hour)
379-
val routerData = Router.Data(Map.empty, publicChannels, SortedMap.empty, Router.Stash(Map.empty, Map.empty), Router.Rebroadcast(Map.empty, Map.empty, Map.empty), Map.empty, privateChannels, scidMapping, Map.empty, g, Map.empty)
379+
val routerData = Router.Data(Map.empty, publicChannels, SortedMap.empty, Router.Stash(Map.empty, Map.empty), Router.Rebroadcast(Map.empty, Map.empty, Map.empty), Map.empty, privateChannels, scidMapping, Map.empty, g, Map.empty, Map.empty)
380380

381381
eclair.findRoute(c, 250_000 msat, None)
382382
val routeRequest1 = router.expectMsgType[RouteRequest]

eclair-core/src/test/scala/fr/acinq/eclair/router/BalanceEstimateSpec.scala

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import fr.acinq.bitcoin.scalacompat.{Satoshi, SatoshiLong}
2121
import fr.acinq.eclair.payment.Invoice
2222
import fr.acinq.eclair.router.Graph.GraphStructure.{DirectedGraph, GraphEdge}
2323
import fr.acinq.eclair.router.Router.{ChannelDesc, HopRelayParams}
24-
import fr.acinq.eclair.{CltvExpiryDelta, MilliSatoshiLong, ShortChannelId, TimestampSecond, randomKey}
24+
import fr.acinq.eclair.{CltvExpiryDelta, MilliSatoshiLong, RealShortChannelId, ShortChannelId, TimestampSecond, randomKey}
2525
import org.scalactic.Tolerance.convertNumericToPlusOrMinusWrapper
2626
import org.scalatest.funsuite.AnyFunSuite
2727

@@ -84,6 +84,68 @@ class BalanceEstimateSpec extends AnyFunSuite {
8484
assert(balance2.capacities.isEmpty)
8585
}
8686

87+
test("update channels after a splice") {
88+
val a = makeEdge(0, 200 sat)
89+
val b = makeEdge(1, 100 sat)
90+
val unknownDesc = ChannelDesc(ShortChannelId(3), randomKey().publicKey, randomKey().publicKey)
91+
val balance = BalanceEstimate.empty(1 day)
92+
.addEdge(a)
93+
.addEdge(b)
94+
.couldNotSend(140_000 msat, TimestampSecond.now())
95+
.couldSend(60_000 msat, TimestampSecond.now())
96+
97+
// a splice-in that increases channel capacity increases high but not low bounds
98+
val balance1 = balance
99+
.updateEdge(a.desc, RealShortChannelId(5), 250 sat)
100+
assert(balance1.maxCapacity == 250.sat)
101+
assert(balance1.low == 60_000.msat)
102+
assert(balance1.high == 190_000.msat)
103+
104+
// a splice-in that increases channel capacity of smaller channel does not increase high more than max capacity
105+
val balance2 = balance
106+
.updateEdge(b.desc, RealShortChannelId(5), 300 sat)
107+
assert(balance2.maxCapacity == 300.sat)
108+
assert(balance2.low == 60_000.msat)
109+
assert(balance2.high == 300_000.msat)
110+
111+
// a splice-out that decreases channel capacity decreases low bounds but not high bounds
112+
val balance3 = balance
113+
.updateEdge(a.desc, RealShortChannelId(5), 150 sat)
114+
assert(balance3.maxCapacity == 150.sat)
115+
assert(balance3.low == 10_000.msat)
116+
assert(balance3.high == 140_000.msat)
117+
118+
// a splice-out that decreases channel capacity of largest channel does not decrease low bounds below zero
119+
val balance4 = balance
120+
.updateEdge(a.desc, RealShortChannelId(5), 50 sat)
121+
assert(balance4.maxCapacity == 100.sat)
122+
assert(balance4.low == 0.msat)
123+
assert(balance4.high == 100_000.msat)
124+
125+
// a splice-out that does not decrease the largest channel only decreases low bounds
126+
val balance5 = balance
127+
.updateEdge(b.desc, RealShortChannelId(5), 50 sat)
128+
assert(balance5.maxCapacity == 200.sat)
129+
assert(balance5.low == 10_000.msat)
130+
assert(balance5.high == 140_000.msat)
131+
132+
// a splice of an unknown channel that increases max capacity does not change the low/high bounds
133+
val balance6 = balance
134+
.updateEdge(unknownDesc, RealShortChannelId(5), 900 sat)
135+
assert(isValid(balance6))
136+
assert(balance6.maxCapacity == 900.sat)
137+
assert(balance6.low == 60_000.msat)
138+
assert(balance6.high == 140_000.msat)
139+
140+
// a splice of an unknown channel below max capacity does not change max capacity or low/high bounds
141+
val balance7 = balance
142+
.updateEdge(unknownDesc, RealShortChannelId(5), 150 sat)
143+
assert(isValid(balance7))
144+
assert(balance7.maxCapacity == 200.sat)
145+
assert(balance7.low == 60_000.msat)
146+
assert(balance7.high == 140_000.msat)
147+
}
148+
87149
test("update bounds based on what could then could not be sent (increasing amounts)") {
88150
val now = TimestampSecond.now()
89151
val balance = BalanceEstimate.empty(1 day)

0 commit comments

Comments
 (0)