Skip to content

Commit 0242acd

Browse files
committed
Send and receive splice commit_sig as batch
We introduce a `CommitSigBatch` class to group `commit_sig` messages when splice transactions are pending. We use this class to ensure that all the `commit_sig` messages in the batch are sent together to our peer, without any other messages in-between. When we receive `commit_sig` messages that contain the `batch` TLV, we group them directly in the `PeerConnection` before relaying them as a batch to the channel.
1 parent cbd454d commit 0242acd

File tree

9 files changed

+162
-146
lines changed

9 files changed

+162
-146
lines changed

modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/Commitments.kt

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -759,7 +759,7 @@ data class Commitments(
759759
return failure?.let { Either.Left(it) } ?: Either.Right(copy(changes = changes1))
760760
}
761761

762-
fun sendCommit(channelKeys: KeyManager.ChannelKeys, log: MDCLogger): Either<ChannelException, Pair<Commitments, List<CommitSig>>> {
762+
fun sendCommit(channelKeys: KeyManager.ChannelKeys, log: MDCLogger): Either<ChannelException, Pair<Commitments, CommitSigs>> {
763763
val remoteNextPerCommitmentPoint = remoteNextCommitInfo.right ?: return Either.Left(CannotSignBeforeRevocation(channelId))
764764
if (!changes.localHasChanges()) return Either.Left(CannotSignWithoutChanges(channelId))
765765
val (active1, sigs) = active.map { it.sendCommit(channelKeys, params, changes, remoteNextPerCommitmentPoint, active.size, log) }.unzip()
@@ -771,18 +771,30 @@ data class Commitments(
771771
remoteChanges = changes.remoteChanges.copy(acked = emptyList(), signed = changes.remoteChanges.acked)
772772
)
773773
)
774-
return Either.Right(Pair(commitments1, sigs))
774+
return Either.Right(Pair(commitments1, CommitSigs.fromSigs(sigs)))
775775
}
776776

777-
fun receiveCommit(commits: List<CommitSig>, channelKeys: KeyManager.ChannelKeys, log: MDCLogger): Either<ChannelException, Pair<Commitments, RevokeAndAck>> {
777+
fun receiveCommit(commits: CommitSigs, channelKeys: KeyManager.ChannelKeys, log: MDCLogger): Either<ChannelException, Pair<Commitments, RevokeAndAck>> {
778778
// We may receive more commit_sig than the number of active commitments, because there can be a race where we send splice_locked
779779
// while our peer is sending us a batch of commit_sig. When that happens, we simply need to discard the commit_sig that belong
780780
// to commitments we deactivated.
781-
if (commits.size < active.size) {
782-
return Either.Left(CommitSigCountMismatch(channelId, active.size, commits.size))
781+
val sigs = when (commits) {
782+
is CommitSigBatch -> {
783+
if (commits.batchSize < active.size) {
784+
return Either.Left(CommitSigCountMismatch(channelId, active.size, commits.batchSize))
785+
}
786+
commits.messages
787+
}
788+
is CommitSig -> {
789+
if (active.size > 1) {
790+
return Either.Left(CommitSigCountMismatch(channelId, active.size, 1))
791+
}
792+
listOf(commits)
793+
}
783794
}
795+
784796
// Signatures are sent in order (most recent first), calling `zip` will drop trailing sigs that are for deactivated/pruned commitments.
785-
val active1 = active.zip(commits).map {
797+
val active1 = active.zip(sigs).map {
786798
when (val commitment1 = it.first.receiveCommit(channelKeys, params, changes, it.second, log)) {
787799
is Either.Left -> return Either.Left(commitment1.value)
788800
is Either.Right -> commitment1.value

modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Channel.kt

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -674,22 +674,6 @@ sealed class ChannelStateWithCommitments : PersistedChannelState() {
674674
}
675675
}
676676
}
677-
678-
// in Normal and Shutdown we aggregate sigs for splices before processing
679-
var sigStash = emptyList<CommitSig>()
680-
681-
/** For splices we will send one commit_sig per active commitments. */
682-
internal fun ChannelContext.aggregateSigs(commit: CommitSig): List<CommitSig>? {
683-
sigStash = sigStash + commit
684-
logger.debug { "received sig for batch of size=${commit.batchSize}" }
685-
return if (sigStash.size == commit.batchSize) {
686-
val sigs = sigStash
687-
sigStash = emptyList()
688-
sigs
689-
} else {
690-
null
691-
}
692-
}
693677
}
694678

695679
object Channel {

modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Normal.kt

Lines changed: 26 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ data class Normal(
8080
val actions = buildList {
8181
add(ChannelAction.Storage.StoreHtlcInfos(htlcInfos))
8282
add(ChannelAction.Storage.StoreState(nextState))
83-
addAll(result.value.second.map { ChannelAction.Message.Send(it) })
83+
add(ChannelAction.Message.Send(result.value.second))
8484
}
8585
Pair(nextState, actions)
8686
}
@@ -173,12 +173,12 @@ data class Normal(
173173
is Either.Left -> handleLocalError(cmd, result.value)
174174
is Either.Right -> Pair(this@Normal.copy(commitments = result.value), listOf())
175175
}
176-
is CommitSig -> when {
176+
is CommitSigs -> when {
177177
spliceStatus == SpliceStatus.Aborted -> {
178178
logger.warning { "received commit_sig after sending tx_abort, they probably sent it before receiving our tx_abort, ignoring..." }
179179
Pair(this@Normal, listOf())
180180
}
181-
spliceStatus is SpliceStatus.WaitingForSigs -> {
181+
spliceStatus is SpliceStatus.WaitingForSigs && cmd.message is CommitSig -> {
182182
val (signingSession1, action) = spliceStatus.session.receiveCommitSig(channelKeys(), commitments.params, cmd.message, currentBlockHeight.toLong(), logger)
183183
when (action) {
184184
is InteractiveTxSigningSessionAction.AbortFundingAttempt -> {
@@ -193,7 +193,7 @@ data class Normal(
193193
is InteractiveTxSigningSessionAction.SendTxSigs -> sendSpliceTxSigs(spliceStatus.origins, action, spliceStatus.liquidityPurchase)
194194
}
195195
}
196-
ignoreRetransmittedCommitSig(cmd.message) -> {
196+
cmd.message is CommitSig && ignoreRetransmittedCommitSig(cmd.message) -> {
197197
// We haven't received our peer's tx_signatures for the latest funding transaction and asked them to resend it on reconnection.
198198
// They also resend their corresponding commit_sig, but we have already received it so we should ignore it.
199199
// Note that the funding transaction may have confirmed while we were offline.
@@ -202,34 +202,29 @@ data class Normal(
202202
}
203203
// NB: in all other cases we process the commit_sig normally. We could do a full pattern matching on all splice statuses, but it would force us to handle
204204
// corner cases like race condition between splice_init and a non-splice commit_sig
205-
else -> {
206-
when (val sigs = aggregateSigs(cmd.message)) {
207-
is List<CommitSig> -> when (val result = commitments.receiveCommit(sigs, channelKeys(), logger)) {
208-
is Either.Left -> handleLocalError(cmd, result.value)
209-
is Either.Right -> {
210-
val commitments1 = result.value.first
211-
val spliceStatus1 = when {
212-
spliceStatus is SpliceStatus.QuiescenceRequested && commitments1.localIsQuiescent() -> SpliceStatus.InitiatorQuiescent(spliceStatus.command)
213-
spliceStatus is SpliceStatus.ReceivedStfu && commitments1.localIsQuiescent() -> SpliceStatus.NonInitiatorQuiescent
214-
else -> spliceStatus
215-
}
216-
val nextState = this@Normal.copy(commitments = commitments1, spliceStatus = spliceStatus1)
217-
val actions = mutableListOf<ChannelAction>()
218-
actions.add(ChannelAction.Storage.StoreState(nextState))
219-
actions.add(ChannelAction.Message.Send(result.value.second))
220-
if (commitments1.changes.localHasChanges()) {
221-
actions.add(ChannelAction.Message.SendToSelf(ChannelCommand.Commitment.Sign))
222-
}
223-
// If we're now quiescent, we may send our stfu message.
224-
when {
225-
spliceStatus is SpliceStatus.QuiescenceRequested && commitments1.localIsQuiescent() -> actions.add(ChannelAction.Message.Send(Stfu(channelId, initiator = true)))
226-
spliceStatus is SpliceStatus.ReceivedStfu && commitments1.localIsQuiescent() -> actions.add(ChannelAction.Message.Send(Stfu(channelId, initiator = false)))
227-
else -> {}
228-
}
229-
Pair(nextState, actions)
230-
}
205+
else -> when (val result = commitments.receiveCommit(cmd.message, channelKeys(), logger)) {
206+
is Either.Left -> handleLocalError(cmd, result.value)
207+
is Either.Right -> {
208+
val commitments1 = result.value.first
209+
val spliceStatus1 = when {
210+
spliceStatus is SpliceStatus.QuiescenceRequested && commitments1.localIsQuiescent() -> SpliceStatus.InitiatorQuiescent(spliceStatus.command)
211+
spliceStatus is SpliceStatus.ReceivedStfu && commitments1.localIsQuiescent() -> SpliceStatus.NonInitiatorQuiescent
212+
else -> spliceStatus
213+
}
214+
val nextState = this@Normal.copy(commitments = commitments1, spliceStatus = spliceStatus1)
215+
val actions = mutableListOf<ChannelAction>()
216+
actions.add(ChannelAction.Storage.StoreState(nextState))
217+
actions.add(ChannelAction.Message.Send(result.value.second))
218+
if (commitments1.changes.localHasChanges()) {
219+
actions.add(ChannelAction.Message.SendToSelf(ChannelCommand.Commitment.Sign))
231220
}
232-
else -> Pair(this@Normal, listOf())
221+
// If we're now quiescent, we may send our stfu message.
222+
when {
223+
spliceStatus is SpliceStatus.QuiescenceRequested && commitments1.localIsQuiescent() -> actions.add(ChannelAction.Message.Send(Stfu(channelId, initiator = true)))
224+
spliceStatus is SpliceStatus.ReceivedStfu && commitments1.localIsQuiescent() -> actions.add(ChannelAction.Message.Send(Stfu(channelId, initiator = false)))
225+
else -> {}
226+
}
227+
Pair(nextState, actions)
233228
}
234229
}
235230
}
@@ -822,8 +817,6 @@ data class Normal(
822817
SpliceStatus.None
823818
}
824819
}
825-
// reset the commit_sig batch
826-
sigStash = emptyList()
827820
Pair(Offline(this@Normal.copy(spliceStatus = spliceStatus1)), failedHtlcs)
828821
}
829822
is ChannelCommand.Connected -> unhandled(cmd)

modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/ShuttingDown.kt

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -39,29 +39,26 @@ data class ShuttingDown(
3939
is Either.Left -> handleLocalError(cmd, result.value)
4040
is Either.Right -> Pair(this@ShuttingDown.copy(commitments = result.value), listOf())
4141
}
42-
is CommitSig -> when (val sigs = aggregateSigs(cmd.message)) {
43-
is List<CommitSig> -> when (val result = commitments.receiveCommit(sigs, channelKeys(), logger)) {
44-
is Either.Left -> handleLocalError(cmd, result.value)
45-
is Either.Right -> {
46-
val (commitments1, revocation) = result.value
47-
when {
48-
commitments1.hasNoPendingHtlcsOrFeeUpdate() -> startClosingNegotiation(closeCommand, commitments1, localShutdown, remoteShutdown, listOf(ChannelAction.Message.Send(revocation)))
49-
else -> {
50-
val nextState = this@ShuttingDown.copy(commitments = commitments1)
51-
val actions = buildList {
52-
add(ChannelAction.Storage.StoreState(nextState))
53-
add(ChannelAction.Message.Send(revocation))
54-
if (commitments1.changes.localHasChanges()) {
55-
// if we have newly acknowledged changes let's sign them
56-
add(ChannelAction.Message.SendToSelf(ChannelCommand.Commitment.Sign))
57-
}
42+
is CommitSigs -> when (val result = commitments.receiveCommit(cmd.message, channelKeys(), logger)) {
43+
is Either.Left -> handleLocalError(cmd, result.value)
44+
is Either.Right -> {
45+
val (commitments1, revocation) = result.value
46+
when {
47+
commitments1.hasNoPendingHtlcsOrFeeUpdate() -> startClosingNegotiation(closeCommand, commitments1, localShutdown, remoteShutdown, listOf(ChannelAction.Message.Send(revocation)))
48+
else -> {
49+
val nextState = this@ShuttingDown.copy(commitments = commitments1)
50+
val actions = buildList {
51+
add(ChannelAction.Storage.StoreState(nextState))
52+
add(ChannelAction.Message.Send(revocation))
53+
if (commitments1.changes.localHasChanges()) {
54+
// if we have newly acknowledged changes let's sign them
55+
add(ChannelAction.Message.SendToSelf(ChannelCommand.Commitment.Sign))
5856
}
59-
Pair(nextState, actions)
6057
}
58+
Pair(nextState, actions)
6159
}
6260
}
6361
}
64-
else -> Pair(this@ShuttingDown, listOf())
6562
}
6663
is RevokeAndAck -> when (val result = commitments.receiveRevocation(cmd.message)) {
6764
is Either.Left -> handleLocalError(cmd, result.value)
@@ -128,7 +125,7 @@ data class ShuttingDown(
128125
val actions = buildList {
129126
add(ChannelAction.Storage.StoreHtlcInfos(htlcInfos))
130127
add(ChannelAction.Storage.StoreState(nextState))
131-
addAll(result.value.second.map { ChannelAction.Message.Send(it) })
128+
add(ChannelAction.Message.Send(result.value.second))
132129
}
133130
Pair(nextState, actions)
134131
}
@@ -164,11 +161,7 @@ data class ShuttingDown(
164161
is WatchSpentTriggered -> handlePotentialForceClose(watch)
165162
}
166163
is ChannelCommand.Commitment.CheckHtlcTimeout -> checkHtlcTimeout()
167-
is ChannelCommand.Disconnected -> {
168-
// reset the commit_sig batch
169-
sigStash = emptyList()
170-
Pair(Offline(this@ShuttingDown), listOf())
171-
}
164+
is ChannelCommand.Disconnected -> Pair(Offline(this@ShuttingDown), listOf())
172165
else -> unhandled(cmd)
173166
}
174167
}

modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Syncing.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -408,23 +408,23 @@ data class Syncing(val state: PersistedChannelState, val channelReestablishSent:
408408
// we just sent a new commit_sig but they didn't receive it
409409
// we resend the same updates and the same sig, and preserve the same ordering
410410
val signedUpdates = commitments.changes.localChanges.signed
411-
val commitSigs = commitments.active.map { it.nextRemoteCommit }.filterIsInstance<NextRemoteCommit>().map { it.sig }
411+
val commitSigs = CommitSigs.fromSigs(commitments.active.mapNotNull { it.nextRemoteCommit?.sig })
412412
val retransmit = when (retransmitRevocation) {
413413
null -> buildList {
414414
addAll(signedUpdates)
415-
addAll(commitSigs)
415+
add(commitSigs)
416416
}
417417
else -> if (commitments.localCommitIndex > rnci.value.sentAfterLocalCommitIndex) {
418418
buildList {
419419
addAll(signedUpdates)
420-
addAll(commitSigs)
420+
add(commitSigs)
421421
add(retransmitRevocation)
422422
}
423423
} else {
424424
buildList {
425425
add(retransmitRevocation)
426426
addAll(signedUpdates)
427-
addAll(commitSigs)
427+
add(commitSigs)
428428
}
429429
}
430430
}

0 commit comments

Comments
 (0)