diff --git a/docs/release-notes/eclair-vnext.md b/docs/release-notes/eclair-vnext.md index a3f03628ac..c482fd4b58 100644 --- a/docs/release-notes/eclair-vnext.md +++ b/docs/release-notes/eclair-vnext.md @@ -23,6 +23,33 @@ See https://github.com/lightning/bolts/pull/1044 for more details. Support is disabled by default as the spec is not yet final. It can be enabled by setting `eclair.features.option_attributable_failure = optional` at the risk of being incompatible with the final spec. +### Local reputation and HTLC endorsement + +To protect against jamming attacks, eclair gives a reputation to its neighbors and uses it to decide if a HTLC should be relayed given how congested the outgoing channel is. +The reputation is basically how much this node paid us in fees divided by how much they should have paid us for the liquidity and slots that they blocked. +The reputation is per incoming node and endorsement level. +The confidence that the HTLC will be fulfilled is transmitted to the next node using the endorsement TLV of the `update_add_htlc` message. +Note that HTLCs that are considered dangerous are still relayed: this is the first phase of a network-wide experimentation aimed at collecting data. + +To configure, edit `eclair.conf`: + +```eclair.conf +// We assign reputations to our peers to prioritize payments during congestion. +// The reputation is computed as fees paid divided by what should have been paid if all payments were successful. +eclair.relay.peer-reputation { + // Set this parameter to false to disable the reputation algorithm and simply relay the incoming endorsement + // value, as described by https://github.com/lightning/blips/blob/master/blip-0004.md, + enabled = true + // Reputation decays with the following half life to emphasize recent behavior. + half-life = 7 days + // Payments that stay pending for longer than this get penalized + max-relay-duration = 12 seconds + // Pending payments are counted as failed, and because they could potentially stay pending for a very long time, + // the following multiplier is applied. + pending-multiplier = 1000 // A pending payment counts as a thousand failed ones. +} +``` + ### API changes - `listoffers` now returns more details about each offer. diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index 5fe58f38db..c24b470a4b 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -251,6 +251,23 @@ eclair { // Number of blocks before the incoming HTLC expires that an async payment must be triggered by the receiver cancel-safety-before-timeout-blocks = 144 } + + // We assign reputation to our peers to prioritize payments during congestion. + // The reputation is computed as fees paid divided by what should have been paid if all payments were successful. + peer-reputation { + // Set this parameter to false to disable the reputation algorithm and simply relay the incoming endorsement + // value, as described by https://github.com/lightning/blips/blob/master/blip-0004.md, + enabled = true + // Reputation decays with the following half life to emphasize recent behavior. + half-life = 15 days + // Payments that stay pending for longer than this get penalized. + max-relay-duration = 12 seconds + // Pending payments are counted as failed, and because they could potentially stay pending for a very long time, + // the following multiplier is applied. We want it to be as close as possible to the true cost of a worst case + // HTLC (max-cltv-delta / max-relay-duration, around 100000 with default parameters) while still being comparable + // to the number of HTLCs received per peer during twice the half life. + pending-multiplier = 200 // A pending payment counts as two hundred failed ones. + } } on-chain-fees { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index ed275fc070..437e3841b6 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -33,6 +33,7 @@ import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig import fr.acinq.eclair.payment.offer.OffersConfig import fr.acinq.eclair.payment.relay.OnTheFlyFunding import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams} +import fr.acinq.eclair.reputation.Reputation import fr.acinq.eclair.router.Announcements.AddressException import fr.acinq.eclair.router.Graph.{HeuristicsConstants, PaymentWeightRatios} import fr.acinq.eclair.router.Router._ @@ -640,7 +641,13 @@ object NodeParams extends Logging { privateChannelFees = getRelayFees(config.getConfig("relay.fees.private-channels")), minTrampolineFees = getRelayFees(config.getConfig("relay.fees.min-trampoline")), enforcementDelay = FiniteDuration(config.getDuration("relay.fees.enforcement-delay").getSeconds, TimeUnit.SECONDS), - asyncPaymentsParams = AsyncPaymentsParams(asyncPaymentHoldTimeoutBlocks, asyncPaymentCancelSafetyBeforeTimeoutBlocks) + asyncPaymentsParams = AsyncPaymentsParams(asyncPaymentHoldTimeoutBlocks, asyncPaymentCancelSafetyBeforeTimeoutBlocks), + peerReputationConfig = Reputation.Config( + enabled = config.getBoolean("relay.peer-reputation.enabled"), + halfLife = FiniteDuration(config.getDuration("relay.peer-reputation.half-life").getSeconds, TimeUnit.SECONDS), + maxRelayDuration = FiniteDuration(config.getDuration("relay.peer-reputation.max-relay-duration").getSeconds, TimeUnit.SECONDS), + pendingMultiplier = config.getDouble("relay.peer-reputation.pending-multiplier"), + ), ), db = database, autoReconnect = config.getBoolean("auto-reconnect"), diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala index 2e4f578121..3646b53d4c 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala @@ -45,6 +45,7 @@ import fr.acinq.eclair.payment.offer.{DefaultOfferHandler, OfferManager} import fr.acinq.eclair.payment.receive.PaymentHandler import fr.acinq.eclair.payment.relay.{AsyncPaymentTriggerer, PostRestartHtlcCleaner, Relayer} import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator} +import fr.acinq.eclair.reputation.ReputationRecorder import fr.acinq.eclair.router._ import fr.acinq.eclair.tor.{Controller, TorProtocolHandler} import fr.acinq.eclair.wire.protocol.NodeAddress @@ -379,7 +380,12 @@ class Setup(val datadir: File, paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler", SupervisorStrategy.Resume)) triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "async-payment-triggerer") peerReadyManager = system.spawn(Behaviors.supervise(PeerReadyManager()).onFailure(typed.SupervisorStrategy.restart), name = "peer-ready-manager") - relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume)) + reputationRecorder_opt = if (nodeParams.relayParams.peerReputationConfig.enabled) { + Some(system.spawn(Behaviors.supervise(ReputationRecorder(nodeParams.relayParams.peerReputationConfig)).onFailure(typed.SupervisorStrategy.resume), name = "reputation-recorder")) + } else { + None + } + relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, reputationRecorder_opt, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume)) _ = relayer ! PostRestartHtlcCleaner.Init(channels) // Before initializing the switchboard (which re-connects us to the network) and the user-facing parts of the system, // we want to make sure the handler for post-restart broken HTLCs has finished initializing. diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala index 3b08caa8a5..76225cfe9d 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala @@ -26,7 +26,7 @@ import fr.acinq.eclair.channel.fund.{InteractiveTxBuilder, InteractiveTxSigningS import fr.acinq.eclair.io.Peer import fr.acinq.eclair.transactions.CommitmentSpec import fr.acinq.eclair.transactions.Transactions._ -import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelReady, ChannelReestablish, ChannelUpdate, ClosingSigned, CommitSig, FailureReason, FundingCreated, FundingSigned, Init, LiquidityAds, OnionRoutingPacket, OpenChannel, OpenDualFundedChannel, Shutdown, SpliceInit, Stfu, TxInitRbf, TxSignatures, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFulfillHtlc} +import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelReady, ChannelReestablish, ChannelUpdate, ClosingSigned, CommitSig, FailureReason, FundingCreated, FundingSigned, HtlcFailureMessage, Init, LiquidityAds, OnionRoutingPacket, OpenChannel, OpenDualFundedChannel, Shutdown, SpliceInit, Stfu, TxInitRbf, TxSignatures, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFulfillHtlc} import fr.acinq.eclair.{Alias, BlockHeight, CltvExpiry, CltvExpiryDelta, Features, InitFeature, MilliSatoshi, MilliSatoshiLong, RealShortChannelId, TimestampMilli, UInt64} import scodec.bits.ByteVector @@ -247,6 +247,10 @@ final case class CMD_GET_CHANNEL_STATE(replyTo: ActorRef) extends HasReplyToComm final case class CMD_GET_CHANNEL_DATA(replyTo: ActorRef) extends HasReplyToCommand final case class CMD_GET_CHANNEL_INFO(replyTo: akka.actor.typed.ActorRef[RES_GET_CHANNEL_INFO]) extends Command +case class OutgoingHtlcAdded(add: UpdateAddHtlc, upstream: Upstream.Hot, fee: MilliSatoshi) +case class OutgoingHtlcFailed(fail: HtlcFailureMessage) +case class OutgoingHtlcFulfilled(fulfill: UpdateFulfillHtlc) + /* 88888888b. 8888888888 .d8888b. 88888888b. ,ad8888ba, 888b 88 .d8888b. 8888888888 .d8888b. 88 "8b 88 d88P Y88b 88 "8b d8"' `"8b 8888b 88 d88P Y88b 88 d88P Y88b diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelExceptions.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelExceptions.scala index a51a32a2aa..c85adde16a 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelExceptions.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelExceptions.scala @@ -34,6 +34,7 @@ case class RemoteError(e: protocol.Error) extends ChannelError // @formatter:on class ChannelException(val channelId: ByteVector32, message: String) extends RuntimeException(message) +class ChannelJammingException(override val channelId: ByteVector32, message: String) extends ChannelException(channelId, message) // @formatter:off case class InvalidChainHash (override val channelId: ByteVector32, local: BlockHash, remote: BlockHash) extends ChannelException(channelId, s"invalid chainHash (local=$local remote=$remote)") @@ -150,4 +151,6 @@ case class CommandUnavailableInThisState (override val channelId: Byte case class ForbiddenDuringSplice (override val channelId: ByteVector32, command: String) extends ChannelException(channelId, s"cannot process $command while splicing") case class ForbiddenDuringQuiescence (override val channelId: ByteVector32, command: String) extends ChannelException(channelId, s"cannot process $command while quiescent") case class ConcurrentRemoteSplice (override val channelId: ByteVector32) extends ChannelException(channelId, "splice attempt canceled, remote initiated splice before us") +case class TooManySmallHtlcs (override val channelId: ByteVector32, number: Long, below: MilliSatoshi) extends ChannelJammingException(channelId, s"too many small htlcs: $number HTLCs below $below") +case class ConfidenceTooLow (override val channelId: ByteVector32, confidence: Double, occupancy: Double) extends ChannelJammingException(channelId, s"confidence too low: confidence=$confidence occupancy=$occupancy") // @formatter:on \ No newline at end of file diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala index 5a62648009..905ec360c7 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala @@ -448,7 +448,7 @@ case class Commitment(fundingTxIndex: Long, localCommit.spec.htlcs.collect(DirectedHtlc.incoming).filter(nearlyExpired) } - def canSendAdd(amount: MilliSatoshi, params: ChannelParams, changes: CommitmentChanges, feerates: FeeratesPerKw, feeConf: OnChainFeeConf): Either[ChannelException, Unit] = { + def canSendAdd(amount: MilliSatoshi, params: ChannelParams, changes: CommitmentChanges, feerates: FeeratesPerKw, feeConf: OnChainFeeConf, confidence: Double): Either[ChannelException, Unit] = { // we allowed mismatches between our feerates and our remote's as long as commitments didn't contain any HTLC at risk // we need to verify that we're not disagreeing on feerates anymore before offering new HTLCs // NB: there may be a pending update_fee that hasn't been applied yet that needs to be taken into account @@ -507,7 +507,8 @@ case class Commitment(fundingTxIndex: Long, if (allowedHtlcValueInFlight < htlcValueInFlight) { return Left(HtlcValueTooHighInFlight(params.channelId, maximum = allowedHtlcValueInFlight, actual = htlcValueInFlight)) } - if (Seq(params.localParams.maxAcceptedHtlcs, params.remoteParams.maxAcceptedHtlcs).min < outgoingHtlcs.size) { + val maxAcceptedHtlcs = params.localParams.maxAcceptedHtlcs.min(params.remoteParams.maxAcceptedHtlcs) + if (maxAcceptedHtlcs < outgoingHtlcs.size) { return Left(TooManyAcceptedHtlcs(params.channelId, maximum = Seq(params.localParams.maxAcceptedHtlcs, params.remoteParams.maxAcceptedHtlcs).min)) } @@ -524,6 +525,18 @@ case class Commitment(fundingTxIndex: Long, return Left(RemoteDustHtlcExposureTooHigh(params.channelId, maxDustExposure, remoteDustExposureAfterAdd)) } + // Jamming protection + // Must be the last checks so that they can be ignored for shadow deployment. + for ((amountMsat, i) <- outgoingHtlcs.toSeq.map(_.amountMsat).sorted.zipWithIndex) { + if ((amountMsat.toLong < 1) || (math.log(amountMsat.toLong.toDouble) * maxAcceptedHtlcs / math.log(params.localParams.maxHtlcValueInFlightMsat.toLong.toDouble / maxAcceptedHtlcs) < i)) { + return Left(TooManySmallHtlcs(params.channelId, number = i + 1, below = amountMsat)) + } + } + val occupancy = (outgoingHtlcs.size.toDouble / maxAcceptedHtlcs).max(htlcValueInFlight.toLong.toDouble / allowedHtlcValueInFlight.toLong.toDouble) + if (confidence + 0.05 < occupancy) { + return Left(ConfidenceTooLow(params.channelId, confidence, occupancy)) + } + Right(()) } @@ -849,7 +862,7 @@ case class Commitments(params: ChannelParams, * @param cmd add HTLC command * @return either Left(failure, error message) where failure is a failure message (see BOLT #4 and the Failure Message class) or Right(new commitments, updateAddHtlc) */ - def sendAdd(cmd: CMD_ADD_HTLC, currentHeight: BlockHeight, channelConf: ChannelConf, feerates: FeeratesPerKw, feeConf: OnChainFeeConf): Either[ChannelException, (Commitments, UpdateAddHtlc)] = { + def sendAdd(cmd: CMD_ADD_HTLC, currentHeight: BlockHeight, channelConf: ChannelConf, feerates: FeeratesPerKw, feeConf: OnChainFeeConf)(implicit log: LoggingAdapter): Either[ChannelException, (Commitments, UpdateAddHtlc)] = { // we must ensure we're not relaying htlcs that are already expired, otherwise the downstream channel will instantly close // NB: we add a 3 blocks safety to reduce the probability of running into this when our bitcoin node is slightly outdated val minExpiry = CltvExpiry(currentHeight + 3) @@ -873,9 +886,23 @@ case class Commitments(params: ChannelParams, val changes1 = changes.addLocalProposal(add).copy(localNextHtlcId = changes.localNextHtlcId + 1) val originChannels1 = originChannels + (add.id -> cmd.origin) // we verify that this htlc is allowed in every active commitment - active.map(_.canSendAdd(add.amountMsat, params, changes1, feerates, feeConf)) - .collectFirst { case Left(f) => Left(f) } - .getOrElse(Right(copy(changes = changes1, originChannels = originChannels1), add)) + val canSendAdds = active.map(_.canSendAdd(add.amountMsat, params, changes1, feerates, feeConf, cmd.confidence)) + val result = canSendAdds.collectFirst { case Left(f) if !f.isInstanceOf[ChannelJammingException] => // We ignore jamming protection. TODO: enable jamming protection + Metrics.dropHtlc(f, Tags.Directions.Outgoing) + Left(f) + }.getOrElse(Right(copy(changes = changes1, originChannels = originChannels1), add)) + // Jamming protection is disabled but we still log which HTLCs would be dropped if it was enabled. + if (result.isRight) { + canSendAdds.collectFirst { + case Left(f: TooManySmallHtlcs) => + log.info("TooManySmallHtlcs: {} outgoing HTLCs are below {}}", f.number, f.below) + Metrics.dropHtlc(f, Tags.Directions.Outgoing) + case Left(f: ConfidenceTooLow) => + log.info("ConfidenceTooLow: confidence is {}% while channel is {}% full", (100 * f.confidence).toInt, (100 * f.occupancy).toInt) + Metrics.dropHtlc(f, Tags.Directions.Outgoing) + } + } + result } def receiveAdd(add: UpdateAddHtlc, feerates: FeeratesPerKw, feeConf: OnChainFeeConf): Either[ChannelException, Commitments] = { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Monitoring.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Monitoring.scala index 748751968c..648a0ce927 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Monitoring.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Monitoring.scala @@ -35,6 +35,7 @@ object Monitoring { val RemoteFeeratePerByte = Kamon.histogram("channels.remote-feerate-per-byte") val Splices = Kamon.histogram("channels.splices", "Splices") val ProcessMessage = Kamon.timer("channels.messages-processed") + val HtlcDropped = Kamon.counter("channels.htlc-dropped") def recordHtlcsInFlight(remoteSpec: CommitmentSpec, previousRemoteSpec: CommitmentSpec): Unit = { for (direction <- Tags.Directions.Incoming :: Tags.Directions.Outgoing :: Nil) { @@ -75,6 +76,10 @@ object Monitoring { Metrics.Splices.withTag(Tags.Origin, Tags.Origins.Remote).withTag(Tags.SpliceType, Tags.SpliceTypes.SpliceCpfp).record(Math.abs(fundingParams.remoteContribution.toLong)) } } + + def dropHtlc(reason: ChannelException, direction: String): Unit = { + HtlcDropped.withTag(Tags.Reason, reason.getClass.getSimpleName).withTag(Tags.Direction, direction).increment() + } } object Tags { @@ -85,6 +90,7 @@ object Monitoring { val State = "state" val CommitmentFormat = "commitment-format" val SpliceType = "splice-type" + val Reason = "reason" object Events { val Created = "created" diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala index a6cfa0f9fe..7f3a06dc21 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala @@ -465,6 +465,7 @@ class Channel(val nodeParams: NodeParams, val channelKeys: ChannelKeys, val wall case Right((commitments1, add)) => if (c.commit) self ! CMD_SIGN() context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.aliases, commitments1, d.lastAnnouncement_opt)) + context.system.eventStream.publish(OutgoingHtlcAdded(add, c.origin.upstream, nodeFee(d.channelUpdate.relayFees, add.amountMsat))) handleCommandSuccess(c, d.copy(commitments = commitments1)) sending add case Left(cause) => handleAddHtlcCommandError(c, cause, Some(d.channelUpdate)) } @@ -491,6 +492,7 @@ class Channel(val nodeParams: NodeParams, val channelKeys: ChannelKeys, val wall case Right((commitments1, origin, htlc)) => // we forward preimages as soon as possible to the upstream channel because it allows us to pull funds relayer ! RES_ADD_SETTLED(origin, htlc, HtlcResult.RemoteFulfill(fulfill)) + context.system.eventStream.publish(OutgoingHtlcFulfilled(fulfill)) stay() using d.copy(commitments = commitments1) case Left(cause) => handleLocalError(cause, d, Some(fulfill)) } @@ -524,12 +526,14 @@ class Channel(val nodeParams: NodeParams, val channelKeys: ChannelKeys, val wall } case Event(fail: UpdateFailHtlc, d: DATA_NORMAL) => + context.system.eventStream.publish(OutgoingHtlcFailed(fail)) d.commitments.receiveFail(fail) match { case Right((commitments1, _, _)) => stay() using d.copy(commitments = commitments1) case Left(cause) => handleLocalError(cause, d, Some(fail)) } case Event(fail: UpdateFailMalformedHtlc, d: DATA_NORMAL) => + context.system.eventStream.publish(OutgoingHtlcFailed(fail)) d.commitments.receiveFailMalformed(fail) match { case Right((commitments1, _, _)) => stay() using d.copy(commitments = commitments1) case Left(cause) => handleLocalError(cause, d, Some(fail)) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala index ad6290b34e..7488b0c015 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala @@ -17,6 +17,7 @@ package fr.acinq.eclair.payment.relay import akka.actor.ActorRef +import akka.actor.typed import akka.actor.typed.Behavior import akka.actor.typed.eventstream.EventStream import akka.actor.typed.scaladsl.adapter.TypedActorRefOps @@ -31,6 +32,8 @@ import fr.acinq.eclair.io.{Peer, PeerReadyNotifier} import fr.acinq.eclair.payment.Monitoring.{Metrics, Tags} import fr.acinq.eclair.payment.relay.Relayer.{OutgoingChannel, OutgoingChannelParams} import fr.acinq.eclair.payment.{ChannelPaymentRelayed, IncomingPaymentPacket} +import fr.acinq.eclair.reputation.ReputationRecorder +import fr.acinq.eclair.reputation.ReputationRecorder.GetConfidence import fr.acinq.eclair.wire.protocol.FailureMessageCodecs.createBadOnionFailure import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload import fr.acinq.eclair.wire.protocol._ @@ -47,6 +50,7 @@ object ChannelRelay { sealed trait Command private case object DoRelay extends Command private case class WrappedPeerReadyResult(result: PeerReadyNotifier.Result) extends Command + private case class WrappedConfidence(confidence: Double) extends Command private case class WrappedForwardFailure(failure: Register.ForwardFailure[CMD_ADD_HTLC]) extends Command private case class WrappedAddResponse(res: CommandResponse[CMD_ADD_HTLC]) extends Command private case class WrappedOnTheFlyFundingResponse(result: Peer.ProposeOnTheFlyFundingResponse) extends Command @@ -61,6 +65,7 @@ object ChannelRelay { def apply(nodeParams: NodeParams, register: ActorRef, + reputationRecorder_opt: Option[typed.ActorRef[GetConfidence]], channels: Map[ByteVector32, Relayer.OutgoingChannel], originNode: PublicKey, relayId: UUID, @@ -72,8 +77,17 @@ object ChannelRelay { paymentHash_opt = Some(r.add.paymentHash), nodeAlias_opt = Some(nodeParams.alias))) { val upstream = Upstream.Hot.Channel(r.add.removeUnknownTlvs(), r.receivedAt, originNode) - val confidence = (r.add.endorsement + 0.5) / 8 - new ChannelRelay(nodeParams, register, channels, r, upstream, confidence, context).start() + reputationRecorder_opt match { + case Some(reputationRecorder) => + reputationRecorder ! GetConfidence(context.messageAdapter[ReputationRecorder.Confidence](confidence => WrappedConfidence(confidence.value)), upstream, r.relayFeeMsat) + case None => + val confidence = (r.add.endorsement + 0.5) / 8 + context.self ! WrappedConfidence(confidence) + } + Behaviors.receiveMessagePartial { + case WrappedConfidence(confidence) => + new ChannelRelay(nodeParams, register, channels, r, upstream, confidence, context).start() + } } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala index 53b94e4ae3..bf405faa94 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala @@ -16,14 +16,15 @@ package fr.acinq.eclair.payment.relay -import akka.actor.ActorRef import akka.actor.typed.Behavior import akka.actor.typed.eventstream.EventStream import akka.actor.typed.scaladsl.Behaviors +import akka.actor.{ActorRef, typed} import fr.acinq.bitcoin.scalacompat.ByteVector32 import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.eclair.channel._ import fr.acinq.eclair.payment.IncomingPaymentPacket +import fr.acinq.eclair.reputation.ReputationRecorder import fr.acinq.eclair.{Logs, NodeParams, ShortChannelId, SubscriptionsComplete} import java.util.UUID @@ -58,6 +59,7 @@ object ChannelRelayer { def apply(nodeParams: NodeParams, register: ActorRef, + reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.GetConfidence]], channels: Map[ByteVector32, Relayer.OutgoingChannel] = Map.empty, scid2channels: Map[ShortChannelId, ByteVector32] = Map.empty, node2channels: mutable.MultiDict[PublicKey, ByteVector32] = mutable.MultiDict.empty): Behavior[Command] = @@ -82,7 +84,7 @@ object ChannelRelayer { case None => Map.empty } context.log.debug(s"spawning a new handler with relayId=$relayId to nextNodeId={} with channels={}", nextNodeId_opt.getOrElse(""), nextChannels.keys.mkString(",")) - context.spawn(ChannelRelay.apply(nodeParams, register, nextChannels, originNode, relayId, channelRelayPacket), name = relayId.toString) + context.spawn(ChannelRelay.apply(nodeParams, register, reputationRecorder_opt, nextChannels, originNode, relayId, channelRelayPacket), name = relayId.toString) Behaviors.same case GetOutgoingChannels(replyTo, Relayer.GetOutgoingChannels(enabledOnly)) => @@ -103,14 +105,14 @@ object ChannelRelayer { context.log.debug("adding mappings={} to channelId={}", mappings.keys.mkString(","), channelId) val scid2channels1 = scid2channels ++ mappings val node2channels1 = node2channels.addOne(remoteNodeId, channelId) - apply(nodeParams, register, channels1, scid2channels1, node2channels1) + apply(nodeParams, register, reputationRecorder_opt, channels1, scid2channels1, node2channels1) case WrappedLocalChannelDown(LocalChannelDown(_, channelId, realScids, aliases, remoteNodeId)) => context.log.debug("removed local channel info for channelId={} localAlias={}", channelId, aliases.localAlias) val channels1 = channels - channelId val scid2Channels1 = scid2channels - aliases.localAlias -- realScids val node2channels1 = node2channels.subtractOne(remoteNodeId, channelId) - apply(nodeParams, register, channels1, scid2Channels1, node2channels1) + apply(nodeParams, register, reputationRecorder_opt, channels1, scid2Channels1, node2channels1) case WrappedAvailableBalanceChanged(AvailableBalanceChanged(_, channelId, aliases, commitments, _)) => val channels1 = channels.get(channelId) match { @@ -119,7 +121,7 @@ object ChannelRelayer { channels + (channelId -> c.copy(commitments = commitments)) case None => channels // we only consider the balance if we have the channel_update } - apply(nodeParams, register, channels1, scid2channels, node2channels) + apply(nodeParams, register, reputationRecorder_opt, channels1, scid2channels, node2channels) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala index f238d907f0..ce1881ec80 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala @@ -38,6 +38,9 @@ import fr.acinq.eclair.payment.send.PaymentInitiator.SendPaymentConfig import fr.acinq.eclair.payment.send.PaymentLifecycle.SendPaymentToNode import fr.acinq.eclair.payment.send._ import fr.acinq.eclair.router.Router.{ChannelHop, HopRelayParams, Route, RouteParams} +import fr.acinq.eclair.reputation.ReputationRecorder +import fr.acinq.eclair.reputation.ReputationRecorder.GetTrampolineConfidence +import fr.acinq.eclair.router.Router.RouteParams import fr.acinq.eclair.router.{BalanceTooLow, RouteNotFound} import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload import fr.acinq.eclair.wire.protocol._ @@ -67,6 +70,7 @@ object NodeRelay { private case class WrappedResolvedPaths(resolved: Seq[ResolvedPath]) extends Command private case class WrappedPeerInfo(remoteFeatures_opt: Option[Features[InitFeature]]) extends Command private case class WrappedOnTheFlyFundingResponse(result: Peer.ProposeOnTheFlyFundingResponse) extends Command + private case class WrappedConfidence(confidence: Double) extends Command // @formatter:on trait OutgoingPaymentFactory { @@ -88,6 +92,7 @@ object NodeRelay { def apply(nodeParams: NodeParams, parent: typed.ActorRef[NodeRelayer.Command], register: ActorRef, + reputationRecorder_opt: Option[typed.ActorRef[GetTrampolineConfidence]], relayId: UUID, nodeRelayPacket: NodeRelayPacket, outgoingPaymentFactory: OutgoingPaymentFactory, @@ -111,7 +116,7 @@ object NodeRelay { case _: IncomingPaymentPacket.RelayToNonTrampolinePacket => None case _: IncomingPaymentPacket.RelayToBlindedPathsPacket => None } - new NodeRelay(nodeParams, parent, register, relayId, paymentHash, nodeRelayPacket.outerPayload.paymentSecret, context, outgoingPaymentFactory, router) + new NodeRelay(nodeParams, parent, register, reputationRecorder_opt, relayId, paymentHash, nodeRelayPacket.outerPayload.paymentSecret, context, outgoingPaymentFactory, router) .receiving(Queue.empty, nodeRelayPacket.innerPayload, nextPacket_opt, incomingPaymentHandler) } } @@ -200,6 +205,7 @@ object NodeRelay { class NodeRelay private(nodeParams: NodeParams, parent: akka.actor.typed.ActorRef[NodeRelayer.Command], register: ActorRef, + reputationRecorder_opt: Option[typed.ActorRef[GetTrampolineConfidence]], relayId: UUID, paymentHash: ByteVector32, paymentSecret: ByteVector32, @@ -335,26 +341,35 @@ class NodeRelay private(nodeParams: NodeParams, val amountOut = outgoingAmount(upstream, payloadOut) val expiryOut = outgoingExpiry(upstream, payloadOut) context.log.debug("relaying trampoline payment (amountIn={} expiryIn={} amountOut={} expiryOut={} isWallet={})", upstream.amountIn, upstream.expiryIn, amountOut, expiryOut, walletNodeId_opt.isDefined) - val confidence = (upstream.received.map(_.add.endorsement).min + 0.5) / 8 // We only make one try when it's a direct payment to a wallet. val maxPaymentAttempts = if (walletNodeId_opt.isDefined) 1 else nodeParams.maxPaymentAttempts - val paymentCfg = SendPaymentConfig(relayId, relayId, None, paymentHash, recipient.nodeId, upstream, None, None, storeInDb = false, publishEvent = false, recordPathFindingMetrics = true, confidence) - val routeParams = computeRouteParams(nodeParams, upstream.amountIn, upstream.expiryIn, amountOut, expiryOut) - // If the next node is using trampoline, we assume that they support MPP. - val useMultiPart = recipient.features.hasFeature(Features.BasicMultiPartPayment) || packetOut_opt.nonEmpty - val payFsmAdapters = { - context.messageAdapter[PreimageReceived](WrappedPreimageReceived) - context.messageAdapter[PaymentSent](WrappedPaymentSent) - context.messageAdapter[PaymentFailed](WrappedPaymentFailed) - }.toClassic - val payment = if (useMultiPart) { - SendMultiPartPayment(payFsmAdapters, recipient, maxPaymentAttempts, routeParams) - } else { - SendPaymentToNode(payFsmAdapters, recipient, maxPaymentAttempts, routeParams) + val totalFee = upstream.amountIn - payloadOut.outgoingAmount(upstream.amountIn) + reputationRecorder_opt match { + case Some(reputationRecorder) => reputationRecorder ! GetTrampolineConfidence(context.messageAdapter(confidence => WrappedConfidence(confidence.value)), upstream, totalFee) + case None => context.self ! WrappedConfidence((upstream.received.map(_.add.endorsement).min + 0.5) / 8) + } + Behaviors.receiveMessagePartial { + rejectExtraHtlcPartialFunction orElse { + case WrappedConfidence(confidence) => + val paymentCfg = SendPaymentConfig(relayId, relayId, None, paymentHash, recipient.nodeId, upstream, None, None, storeInDb = false, publishEvent = false, recordPathFindingMetrics = true, confidence) + val routeParams = computeRouteParams(nodeParams, upstream.amountIn, upstream.expiryIn, amountOut, expiryOut) + // If the next node is using trampoline, we assume that they support MPP. + val useMultiPart = recipient.features.hasFeature(Features.BasicMultiPartPayment) || packetOut_opt.nonEmpty + val payFsmAdapters = { + context.messageAdapter[PreimageReceived](WrappedPreimageReceived) + context.messageAdapter[PaymentSent](WrappedPaymentSent) + context.messageAdapter[PaymentFailed](WrappedPaymentFailed) + }.toClassic + val payment = if (useMultiPart) { + SendMultiPartPayment(payFsmAdapters, recipient, maxPaymentAttempts, routeParams) + } else { + SendPaymentToNode(payFsmAdapters, recipient, maxPaymentAttempts, routeParams) + } + val payFSM = outgoingPaymentFactory.spawnOutgoingPayFSM(context, paymentCfg, useMultiPart) + payFSM ! payment + sending(upstream, recipient, walletNodeId_opt, recipientFeatures_opt, payloadOut, TimestampMilli.now(), fulfilledUpstream = false) + } } - val payFSM = outgoingPaymentFactory.spawnOutgoingPayFSM(context, paymentCfg, useMultiPart) - payFSM ! payment - sending(upstream, recipient, walletNodeId_opt, recipientFeatures_opt, payloadOut, TimestampMilli.now(), fulfilledUpstream = false) } /** diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelayer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelayer.scala index 75bb545c89..17cadc734d 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelayer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelayer.scala @@ -21,6 +21,7 @@ import akka.actor.typed.{ActorRef, Behavior} import fr.acinq.bitcoin.scalacompat.ByteVector32 import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.eclair.payment._ +import fr.acinq.eclair.reputation.ReputationRecorder import fr.acinq.eclair.{Logs, NodeParams} import java.util.UUID @@ -57,7 +58,12 @@ object NodeRelayer { * NB: the payment secret used here is different from the invoice's payment secret and ensures we can * group together HTLCs that the previous trampoline node sent in the same MPP. */ - def apply(nodeParams: NodeParams, register: akka.actor.ActorRef, outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory, router: akka.actor.ActorRef, children: Map[PaymentKey, ActorRef[NodeRelay.Command]] = Map.empty): Behavior[Command] = + def apply(nodeParams: NodeParams, + register: akka.actor.ActorRef, + reputationRecorder_opt: Option[ActorRef[ReputationRecorder.GetTrampolineConfidence]], + outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory, + router: akka.actor.ActorRef, + children: Map[PaymentKey, ActorRef[NodeRelay.Command]] = Map.empty): Behavior[Command] = Behaviors.setup { context => Behaviors.withMdc(Logs.mdc(category_opt = Some(Logs.LogCategory.PAYMENT)), mdc) { Behaviors.receiveMessage { @@ -72,15 +78,15 @@ object NodeRelayer { case None => val relayId = UUID.randomUUID() context.log.debug(s"spawning a new handler with relayId=$relayId") - val handler = context.spawn(NodeRelay.apply(nodeParams, context.self, register, relayId, nodeRelayPacket, outgoingPaymentFactory, router), relayId.toString) + val handler = context.spawn(NodeRelay.apply(nodeParams, context.self, register, reputationRecorder_opt, relayId, nodeRelayPacket, outgoingPaymentFactory, router), relayId.toString) context.log.debug("forwarding incoming htlc #{} from channel {} to new handler", htlcIn.id, htlcIn.channelId) handler ! NodeRelay.Relay(nodeRelayPacket, originNode) - apply(nodeParams, register, outgoingPaymentFactory, router, children + (childKey -> handler)) + apply(nodeParams, register, reputationRecorder_opt, outgoingPaymentFactory, router, children + (childKey -> handler)) } case RelayComplete(childHandler, paymentHash, paymentSecret) => // we do a back-and-forth between parent and child before stopping the child to prevent a race condition childHandler ! NodeRelay.Stop - apply(nodeParams, register, outgoingPaymentFactory, router, children - PaymentKey(paymentHash, paymentSecret)) + apply(nodeParams, register, reputationRecorder_opt, outgoingPaymentFactory, router, children - PaymentKey(paymentHash, paymentSecret)) case GetPendingPayments(replyTo) => replyTo ! children Behaviors.same diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/Relayer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/Relayer.scala index 0a9de9fd8f..f601134a25 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/Relayer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/Relayer.scala @@ -28,6 +28,7 @@ import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.eclair.channel._ import fr.acinq.eclair.db.PendingCommandsDb import fr.acinq.eclair.payment._ +import fr.acinq.eclair.reputation.{Reputation, ReputationRecorder} import fr.acinq.eclair.wire.protocol._ import fr.acinq.eclair.{CltvExpiryDelta, Logs, MilliSatoshi, NodeParams, RealShortChannelId, TimestampMilli} import grizzled.slf4j.Logging @@ -49,7 +50,7 @@ import scala.util.Random * It also receives channel HTLC events (fulfill / failed) and relays those to the appropriate handlers. * It also maintains an up-to-date view of local channel balances. */ -class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, initialized: Option[Promise[Done]] = None) extends Actor with DiagnosticActorLogging { +class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.Command]], initialized: Option[Promise[Done]] = None) extends Actor with DiagnosticActorLogging { import Relayer._ @@ -57,8 +58,8 @@ class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paym implicit def implicitLog: LoggingAdapter = log private val postRestartCleaner = context.actorOf(PostRestartHtlcCleaner.props(nodeParams, register, initialized), "post-restart-htlc-cleaner") - private val channelRelayer = context.spawn(Behaviors.supervise(ChannelRelayer(nodeParams, register)).onFailure(SupervisorStrategy.resume), "channel-relayer") - private val nodeRelayer = context.spawn(Behaviors.supervise(NodeRelayer(nodeParams, register, NodeRelay.SimpleOutgoingPaymentFactory(nodeParams, router, register), router)).onFailure(SupervisorStrategy.resume), name = "node-relayer") + private val channelRelayer = context.spawn(Behaviors.supervise(ChannelRelayer(nodeParams, register, reputationRecorder_opt)).onFailure(SupervisorStrategy.resume), "channel-relayer") + private val nodeRelayer = context.spawn(Behaviors.supervise(NodeRelayer(nodeParams, register, reputationRecorder_opt, NodeRelay.SimpleOutgoingPaymentFactory(nodeParams, router, register), router)).onFailure(SupervisorStrategy.resume), name = "node-relayer") def receive: Receive = { case init: PostRestartHtlcCleaner.Init => postRestartCleaner forward init @@ -125,8 +126,8 @@ class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paym object Relayer extends Logging { - def props(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, initialized: Option[Promise[Done]] = None): Props = - Props(new Relayer(nodeParams, router, register, paymentHandler, initialized)) + def props(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.Command]], initialized: Option[Promise[Done]] = None): Props = + Props(new Relayer(nodeParams, router, register, paymentHandler, reputationRecorder_opt, initialized)) // @formatter:off case class RelayFees(feeBase: MilliSatoshi, feeProportionalMillionths: Long) @@ -141,7 +142,8 @@ object Relayer extends Logging { privateChannelFees: RelayFees, minTrampolineFees: RelayFees, enforcementDelay: FiniteDuration, - asyncPaymentsParams: AsyncPaymentsParams) { + asyncPaymentsParams: AsyncPaymentsParams, + peerReputationConfig: Reputation.Config) { def defaultFees(announceChannel: Boolean): RelayFees = { if (announceChannel) { publicChannelFees diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/reputation/Reputation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/Reputation.scala new file mode 100644 index 0000000000..fda3ea5d54 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/Reputation.scala @@ -0,0 +1,92 @@ +/* + * Copyright 2023 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.reputation + +import fr.acinq.bitcoin.scalacompat.ByteVector32 +import fr.acinq.eclair.reputation.Reputation.HtlcId +import fr.acinq.eclair.{MilliSatoshi, TimestampMilli} + +import scala.concurrent.duration.FiniteDuration + +/** + * Created by thomash on 21/07/2023. + */ + +/** + * Local reputation for a given incoming node, that should be tracked for each incoming endorsement level. + * + * @param pastWeight How much fees we would have collected in the past if all payments had succeeded (exponential moving average). + * @param pastScore How much fees we have collected in the past (exponential moving average). + * @param lastSettlementAt Timestamp of the last recorded payment settlement. + * @param pending Set of pending payments (payments may contain multiple HTLCs when using trampoline). + * @param halfLife Half life for the exponential moving average. + * @param maxRelayDuration Duration after which payments are penalized for staying pending too long. + * @param pendingMultiplier How much to penalize pending payments. + */ +case class Reputation(pastWeight: Double, pastScore: Double, lastSettlementAt: TimestampMilli, pending: Map[HtlcId, Reputation.PendingPayment], halfLife: FiniteDuration, maxRelayDuration: FiniteDuration, pendingMultiplier: Double) { + private def decay(now: TimestampMilli): Double = scala.math.pow(0.5, (now - lastSettlementAt) / halfLife) + + private def pendingWeight(now: TimestampMilli): Double = pending.values.map(_.weight(now, maxRelayDuration, pendingMultiplier)).sum + + /** + * Estimate the confidence that a payment will succeed. + */ + def getConfidence(fee: MilliSatoshi, now: TimestampMilli = TimestampMilli.now()): Double = { + val d = decay(now) + d * pastScore / (d * pastWeight + pendingWeight(now) + fee.toLong.toDouble * pendingMultiplier) + } + + /** + * Register a pending relay. + * + * @return updated reputation + */ + def attempt(htlcId: HtlcId, fee: MilliSatoshi, now: TimestampMilli = TimestampMilli.now()): Reputation = + copy(pending = pending + (htlcId -> Reputation.PendingPayment(fee, now))) + + /** + * When a payment is settled, we record whether it succeeded and how long it took. + * + * @return updated reputation + */ + def record(htlcId: HtlcId, isSuccess: Boolean, now: TimestampMilli = TimestampMilli.now()): Reputation = { + pending.get(htlcId) match { + case Some(p) => + val d = decay(now) + val newWeight = d * pastWeight + p.weight(now, maxRelayDuration, if (isSuccess) 1.0 else 0.0) + val newScore = d * pastScore + (if (isSuccess) p.fee.toLong.toDouble else 0) + Reputation(newWeight, newScore, now, pending - htlcId, halfLife, maxRelayDuration, pendingMultiplier) + case None => this + } + } +} + +object Reputation { + case class HtlcId(channelId: ByteVector32, id: Long) + + /** We're relaying that payment and are waiting for it to settle. */ + case class PendingPayment(fee: MilliSatoshi, startedAt: TimestampMilli) { + def weight(now: TimestampMilli, minDuration: FiniteDuration, multiplier: Double): Double = { + val duration = now - startedAt + fee.toLong.toDouble * (duration / minDuration).max(multiplier) + } + } + + case class Config(enabled: Boolean, halfLife: FiniteDuration, maxRelayDuration: FiniteDuration, pendingMultiplier: Double) + + def init(config: Config): Reputation = Reputation(0.0, 0.0, TimestampMilli.min, Map.empty, config.halfLife, config.maxRelayDuration, config.pendingMultiplier) +} diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala new file mode 100644 index 0000000000..3cb7509e28 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala @@ -0,0 +1,139 @@ +/* + * Copyright 2023 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.reputation + +import akka.actor.typed.eventstream.EventStream +import akka.actor.typed.scaladsl.{ActorContext, Behaviors} +import akka.actor.typed.{ActorRef, Behavior} +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.eclair.MilliSatoshi +import fr.acinq.eclair.channel.Upstream.Hot +import fr.acinq.eclair.channel.{OutgoingHtlcAdded, OutgoingHtlcFailed, OutgoingHtlcFulfilled, Upstream} +import fr.acinq.eclair.reputation.Reputation.HtlcId +import fr.acinq.eclair.wire.protocol.{UpdateFailHtlc, UpdateFailMalformedHtlc} +import ReputationRecorder._ + +import scala.collection.mutable + + +/** + * Created by thomash on 21/07/2023. + */ + +object ReputationRecorder { + // @formatter:off + sealed trait Command + case class GetConfidence(replyTo: ActorRef[Confidence], upstream: Upstream.Hot.Channel, fee: MilliSatoshi) extends Command + case class GetTrampolineConfidence(replyTo: ActorRef[Confidence], upstream: Upstream.Hot.Trampoline, fee: MilliSatoshi) extends Command + private case class WrappedOutgoingHtlcAdded(added: OutgoingHtlcAdded) extends Command + private case class WrappedOutgoingHtlcFailed(failed: OutgoingHtlcFailed) extends Command + private case class WrappedOutgoingHtlcFulfilled(fulfilled: OutgoingHtlcFulfilled) extends Command + // @formatter:on + + /** + * @param nodeId nodeId of the upstream peer. + * @param endorsement endorsement value set by the upstream peer in the HTLC we received. + */ + case class PeerEndorsement(nodeId: PublicKey, endorsement: Int) + + object PeerEndorsement { + def apply(channel: Upstream.Hot.Channel): PeerEndorsement = PeerEndorsement(channel.receivedFrom, channel.add.endorsement) + } + + /** Confidence that the outgoing HTLC will succeed. */ + case class Confidence(value: Double) + + def apply(config: Reputation.Config): Behavior[Command] = + Behaviors.setup(context => { + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcAdded)) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcFailed)) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcFulfilled)) + new ReputationRecorder(config, context).run() + }) +} + +class ReputationRecorder(config: Reputation.Config, context: ActorContext[ReputationRecorder.Command]) { + private val reputations: mutable.Map[PeerEndorsement, Reputation] = mutable.HashMap.empty.withDefaultValue(Reputation.init(config)) + private val pending: mutable.Map[HtlcId, Upstream.Hot] = mutable.HashMap.empty + + def run(): Behavior[Command] = + Behaviors.receiveMessage { + case GetConfidence(replyTo, upstream, fee) => + val confidence = reputations(PeerEndorsement(upstream)).getConfidence(fee) + replyTo ! Confidence(confidence) + Behaviors.same + + case GetTrampolineConfidence(replyTo, upstream, totalFee) => + val confidence = + upstream.received + .groupMapReduce(r => PeerEndorsement(r.receivedFrom, r.add.endorsement))(_.add.amountMsat)(_ + _) + .map { + case (peerEndorsement, amount) => + val fee = amount * totalFee.toLong / upstream.amountIn.toLong + reputations(peerEndorsement).getConfidence(fee) + } + .min + replyTo ! Confidence(confidence) + Behaviors.same + + case WrappedOutgoingHtlcAdded(OutgoingHtlcAdded(add, upstream, fee)) => + val htlcId = HtlcId(add.channelId, add.id) + upstream match { + case channel: Hot.Channel => + reputations(PeerEndorsement(channel)) = reputations(PeerEndorsement(channel)).attempt(htlcId, fee) + pending += (htlcId -> upstream) + case trampoline: Hot.Trampoline => + trampoline.received.foreach(channel => + reputations(PeerEndorsement(channel)) = reputations(PeerEndorsement(channel)).attempt(htlcId, fee * channel.amountIn.toLong / trampoline.amountIn.toLong) + ) + pending += (htlcId -> upstream) + case _: Upstream.Local => () + } + Behaviors.same + + case WrappedOutgoingHtlcFailed(OutgoingHtlcFailed(fail)) => + val htlcId = fail match { + case UpdateFailHtlc(channelId, id, _, _) => HtlcId(channelId, id) + case UpdateFailMalformedHtlc(channelId, id, _, _, _) => HtlcId(channelId, id) + } + pending.get(htlcId) match { + case Some(channel: Hot.Channel) => + reputations(PeerEndorsement(channel)) = reputations(PeerEndorsement(channel)).record(htlcId, isSuccess = false) + case Some(trampoline: Hot.Trampoline) => + trampoline.received.foreach(channel => + reputations(PeerEndorsement(channel)) = reputations(PeerEndorsement(channel)).record(htlcId, isSuccess = false) + ) + case _ => () + } + pending -= htlcId + Behaviors.same + + case WrappedOutgoingHtlcFulfilled(OutgoingHtlcFulfilled(fulfill)) => + val htlcId = HtlcId(fulfill.channelId, fulfill.id) + pending.get(htlcId) match { + case Some(channel: Hot.Channel) => + reputations(PeerEndorsement(channel)) = reputations(PeerEndorsement(channel)).record(htlcId, isSuccess = true) + case Some(trampoline: Hot.Trampoline) => + trampoline.received.foreach(channel => + reputations(PeerEndorsement(channel)) = reputations(PeerEndorsement(channel)).record(htlcId, isSuccess = true) + ) + case _ => () + } + pending -= htlcId + Behaviors.same + } +} diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index 404394a62b..ccef6baef8 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -30,6 +30,7 @@ import fr.acinq.eclair.payment.offer.OffersConfig import fr.acinq.eclair.payment.relay.OnTheFlyFunding import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams} import fr.acinq.eclair.router.Graph.{MessageWeightRatios, PaymentWeightRatios} +import fr.acinq.eclair.reputation.Reputation import fr.acinq.eclair.router.Router._ import fr.acinq.eclair.router.{PathFindingExperimentConf, Router} import fr.acinq.eclair.wire.protocol._ @@ -177,7 +178,9 @@ object TestConstants { feeBase = 548000 msat, feeProportionalMillionths = 30), enforcementDelay = 10 minutes, - asyncPaymentsParams = AsyncPaymentsParams(1008, CltvExpiryDelta(144))), + asyncPaymentsParams = AsyncPaymentsParams(1008, CltvExpiryDelta(144)), + peerReputationConfig = Reputation.Config(enabled = false, 1 day, 10 seconds, 100), + ), db = TestDatabases.inMemoryDb(), autoReconnect = false, initialRandomReconnectDelay = 5 seconds, @@ -365,7 +368,9 @@ object TestConstants { feeBase = 548000 msat, feeProportionalMillionths = 30), enforcementDelay = 10 minutes, - asyncPaymentsParams = AsyncPaymentsParams(1008, CltvExpiryDelta(144))), + asyncPaymentsParams = AsyncPaymentsParams(1008, CltvExpiryDelta(144)), + peerReputationConfig = Reputation.Config(enabled = false, 2 day, 20 seconds, 200), + ), db = TestDatabases.inMemoryDb(), autoReconnect = false, initialRandomReconnectDelay = 5 seconds, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/FuzzySpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/FuzzySpec.scala index 958d8a8068..bff083098b 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/FuzzySpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/FuzzySpec.scala @@ -66,8 +66,8 @@ class FuzzySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Channe val bobRegister = system.actorOf(Props(new TestRegister())) val alicePaymentHandler = system.actorOf(Props(new PaymentHandler(aliceParams, aliceRegister, TestProbe().ref))) val bobPaymentHandler = system.actorOf(Props(new PaymentHandler(bobParams, bobRegister, TestProbe().ref))) - val aliceRelayer = system.actorOf(Relayer.props(aliceParams, TestProbe().ref, aliceRegister, alicePaymentHandler)) - val bobRelayer = system.actorOf(Relayer.props(bobParams, TestProbe().ref, bobRegister, bobPaymentHandler)) + val aliceRelayer = system.actorOf(Relayer.props(aliceParams, TestProbe().ref, aliceRegister, alicePaymentHandler, None)) + val bobRelayer = system.actorOf(Relayer.props(bobParams, TestProbe().ref, bobRegister, bobPaymentHandler, None)) val wallet = new DummyOnChainWallet() val alice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(aliceParams, Alice.channelKeys(), wallet, bobParams.nodeId, alice2blockchain.ref, aliceRelayer, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref) val bob: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(bobParams, Bob.channelKeys(), wallet, aliceParams.nodeId, bob2blockchain.ref, bobRelayer, FakeTxPublisherFactory(bob2blockchain)), bobPeer.ref) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala b/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala index 8504a15b56..3684a9d38a 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala @@ -28,6 +28,7 @@ import fr.acinq.eclair.payment.offer.{DefaultOfferHandler, OfferManager} import fr.acinq.eclair.payment.receive.{MultiPartHandler, PaymentHandler} import fr.acinq.eclair.payment.relay.{ChannelRelayer, PostRestartHtlcCleaner, Relayer} import fr.acinq.eclair.payment.send.PaymentInitiator +import fr.acinq.eclair.reputation.ReputationRecorder import fr.acinq.eclair.router.Router import fr.acinq.eclair.wire.protocol.IPAddress import fr.acinq.eclair.{BlockHeight, MilliSatoshi, MilliSatoshiLong, NodeParams, RealShortChannelId, SubscriptionsComplete, TestBitcoinCoreClient, TestDatabases} @@ -97,7 +98,7 @@ object MinimalNodeFixture extends Assertions with Eventually with IntegrationPat val offerManager = system.spawn(OfferManager(nodeParams, 1 minute), "offer-manager") val defaultOfferHandler = system.spawn(DefaultOfferHandler(nodeParams, router), "default-offer-handler") val paymentHandler = system.actorOf(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler") - val relayer = system.actorOf(Relayer.props(nodeParams, router, register, paymentHandler), "relayer") + val relayer = system.actorOf(Relayer.props(nodeParams, router, register, paymentHandler, None), "relayer") val txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, bitcoinClient) val channelFactory = Peer.SimpleChannelFactory(nodeParams, watcherTyped, relayer, wallet, txPublisherFactory) val pendingChannelsRateLimiter = system.spawnAnonymous(Behaviors.supervise(PendingChannelsRateLimiter(nodeParams, router.toTyped, Seq())).onFailure(typed.SupervisorStrategy.resume)) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/PostRestartHtlcCleanerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/PostRestartHtlcCleanerSpec.scala index fb5ea241ad..1ccd9c5383 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/PostRestartHtlcCleanerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/PostRestartHtlcCleanerSpec.scala @@ -57,7 +57,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit case class FixtureParam(nodeParams: NodeParams, register: TestProbe, sender: TestProbe, eventListener: TestProbe) { def createRelayer(nodeParams1: NodeParams): (ActorRef, ActorRef) = { - val relayer = system.actorOf(Relayer.props(nodeParams1, TestProbe().ref, register.ref, TestProbe().ref)) + val relayer = system.actorOf(Relayer.props(nodeParams1, TestProbe().ref, register.ref, TestProbe().ref, None)) // we need ensure the post-htlc-restart child actor is initialized sender.send(relayer, Relayer.GetChildActors(sender.ref)) (relayer, sender.expectMsgType[Relayer.ChildActors].postRestartCleaner) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala index d1c9eb2134..6ed742dafc 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala @@ -35,6 +35,7 @@ import fr.acinq.eclair.io.{Peer, PeerReadyManager, Switchboard} import fr.acinq.eclair.payment.IncomingPaymentPacket.ChannelRelayPacket import fr.acinq.eclair.payment.relay.ChannelRelayer._ import fr.acinq.eclair.payment.{ChannelPaymentRelayed, IncomingPaymentPacket, PaymentPacketSpec} +import fr.acinq.eclair.reputation.ReputationRecorder import fr.acinq.eclair.router.Announcements import fr.acinq.eclair.wire.protocol.BlindedRouteData.PaymentRelayData import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload @@ -56,7 +57,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val wakeUpTimeout = "wake_up_timeout" val onTheFlyFunding = "on_the_fly_funding" - case class FixtureParam(nodeParams: NodeParams, channelRelayer: typed.ActorRef[ChannelRelayer.Command], register: TestProbe[Any]) { + case class FixtureParam(nodeParams: NodeParams, channelRelayer: typed.ActorRef[ChannelRelayer.Command], register: TestProbe[Any], reputationRecorder: TestProbe[ReputationRecorder.Command]) { def createWakeUpActors(): (TestProbe[PeerReadyManager.Register], TestProbe[Switchboard.GetPeerInfo]) = { val peerReadyManager = TestProbe[PeerReadyManager.Register]() system.receptionist ! Receptionist.Register(PeerReadyManager.PeerReadyManagerServiceKey, peerReadyManager.ref) @@ -78,15 +79,16 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a .modify(_.peerWakeUpConfig.timeout).setToIf(test.tags.contains(wakeUpTimeout))(100 millis) .modify(_.features.activated).usingIf(test.tags.contains(onTheFlyFunding))(_ + (Features.OnTheFlyFunding -> FeatureSupport.Optional)) val register = TestProbe[Any]("register") - val channelRelayer = testKit.spawn(ChannelRelayer.apply(nodeParams, register.ref.toClassic)) + val reputationRecorder = TestProbe[ReputationRecorder.Command]("reputation-recorder") + val channelRelayer = testKit.spawn(ChannelRelayer.apply(nodeParams, register.ref.toClassic, Some(reputationRecorder.ref))) try { - withFixture(test.toNoArgTest(FixtureParam(nodeParams, channelRelayer, register))) + withFixture(test.toNoArgTest(FixtureParam(nodeParams, channelRelayer, register, reputationRecorder))) } finally { testKit.stop(channelRelayer) } } - def expectFwdFail(register: TestProbe[Any], channelId: ByteVector32, cmd: CMD_FAIL_HTLC): Register.Forward[CMD_FAIL_HTLC] = { + def expectFwdFail(register: TestProbe[Any], channelId: ByteVector32, cmd: CMD_FAIL_HTLC, reputationRecorder: TestProbe[ReputationRecorder.Command]): Register.Forward[CMD_FAIL_HTLC] = { val fwd = register.expectMessageType[Register.Forward[CMD_FAIL_HTLC]] assert(fwd.message.copy(htlcReceivedAt_opt = None) == cmd.copy(htlcReceivedAt_opt = None)) assert(fwd.channelId == channelId) @@ -104,6 +106,14 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a fwd } + def setConfidence(f: FixtureParam)(value: Double): Unit = { + import f._ + + val getConfidence = reputationRecorder.expectMessageType[ReputationRecorder.GetConfidence] + assert(getConfidence.upstream.receivedFrom == TestConstants.Alice.nodeParams.nodeId) + getConfidence.replyTo ! ReputationRecorder.Confidence(value) + } + def basicRelayTest(f: FixtureParam)(relayPayloadScid: ShortChannelId, lcu: LocalChannelUpdate, success: Boolean): Unit = { import f._ @@ -112,11 +122,12 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(lcu) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) if (success) { expectFwdAdd(register, lcu.channelId, outgoingAmount, outgoingExpiry, 7) } else { - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(UnknownNextPeer()), None, commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(UnknownNextPeer()), None, commit = true), reputationRecorder) } } @@ -161,6 +172,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 7) } @@ -176,6 +188,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) // We try to wake-up the next node. peerReadyManager.expectMessageType[PeerReadyManager.Register].replyTo ! PeerReadyManager.Registered(outgoingNodeId, otherAttempts = 0) @@ -199,6 +212,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) // We try to wake-up the next node. val wakeUp = peerReadyManager.expectMessageType[PeerReadyManager.Register] @@ -230,6 +244,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val r = createValidIncomingPacket(payload, outgoingAmount + u.channelUpdate.feeBaseMsat, outgoingExpiry + u.channelUpdate.cltvExpiryDelta) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) // We try to wake-up the next node. peerReadyManager.expectMessageType[PeerReadyManager.Register].replyTo ! PeerReadyManager.Registered(outgoingNodeId, otherAttempts = 1) @@ -240,7 +255,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a cleanUpWakeUpActors(peerReadyManager, switchboard) // We fail without attempting on-the-fly funding. - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(UnknownNextPeer()), None, commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(UnknownNextPeer()), None, commit = true), reputationRecorder) } test("relay blinded payment (on-the-fly funding failed)", Tag(wakeUpEnabled), Tag(onTheFlyFunding)) { f => @@ -253,6 +268,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val r = createValidIncomingPacket(payload, outgoingAmount + u.channelUpdate.feeBaseMsat, outgoingExpiry + u.channelUpdate.cltvExpiryDelta) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) // We try to wake-up the next node. peerReadyManager.expectMessageType[PeerReadyManager.Register].replyTo ! PeerReadyManager.Registered(outgoingNodeId, otherAttempts = 1) @@ -265,7 +281,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val fwdNodeId = register.expectMessageType[ForwardNodeId[Peer.ProposeOnTheFlyFunding]] assert(fwdNodeId.nodeId == outgoingNodeId) fwdNodeId.replyTo ! Register.ForwardNodeIdFailure(fwdNodeId) - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(UnknownNextPeer()), None, commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(UnknownNextPeer()), None, commit = true), reputationRecorder) } test("relay blinded payment (on-the-fly funding not attempted)", Tag(wakeUpEnabled), Tag(onTheFlyFunding)) { f => @@ -279,6 +295,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) // We try to wake-up the next node. peerReadyManager.expectMessageType[PeerReadyManager.Register].replyTo ! PeerReadyManager.Registered(outgoingNodeId, otherAttempts = 0) @@ -292,7 +309,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a fwd.message.replyTo ! RES_ADD_FAILED(fwd.message, TooManyAcceptedHtlcs(channelIds(realScid1), 10), Some(u.channelUpdate)) // We fail without attempting on-the-fly funding. - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(TemporaryNodeFailure()), None, commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(TemporaryNodeFailure()), None, commit = true), reputationRecorder) } test("relay with retries") { f => @@ -310,6 +327,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u2) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) // first try val fwd1 = expectFwdAdd(register, channelIds(realScid2), outgoingAmount, outgoingExpiry, 7) @@ -322,7 +340,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a fwd1.message.replyTo ! RES_ADD_FAILED(fwd2.message, HtlcValueTooHighInFlight(channelIds(realScid1), 1000000000 msat, 1516977616 msat), Some(u1.channelUpdate)) // the relayer should give up - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(TemporaryChannelFailure(Some(u1.channelUpdate))), None, commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(TemporaryChannelFailure(Some(u1.channelUpdate))), None, commit = true), reputationRecorder) } test("fail to relay when we have no channel_update for the next channel") { f => @@ -332,8 +350,9 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val r = createValidIncomingPacket(payload) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(UnknownNextPeer()), None, commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(UnknownNextPeer()), None, commit = true), reputationRecorder) } test("fail to relay when register returns an error") { f => @@ -345,11 +364,12 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) val fwd = expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 7) fwd.replyTo ! Register.ForwardFailure(fwd) - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(UnknownNextPeer()), None, commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(UnknownNextPeer()), None, commit = true), reputationRecorder) } test("fail to relay when the channel is advertised as unusable (down)") { f => @@ -363,8 +383,9 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! WrappedLocalChannelDown(d) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(UnknownNextPeer()), None, commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(UnknownNextPeer()), None, commit = true), reputationRecorder) } test("fail to relay when channel is disabled") { f => @@ -376,8 +397,9 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(ChannelDisabled(u.channelUpdate.messageFlags, u.channelUpdate.channelFlags, Some(u.channelUpdate))), None, commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(ChannelDisabled(u.channelUpdate.messageFlags, u.channelUpdate.channelFlags, Some(u.channelUpdate))), None, commit = true), reputationRecorder) } test("fail to relay when amount is below minimum") { f => @@ -389,8 +411,9 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(AmountBelowMinimum(outgoingAmount, Some(u.channelUpdate))), None, commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(AmountBelowMinimum(outgoingAmount, Some(u.channelUpdate))), None, commit = true), reputationRecorder) } test("fail to relay blinded payment") { f => @@ -403,6 +426,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) val cmd = register.expectMessageType[Register.Forward[channel.Command]] assert(cmd.channelId == r.add.channelId) @@ -433,6 +457,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) // We try to wake-up the next node, but we timeout before they connect. peerReadyManager.expectMessageType[PeerReadyManager.Register].replyTo ! PeerReadyManager.Registered(outgoingNodeId, otherAttempts = 0) @@ -452,6 +477,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) expectFwdAdd(register, channelIds(realScid1), r.amountToForward, r.outgoingCltv, 7).message } @@ -465,8 +491,9 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(IncorrectCltvExpiry(r.outgoingCltv, Some(u.channelUpdate))), None, commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(IncorrectCltvExpiry(r.outgoingCltv, Some(u.channelUpdate))), None, commit = true), reputationRecorder) } test("fail to relay when fee is insufficient") { f => @@ -478,8 +505,9 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(FeeInsufficient(r.add.amountMsat, Some(u.channelUpdate))), None, commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(FeeInsufficient(r.add.amountMsat, Some(u.channelUpdate))), None, commit = true), reputationRecorder) } test("relay that would fail (fee insufficient) with a recent channel update but succeed with the previous update") { f => @@ -491,6 +519,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u1) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) // relay succeeds with current channel update (u1) with lower fees expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 7) @@ -499,6 +528,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u2) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) // relay succeeds because the current update (u2) with higher fees occurred less than 10 minutes ago expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 7) @@ -508,9 +538,10 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u1) channelRelayer ! WrappedLocalChannelUpdate(u3) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) // relay fails because the current update (u3) with higher fees occurred more than 10 minutes ago - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(FeeInsufficient(r.add.amountMsat, Some(u3.channelUpdate))), None, commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(FeeInsufficient(r.add.amountMsat, Some(u3.channelUpdate))), None, commit = true), reputationRecorder) } test("fail to relay when there is a local error") { f => @@ -537,9 +568,10 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a testCases.foreach { testCase => channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) val fwd = expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 7) fwd.message.replyTo ! RES_ADD_FAILED(fwd.message, testCase.exc, Some(testCase.update)) - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(testCase.failure), None, commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(testCase.failure), None, commit = true), reputationRecorder) } } @@ -573,6 +605,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val payload = ChannelRelay.Standard(ShortChannelId(12345), 998900 msat, CltvExpiry(60)) val r = createValidIncomingPacket(payload, 1000000 msat, CltvExpiry(70), endorsementIn = 5) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(5.5 / 8) // select the channel to the same node, with the lowest capacity and balance but still high enough to handle the payment val cmd1 = expectFwdAdd(register, channelUpdates(ShortChannelId(22223)).channelId, r.amountToForward, r.outgoingCltv, 5).message cmd1.replyTo ! RES_ADD_FAILED(cmd1, ChannelUnavailable(randomBytes32()), None) @@ -586,13 +619,14 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val cmd4 = expectFwdAdd(register, channelUpdates(ShortChannelId(11111)).channelId, r.amountToForward, r.outgoingCltv, 5).message cmd4.replyTo ! RES_ADD_FAILED(cmd4, HtlcValueTooHighInFlight(randomBytes32(), 100000000 msat, 100000000 msat), Some(channelUpdates(ShortChannelId(11111)).channelUpdate)) // all the suitable channels have been tried - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(TemporaryChannelFailure(Some(channelUpdates(ShortChannelId(12345)).channelUpdate))), None, commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(TemporaryChannelFailure(Some(channelUpdates(ShortChannelId(12345)).channelUpdate))), None, commit = true), reputationRecorder) } { // higher amount payment (have to increased incoming htlc amount for fees to be sufficient) val payload = ChannelRelay.Standard(ShortChannelId(12345), 50000000 msat, CltvExpiry(60)) val r = createValidIncomingPacket(payload, 60000000 msat, CltvExpiry(70), endorsementIn = 0) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(0) expectFwdAdd(register, channelUpdates(ShortChannelId(11111)).channelId, r.amountToForward, r.outgoingCltv, 0).message } { @@ -600,6 +634,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val payload = ChannelRelay.Standard(ShortChannelId(12345), 1000 msat, CltvExpiry(60)) val r = createValidIncomingPacket(payload, 60000000 msat, CltvExpiry(70), endorsementIn = 6) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(6.5 / 8) expectFwdAdd(register, channelUpdates(ShortChannelId(33333)).channelId, r.amountToForward, r.outgoingCltv, 6).message } { @@ -607,6 +642,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val payload = ChannelRelay.Standard(ShortChannelId(12345), 1000000000 msat, CltvExpiry(60)) val r = createValidIncomingPacket(payload, 1010000000 msat, CltvExpiry(70)) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(7.5 / 8) expectFwdAdd(register, channelUpdates(ShortChannelId(12345)).channelId, r.amountToForward, r.outgoingCltv, 7).message } { @@ -614,6 +650,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val payload = ChannelRelay.Standard(ShortChannelId(12345), 998900 msat, CltvExpiry(50)) val r = createValidIncomingPacket(payload, 1000000 msat, CltvExpiry(70)) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(7.5 / 8) expectFwdAdd(register, channelUpdates(ShortChannelId(22223)).channelId, r.amountToForward, r.outgoingCltv, 7).message } { @@ -621,7 +658,8 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val payload = ChannelRelay.Standard(ShortChannelId(12345), 998900 msat, CltvExpiry(61)) val r = createValidIncomingPacket(payload, 1000000 msat, CltvExpiry(70)) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(IncorrectCltvExpiry(CltvExpiry(61), Some(channelUpdates(ShortChannelId(12345)).channelUpdate))), None, commit = true)) + setConfidence(f)(0.2) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, FailureReason.LocalFailure(IncorrectCltvExpiry(CltvExpiry(61), Some(channelUpdates(ShortChannelId(12345)).channelUpdate))), None, commit = true), reputationRecorder) } } @@ -647,10 +685,11 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a testCases.foreach { testCase => channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) val fwd = expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 7) fwd.message.replyTo ! RES_SUCCESS(fwd.message, channelId1) fwd.message.origin.replyTo ! RES_ADD_SETTLED(fwd.message.origin, downstream_htlc, testCase.result) - expectFwdFail(register, r.add.channelId, testCase.cmd) + expectFwdFail(register, r.add.channelId, testCase.cmd, reputationRecorder) } } @@ -673,6 +712,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val r = createValidIncomingPacket(createBlindedPayload(Right(u.channelUpdate.shortChannelId), u.channelUpdate, isIntroduction), outgoingAmount + u.channelUpdate.feeBaseMsat, outgoingExpiry + u.channelUpdate.cltvExpiryDelta, endorsementIn = 0) channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(0) val fwd = expectFwdAdd(register, channelId1, outgoingAmount, outgoingExpiry, 0) fwd.message.replyTo ! RES_SUCCESS(fwd.message, channelId1) fwd.message.origin.replyTo ! RES_ADD_SETTLED(fwd.message.origin, downstream, htlcResult) @@ -716,6 +756,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a testCases.foreach { testCase => channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(3.5 / 8) val fwd1 = expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 3) fwd1.message.replyTo ! RES_SUCCESS(fwd1.message, channelId1) @@ -741,6 +782,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val payload = ChannelRelay.Standard(realScid1, outgoingAmount, outgoingExpiry) val r = createValidIncomingPacket(payload, endorsementIn = 3) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(3.5 / 8) val fwd1 = expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 3) fwd1.message.replyTo ! RES_SUCCESS(fwd1.message, channelId1) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/NodeRelayerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/NodeRelayerSpec.scala index e7a146a99f..5206280bb3 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/NodeRelayerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/NodeRelayerSpec.scala @@ -44,6 +44,8 @@ import fr.acinq.eclair.payment.send.PaymentLifecycle.SendPaymentToNode import fr.acinq.eclair.payment.send.{BlindedRecipient, ClearRecipient} import fr.acinq.eclair.router.Router.{ChannelHop, HopRelayParams, PaymentRouteNotFound, RouteRequest} import fr.acinq.eclair.router.{BalanceTooLow, BlindedRouteCreation, RouteNotFound, Router} +import fr.acinq.eclair.reputation.ReputationRecorder +import fr.acinq.eclair.reputation.ReputationRecorder.{Confidence, GetTrampolineConfidence} import fr.acinq.eclair.wire.protocol.OfferTypes._ import fr.acinq.eclair.wire.protocol.PaymentOnion.{FinalPayload, IntermediatePayload} import fr.acinq.eclair.wire.protocol.RouteBlindingEncryptedDataCodecs.blindedRouteDataCodec @@ -70,11 +72,11 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val wakeUpTimeout = "wake_up_timeout" val onTheFlyFunding = "on_the_fly_funding" - case class FixtureParam(nodeParams: NodeParams, router: TestProbe[Any], register: TestProbe[Any], mockPayFSM: TestProbe[Any], eventListener: TestProbe[PaymentEvent]) { + case class FixtureParam(nodeParams: NodeParams, router: TestProbe[Any], register: TestProbe[Any], reputationRecorder: TestProbe[ReputationRecorder.Command], mockPayFSM: TestProbe[Any], eventListener: TestProbe[PaymentEvent]) { def createNodeRelay(packetIn: IncomingPaymentPacket.NodeRelayPacket, useRealPaymentFactory: Boolean = false): (ActorRef[NodeRelay.Command], TestProbe[NodeRelayer.Command]) = { val parent = TestProbe[NodeRelayer.Command]("parent-relayer") val outgoingPaymentFactory = if (useRealPaymentFactory) RealOutgoingPaymentFactory(this) else FakeOutgoingPaymentFactory(this) - val nodeRelay = testKit.spawn(NodeRelay(nodeParams, parent.ref, register.ref.toClassic, relayId, packetIn, outgoingPaymentFactory, router.ref.toClassic)) + val nodeRelay = testKit.spawn(NodeRelay(nodeParams, parent.ref, register.ref.toClassic, Some(reputationRecorder.ref), relayId, packetIn, outgoingPaymentFactory, router.ref.toClassic)) (nodeRelay, parent) } @@ -117,16 +119,17 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl .modify(_.features.activated).usingIf(test.tags.contains(wakeUpEnabled))(_ + (Features.WakeUpNotificationClient -> FeatureSupport.Optional)) val router = TestProbe[Any]("router") val register = TestProbe[Any]("register") + val reputationRecorder = TestProbe[ReputationRecorder.Command]("reputation-recorder") val eventListener = TestProbe[PaymentEvent]("event-listener") system.eventStream ! EventStream.Subscribe(eventListener.ref) val mockPayFSM = TestProbe[Any]("pay-fsm") - withFixture(test.toNoArgTest(FixtureParam(nodeParams, router, register, mockPayFSM, eventListener))) + withFixture(test.toNoArgTest(FixtureParam(nodeParams, router, register, reputationRecorder, mockPayFSM, eventListener))) } test("create child handlers for new payments") { f => import f._ val probe = TestProbe[Any]() - val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, FakeOutgoingPaymentFactory(f), router.ref.toClassic)) + val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, Some(reputationRecorder.ref), FakeOutgoingPaymentFactory(f), router.ref.toClassic)) parentRelayer ! NodeRelayer.GetPendingPayments(probe.ref.toClassic) probe.expectMessage(Map.empty) @@ -165,7 +168,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val outgoingPaymentFactory = FakeOutgoingPaymentFactory(f) { - val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, outgoingPaymentFactory, router.ref.toClassic)) + val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, Some(reputationRecorder.ref), outgoingPaymentFactory, router.ref.toClassic)) parentRelayer ! NodeRelayer.GetPendingPayments(probe.ref.toClassic) probe.expectMessage(Map.empty) } @@ -173,7 +176,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val (paymentHash1, paymentSecret1, child1) = (randomBytes32(), randomBytes32(), TestProbe[NodeRelay.Command]()) val (paymentHash2, paymentSecret2, child2) = (randomBytes32(), randomBytes32(), TestProbe[NodeRelay.Command]()) val children = Map(PaymentKey(paymentHash1, paymentSecret1) -> child1.ref, PaymentKey(paymentHash2, paymentSecret2) -> child2.ref) - val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, outgoingPaymentFactory, router.ref.toClassic, children)) + val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, Some(reputationRecorder.ref), outgoingPaymentFactory, router.ref.toClassic, children)) parentRelayer ! NodeRelayer.GetPendingPayments(probe.ref.toClassic) probe.expectMessage(children) @@ -189,7 +192,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val (paymentSecret1, child1) = (randomBytes32(), TestProbe[NodeRelay.Command]()) val (paymentSecret2, child2) = (randomBytes32(), TestProbe[NodeRelay.Command]()) val children = Map(PaymentKey(paymentHash, paymentSecret1) -> child1.ref, PaymentKey(paymentHash, paymentSecret2) -> child2.ref) - val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, outgoingPaymentFactory, router.ref.toClassic, children)) + val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, Some(reputationRecorder.ref), outgoingPaymentFactory, router.ref.toClassic, children)) parentRelayer ! NodeRelayer.GetPendingPayments(probe.ref.toClassic) probe.expectMessage(children) @@ -199,7 +202,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl probe.expectMessage(Map(PaymentKey(paymentHash, paymentSecret2) -> child2.ref)) } { - val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, outgoingPaymentFactory, router.ref.toClassic)) + val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, Some(reputationRecorder.ref), outgoingPaymentFactory, router.ref.toClassic)) parentRelayer ! NodeRelayer.Relay(incomingMultiPart.head, randomKey().publicKey) parentRelayer ! NodeRelayer.GetPendingPayments(probe.ref.toClassic) val pending1 = probe.expectMessageType[Map[PaymentKey, ActorRef[NodeRelay.Command]]] @@ -256,6 +259,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val getPeerInfo = register.expectMessageType[Register.ForwardNodeId[Peer.GetPeerInfo]](100 millis) getPeerInfo.message.replyTo.foreach(_ ! Peer.PeerNotFound(getPeerInfo.nodeId)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0) + // the extra payment will be rejected val fwd = register.expectMessageType[Register.Forward[CMD_FAIL_HTLC]] assert(fwd.channelId == extra.add.channelId) @@ -275,8 +280,10 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val getPeerInfo = register.expectMessageType[Register.ForwardNodeId[Peer.GetPeerInfo]](100 millis) getPeerInfo.message.replyTo.foreach(_ ! Peer.PeerNotFound(getPeerInfo.nodeId)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(3.5 / 8) + val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] - validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 5) + validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 3) val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment] validateOutgoingPayment(outgoingPayment) @@ -378,9 +385,11 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val getPeerInfo = register.expectMessageType[Register.ForwardNodeId[Peer.GetPeerInfo]](100 millis) getPeerInfo.message.replyTo.foreach(_ ! Peer.PeerNotFound(getPeerInfo.nodeId)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(6.5 / 8) + // upstream payment relayed val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] - validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingAsyncPayment.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 5) + validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingAsyncPayment.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 6) val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment] validateOutgoingPayment(outgoingPayment) // those are adapters for pay-fsm messages @@ -480,6 +489,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val getPeerInfo = register.expectMessageType[Register.ForwardNodeId[Peer.GetPeerInfo]](100 millis) getPeerInfo.message.replyTo.foreach(_ ! Peer.PeerNotFound(getPeerInfo.nodeId)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0) + mockPayFSM.expectMessageType[SendPaymentConfig] // those are adapters for pay-fsm messages val nodeRelayerAdapters = mockPayFSM.expectMessageType[SendMultiPartPayment].replyTo @@ -510,6 +521,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val getPeerInfo = register.expectMessageType[Register.ForwardNodeId[Peer.GetPeerInfo]](100 millis) getPeerInfo.message.replyTo.foreach(_ ! Peer.PeerNotFound(getPeerInfo.nodeId)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0) + val payFSM = mockPayFSM.expectMessageType[akka.actor.ActorRef] router.expectMessageType[RouteRequest] payFSM ! PaymentRouteNotFound(BalanceTooLow) @@ -533,6 +546,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val getPeerInfo = register.expectMessageType[Register.ForwardNodeId[Peer.GetPeerInfo]](100 millis) getPeerInfo.message.replyTo.foreach(_ ! Peer.PeerNotFound(getPeerInfo.nodeId)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0) + val payFSM = mockPayFSM.expectMessageType[akka.actor.ActorRef] router.expectMessageType[RouteRequest] @@ -559,6 +574,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val getPeerInfo = register.expectMessageType[Register.ForwardNodeId[Peer.GetPeerInfo]](100 millis) getPeerInfo.message.replyTo.foreach(_ ! Peer.PeerNotFound(getPeerInfo.nodeId)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0) + val payFSM = mockPayFSM.expectMessageType[akka.actor.ActorRef] router.expectMessageType[RouteRequest] @@ -584,6 +601,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val getPeerInfo = register.expectMessageType[Register.ForwardNodeId[Peer.GetPeerInfo]](100 millis) getPeerInfo.message.replyTo.foreach(_ ! Peer.PeerNotFound(getPeerInfo.nodeId)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0) + val routeRequest = router.expectMessageType[RouteRequest] val routeParams = routeRequest.routeParams assert(routeParams.boundaries.maxFeeProportional == 0) // should be disabled @@ -605,8 +624,10 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val getPeerInfo = register.expectMessageType[Register.ForwardNodeId[Peer.GetPeerInfo]](100 millis) getPeerInfo.message.replyTo.foreach(_ ! Peer.PeerNotFound(getPeerInfo.nodeId)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(3.5 / 8) + val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] - validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 5) + validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 3) val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment] validateOutgoingPayment(outgoingPayment) // those are adapters for pay-fsm messages @@ -644,6 +665,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val getPeerInfo = register.expectMessageType[Register.ForwardNodeId[Peer.GetPeerInfo]](100 millis) getPeerInfo.message.replyTo.foreach(_ ! Peer.PeerNotFound(getPeerInfo.nodeId)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0) + mockPayFSM.expectMessageType[SendPaymentConfig] val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment] validateOutgoingPayment(outgoingPayment) @@ -687,6 +710,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val getPeerInfo = register.expectMessageType[Register.ForwardNodeId[Peer.GetPeerInfo]](100 millis) getPeerInfo.message.replyTo.foreach(_ ! Peer.PeerNotFound(getPeerInfo.nodeId)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0) + val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(Upstream.Hot.Channel(incomingSinglePart.add, TimestampMilli.now(), randomKey().publicKey) :: Nil), 7) val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment] @@ -732,6 +757,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl peerInfo.replyTo ! Peer.PeerInfo(TestProbe[Any]().ref.toClassic, outgoingNodeId, Peer.CONNECTED, Some(nodeParams.features.initFeatures()), None, Set.empty) cleanUpWakeUpActors(peerReadyManager, switchboard) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(5.5 / 8) + val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 5) val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment] @@ -778,6 +805,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl peerInfo.replyTo ! Peer.PeerInfo(TestProbe[Any]().ref.toClassic, outgoingNodeId, Peer.CONNECTED, Some(nodeParams.features.initFeatures()), None, Set.empty) cleanUpWakeUpActors(peerReadyManager, switchboard) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(5.5 / 8) + val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 5) val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment] @@ -810,8 +839,10 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val getPeerInfo = register.expectMessageType[Register.ForwardNodeId[Peer.GetPeerInfo]](100 millis) getPeerInfo.message.replyTo.foreach(_ ! Peer.PeerNotFound(getPeerInfo.nodeId)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(2.5 / 8) + val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] - validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 5) + validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 2) val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment] assert(outgoingPayment.recipient.nodeId == outgoingNodeId) assert(outgoingPayment.recipient.totalAmount == outgoingAmount) @@ -858,8 +889,10 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val getPeerInfo = register.expectMessageType[Register.ForwardNodeId[Peer.GetPeerInfo]](100 millis) getPeerInfo.message.replyTo.foreach(_ ! Peer.PeerNotFound(getPeerInfo.nodeId)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(0) + val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] - validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 5) + validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 0) val outgoingPayment = mockPayFSM.expectMessageType[SendPaymentToNode] assert(outgoingPayment.recipient.nodeId == outgoingNodeId) assert(outgoingPayment.amount == outgoingAmount) @@ -896,8 +929,10 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val (nodeRelayer, parent) = f.createNodeRelay(incomingPayments.head) incomingPayments.foreach(incoming => nodeRelayer ! NodeRelay.Relay(incoming, randomKey().publicKey)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0) + val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] - validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingPayments.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 5, ignoreNodeId = true) + validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingPayments.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 7, ignoreNodeId = true) val outgoingPayment = mockPayFSM.expectMessageType[SendPaymentToNode] assert(outgoingPayment.amount == outgoingAmount) assert(outgoingPayment.recipient.expiry == outgoingExpiry) @@ -929,8 +964,10 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val (nodeRelayer, parent) = f.createNodeRelay(incomingPayments.head) incomingPayments.foreach(incoming => nodeRelayer ! NodeRelay.Relay(incoming, randomKey().publicKey)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.5 / 8) + val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] - validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingPayments.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 5, ignoreNodeId = true) + validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingPayments.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 1, ignoreNodeId = true) val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment] assert(outgoingPayment.recipient.totalAmount == outgoingAmount) assert(outgoingPayment.recipient.expiry == outgoingExpiry) @@ -971,8 +1008,10 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl wakeUp.replyTo ! Peer.PeerInfo(TestProbe[Any]().ref.toClassic, outgoingNodeId, Peer.CONNECTED, Some(nodeParams.features.initFeatures()), None, Set.empty) cleanUpWakeUpActors(peerReadyManager, switchboard) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(3.5 / 8) + val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] - validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingPayments.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 5, ignoreNodeId = true) + validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingPayments.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 3, ignoreNodeId = true) val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment] assert(outgoingPayment.recipient.totalAmount == outgoingAmount) assert(outgoingPayment.recipient.expiry == outgoingExpiry) @@ -1034,6 +1073,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl peerInfo.replyTo ! Peer.PeerInfo(TestProbe[Any]().ref.toClassic, outgoingNodeId, Peer.CONNECTED, Some(nodeParams.features.initFeatures()), None, Set.empty) cleanUpWakeUpActors(peerReadyManager, switchboard) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(5.5 / 8) + val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingPayments.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 5, ignoreNodeId = true) val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment] @@ -1073,6 +1114,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl peerInfo.replyTo ! Peer.PeerInfo(TestProbe[Any]().ref.toClassic, outgoingNodeId, Peer.CONNECTED, Some(nodeParams.features.initFeatures()), None, Set.empty) cleanUpWakeUpActors(peerReadyManager, switchboard) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(5.5 / 8) + val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingPayments.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 5, ignoreNodeId = true) val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment] @@ -1102,8 +1145,10 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl assert(getNodeId.shortChannelId == scidDir.scid) getNodeId.replyTo ! Some(outgoingNodeId) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(4.5 / 8) + val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] - validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingPayments.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 5, ignoreNodeId = true) + validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingPayments.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey)).toList), 4, ignoreNodeId = true) val outgoingPayment = mockPayFSM.expectMessageType[SendPaymentToNode] assert(outgoingPayment.amount == outgoingAmount) assert(outgoingPayment.recipient.expiry == outgoingExpiry) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/RelayerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/RelayerSpec.scala index 232996934d..21cdb1c6c5 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/RelayerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/RelayerSpec.scala @@ -34,6 +34,7 @@ import fr.acinq.eclair.payment.OutgoingPaymentPacket.{NodePayload, buildOnion, b import fr.acinq.eclair.payment.PaymentPacketSpec._ import fr.acinq.eclair.payment.relay.Relayer._ import fr.acinq.eclair.payment.send.{ClearRecipient, TrampolinePayment} +import fr.acinq.eclair.reputation.ReputationRecorder import fr.acinq.eclair.router.BaseRouterSpec.{blindedRouteFromHops, channelHopFromUpdate} import fr.acinq.eclair.router.Router.Route import fr.acinq.eclair.wire.protocol.PaymentOnion.FinalPayload @@ -47,7 +48,7 @@ import scala.concurrent.duration.DurationInt class RelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with FixtureAnyFunSuiteLike { - case class FixtureParam(nodeParams: NodeParams, relayer: akka.actor.ActorRef, router: TestProbe[Any], register: TestProbe[Any], childActors: ChildActors, paymentHandler: TestProbe[Any]) + case class FixtureParam(nodeParams: NodeParams, relayer: akka.actor.ActorRef, router: TestProbe[Any], register: TestProbe[Any], childActors: ChildActors, paymentHandler: TestProbe[Any], reputationRecorder: TestProbe[ReputationRecorder.Command]) override def withFixture(test: OneArgTest): Outcome = { // we are node B in the route A -> B -> C -> .... @@ -56,17 +57,26 @@ class RelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("applicat val router = TestProbe[Any]("router") val register = TestProbe[Any]("register") val paymentHandler = TestProbe[Any]("payment-handler") + val reputationRecorder = TestProbe[ReputationRecorder.Command]("reputation-recorder") val probe = TestProbe[Any]() // we can't spawn top-level actors with akka typed testKit.spawn(Behaviors.setup[Any] { context => - val relayer = context.toClassic.actorOf(Relayer.props(nodeParams, router.ref.toClassic, register.ref.toClassic, paymentHandler.ref.toClassic)) + val relayer = context.toClassic.actorOf(Relayer.props(nodeParams, router.ref.toClassic, register.ref.toClassic, paymentHandler.ref.toClassic, Some(reputationRecorder.ref))) probe.ref ! relayer Behaviors.empty[Any] }) val relayer = probe.expectMessageType[akka.actor.ActorRef] relayer ! GetChildActors(probe.ref.toClassic) val childActors = probe.expectMessageType[ChildActors] - withFixture(test.toNoArgTest(FixtureParam(nodeParams, relayer, router, register, childActors, paymentHandler))) + withFixture(test.toNoArgTest(FixtureParam(nodeParams, relayer, router, register, childActors, paymentHandler, reputationRecorder))) + } + + def setConfidence(f: FixtureParam)(value: Double): Unit = { + import f._ + + val getConfidence = reputationRecorder.expectMessageType[ReputationRecorder.GetConfidence] + assert(getConfidence.upstream.receivedFrom == TestConstants.Alice.nodeParams.nodeId) + getConfidence.replyTo ! ReputationRecorder.Confidence(value) } val channelId_ab = randomBytes32() @@ -94,6 +104,7 @@ class RelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("applicat // and then manually build an htlc val add_ab = UpdateAddHtlc(randomBytes32(), 123456, payment.cmd.amount, payment.cmd.paymentHash, payment.cmd.cltvExpiry, payment.cmd.onion, None, 1.0, None) relayer ! RelayForward(add_ab, priv_a.publicKey) + setConfidence(f)(1.0) register.expectMessageType[Register.Forward[CMD_ADD_HTLC]] } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationRecorderSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationRecorderSpec.scala new file mode 100644 index 0000000000..b8ef534462 --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationRecorderSpec.scala @@ -0,0 +1,148 @@ +/* + * Copyright 2023 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.reputation + +import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe} +import akka.actor.typed.ActorRef +import akka.actor.typed.eventstream.EventStream +import com.typesafe.config.ConfigFactory +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.eclair.channel.{OutgoingHtlcAdded, OutgoingHtlcFailed, OutgoingHtlcFulfilled, Upstream} +import fr.acinq.eclair.reputation.ReputationRecorder._ +import fr.acinq.eclair.wire.protocol.{TlvStream, UpdateAddHtlc, UpdateAddHtlcTlv, UpdateFailHtlc, UpdateFulfillHtlc} +import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, MilliSatoshiLong, TimestampMilli, randomBytes, randomBytes32, randomKey, randomLong} +import org.scalatest.Outcome +import org.scalatest.funsuite.FixtureAnyFunSuiteLike + +import scala.concurrent.duration.DurationInt + +class ReputationRecorderSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with FixtureAnyFunSuiteLike { + val originNode: PublicKey = randomKey().publicKey + + case class FixtureParam(config: Reputation.Config, reputationRecorder: ActorRef[Command], replyTo: TestProbe[Confidence]) + + override def withFixture(test: OneArgTest): Outcome = { + val config = Reputation.Config(enabled = true, 1 day, 10 seconds, 2) + val replyTo = TestProbe[Confidence]("confidence") + val reputationRecorder = testKit.spawn(ReputationRecorder(config)) + withFixture(test.toNoArgTest(FixtureParam(config, reputationRecorder.ref, replyTo))) + } + + def makeChannelUpstream(nodeId: PublicKey, endorsement: Int, amount: MilliSatoshi = 1000000 msat): Upstream.Hot.Channel = + Upstream.Hot.Channel(UpdateAddHtlc(randomBytes32(), randomLong(), amount, randomBytes32(), CltvExpiry(1234), null, TlvStream(UpdateAddHtlcTlv.Endorsement(endorsement))), TimestampMilli.now(), nodeId) + + def makeOutgoingHtlcAdded(upstream: Upstream.Hot, fee: MilliSatoshi): OutgoingHtlcAdded = + OutgoingHtlcAdded(UpdateAddHtlc(randomBytes32(), randomLong(), 100000 msat, randomBytes32(), CltvExpiry(456), null, TlvStream.empty), upstream, fee) + + def makeOutgoingHtlcFulfilled(add: UpdateAddHtlc): OutgoingHtlcFulfilled = + OutgoingHtlcFulfilled(UpdateFulfillHtlc(add.channelId, add.id, randomBytes32(), TlvStream.empty)) + + def makeOutgoingHtlcFailed(add: UpdateAddHtlc): OutgoingHtlcFailed = + OutgoingHtlcFailed(UpdateFailHtlc(add.channelId, add.id, randomBytes(100), TlvStream.empty)) + + test("channel relay") { f => + import f._ + + val listener = TestProbe[Any]() + testKit.system.eventStream ! EventStream.Subscribe(listener.ref) + testKit.system.eventStream ! EventStream.Subscribe(listener.ref) + testKit.system.eventStream ! EventStream.Subscribe(listener.ref) + + val upstream1 = makeChannelUpstream(originNode, 7) + reputationRecorder ! GetConfidence(replyTo.ref, upstream1, 2000 msat) + assert(replyTo.expectMessageType[Confidence].value == 0) + val added1 = makeOutgoingHtlcAdded(upstream1, 2000 msat) + testKit.system.eventStream ! EventStream.Publish(added1) + testKit.system.eventStream ! EventStream.Publish(makeOutgoingHtlcFulfilled(added1.add)) + listener.expectMessageType[OutgoingHtlcAdded] + listener.expectMessageType[OutgoingHtlcFulfilled] + val upstream2 = makeChannelUpstream(originNode, 7) + reputationRecorder ! GetConfidence(replyTo.ref, upstream2, 1000 msat) + assert(replyTo.expectMessageType[Confidence].value === (2.0 / 4) +- 0.001) + val added2 = makeOutgoingHtlcAdded(upstream2, 1000 msat) + testKit.system.eventStream ! EventStream.Publish(added2) + listener.expectMessageType[OutgoingHtlcAdded] + reputationRecorder ! GetConfidence(replyTo.ref, makeChannelUpstream(originNode, 7), 3000 msat) + assert(replyTo.expectMessageType[Confidence].value === (2.0 / 10) +- 0.001) + val upstream3 = makeChannelUpstream(originNode, 7) + reputationRecorder ! GetConfidence(replyTo.ref, upstream3, 1000 msat) + assert(replyTo.expectMessageType[Confidence].value === (2.0 / 6) +- 0.001) + val added3 = makeOutgoingHtlcAdded(upstream3, 1000 msat) + testKit.system.eventStream ! EventStream.Publish(added3) + testKit.system.eventStream ! EventStream.Publish(makeOutgoingHtlcFulfilled(added3.add)) + testKit.system.eventStream ! EventStream.Publish(makeOutgoingHtlcFailed(added2.add)) + listener.expectMessageType[OutgoingHtlcAdded] + listener.expectMessageType[OutgoingHtlcFulfilled] + listener.expectMessageType[OutgoingHtlcFailed] + // Not endorsed + reputationRecorder ! GetConfidence(replyTo.ref, makeChannelUpstream(originNode, 0), 1000 msat) + assert(replyTo.expectMessageType[Confidence].value == 0) + // Different origin node + reputationRecorder ! GetConfidence(replyTo.ref, makeChannelUpstream(randomKey().publicKey, 7), 1000 msat) + assert(replyTo.expectMessageType[Confidence].value == 0) + // Very large HTLC + reputationRecorder ! GetConfidence(replyTo.ref, makeChannelUpstream(originNode, 7), 100000000 msat) + assert(replyTo.expectMessageType[Confidence].value === 0.0 +- 0.001) + } + + test("trampoline relay") { f => + import f._ + + val listener = TestProbe[Any]() + testKit.system.eventStream ! EventStream.Subscribe(listener.ref) + testKit.system.eventStream ! EventStream.Subscribe(listener.ref) + testKit.system.eventStream ! EventStream.Subscribe(listener.ref) + + val (a, b, c) = (randomKey().publicKey, randomKey().publicKey, randomKey().publicKey) + + val upstream1 = Upstream.Hot.Trampoline(makeChannelUpstream(a, 7, 20000 msat) :: makeChannelUpstream(b, 7, 40000 msat) :: makeChannelUpstream(c, 0, 60000 msat) :: Nil) + reputationRecorder ! GetTrampolineConfidence(replyTo.ref, upstream1, 12000 msat) + assert(replyTo.expectMessageType[Confidence].value == 0) + val added1 = makeOutgoingHtlcAdded(upstream1, 6000 msat) + testKit.system.eventStream ! EventStream.Publish(added1) + testKit.system.eventStream ! EventStream.Publish(makeOutgoingHtlcFulfilled(added1.add)) + listener.expectMessageType[OutgoingHtlcAdded] + listener.expectMessageType[OutgoingHtlcFulfilled] + val upstream2 = Upstream.Hot.Trampoline(makeChannelUpstream(a, 7, 10000 msat) :: makeChannelUpstream(c, 0, 10000 msat) :: Nil) + reputationRecorder ! GetTrampolineConfidence(replyTo.ref, upstream2, 2000 msat) + assert(replyTo.expectMessageType[Confidence].value === (1.0 / 3) +- 0.001) + val added2 = makeOutgoingHtlcAdded(upstream2, 2000 msat) + testKit.system.eventStream ! EventStream.Publish(added2) + listener.expectMessageType[OutgoingHtlcAdded] + val upstream3 = Upstream.Hot.Trampoline(makeChannelUpstream(a, 0, 10000 msat) :: makeChannelUpstream(b, 7, 20000 msat) :: Nil) + reputationRecorder ! GetTrampolineConfidence(replyTo.ref, upstream3, 3000 msat) + assert(replyTo.expectMessageType[Confidence].value == 0) + val added3 = makeOutgoingHtlcAdded(upstream3, 3000 msat) + testKit.system.eventStream ! EventStream.Publish(added3) + testKit.system.eventStream ! EventStream.Publish(makeOutgoingHtlcFailed(added2.add)) + testKit.system.eventStream ! EventStream.Publish(makeOutgoingHtlcFulfilled(added3.add)) + listener.expectMessageType[OutgoingHtlcAdded] + listener.expectMessageType[OutgoingHtlcFailed] + listener.expectMessageType[OutgoingHtlcFulfilled] + + reputationRecorder ! GetConfidence(replyTo.ref, makeChannelUpstream(a, 7), 1000 msat) + assert(replyTo.expectMessageType[Confidence].value === (1.0 / 3) +- 0.001) + reputationRecorder ! GetConfidence(replyTo.ref, makeChannelUpstream(a, 0), 1000 msat) + assert(replyTo.expectMessageType[Confidence].value === (1.0 / 3) +- 0.001) + reputationRecorder ! GetConfidence(replyTo.ref, makeChannelUpstream(b, 7), 1000 msat) + assert(replyTo.expectMessageType[Confidence].value === (4.0 / 6) +- 0.001) + reputationRecorder ! GetConfidence(replyTo.ref, makeChannelUpstream(b, 0), 1000 msat) + assert(replyTo.expectMessageType[Confidence].value == 0.0) + reputationRecorder ! GetConfidence(replyTo.ref, makeChannelUpstream(c, 0), 1000 msat) + assert(replyTo.expectMessageType[Confidence].value === (3.0 / 5) +- 0.001) + } +} \ No newline at end of file diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationSpec.scala new file mode 100644 index 0000000000..feacc75125 --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationSpec.scala @@ -0,0 +1,74 @@ +/* + * Copyright 2023 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.reputation + +import fr.acinq.eclair.reputation.Reputation._ +import fr.acinq.eclair.{MilliSatoshiLong, TimestampMilli, randomBytes32} +import org.scalactic.Tolerance.convertNumericToPlusOrMinusWrapper +import org.scalatest.funsuite.AnyFunSuite + +import scala.concurrent.duration.DurationInt + +class ReputationSpec extends AnyFunSuite { + val (htlcId1, htlcId2, htlcId3, htlcId4, htlcId5, htlcId6, htlcId7) = (HtlcId(randomBytes32(), 1), HtlcId(randomBytes32(), 2), HtlcId(randomBytes32(), 3), HtlcId(randomBytes32(), 4), HtlcId(randomBytes32(), 5), HtlcId(randomBytes32(), 6), HtlcId(randomBytes32(), 7)) + + test("basic") { + val r0 = Reputation.init(Config(enabled = true, 1 day, 1 second, 2)) + assert(r0.getConfidence(10000 msat) == 0) + val r1 = r0.attempt(htlcId1, 10000 msat) + val r2 = r1.record(htlcId1, isSuccess = true) + val r3 = r2.attempt(htlcId2, 10000 msat) + assert(r2.getConfidence(10000 msat) === (1.0 / 3) +- 0.001) + val r4 = r3.attempt(htlcId3, 10000 msat) + assert(r3.getConfidence(10000 msat) === (1.0 / 5) +- 0.001) + val r5 = r4.record(htlcId2, isSuccess = true) + val r6 = r5.record(htlcId3, isSuccess = true) + val r7 = r6.attempt(htlcId4, 1 msat) + assert(r6.getConfidence(1 msat) === 1.0 +- 0.001) + val r8 = r7.attempt(htlcId5, 40000 msat) + assert(r7.getConfidence(40000 msat) === (3.0 / 11) +- 0.001) + val r9 = r8.attempt(htlcId6, 10000 msat) + assert(r8.getConfidence(10000 msat) === (3.0 / 13) +- 0.001) + val r10 = r9.record(htlcId6, isSuccess = false) + assert(r10.getConfidence(10000 msat) === (3.0 / 13) +- 0.001) + } + + test("long HTLC") { + val r0 = Reputation.init(Config(enabled = true, 1000 day, 1 second, 10)) + assert(r0.getConfidence(100000 msat, TimestampMilli(0)) == 0) + val r1 = r0.attempt(htlcId1, 100000 msat, TimestampMilli(0)) + val r2 = r1.record(htlcId1, isSuccess = true, now = TimestampMilli(0)) + assert(r2.getConfidence(1000 msat, TimestampMilli(0)) === (10.0 / 11) +- 0.001) + val r3 = r2.attempt(htlcId2, 1000 msat, TimestampMilli(0)) + val r4 = r3.record(htlcId2, isSuccess = false, now = TimestampMilli(0) + 100.seconds) + assert(r4.getConfidence(0 msat, now = TimestampMilli(0) + 100.seconds) === 0.5 +- 0.001) + } + + test("exponential decay") { + val r0 = Reputation.init(Config(enabled = true, 100 seconds, 1 second, 1)) + val r1 = r0.attempt(htlcId1, 1000 msat, TimestampMilli(0)) + val r2 = r1.record(htlcId1, isSuccess = true, now = TimestampMilli(0)) + assert(r2.getConfidence(1000 msat, TimestampMilli(0)) == 1.0 / 2) + val r3 = r2.attempt(htlcId2, 1000 msat, TimestampMilli(0)) + val r4 = r3.record(htlcId2, isSuccess = true, now = TimestampMilli(0)) + assert(r4.getConfidence(1000 msat, TimestampMilli(0)) == 2.0 / 3) + val r5 = r4.attempt(htlcId3, 1000 msat, TimestampMilli(0)) + val r6 = r5.record(htlcId3, isSuccess = true, now = TimestampMilli(0)) + assert(r6.getConfidence(1000 msat, TimestampMilli(0) + 100.seconds) == 1.5 / 2.5) + assert(r6.getConfidence(1000 msat, TimestampMilli(0) + 1.hour) < 0.000001) + } +} \ No newline at end of file