diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf
index 6825a82c97..bb912da344 100644
--- a/eclair-core/src/main/resources/reference.conf
+++ b/eclair-core/src/main/resources/reference.conf
@@ -609,6 +609,8 @@ eclair {
 
       // A long delay here guarantees that peers who are offline while their channels are closed will be able to get their funds
       // back if they restore from seed on a different device after the channels have been closed.
       removal-delay = 30 days
+      // Frequency at which we clean our DB to remove peer storage from nodes with whom we don't have channels anymore.
+      cleanup-frequency = 1 day
     }
 }
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
index a71457ad64..9a0fbaf034 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
@@ -158,10 +158,11 @@ case class PaymentFinalExpiryConf(min: CltvExpiryDelta, max: CltvExpiryDelta) {
 }
 
 /**
- * @param writeDelay   delay before writing the peer's data to disk, which avoids doing multiple writes during bursts of storage updates.
- * @param removalDelay we keep our peer's data in our DB even after closing all of our channels with them, up to this duration.
+ * @param writeDelay       delay before writing the peer's data to disk, which avoids doing multiple writes during bursts of storage updates.
+ * @param removalDelay     we keep our peer's data in our DB even after closing all of our channels with them, up to this duration.
+ * @param cleanUpFrequency frequency at which we go through the DB to remove unused storage.
 */
-case class PeerStorageConfig(writeDelay: FiniteDuration, removalDelay: FiniteDuration)
+case class PeerStorageConfig(writeDelay: FiniteDuration, removalDelay: FiniteDuration, cleanUpFrequency: FiniteDuration)
 
 object NodeParams extends Logging {
@@ -690,6 +691,7 @@ object NodeParams extends Logging {
     peerStorageConfig = PeerStorageConfig(
       writeDelay = FiniteDuration(config.getDuration("peer-storage.write-delay").getSeconds, TimeUnit.SECONDS),
       removalDelay = FiniteDuration(config.getDuration("peer-storage.removal-delay").getSeconds, TimeUnit.SECONDS),
+      cleanUpFrequency = FiniteDuration(config.getDuration("peer-storage.cleanup-frequency").getSeconds, TimeUnit.SECONDS),
     )
   )
 }
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
index 83d8e9f7af..884ec04992 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
@@ -357,7 +357,7 @@ class Setup(val datadir: File,
           system.deadLetters
         }
       _ = if (nodeParams.features.hasFeature(Features.ProvideStorage)) {
-        system.spawn(Behaviors.supervise(PeerStorageCleaner(nodeParams.db.peers, nodeParams.peerStorageConfig.removalDelay)).onFailure(typed.SupervisorStrategy.restart), name = "peer-storage-cleaner")
+        system.spawn(Behaviors.supervise(PeerStorageCleaner(nodeParams.db.peers, nodeParams.peerStorageConfig)).onFailure(typed.SupervisorStrategy.restart), name = "peer-storage-cleaner")
       }
       dbEventHandler = system.actorOf(SimpleSupervisor.props(DbEventHandler.props(nodeParams), "db-event-handler", SupervisorStrategy.Resume))
       register = system.actorOf(SimpleSupervisor.props(Register.props(), "register", SupervisorStrategy.Resume))
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala
index ad90f3b0c3..a9c21e88d7 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala
@@ -2588,13 +2588,6 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
         case _: TransientChannelData => None
       }
       context.system.eventStream.publish(ChannelStateChanged(self, nextStateData.channelId, peer, remoteNodeId, state, nextState, commitments_opt))
-
-      if (state == NORMAL) {
-        peer ! Peer.ChannelDeactivated
-      }
-      if (nextState == NORMAL) {
-        peer ! Peer.ChannelActivated
-      }
     }
 
     if (nextState == CLOSED) {
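Note on the `cleanup-frequency` setting introduced in reference.conf and NodeParams above: it is read through Typesafe Config's `getDuration`, the same path as `write-delay` and `removal-delay`, so HOCON duration literals such as "1 day" or "12 hours" are valid. A minimal standalone sketch of that parsing step, assuming only the `com.typesafe.config` dependency that eclair already uses (the object name is illustrative):

    import java.util.concurrent.TimeUnit
    import com.typesafe.config.ConfigFactory
    import scala.concurrent.duration.FiniteDuration

    object CleanupFrequencyExample extends App {
      // getDuration understands HOCON duration literals such as "1 day" or "12 hours".
      val config = ConfigFactory.parseString("peer-storage.cleanup-frequency = 1 day")
      // Convert the java.time.Duration to a scala FiniteDuration, as NodeParams does.
      val cleanUpFrequency = FiniteDuration(config.getDuration("peer-storage.cleanup-frequency").getSeconds, TimeUnit.SECONDS)
      println(cleanUpFrequency) // 86400 seconds
    }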
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/PeerStorageCleaner.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/PeerStorageCleaner.scala
index bc2df32285..7153fd5cad 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/db/PeerStorageCleaner.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/PeerStorageCleaner.scala
@@ -18,23 +18,27 @@ package fr.acinq.eclair.db
 
 import akka.actor.typed.Behavior
 import akka.actor.typed.scaladsl.Behaviors
-import fr.acinq.eclair.TimestampSecond
-
-import scala.concurrent.duration.{DurationInt, FiniteDuration}
+import fr.acinq.eclair.{PeerStorageConfig, TimestampSecond}
 
+/**
+ * This actor periodically deletes from our DB the peer storage of nodes with whom we don't have channels anymore,
+ * after a grace period.
+ */
 object PeerStorageCleaner {
+  // @formatter:off
   sealed trait Command
   private case object CleanPeerStorage extends Command
+  // @formatter:on
 
-  def apply(db: PeersDb, removalDelay: FiniteDuration): Behavior[Command] = {
-    Behaviors.withTimers { timers =>
-      timers.startTimerWithFixedDelay(CleanPeerStorage, 1 day)
-      Behaviors.receiveMessage {
-        case CleanPeerStorage =>
-          db.removePeerStorage(TimestampSecond.now() - removalDelay)
-          Behaviors.same
-      }
+  def apply(db: PeersDb, config: PeerStorageConfig): Behavior[Command] = {
+    Behaviors.withTimers { timers =>
+      timers.startTimerWithFixedDelay(CleanPeerStorage, config.cleanUpFrequency)
+      Behaviors.receiveMessage {
+        case CleanPeerStorage =>
+          db.removePeerStorage(TimestampSecond.now() - config.removalDelay)
+          Behaviors.same
       }
     }
+  }
 }
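The cleaner above is the standard typed-actor periodic-task pattern: a private tick message, a fixed-delay timer, and a stateless behavior. A self-contained sketch of the same pattern, assuming only akka-actor-typed on the classpath (names are illustrative; this is not the eclair actor itself):

    import akka.actor.typed.scaladsl.Behaviors
    import akka.actor.typed.{ActorSystem, Behavior}
    import scala.concurrent.duration.{DurationInt, FiniteDuration}

    object PeriodicCleanerExample {
      sealed trait Command
      private case object Tick extends Command

      // Wake up every `frequency` and run `cleanUp`; startTimerWithFixedDelay
      // measures the delay from the end of one tick to the start of the next.
      def apply(cleanUp: () => Unit, frequency: FiniteDuration): Behavior[Command] =
        Behaviors.withTimers { timers =>
          timers.startTimerWithFixedDelay(Tick, frequency)
          Behaviors.receiveMessage { case Tick => cleanUp(); Behaviors.same }
        }

      def main(args: Array[String]): Unit = {
        ActorSystem(apply(() => println("cleaning..."), 1.second), "cleaner-demo")
      }
    }

Using a fixed delay rather than a fixed rate means a slow DB scan cannot cause ticks to pile up behind it.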
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala
index 4573f4d379..26971273c5 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala
@@ -47,7 +47,7 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging {
     }
 
     def migration23(statement: Statement): Unit = {
-      statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data NOT NULL, last_updated_at INTEGER NOT NULL, removed_peer_at INTEGER)")
+      statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, last_updated_at INTEGER NOT NULL, removed_peer_at INTEGER)")
       statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON peer_storage(removed_peer_at)")
     }
 
@@ -55,7 +55,7 @@
       case None =>
         statement.executeUpdate("CREATE TABLE peers (node_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL)")
         statement.executeUpdate("CREATE TABLE relay_fees (node_id BLOB NOT NULL PRIMARY KEY, fee_base_msat INTEGER NOT NULL, fee_proportional_millionths INTEGER NOT NULL)")
-        statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data NOT NULL, last_updated_at INTEGER NOT NULL, removed_peer_at INTEGER)")
+        statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, last_updated_at INTEGER NOT NULL, removed_peer_at INTEGER)")
         statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON peer_storage(removed_peer_at)")
 
       case Some(v@(1 | 2)) =>
@@ -101,7 +101,7 @@
 
   override def removePeerStorage(peerRemovedBefore: TimestampSecond): Unit = withMetrics("peers/remove-storage", DbBackends.Sqlite) {
     using(sqlite.prepareStatement("DELETE FROM peer_storage WHERE removed_peer_at < ?")) { statement =>
-      statement.setTimestamp(1, peerRemovedBefore.toSqlTimestamp)
+      statement.setLong(1, peerRemovedBefore.toLong)
       statement.executeUpdate()
     }
   }
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
index 724801d482..5a6dd5bf11 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
@@ -73,6 +73,7 @@ class Peer(val nodeParams: NodeParams,
 
   context.system.eventStream.subscribe(self, classOf[CurrentFeerates])
   context.system.eventStream.subscribe(self, classOf[CurrentBlockHeight])
+  context.system.eventStream.subscribe(self, classOf[LocalChannelDown])
 
   startWith(INSTANTIATING, Nothing)
 
@@ -90,7 +91,7 @@ class Peer(val nodeParams: NodeParams,
       } else {
         None
       }
-      goto(DISCONNECTED) using DisconnectedData(channels, activeChannels = 0, PeerStorage(peerStorageData, written = true)) // when we restart, we will attempt to reconnect right away, but then we'll wait
+      goto(DISCONNECTED) using DisconnectedData(channels, activeChannels = Set.empty, PeerStorage(peerStorageData, written = true)) // when we restart, we will attempt to reconnect right away, but then we'll wait
   }
 
   when(DISCONNECTED) {
@@ -144,11 +145,11 @@ class Peer(val nodeParams: NodeParams,
       d.peerStorage.data.foreach(nodeParams.db.peers.updateStorage(remoteNodeId, _))
       stay() using d.copy(peerStorage = d.peerStorage.copy(written = true))
 
-    case Event(ChannelActivated, d: DisconnectedData) =>
-      stay() using d.copy(activeChannels = d.activeChannels + 1)
+    case Event(e: ChannelReadyForPayments, d: DisconnectedData) =>
+      stay() using d.copy(activeChannels = d.activeChannels + e.channelId)
 
-    case Event(ChannelDeactivated, d: DisconnectedData) =>
-      stay() using d.copy(activeChannels = d.activeChannels - 1)
+    case Event(e: LocalChannelDown, d: DisconnectedData) =>
+      stay() using d.copy(activeChannels = d.activeChannels - e.channelId)
   }
 
   when(CONNECTED) {
@@ -423,7 +424,7 @@ class Peer(val nodeParams: NodeParams,
       }
       stay()
 
-    case Event(e: ChannelReadyForPayments, _: ConnectedData) =>
+    case Event(e: ChannelReadyForPayments, d: ConnectedData) =>
       pendingOnTheFlyFunding.foreach {
         case (paymentHash, pending) =>
           pending.status match {
@@ -439,7 +440,10 @@ class Peer(val nodeParams: NodeParams,
           }
         }
       }
-      stay()
+      stay() using d.copy(activeChannels = d.activeChannels + e.channelId)
+
+    case Event(e: LocalChannelDown, d: ConnectedData) =>
+      stay() using d.copy(activeChannels = d.activeChannels - e.channelId)
 
     case Event(msg: HasChannelId, d: ConnectedData) =>
       d.channels.get(FinalChannelId(msg.channelId)) match {
@@ -533,26 +537,25 @@ class Peer(val nodeParams: NodeParams,
       d.peerConnection forward unknownMsg
       stay()
 
-    case Event(store: PeerStorageStore, d: ConnectedData) if nodeParams.features.hasFeature(Features.ProvideStorage) && d.activeChannels > 0 =>
-      // If we don't have any pending write operations, we write the updated peer storage to disk after a delay.
-      // This ensures that when we receive a burst of peer storage updates, we will rate-limit our IO disk operations.
-      // If we already have a pending write operation, we must not reset the timer, otherwise we may indefinitely delay
-      // writing to the DB and may never store our peer's backup.
-      if (d.peerStorage.written) {
-        startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageConfig.writeDelay)
+    case Event(store: PeerStorageStore, d: ConnectedData) =>
+      if (nodeParams.features.hasFeature(Features.ProvideStorage) && d.activeChannels.nonEmpty) {
+        // If we don't have any pending write operations, we write the updated peer storage to disk after a delay.
+        // This ensures that when we receive a burst of peer storage updates, we will rate-limit our IO disk operations.
+        // If we already have a pending write operation, we must not reset the timer, otherwise we may indefinitely delay
+        // writing to the DB and may never store our peer's backup.
+        if (d.peerStorage.written) {
+          startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageConfig.writeDelay)
+        }
+        stay() using d.copy(peerStorage = PeerStorage(Some(store.blob), written = false))
+      } else {
+        log.debug("ignoring peer storage (feature={}, channels={})", nodeParams.features.hasFeature(Features.ProvideStorage), d.activeChannels.mkString(","))
+        stay()
       }
-      stay() using d.copy(peerStorage = PeerStorage(Some(store.blob), written = false))
 
     case Event(WritePeerStorage, d: ConnectedData) =>
       d.peerStorage.data.foreach(nodeParams.db.peers.updateStorage(remoteNodeId, _))
       stay() using d.copy(peerStorage = d.peerStorage.copy(written = true))
 
-    case Event(ChannelActivated, d: ConnectedData) =>
-      stay() using d.copy(activeChannels = d.activeChannels + 1)
-
-    case Event(ChannelDeactivated, d: ConnectedData) =>
-      stay() using d.copy(activeChannels = d.activeChannels - 1)
-
     case Event(unhandledMsg: LightningMessage, _) =>
       log.warning("ignoring message {}", unhandledMsg)
       stay()
@@ -784,7 +787,7 @@ class Peer(val nodeParams: NodeParams,
       context.system.eventStream.publish(PeerDisconnected(self, remoteNodeId))
   }
 
-  private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef], activeChannels: Int, peerStorage: PeerStorage): State = {
+  private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef], activeChannels: Set[ByteVector32], peerStorage: PeerStorage): State = {
     require(remoteNodeId == connectionReady.remoteNodeId, s"invalid nodeId: $remoteNodeId != ${connectionReady.remoteNodeId}")
     log.debug("got authenticated connection to address {}", connectionReady.address)
@@ -956,16 +959,16 @@ object Peer {
 
   sealed trait Data {
     def channels: Map[_ <: ChannelId, ActorRef] // will be overridden by Map[FinalChannelId, ActorRef] or Map[ChannelId, ActorRef]
-    def activeChannels: Int
+    def activeChannels: Set[ByteVector32] // channels that are available to process payments
    def peerStorage: PeerStorage
   }
   case object Nothing extends Data {
     override def channels = Map.empty
-    override def activeChannels: Int = 0
+    override def activeChannels: Set[ByteVector32] = Set.empty
     override def peerStorage: PeerStorage = PeerStorage(None, written = true)
   }
-  case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], activeChannels: Int, peerStorage: PeerStorage) extends Data
-  case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], activeChannels: Int, currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates], peerStorage: PeerStorage) extends Data {
+  case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], activeChannels: Set[ByteVector32], peerStorage: PeerStorage) extends Data
+  case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], activeChannels: Set[ByteVector32], currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates], peerStorage: PeerStorage) extends Data {
     val connectionInfo: ConnectionInfo = ConnectionInfo(address, peerConnection, localInit, remoteInit)
     def localFeatures: Features[InitFeature] = localInit.features
     def remoteFeatures: Features[InitFeature] = remoteInit.features
@@ -1080,9 +1083,5 @@ object Peer {
   case class RelayUnknownMessage(unknownMessage: UnknownMessage)
 
   case object WritePeerStorage
-
-  case object ChannelActivated
-
-  case object ChannelDeactivated
   // @formatter:on
 }
diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala
index 7baff3b2aa..31cf3e6ebf 100644
--- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala
+++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala
@@ -241,7 +241,7 @@ object TestConstants {
       willFundRates_opt = Some(defaultLiquidityRates),
       peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds),
       onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds),
-      peerStorageConfig = PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds)
+      peerStorageConfig = PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds, cleanUpFrequency = 1 hour)
     )
 
     def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams(
@@ -418,7 +418,7 @@ object TestConstants {
       willFundRates_opt = Some(defaultLiquidityRates),
      peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds),
       onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds),
-      peerStorageConfig = PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds)
+      peerStorageConfig = PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds, cleanUpFrequency = 1 hour)
     )
 
     def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams(
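The switch in Peer.scala above from `activeChannels: Int` to `activeChannels: Set[ByteVector32]` deserves a note: counting with `+ 1` / `- 1` drifts as soon as an activation or deactivation event is delivered twice or missed, whereas set insertion and removal keyed by channel id are idempotent. A tiny illustration, using plain scodec `ByteVector` values to stand in for eclair's `ByteVector32` channel ids:

    import scodec.bits.ByteVector

    object ActiveChannelsExample extends App {
      val channelId = ByteVector.fromValidHex("aa" * 32)
      var activeChannels = Set.empty[ByteVector]
      // A duplicate ChannelReadyForPayments-style event cannot inflate the set.
      activeChannels += channelId
      activeChannels += channelId
      println(activeChannels.size) // 1
      // A LocalChannelDown-style event removes exactly that channel.
      activeChannels -= channelId
      println(activeChannels.isEmpty) // true
    }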
diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/PeersDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/PeersDbSpec.scala
index 76f4719014..cf687634e9 100644
--- a/eclair-core/src/test/scala/fr/acinq/eclair/db/PeersDbSpec.scala
+++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/PeersDbSpec.scala
@@ -18,10 +18,10 @@ package fr.acinq.eclair.db
 
 import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
 import fr.acinq.eclair.TestDatabases.{TestPgDatabases, TestSqliteDatabases}
+import fr.acinq.eclair._
 import fr.acinq.eclair.db.pg.PgPeersDb
 import fr.acinq.eclair.db.sqlite.SqlitePeersDb
 import fr.acinq.eclair.payment.relay.Relayer.RelayFees
-import fr.acinq.eclair._
 import fr.acinq.eclair.wire.protocol.{NodeAddress, Tor2, Tor3}
 import org.scalatest.funsuite.AnyFunSuite
 import scodec.bits.HexStringSyntax
@@ -108,7 +108,7 @@ class PeersDbSpec extends AnyFunSuite {
     }
   }
 
-  test("peer storage") {
+  test("add/update/remove peer storage") {
     forAllDbs { dbs =>
       val db = dbs.peers
 
@@ -126,6 +126,21 @@ class PeersDbSpec extends AnyFunSuite {
       db.updateStorage(b, hex"abcd")
       assert(db.getStorage(a) == Some(hex"6789"))
       assert(db.getStorage(b) == Some(hex"abcd"))
+
+      // Actively used storage shouldn't be removed.
+      db.removePeerStorage(TimestampSecond.now() + 1.hour)
+      assert(db.getStorage(a) == Some(hex"6789"))
+      assert(db.getStorage(b) == Some(hex"abcd"))
+
+      // After removing the peer, peer storage can be removed.
+      db.removePeer(a)
+      assert(db.getStorage(a) == Some(hex"6789"))
+      db.removePeerStorage(TimestampSecond.now() - 1.hour)
+      assert(db.getStorage(a) == Some(hex"6789"))
+      db.removePeerStorage(TimestampSecond.now() + 1.hour)
+      assert(db.getStorage(a) == None)
+      assert(db.getStorage(b) == Some(hex"abcd"))
     }
   }
 }
diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala
index 6d72d44360..c839341d36 100644
--- a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala
+++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala
@@ -112,7 +112,6 @@ class PeerSpec extends FixtureSpec {
     // let's simulate a connection
     if (initializePeer) {
       switchboard.send(peer, Peer.Init(channels, Map.empty))
-      channels.foreach(c => if (c.isInstanceOf[DATA_NORMAL]) peer ! ChannelActivated)
     }
     val localInit = protocol.Init(peer.underlyingActor.nodeParams.features.initFeatures())
     switchboard.send(peer, PeerConnection.ConnectionReady(peerConnection.ref, remoteNodeId, fakeIPAddress, outgoing = true, localInit, remoteInit))
@@ -759,22 +758,34 @@ class PeerSpec extends FixtureSpec {
   test("peer storage") { f =>
     import f._
 
+    // We connect with a previous backup.
+    val channel = ChannelCodecsSpec.normal
     val peerConnection1 = peerConnection
-    val peerConnection2 = TestProbe()
-    val peerConnection3 = TestProbe()
     nodeParams.db.peers.updateStorage(remoteNodeId, hex"abcdef")
-    connect(remoteNodeId, peer, peerConnection1, switchboard, channels = Set(ChannelCodecsSpec.normal), peerStorage = Some(hex"abcdef"))
+    connect(remoteNodeId, peer, peerConnection1, switchboard, channels = Set(channel), peerStorage = Some(hex"abcdef"))
+    peer ! ChannelReadyForPayments(ActorRef.noSender, channel.remoteNodeId, channel.channelId, channel.commitments.latest.fundingTxIndex)
     peerConnection1.send(peer, PeerStorageStore(hex"deadbeef"))
     peerConnection1.send(peer, PeerStorageStore(hex"0123456789"))
+
+    // We disconnect and reconnect, sending the last backup we received.
     peer ! Peer.Disconnect(f.remoteNodeId)
+    val peerConnection2 = TestProbe()
     connect(remoteNodeId, peer, peerConnection2, switchboard, channels = Set(ChannelCodecsSpec.normal), initializePeer = false, peerStorage = Some(hex"0123456789"))
     peerConnection2.send(peer, PeerStorageStore(hex"1111"))
+
+    // We reconnect again.
+    val peerConnection3 = TestProbe()
     connect(remoteNodeId, peer, peerConnection3, switchboard, channels = Set(ChannelCodecsSpec.normal), initializePeer = false, peerStorage = Some(hex"1111"))
 
     // Because of the delayed writes, we may not have stored the latest value immediately, but we will eventually store it.
    eventually {
       assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111"))
     }
+
+    // Our channel closes, so we stop storing backups for that peer.
+    peer ! LocalChannelDown(ActorRef.noSender, channel.channelId, channel.shortIds, channel.remoteNodeId)
+    peerConnection3.send(peer, PeerStorageStore(hex"2222"))
+    peer ! WritePeerStorage
+    assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111"))
   }
 }
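On the SqlitePeersDb fix exercised by the tests above: `removed_peer_at` is a plain INTEGER column holding unix epoch seconds, so the cutoff must be bound with `setLong`; the previous `setTimestamp` bound a JDBC timestamp value, which would not compare correctly against epoch seconds. A standalone sketch of the corrected delete, assuming the xerial sqlite-jdbc driver on the classpath (table contents are made up for the demo):

    import java.sql.DriverManager

    object RemoveStorageExample extends App {
      val connection = DriverManager.getConnection("jdbc:sqlite::memory:")
      val setup = connection.createStatement()
      setup.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, last_updated_at INTEGER NOT NULL, removed_peer_at INTEGER)")
      // One peer removed long ago (epoch seconds), one still connected (NULL).
      setup.executeUpdate("INSERT INTO peer_storage VALUES (x'01', x'aa', 1700000000, 1700000000)")
      setup.executeUpdate("INSERT INTO peer_storage VALUES (x'02', x'bb', 1700000000, NULL)")
      val delete = connection.prepareStatement("DELETE FROM peer_storage WHERE removed_peer_at < ?")
      delete.setLong(1, System.currentTimeMillis() / 1000) // cutoff as epoch seconds
      println(s"deleted ${delete.executeUpdate()} row(s)") // deleted 1 row(s): NULL never matches '<'
      connection.close()
    }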
diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala
index e29ea45964..dba103b8ba 100644
--- a/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala
+++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala
@@ -38,8 +38,8 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
   private val recommendedFeerates = RecommendedFeerates(Block.RegtestGenesisBlock.hash, TestConstants.feeratePerKw, TestConstants.anchorOutputsFeeratePerKw)
 
   private val PeerNothingData = Peer.Nothing
-  private val PeerDisconnectedData = Peer.DisconnectedData(channels, activeChannels = 0, PeerStorage(None, written = true))
-  private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) }, activeChannels = 0, recommendedFeerates, None, PeerStorage(None, written = true))
+  private val PeerDisconnectedData = Peer.DisconnectedData(channels, activeChannels = Set.empty, PeerStorage(None, written = true))
+  private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) }, activeChannels = Set.empty, recommendedFeerates, None, PeerStorage(None, written = true))
 
   case class FixtureParam(nodeParams: NodeParams, remoteNodeId: PublicKey, reconnectionTask: TestFSMRef[ReconnectionTask.State, ReconnectionTask.Data, ReconnectionTask], monitor: TestProbe)
 
@@ -82,7 +82,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
     import f._
 
     val peer = TestProbe()
-    peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty, activeChannels = 0, PeerStorage(None, written = true))))
+    peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty, activeChannels = Set.empty, PeerStorage(None, written = true))))
     monitor.expectNoMessage()
   }
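One last note on the grace-period arithmetic the whole patch relies on: the cleaner passes `TimestampSecond.now() - removalDelay` as the cutoff, so storage for a peer removed at time T becomes eligible for deletion only once T + removal-delay is in the past. A quick worked example with plain stdlib durations (values are illustrative):

    import scala.concurrent.duration.DurationInt

    object CutoffExample extends App {
      val removalDelay = 30.days
      val now = System.currentTimeMillis() / 1000 // epoch seconds
      val cutoff = now - removalDelay.toSeconds   // delete rows older than this
      val removedPeerAt = now - 10.days.toSeconds // peer removed 10 days ago
      println(removedPeerAt < cutoff) // false: still inside the 30-day grace period
    }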