Skip to content

Commit 8da0fc6

Browse files
committed
Add wake-up step to channel and message relay
We allow relaying data to contain a wallet `node_id` instead of an scid. When that's the case, we start by waking up that wallet node before we try relaying onion messages or payments.
1 parent 4201501 commit 8da0fc6

20 files changed

+495
-200
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
8787
blockchainWatchdogSources: Seq[String],
8888
onionMessageConfig: OnionMessageConfig,
8989
purgeInvoicesInterval: Option[FiniteDuration],
90-
revokedHtlcInfoCleanerConfig: RevokedHtlcInfoCleaner.Config) {
90+
revokedHtlcInfoCleanerConfig: RevokedHtlcInfoCleaner.Config,
91+
wakeUpTimeout: FiniteDuration) {
9192
val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey
9293

9394
val nodeId: PublicKey = nodeKeyManager.nodeId
@@ -610,7 +611,8 @@ object NodeParams extends Logging {
610611
revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config(
611612
batchSize = config.getInt("db.revoked-htlc-info-cleaner.batch-size"),
612613
interval = FiniteDuration(config.getDuration("db.revoked-htlc-info-cleaner.interval").getSeconds, TimeUnit.SECONDS)
613-
)
614+
),
615+
wakeUpTimeout = 30 seconds,
614616
)
615617
}
616618
}

eclair-core/src/main/scala/fr/acinq/eclair/io/MessageRelay.scala

Lines changed: 55 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616

1717
package fr.acinq.eclair.io
1818

19-
import akka.actor.typed.Behavior
2019
import akka.actor.typed.eventstream.EventStream
2120
import akka.actor.typed.scaladsl.adapter.TypedActorRefOps
2221
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
22+
import akka.actor.typed.{Behavior, SupervisorStrategy}
2323
import akka.actor.{ActorRef, typed}
2424
import fr.acinq.bitcoin.scalacompat.ByteVector32
2525
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
@@ -32,6 +32,8 @@ import fr.acinq.eclair.router.Router
3232
import fr.acinq.eclair.wire.protocol.OnionMessage
3333
import fr.acinq.eclair.{EncodedNodeId, NodeParams, ShortChannelId}
3434

35+
import scala.concurrent.duration.DurationInt
36+
3537
object MessageRelay {
3638
// @formatter:off
3739
sealed trait Command
@@ -42,29 +44,18 @@ object MessageRelay {
4244
policy: RelayPolicy,
4345
replyTo_opt: Option[typed.ActorRef[Status]]) extends Command
4446
case class WrappedPeerInfo(peerInfo: PeerInfoResponse) extends Command
45-
case class WrappedConnectionResult(result: PeerConnection.ConnectionResult) extends Command
46-
case class WrappedOptionalNodeId(nodeId_opt: Option[PublicKey]) extends Command
47+
private case class WrappedConnectionResult(result: PeerConnection.ConnectionResult) extends Command
48+
private case class WrappedOptionalNodeId(nodeId_opt: Option[PublicKey]) extends Command
49+
private case class WrappedPeerReadyResult(result: PeerReadyNotifier.Result) extends Command
4750

48-
sealed trait Status {
49-
val messageId: ByteVector32
50-
}
51+
sealed trait Status { val messageId: ByteVector32 }
5152
case class Sent(messageId: ByteVector32) extends Status
5253
sealed trait Failure extends Status
53-
case class AgainstPolicy(messageId: ByteVector32, policy: RelayPolicy) extends Failure {
54-
override def toString: String = s"Relay prevented by policy $policy"
55-
}
56-
case class ConnectionFailure(messageId: ByteVector32, failure: PeerConnection.ConnectionResult.Failure) extends Failure {
57-
override def toString: String = s"Can't connect to peer: ${failure.toString}"
58-
}
59-
case class Disconnected(messageId: ByteVector32) extends Failure {
60-
override def toString: String = "Peer is not connected"
61-
}
62-
case class UnknownChannel(messageId: ByteVector32, channelId: ShortChannelId) extends Failure {
63-
override def toString: String = s"Unknown channel: $channelId"
64-
}
65-
case class DroppedMessage(messageId: ByteVector32, reason: DropReason) extends Failure {
66-
override def toString: String = s"Message dropped: $reason"
67-
}
54+
case class AgainstPolicy(messageId: ByteVector32, policy: RelayPolicy) extends Failure { override def toString: String = s"Relay prevented by policy $policy" }
55+
case class ConnectionFailure(messageId: ByteVector32, failure: PeerConnection.ConnectionResult.Failure) extends Failure { override def toString: String = s"Can't connect to peer: ${failure.toString}" }
56+
case class Disconnected(messageId: ByteVector32) extends Failure { override def toString: String = "Peer is not connected" }
57+
case class UnknownChannel(messageId: ByteVector32, channelId: ShortChannelId) extends Failure { override def toString: String = s"Unknown channel: $channelId" }
58+
case class DroppedMessage(messageId: ByteVector32, reason: DropReason) extends Failure { override def toString: String = s"Message dropped: $reason" }
6859

6960
sealed trait RelayPolicy
7061
case object RelayChannelsOnly extends RelayPolicy
@@ -100,15 +91,15 @@ private class MessageRelay(nodeParams: NodeParams,
10091
def queryNextNodeId(msg: OnionMessage, nextNode: Either[ShortChannelId, EncodedNodeId]): Behavior[Command] = {
10192
nextNode match {
10293
case Left(outgoingChannelId) if outgoingChannelId == ShortChannelId.toSelf =>
103-
withNextNodeId(msg, nodeParams.nodeId)
94+
withNextNodeId(msg, EncodedNodeId.WithPublicKey.Plain(nodeParams.nodeId))
10495
case Left(outgoingChannelId) =>
10596
register ! Register.GetNextNodeId(context.messageAdapter(WrappedOptionalNodeId), outgoingChannelId)
10697
waitForNextNodeId(msg, outgoingChannelId)
10798
case Right(EncodedNodeId.ShortChannelIdDir(isNode1, scid)) =>
10899
router ! Router.GetNodeId(context.messageAdapter(WrappedOptionalNodeId), scid, isNode1)
109100
waitForNextNodeId(msg, scid)
110101
case Right(encodedNodeId: EncodedNodeId.WithPublicKey) =>
111-
withNextNodeId(msg, encodedNodeId.publicKey)
102+
withNextNodeId(msg, encodedNodeId)
112103
}
113104
}
114105

@@ -118,33 +109,39 @@ private class MessageRelay(nodeParams: NodeParams,
118109
replyTo_opt.foreach(_ ! UnknownChannel(messageId, channelId))
119110
Behaviors.stopped
120111
case WrappedOptionalNodeId(Some(nextNodeId)) =>
121-
withNextNodeId(msg, nextNodeId)
112+
withNextNodeId(msg, EncodedNodeId.WithPublicKey.Plain(nextNodeId))
122113
}
123114
}
124115

125-
private def withNextNodeId(msg: OnionMessage, nextNodeId: PublicKey): Behavior[Command] = {
126-
if (nextNodeId == nodeParams.nodeId) {
127-
OnionMessages.process(nodeParams.privateKey, msg) match {
128-
case OnionMessages.DropMessage(reason) =>
129-
replyTo_opt.foreach(_ ! DroppedMessage(messageId, reason))
130-
Behaviors.stopped
131-
case OnionMessages.SendMessage(nextNode, nextMessage) =>
132-
// We need to repeat the process until we identify the (real) next node, or find out that we're the recipient.
133-
queryNextNodeId(nextMessage, nextNode)
134-
case received: OnionMessages.ReceiveMessage =>
135-
context.system.eventStream ! EventStream.Publish(received)
136-
replyTo_opt.foreach(_ ! Sent(messageId))
137-
Behaviors.stopped
138-
}
139-
} else {
140-
policy match {
141-
case RelayChannelsOnly =>
142-
switchboard ! GetPeerInfo(context.messageAdapter(WrappedPeerInfo), prevNodeId)
143-
waitForPreviousPeerForPolicyCheck(msg, nextNodeId)
144-
case RelayAll =>
145-
switchboard ! Peer.Connect(nextNodeId, None, context.messageAdapter(WrappedConnectionResult).toClassic, isPersistent = false)
146-
waitForConnection(msg)
147-
}
116+
private def withNextNodeId(msg: OnionMessage, nextNodeId: EncodedNodeId.WithPublicKey): Behavior[Command] = {
117+
nextNodeId match {
118+
case EncodedNodeId.WithPublicKey.Plain(nodeId) if nodeId == nodeParams.nodeId =>
119+
OnionMessages.process(nodeParams.privateKey, msg) match {
120+
case OnionMessages.DropMessage(reason) =>
121+
replyTo_opt.foreach(_ ! DroppedMessage(messageId, reason))
122+
Behaviors.stopped
123+
case OnionMessages.SendMessage(nextNode, nextMessage) =>
124+
// We need to repeat the process until we identify the (real) next node, or find out that we're the recipient.
125+
queryNextNodeId(nextMessage, nextNode)
126+
case received: OnionMessages.ReceiveMessage =>
127+
context.system.eventStream ! EventStream.Publish(received)
128+
replyTo_opt.foreach(_ ! Sent(messageId))
129+
Behaviors.stopped
130+
}
131+
case EncodedNodeId.WithPublicKey.Plain(nodeId) =>
132+
policy match {
133+
case RelayChannelsOnly =>
134+
switchboard ! GetPeerInfo(context.messageAdapter(WrappedPeerInfo), prevNodeId)
135+
waitForPreviousPeerForPolicyCheck(msg, nodeId)
136+
case RelayAll =>
137+
switchboard ! Peer.Connect(nodeId, None, context.messageAdapter(WrappedConnectionResult).toClassic, isPersistent = false)
138+
waitForConnection(msg)
139+
}
140+
case EncodedNodeId.WithPublicKey.Wallet(nodeId) =>
141+
context.log.info("trying to wake up next peer to relay onion message (nodeId={})", nodeId)
142+
val notifier = context.spawnAnonymous(Behaviors.supervise(PeerReadyNotifier(nodeId, timeout_opt = Some(Left(nodeParams.wakeUpTimeout)))).onFailure(SupervisorStrategy.stop))
143+
notifier ! PeerReadyNotifier.NotifyWhenPeerReady(context.messageAdapter(WrappedPeerReadyResult))
144+
waitForWalletNodeUp(msg)
148145
}
149146
}
150147

@@ -180,4 +177,15 @@ private class MessageRelay(nodeParams: NodeParams,
180177
Behaviors.stopped
181178
}
182179
}
180+
181+
private def waitForWalletNodeUp(msg: OnionMessage): Behavior[Command] = {
182+
Behaviors.receiveMessagePartial {
183+
case WrappedPeerReadyResult(r: PeerReadyNotifier.PeerReady) =>
184+
r.peer ! Peer.RelayOnionMessage(messageId, msg, replyTo_opt)
185+
Behaviors.stopped
186+
case WrappedPeerReadyResult(_: PeerReadyNotifier.PeerUnavailable) =>
187+
replyTo_opt.foreach(_ ! Disconnected(messageId))
188+
Behaviors.stopped
189+
}
190+
}
183191
}

eclair-core/src/main/scala/fr/acinq/eclair/io/PeerReadyNotifier.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ object PeerReadyNotifier {
8383
case WrappedListing(Switchboard.SwitchboardServiceKey.Listing(listings)) =>
8484
listings.headOption match {
8585
case Some(switchboard) =>
86-
waitForPeerConnected(replyTo, remoteNodeId, switchboard, context, timers)
86+
waitForPeerConnected(replyTo, remoteNodeId, switchboard, context, timers)
8787
case None =>
8888
context.log.error("no switchboard found")
8989
replyTo ! PeerUnavailable(remoteNodeId)

eclair-core/src/main/scala/fr/acinq/eclair/payment/Monitoring.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ object Monitoring {
115115
val Failure = "failure"
116116

117117
object FailureType {
118+
val WakeUp = "WakeUp"
118119
val Remote = "Remote"
119120
val Malformed = "MalformedHtlc"
120121

eclair-core/src/main/scala/fr/acinq/eclair/payment/PaymentPacket.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ object IncomingPaymentPacket {
128128
decryptEncryptedRecipientData(add, privateKey, payload, encrypted.data).flatMap {
129129
case DecodedEncryptedRecipientData(blindedPayload, nextBlinding) =>
130130
validateBlindedChannelRelayPayload(add, payload, blindedPayload, nextBlinding, nextPacket).flatMap {
131-
case ChannelRelayPacket(_, payload, nextPacket) if payload.outgoingChannelId == ShortChannelId.toSelf =>
131+
case ChannelRelayPacket(_, payload, nextPacket) if payload.outgoing == Right(ShortChannelId.toSelf) =>
132132
decrypt(add.copy(onionRoutingPacket = nextPacket, tlvStream = add.tlvStream.copy(records = Set(UpdateAddHtlcTlv.BlindingPoint(nextBlinding)))), privateKey, features)
133133
case relayPacket => Right(relayPacket)
134134
}

0 commit comments

Comments
 (0)