Skip to content

Commit 16868a4

Browse files
Merge branch 'develop' into feature/quic-issues
2 parents 00e5b6e + ba322e3 commit 16868a4

18 files changed

Lines changed: 1467 additions & 8 deletions

libp2p/src/main/kotlin/io/libp2p/mux/mplex/MplexFrameCodec.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,12 @@ class MplexFrameCodec(
7373
val streamId = header.shr(3)
7474
val data = msg.readSlice(lenData.toInt())
7575
data.retain() // MessageToMessageCodec releases original buffer, but it needs to be relayed
76-
val flag = MplexFlag.getByValue(streamTag)
76+
val flag = try {
77+
MplexFlag.getByValue(streamTag)
78+
} catch (e: IllegalArgumentException) {
79+
data.release()
80+
throw e
81+
}
7782
val mplexFrame = MplexFrame(MplexId(ctx.channel().id(), streamId, !flag.isInitiator), flag, data)
7883
out.add(mplexFrame)
7984
}

libp2p/src/main/kotlin/io/libp2p/mux/mplex/MplexHandler.kt

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,18 @@ open class MplexHandler(
2626
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
2727
msg as MplexFrame
2828
when (msg.flag.type) {
29-
MplexFlag.Type.OPEN -> onRemoteOpen(msg.id)
30-
MplexFlag.Type.CLOSE -> onRemoteDisconnect(msg.id)
31-
MplexFlag.Type.RESET -> onRemoteClose(msg.id)
29+
MplexFlag.Type.OPEN -> {
30+
onRemoteOpen(msg.id)
31+
msg.release()
32+
}
33+
MplexFlag.Type.CLOSE -> {
34+
onRemoteDisconnect(msg.id)
35+
msg.release()
36+
}
37+
MplexFlag.Type.RESET -> {
38+
onRemoteClose(msg.id)
39+
msg.release()
40+
}
3241
MplexFlag.Type.DATA -> childRead(msg.id, msg.data)
3342
}
3443
}

libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,15 @@ abstract class AbstractRouter(
8383
return true
8484
}
8585

86+
/**
87+
* Per-router caps on repeated-field counts inside inbound RPCs. Enforced before
88+
* protobuf materialisation by an [RpcCountFrameDecoder] inserted into the stream
89+
* pipeline. Defaults to [PubsubRpcLimits.NONE] (no pre-decode cap). Subclasses
90+
* with configured limits (e.g. [io.libp2p.pubsub.gossip.GossipRouter]) override.
91+
*/
92+
protected open val rpcLimits: PubsubRpcLimits
93+
get() = PubsubRpcLimits.NONE
94+
8695
/**
8796
* Flushes all pending message parts for all peers
8897
*/
@@ -113,6 +122,7 @@ abstract class AbstractRouter(
113122
with(streamHandler.stream) {
114123
pushHandler(LimitedProtobufVarint32FrameDecoder(maxMsgSize))
115124
pushHandler(ProtobufVarint32LengthFieldPrepender())
125+
pushHandler(RpcCountFrameDecoder(rpcLimits))
116126
pushHandler(ProtobufDecoder(Rpc.RPC.getDefaultInstance()))
117127
pushHandler(ProtobufEncoder())
118128
handler?.also { pushHandler(it) }
@@ -216,7 +226,9 @@ abstract class AbstractRouter(
216226
true
217227
} catch (e: Exception) {
218228
logger.debug("Invalid pubsub message from peer {}: {}", peer, it, e)
219-
seenMessages[it] = Optional.of(ValidationResult.Invalid)
229+
// Avoid rejecting a future legitimate message with the same id
230+
// (e.g. same from||seqno)
231+
seenMessages -= it.messageId
220232
notifyUnseenInvalidMessage(peer, it)
221233
false
222234
}
@@ -230,8 +242,14 @@ abstract class AbstractRouter(
230242
validFuts.forEach { (msg, validationFut) ->
231243
validationFut.thenAcceptAsync(
232244
{ res ->
233-
seenMessages[msg] = Optional.of(res)
234-
if (res == ValidationResult.Invalid) notifyUnseenInvalidMessage(peer, msg)
245+
if (res == ValidationResult.Invalid) {
246+
// Evict so a later legitimate message with the same id is not
247+
// suppressed by this rejected one.
248+
seenMessages -= msg.messageId
249+
notifyUnseenInvalidMessage(peer, msg)
250+
} else {
251+
seenMessages[msg] = Optional.of(res)
252+
}
235253
},
236254
executor
237255
)

libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubApiImpl.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,14 @@ open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi {
4242
seqId: Long?,
4343
vararg topics: Topic
4444
): CompletableFuture<Unit> {
45+
// If the caller supplies a `from` AND we have a signing key, the `from` MUST
46+
// resolve to the same PeerId as the signing key
47+
if (from != null && privKey != null) {
48+
val derived = PeerId.fromPubKey(privKey.publicKey()).bytes
49+
require(from.contentEquals(derived)) {
50+
"publishExt 'from' (${PeerId(from)}) does not match the signing key's PeerId (${PeerId(derived)})"
51+
}
52+
}
4553
val mFrom = from?.toProtobuf() ?: this.from
4654
val mSeqId = seqId ?: seqIdGenerator()
4755

libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubCrypto.kt

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
package io.libp2p.pubsub
22

3+
import io.libp2p.core.PeerId
34
import io.libp2p.core.crypto.PrivKey
45
import io.libp2p.core.crypto.marshalPublicKey
56
import io.libp2p.core.crypto.unmarshalPublicKey
67
import io.libp2p.etc.types.toProtobuf
8+
import org.slf4j.LoggerFactory
79
import pubsub.pb.Rpc
810

911
val SignPrefix = "libp2p-pubsub:".toByteArray()
1012

13+
private val logger = LoggerFactory.getLogger("io.libp2p.pubsub.PubsubCrypto")
14+
1115
fun pubsubSign(msg: Rpc.Message, key: PrivKey): Rpc.Message {
1216
if (msg.hasKey() || msg.hasSignature()) throw IllegalArgumentException("Message to sign should not contain 'key' or 'signature' fields")
1317
val signature = key.sign(SignPrefix + msg.toByteArray())
@@ -17,12 +21,59 @@ fun pubsubSign(msg: Rpc.Message, key: PrivKey): Rpc.Message {
1721
.build()
1822
}
1923

24+
/**
25+
* Strict-sign validation for a pubsub [Rpc.Message]:
26+
*
27+
* - `from`, `key` and `signature` MUST all be present and non-empty.
28+
* - `signature` MUST verify under `key` over the SSZ-style serialised message with
29+
* `signature` and `key` cleared.
30+
* - `PeerId.fromPubKey(key)` MUST equal `PeerId(from)`, otherwise an attacker holding
31+
* any valid private key could publish messages whose `from` claims a different peer.
32+
*
33+
* Returns `false` on any validation failure. Never throws.
34+
*/
2035
fun pubsubValidate(msg: Rpc.Message): Boolean {
36+
if (!msg.hasFrom() || msg.from.isEmpty) {
37+
logger.debug("Rejecting pubsub message with missing/empty 'from'")
38+
return false
39+
}
40+
if (!msg.hasKey() || msg.key.isEmpty) {
41+
logger.debug("Rejecting pubsub message with missing/empty 'key'")
42+
return false
43+
}
44+
if (!msg.hasSignature() || msg.signature.isEmpty) {
45+
logger.debug("Rejecting pubsub message with missing/empty 'signature'")
46+
return false
47+
}
48+
49+
val publicKey = try {
50+
unmarshalPublicKey(msg.key.toByteArray())
51+
} catch (e: Exception) {
52+
logger.debug("Rejecting pubsub message with un-parseable 'key': {}", e.toString())
53+
return false
54+
}
55+
56+
val keyDerivedPeerId = PeerId.fromPubKey(publicKey)
57+
val claimedFrom = try {
58+
PeerId(msg.from.toByteArray())
59+
} catch (e: Exception) {
60+
logger.debug("Rejecting pubsub message with un-parseable 'from': {}", e.toString())
61+
return false
62+
}
63+
if (keyDerivedPeerId != claimedFrom) {
64+
logger.debug(
65+
"Rejecting pubsub message: from={} does not match PeerId(key)={}",
66+
claimedFrom,
67+
keyDerivedPeerId
68+
)
69+
return false
70+
}
71+
2172
val msgToSign = Rpc.Message.newBuilder(msg)
2273
.clearSignature()
2374
.clearKey()
2475
.build()
25-
return unmarshalPublicKey(msg.key.toByteArray()).verify(
76+
return publicKey.verify(
2677
SignPrefix + msgToSign.toByteArray(),
2778
msg.signature.toByteArray()
2879
)
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package io.libp2p.pubsub
2+
3+
/**
4+
* Per-router limits on repeated-field counts inside an inbound pubsub RPC. Enforced
5+
* at decode time by [RpcMessageCountValidator] to prevent allocation amplification
6+
* before [pubsub.pb.Rpc.RPC] is materialised.
7+
*
8+
* A null field means "no limit" — same semantics as the corresponding nullable
9+
* fields on `GossipParams`.
10+
*/
11+
data class PubsubRpcLimits(
12+
val maxPublishedMessages: Int?,
13+
val maxTopicsPerPublishedMessage: Int?,
14+
val maxSubscriptions: Int?,
15+
val maxIHaveMessageIds: Int?,
16+
val maxIWantMessageIds: Int?,
17+
val maxGraftMessages: Int?,
18+
val maxPruneMessages: Int?,
19+
val maxPeersPerPruneMessage: Int?,
20+
val maxIDontWantMessages: Int? = null,
21+
val maxIDontWantMessageIds: Int? = null,
22+
val rejectEmptyPublishEntries: Boolean = true,
23+
val rejectEmptyIDontWantEntries: Boolean = true,
24+
) {
25+
/**
26+
* True when no configured limit or reject-flag can fire. Lets
27+
* [RpcCountFrameDecoder] skip the validator walk entirely on the toggle-off
28+
* path. Any new field added to this data class must be considered here.
29+
*/
30+
val isNoop: Boolean =
31+
maxPublishedMessages == null &&
32+
maxTopicsPerPublishedMessage == null &&
33+
maxSubscriptions == null &&
34+
maxIHaveMessageIds == null &&
35+
maxIWantMessageIds == null &&
36+
maxGraftMessages == null &&
37+
maxPruneMessages == null &&
38+
maxPeersPerPruneMessage == null &&
39+
maxIDontWantMessages == null &&
40+
maxIDontWantMessageIds == null &&
41+
!rejectEmptyPublishEntries &&
42+
!rejectEmptyIDontWantEntries
43+
44+
companion object {
45+
val NONE = PubsubRpcLimits(
46+
maxPublishedMessages = null,
47+
maxTopicsPerPublishedMessage = null,
48+
maxSubscriptions = null,
49+
maxIHaveMessageIds = null,
50+
maxIWantMessageIds = null,
51+
maxGraftMessages = null,
52+
maxPruneMessages = null,
53+
maxPeersPerPruneMessage = null,
54+
maxIDontWantMessages = null,
55+
maxIDontWantMessageIds = null,
56+
rejectEmptyPublishEntries = false,
57+
rejectEmptyIDontWantEntries = false,
58+
)
59+
}
60+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package io.libp2p.pubsub
2+
3+
import io.netty.buffer.ByteBuf
4+
import io.netty.channel.ChannelHandlerContext
5+
import io.netty.handler.codec.CorruptedFrameException
6+
import io.netty.handler.codec.MessageToMessageDecoder
7+
import org.slf4j.LoggerFactory
8+
9+
/**
10+
* Pre-decode count cap for inbound pubsub RPC frames. Sits between
11+
* [io.libp2p.etc.util.netty.protobuf.LimitedProtobufVarint32FrameDecoder] (byte-size
12+
* cap) and [io.netty.handler.codec.protobuf.ProtobufDecoder] (materialisation).
13+
*
14+
* For each frame, delegates to [RpcMessageCountValidator]. Accepted frames are
15+
* forwarded unchanged as a `ByteBuf` to the next handler. Frames rejected because
16+
* a configured count limit was exceeded are dropped with a debug log; no
17+
* `Rpc$Message` is allocated for them. Frames rejected because the protobuf bytes
18+
* themselves are malformed propagate a [CorruptedFrameException] so that
19+
* downstream handlers (e.g. [io.libp2p.pubsub.AbstractRouter.onPeerWireException])
20+
* can apply the same behaviour penalty they would have on a [ProtobufDecoder]
21+
* failure.
22+
*
23+
* When [limits] is a no-op (see [PubsubRpcLimits.isNoop], e.g. [PubsubRpcLimits.NONE])
24+
* the validator is skipped entirely and the buffer is forwarded as-is. Malformed
25+
* bytes still surface downstream from [ProtobufDecoder], which already triggers
26+
* the same wire-exception path the validator would have used.
27+
*/
28+
class RpcCountFrameDecoder(private val limits: PubsubRpcLimits) : MessageToMessageDecoder<ByteBuf>() {
29+
30+
override fun decode(ctx: ChannelHandlerContext, msg: ByteBuf, out: MutableList<Any>) {
31+
if (limits.isNoop) {
32+
out.add(msg.retain())
33+
return
34+
}
35+
36+
val result = try {
37+
RpcMessageCountValidator.validate(msg, limits)
38+
} catch (e: Exception) {
39+
logger.debug("Dropping pubsub RPC frame due to unexpected validator error", e)
40+
return
41+
}
42+
43+
when (result) {
44+
RpcMessageCountValidator.Result.Accepted -> {
45+
out.add(msg.retain())
46+
}
47+
is RpcMessageCountValidator.Result.Malformed -> {
48+
throw CorruptedFrameException(result.reason)
49+
}
50+
is RpcMessageCountValidator.Result.Rejected -> {
51+
logger.debug("Dropping pubsub RPC frame: {}", result.reason)
52+
}
53+
}
54+
}
55+
56+
companion object {
57+
private val logger = LoggerFactory.getLogger(RpcCountFrameDecoder::class.java)
58+
}
59+
}

0 commit comments

Comments
 (0)