Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,8 @@ eclair {
// A long delay here guarantees that peers who are offline while their channels are closed will be able to get their funds
// back if they restore from seed on a different device after the channels have been closed.
removal-delay = 30 days
// Frequency at which we clean our DB to remove peer storage from nodes with whom we don't have channels anymore.
cleanup-frequency = 1 day
}
}

Expand Down
8 changes: 5 additions & 3 deletions eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,11 @@ case class PaymentFinalExpiryConf(min: CltvExpiryDelta, max: CltvExpiryDelta) {
}

/**
* @param writeDelay delay before writing the peer's data to disk, which avoids doing multiple writes during bursts of storage updates.
* @param removalDelay we keep our peer's data in our DB even after closing all of our channels with them, up to this duration.
* @param writeDelay delay before writing the peer's data to disk, which avoids doing multiple writes during bursts of storage updates.
* @param removalDelay we keep our peer's data in our DB even after closing all of our channels with them, up to this duration.
* @param cleanUpFrequency frequency at which we go through the DB to remove unused storage.
*/
case class PeerStorageConfig(writeDelay: FiniteDuration, removalDelay: FiniteDuration)
case class PeerStorageConfig(writeDelay: FiniteDuration, removalDelay: FiniteDuration, cleanUpFrequency: FiniteDuration)

object NodeParams extends Logging {

Expand Down Expand Up @@ -690,6 +691,7 @@ object NodeParams extends Logging {
peerStorageConfig = PeerStorageConfig(
writeDelay = FiniteDuration(config.getDuration("peer-storage.write-delay").getSeconds, TimeUnit.SECONDS),
removalDelay = FiniteDuration(config.getDuration("peer-storage.removal-delay").getSeconds, TimeUnit.SECONDS),
cleanUpFrequency = FiniteDuration(config.getDuration("peer-storage.cleanup-frequency").getSeconds, TimeUnit.SECONDS),
)
)
}
Expand Down
2 changes: 1 addition & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ class Setup(val datadir: File,
system.deadLetters
}
_ = if (nodeParams.features.hasFeature(Features.ProvideStorage)) {
system.spawn(Behaviors.supervise(PeerStorageCleaner(nodeParams.db.peers, nodeParams.peerStorageConfig.removalDelay)).onFailure(typed.SupervisorStrategy.restart), name = "peer-storage-cleaner")
system.spawn(Behaviors.supervise(PeerStorageCleaner(nodeParams.db.peers, nodeParams.peerStorageConfig)).onFailure(typed.SupervisorStrategy.restart), name = "peer-storage-cleaner")
}
dbEventHandler = system.actorOf(SimpleSupervisor.props(DbEventHandler.props(nodeParams), "db-event-handler", SupervisorStrategy.Resume))
register = system.actorOf(SimpleSupervisor.props(Register.props(), "register", SupervisorStrategy.Resume))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2588,13 +2588,6 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
case _: TransientChannelData => None
}
context.system.eventStream.publish(ChannelStateChanged(self, nextStateData.channelId, peer, remoteNodeId, state, nextState, commitments_opt))

if (state == NORMAL) {
peer ! Peer.ChannelDeactivated
}
if (nextState == NORMAL) {
peer ! Peer.ChannelActivated
}
}

if (nextState == CLOSED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,27 @@ package fr.acinq.eclair.db

import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import fr.acinq.eclair.TimestampSecond

import scala.concurrent.duration.{DurationInt, FiniteDuration}
import fr.acinq.eclair.{PeerStorageConfig, TimestampSecond}

/**
* This actor frequently deletes from our DB peer storage from nodes with whom we don't have channels anymore, after a
* grace period.
*/
object PeerStorageCleaner {
// @formatter:off
sealed trait Command
private case object CleanPeerStorage extends Command
// @formatter:on

def apply(db: PeersDb, removalDelay: FiniteDuration): Behavior[Command] = {
Behaviors.withTimers { timers =>
timers.startTimerWithFixedDelay(CleanPeerStorage, 1 day)
Behaviors.receiveMessage {
case CleanPeerStorage =>
db.removePeerStorage(TimestampSecond.now() - removalDelay)
Behaviors.same
}
def apply(db: PeersDb, config: PeerStorageConfig): Behavior[Command] = {
Behaviors.withTimers { timers =>
timers.startTimerWithFixedDelay(CleanPeerStorage, config.cleanUpFrequency)
Behaviors.receiveMessage {
case CleanPeerStorage =>
db.removePeerStorage(TimestampSecond.now() - config.removalDelay)
Behaviors.same
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging {
}

def migration23(statement: Statement): Unit = {
statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data NOT NULL, last_updated_at INTEGER NOT NULL, removed_peer_at INTEGER)")
statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, last_updated_at INTEGER NOT NULL, removed_peer_at INTEGER)")
statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON peer_storage(removed_peer_at)")
}

getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE TABLE peers (node_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL)")
statement.executeUpdate("CREATE TABLE relay_fees (node_id BLOB NOT NULL PRIMARY KEY, fee_base_msat INTEGER NOT NULL, fee_proportional_millionths INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data NOT NULL, last_updated_at INTEGER NOT NULL, removed_peer_at INTEGER)")
statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, last_updated_at INTEGER NOT NULL, removed_peer_at INTEGER)")

statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON peer_storage(removed_peer_at)")
case Some(v@(1 | 2)) =>
Expand Down Expand Up @@ -101,7 +101,7 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging {

override def removePeerStorage(peerRemovedBefore: TimestampSecond): Unit = withMetrics("peers/remove-storage", DbBackends.Sqlite) {
using(sqlite.prepareStatement("DELETE FROM peer_storage WHERE removed_peer_at < ?")) { statement =>
statement.setTimestamp(1, peerRemovedBefore.toSqlTimestamp)
statement.setLong(1, peerRemovedBefore.toLong)
statement.executeUpdate()
}
}
Expand Down
59 changes: 29 additions & 30 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class Peer(val nodeParams: NodeParams,

context.system.eventStream.subscribe(self, classOf[CurrentFeerates])
context.system.eventStream.subscribe(self, classOf[CurrentBlockHeight])
context.system.eventStream.subscribe(self, classOf[LocalChannelDown])

startWith(INSTANTIATING, Nothing)

Expand All @@ -90,7 +91,7 @@ class Peer(val nodeParams: NodeParams,
} else {
None
}
goto(DISCONNECTED) using DisconnectedData(channels, activeChannels = 0, PeerStorage(peerStorageData, written = true)) // when we restart, we will attempt to reconnect right away, but then we'll wait
goto(DISCONNECTED) using DisconnectedData(channels, activeChannels = Set.empty, PeerStorage(peerStorageData, written = true)) // when we restart, we will attempt to reconnect right away, but then we'll wait
}

when(DISCONNECTED) {
Expand Down Expand Up @@ -144,11 +145,11 @@ class Peer(val nodeParams: NodeParams,
d.peerStorage.data.foreach(nodeParams.db.peers.updateStorage(remoteNodeId, _))
stay() using d.copy(peerStorage = d.peerStorage.copy(written = true))

case Event(ChannelActivated, d: DisconnectedData) =>
stay() using d.copy(activeChannels = d.activeChannels + 1)
case Event(e: ChannelReadyForPayments, d: DisconnectedData) =>
stay() using d.copy(activeChannels = d.activeChannels + e.channelId)

case Event(ChannelDeactivated, d: DisconnectedData) =>
stay() using d.copy(activeChannels = d.activeChannels - 1)
case Event(e: LocalChannelDown, d: DisconnectedData) =>
stay() using d.copy(activeChannels = d.activeChannels - e.channelId)
}

when(CONNECTED) {
Expand Down Expand Up @@ -423,7 +424,7 @@ class Peer(val nodeParams: NodeParams,
}
stay()

case Event(e: ChannelReadyForPayments, _: ConnectedData) =>
case Event(e: ChannelReadyForPayments, d: ConnectedData) =>
pendingOnTheFlyFunding.foreach {
case (paymentHash, pending) =>
pending.status match {
Expand All @@ -439,7 +440,10 @@ class Peer(val nodeParams: NodeParams,
}
}
}
stay()
stay() using d.copy(activeChannels = d.activeChannels + e.channelId)

case Event(e: LocalChannelDown, d: ConnectedData) =>
stay() using d.copy(activeChannels = d.activeChannels - e.channelId)

case Event(msg: HasChannelId, d: ConnectedData) =>
d.channels.get(FinalChannelId(msg.channelId)) match {
Expand Down Expand Up @@ -533,26 +537,25 @@ class Peer(val nodeParams: NodeParams,
d.peerConnection forward unknownMsg
stay()

case Event(store: PeerStorageStore, d: ConnectedData) if nodeParams.features.hasFeature(Features.ProvideStorage) && d.activeChannels > 0 =>
// If we don't have any pending write operations, we write the updated peer storage to disk after a delay.
// This ensures that when we receive a burst of peer storage updates, we will rate-limit our IO disk operations.
// If we already have a pending write operation, we must not reset the timer, otherwise we may indefinitely delay
// writing to the DB and may never store our peer's backup.
if (d.peerStorage.written) {
startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageConfig.writeDelay)
case Event(store: PeerStorageStore, d: ConnectedData) =>
if (nodeParams.features.hasFeature(Features.ProvideStorage) && d.activeChannels.nonEmpty) {
// If we don't have any pending write operations, we write the updated peer storage to disk after a delay.
// This ensures that when we receive a burst of peer storage updates, we will rate-limit our IO disk operations.
// If we already have a pending write operation, we must not reset the timer, otherwise we may indefinitely delay
// writing to the DB and may never store our peer's backup.
if (d.peerStorage.written) {
startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageConfig.writeDelay)
}
stay() using d.copy(peerStorage = PeerStorage(Some(store.blob), written = false))
} else {
log.debug("ignoring peer storage (feature={}, channels={})", nodeParams.features.hasFeature(Features.ProvideStorage), d.activeChannels.mkString(","))
stay()
}
stay() using d.copy(peerStorage = PeerStorage(Some(store.blob), written = false))

case Event(WritePeerStorage, d: ConnectedData) =>
d.peerStorage.data.foreach(nodeParams.db.peers.updateStorage(remoteNodeId, _))
stay() using d.copy(peerStorage = d.peerStorage.copy(written = true))

case Event(ChannelActivated, d: ConnectedData) =>
stay() using d.copy(activeChannels = d.activeChannels + 1)

case Event(ChannelDeactivated, d: ConnectedData) =>
stay() using d.copy(activeChannels = d.activeChannels - 1)

case Event(unhandledMsg: LightningMessage, _) =>
log.warning("ignoring message {}", unhandledMsg)
stay()
Expand Down Expand Up @@ -784,7 +787,7 @@ class Peer(val nodeParams: NodeParams,
context.system.eventStream.publish(PeerDisconnected(self, remoteNodeId))
}

private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef], activeChannels: Int, peerStorage: PeerStorage): State = {
private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef], activeChannels: Set[ByteVector32], peerStorage: PeerStorage): State = {
require(remoteNodeId == connectionReady.remoteNodeId, s"invalid nodeId: $remoteNodeId != ${connectionReady.remoteNodeId}")
log.debug("got authenticated connection to address {}", connectionReady.address)

Expand Down Expand Up @@ -956,16 +959,16 @@ object Peer {

sealed trait Data {
def channels: Map[_ <: ChannelId, ActorRef] // will be overridden by Map[FinalChannelId, ActorRef] or Map[ChannelId, ActorRef]
def activeChannels: Int
def activeChannels: Set[ByteVector32] // channels that are available to process payments
def peerStorage: PeerStorage
}
case object Nothing extends Data {
override def channels = Map.empty
override def activeChannels: Int = 0
override def activeChannels: Set[ByteVector32] = Set.empty
override def peerStorage: PeerStorage = PeerStorage(None, written = true)
}
case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], activeChannels: Int, peerStorage: PeerStorage) extends Data
case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], activeChannels: Int, currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates], peerStorage: PeerStorage) extends Data {
case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], activeChannels: Set[ByteVector32], peerStorage: PeerStorage) extends Data
case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], activeChannels: Set[ByteVector32], currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates], peerStorage: PeerStorage) extends Data {
val connectionInfo: ConnectionInfo = ConnectionInfo(address, peerConnection, localInit, remoteInit)
def localFeatures: Features[InitFeature] = localInit.features
def remoteFeatures: Features[InitFeature] = remoteInit.features
Expand Down Expand Up @@ -1080,9 +1083,5 @@ object Peer {
case class RelayUnknownMessage(unknownMessage: UnknownMessage)

case object WritePeerStorage

case object ChannelActivated

case object ChannelDeactivated
// @formatter:on
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ object TestConstants {
willFundRates_opt = Some(defaultLiquidityRates),
peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds),
onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds),
peerStorageConfig = PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds)
peerStorageConfig = PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds, cleanUpFrequency = 1 hour)
)

def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams(
Expand Down Expand Up @@ -418,7 +418,7 @@ object TestConstants {
willFundRates_opt = Some(defaultLiquidityRates),
peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds),
onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds),
peerStorageConfig = PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds)
peerStorageConfig = PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds, cleanUpFrequency = 1 hour)
)

def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams(
Expand Down
19 changes: 17 additions & 2 deletions eclair-core/src/test/scala/fr/acinq/eclair/db/PeersDbSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package fr.acinq.eclair.db

import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.TestDatabases.{TestPgDatabases, TestSqliteDatabases}
import fr.acinq.eclair._
import fr.acinq.eclair.db.pg.PgPeersDb
import fr.acinq.eclair.db.sqlite.SqlitePeersDb
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
import fr.acinq.eclair._
import fr.acinq.eclair.wire.protocol.{NodeAddress, Tor2, Tor3}
import org.scalatest.funsuite.AnyFunSuite
import scodec.bits.HexStringSyntax
Expand Down Expand Up @@ -108,7 +108,7 @@ class PeersDbSpec extends AnyFunSuite {
}
}

test("peer storage") {
test("add/update/remove peer storage") {
forAllDbs { dbs =>
val db = dbs.peers

Expand All @@ -126,6 +126,21 @@ class PeersDbSpec extends AnyFunSuite {
db.updateStorage(b, hex"abcd")
assert(db.getStorage(a) == Some(hex"6789"))
assert(db.getStorage(b) == Some(hex"abcd"))

// Actively used storage shouldn't be removed.
db.removePeerStorage(TimestampSecond.now() + 1.hour)
assert(db.getStorage(a) == Some(hex"6789"))
assert(db.getStorage(b) == Some(hex"abcd"))

// After removing the peer, peer storage can be removed.
db.removePeer(a)
assert(db.getStorage(a) == Some(hex"6789"))
db.removePeerStorage(TimestampSecond.now() - 1.hour)
assert(db.getStorage(a) == Some(hex"6789"))
db.removePeerStorage(TimestampSecond.now() + 1.hour)
assert(db.getStorage(a) == None)
assert(db.getStorage(b) == Some(hex"abcd"))
}
}

}
21 changes: 16 additions & 5 deletions eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ class PeerSpec extends FixtureSpec {
// let's simulate a connection
if (initializePeer) {
switchboard.send(peer, Peer.Init(channels, Map.empty))
channels.foreach(c => if (c.isInstanceOf[DATA_NORMAL]) peer ! ChannelActivated)
}
val localInit = protocol.Init(peer.underlyingActor.nodeParams.features.initFeatures())
switchboard.send(peer, PeerConnection.ConnectionReady(peerConnection.ref, remoteNodeId, fakeIPAddress, outgoing = true, localInit, remoteInit))
Expand Down Expand Up @@ -759,22 +758,34 @@ class PeerSpec extends FixtureSpec {
test("peer storage") { f =>
import f._

// We connect with a previous backup.
val channel = ChannelCodecsSpec.normal
val peerConnection1 = peerConnection
val peerConnection2 = TestProbe()
val peerConnection3 = TestProbe()

nodeParams.db.peers.updateStorage(remoteNodeId, hex"abcdef")
connect(remoteNodeId, peer, peerConnection1, switchboard, channels = Set(ChannelCodecsSpec.normal), peerStorage = Some(hex"abcdef"))
connect(remoteNodeId, peer, peerConnection1, switchboard, channels = Set(channel), peerStorage = Some(hex"abcdef"))
peer ! ChannelReadyForPayments(ActorRef.noSender, channel.remoteNodeId, channel.channelId, channel.commitments.latest.fundingTxIndex)
peerConnection1.send(peer, PeerStorageStore(hex"deadbeef"))
peerConnection1.send(peer, PeerStorageStore(hex"0123456789"))

// We disconnect and reconnect, sending the last backup we received.
peer ! Peer.Disconnect(f.remoteNodeId)
val peerConnection2 = TestProbe()
connect(remoteNodeId, peer, peerConnection2, switchboard, channels = Set(ChannelCodecsSpec.normal), initializePeer = false, peerStorage = Some(hex"0123456789"))
peerConnection2.send(peer, PeerStorageStore(hex"1111"))

// We reconnect again.
val peerConnection3 = TestProbe()
connect(remoteNodeId, peer, peerConnection3, switchboard, channels = Set(ChannelCodecsSpec.normal), initializePeer = false, peerStorage = Some(hex"1111"))
// Because of the delayed writes, we may not have stored the latest value immediately, but we will eventually store it.
eventually {
assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111"))
}

// Our channel closes, so we stop storing backups for that peer.
peer ! LocalChannelDown(ActorRef.noSender, channel.channelId, channel.shortIds, channel.remoteNodeId)
peerConnection3.send(peer, PeerStorageStore(hex"2222"))
peer ! WritePeerStorage
assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111"))
}

}
Expand Down
Loading
Loading