Skip to content

Make blockheight/fees flows non-nullable #493

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 10 additions & 45 deletions src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ class Peer(
val db: Databases,
socketBuilder: TcpSocket.Builder?,
scope: CoroutineScope,
val currentTipFlow: StateFlow<Pair<Int, BlockHeader>>,
val onChainFeeratesFlow: StateFlow<OnChainFeerates>,
val swapInFeeratesFlow: StateFlow<FeeratePerKw>,
private val isMigrationFromLegacyApp: Boolean = false,
private val initTlvStream: TlvStream<InitTlv> = TlvStream.empty()
) : CoroutineScope by scope {
Expand Down Expand Up @@ -157,17 +160,13 @@ class Peer(
private val ourInit = Init(features.initFeatures(), initTlvStream)
private var theirInit: Init? = null

val currentTipFlow = MutableStateFlow<Pair<Int, BlockHeader>?>(null)
val onChainFeeratesFlow = MutableStateFlow<OnChainFeerates?>(null)
val swapInFeeratesFlow = MutableStateFlow<FeeratePerKw?>(null)

private val _channelLogger = nodeParams.loggerFactory.newLogger(ChannelState::class)
private suspend fun ChannelState.process(cmd: ChannelCommand): Pair<ChannelState, List<ChannelAction>> {
val state = this
val ctx = ChannelContext(
StaticParams(nodeParams, remoteNodeId),
currentTipFlow.filterNotNull().first().first,
onChainFeeratesFlow.filterNotNull().first(),
currentTipFlow.value.first,
onChainFeeratesFlow.value,
logger = MDCLogger(
logger = _channelLogger,
staticMdc = mapOf("remoteNodeId" to remoteNodeId) + state.mdc()
Expand All @@ -188,20 +187,6 @@ class Peer(
val swapInAddress: String = nodeParams.keyManager.swapInOnChainWallet.address.also { swapInWallet.addAddress(it) }

init {
launch {
watcher.client.notifications.filterIsInstance<HeaderSubscriptionResponse>()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a race condition here between establishing the connection to electrum (which happens outside of the peer), and subscribing to HeaderSubscriptionResponse.

If the connection to electrum is very fast, tje peer will miss the initial HeaderSubscriptionResponse and then have to wait for the next block.

Same race condition applies for feerates.

.collect { msg ->
currentTipFlow.value = msg.blockHeight to msg.header
}
}
launch {
watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.collect {
// onchain fees are retrieved punctually, when electrum status moves to Connection.ESTABLISHED
// since the application is not running most of the time, and when it is, it will be only for a few minutes, this is good enough.
// (for a node that is online most of the time things would be different and we would need to re-evaluate onchain fee estimates on a regular basis)
updateEstimateFees()
}
}
launch {
watcher.openWatchNotificationsFlow().collect {
logger.debug { "notification: $it" }
Expand Down Expand Up @@ -233,8 +218,6 @@ class Peer(
processSwapInCommands(swapInManager)
}
launch {
// wait to have a swap-in feerate available
swapInFeeratesFlow.filterNotNull().first()
watchSwapInWallet()
}
launch {
Expand All @@ -257,24 +240,6 @@ class Peer(
}
}

private suspend fun updateEstimateFees() {
watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.first()
val sortedFees = listOf(
watcher.client.estimateFees(2),
watcher.client.estimateFees(6),
watcher.client.estimateFees(18),
watcher.client.estimateFees(144),
)
logger.info { "on-chain fees: $sortedFees" }
// TODO: If some feerates are null, we may implement a retry
onChainFeeratesFlow.value = OnChainFeerates(
fundingFeerate = sortedFees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)),
mutualCloseFeerate = sortedFees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)),
claimMainFeerate = sortedFees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)),
fastFeerate = sortedFees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat))
)
}

fun connect() {
if (connectionState.value is Connection.CLOSED) establishConnection()
else logger.warning { "Peer is already connecting / connected" }
Expand Down Expand Up @@ -393,7 +358,7 @@ class Peer(
swapInWallet.walletStateFlow
.filter { it.consistent }
.collect {
val currentBlockHeight = currentTipFlow.filterNotNull().first().first
val currentBlockHeight = currentTipFlow.value.first
swapInCommands.send(SwapInCommand.TrySwapIn(currentBlockHeight, it, walletParams.swapInConfirmations, isMigrationFromLegacyApp))
}
}
Expand Down Expand Up @@ -532,7 +497,7 @@ class Peer(
}

is ChannelAction.ProcessCmdRes.AddSettledFail -> {
val currentTip = currentTipFlow.filterNotNull().first()
val currentTip = currentTipFlow.value
when (val result = outgoingPaymentHandler.processAddSettled(actualChannelId, action, _channels, currentTip.first)) {
is OutgoingPaymentHandler.Progress -> {
_eventsFlow.emit(PaymentProgress(result.request, result.fees))
Expand Down Expand Up @@ -639,7 +604,7 @@ class Peer(
}

private suspend fun processIncomingPayment(item: Either<PayToOpenRequest, UpdateAddHtlc>) {
val currentBlockHeight = currentTipFlow.filterNotNull().first().first
val currentBlockHeight = currentTipFlow.value.first
val result = when (item) {
is Either.Right -> incomingPaymentHandler.process(item.value, currentBlockHeight)
is Either.Left -> incomingPaymentHandler.process(item.value, currentBlockHeight)
Expand Down Expand Up @@ -937,7 +902,7 @@ class Peer(
is RequestChannelOpen -> {
when (val channel = channels.values.firstOrNull { it is Normal }) {
is ChannelStateWithCommitments -> {
val targetFeerate = swapInFeeratesFlow.filterNotNull().first()
val targetFeerate = swapInFeeratesFlow.value
val weight = FundingContributions.computeWeightPaid(isInitiator = true, commitment = channel.commitments.active.first(), walletInputs = cmd.walletInputs, localOutputs = emptyList())
val (feerate, fee) = watcher.client.computeSpliceCpfpFeerate(channel.commitments, targetFeerate, spliceWeight = weight, logger)

Expand Down Expand Up @@ -1034,7 +999,7 @@ class Peer(
sendToPeer(cmd.payToOpenResponse)
}
is SendPayment -> {
val currentTip = currentTipFlow.filterNotNull().first()
val currentTip = currentTipFlow.value
when (val result = outgoingPaymentHandler.sendPayment(cmd, _channels, currentTip.first)) {
is OutgoingPaymentHandler.Progress -> {
_eventsFlow.emit(PaymentProgress(result.request, result.fees))
Expand Down
22 changes: 12 additions & 10 deletions src/commonTest/kotlin/fr/acinq/lightning/tests/io/peer/builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ import fr.acinq.lightning.wire.ChannelReestablish
import fr.acinq.lightning.wire.Init
import fr.acinq.lightning.wire.LightningMessage
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import org.kodein.log.LoggerFactory
Expand Down Expand Up @@ -169,13 +167,17 @@ fun buildPeer(
): Peer {
val electrum = ElectrumClient(TcpSocket.Builder(), scope, LoggerFactory.default)
val watcher = ElectrumWatcher(electrum, scope, LoggerFactory.default)
val peer = Peer(nodeParams, walletParams, watcher, databases, TcpSocket.Builder(), scope)
peer.currentTipFlow.value = currentTip
peer.onChainFeeratesFlow.value = OnChainFeerates(
fundingFeerate = FeeratePerKw(FeeratePerByte(5.sat)),
mutualCloseFeerate = FeeratePerKw(FeeratePerByte(10.sat)),
claimMainFeerate = FeeratePerKw(FeeratePerByte(20.sat)),
fastFeerate = FeeratePerKw(FeeratePerByte(50.sat))
val peer = Peer(nodeParams, walletParams, watcher, databases, TcpSocket.Builder(), scope,
currentTipFlow = MutableStateFlow(currentTip),
onChainFeeratesFlow = MutableStateFlow(
OnChainFeerates(
fundingFeerate = FeeratePerKw(FeeratePerByte(5.sat)),
mutualCloseFeerate = FeeratePerKw(FeeratePerByte(10.sat)),
claimMainFeerate = FeeratePerKw(FeeratePerByte(20.sat)),
fastFeerate = FeeratePerKw(FeeratePerByte(50.sat))
)
),
swapInFeeratesFlow = MutableStateFlow(FeeratePerKw(FeeratePerByte(5.sat)))
)

return peer
Expand Down