Skip to content

Commit fb58d8c

Browse files
authored
Store remote features in PeersDb (#2978)
We only store information about our peers when we succeed in making an outgoing connection to them. The only information we stored was the address that we used when connecting. We now also store the features supported by our peer when we last connected to them. Once we have a channel with a peer that connected to us, we store their details in our DB. We don't store the address they're connecting from, because we don't know if we will be able to connect to them using this address, but we store their features.
1 parent 3e5929b commit fb58d8c

File tree

10 files changed

+407
-163
lines changed

10 files changed

+407
-163
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ import fr.acinq.eclair.payment._
1212
import fr.acinq.eclair.payment.relay.OnTheFlyFunding
1313
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
1414
import fr.acinq.eclair.router.Router
15-
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAddress, NodeAnnouncement}
16-
import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli, TimestampSecond}
15+
import fr.acinq.eclair.wire.protocol._
16+
import fr.acinq.eclair.{CltvExpiry, Features, InitFeature, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli, TimestampSecond}
1717
import grizzled.slf4j.Logging
1818
import scodec.bits.ByteVector
1919

@@ -264,22 +264,27 @@ case class DualPeersDb(primary: PeersDb, secondary: PeersDb) extends PeersDb {
264264

265265
private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-peers").build()))
266266

267-
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, address: NodeAddress): Unit = {
268-
runAsync(secondary.addOrUpdatePeer(nodeId, address))
269-
primary.addOrUpdatePeer(nodeId, address)
267+
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, address: NodeAddress, features: Features[InitFeature]): Unit = {
268+
runAsync(secondary.addOrUpdatePeer(nodeId, address, features))
269+
primary.addOrUpdatePeer(nodeId, address, features)
270+
}
271+
272+
override def addOrUpdatePeerFeatures(nodeId: Crypto.PublicKey, features: Features[InitFeature]): Unit = {
273+
runAsync(secondary.addOrUpdatePeerFeatures(nodeId, features))
274+
primary.addOrUpdatePeerFeatures(nodeId, features)
270275
}
271276

272277
override def removePeer(nodeId: Crypto.PublicKey): Unit = {
273278
runAsync(secondary.removePeer(nodeId))
274279
primary.removePeer(nodeId)
275280
}
276281

277-
override def getPeer(nodeId: Crypto.PublicKey): Option[NodeAddress] = {
282+
override def getPeer(nodeId: Crypto.PublicKey): Option[NodeInfo] = {
278283
runAsync(secondary.getPeer(nodeId))
279284
primary.getPeer(nodeId)
280285
}
281286

282-
override def listPeers(): Map[Crypto.PublicKey, NodeAddress] = {
287+
override def listPeers(): Map[Crypto.PublicKey, NodeInfo] = {
283288
runAsync(secondary.listPeers())
284289
primary.listPeers()
285290
}

eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,25 @@
1717
package fr.acinq.eclair.db
1818

1919
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
20-
import fr.acinq.eclair.TimestampSecond
2120
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
22-
import fr.acinq.eclair.wire.protocol.NodeAddress
21+
import fr.acinq.eclair.wire.protocol.{NodeAddress, NodeInfo}
22+
import fr.acinq.eclair.{Features, InitFeature, TimestampSecond}
2323
import scodec.bits.ByteVector
2424

25+
/** The PeersDb contains information about our direct peers, with whom we have or had channels. */
2526
trait PeersDb {
2627

27-
def addOrUpdatePeer(nodeId: PublicKey, address: NodeAddress): Unit
28+
/** Update our DB with a verified address and features for the given peer. */
29+
def addOrUpdatePeer(nodeId: PublicKey, address: NodeAddress, features: Features[InitFeature]): Unit
30+
31+
/** Update our DB with the features for the given peer, without updating its address. */
32+
def addOrUpdatePeerFeatures(nodeId: PublicKey, features: Features[InitFeature]): Unit
2833

2934
def removePeer(nodeId: PublicKey): Unit
3035

31-
def getPeer(nodeId: PublicKey): Option[NodeAddress]
36+
def getPeer(nodeId: PublicKey): Option[NodeInfo]
3237

33-
def listPeers(): Map[PublicKey, NodeAddress]
38+
def listPeers(): Map[PublicKey, NodeInfo]
3439

3540
def addOrUpdateRelayFees(nodeId: PublicKey, fees: RelayFees): Unit
3641

eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala

Lines changed: 55 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,22 @@ package fr.acinq.eclair.db.pg
1818

1919
import fr.acinq.bitcoin.scalacompat.Crypto
2020
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
21-
import fr.acinq.eclair.{MilliSatoshi, TimestampSecond}
2221
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
2322
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
2423
import fr.acinq.eclair.db.PeersDb
2524
import fr.acinq.eclair.db.pg.PgUtils.PgLock
2625
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
2726
import fr.acinq.eclair.wire.protocol._
27+
import fr.acinq.eclair.{Features, InitFeature, MilliSatoshi, TimestampSecond}
2828
import grizzled.slf4j.Logging
29-
import scodec.bits.{BitVector, ByteVector}
29+
import scodec.bits.ByteVector
3030

3131
import java.sql.Statement
3232
import javax.sql.DataSource
3333

3434
object PgPeersDb {
3535
val DB_NAME = "peers"
36-
val CURRENT_VERSION = 4
36+
val CURRENT_VERSION = 5
3737
}
3838

3939
class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logging {
@@ -59,16 +59,22 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
5959
statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON local.peer_storage(removed_peer_at)")
6060
}
6161

62+
def migration45(statement: Statement): Unit = {
63+
statement.executeUpdate("ALTER TABLE local.peers RENAME COLUMN data TO node_address")
64+
statement.executeUpdate("ALTER TABLE local.peers ALTER COLUMN node_address DROP NOT NULL")
65+
statement.executeUpdate("ALTER TABLE local.peers ADD COLUMN init_features BYTEA")
66+
}
67+
6268
using(pg.createStatement()) { statement =>
6369
getVersion(statement, DB_NAME) match {
6470
case None =>
6571
statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local")
66-
statement.executeUpdate("CREATE TABLE local.peers (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL)")
72+
statement.executeUpdate("CREATE TABLE local.peers (node_id TEXT NOT NULL PRIMARY KEY, node_address BYTEA, init_features BYTEA)")
6773
statement.executeUpdate("CREATE TABLE local.relay_fees (node_id TEXT NOT NULL PRIMARY KEY, fee_base_msat BIGINT NOT NULL, fee_proportional_millionths BIGINT NOT NULL)")
6874
statement.executeUpdate("CREATE TABLE local.peer_storage (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, last_updated_at TIMESTAMP WITH TIME ZONE NOT NULL, removed_peer_at TIMESTAMP WITH TIME ZONE)")
6975

7076
statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON local.peer_storage(removed_peer_at)")
71-
case Some(v@(1 | 2 | 3)) =>
77+
case Some(v@(1 | 2 | 3 | 4)) =>
7278
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
7379
if (v < 2) {
7480
migration12(statement)
@@ -79,25 +85,47 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
7985
if (v < 4) {
8086
migration34(statement)
8187
}
88+
if (v < 5) {
89+
migration45(statement)
90+
}
8291
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
8392
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
8493
}
8594
setVersion(statement, DB_NAME, CURRENT_VERSION)
8695
}
8796
}
8897

89-
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, nodeaddress: NodeAddress): Unit = withMetrics("peers/add-or-update", DbBackends.Postgres) {
98+
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, address: NodeAddress, features: Features[InitFeature]): Unit = withMetrics("peers/add-or-update", DbBackends.Postgres) {
9099
withLock { pg =>
91-
val data = CommonCodecs.nodeaddress.encode(nodeaddress).require.toByteArray
100+
val encodedAddress = CommonCodecs.nodeaddress.encode(address).require.toByteArray
101+
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(features).require.toByteArray
92102
using(pg.prepareStatement(
93103
"""
94-
| INSERT INTO local.peers (node_id, data)
95-
| VALUES (?, ?)
104+
| INSERT INTO local.peers (node_id, node_address, init_features)
105+
| VALUES (?, ?, ?)
96106
| ON CONFLICT (node_id)
97-
| DO UPDATE SET data = EXCLUDED.data ;
98-
| """.stripMargin)) { statement =>
107+
| DO UPDATE SET node_address = EXCLUDED.node_address, init_features = EXCLUDED.init_features
108+
|""".stripMargin)) { statement =>
99109
statement.setString(1, nodeId.value.toHex)
100-
statement.setBytes(2, data)
110+
statement.setBytes(2, encodedAddress)
111+
statement.setBytes(3, encodedFeatures)
112+
statement.executeUpdate()
113+
}
114+
}
115+
}
116+
117+
override def addOrUpdatePeerFeatures(nodeId: Crypto.PublicKey, features: Features[InitFeature]): Unit = withMetrics("peers/add-or-update", DbBackends.Postgres) {
118+
withLock { pg =>
119+
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(features).require.toByteArray
120+
using(pg.prepareStatement(
121+
"""
122+
| INSERT INTO local.peers (node_id, node_address, init_features)
123+
| VALUES (?, NULL, ?)
124+
| ON CONFLICT (node_id)
125+
| DO UPDATE SET init_features = EXCLUDED.init_features
126+
|""".stripMargin)) { statement =>
127+
statement.setString(1, nodeId.value.toHex)
128+
statement.setBytes(2, encodedFeatures)
101129
statement.executeUpdate()
102130
}
103131
}
@@ -126,27 +154,29 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
126154
}
127155
}
128156

129-
override def getPeer(nodeId: PublicKey): Option[NodeAddress] = withMetrics("peers/get", DbBackends.Postgres) {
157+
override def getPeer(nodeId: PublicKey): Option[NodeInfo] = withMetrics("peers/get", DbBackends.Postgres) {
130158
withLock { pg =>
131-
using(pg.prepareStatement("SELECT data FROM local.peers WHERE node_id=?")) { statement =>
159+
using(pg.prepareStatement("SELECT node_address, init_features FROM local.peers WHERE node_id=?")) { statement =>
132160
statement.setString(1, nodeId.value.toHex)
133-
statement.executeQuery()
134-
.mapCodec(CommonCodecs.nodeaddress)
135-
.headOption
161+
statement.executeQuery().map { rs =>
162+
val nodeAddress_opt = rs.getBitVectorOpt("node_address").map(CommonCodecs.nodeaddress.decode(_).require.value)
163+
val nodeFeatures_opt = rs.getBitVectorOpt("init_features").map(CommonCodecs.initFeaturesCodec.decode(_).require.value)
164+
NodeInfo(nodeFeatures_opt.getOrElse(Features.empty), nodeAddress_opt)
165+
}.headOption
136166
}
137167
}
138168
}
139169

140-
override def listPeers(): Map[PublicKey, NodeAddress] = withMetrics("peers/list", DbBackends.Postgres) {
170+
override def listPeers(): Map[PublicKey, NodeInfo] = withMetrics("peers/list", DbBackends.Postgres) {
141171
withLock { pg =>
142-
using(pg.createStatement()) { statement =>
143-
statement.executeQuery("SELECT node_id, data FROM local.peers")
172+
using(pg.prepareStatement("SELECT node_id, node_address, init_features FROM local.peers")) { statement =>
173+
statement.executeQuery()
144174
.map { rs =>
145-
val nodeid = PublicKey(rs.getByteVectorFromHex("node_id"))
146-
val nodeaddress = CommonCodecs.nodeaddress.decode(BitVector(rs.getBytes("data"))).require.value
147-
nodeid -> nodeaddress
148-
}
149-
.toMap
175+
val nodeId = PublicKey(rs.getByteVectorFromHex("node_id"))
176+
val nodeAddress_opt = rs.getBitVectorOpt("node_address").map(CommonCodecs.nodeaddress.decode(_).require.value)
177+
val nodeFeatures_opt = rs.getBitVectorOpt("init_features").map(CommonCodecs.initFeaturesCodec.decode(_).require.value)
178+
nodeId -> NodeInfo(nodeFeatures_opt.getOrElse(Features.empty), nodeAddress_opt)
179+
}.toMap
150180
}
151181
}
152182
}

0 commit comments

Comments
 (0)