Skip to content

Commit b00a95c

Browse files
committed
Use existing channel events
Instead of defining new events. We also keep a set of active channels to ensure that duplicate events don't mess up our state (even though this shouldn't happen, it feels safer).
1 parent 8e0b34f commit b00a95c

File tree

4 files changed

+48
-45
lines changed

4 files changed

+48
-45
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2588,13 +2588,6 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
25882588
case _: TransientChannelData => None
25892589
}
25902590
context.system.eventStream.publish(ChannelStateChanged(self, nextStateData.channelId, peer, remoteNodeId, state, nextState, commitments_opt))
2591-
2592-
if (state == NORMAL) {
2593-
peer ! Peer.ChannelDeactivated
2594-
}
2595-
if (nextState == NORMAL) {
2596-
peer ! Peer.ChannelActivated
2597-
}
25982591
}
25992592

26002593
if (nextState == CLOSED) {

eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ class Peer(val nodeParams: NodeParams,
7373

7474
context.system.eventStream.subscribe(self, classOf[CurrentFeerates])
7575
context.system.eventStream.subscribe(self, classOf[CurrentBlockHeight])
76+
context.system.eventStream.subscribe(self, classOf[LocalChannelDown])
7677

7778
startWith(INSTANTIATING, Nothing)
7879

@@ -90,7 +91,7 @@ class Peer(val nodeParams: NodeParams,
9091
} else {
9192
None
9293
}
93-
goto(DISCONNECTED) using DisconnectedData(channels, activeChannels = 0, PeerStorage(peerStorageData, written = true)) // when we restart, we will attempt to reconnect right away, but then we'll wait
94+
goto(DISCONNECTED) using DisconnectedData(channels, activeChannels = Set.empty, PeerStorage(peerStorageData, written = true)) // when we restart, we will attempt to reconnect right away, but then we'll wait
9495
}
9596

9697
when(DISCONNECTED) {
@@ -144,11 +145,11 @@ class Peer(val nodeParams: NodeParams,
144145
d.peerStorage.data.foreach(nodeParams.db.peers.updateStorage(remoteNodeId, _))
145146
stay() using d.copy(peerStorage = d.peerStorage.copy(written = true))
146147

147-
case Event(ChannelActivated, d: DisconnectedData) =>
148-
stay() using d.copy(activeChannels = d.activeChannels + 1)
148+
case Event(e: ChannelReadyForPayments, d: DisconnectedData) =>
149+
stay() using d.copy(activeChannels = d.activeChannels + e.channelId)
149150

150-
case Event(ChannelDeactivated, d: DisconnectedData) =>
151-
stay() using d.copy(activeChannels = d.activeChannels - 1)
151+
case Event(e: LocalChannelDown, d: DisconnectedData) =>
152+
stay() using d.copy(activeChannels = d.activeChannels - e.channelId)
152153
}
153154

154155
when(CONNECTED) {
@@ -423,7 +424,7 @@ class Peer(val nodeParams: NodeParams,
423424
}
424425
stay()
425426

426-
case Event(e: ChannelReadyForPayments, _: ConnectedData) =>
427+
case Event(e: ChannelReadyForPayments, d: ConnectedData) =>
427428
pendingOnTheFlyFunding.foreach {
428429
case (paymentHash, pending) =>
429430
pending.status match {
@@ -439,7 +440,10 @@ class Peer(val nodeParams: NodeParams,
439440
}
440441
}
441442
}
442-
stay()
443+
stay() using d.copy(activeChannels = d.activeChannels + e.channelId)
444+
445+
case Event(e: LocalChannelDown, d: ConnectedData) =>
446+
stay() using d.copy(activeChannels = d.activeChannels - e.channelId)
443447

444448
case Event(msg: HasChannelId, d: ConnectedData) =>
445449
d.channels.get(FinalChannelId(msg.channelId)) match {
@@ -533,26 +537,25 @@ class Peer(val nodeParams: NodeParams,
533537
d.peerConnection forward unknownMsg
534538
stay()
535539

536-
case Event(store: PeerStorageStore, d: ConnectedData) if nodeParams.features.hasFeature(Features.ProvideStorage) && d.activeChannels > 0 =>
537-
// If we don't have any pending write operations, we write the updated peer storage to disk after a delay.
538-
// This ensures that when we receive a burst of peer storage updates, we will rate-limit our IO disk operations.
539-
// If we already have a pending write operation, we must not reset the timer, otherwise we may indefinitely delay
540-
// writing to the DB and may never store our peer's backup.
541-
if (d.peerStorage.written) {
542-
startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageConfig.writeDelay)
540+
case Event(store: PeerStorageStore, d: ConnectedData) =>
541+
if (nodeParams.features.hasFeature(Features.ProvideStorage) && d.activeChannels.nonEmpty) {
542+
// If we don't have any pending write operations, we write the updated peer storage to disk after a delay.
543+
// This ensures that when we receive a burst of peer storage updates, we will rate-limit our IO disk operations.
544+
// If we already have a pending write operation, we must not reset the timer, otherwise we may indefinitely delay
545+
// writing to the DB and may never store our peer's backup.
546+
if (d.peerStorage.written) {
547+
startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageConfig.writeDelay)
548+
}
549+
stay() using d.copy(peerStorage = PeerStorage(Some(store.blob), written = false))
550+
} else {
551+
log.debug("ignoring peer storage (feature={}, channels={})", nodeParams.features.hasFeature(Features.ProvideStorage), d.activeChannels.mkString(","))
552+
stay()
543553
}
544-
stay() using d.copy(peerStorage = PeerStorage(Some(store.blob), written = false))
545554

546555
case Event(WritePeerStorage, d: ConnectedData) =>
547556
d.peerStorage.data.foreach(nodeParams.db.peers.updateStorage(remoteNodeId, _))
548557
stay() using d.copy(peerStorage = d.peerStorage.copy(written = true))
549558

550-
case Event(ChannelActivated, d: ConnectedData) =>
551-
stay() using d.copy(activeChannels = d.activeChannels + 1)
552-
553-
case Event(ChannelDeactivated, d: ConnectedData) =>
554-
stay() using d.copy(activeChannels = d.activeChannels - 1)
555-
556559
case Event(unhandledMsg: LightningMessage, _) =>
557560
log.warning("ignoring message {}", unhandledMsg)
558561
stay()
@@ -784,7 +787,7 @@ class Peer(val nodeParams: NodeParams,
784787
context.system.eventStream.publish(PeerDisconnected(self, remoteNodeId))
785788
}
786789

787-
private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef], activeChannels: Int, peerStorage: PeerStorage): State = {
790+
private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef], activeChannels: Set[ByteVector32], peerStorage: PeerStorage): State = {
788791
require(remoteNodeId == connectionReady.remoteNodeId, s"invalid nodeId: $remoteNodeId != ${connectionReady.remoteNodeId}")
789792
log.debug("got authenticated connection to address {}", connectionReady.address)
790793

@@ -956,16 +959,16 @@ object Peer {
956959

957960
sealed trait Data {
958961
def channels: Map[_ <: ChannelId, ActorRef] // will be overridden by Map[FinalChannelId, ActorRef] or Map[ChannelId, ActorRef]
959-
def activeChannels: Int
962+
def activeChannels: Set[ByteVector32] // channels that are available to process payments
960963
def peerStorage: PeerStorage
961964
}
962965
case object Nothing extends Data {
963966
override def channels = Map.empty
964-
override def activeChannels: Int = 0
967+
override def activeChannels: Set[ByteVector32] = Set.empty
965968
override def peerStorage: PeerStorage = PeerStorage(None, written = true)
966969
}
967-
case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], activeChannels: Int, peerStorage: PeerStorage) extends Data
968-
case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], activeChannels: Int, currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates], peerStorage: PeerStorage) extends Data {
970+
case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], activeChannels: Set[ByteVector32], peerStorage: PeerStorage) extends Data
971+
case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], activeChannels: Set[ByteVector32], currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates], peerStorage: PeerStorage) extends Data {
969972
val connectionInfo: ConnectionInfo = ConnectionInfo(address, peerConnection, localInit, remoteInit)
970973
def localFeatures: Features[InitFeature] = localInit.features
971974
def remoteFeatures: Features[InitFeature] = remoteInit.features
@@ -1080,9 +1083,5 @@ object Peer {
10801083
case class RelayUnknownMessage(unknownMessage: UnknownMessage)
10811084

10821085
case object WritePeerStorage
1083-
1084-
case object ChannelActivated
1085-
1086-
case object ChannelDeactivated
10871086
// @formatter:on
10881087
}

eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ class PeerSpec extends FixtureSpec {
112112
// let's simulate a connection
113113
if (initializePeer) {
114114
switchboard.send(peer, Peer.Init(channels, Map.empty))
115-
channels.foreach(c => if (c.isInstanceOf[DATA_NORMAL]) peer ! ChannelActivated)
116115
}
117116
val localInit = protocol.Init(peer.underlyingActor.nodeParams.features.initFeatures())
118117
switchboard.send(peer, PeerConnection.ConnectionReady(peerConnection.ref, remoteNodeId, fakeIPAddress, outgoing = true, localInit, remoteInit))
@@ -759,22 +758,34 @@ class PeerSpec extends FixtureSpec {
759758
test("peer storage") { f =>
760759
import f._
761760

761+
// We connect with a previous backup.
762+
val channel = ChannelCodecsSpec.normal
762763
val peerConnection1 = peerConnection
763-
val peerConnection2 = TestProbe()
764-
val peerConnection3 = TestProbe()
765-
766764
nodeParams.db.peers.updateStorage(remoteNodeId, hex"abcdef")
767-
connect(remoteNodeId, peer, peerConnection1, switchboard, channels = Set(ChannelCodecsSpec.normal), peerStorage = Some(hex"abcdef"))
765+
connect(remoteNodeId, peer, peerConnection1, switchboard, channels = Set(channel), peerStorage = Some(hex"abcdef"))
766+
peer ! ChannelReadyForPayments(ActorRef.noSender, channel.remoteNodeId, channel.channelId, channel.commitments.latest.fundingTxIndex)
768767
peerConnection1.send(peer, PeerStorageStore(hex"deadbeef"))
769768
peerConnection1.send(peer, PeerStorageStore(hex"0123456789"))
769+
770+
// We disconnect and reconnect, sending the last backup we received.
770771
peer ! Peer.Disconnect(f.remoteNodeId)
772+
val peerConnection2 = TestProbe()
771773
connect(remoteNodeId, peer, peerConnection2, switchboard, channels = Set(ChannelCodecsSpec.normal), initializePeer = false, peerStorage = Some(hex"0123456789"))
772774
peerConnection2.send(peer, PeerStorageStore(hex"1111"))
775+
776+
// We reconnect again.
777+
val peerConnection3 = TestProbe()
773778
connect(remoteNodeId, peer, peerConnection3, switchboard, channels = Set(ChannelCodecsSpec.normal), initializePeer = false, peerStorage = Some(hex"1111"))
774779
// Because of the delayed writes, we may not have stored the latest value immediately, but we will eventually store it.
775780
eventually {
776781
assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111"))
777782
}
783+
784+
// Our channel closes, so we stop storing backups for that peer.
785+
peer ! LocalChannelDown(ActorRef.noSender, channel.channelId, channel.shortIds, channel.remoteNodeId)
786+
peerConnection3.send(peer, PeerStorageStore(hex"2222"))
787+
peer ! WritePeerStorage
788+
assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111"))
778789
}
779790

780791
}

eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
3838
private val recommendedFeerates = RecommendedFeerates(Block.RegtestGenesisBlock.hash, TestConstants.feeratePerKw, TestConstants.anchorOutputsFeeratePerKw)
3939

4040
private val PeerNothingData = Peer.Nothing
41-
private val PeerDisconnectedData = Peer.DisconnectedData(channels, activeChannels = 0, PeerStorage(None, written = true))
42-
private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) }, activeChannels = 0, recommendedFeerates, None, PeerStorage(None, written = true))
41+
private val PeerDisconnectedData = Peer.DisconnectedData(channels, activeChannels = Set.empty, PeerStorage(None, written = true))
42+
private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) }, activeChannels = Set.empty, recommendedFeerates, None, PeerStorage(None, written = true))
4343

4444
case class FixtureParam(nodeParams: NodeParams, remoteNodeId: PublicKey, reconnectionTask: TestFSMRef[ReconnectionTask.State, ReconnectionTask.Data, ReconnectionTask], monitor: TestProbe)
4545

@@ -82,7 +82,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
8282
import f._
8383

8484
val peer = TestProbe()
85-
peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty, activeChannels = 0, PeerStorage(None, written = true))))
85+
peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty, activeChannels = Set.empty, PeerStorage(None, written = true))))
8686
monitor.expectNoMessage()
8787
}
8888

0 commit comments

Comments
 (0)