Skip to content

Commit abaacc2

Browse files
committed
Peer storage
1 parent 96d0c9a commit abaacc2

File tree

14 files changed

+248
-27
lines changed

14 files changed

+248
-27
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/Features.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,11 @@ object Features {
275275
val mandatory = 38
276276
}
277277

278+
case object ProvideStorage extends Feature with InitFeature with NodeFeature {
279+
val rfcName = "option_provide_storage"
280+
val mandatory = 42
281+
}
282+
278283
case object ChannelType extends Feature with InitFeature with NodeFeature {
279284
val rfcName = "option_channel_type"
280285
val mandatory = 44
@@ -358,6 +363,7 @@ object Features {
358363
DualFunding,
359364
Quiescence,
360365
OnionMessages,
366+
ProvideStorage,
361367
ChannelType,
362368
ScidAlias,
363369
PaymentMetadata,

eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
9292
revokedHtlcInfoCleanerConfig: RevokedHtlcInfoCleaner.Config,
9393
willFundRates_opt: Option[LiquidityAds.WillFundRates],
9494
peerWakeUpConfig: PeerReadyNotifier.WakeUpConfig,
95-
onTheFlyFundingConfig: OnTheFlyFunding.Config) {
95+
onTheFlyFundingConfig: OnTheFlyFunding.Config,
96+
peerStorageWriteDelayMax: FiniteDuration) {
9697
val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey
9798

9899
val nodeId: PublicKey = nodeKeyManager.nodeId
@@ -680,6 +681,7 @@ object NodeParams extends Logging {
680681
onTheFlyFundingConfig = OnTheFlyFunding.Config(
681682
proposalTimeout = FiniteDuration(config.getDuration("on-the-fly-funding.proposal-timeout").getSeconds, TimeUnit.SECONDS),
682683
),
684+
peerStorageWriteDelayMax = 1 minute,
683685
)
684686
}
685687
}

eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import fr.acinq.eclair.router.Router
1515
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAddress, NodeAnnouncement}
1616
import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli}
1717
import grizzled.slf4j.Logging
18+
import scodec.bits.ByteVector
1819

1920
import java.io.File
2021
import java.util.UUID
@@ -292,6 +293,16 @@ case class DualPeersDb(primary: PeersDb, secondary: PeersDb) extends PeersDb {
292293
runAsync(secondary.getRelayFees(nodeId))
293294
primary.getRelayFees(nodeId)
294295
}
296+
297+
override def updateStorage(nodeId: PublicKey, data: ByteVector): Unit = {
298+
runAsync(secondary.updateStorage(nodeId, data))
299+
primary.updateStorage(nodeId, data)
300+
}
301+
302+
override def getStorage(nodeId: PublicKey): Option[ByteVector] = {
303+
runAsync(secondary.getStorage(nodeId))
304+
primary.getStorage(nodeId)
305+
}
295306
}
296307

297308
case class DualPaymentsDb(primary: PaymentsDb, secondary: PaymentsDb) extends PaymentsDb {

eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package fr.acinq.eclair.db
1919
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
2020
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
2121
import fr.acinq.eclair.wire.protocol.NodeAddress
22+
import scodec.bits.ByteVector
2223

2324
trait PeersDb {
2425

@@ -34,4 +35,8 @@ trait PeersDb {
3435

3536
def getRelayFees(nodeId: PublicKey): Option[RelayFees]
3637

38+
def updateStorage(nodeId: PublicKey, data: ByteVector): Unit
39+
40+
def getStorage(nodeId: PublicKey): Option[ByteVector]
41+
3742
}

eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@ import fr.acinq.eclair.db.pg.PgUtils.PgLock
2626
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
2727
import fr.acinq.eclair.wire.protocol._
2828
import grizzled.slf4j.Logging
29-
import scodec.bits.BitVector
29+
import scodec.bits.{BitVector, ByteVector}
3030

3131
import java.sql.Statement
3232
import javax.sql.DataSource
3333

3434
object PgPeersDb {
3535
val DB_NAME = "peers"
36-
val CURRENT_VERSION = 3
36+
val CURRENT_VERSION = 4
3737
}
3838

3939
class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logging {
@@ -54,20 +54,28 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
5454
statement.executeUpdate("CREATE TABLE local.relay_fees (node_id TEXT NOT NULL PRIMARY KEY, fee_base_msat BIGINT NOT NULL, fee_proportional_millionths BIGINT NOT NULL)")
5555
}
5656

57+
def migration34(statement: Statement): Unit = {
58+
statement.executeUpdate("CREATE TABLE local.peer_storage (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL)")
59+
}
60+
5761
using(pg.createStatement()) { statement =>
5862
getVersion(statement, DB_NAME) match {
5963
case None =>
6064
statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local")
61-
statement.executeUpdate("CREATE TABLE local.peers (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL)")
65+
statement.executeUpdate("CREATE TABLE local.peers (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, storage BYTEA)")
6266
statement.executeUpdate("CREATE TABLE local.relay_fees (node_id TEXT NOT NULL PRIMARY KEY, fee_base_msat BIGINT NOT NULL, fee_proportional_millionths BIGINT NOT NULL)")
63-
case Some(v@(1 | 2)) =>
67+
statement.executeUpdate("CREATE TABLE local.peer_storage (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL)")
68+
case Some(v@(1 | 2 | 3)) =>
6469
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
6570
if (v < 2) {
6671
migration12(statement)
6772
}
6873
if (v < 3) {
6974
migration23(statement)
7075
}
76+
if (v < 4) {
77+
migration34(statement)
78+
}
7179
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
7280
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
7381
}
@@ -98,6 +106,10 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
98106
statement.setString(1, nodeId.value.toHex)
99107
statement.executeUpdate()
100108
}
109+
using(pg.prepareStatement("DELETE FROM local.peer_storage WHERE node_id = ?")) { statement =>
110+
statement.setString(1, nodeId.value.toHex)
111+
statement.executeUpdate()
112+
}
101113
}
102114
}
103115

@@ -155,4 +167,31 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
155167
}
156168
}
157169
}
170+
171+
override def updateStorage(nodeId: PublicKey, data: ByteVector): Unit = withMetrics("peers/update-storage", DbBackends.Postgres) {
172+
withLock { pg =>
173+
using(pg.prepareStatement(
174+
"""
175+
INSERT INTO local.peer_storage (node_id, data)
176+
VALUES (?, ?)
177+
ON CONFLICT (node_id)
178+
DO UPDATE SET data = EXCLUDED.data
179+
""")) { statement =>
180+
statement.setString(1, nodeId.value.toHex)
181+
statement.setBytes(2, data.toArray)
182+
statement.executeUpdate()
183+
}
184+
}
185+
}
186+
187+
override def getStorage(nodeId: PublicKey): Option[ByteVector] = withMetrics("peers/get-storage", DbBackends.Postgres) {
188+
withLock { pg =>
189+
using(pg.prepareStatement("SELECT data FROM local.peer_storage WHERE node_id = ?")) { statement =>
190+
statement.setString(1, nodeId.value.toHex)
191+
statement.executeQuery()
192+
.headOption
193+
.map(rs => ByteVector(rs.getBytes("data")))
194+
}
195+
}
196+
}
158197
}

eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ import fr.acinq.eclair.db.sqlite.SqliteUtils.{getVersion, setVersion, using}
2626
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
2727
import fr.acinq.eclair.wire.protocol._
2828
import grizzled.slf4j.Logging
29-
import scodec.bits.BitVector
29+
import scodec.bits.{BitVector, ByteVector}
3030

3131
import java.sql.{Connection, Statement}
3232

3333
object SqlitePeersDb {
3434
val DB_NAME = "peers"
35-
val CURRENT_VERSION = 2
35+
val CURRENT_VERSION = 3
3636
}
3737

3838
class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging {
@@ -46,13 +46,23 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging {
4646
statement.executeUpdate("CREATE TABLE relay_fees (node_id BLOB NOT NULL PRIMARY KEY, fee_base_msat INTEGER NOT NULL, fee_proportional_millionths INTEGER NOT NULL)")
4747
}
4848

49+
def migration23(statement: Statement): Unit = {
50+
statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data NOT NULL)")
51+
}
52+
4953
getVersion(statement, DB_NAME) match {
5054
case None =>
5155
statement.executeUpdate("CREATE TABLE peers (node_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL)")
5256
statement.executeUpdate("CREATE TABLE relay_fees (node_id BLOB NOT NULL PRIMARY KEY, fee_base_msat INTEGER NOT NULL, fee_proportional_millionths INTEGER NOT NULL)")
53-
case Some(v@1) =>
57+
statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data NOT NULL)")
58+
case Some(v@(1 | 2)) =>
5459
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
55-
migration12(statement)
60+
if (v < 2) {
61+
migration12(statement)
62+
}
63+
if (v < 3) {
64+
migration23(statement)
65+
}
5666
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
5767
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
5868
}
@@ -128,4 +138,27 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging {
128138
)
129139
}
130140
}
141+
142+
override def updateStorage(nodeId: PublicKey, data: ByteVector): Unit = withMetrics("peers/update-storage", DbBackends.Sqlite) {
143+
using(sqlite.prepareStatement("UPDATE peer_storage SET data = ? WHERE node_id = ?")) { update =>
144+
update.setBytes(1, data.toArray)
145+
update.setBytes(2, nodeId.value.toArray)
146+
if (update.executeUpdate() == 0) {
147+
using(sqlite.prepareStatement("INSERT INTO peer_storage VALUES (?, ?)")) { statement =>
148+
statement.setBytes(1, nodeId.value.toArray)
149+
statement.setBytes(2, data.toArray)
150+
statement.executeUpdate()
151+
}
152+
}
153+
}
154+
}
155+
156+
override def getStorage(nodeId: PublicKey): Option[ByteVector] = withMetrics("peers/get-storage", DbBackends.Sqlite) {
157+
using(sqlite.prepareStatement("SELECT data FROM peer_storage WHERE node_id = ?")) { statement =>
158+
statement.setBytes(1, nodeId.value.toArray)
159+
statement.executeQuery()
160+
.headOption
161+
.map(rs => ByteVector(rs.getBytes("data")))
162+
}
163+
}
131164
}

eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
4444
import fr.acinq.eclair.router.Router
4545
import fr.acinq.eclair.wire.protocol
4646
import fr.acinq.eclair.wire.protocol.FailureMessageCodecs.createBadOnionFailure
47-
import fr.acinq.eclair.wire.protocol.{AddFeeCredit, ChannelTlv, CurrentFeeCredit, Error, HasChannelId, HasTemporaryChannelId, LightningMessage, LiquidityAds, NodeAddress, OnTheFlyFundingFailureMessage, OnionMessage, OnionRoutingPacket, RecommendedFeerates, RoutingMessage, SpliceInit, TlvStream, TxAbort, UnknownMessage, Warning, WillAddHtlc, WillFailHtlc, WillFailMalformedHtlc}
47+
import fr.acinq.eclair.wire.protocol.{AddFeeCredit, ChannelTlv, CurrentFeeCredit, Error, HasChannelId, HasTemporaryChannelId, LightningMessage, LiquidityAds, NodeAddress, OnTheFlyFundingFailureMessage, OnionMessage, OnionRoutingPacket, PeerStorageRetrieval, PeerStorageStore, RecommendedFeerates, RoutingMessage, SpliceInit, TlvStream, TxAbort, UnknownMessage, Warning, WillAddHtlc, WillFailHtlc, WillFailMalformedHtlc}
48+
import fr.acinq.eclair.wire.protocol.{AddFeeCredit, ChannelTlv, CurrentFeeCredit, Error, HasChannelId, HasTemporaryChannelId, LightningMessage, LiquidityAds, NodeAddress, OnTheFlyFundingFailureMessage, OnionMessage, OnionRoutingPacket, PeerStorageRetrieval, PeerStorageStore, RoutingMessage, SpliceInit, TlvStream, UnknownMessage, Warning, WillAddHtlc, WillFailHtlc, WillFailMalformedHtlc}
49+
import scodec.bits.ByteVector
4850

4951
/**
5052
* This actor represents a logical peer. There is one [[Peer]] per unique remote node id at all time.
@@ -84,7 +86,7 @@ class Peer(val nodeParams: NodeParams,
8486
FinalChannelId(state.channelId) -> channel
8587
}.toMap
8688
context.system.eventStream.publish(PeerCreated(self, remoteNodeId))
87-
goto(DISCONNECTED) using DisconnectedData(channels) // when we restart, we will attempt to reconnect right away, but then we'll wait
89+
goto(DISCONNECTED) using DisconnectedData(channels, PeerStorage(nodeParams.db.peers.getStorage(remoteNodeId), written = true, TimestampMilli.min)) // when we restart, we will attempt to reconnect right away, but then we'll wait
8890
}
8991

9092
when(DISCONNECTED) {
@@ -93,7 +95,7 @@ class Peer(val nodeParams: NodeParams,
9395
stay()
9496

9597
case Event(connectionReady: PeerConnection.ConnectionReady, d: DisconnectedData) =>
96-
gotoConnected(connectionReady, d.channels.map { case (k: ChannelId, v) => (k, v) })
98+
gotoConnected(connectionReady, d.channels.map { case (k: ChannelId, v) => (k, v) }, d.peerStorage)
9799

98100
case Event(Terminated(actor), d: DisconnectedData) if d.channels.values.toSet.contains(actor) =>
99101
// we have at most 2 ids: a TemporaryChannelId and a FinalChannelId
@@ -461,7 +463,7 @@ class Peer(val nodeParams: NodeParams,
461463
stopPeer()
462464
} else {
463465
d.channels.values.toSet[ActorRef].foreach(_ ! INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
464-
goto(DISCONNECTED) using DisconnectedData(d.channels.collect { case (k: FinalChannelId, v) => (k, v) })
466+
goto(DISCONNECTED) using DisconnectedData(d.channels.collect { case (k: FinalChannelId, v) => (k, v) }, d.peerStorage)
465467
}
466468

467469
case Event(Terminated(actor), d: ConnectedData) if d.channels.values.toSet.contains(actor) =>
@@ -480,7 +482,7 @@ class Peer(val nodeParams: NodeParams,
480482
log.debug(s"got new connection, killing current one and switching")
481483
d.peerConnection ! PeerConnection.Kill(KillReason.ConnectionReplaced)
482484
d.channels.values.toSet[ActorRef].foreach(_ ! INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
483-
gotoConnected(connectionReady, d.channels)
485+
gotoConnected(connectionReady, d.channels, d.peerStorage)
484486

485487
case Event(msg: OnionMessage, _: ConnectedData) =>
486488
OnionMessages.process(nodeParams.privateKey, msg) match {
@@ -513,6 +515,21 @@ class Peer(val nodeParams: NodeParams,
513515
d.peerConnection forward unknownMsg
514516
stay()
515517

518+
case Event(store: PeerStorageStore, d: ConnectedData) if nodeParams.features.hasFeature(Features.ProvideStorage) && d.channels.nonEmpty =>
519+
val timeSinceLastWrite = TimestampMilli.now() - d.peerStorage.lastWrite
520+
val peerStorage = if (timeSinceLastWrite >= nodeParams.peerStorageWriteDelayMax) {
521+
nodeParams.db.peers.updateStorage(remoteNodeId, store.blob)
522+
PeerStorage(Some(store.blob), written = true, TimestampMilli.now())
523+
} else {
524+
startSingleTimer("peer-storage-write", WritePeerStorage, nodeParams.peerStorageWriteDelayMax - timeSinceLastWrite)
525+
PeerStorage(Some(store.blob), written = false, d.peerStorage.lastWrite)
526+
}
527+
stay() using d.copy(peerStorage = peerStorage)
528+
529+
case Event(WritePeerStorage, d: ConnectedData) =>
530+
d.peerStorage.data.foreach(nodeParams.db.peers.updateStorage(remoteNodeId, _))
531+
stay() using d.copy(peerStorage = PeerStorage(d.peerStorage.data, written = true, TimestampMilli.now()))
532+
516533
case Event(unhandledMsg: LightningMessage, _) =>
517534
log.warning("ignoring message {}", unhandledMsg)
518535
stay()
@@ -744,7 +761,7 @@ class Peer(val nodeParams: NodeParams,
744761
context.system.eventStream.publish(PeerDisconnected(self, remoteNodeId))
745762
}
746763

747-
private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef]): State = {
764+
private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef], peerStorage: PeerStorage): State = {
748765
require(remoteNodeId == connectionReady.remoteNodeId, s"invalid nodeId: $remoteNodeId != ${connectionReady.remoteNodeId}")
749766
log.debug("got authenticated connection to address {}", connectionReady.address)
750767

@@ -754,6 +771,9 @@ class Peer(val nodeParams: NodeParams,
754771
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, connectionReady.address)
755772
}
756773

774+
// If we have some data stored from our peer, we send it to them before doing anything else.
775+
peerStorage.data.foreach(connectionReady.peerConnection ! PeerStorageRetrieval(_))
776+
757777
// let's bring existing/requested channels online
758778
channels.values.toSet[ActorRef].foreach(_ ! INPUT_RECONNECTED(connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit)) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
759779

@@ -771,7 +791,7 @@ class Peer(val nodeParams: NodeParams,
771791
connectionReady.peerConnection ! CurrentFeeCredit(nodeParams.chainHash, feeCredit.getOrElse(0 msat))
772792
}
773793

774-
goto(CONNECTED) using ConnectedData(connectionReady.address, connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit, channels, feerates, None)
794+
goto(CONNECTED) using ConnectedData(connectionReady.address, connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit, channels, feerates, None, peerStorage)
775795
}
776796

777797
/**
@@ -906,12 +926,18 @@ object Peer {
906926
case class TemporaryChannelId(id: ByteVector32) extends ChannelId
907927
case class FinalChannelId(id: ByteVector32) extends ChannelId
908928

929+
case class PeerStorage(data: Option[ByteVector], written: Boolean, lastWrite: TimestampMilli)
930+
909931
sealed trait Data {
910932
def channels: Map[_ <: ChannelId, ActorRef] // will be overridden by Map[FinalChannelId, ActorRef] or Map[ChannelId, ActorRef]
933+
def peerStorage: PeerStorage
911934
}
912-
case object Nothing extends Data { override def channels = Map.empty }
913-
case class DisconnectedData(channels: Map[FinalChannelId, ActorRef]) extends Data
914-
case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates]) extends Data {
935+
case object Nothing extends Data {
936+
override def channels = Map.empty
937+
override def peerStorage: PeerStorage = PeerStorage(None, written = true, TimestampMilli.min)
938+
}
939+
case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], peerStorage: PeerStorage) extends Data
940+
case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates], peerStorage: PeerStorage) extends Data {
915941
val connectionInfo: ConnectionInfo = ConnectionInfo(address, peerConnection, localInit, remoteInit)
916942
def localFeatures: Features[InitFeature] = localInit.features
917943
def remoteFeatures: Features[InitFeature] = remoteInit.features
@@ -1024,5 +1050,7 @@ object Peer {
10241050
case class RelayOnionMessage(messageId: ByteVector32, msg: OnionMessage, replyTo_opt: Option[typed.ActorRef[Status]])
10251051

10261052
case class RelayUnknownMessage(unknownMessage: UnknownMessage)
1053+
1054+
case object WritePeerStorage
10271055
// @formatter:on
10281056
}

0 commit comments

Comments
 (0)