Skip to content

Commit db27c71

Browse files
committed
Peer storage, but not persistent
1 parent 5e9d8c3 commit db27c71

File tree

6 files changed

+81
-13
lines changed

6 files changed

+81
-13
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/Features.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,11 @@ object Features {
275275
val mandatory = 38
276276
}
277277

278+
case object ProvideStorage extends Feature with InitFeature with NodeFeature {
279+
val rfcName = "option_provide_storage"
280+
val mandatory = 42
281+
}
282+
278283
case object ChannelType extends Feature with InitFeature with NodeFeature {
279284
val rfcName = "option_channel_type"
280285
val mandatory = 44
@@ -358,6 +363,7 @@ object Features {
358363
DualFunding,
359364
Quiescence,
360365
OnionMessages,
366+
ProvideStorage,
361367
ChannelType,
362368
ScidAlias,
363369
PaymentMetadata,

eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ import fr.acinq.eclair.router.Router
4545
import fr.acinq.eclair.wire.protocol
4646
import fr.acinq.eclair.wire.protocol.FailureMessageCodecs.createBadOnionFailure
4747
import fr.acinq.eclair.wire.protocol.{AddFeeCredit, ChannelTlv, CurrentFeeCredit, Error, HasChannelId, HasTemporaryChannelId, LightningMessage, LiquidityAds, NodeAddress, OnTheFlyFundingFailureMessage, OnionMessage, OnionRoutingPacket, RoutingMessage, SpliceInit, TlvStream, UnknownMessage, Warning, WillAddHtlc, WillFailHtlc, WillFailMalformedHtlc}
48+
import fr.acinq.eclair.wire.protocol.LiquidityAds.PaymentDetails
49+
import fr.acinq.eclair.wire.protocol.{Error, HasChannelId, HasTemporaryChannelId, LightningMessage, LiquidityAds, NodeAddress, OnTheFlyFundingFailureMessage, OnionMessage, OnionRoutingPacket, PeerStorageRetrieval, PeerStorageStore, RoutingMessage, SpliceInit, UnknownMessage, Warning, WillAddHtlc, WillFailHtlc, WillFailMalformedHtlc}
50+
import scodec.bits.ByteVector
4851

4952
/**
5053
* This actor represents a logical peer. There is one [[Peer]] per unique remote node id at all time.
@@ -84,7 +87,7 @@ class Peer(val nodeParams: NodeParams,
8487
FinalChannelId(state.channelId) -> channel
8588
}.toMap
8689
context.system.eventStream.publish(PeerCreated(self, remoteNodeId))
87-
goto(DISCONNECTED) using DisconnectedData(channels) // when we restart, we will attempt to reconnect right away, but then we'll wait
90+
goto(DISCONNECTED) using DisconnectedData(channels, None) // when we restart, we will attempt to reconnect right away, but then we'll wait
8891
}
8992

9093
when(DISCONNECTED) {
@@ -93,7 +96,7 @@ class Peer(val nodeParams: NodeParams,
9396
stay()
9497

9598
case Event(connectionReady: PeerConnection.ConnectionReady, d: DisconnectedData) =>
96-
gotoConnected(connectionReady, d.channels.map { case (k: ChannelId, v) => (k, v) })
99+
gotoConnected(connectionReady, d.channels.map { case (k: ChannelId, v) => (k, v) }, d.peerStorage)
97100

98101
case Event(Terminated(actor), d: DisconnectedData) if d.channels.values.toSet.contains(actor) =>
99102
// we have at most 2 ids: a TemporaryChannelId and a FinalChannelId
@@ -454,7 +457,7 @@ class Peer(val nodeParams: NodeParams,
454457
stopPeer()
455458
} else {
456459
d.channels.values.toSet[ActorRef].foreach(_ ! INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
457-
goto(DISCONNECTED) using DisconnectedData(d.channels.collect { case (k: FinalChannelId, v) => (k, v) })
460+
goto(DISCONNECTED) using DisconnectedData(d.channels.collect { case (k: FinalChannelId, v) => (k, v) }, d.peerStorage)
458461
}
459462

460463
case Event(Terminated(actor), d: ConnectedData) if d.channels.values.toSet.contains(actor) =>
@@ -473,7 +476,7 @@ class Peer(val nodeParams: NodeParams,
473476
log.debug(s"got new connection, killing current one and switching")
474477
d.peerConnection ! PeerConnection.Kill(KillReason.ConnectionReplaced)
475478
d.channels.values.toSet[ActorRef].foreach(_ ! INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
476-
gotoConnected(connectionReady, d.channels)
479+
gotoConnected(connectionReady, d.channels, d.peerStorage)
477480

478481
case Event(msg: OnionMessage, _: ConnectedData) =>
479482
OnionMessages.process(nodeParams.privateKey, msg) match {
@@ -506,6 +509,9 @@ class Peer(val nodeParams: NodeParams,
506509
d.peerConnection forward unknownMsg
507510
stay()
508511

512+
case Event(store: PeerStorageStore, d: ConnectedData) if nodeParams.features.hasFeature(Features.ProvideStorage) && d.channels.nonEmpty =>
513+
stay() using d.copy(peerStorage = Some(store.blob))
514+
509515
case Event(unhandledMsg: LightningMessage, _) =>
510516
log.warning("ignoring message {}", unhandledMsg)
511517
stay()
@@ -716,7 +722,7 @@ class Peer(val nodeParams: NodeParams,
716722
context.system.eventStream.publish(PeerDisconnected(self, remoteNodeId))
717723
}
718724

719-
private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef]): State = {
725+
private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef], peerStorage: Option[ByteVector]): State = {
720726
require(remoteNodeId == connectionReady.remoteNodeId, s"invalid nodeId: $remoteNodeId != ${connectionReady.remoteNodeId}")
721727
log.debug("got authenticated connection to address {}", connectionReady.address)
722728

@@ -726,6 +732,9 @@ class Peer(val nodeParams: NodeParams,
726732
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, connectionReady.address)
727733
}
728734

735+
// If we have some data stored from our peer, we send it to them before doing anything else.
736+
peerStorage.foreach(connectionReady.peerConnection ! PeerStorageRetrieval(_))
737+
729738
// let's bring existing/requested channels online
730739
channels.values.toSet[ActorRef].foreach(_ ! INPUT_RECONNECTED(connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit)) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
731740

@@ -742,7 +751,7 @@ class Peer(val nodeParams: NodeParams,
742751
connectionReady.peerConnection ! CurrentFeeCredit(nodeParams.chainHash, feeCredit.getOrElse(0 msat))
743752
}
744753

745-
goto(CONNECTED) using ConnectedData(connectionReady.address, connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit, channels)
754+
goto(CONNECTED) using ConnectedData(connectionReady.address, connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit, channels, peerStorage)
746755
}
747756

748757
/**
@@ -879,10 +888,14 @@ object Peer {
879888

880889
sealed trait Data {
881890
def channels: Map[_ <: ChannelId, ActorRef] // will be overridden by Map[FinalChannelId, ActorRef] or Map[ChannelId, ActorRef]
891+
def peerStorage: Option[ByteVector]
892+
}
893+
case object Nothing extends Data {
894+
override def channels = Map.empty
895+
override def peerStorage: Option[ByteVector] = None
882896
}
883-
case object Nothing extends Data { override def channels = Map.empty }
884-
case class DisconnectedData(channels: Map[FinalChannelId, ActorRef]) extends Data
885-
case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef]) extends Data {
897+
case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], peerStorage: Option[ByteVector]) extends Data
898+
case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], peerStorage: Option[ByteVector]) extends Data {
886899
val connectionInfo: ConnectionInfo = ConnectionInfo(address, peerConnection, localInit, remoteInit)
887900
def localFeatures: Features[InitFeature] = localInit.features
888901
def remoteFeatures: Features[InitFeature] = remoteInit.features

eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecs.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import fr.acinq.eclair.wire.protocol.CommonCodecs._
2222
import fr.acinq.eclair.{Features, InitFeature, KamonExt}
2323
import scodec.bits.{BinStringSyntax, BitVector, ByteVector}
2424
import scodec.codecs._
25-
import scodec.{Attempt, Codec}
25+
import scodec.{Attempt, Codec, Err}
2626

2727
/**
2828
* Created by PM on 15/11/2016.
@@ -389,6 +389,17 @@ object LightningMessageCodecs {
389389
("onionPacket" | MessageOnionCodecs.messageOnionPacketCodec) ::
390390
("tlvStream" | OnionMessageTlv.onionMessageTlvCodec)).as[OnionMessage]
391391

392+
private def isAcceptableBlobLength(length: Int) =
393+
if (length <= 65531) Attempt.Successful(length) else Attempt.failure(Err(s"length $length is larger than 65531"))
394+
395+
val peerStorageStore: Codec[PeerStorageStore] = (
396+
("blob" | variableSizeBytes(uint16.exmap(isAcceptableBlobLength, isAcceptableBlobLength), bytes)) ::
397+
("tlvStream" | PeerStorageTlv.peerStorageTlvCodec)).as[PeerStorageStore]
398+
399+
val peerStorageRetrieval: Codec[PeerStorageRetrieval] = (
400+
("blob" | variableSizeBytes(uint16.exmap(isAcceptableBlobLength, isAcceptableBlobLength), bytes)) ::
401+
("tlvStream" | PeerStorageTlv.peerStorageTlvCodec)).as[PeerStorageRetrieval]
402+
392403
// NB: blank lines to minimize merge conflicts
393404

394405
//
@@ -476,6 +487,8 @@ object LightningMessageCodecs {
476487
val lightningMessageCodec = discriminated[LightningMessage].by(uint16)
477488
.typecase(1, warningCodec)
478489
.typecase(2, stfuCodec)
490+
.typecase(7, peerStorageStore)
491+
.typecase(9, peerStorageRetrieval)
479492
.typecase(16, initCodec)
480493
.typecase(17, errorCodec)
481494
.typecase(18, pingCodec)

eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,10 @@ case class GossipTimestampFilter(chainHash: BlockHash, firstTimestamp: Timestamp
600600

601601
case class OnionMessage(blindingKey: PublicKey, onionRoutingPacket: OnionRoutingPacket, tlvStream: TlvStream[OnionMessageTlv] = TlvStream.empty) extends LightningMessage
602602

603+
case class PeerStorageStore(blob: ByteVector, tlvStream: TlvStream[PeerStorageTlv] = TlvStream.empty) extends LightningMessage
604+
605+
case class PeerStorageRetrieval(blob: ByteVector, tlvStream: TlvStream[PeerStorageTlv] = TlvStream.empty) extends LightningMessage
606+
603607
// NB: blank lines to minimize merge conflicts
604608

605609
//
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2021 ACINQ SAS
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package fr.acinq.eclair.wire.protocol
18+
19+
import fr.acinq.eclair.wire.protocol.CommonCodecs.varint
20+
import fr.acinq.eclair.wire.protocol.TlvCodecs.tlvStream
21+
import scodec.Codec
22+
import scodec.codecs.discriminated
23+
24+
/**
25+
* Created by thomash on July 2024.
26+
*/
27+
28+
sealed trait PeerStorageTlv extends Tlv
29+
30+
object PeerStorageTlv {
31+
val peerStorageTlvCodec: Codec[TlvStream[PeerStorageTlv]] = tlvStream(discriminated[PeerStorageTlv].by(varint))
32+
}

eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
3737
private val channels = Map(Peer.FinalChannelId(randomBytes32()) -> system.deadLetters)
3838

3939
private val PeerNothingData = Peer.Nothing
40-
private val PeerDisconnectedData = Peer.DisconnectedData(channels)
41-
private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) })
40+
private val PeerDisconnectedData = Peer.DisconnectedData(channels, None)
41+
private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) }, None)
4242

4343
case class FixtureParam(nodeParams: NodeParams, remoteNodeId: PublicKey, reconnectionTask: TestFSMRef[ReconnectionTask.State, ReconnectionTask.Data, ReconnectionTask], monitor: TestProbe)
4444

@@ -81,7 +81,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
8181
import f._
8282

8383
val peer = TestProbe()
84-
peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty)))
84+
peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty, None)))
8585
monitor.expectNoMessage()
8686
}
8787

0 commit comments

Comments
 (0)