Skip to content

Commit a87963b

Browse files
authored
Monitor the internal state of ReputationRecorder (#3216)
It is quite hard to reason about the internal state of this actor, and it tracks every pending HTLC. If for some reason we're not removing HTLCs correctly, this would create a memory leak. We log the number of pending HTLCs every 5 minutes to help detect if clean-up is correctly implemented.
1 parent fa7e2ee commit a87963b

File tree

2 files changed

+22
-10
lines changed

2 files changed

+22
-10
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package fr.acinq.eclair.reputation
1818

1919
import akka.actor.typed.eventstream.EventStream
20-
import akka.actor.typed.scaladsl.Behaviors
20+
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
2121
import akka.actor.typed.{ActorRef, Behavior}
2222
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
2323
import fr.acinq.eclair.{BlockHeight, CltvExpiry, MilliSatoshi}
@@ -27,21 +27,26 @@ import fr.acinq.eclair.reputation.ReputationRecorder._
2727
import fr.acinq.eclair.wire.protocol.{UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc}
2828

2929
import scala.collection.mutable
30+
import scala.concurrent.duration.DurationInt
3031

3132
object ReputationRecorder {
3233
// @formatter:off
3334
sealed trait Command
3435
case class GetConfidence(replyTo: ActorRef[Reputation.Score], upstream: Upstream.Hot, downstream_opt: Option[PublicKey], fee: MilliSatoshi, currentBlockHeight: BlockHeight, expiry: CltvExpiry) extends Command
3536
case class WrappedOutgoingHtlcAdded(added: OutgoingHtlcAdded) extends Command
3637
case class WrappedOutgoingHtlcSettled(settled: OutgoingHtlcSettled) extends Command
38+
private case object TickAudit extends Command
3739
// @formatter:on
3840

3941
def apply(config: Reputation.Config): Behavior[Command] =
40-
Behaviors.setup(context => {
41-
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcSettled))
42-
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcAdded))
43-
new ReputationRecorder(config).run()
44-
})
42+
Behaviors.setup { context =>
43+
Behaviors.withTimers { timers =>
44+
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcSettled))
45+
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcAdded))
46+
timers.startTimerWithFixedDelay(TickAudit, 5 minutes)
47+
new ReputationRecorder(context, config).run()
48+
}
49+
}
4550

4651
/**
4752
* A pending outgoing HTLC.
@@ -53,11 +58,13 @@ object ReputationRecorder {
5358
case class PendingHtlc(add: UpdateAddHtlc, upstream: Upstream.Hot, downstream: PublicKey)
5459
}
5560

56-
class ReputationRecorder(config: Reputation.Config) {
61+
class ReputationRecorder(context: ActorContext[ReputationRecorder.Command], config: Reputation.Config) {
5762
private val incomingReputations: mutable.Map[PublicKey, Reputation] = mutable.HashMap.empty.withDefaultValue(Reputation.init(config))
5863
private val outgoingReputations: mutable.Map[PublicKey, Reputation] = mutable.HashMap.empty.withDefaultValue(Reputation.init(config))
5964
private val pending: mutable.Map[HtlcId, PendingHtlc] = mutable.HashMap.empty
6065

66+
private val log = context.log
67+
6168
def run(): Behavior[Command] =
6269
Behaviors.receiveMessage {
6370
case GetConfidence(replyTo, _: Upstream.Local, _, _, _, _) =>
@@ -128,5 +135,11 @@ class ReputationRecorder(config: Reputation.Config) {
128135
outgoingReputations(p.downstream) = outgoingReputations(p.downstream).settlePendingHtlc(htlcId, isSuccess)
129136
})
130137
Behaviors.same
138+
139+
case TickAudit =>
140+
val totalIncomingPending = incomingReputations.values.map(_.pending.size).sum
141+
val totalOutgoingPending = outgoingReputations.values.map(_.pending.size).sum
142+
log.info("{} pending HTLCs: {} tracked for incoming reputation and {} tracked for outgoing reputation", pending.size, totalIncomingPending, totalOutgoingPending)
143+
Behaviors.same
131144
}
132145
}

eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationRecorderSpec.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,12 @@ package fr.acinq.eclair.reputation
1818

1919
import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe}
2020
import akka.actor.typed.ActorRef
21-
import akka.actor.typed.eventstream.EventStream
2221
import akka.testkit.TestKit.awaitCond
2322
import com.typesafe.config.ConfigFactory
2423
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
2524
import fr.acinq.eclair.channel.{OutgoingHtlcAdded, OutgoingHtlcFailed, OutgoingHtlcFulfilled, Upstream}
2625
import fr.acinq.eclair.reputation.ReputationRecorder._
27-
import fr.acinq.eclair.wire.protocol.{TlvStream, UpdateAddHtlc, UpdateAddHtlcTlv, UpdateFailHtlc, UpdateFulfillHtlc}
26+
import fr.acinq.eclair.wire.protocol._
2827
import fr.acinq.eclair.{BlockHeight, CltvExpiry, MilliSatoshi, MilliSatoshiLong, TimestampMilli, randomBytes, randomBytes32, randomKey, randomLong}
2928
import org.scalatest.Outcome
3029
import org.scalatest.funsuite.FixtureAnyFunSuiteLike
@@ -206,7 +205,7 @@ class ReputationRecorderSpec extends ScalaTestWithActorTestKit(ConfigFactory.loa
206205
}, max = 60 seconds)
207206
}
208207

209-
test("sink attack") {f =>
208+
test("sink attack") { f =>
210209
import f._
211210

212211
val (a, b, c) = (randomKey().publicKey, randomKey().publicKey, randomKey().publicKey)

0 commit comments

Comments
 (0)