Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ import scala.concurrent.duration.{DurationInt, FiniteDuration}
* @param score How much fees we have collected in the past (exponential moving average).
* @param lastSettlementAt Timestamp of the last recorded HTLC settlement.
*/
case class PastScore(weight: Double, score: Double, lastSettlementAt: TimestampMilli)
case class PastScore(weight: Double, score: Double, lastSettlementAt: TimestampMilli) {
def decay(now: TimestampMilli, halfLife: FiniteDuration): PastScore = {
val d = scala.math.pow(0.5, (now - lastSettlementAt) / halfLife)
PastScore(d * weight, d * score, now)
}
}

/** We're relaying that HTLC and are waiting for it to settle. */
case class PendingHtlc(fee: MilliSatoshi, accountability: Int, startedAt: TimestampMilli, expiry: CltvExpiry) {
Expand All @@ -58,18 +63,16 @@ case object HtlcId {
* @param maxRelayDuration Duration after which HTLCs are penalized for staying pending too long.
*/
case class Reputation(pastScores: Map[Int, PastScore], pending: Map[HtlcId, PendingHtlc], halfLife: FiniteDuration, maxRelayDuration: FiniteDuration) {
private def decay(now: TimestampMilli, lastSettlementAt: TimestampMilli): Double = scala.math.pow(0.5, (now - lastSettlementAt) / halfLife)

/**
* Estimate the confidence that a payment will succeed.
*/
def getConfidence(fee: MilliSatoshi, accountability: Int, currentBlockHeight: BlockHeight, expiry: CltvExpiry, now: TimestampMilli = TimestampMilli.now()): Double = {
val weights = Array.fill(Reputation.accountabilityLevels)(0.0)
val scores = Array.fill(Reputation.accountabilityLevels)(0.0)
for (e <- 0 until Reputation.accountabilityLevels) {
val d = decay(now, pastScores(e).lastSettlementAt)
weights(e) += d * pastScores(e).weight
scores(e) += d * pastScores(e).score
val ps = pastScores(e).decay(now, halfLife)
weights(e) += ps.weight
scores(e) += ps.score
}
for (p <- pending.values) {
weights(p.accountability) += p.weight(now, maxRelayDuration, currentBlockHeight)
Expand All @@ -90,7 +93,7 @@ case class Reputation(pastScores: Map[Int, PastScore], pending: Map[HtlcId, Pend
weight += weights(e)
confidence = confidence.max(score / weight)
}
confidence
confidence.min(1.0)
}

/**
Expand All @@ -104,19 +107,24 @@ case class Reputation(pastScores: Map[Int, PastScore], pending: Map[HtlcId, Pend
*/
def settlePendingHtlc(htlcId: HtlcId, isSuccess: Boolean, now: TimestampMilli = TimestampMilli.now()): Reputation = {
val newScores = pending.get(htlcId).map(p => {
val d = decay(now, pastScores(p.accountability).lastSettlementAt)
val ps = pastScores(p.accountability).decay(now, halfLife)
val duration = now - p.startedAt
val (weight, score) = if (isSuccess) {
(p.fee.toLong.toDouble * (duration / maxRelayDuration).max(1.0), p.fee.toLong.toDouble)
} else {
(p.fee.toLong.toDouble * (duration / maxRelayDuration), 0.0)
}
val newWeight = d * pastScores(p.accountability).weight + weight
val newScore = d * pastScores(p.accountability).score + score
val newWeight = ps.weight + weight
val newScore = ps.score + score
pastScores + (p.accountability -> PastScore(newWeight, newScore, now))
}).getOrElse(pastScores)
copy(pending = pending - htlcId, pastScores = newScores)
}

def addExtraFee(fee: MilliSatoshi, now: TimestampMilli = TimestampMilli.now()): Reputation = {
val ps = pastScores(0).decay(now, halfLife)
copy(pastScores = pastScores + (0 -> ps.copy(score = ps.score + fee.toLong.toDouble)))
}
}

object Reputation {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.{BlockHeight, CltvExpiry, MilliSatoshi}
import fr.acinq.eclair.{BlockHeight, CltvExpiry, MilliSatoshi, ToMilliSatoshiConversion}
import fr.acinq.eclair.channel.Upstream.Hot
import fr.acinq.eclair.channel.{OutgoingHtlcAdded, OutgoingHtlcFailed, OutgoingHtlcFulfilled, OutgoingHtlcSettled, Upstream}
import fr.acinq.eclair.channel.{ChannelLiquidityPurchased, OutgoingHtlcAdded, OutgoingHtlcFailed, OutgoingHtlcFulfilled, OutgoingHtlcSettled, Upstream}
import fr.acinq.eclair.reputation.ReputationRecorder._
import fr.acinq.eclair.wire.protocol.{UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc}

Expand All @@ -36,13 +36,15 @@ object ReputationRecorder {
case class WrappedOutgoingHtlcAdded(added: OutgoingHtlcAdded) extends Command
case class WrappedOutgoingHtlcSettled(settled: OutgoingHtlcSettled) extends Command
private case object TickAudit extends Command
case class WrappedChannelLiquidityPurchased(purchased: ChannelLiquidityPurchased) extends Command
// @formatter:on

def apply(config: Reputation.Config): Behavior[Command] =
Behaviors.setup { context =>
Behaviors.withTimers { timers =>
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcSettled))
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcAdded))
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedChannelLiquidityPurchased))
timers.startTimerWithFixedDelay(TickAudit, 5 minutes)
new ReputationRecorder(context, config).run()
}
Expand Down Expand Up @@ -91,6 +93,11 @@ class ReputationRecorder(context: ActorContext[ReputationRecorder.Command], conf
})
Behaviors.same

case WrappedChannelLiquidityPurchased(purchased) =>
val fee = purchased.purchase.fees.serviceFee.toMilliSatoshi
outgoingReputations(purchased.remoteNodeId) = outgoingReputations(purchased.remoteNodeId).addExtraFee(fee)
Behaviors.same

case TickAudit =>
val totalOutgoingPending = outgoingReputations.values.map(_.pending.size).sum
log.info("{} pending HTLCs: {} tracked for outgoing reputation", pending.size, totalOutgoingPending)
Expand Down