Skip to content
4 changes: 2 additions & 2 deletions docs/partial-messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,10 @@ Mirror this checklist in issue #435.
`PublishAction<PeerState>` (with `nextPeerState`),
`PublishActionsFn<PeerState>`, `PartialMessagesPeerFeedback`, and
`GroupState` container with TTL + DoS caps. No routing yet.
- [ ] **Step 3** — Inbound `RPC.partial` dispatch: replace the stub at
- [x] **Step 3** — Inbound `RPC.partial` dispatch: replace the stub at
`GossipRouter.kt:476` with the full flow (validate caps, create/update
group state, call `onIncomingRpc`).
- [ ] **Step 4** — Outbound `publishPartial(...)` on the `Gossip` facade;
- [x] **Step 4** — Outbound `publishPartial(...)` on the `Gossip` facade;
route through `GossipRpcPartsQueue` (do **not** bypass — PR #433 got
this wrong). Enforce the "omit `partialMessage` when peer supports but
didn't request" MUST.
Expand Down
42 changes: 38 additions & 4 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,22 @@ abstract class AbstractRouter(
override fun onPeerActive(peer: PeerHandler) {
val partsQueue = pendingRpcParts.getQueue(peer)
subscribedTopics.forEach {
partsQueue.addSubscribe(it)
enqueueSubscribe(partsQueue, it)
}
flushPending(peer)
}

/**
* Enqueues a subscribe announcement for [topic] onto [partsQueue].
*
* The default implementation emits a bare subscribe with no per-topic options.
* Subclasses (e.g. GossipRouter) override this to attach per-topic options
* such as partial-message flags.
*/
protected open fun enqueueSubscribe(partsQueue: RpcPartsQueue, topic: Topic) {
partsQueue.addSubscribe(topic)
}

protected open fun notifyMalformedMessage(peer: PeerHandler) {}
protected open fun notifyUnseenMessage(peer: PeerHandler, msg: PubsubMessage) {}
protected open fun notifyNonSubscribedMessage(peer: PeerHandler, msg: Rpc.Message) {}
Expand All @@ -172,7 +183,17 @@ abstract class AbstractRouter(
}

try {
val subscriptions = msg.subscriptionsList.map { PubsubSubscription(it.topicid, it.subscribe) }
val subscriptions = msg.subscriptionsList.map {
// Per partial-messages spec: flags MUST be ignored on subscribe=false, and the
// receiving side coerces supportsSendingPartial := requestsPartial || supportsSendingPartial.
// The coercion rule is also applied on the outbound side by GossipRouter.
PubsubSubscription(
topic = it.topicid,
subscribe = it.subscribe,
requestsPartial = it.subscribe && it.requestsPartial,
supportsSendingPartial = it.subscribe && (it.supportsSendingPartial || it.requestsPartial)
)
}
subscriptionFilter
.filterIncomingSubscriptions(subscriptions, peersTopics.getByFirst(peer))
.forEach { handleMessageSubscriptions(peer, it) }
Expand Down Expand Up @@ -301,7 +322,20 @@ abstract class AbstractRouter(
}
}

private fun handleMessageSubscriptions(peer: PeerHandler, msg: PubsubSubscription) {
/**
* Applies a single filtered inbound subscription to the router's state.
*
* Called once per `SubOpts` on the pubsub event loop, after
* [SubscriptionFilter.filterIncomingSubscriptions] has run. Subclasses may
* override to react to subscription state changes (for example, to track
* per-topic capability flags). Overrides MUST call `super` so that
* [peersTopics] stays in sync.
*
* [msg] carries the protocol-level flags already normalised by the caller:
* for `subscribe=false` frames, extension flags are zeroed before reaching
* this method.
*/
protected open fun handleMessageSubscriptions(peer: PeerHandler, msg: PubsubSubscription) {
if (msg.subscribe) {
peersTopics.add(peer, msg.topic)
} else {
Expand All @@ -319,7 +353,7 @@ abstract class AbstractRouter(
}

protected open fun subscribe(topic: Topic) {
activePeers.forEach { pendingRpcParts.getQueue(it).addSubscribe(topic) }
activePeers.forEach { enqueueSubscribe(pendingRpcParts.getQueue(it), topic) }
subscribedTopics += topic
}

Expand Down
7 changes: 6 additions & 1 deletion libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ typealias Topic = String
typealias MessageId = WBytes
typealias PubsubMessageFactory = (Rpc.Message) -> PubsubMessage

data class PubsubSubscription(val topic: Topic, val subscribe: Boolean)
data class PubsubSubscription(
val topic: Topic,
val subscribe: Boolean,
val requestsPartial: Boolean = false,
val supportsSendingPartial: Boolean = false
)

interface PubsubMessage {
val protobufMessage: Rpc.Message
Expand Down
41 changes: 32 additions & 9 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/RpcPartsQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,23 @@ interface RpcPartsQueue {
fun addPublish(message: Rpc.Message)

fun addSubscribe(topic: Topic) {
addSubscription(topic, SubscriptionStatus.Subscribed)
addSubscribe(topic, requestsPartial = false, supportsSendingPartial = false)
}

fun addSubscribe(topic: Topic, requestsPartial: Boolean, supportsSendingPartial: Boolean) {
addSubscription(topic, SubscriptionStatus.Subscribed, requestsPartial, supportsSendingPartial)
}

fun addUnsubscribe(topic: Topic) {
addSubscription(topic, SubscriptionStatus.Unsubscribed)
addSubscription(topic, SubscriptionStatus.Unsubscribed, requestsPartial = false, supportsSendingPartial = false)
}

fun addSubscription(topic: Topic, status: SubscriptionStatus)
fun addSubscription(
topic: Topic,
status: SubscriptionStatus,
requestsPartial: Boolean,
supportsSendingPartial: Boolean
)

fun takeMerged(): List<Rpc.RPC>
}
Expand All @@ -38,11 +47,20 @@ open class DefaultRpcPartsQueue : RpcPartsQueue {
}
}

protected data class SubscriptionPart(val topic: Topic, val status: RpcPartsQueue.SubscriptionStatus) : AbstractPart {
protected data class SubscriptionPart(
val topic: Topic,
val status: RpcPartsQueue.SubscriptionStatus,
val requestsPartial: Boolean = false,
val supportsSendingPartial: Boolean = false
) : AbstractPart {
override fun appendToBuilder(builder: Rpc.RPC.Builder) {
builder.addSubscriptionsBuilder().apply {
setTopicid(topic)
setSubscribe(status == RpcPartsQueue.SubscriptionStatus.Subscribed)
val subBuilder = builder.addSubscriptionsBuilder()
subBuilder.topicid = topic
subBuilder.subscribe = status == RpcPartsQueue.SubscriptionStatus.Subscribed
// Per spec: partial flags MUST NOT be sent on unsubscribe (subscribe=false).
if (status == RpcPartsQueue.SubscriptionStatus.Subscribed) {
if (requestsPartial) subBuilder.requestsPartial = true
if (supportsSendingPartial) subBuilder.supportsSendingPartial = true
}
}
}
Expand All @@ -57,8 +75,13 @@ open class DefaultRpcPartsQueue : RpcPartsQueue {
addPart(PublishPart(message))
}

override fun addSubscription(topic: Topic, status: RpcPartsQueue.SubscriptionStatus) {
addPart(SubscriptionPart(topic, status))
override fun addSubscription(
topic: Topic,
status: RpcPartsQueue.SubscriptionStatus,
requestsPartial: Boolean,
supportsSendingPartial: Boolean
) {
addPart(SubscriptionPart(topic, status, requestsPartial, supportsSendingPartial))
}

override fun takeMerged(): List<Rpc.RPC> {
Expand Down
16 changes: 16 additions & 0 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import io.libp2p.core.multistream.ProtocolDescriptor
import io.libp2p.core.pubsub.PubsubApi
import io.libp2p.pubsub.PubsubApiImpl
import io.libp2p.pubsub.PubsubProtocol
import io.libp2p.pubsub.Topic
import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder
import io.libp2p.pubsub.gossip.partialmessages.PublishActionsFn
import io.netty.channel.ChannelHandler
import org.slf4j.LoggerFactory
import java.util.concurrent.CompletableFuture
Expand All @@ -32,6 +34,20 @@ class Gossip @JvmOverloads constructor(
return router.score.getCachedScore(peerId)
}

/**
* Queues outbound [pubsub.pb.Rpc.PartialMessagesExtension] RPCs for [topic]/[groupId]
* by invoking the client's [actionsFn] on the current group state.
*
* Submits to the pubsub event thread; the returned future completes when the RPCs
* have been enqueued and flushed.
*/
fun publishPartial(
topic: Topic,
groupId: ByteArray,
actionsFn: PublishActionsFn<*>
): CompletableFuture<Unit> =
router.submitOnEventThread { router.publishPartial(topic, groupId, actionsFn) }

override val protocolDescriptor =
when (router.protocol) {
PubsubProtocol.Gossip_V_1_3 -> {
Expand Down
93 changes: 87 additions & 6 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import io.libp2p.core.pubsub.ValidationResult
import io.libp2p.etc.types.*
import io.libp2p.etc.util.P2PService
import io.libp2p.pubsub.*
import io.libp2p.pubsub.gossip.partialmessages.PartialMessagesAdapter
import io.libp2p.pubsub.gossip.partialmessages.PublishActionsFn
import io.libp2p.pubsub.gossip.partialmessages.toGroupId
import org.slf4j.LoggerFactory
import pubsub.pb.Rpc
import java.time.Duration
Expand Down Expand Up @@ -134,6 +137,56 @@ open class GossipRouter(
override val pendingRpcParts = PendingRpcPartsMap<GossipRpcPartsQueue> { DefaultGossipRpcPartsQueue(params) }

val gossipExtensionsState = GossipExtensionsState(gossipExtensionsConfig)
val partialSubscriptionState = PartialSubscriptionState()
internal var partialMessages: PartialMessagesAdapter? = null

/**
* Local per-topic subscription options that affect outbound subscribe announcements.
* Accessed only on the pubsub event loop.
*/
private val localTopicPartialFlags: MutableMap<Topic, PartialSubFlags> = mutableMapOf()

/**
* Configures the partial-messages flags advertised on this node's subscribe
* announcements for [topic]. Must be called before [subscribe] for the flags
* to take effect on the initial announcement; a subsequent call will affect
* later re-announcements (e.g. on new peer activation).
*
* Per spec, the send-side also applies the coercion
* `supportsSendingPartial := requestsPartial || supportsSendingPartial`.
*/
fun setTopicPartialFlags(topic: Topic, requestsPartial: Boolean, supportsSendingPartial: Boolean) {
runOnEventThread {
val coerced = PartialSubFlags.coerce(requestsPartial, supportsSendingPartial)
if (coerced == PartialSubFlags.NONE) {
localTopicPartialFlags -= topic
} else {
localTopicPartialFlags[topic] = coerced
}
}
}

/**
* Queues outbound [pubsub.pb.Rpc.PartialMessagesExtension] RPCs for [topic]/[groupId]
* by invoking the client's [actionsFn] on the current group state.
*
* Must be called on the pubsub event thread.
*/
fun publishPartial(topic: Topic, groupId: ByteArray, actionsFn: PublishActionsFn<*>) {
val adapter = partialMessages ?: return
val gid = groupId.toGroupId()

fun peerRequestsPartial(peerId: PeerId) =
partialSubscriptionState.peerRequestsPartial(topic, peerId)

fun enqueue(peerId: PeerId, partialMessage: ByteArray?, partsMetadata: ByteArray?) {
val peerHandler = activePeers.find { it.peerId == peerId } ?: return
pendingRpcParts.getQueue(peerHandler).addPartialMessage(topic, groupId, partialMessage, partsMetadata)
}

adapter.publishPartial(topic, gid, actionsFn, ::peerRequestsPartial, ::enqueue)
flushAllPending()
}

private fun setBackOff(peer: PeerHandler, topic: Topic) = setBackOff(peer, topic, params.pruneBackoff.toMillis())
private fun setBackOff(peer: PeerHandler, topic: Topic, delay: Long) {
Expand Down Expand Up @@ -161,9 +214,28 @@ open class GossipRouter(
acceptRequestsWhitelist -= peer
pendingRpcParts.popQueue(peer) // discard them
gossipExtensionsState.onPeerDisconnected(peer.peerId)
partialSubscriptionState.onPeerDisconnected(peer.peerId)
super.onPeerDisconnected(peer)
}

override fun enqueueSubscribe(partsQueue: RpcPartsQueue, topic: Topic) {
val flags = localTopicPartialFlags[topic] ?: PartialSubFlags.NONE
partsQueue.addSubscribe(topic, flags.requestsPartial, flags.supportsSendingPartial)
}

override fun handleMessageSubscriptions(peer: PeerHandler, msg: PubsubSubscription) {
super.handleMessageSubscriptions(peer, msg)
if (msg.subscribe) {
partialSubscriptionState.setPeerFlags(
msg.topic,
peer.peerId,
PartialSubFlags(msg.requestsPartial, msg.supportsSendingPartial)
)
} else {
partialSubscriptionState.removePeerFlags(msg.topic, peer.peerId)
}
}

override fun onPeerActive(peer: PeerHandler) {
super.onPeerActive(peer)
eventBroadcaster.notifyConnected(peer.peerId, peer.getRemoteAddress())
Expand Down Expand Up @@ -477,12 +549,19 @@ open class GossipRouter(
partialMessagesExtension: Rpc.PartialMessagesExtension,
receivedFrom: PeerHandler
) {
logger.trace(
"Processing partial message extension message {} from {}",
partialMessagesExtension.toString(),
receivedFrom.peerId
)
// TODO: implement partial message handling (https://github.com/libp2p/jvm-libp2p/issues/435)
val topic = partialMessagesExtension.topicID
if (!partialMessagesExtension.hasTopicID() || topic.isEmpty()) {
logger.debug("Dropping partial message from {}: missing topicID", receivedFrom.peerId)
return
}

if (!partialMessagesExtension.hasGroupID() || partialMessagesExtension.groupID.isEmpty) {
logger.debug("Dropping partial message from {}: missing groupID", receivedFrom.peerId)
return
}

logger.trace("Processing partial message extension for topic {} from {}", topic, receivedFrom.peerId)
partialMessages?.onIncomingRpc(topic, receivedFrom.peerId, partialMessagesExtension)
}

override fun broadcastInbound(msgs: List<PubsubMessage>, receivedFrom: PeerHandler) {
Expand Down Expand Up @@ -615,6 +694,8 @@ open class GossipRouter(
super.unsubscribe(topic)
mesh[topic]?.copy()?.forEach { prune(it, topic) }
mesh -= topic
localTopicPartialFlags -= topic
partialSubscriptionState.removeTopic(topic)
}

private fun catchingHeartbeat() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ interface GossipRpcPartsQueue : RpcPartsQueue {

// TODO Need to check if we should handle when control extension and extension messages could be separated by split (https://github.com/libp2p/jvm-libp2p/issues/440)
fun addControlExtensions(ctrlMessage: Rpc.ControlExtensions)

fun addPartialMessage(topic: Topic, groupId: ByteArray, partialMessage: ByteArray?, partsMetadata: ByteArray?)
}

/**
Expand Down Expand Up @@ -90,6 +92,23 @@ open class DefaultGossipRpcPartsQueue(
}
}

// Not a data class: ByteArray fields break equals/hashCode in data classes.
protected class PartialMessagePart(
val topic: Topic,
val groupId: ByteArray,
val partialMessage: ByteArray?,
val partsMetadata: ByteArray?
) : AbstractPart {
override fun appendToBuilder(builder: Rpc.RPC.Builder) {
val pmBuilder = Rpc.PartialMessagesExtension.newBuilder()
.setTopicID(topic)
.setGroupID(groupId.toProtobuf())
partialMessage?.let { pmBuilder.setPartialMessage(it.toProtobuf()) }
partsMetadata?.let { pmBuilder.setPartsMetadata(it.toProtobuf()) }
builder.setPartial(pmBuilder.build())
}
}

override fun addIHave(messageId: MessageId, topic: Topic) {
addPart(IHavePart(messageId, topic))
}
Expand All @@ -114,6 +133,10 @@ open class DefaultGossipRpcPartsQueue(
addPart(ControlExtensionPart(ctrlMessage))
}

override fun addPartialMessage(topic: Topic, groupId: ByteArray, partialMessage: ByteArray?, partsMetadata: ByteArray?) {
addPart(PartialMessagePart(topic, groupId, partialMessage, partsMetadata))
}

override fun takeMerged(): List<Rpc.RPC> {
val ret = mutableListOf<Rpc.RPC>()
var partIdx = 0
Expand All @@ -126,10 +149,12 @@ open class DefaultGossipRpcPartsQueue(
var iWantCount = params.maxIWantMessageIds ?: Int.MAX_VALUE
var graftCount = params.maxGraftMessages ?: Int.MAX_VALUE
var pruneCount = params.maxPruneMessages ?: Int.MAX_VALUE
// proto field `partial` is optional (not repeated): at most 1 per RPC
var partialCount = 1

while (partIdx < parts.size &&
publishCount > 0 && subscriptionCount > 0 && iHaveCount > 0 &&
iWantCount > 0 && graftCount > 0 && pruneCount > 0
iWantCount > 0 && graftCount > 0 && pruneCount > 0 && partialCount > 0
) {
val part = parts[partIdx++]
when (part) {
Expand All @@ -139,6 +164,7 @@ open class DefaultGossipRpcPartsQueue(
is IWantPart -> iWantCount--
is GraftPart -> graftCount--
is PrunePart -> pruneCount--
is PartialMessagePart -> partialCount--
}

part.appendToBuilder(builder)
Expand Down
Loading
Loading