diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala index 58224a50ca..df52a5653b 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala @@ -17,7 +17,7 @@ package fr.acinq.eclair.reputation import akka.actor.typed.eventstream.EventStream -import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.{ActorContext, Behaviors} import akka.actor.typed.{ActorRef, Behavior} import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.eclair.{BlockHeight, CltvExpiry, MilliSatoshi} @@ -27,6 +27,7 @@ import fr.acinq.eclair.reputation.ReputationRecorder._ import fr.acinq.eclair.wire.protocol.{UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc} import scala.collection.mutable +import scala.concurrent.duration.DurationInt object ReputationRecorder { // @formatter:off @@ -34,14 +35,18 @@ object ReputationRecorder { case class GetConfidence(replyTo: ActorRef[Reputation.Score], upstream: Upstream.Hot, downstream_opt: Option[PublicKey], fee: MilliSatoshi, currentBlockHeight: BlockHeight, expiry: CltvExpiry) extends Command case class WrappedOutgoingHtlcAdded(added: OutgoingHtlcAdded) extends Command case class WrappedOutgoingHtlcSettled(settled: OutgoingHtlcSettled) extends Command + private case object TickAudit extends Command // @formatter:on def apply(config: Reputation.Config): Behavior[Command] = - Behaviors.setup(context => { - context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcSettled)) - context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcAdded)) - new ReputationRecorder(config).run() - }) + Behaviors.setup { context => + Behaviors.withTimers { timers => + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcSettled)) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcAdded)) + timers.startTimerWithFixedDelay(TickAudit, 5 minutes) + new ReputationRecorder(context, config).run() + } + } /** * A pending outgoing HTLC. @@ -53,11 +58,13 @@ object ReputationRecorder { case class PendingHtlc(add: UpdateAddHtlc, upstream: Upstream.Hot, downstream: PublicKey) } -class ReputationRecorder(config: Reputation.Config) { +class ReputationRecorder(context: ActorContext[ReputationRecorder.Command], config: Reputation.Config) { private val incomingReputations: mutable.Map[PublicKey, Reputation] = mutable.HashMap.empty.withDefaultValue(Reputation.init(config)) private val outgoingReputations: mutable.Map[PublicKey, Reputation] = mutable.HashMap.empty.withDefaultValue(Reputation.init(config)) private val pending: mutable.Map[HtlcId, PendingHtlc] = mutable.HashMap.empty + private val log = context.log + def run(): Behavior[Command] = Behaviors.receiveMessage { case GetConfidence(replyTo, _: Upstream.Local, _, _, _, _) => @@ -128,5 +135,11 @@ class ReputationRecorder(config: Reputation.Config) { outgoingReputations(p.downstream) = outgoingReputations(p.downstream).settlePendingHtlc(htlcId, isSuccess) }) Behaviors.same + + case TickAudit => + val totalIncomingPending = incomingReputations.values.map(_.pending.size).sum + val totalOutgoingPending = outgoingReputations.values.map(_.pending.size).sum + log.info("{} pending HTLCs: {} tracked for incoming reputation and {} tracked for outgoing reputation", pending.size, totalIncomingPending, totalOutgoingPending) + Behaviors.same } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationRecorderSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationRecorderSpec.scala index 1cbbe2b3c5..2d310d24b3 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationRecorderSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationRecorderSpec.scala @@ -18,13 +18,12 @@ package fr.acinq.eclair.reputation import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe} import akka.actor.typed.ActorRef -import akka.actor.typed.eventstream.EventStream import akka.testkit.TestKit.awaitCond import com.typesafe.config.ConfigFactory import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.eclair.channel.{OutgoingHtlcAdded, OutgoingHtlcFailed, OutgoingHtlcFulfilled, Upstream} import fr.acinq.eclair.reputation.ReputationRecorder._ -import fr.acinq.eclair.wire.protocol.{TlvStream, UpdateAddHtlc, UpdateAddHtlcTlv, UpdateFailHtlc, UpdateFulfillHtlc} +import fr.acinq.eclair.wire.protocol._ import fr.acinq.eclair.{BlockHeight, CltvExpiry, MilliSatoshi, MilliSatoshiLong, TimestampMilli, randomBytes, randomBytes32, randomKey, randomLong} import org.scalatest.Outcome import org.scalatest.funsuite.FixtureAnyFunSuiteLike @@ -206,7 +205,7 @@ class ReputationRecorderSpec extends ScalaTestWithActorTestKit(ConfigFactory.loa }, max = 60 seconds) } - test("sink attack") {f => + test("sink attack") { f => import f._ val (a, b, c) = (randomKey().publicKey, randomKey().publicKey, randomKey().publicKey)