Skip to content

Commit e28f23f

Browse files
Peer storage (#2888)
Implements lightning/bolts#1110 to allow storing a small amount of data for our peers. --------- Co-authored-by: t-bast <[email protected]>
1 parent 189e282 commit e28f23f

File tree

19 files changed

+456
-39
lines changed

19 files changed

+456
-39
lines changed

docs/release-notes/eclair-vnext.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,13 @@
66

77
<insert changes>
88

9+
### Peer storage
10+
11+
With this release, eclair supports the `option_provide_storage` feature introduced in <https://github.com/lightning/bolts/pull/1110>.
12+
When `option_provide_storage` is enabled, eclair will store a small encrypted backup for peers that request it.
13+
This backup is limited to 65kB and node operators should customize the `eclair.peer-storage` configuration section to match their desired SLAs.
14+
This is mostly intended for LSPs that serve mobile wallets to allow users to restore their channels when they switch phones.
15+
916
### API changes
1017

1118
<insert changes>

eclair-core/src/main/resources/reference.conf

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ eclair {
7373
option_dual_fund = optional
7474
option_quiesce = optional
7575
option_onion_messages = optional
76+
// This feature should only be enabled when acting as an LSP for mobile wallets.
77+
// When activating this feature, the peer-storage section should be customized to match desired SLAs.
78+
option_provide_storage = disabled
7679
option_channel_type = optional
7780
option_scid_alias = optional
7881
option_payment_metadata = optional
@@ -596,6 +599,19 @@ eclair {
596599
enabled = true // enable automatic purges of expired invoices from the database
597600
interval = 24 hours // interval between expired invoice purges
598601
}
602+
603+
peer-storage {
604+
// Peer storage is persisted only after this delay to reduce the number of writes when updating it multiple times in a row.
605+
// A small delay may result in a lot of IO write operations, which can have a negative performance impact on the node.
606+
// But using a large delay increases the risk of not storing the latest peer data if you restart your node while writes are pending.
607+
write-delay = 1 minute
608+
// Peer storage is kept this long after the last channel with that peer has been closed.
609+
// A long delay here guarantees that peers who are offline while their channels are closed will be able to get their funds
610+
// back if they restore from seed on a different device after the channels have been closed.
611+
removal-delay = 30 days
612+
// Frequency at which we clean our DB to remove peer storage from nodes with whom we don't have channels anymore.
613+
cleanup-frequency = 1 day
614+
}
599615
}
600616

601617
akka {

eclair-core/src/main/scala/fr/acinq/eclair/Features.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,11 @@ object Features {
275275
val mandatory = 38
276276
}
277277

278+
case object ProvideStorage extends Feature with InitFeature with NodeFeature {
279+
val rfcName = "option_provide_storage"
280+
val mandatory = 42
281+
}
282+
278283
case object ChannelType extends Feature with InitFeature with NodeFeature {
279284
val rfcName = "option_channel_type"
280285
val mandatory = 44
@@ -358,6 +363,7 @@ object Features {
358363
DualFunding,
359364
Quiescence,
360365
OnionMessages,
366+
ProvideStorage,
361367
ChannelType,
362368
ScidAlias,
363369
PaymentMetadata,

eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
9292
revokedHtlcInfoCleanerConfig: RevokedHtlcInfoCleaner.Config,
9393
willFundRates_opt: Option[LiquidityAds.WillFundRates],
9494
peerWakeUpConfig: PeerReadyNotifier.WakeUpConfig,
95-
onTheFlyFundingConfig: OnTheFlyFunding.Config) {
95+
onTheFlyFundingConfig: OnTheFlyFunding.Config,
96+
peerStorageConfig: PeerStorageConfig) {
9697
val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey
9798

9899
val nodeId: PublicKey = nodeKeyManager.nodeId
@@ -156,6 +157,13 @@ case class PaymentFinalExpiryConf(min: CltvExpiryDelta, max: CltvExpiryDelta) {
156157
}
157158
}
158159

160+
/**
161+
* @param writeDelay delay before writing the peer's data to disk, which avoids doing multiple writes during bursts of storage updates.
162+
* @param removalDelay we keep our peer's data in our DB even after closing all of our channels with them, up to this duration.
163+
* @param cleanUpFrequency frequency at which we go through the DB to remove unused storage.
164+
*/
165+
case class PeerStorageConfig(writeDelay: FiniteDuration, removalDelay: FiniteDuration, cleanUpFrequency: FiniteDuration)
166+
159167
object NodeParams extends Logging {
160168

161169
/**
@@ -680,6 +688,11 @@ object NodeParams extends Logging {
680688
onTheFlyFundingConfig = OnTheFlyFunding.Config(
681689
proposalTimeout = FiniteDuration(config.getDuration("on-the-fly-funding.proposal-timeout").getSeconds, TimeUnit.SECONDS),
682690
),
691+
peerStorageConfig = PeerStorageConfig(
692+
writeDelay = FiniteDuration(config.getDuration("peer-storage.write-delay").getSeconds, TimeUnit.SECONDS),
693+
removalDelay = FiniteDuration(config.getDuration("peer-storage.removal-delay").getSeconds, TimeUnit.SECONDS),
694+
cleanUpFrequency = FiniteDuration(config.getDuration("peer-storage.cleanup-frequency").getSeconds, TimeUnit.SECONDS),
695+
)
683696
)
684697
}
685698
}

eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import fr.acinq.eclair.crypto.WeakEntropyPool
3737
import fr.acinq.eclair.crypto.keymanager.{LocalChannelKeyManager, LocalNodeKeyManager, LocalOnChainKeyManager}
3838
import fr.acinq.eclair.db.Databases.FileBackup
3939
import fr.acinq.eclair.db.FileBackupHandler.FileBackupParams
40-
import fr.acinq.eclair.db.{Databases, DbEventHandler, FileBackupHandler}
40+
import fr.acinq.eclair.db.{Databases, DbEventHandler, FileBackupHandler, PeerStorageCleaner}
4141
import fr.acinq.eclair.io._
4242
import fr.acinq.eclair.message.Postman
4343
import fr.acinq.eclair.payment.offer.OfferManager
@@ -356,6 +356,9 @@ class Setup(val datadir: File,
356356
logger.warn("database backup is disabled")
357357
system.deadLetters
358358
}
359+
_ = if (nodeParams.features.hasFeature(Features.ProvideStorage)) {
360+
system.spawn(Behaviors.supervise(PeerStorageCleaner(nodeParams.db.peers, nodeParams.peerStorageConfig)).onFailure(typed.SupervisorStrategy.restart), name = "peer-storage-cleaner")
361+
}
359362
dbEventHandler = system.actorOf(SimpleSupervisor.props(DbEventHandler.props(nodeParams), "db-event-handler", SupervisorStrategy.Resume))
360363
register = system.actorOf(SimpleSupervisor.props(Register.props(), "register", SupervisorStrategy.Resume))
361364
offerManager = system.spawn(Behaviors.supervise(OfferManager(nodeParams, router, paymentTimeout = 1 minute)).onFailure(typed.SupervisorStrategy.resume), name = "offer-manager")

eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@ import fr.acinq.eclair.payment.relay.OnTheFlyFunding
1313
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
1414
import fr.acinq.eclair.router.Router
1515
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAddress, NodeAnnouncement}
16-
import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli}
16+
import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli, TimestampSecond}
1717
import grizzled.slf4j.Logging
18+
import scodec.bits.ByteVector
1819

1920
import java.io.File
2021
import java.util.UUID
@@ -292,6 +293,21 @@ case class DualPeersDb(primary: PeersDb, secondary: PeersDb) extends PeersDb {
292293
runAsync(secondary.getRelayFees(nodeId))
293294
primary.getRelayFees(nodeId)
294295
}
296+
297+
override def updateStorage(nodeId: PublicKey, data: ByteVector): Unit = {
298+
runAsync(secondary.updateStorage(nodeId, data))
299+
primary.updateStorage(nodeId, data)
300+
}
301+
302+
override def getStorage(nodeId: PublicKey): Option[ByteVector] = {
303+
runAsync(secondary.getStorage(nodeId))
304+
primary.getStorage(nodeId)
305+
}
306+
307+
override def removePeerStorage(peerRemovedBefore: TimestampSecond): Unit = {
308+
runAsync(secondary.removePeerStorage(peerRemovedBefore))
309+
primary.removePeerStorage(peerRemovedBefore)
310+
}
295311
}
296312

297313
case class DualPaymentsDb(primary: PaymentsDb, secondary: PaymentsDb) extends PaymentsDb {
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2024 ACINQ SAS
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package fr.acinq.eclair.db
18+
19+
import akka.actor.typed.Behavior
20+
import akka.actor.typed.scaladsl.Behaviors
21+
import fr.acinq.eclair.{PeerStorageConfig, TimestampSecond}
22+
23+
/**
24+
* This actor frequently deletes from our DB peer storage from nodes with whom we don't have channels anymore, after a
25+
* grace period.
26+
*/
27+
object PeerStorageCleaner {
28+
// @formatter:off
29+
sealed trait Command
30+
private case object CleanPeerStorage extends Command
31+
// @formatter:on
32+
33+
def apply(db: PeersDb, config: PeerStorageConfig): Behavior[Command] = {
34+
Behaviors.withTimers { timers =>
35+
timers.startTimerWithFixedDelay(CleanPeerStorage, config.cleanUpFrequency)
36+
Behaviors.receiveMessage {
37+
case CleanPeerStorage =>
38+
db.removePeerStorage(TimestampSecond.now() - config.removalDelay)
39+
Behaviors.same
40+
}
41+
}
42+
}
43+
44+
}

eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
package fr.acinq.eclair.db
1818

1919
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
20+
import fr.acinq.eclair.TimestampSecond
2021
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
2122
import fr.acinq.eclair.wire.protocol.NodeAddress
23+
import scodec.bits.ByteVector
2224

2325
trait PeersDb {
2426

@@ -34,4 +36,13 @@ trait PeersDb {
3436

3537
def getRelayFees(nodeId: PublicKey): Option[RelayFees]
3638

39+
/** Update our peer's blob data when [[fr.acinq.eclair.Features.ProvideStorage]] is enabled. */
40+
def updateStorage(nodeId: PublicKey, data: ByteVector): Unit
41+
42+
/** Get the last blob of data we stored for that peer, if [[fr.acinq.eclair.Features.ProvideStorage]] is enabled. */
43+
def getStorage(nodeId: PublicKey): Option[ByteVector]
44+
45+
/** Remove storage from peers that have had no active channel with us for a while. */
46+
def removePeerStorage(peerRemovedBefore: TimestampSecond): Unit
47+
3748
}

eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,22 @@ package fr.acinq.eclair.db.pg
1818

1919
import fr.acinq.bitcoin.scalacompat.Crypto
2020
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
21-
import fr.acinq.eclair.MilliSatoshi
21+
import fr.acinq.eclair.{MilliSatoshi, TimestampSecond}
2222
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
2323
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
2424
import fr.acinq.eclair.db.PeersDb
2525
import fr.acinq.eclair.db.pg.PgUtils.PgLock
2626
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
2727
import fr.acinq.eclair.wire.protocol._
2828
import grizzled.slf4j.Logging
29-
import scodec.bits.BitVector
29+
import scodec.bits.{BitVector, ByteVector}
3030

3131
import java.sql.Statement
3232
import javax.sql.DataSource
3333

3434
object PgPeersDb {
3535
val DB_NAME = "peers"
36-
val CURRENT_VERSION = 3
36+
val CURRENT_VERSION = 4
3737
}
3838

3939
class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logging {
@@ -54,20 +54,31 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
5454
statement.executeUpdate("CREATE TABLE local.relay_fees (node_id TEXT NOT NULL PRIMARY KEY, fee_base_msat BIGINT NOT NULL, fee_proportional_millionths BIGINT NOT NULL)")
5555
}
5656

57+
def migration34(statement: Statement): Unit = {
58+
statement.executeUpdate("CREATE TABLE local.peer_storage (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, last_updated_at TIMESTAMP WITH TIME ZONE NOT NULL, removed_peer_at TIMESTAMP WITH TIME ZONE)")
59+
statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON local.peer_storage(removed_peer_at)")
60+
}
61+
5762
using(pg.createStatement()) { statement =>
5863
getVersion(statement, DB_NAME) match {
5964
case None =>
6065
statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local")
6166
statement.executeUpdate("CREATE TABLE local.peers (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL)")
6267
statement.executeUpdate("CREATE TABLE local.relay_fees (node_id TEXT NOT NULL PRIMARY KEY, fee_base_msat BIGINT NOT NULL, fee_proportional_millionths BIGINT NOT NULL)")
63-
case Some(v@(1 | 2)) =>
68+
statement.executeUpdate("CREATE TABLE local.peer_storage (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, last_updated_at TIMESTAMP WITH TIME ZONE NOT NULL, removed_peer_at TIMESTAMP WITH TIME ZONE)")
69+
70+
statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON local.peer_storage(removed_peer_at)")
71+
case Some(v@(1 | 2 | 3)) =>
6472
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
6573
if (v < 2) {
6674
migration12(statement)
6775
}
6876
if (v < 3) {
6977
migration23(statement)
7078
}
79+
if (v < 4) {
80+
migration34(statement)
81+
}
7182
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
7283
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
7384
}
@@ -98,6 +109,20 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
98109
statement.setString(1, nodeId.value.toHex)
99110
statement.executeUpdate()
100111
}
112+
using(pg.prepareStatement("UPDATE local.peer_storage SET removed_peer_at = ? WHERE node_id = ?")) { statement =>
113+
statement.setTimestamp(1, TimestampSecond.now().toSqlTimestamp)
114+
statement.setString(2, nodeId.value.toHex)
115+
statement.executeUpdate()
116+
}
117+
}
118+
}
119+
120+
override def removePeerStorage(peerRemovedBefore: TimestampSecond): Unit = withMetrics("peers/remove-storage", DbBackends.Postgres) {
121+
withLock { pg =>
122+
using(pg.prepareStatement("DELETE FROM local.peer_storage WHERE removed_peer_at < ?")) { statement =>
123+
statement.setTimestamp(1, peerRemovedBefore.toSqlTimestamp)
124+
statement.executeUpdate()
125+
}
101126
}
102127
}
103128

@@ -155,4 +180,32 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
155180
}
156181
}
157182
}
183+
184+
override def updateStorage(nodeId: PublicKey, data: ByteVector): Unit = withMetrics("peers/update-storage", DbBackends.Postgres) {
185+
withLock { pg =>
186+
using(pg.prepareStatement(
187+
"""
188+
INSERT INTO local.peer_storage (node_id, data, last_updated_at, removed_peer_at)
189+
VALUES (?, ?, ?, NULL)
190+
ON CONFLICT (node_id)
191+
DO UPDATE SET data = EXCLUDED.data, last_updated_at = EXCLUDED.last_updated_at, removed_peer_at = NULL
192+
""")) { statement =>
193+
statement.setString(1, nodeId.value.toHex)
194+
statement.setBytes(2, data.toArray)
195+
statement.setTimestamp(3, TimestampSecond.now().toSqlTimestamp)
196+
statement.executeUpdate()
197+
}
198+
}
199+
}
200+
201+
override def getStorage(nodeId: PublicKey): Option[ByteVector] = withMetrics("peers/get-storage", DbBackends.Postgres) {
202+
withLock { pg =>
203+
using(pg.prepareStatement("SELECT data FROM local.peer_storage WHERE node_id = ?")) { statement =>
204+
statement.setString(1, nodeId.value.toHex)
205+
statement.executeQuery()
206+
.headOption
207+
.map(rs => ByteVector(rs.getBytes("data")))
208+
}
209+
}
210+
}
158211
}

0 commit comments

Comments
 (0)