Skip to content

Send splice commit_sigs as a batch #787

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ data class Commitments(
return failure?.let { Either.Left(it) } ?: Either.Right(copy(changes = changes1))
}

fun sendCommit(channelKeys: KeyManager.ChannelKeys, log: MDCLogger): Either<ChannelException, Pair<Commitments, List<CommitSig>>> {
fun sendCommit(channelKeys: KeyManager.ChannelKeys, log: MDCLogger): Either<ChannelException, Pair<Commitments, CommitSigs>> {
val remoteNextPerCommitmentPoint = remoteNextCommitInfo.right ?: return Either.Left(CannotSignBeforeRevocation(channelId))
if (!changes.localHasChanges()) return Either.Left(CannotSignWithoutChanges(channelId))
val (active1, sigs) = active.map { it.sendCommit(channelKeys, params, changes, remoteNextPerCommitmentPoint, active.size, log) }.unzip()
Expand All @@ -771,18 +771,30 @@ data class Commitments(
remoteChanges = changes.remoteChanges.copy(acked = emptyList(), signed = changes.remoteChanges.acked)
)
)
return Either.Right(Pair(commitments1, sigs))
return Either.Right(Pair(commitments1, CommitSigs.fromSigs(sigs)))
}

fun receiveCommit(commits: List<CommitSig>, channelKeys: KeyManager.ChannelKeys, log: MDCLogger): Either<ChannelException, Pair<Commitments, RevokeAndAck>> {
fun receiveCommit(commits: CommitSigs, channelKeys: KeyManager.ChannelKeys, log: MDCLogger): Either<ChannelException, Pair<Commitments, RevokeAndAck>> {
// We may receive more commit_sig than the number of active commitments, because there can be a race where we send splice_locked
// while our peer is sending us a batch of commit_sig. When that happens, we simply need to discard the commit_sig that belong
// to commitments we deactivated.
if (commits.size < active.size) {
return Either.Left(CommitSigCountMismatch(channelId, active.size, commits.size))
val sigs = when (commits) {
is CommitSigBatch -> {
if (commits.batchSize < active.size) {
return Either.Left(CommitSigCountMismatch(channelId, active.size, commits.batchSize))
}
commits.messages
}
is CommitSig -> {
if (active.size > 1) {
return Either.Left(CommitSigCountMismatch(channelId, active.size, 1))
}
listOf(commits)
}
}

// Signatures are sent in order (most recent first), calling `zip` will drop trailing sigs that are for deactivated/pruned commitments.
val active1 = active.zip(commits).map {
val active1 = active.zip(sigs).map {
when (val commitment1 = it.first.receiveCommit(channelKeys, params, changes, it.second, log)) {
is Either.Left -> return Either.Left(commitment1.value)
is Either.Right -> commitment1.value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -674,22 +674,6 @@ sealed class ChannelStateWithCommitments : PersistedChannelState() {
}
}
}

// in Normal and Shutdown we aggregate sigs for splices before processing
var sigStash = emptyList<CommitSig>()

/** For splices we will send one commit_sig per active commitments. */
internal fun ChannelContext.aggregateSigs(commit: CommitSig): List<CommitSig>? {
sigStash = sigStash + commit
logger.debug { "received sig for batch of size=${commit.batchSize}" }
return if (sigStash.size == commit.batchSize) {
val sigs = sigStash
sigStash = emptyList()
sigs
} else {
null
}
}
}

object Channel {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ data class Normal(
val actions = buildList {
add(ChannelAction.Storage.StoreHtlcInfos(htlcInfos))
add(ChannelAction.Storage.StoreState(nextState))
addAll(result.value.second.map { ChannelAction.Message.Send(it) })
add(ChannelAction.Message.Send(result.value.second))
}
Pair(nextState, actions)
}
Expand Down Expand Up @@ -173,12 +173,12 @@ data class Normal(
is Either.Left -> handleLocalError(cmd, result.value)
is Either.Right -> Pair([email protected](commitments = result.value), listOf())
}
is CommitSig -> when {
is CommitSigs -> when {
spliceStatus == SpliceStatus.Aborted -> {
logger.warning { "received commit_sig after sending tx_abort, they probably sent it before receiving our tx_abort, ignoring..." }
Pair(this@Normal, listOf())
}
spliceStatus is SpliceStatus.WaitingForSigs -> {
spliceStatus is SpliceStatus.WaitingForSigs && cmd.message is CommitSig -> {
val (signingSession1, action) = spliceStatus.session.receiveCommitSig(channelKeys(), commitments.params, cmd.message, currentBlockHeight.toLong(), logger)
when (action) {
is InteractiveTxSigningSessionAction.AbortFundingAttempt -> {
Expand All @@ -193,7 +193,7 @@ data class Normal(
is InteractiveTxSigningSessionAction.SendTxSigs -> sendSpliceTxSigs(spliceStatus.origins, action, spliceStatus.liquidityPurchase)
}
}
ignoreRetransmittedCommitSig(cmd.message) -> {
cmd.message is CommitSig && ignoreRetransmittedCommitSig(cmd.message) -> {
// We haven't received our peer's tx_signatures for the latest funding transaction and asked them to resend it on reconnection.
// They also resend their corresponding commit_sig, but we have already received it so we should ignore it.
// Note that the funding transaction may have confirmed while we were offline.
Expand All @@ -202,34 +202,29 @@ data class Normal(
}
// NB: in all other cases we process the commit_sig normally. We could do a full pattern matching on all splice statuses, but it would force us to handle
// corner cases like race condition between splice_init and a non-splice commit_sig
else -> {
when (val sigs = aggregateSigs(cmd.message)) {
is List<CommitSig> -> when (val result = commitments.receiveCommit(sigs, channelKeys(), logger)) {
is Either.Left -> handleLocalError(cmd, result.value)
is Either.Right -> {
val commitments1 = result.value.first
val spliceStatus1 = when {
spliceStatus is SpliceStatus.QuiescenceRequested && commitments1.localIsQuiescent() -> SpliceStatus.InitiatorQuiescent(spliceStatus.command)
spliceStatus is SpliceStatus.ReceivedStfu && commitments1.localIsQuiescent() -> SpliceStatus.NonInitiatorQuiescent
else -> spliceStatus
}
val nextState = [email protected](commitments = commitments1, spliceStatus = spliceStatus1)
val actions = mutableListOf<ChannelAction>()
actions.add(ChannelAction.Storage.StoreState(nextState))
actions.add(ChannelAction.Message.Send(result.value.second))
if (commitments1.changes.localHasChanges()) {
actions.add(ChannelAction.Message.SendToSelf(ChannelCommand.Commitment.Sign))
}
// If we're now quiescent, we may send our stfu message.
when {
spliceStatus is SpliceStatus.QuiescenceRequested && commitments1.localIsQuiescent() -> actions.add(ChannelAction.Message.Send(Stfu(channelId, initiator = true)))
spliceStatus is SpliceStatus.ReceivedStfu && commitments1.localIsQuiescent() -> actions.add(ChannelAction.Message.Send(Stfu(channelId, initiator = false)))
else -> {}
}
Pair(nextState, actions)
}
else -> when (val result = commitments.receiveCommit(cmd.message, channelKeys(), logger)) {
is Either.Left -> handleLocalError(cmd, result.value)
is Either.Right -> {
val commitments1 = result.value.first
val spliceStatus1 = when {
spliceStatus is SpliceStatus.QuiescenceRequested && commitments1.localIsQuiescent() -> SpliceStatus.InitiatorQuiescent(spliceStatus.command)
spliceStatus is SpliceStatus.ReceivedStfu && commitments1.localIsQuiescent() -> SpliceStatus.NonInitiatorQuiescent
else -> spliceStatus
}
val nextState = [email protected](commitments = commitments1, spliceStatus = spliceStatus1)
val actions = mutableListOf<ChannelAction>()
actions.add(ChannelAction.Storage.StoreState(nextState))
actions.add(ChannelAction.Message.Send(result.value.second))
if (commitments1.changes.localHasChanges()) {
actions.add(ChannelAction.Message.SendToSelf(ChannelCommand.Commitment.Sign))
}
else -> Pair(this@Normal, listOf())
// If we're now quiescent, we may send our stfu message.
when {
spliceStatus is SpliceStatus.QuiescenceRequested && commitments1.localIsQuiescent() -> actions.add(ChannelAction.Message.Send(Stfu(channelId, initiator = true)))
spliceStatus is SpliceStatus.ReceivedStfu && commitments1.localIsQuiescent() -> actions.add(ChannelAction.Message.Send(Stfu(channelId, initiator = false)))
else -> {}
}
Pair(nextState, actions)
}
}
}
Expand Down Expand Up @@ -822,8 +817,6 @@ data class Normal(
SpliceStatus.None
}
}
// reset the commit_sig batch
sigStash = emptyList()
Pair(Offline([email protected](spliceStatus = spliceStatus1)), failedHtlcs)
}
is ChannelCommand.Connected -> unhandled(cmd)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,29 +39,26 @@ data class ShuttingDown(
is Either.Left -> handleLocalError(cmd, result.value)
is Either.Right -> Pair([email protected](commitments = result.value), listOf())
}
is CommitSig -> when (val sigs = aggregateSigs(cmd.message)) {
is List<CommitSig> -> when (val result = commitments.receiveCommit(sigs, channelKeys(), logger)) {
is Either.Left -> handleLocalError(cmd, result.value)
is Either.Right -> {
val (commitments1, revocation) = result.value
when {
commitments1.hasNoPendingHtlcsOrFeeUpdate() -> startClosingNegotiation(closeCommand, commitments1, localShutdown, remoteShutdown, listOf(ChannelAction.Message.Send(revocation)))
else -> {
val nextState = [email protected](commitments = commitments1)
val actions = buildList {
add(ChannelAction.Storage.StoreState(nextState))
add(ChannelAction.Message.Send(revocation))
if (commitments1.changes.localHasChanges()) {
// if we have newly acknowledged changes let's sign them
add(ChannelAction.Message.SendToSelf(ChannelCommand.Commitment.Sign))
}
is CommitSigs -> when (val result = commitments.receiveCommit(cmd.message, channelKeys(), logger)) {
is Either.Left -> handleLocalError(cmd, result.value)
is Either.Right -> {
val (commitments1, revocation) = result.value
when {
commitments1.hasNoPendingHtlcsOrFeeUpdate() -> startClosingNegotiation(closeCommand, commitments1, localShutdown, remoteShutdown, listOf(ChannelAction.Message.Send(revocation)))
else -> {
val nextState = [email protected](commitments = commitments1)
val actions = buildList {
add(ChannelAction.Storage.StoreState(nextState))
add(ChannelAction.Message.Send(revocation))
if (commitments1.changes.localHasChanges()) {
// if we have newly acknowledged changes let's sign them
add(ChannelAction.Message.SendToSelf(ChannelCommand.Commitment.Sign))
}
Pair(nextState, actions)
}
Pair(nextState, actions)
}
}
}
else -> Pair(this@ShuttingDown, listOf())
}
is RevokeAndAck -> when (val result = commitments.receiveRevocation(cmd.message)) {
is Either.Left -> handleLocalError(cmd, result.value)
Expand Down Expand Up @@ -128,7 +125,7 @@ data class ShuttingDown(
val actions = buildList {
add(ChannelAction.Storage.StoreHtlcInfos(htlcInfos))
add(ChannelAction.Storage.StoreState(nextState))
addAll(result.value.second.map { ChannelAction.Message.Send(it) })
add(ChannelAction.Message.Send(result.value.second))
}
Pair(nextState, actions)
}
Expand Down Expand Up @@ -164,11 +161,7 @@ data class ShuttingDown(
is WatchSpentTriggered -> handlePotentialForceClose(watch)
}
is ChannelCommand.Commitment.CheckHtlcTimeout -> checkHtlcTimeout()
is ChannelCommand.Disconnected -> {
// reset the commit_sig batch
sigStash = emptyList()
Pair(Offline(this@ShuttingDown), listOf())
}
is ChannelCommand.Disconnected -> Pair(Offline(this@ShuttingDown), listOf())
else -> unhandled(cmd)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,23 +408,23 @@ data class Syncing(val state: PersistedChannelState, val channelReestablishSent:
// we just sent a new commit_sig but they didn't receive it
// we resend the same updates and the same sig, and preserve the same ordering
val signedUpdates = commitments.changes.localChanges.signed
val commitSigs = commitments.active.map { it.nextRemoteCommit }.filterIsInstance<NextRemoteCommit>().map { it.sig }
val commitSigs = CommitSigs.fromSigs(commitments.active.mapNotNull { it.nextRemoteCommit?.sig })
val retransmit = when (retransmitRevocation) {
null -> buildList {
addAll(signedUpdates)
addAll(commitSigs)
add(commitSigs)
}
else -> if (commitments.localCommitIndex > rnci.value.sentAfterLocalCommitIndex) {
buildList {
addAll(signedUpdates)
addAll(commitSigs)
add(commitSigs)
add(retransmitRevocation)
}
} else {
buildList {
add(retransmitRevocation)
addAll(signedUpdates)
addAll(commitSigs)
add(commitSigs)
}
}
}
Expand Down
Loading