Skip to content

Commit 8454406

Browse files
committed
Peer storage, but not persistent
1 parent 96d0c9a commit 8454406

File tree

6 files changed

+80
-14
lines changed

6 files changed

+80
-14
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/Features.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,11 @@ object Features {
275275
val mandatory = 38
276276
}
277277

278+
case object ProvideStorage extends Feature with InitFeature with NodeFeature {
279+
val rfcName = "option_provide_storage"
280+
val mandatory = 42
281+
}
282+
278283
case object ChannelType extends Feature with InitFeature with NodeFeature {
279284
val rfcName = "option_channel_type"
280285
val mandatory = 44
@@ -358,6 +363,7 @@ object Features {
358363
DualFunding,
359364
Quiescence,
360365
OnionMessages,
366+
ProvideStorage,
361367
ChannelType,
362368
ScidAlias,
363369
PaymentMetadata,

eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
4444
import fr.acinq.eclair.router.Router
4545
import fr.acinq.eclair.wire.protocol
4646
import fr.acinq.eclair.wire.protocol.FailureMessageCodecs.createBadOnionFailure
47-
import fr.acinq.eclair.wire.protocol.{AddFeeCredit, ChannelTlv, CurrentFeeCredit, Error, HasChannelId, HasTemporaryChannelId, LightningMessage, LiquidityAds, NodeAddress, OnTheFlyFundingFailureMessage, OnionMessage, OnionRoutingPacket, RecommendedFeerates, RoutingMessage, SpliceInit, TlvStream, TxAbort, UnknownMessage, Warning, WillAddHtlc, WillFailHtlc, WillFailMalformedHtlc}
47+
import fr.acinq.eclair.wire.protocol.{AddFeeCredit, ChannelTlv, CurrentFeeCredit, Error, HasChannelId, HasTemporaryChannelId, LightningMessage, LiquidityAds, NodeAddress, OnTheFlyFundingFailureMessage, OnionMessage, OnionRoutingPacket, PeerStorageRetrieval, PeerStorageStore, RecommendedFeerates, RoutingMessage, SpliceInit, TlvStream, TxAbort, UnknownMessage, Warning, WillAddHtlc, WillFailHtlc, WillFailMalformedHtlc}
48+
import scodec.bits.ByteVector
4849

4950
/**
5051
* This actor represents a logical peer. There is one [[Peer]] per unique remote node id at all time.
@@ -84,7 +85,7 @@ class Peer(val nodeParams: NodeParams,
8485
FinalChannelId(state.channelId) -> channel
8586
}.toMap
8687
context.system.eventStream.publish(PeerCreated(self, remoteNodeId))
87-
goto(DISCONNECTED) using DisconnectedData(channels) // when we restart, we will attempt to reconnect right away, but then we'll wait
88+
goto(DISCONNECTED) using DisconnectedData(channels, None) // when we restart, we will attempt to reconnect right away, but then we'll wait
8889
}
8990

9091
when(DISCONNECTED) {
@@ -93,7 +94,7 @@ class Peer(val nodeParams: NodeParams,
9394
stay()
9495

9596
case Event(connectionReady: PeerConnection.ConnectionReady, d: DisconnectedData) =>
96-
gotoConnected(connectionReady, d.channels.map { case (k: ChannelId, v) => (k, v) })
97+
gotoConnected(connectionReady, d.channels.map { case (k: ChannelId, v) => (k, v) }, d.peerStorage)
9798

9899
case Event(Terminated(actor), d: DisconnectedData) if d.channels.values.toSet.contains(actor) =>
99100
// we have at most 2 ids: a TemporaryChannelId and a FinalChannelId
@@ -461,7 +462,7 @@ class Peer(val nodeParams: NodeParams,
461462
stopPeer()
462463
} else {
463464
d.channels.values.toSet[ActorRef].foreach(_ ! INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
464-
goto(DISCONNECTED) using DisconnectedData(d.channels.collect { case (k: FinalChannelId, v) => (k, v) })
465+
goto(DISCONNECTED) using DisconnectedData(d.channels.collect { case (k: FinalChannelId, v) => (k, v) }, d.peerStorage)
465466
}
466467

467468
case Event(Terminated(actor), d: ConnectedData) if d.channels.values.toSet.contains(actor) =>
@@ -480,7 +481,7 @@ class Peer(val nodeParams: NodeParams,
480481
log.debug(s"got new connection, killing current one and switching")
481482
d.peerConnection ! PeerConnection.Kill(KillReason.ConnectionReplaced)
482483
d.channels.values.toSet[ActorRef].foreach(_ ! INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
483-
gotoConnected(connectionReady, d.channels)
484+
gotoConnected(connectionReady, d.channels, d.peerStorage)
484485

485486
case Event(msg: OnionMessage, _: ConnectedData) =>
486487
OnionMessages.process(nodeParams.privateKey, msg) match {
@@ -513,6 +514,9 @@ class Peer(val nodeParams: NodeParams,
513514
d.peerConnection forward unknownMsg
514515
stay()
515516

517+
case Event(store: PeerStorageStore, d: ConnectedData) if nodeParams.features.hasFeature(Features.ProvideStorage) && d.channels.nonEmpty =>
518+
stay() using d.copy(peerStorage = Some(store.blob))
519+
516520
case Event(unhandledMsg: LightningMessage, _) =>
517521
log.warning("ignoring message {}", unhandledMsg)
518522
stay()
@@ -744,7 +748,7 @@ class Peer(val nodeParams: NodeParams,
744748
context.system.eventStream.publish(PeerDisconnected(self, remoteNodeId))
745749
}
746750

747-
private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef]): State = {
751+
private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef], peerStorage: Option[ByteVector]): State = {
748752
require(remoteNodeId == connectionReady.remoteNodeId, s"invalid nodeId: $remoteNodeId != ${connectionReady.remoteNodeId}")
749753
log.debug("got authenticated connection to address {}", connectionReady.address)
750754

@@ -754,6 +758,9 @@ class Peer(val nodeParams: NodeParams,
754758
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, connectionReady.address)
755759
}
756760

761+
// If we have some data stored from our peer, we send it to them before doing anything else.
762+
peerStorage.foreach(connectionReady.peerConnection ! PeerStorageRetrieval(_))
763+
757764
// let's bring existing/requested channels online
758765
channels.values.toSet[ActorRef].foreach(_ ! INPUT_RECONNECTED(connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit)) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
759766

@@ -771,7 +778,7 @@ class Peer(val nodeParams: NodeParams,
771778
connectionReady.peerConnection ! CurrentFeeCredit(nodeParams.chainHash, feeCredit.getOrElse(0 msat))
772779
}
773780

774-
goto(CONNECTED) using ConnectedData(connectionReady.address, connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit, channels, feerates, None)
781+
goto(CONNECTED) using ConnectedData(connectionReady.address, connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit, channels, feerates, None, peerStorage)
775782
}
776783

777784
/**
@@ -908,10 +915,14 @@ object Peer {
908915

909916
sealed trait Data {
910917
def channels: Map[_ <: ChannelId, ActorRef] // will be overridden by Map[FinalChannelId, ActorRef] or Map[ChannelId, ActorRef]
918+
def peerStorage: Option[ByteVector]
919+
}
920+
case object Nothing extends Data {
921+
override def channels = Map.empty
922+
override def peerStorage: Option[ByteVector] = None
911923
}
912-
case object Nothing extends Data { override def channels = Map.empty }
913-
case class DisconnectedData(channels: Map[FinalChannelId, ActorRef]) extends Data
914-
case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates]) extends Data {
924+
case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], peerStorage: Option[ByteVector]) extends Data
925+
case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates], peerStorage: Option[ByteVector]) extends Data {
915926
val connectionInfo: ConnectionInfo = ConnectionInfo(address, peerConnection, localInit, remoteInit)
916927
def localFeatures: Features[InitFeature] = localInit.features
917928
def remoteFeatures: Features[InitFeature] = remoteInit.features

eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecs.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import fr.acinq.eclair.wire.protocol.CommonCodecs._
2222
import fr.acinq.eclair.{Features, InitFeature, KamonExt}
2323
import scodec.bits.{BinStringSyntax, BitVector, ByteVector}
2424
import scodec.codecs._
25-
import scodec.{Attempt, Codec}
25+
import scodec.{Attempt, Codec, Err}
2626

2727
/**
2828
* Created by PM on 15/11/2016.
@@ -389,6 +389,17 @@ object LightningMessageCodecs {
389389
("onionPacket" | MessageOnionCodecs.messageOnionPacketCodec) ::
390390
("tlvStream" | OnionMessageTlv.onionMessageTlvCodec)).as[OnionMessage]
391391

392+
private def isAcceptableBlobLength(length: Int) =
393+
if (length <= 65531) Attempt.Successful(length) else Attempt.failure(Err(s"length $length is larger than 65531"))
394+
395+
val peerStorageStore: Codec[PeerStorageStore] = (
396+
("blob" | variableSizeBytes(uint16.exmap(isAcceptableBlobLength, isAcceptableBlobLength), bytes)) ::
397+
("tlvStream" | PeerStorageTlv.peerStorageTlvCodec)).as[PeerStorageStore]
398+
399+
val peerStorageRetrieval: Codec[PeerStorageRetrieval] = (
400+
("blob" | variableSizeBytes(uint16.exmap(isAcceptableBlobLength, isAcceptableBlobLength), bytes)) ::
401+
("tlvStream" | PeerStorageTlv.peerStorageTlvCodec)).as[PeerStorageRetrieval]
402+
392403
// NB: blank lines to minimize merge conflicts
393404

394405
//
@@ -476,6 +487,8 @@ object LightningMessageCodecs {
476487
val lightningMessageCodec = discriminated[LightningMessage].by(uint16)
477488
.typecase(1, warningCodec)
478489
.typecase(2, stfuCodec)
490+
.typecase(7, peerStorageStore)
491+
.typecase(9, peerStorageRetrieval)
479492
.typecase(16, initCodec)
480493
.typecase(17, errorCodec)
481494
.typecase(18, pingCodec)

eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,10 @@ case class GossipTimestampFilter(chainHash: BlockHash, firstTimestamp: Timestamp
602602

603603
case class OnionMessage(blindingKey: PublicKey, onionRoutingPacket: OnionRoutingPacket, tlvStream: TlvStream[OnionMessageTlv] = TlvStream.empty) extends LightningMessage
604604

605+
case class PeerStorageStore(blob: ByteVector, tlvStream: TlvStream[PeerStorageTlv] = TlvStream.empty) extends LightningMessage
606+
607+
case class PeerStorageRetrieval(blob: ByteVector, tlvStream: TlvStream[PeerStorageTlv] = TlvStream.empty) extends LightningMessage
608+
605609
// NB: blank lines to minimize merge conflicts
606610

607611
//
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2021 ACINQ SAS
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package fr.acinq.eclair.wire.protocol
18+
19+
import fr.acinq.eclair.wire.protocol.CommonCodecs.varint
20+
import fr.acinq.eclair.wire.protocol.TlvCodecs.tlvStream
21+
import scodec.Codec
22+
import scodec.codecs.discriminated
23+
24+
/**
25+
* Created by thomash on July 2024.
26+
*/
27+
28+
sealed trait PeerStorageTlv extends Tlv
29+
30+
object PeerStorageTlv {
31+
val peerStorageTlvCodec: Codec[TlvStream[PeerStorageTlv]] = tlvStream(discriminated[PeerStorageTlv].by(varint))
32+
}

eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
3838
private val recommendedFeerates = RecommendedFeerates(Block.RegtestGenesisBlock.hash, TestConstants.feeratePerKw, TestConstants.anchorOutputsFeeratePerKw)
3939

4040
private val PeerNothingData = Peer.Nothing
41-
private val PeerDisconnectedData = Peer.DisconnectedData(channels)
42-
private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) }, recommendedFeerates, None)
41+
private val PeerDisconnectedData = Peer.DisconnectedData(channels, None)
42+
private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) }, recommendedFeerates, None, None)
4343

4444
case class FixtureParam(nodeParams: NodeParams, remoteNodeId: PublicKey, reconnectionTask: TestFSMRef[ReconnectionTask.State, ReconnectionTask.Data, ReconnectionTask], monitor: TestProbe)
4545

@@ -82,7 +82,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
8282
import f._
8383

8484
val peer = TestProbe()
85-
peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty)))
85+
peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty, None)))
8686
monitor.expectNoMessage()
8787
}
8888

0 commit comments

Comments
 (0)