Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package fr.acinq.eclair.reputation

import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.{BlockHeight, CltvExpiry, MilliSatoshi}
Expand All @@ -27,21 +27,26 @@ import fr.acinq.eclair.reputation.ReputationRecorder._
import fr.acinq.eclair.wire.protocol.{UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc}

import scala.collection.mutable
import scala.concurrent.duration.DurationInt

object ReputationRecorder {
// @formatter:off
sealed trait Command
case class GetConfidence(replyTo: ActorRef[Reputation.Score], upstream: Upstream.Hot, downstream_opt: Option[PublicKey], fee: MilliSatoshi, currentBlockHeight: BlockHeight, expiry: CltvExpiry) extends Command
case class WrappedOutgoingHtlcAdded(added: OutgoingHtlcAdded) extends Command
case class WrappedOutgoingHtlcSettled(settled: OutgoingHtlcSettled) extends Command
private case object TickAudit extends Command
// @formatter:on

def apply(config: Reputation.Config): Behavior[Command] =
Behaviors.setup(context => {
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcSettled))
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcAdded))
new ReputationRecorder(config).run()
})
Behaviors.setup { context =>
Behaviors.withTimers { timers =>
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcSettled))
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcAdded))
timers.startTimerWithFixedDelay(TickAudit, 5 minutes)
new ReputationRecorder(context, config).run()
}
}

/**
* A pending outgoing HTLC.
Expand All @@ -53,11 +58,13 @@ object ReputationRecorder {
case class PendingHtlc(add: UpdateAddHtlc, upstream: Upstream.Hot, downstream: PublicKey)
}

class ReputationRecorder(config: Reputation.Config) {
class ReputationRecorder(context: ActorContext[ReputationRecorder.Command], config: Reputation.Config) {
private val incomingReputations: mutable.Map[PublicKey, Reputation] = mutable.HashMap.empty.withDefaultValue(Reputation.init(config))
private val outgoingReputations: mutable.Map[PublicKey, Reputation] = mutable.HashMap.empty.withDefaultValue(Reputation.init(config))
private val pending: mutable.Map[HtlcId, PendingHtlc] = mutable.HashMap.empty

private val log = context.log

def run(): Behavior[Command] =
Behaviors.receiveMessage {
case GetConfidence(replyTo, _: Upstream.Local, _, _, _, _) =>
Expand Down Expand Up @@ -128,5 +135,11 @@ class ReputationRecorder(config: Reputation.Config) {
outgoingReputations(p.downstream) = outgoingReputations(p.downstream).settlePendingHtlc(htlcId, isSuccess)
})
Behaviors.same

case TickAudit =>
val totalIncomingPending = incomingReputations.values.map(_.pending.size).sum
val totalOutgoingPending = outgoingReputations.values.map(_.pending.size).sum
log.info("{} pending HTLCs: {} tracked for incoming reputation and {} tracked for outgoing reputation", pending.size, totalIncomingPending, totalOutgoingPending)
Behaviors.same
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ package fr.acinq.eclair.reputation

import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe}
import akka.actor.typed.ActorRef
import akka.actor.typed.eventstream.EventStream
import akka.testkit.TestKit.awaitCond
import com.typesafe.config.ConfigFactory
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.channel.{OutgoingHtlcAdded, OutgoingHtlcFailed, OutgoingHtlcFulfilled, Upstream}
import fr.acinq.eclair.reputation.ReputationRecorder._
import fr.acinq.eclair.wire.protocol.{TlvStream, UpdateAddHtlc, UpdateAddHtlcTlv, UpdateFailHtlc, UpdateFulfillHtlc}
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{BlockHeight, CltvExpiry, MilliSatoshi, MilliSatoshiLong, TimestampMilli, randomBytes, randomBytes32, randomKey, randomLong}
import org.scalatest.Outcome
import org.scalatest.funsuite.FixtureAnyFunSuiteLike
Expand Down Expand Up @@ -206,7 +205,7 @@ class ReputationRecorderSpec extends ScalaTestWithActorTestKit(ConfigFactory.loa
}, max = 60 seconds)
}

test("sink attack") {f =>
test("sink attack") { f =>
import f._

val (a, b, c) = (randomKey().publicKey, randomKey().publicKey, randomKey().publicKey)
Expand Down