Skip to content

Commit cf54b50

Browse files
committed
Monitor the internal state of ReputationRecorder
It is quite hard to reason about the internal state of this actor, and it tracks every pending HTLC. If for some reason we're not removing HTLCs correctly, this would create a memory leak. We log the number of pending HTLCs every 5 minutes to help detect if clean-up is correctly implemented.
1 parent fa7e2ee commit cf54b50

File tree

2 files changed

+22
-10
lines changed

2 files changed

+22
-10
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package fr.acinq.eclair.reputation
1818

1919
import akka.actor.typed.eventstream.EventStream
20-
import akka.actor.typed.scaladsl.Behaviors
20+
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
2121
import akka.actor.typed.{ActorRef, Behavior}
2222
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
2323
import fr.acinq.eclair.{BlockHeight, CltvExpiry, MilliSatoshi}
@@ -27,21 +27,26 @@ import fr.acinq.eclair.reputation.ReputationRecorder._
2727
import fr.acinq.eclair.wire.protocol.{UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc}
2828

2929
import scala.collection.mutable
30+
import scala.concurrent.duration.DurationInt
3031

3132
object ReputationRecorder {
3233
// @formatter:off
3334
sealed trait Command
3435
case class GetConfidence(replyTo: ActorRef[Reputation.Score], upstream: Upstream.Hot, downstream_opt: Option[PublicKey], fee: MilliSatoshi, currentBlockHeight: BlockHeight, expiry: CltvExpiry) extends Command
3536
case class WrappedOutgoingHtlcAdded(added: OutgoingHtlcAdded) extends Command
3637
case class WrappedOutgoingHtlcSettled(settled: OutgoingHtlcSettled) extends Command
38+
private case object TickAudit extends Command
3739
// @formatter:on
3840

3941
def apply(config: Reputation.Config): Behavior[Command] =
40-
Behaviors.setup(context => {
41-
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcSettled))
42-
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcAdded))
43-
new ReputationRecorder(config).run()
44-
})
42+
Behaviors.setup { context =>
43+
Behaviors.withTimers { timers =>
44+
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcSettled))
45+
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcAdded))
46+
timers.startTimerWithFixedDelay(TickAudit, 5 minutes)
47+
new ReputationRecorder(context, config).run()
48+
}
49+
}
4550

4651
/**
4752
* A pending outgoing HTLC.
@@ -53,11 +58,13 @@ object ReputationRecorder {
5358
case class PendingHtlc(add: UpdateAddHtlc, upstream: Upstream.Hot, downstream: PublicKey)
5459
}
5560

56-
class ReputationRecorder(config: Reputation.Config) {
61+
class ReputationRecorder(context: ActorContext[ReputationRecorder.Command], config: Reputation.Config) {
5762
private val incomingReputations: mutable.Map[PublicKey, Reputation] = mutable.HashMap.empty.withDefaultValue(Reputation.init(config))
5863
private val outgoingReputations: mutable.Map[PublicKey, Reputation] = mutable.HashMap.empty.withDefaultValue(Reputation.init(config))
5964
private val pending: mutable.Map[HtlcId, PendingHtlc] = mutable.HashMap.empty
6065

66+
private val log = context.log
67+
6168
def run(): Behavior[Command] =
6269
Behaviors.receiveMessage {
6370
case GetConfidence(replyTo, _: Upstream.Local, _, _, _, _) =>
@@ -128,5 +135,11 @@ class ReputationRecorder(config: Reputation.Config) {
128135
outgoingReputations(p.downstream) = outgoingReputations(p.downstream).settlePendingHtlc(htlcId, isSuccess)
129136
})
130137
Behaviors.same
138+
139+
case TickAudit =>
140+
val totalIncomingPending = incomingReputations.values.map(_.pending.size).sum
141+
val totalOutgoingPending = outgoingReputations.values.map(_.pending.size).sum
142+
log.info("{} pending HTLCs: {} tracked for incoming reputation and {} tracked for outgoing reputation", pending.size, totalIncomingPending, totalOutgoingPending)
143+
Behaviors.same
131144
}
132145
}

eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationRecorderSpec.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,12 @@ package fr.acinq.eclair.reputation
1818

1919
import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe}
2020
import akka.actor.typed.ActorRef
21-
import akka.actor.typed.eventstream.EventStream
2221
import akka.testkit.TestKit.awaitCond
2322
import com.typesafe.config.ConfigFactory
2423
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
2524
import fr.acinq.eclair.channel.{OutgoingHtlcAdded, OutgoingHtlcFailed, OutgoingHtlcFulfilled, Upstream}
2625
import fr.acinq.eclair.reputation.ReputationRecorder._
27-
import fr.acinq.eclair.wire.protocol.{TlvStream, UpdateAddHtlc, UpdateAddHtlcTlv, UpdateFailHtlc, UpdateFulfillHtlc}
26+
import fr.acinq.eclair.wire.protocol._
2827
import fr.acinq.eclair.{BlockHeight, CltvExpiry, MilliSatoshi, MilliSatoshiLong, TimestampMilli, randomBytes, randomBytes32, randomKey, randomLong}
2928
import org.scalatest.Outcome
3029
import org.scalatest.funsuite.FixtureAnyFunSuiteLike
@@ -206,7 +205,7 @@ class ReputationRecorderSpec extends ScalaTestWithActorTestKit(ConfigFactory.loa
206205
}, max = 60 seconds)
207206
}
208207

209-
test("sink attack") {f =>
208+
test("sink attack") { f =>
210209
import f._
211210

212211
val (a, b, c) = (randomKey().publicKey, randomKey().publicKey, randomKey().publicKey)

0 commit comments

Comments
 (0)