Skip to content

Commit 87bc09a

Browse files
committed
Send splice commit_sigs as a batch
There has been some discussion on the splicing spec around batching the set of `commit_sig` messages that are sent while a splice is ongoing. In this PR, we ensure that our `commit_sig` messages are sent as a batch (no other messages can be interleaved between `commit_sig` messages). This will let us gradually deploy the official splicing version from the BOLTs: we first ensure that messages are sent as a batch, and later we will be able to rely on this on the receiver side.
1 parent 930c085 commit 87bc09a

File tree

6 files changed

+31
-18
lines changed

6 files changed

+31
-18
lines changed

modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/ChannelAction.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ sealed class ChannelAction {
1616
// @formatter:off
1717
sealed class Message : ChannelAction() {
1818
data class Send(val message: LightningMessage) : Message()
19+
data class SendBatch(val messages: List<CommitSig>) : Message()
1920
data class SendToSelf(val command: ChannelCommand) : Message()
2021
}
2122

modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Normal.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ data class Normal(
8080
val actions = buildList {
8181
add(ChannelAction.Storage.StoreHtlcInfos(htlcInfos))
8282
add(ChannelAction.Storage.StoreState(nextState))
83-
addAll(result.value.second.map { ChannelAction.Message.Send(it) })
83+
add(ChannelAction.Message.SendBatch(result.value.second))
8484
}
8585
Pair(nextState, actions)
8686
}

modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/ShuttingDown.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ data class ShuttingDown(
128128
val actions = buildList {
129129
add(ChannelAction.Storage.StoreHtlcInfos(htlcInfos))
130130
add(ChannelAction.Storage.StoreState(nextState))
131-
addAll(result.value.second.map { ChannelAction.Message.Send(it) })
131+
add(ChannelAction.Message.SendBatch(result.value.second))
132132
}
133133
Pair(nextState, actions)
134134
}

modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Syncing.kt

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ data class Syncing(val state: PersistedChannelState, val channelReestablishSent:
199199
}
200200

201201
// we may need to retransmit updates and/or commit_sig and/or revocation
202-
actions.addAll(syncResult.retransmit.map { ChannelAction.Message.Send(it) })
202+
actions.addAll(syncResult.retransmit)
203203

204204
// then we clean up unsigned updates
205205
val commitments1 = discardUnsignedUpdates(state.commitments)
@@ -231,8 +231,8 @@ data class Syncing(val state: PersistedChannelState, val channelReestablishSent:
231231
val commitments1 = discardUnsignedUpdates(state.commitments)
232232
val actions = buildList {
233233
addAll(syncResult.retransmit)
234-
add(state.localShutdown)
235-
}.map { ChannelAction.Message.Send(it) }
234+
add(ChannelAction.Message.Send(state.localShutdown))
235+
}
236236
Pair(state.copy(commitments = commitments1), actions)
237237
}
238238
}
@@ -379,7 +379,7 @@ data class Syncing(val state: PersistedChannelState, val channelReestablishSent:
379379

380380
// @formatter:off
381381
sealed class SyncResult {
382-
data class Success(val retransmit: List<LightningMessage>) : SyncResult()
382+
data class Success(val retransmit: List<ChannelAction.Message>) : SyncResult()
383383
sealed class Failure : SyncResult() {
384384
data class LocalLateProven(val ourLocalCommitmentNumber: Long, val theirRemoteCommitmentNumber: Long) : Failure()
385385
data class LocalLateUnproven(val ourRemoteCommitmentNumber: Long, val theirLocalCommitmentNumber: Long) : Failure()
@@ -400,31 +400,32 @@ data class Syncing(val state: PersistedChannelState, val channelReestablishSent:
400400
// step 2 depends on step 1 because we need to preserve ordering between commit_sig and revocation
401401

402402
// step 2: we check the remote commitment
403-
fun checkRemoteCommit(remoteChannelReestablish: ChannelReestablish, retransmitRevocation: RevokeAndAck?): SyncResult {
403+
fun checkRemoteCommit(remoteChannelReestablish: ChannelReestablish, revocation: RevokeAndAck?): SyncResult {
404+
val retransmitRevocation = revocation?.let { ChannelAction.Message.Send(it) }
404405
return when (val rnci = commitments.remoteNextCommitInfo) {
405406
is Either.Left -> {
406407
when {
407408
remoteChannelReestablish.nextLocalCommitmentNumber == commitments.nextRemoteCommitIndex -> {
408409
// we just sent a new commit_sig but they didn't receive it
409410
// we resend the same updates and the same sig, and preserve the same ordering
410-
val signedUpdates = commitments.changes.localChanges.signed
411-
val commitSigs = commitments.active.map { it.nextRemoteCommit }.filterIsInstance<NextRemoteCommit>().map { it.sig }
411+
val signedUpdates = commitments.changes.localChanges.signed.map { ChannelAction.Message.Send(it) }
412+
val commitSigs = ChannelAction.Message.SendBatch(commitments.active.mapNotNull { it.nextRemoteCommit }.map { it.sig })
412413
val retransmit = when (retransmitRevocation) {
413414
null -> buildList {
414415
addAll(signedUpdates)
415-
addAll(commitSigs)
416+
add(commitSigs)
416417
}
417418
else -> if (commitments.localCommitIndex > rnci.value.sentAfterLocalCommitIndex) {
418419
buildList {
419420
addAll(signedUpdates)
420-
addAll(commitSigs)
421+
add(commitSigs)
421422
add(retransmitRevocation)
422423
}
423424
} else {
424425
buildList {
425426
add(retransmitRevocation)
426427
addAll(signedUpdates)
427-
addAll(commitSigs)
428+
add(commitSigs)
428429
}
429430
}
430431
}
@@ -477,7 +478,7 @@ data class Syncing(val state: PersistedChannelState, val channelReestablishSent:
477478
// step 1: we check our local commitment
478479
return if (commitments.localCommitIndex == remoteChannelReestablish.nextRemoteRevocationNumber) {
479480
// our local commitment is in sync, let's check the remote commitment
480-
checkRemoteCommit(remoteChannelReestablish, retransmitRevocation = null)
481+
checkRemoteCommit(remoteChannelReestablish, revocation = null)
481482
} else if (commitments.localCommitIndex == remoteChannelReestablish.nextRemoteRevocationNumber + 1) {
482483
// they just sent a new commit_sig, we have received it but they didn't receive our revocation
483484
val localPerCommitmentSecret = channelKeys.commitmentSecret(commitments.localCommitIndex - 1)
@@ -487,7 +488,7 @@ data class Syncing(val state: PersistedChannelState, val channelReestablishSent:
487488
perCommitmentSecret = localPerCommitmentSecret,
488489
nextPerCommitmentPoint = localNextPerCommitmentPoint
489490
)
490-
checkRemoteCommit(remoteChannelReestablish, retransmitRevocation = revocation)
491+
checkRemoteCommit(remoteChannelReestablish, revocation)
491492
} else if (commitments.localCommitIndex > remoteChannelReestablish.nextRemoteRevocationNumber + 1) {
492493
SyncResult.Failure.RemoteLate
493494
} else {

modules/core/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -859,6 +859,10 @@ class Peer(
859859
}
860860
peerConnection?.send(action.message) // ignore if disconnected
861861
}
862+
is ChannelAction.Message.SendBatch -> {
863+
updatePeerStorage(nodeParams, channels + (channelId to state), peerConnection, theirInit?.features, logger)
864+
action.messages.forEach { peerConnection?.send(it) }
865+
}
862866
// sometimes channel actions include "self" command (such as ChannelCommand.Commitment.Sign)
863867
is ChannelAction.Message.SendToSelf -> input.send(WrappedChannelCommand(actualChannelId, action.command))
864868
is ChannelAction.Blockchain.SendWatch -> watcher.watch(action.watch)

modules/core/src/commonTest/kotlin/fr/acinq/lightning/channel/TestsHelper.kt

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,14 @@ import kotlinx.coroutines.runBlocking
2929
import kotlin.test.*
3030

3131
// LN Message
32-
internal inline fun <reified T : LightningMessage> List<ChannelAction>.findOutgoingMessages(): List<T> = filterIsInstance<ChannelAction.Message.Send>().map { it.message }.filterIsInstance<T>()
32+
internal inline fun <reified T : LightningMessage> List<ChannelAction>.findOutgoingMessages(): List<T> = filterIsInstance<ChannelAction.Message>().flatMap {
33+
when (it) {
34+
is ChannelAction.Message.Send -> listOf(it.message)
35+
is ChannelAction.Message.SendBatch -> it.messages
36+
is ChannelAction.Message.SendToSelf -> listOf()
37+
}
38+
}.filterIsInstance<T>()
39+
3340
internal inline fun <reified T : LightningMessage> List<ChannelAction>.findOutgoingMessageOpt(): T? = findOutgoingMessages<T>().firstOrNull()
3441
internal inline fun <reified T : LightningMessage> List<ChannelAction>.findOutgoingMessage(): T = findOutgoingMessageOpt<T>() ?: fail("cannot find outgoing message ${T::class}")
3542
internal inline fun <reified T : LightningMessage> List<ChannelAction>.hasOutgoingMessage() = assertNotNull(findOutgoingMessageOpt<T>(), "cannot find outgoing message ${T::class}")
@@ -545,7 +552,7 @@ object TestsHelper {
545552
val rHasChanges = nodeB.state.commitments.changes.localHasChanges()
546553

547554
val (sender0, sActions0) = nodeA.process(ChannelCommand.Commitment.Sign)
548-
val commitSigs0 = sActions0.findOutgoingMessages<CommitSig>()
555+
val commitSigs0 = sActions0.find<ChannelAction.Message.SendBatch>().messages
549556
assertEquals(commitmentsCount, commitSigs0.size)
550557
commitSigs0.forEach { assertEquals(commitmentsCount, it.batchSize) }
551558

@@ -556,7 +563,7 @@ object TestsHelper {
556563
val (sender1, _) = sender0.process(ChannelCommand.MessageReceived(revokeAndAck0))
557564
assertIs<LNChannel<T>>(sender1)
558565
val (receiver1, rActions1) = receiver0.process(commandSign0)
559-
val commitSigs1 = rActions1.findOutgoingMessages<CommitSig>()
566+
val commitSigs1 = rActions1.find<ChannelAction.Message.SendBatch>().messages
560567
assertEquals(commitmentsCount, commitSigs1.size)
561568
commitSigs1.forEach { assertEquals(commitmentsCount, it.batchSize) }
562569

@@ -568,7 +575,7 @@ object TestsHelper {
568575
if (rHasChanges) {
569576
val commandSign1 = sActions2.findCommand<ChannelCommand.Commitment.Sign>()
570577
val (sender3, sActions3) = sender2.process(commandSign1)
571-
val commitSigs2 = sActions3.findOutgoingMessages<CommitSig>()
578+
val commitSigs2 = sActions3.find<ChannelAction.Message.SendBatch>().messages
572579
assertEquals(commitmentsCount, commitSigs2.size)
573580

574581
val (receiver3, rActions3) = receiveCommitSigs(receiver2, commitSigs2)

0 commit comments

Comments
 (0)