Skip to content

Commit 5971b2e

Browse files
committed
Wire per-topic partial-messages SubOpts flags (libp2p#444)
Step 1 of the partial-messages extension: plumb SubOpts.requestsPartial / SubOpts.supportsSendingPartial through subscribe announcements in both directions, and track the per-peer-per-topic receive state. - AbstractRouter parses the flags with the spec-mandated coercion (supportsSendingPartial := requestsPartial || supportsSendingPartial) and zeroes them on subscribe=false. - New enqueueSubscribe hook unifies outbound subscribe enqueueing so GossipRouter can attach per-topic flags in a single override. - GossipRouter exposes setTopicPartialFlags(topic, ...) to configure flags advertised for a locally-subscribed topic, and stores inbound flags in a new PartialSubscriptionState (plain HashMap on the pubsub event loop). State is cleaned on peer disconnect, topic unsubscribe, and per-peer unsubscribe. - Outbound unsubscribe MUST NOT carry partial flags; enforced at the SubscriptionPart wire-build site. No routing behaviour changes yet. See docs/partial-messages.md §4.5, §5, §6.1 for context.
1 parent 92cb5a8 commit 5971b2e

7 files changed

Lines changed: 520 additions & 14 deletions

File tree

libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,11 +147,22 @@ abstract class AbstractRouter(
147147
override fun onPeerActive(peer: PeerHandler) {
148148
val partsQueue = pendingRpcParts.getQueue(peer)
149149
subscribedTopics.forEach {
150-
partsQueue.addSubscribe(it)
150+
enqueueSubscribe(partsQueue, it)
151151
}
152152
flushPending(peer)
153153
}
154154

155+
/**
156+
* Enqueues a subscribe announcement for [topic] onto [partsQueue].
157+
*
158+
* The default implementation emits a bare subscribe with no per-topic options.
159+
* Subclasses (e.g. GossipRouter) override this to attach per-topic options
160+
* such as partial-message flags.
161+
*/
162+
protected open fun enqueueSubscribe(partsQueue: RpcPartsQueue, topic: Topic) {
163+
partsQueue.addSubscribe(topic)
164+
}
165+
155166
protected open fun notifyMalformedMessage(peer: PeerHandler) {}
156167
protected open fun notifyUnseenMessage(peer: PeerHandler, msg: PubsubMessage) {}
157168
protected open fun notifyNonSubscribedMessage(peer: PeerHandler, msg: Rpc.Message) {}
@@ -172,7 +183,16 @@ abstract class AbstractRouter(
172183
}
173184

174185
try {
175-
val subscriptions = msg.subscriptionsList.map { PubsubSubscription(it.topicid, it.subscribe) }
186+
val subscriptions = msg.subscriptionsList.map {
187+
// Per partial-messages spec: flags MUST be ignored on subscribe=false, and the
188+
// receiving side coerces supportsSendingPartial := requestsPartial || supportsSendingPartial.
189+
PubsubSubscription(
190+
topic = it.topicid,
191+
subscribe = it.subscribe,
192+
requestsPartial = it.subscribe && it.requestsPartial,
193+
supportsSendingPartial = it.subscribe && (it.supportsSendingPartial || it.requestsPartial)
194+
)
195+
}
176196
subscriptionFilter
177197
.filterIncomingSubscriptions(subscriptions, peersTopics.getByFirst(peer))
178198
.forEach { handleMessageSubscriptions(peer, it) }
@@ -301,7 +321,7 @@ abstract class AbstractRouter(
301321
}
302322
}
303323

304-
private fun handleMessageSubscriptions(peer: PeerHandler, msg: PubsubSubscription) {
324+
protected open fun handleMessageSubscriptions(peer: PeerHandler, msg: PubsubSubscription) {
305325
if (msg.subscribe) {
306326
peersTopics.add(peer, msg.topic)
307327
} else {
@@ -319,7 +339,7 @@ abstract class AbstractRouter(
319339
}
320340

321341
protected open fun subscribe(topic: Topic) {
322-
activePeers.forEach { pendingRpcParts.getQueue(it).addSubscribe(topic) }
342+
activePeers.forEach { enqueueSubscribe(pendingRpcParts.getQueue(it), topic) }
323343
subscribedTopics += topic
324344
}
325345

libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@ typealias Topic = String
1313
typealias MessageId = WBytes
1414
typealias PubsubMessageFactory = (Rpc.Message) -> PubsubMessage
1515

16-
data class PubsubSubscription(val topic: Topic, val subscribe: Boolean)
16+
data class PubsubSubscription(
17+
val topic: Topic,
18+
val subscribe: Boolean,
19+
val requestsPartial: Boolean = false,
20+
val supportsSendingPartial: Boolean = false
21+
)
1722

1823
interface PubsubMessage {
1924
val protobufMessage: Rpc.Message

libp2p/src/main/kotlin/io/libp2p/pubsub/RpcPartsQueue.kt

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,27 @@ interface RpcPartsQueue {
99
fun addPublish(message: Rpc.Message)
1010

1111
fun addSubscribe(topic: Topic) {
12-
addSubscription(topic, SubscriptionStatus.Subscribed)
12+
addSubscribe(topic, requestsPartial = false, supportsSendingPartial = false)
13+
}
14+
15+
fun addSubscribe(topic: Topic, requestsPartial: Boolean, supportsSendingPartial: Boolean) {
16+
addSubscription(topic, SubscriptionStatus.Subscribed, requestsPartial, supportsSendingPartial)
1317
}
1418

1519
fun addUnsubscribe(topic: Topic) {
16-
addSubscription(topic, SubscriptionStatus.Unsubscribed)
20+
addSubscription(topic, SubscriptionStatus.Unsubscribed, requestsPartial = false, supportsSendingPartial = false)
21+
}
22+
23+
fun addSubscription(topic: Topic, status: SubscriptionStatus) {
24+
addSubscription(topic, status, requestsPartial = false, supportsSendingPartial = false)
1725
}
1826

19-
fun addSubscription(topic: Topic, status: SubscriptionStatus)
27+
fun addSubscription(
28+
topic: Topic,
29+
status: SubscriptionStatus,
30+
requestsPartial: Boolean,
31+
supportsSendingPartial: Boolean
32+
)
2033

2134
fun takeMerged(): List<Rpc.RPC>
2235
}
@@ -38,11 +51,20 @@ open class DefaultRpcPartsQueue : RpcPartsQueue {
3851
}
3952
}
4053

41-
protected data class SubscriptionPart(val topic: Topic, val status: RpcPartsQueue.SubscriptionStatus) : AbstractPart {
54+
protected data class SubscriptionPart(
55+
val topic: Topic,
56+
val status: RpcPartsQueue.SubscriptionStatus,
57+
val requestsPartial: Boolean = false,
58+
val supportsSendingPartial: Boolean = false
59+
) : AbstractPart {
4260
override fun appendToBuilder(builder: Rpc.RPC.Builder) {
43-
builder.addSubscriptionsBuilder().apply {
44-
setTopicid(topic)
45-
setSubscribe(status == RpcPartsQueue.SubscriptionStatus.Subscribed)
61+
val subBuilder = builder.addSubscriptionsBuilder()
62+
subBuilder.topicid = topic
63+
subBuilder.subscribe = status == RpcPartsQueue.SubscriptionStatus.Subscribed
64+
// Per spec: partial flags MUST NOT be sent on unsubscribe (subscribe=false).
65+
if (status == RpcPartsQueue.SubscriptionStatus.Subscribed) {
66+
if (requestsPartial) subBuilder.requestsPartial = true
67+
if (supportsSendingPartial) subBuilder.supportsSendingPartial = true
4668
}
4769
}
4870
}
@@ -57,8 +79,13 @@ open class DefaultRpcPartsQueue : RpcPartsQueue {
5779
addPart(PublishPart(message))
5880
}
5981

60-
override fun addSubscription(topic: Topic, status: RpcPartsQueue.SubscriptionStatus) {
61-
addPart(SubscriptionPart(topic, status))
82+
override fun addSubscription(
83+
topic: Topic,
84+
status: RpcPartsQueue.SubscriptionStatus,
85+
requestsPartial: Boolean,
86+
supportsSendingPartial: Boolean
87+
) {
88+
addPart(SubscriptionPart(topic, status, requestsPartial, supportsSendingPartial))
6289
}
6390

6491
override fun takeMerged(): List<Rpc.RPC> {

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,35 @@ open class GossipRouter(
134134
override val pendingRpcParts = PendingRpcPartsMap<GossipRpcPartsQueue> { DefaultGossipRpcPartsQueue(params) }
135135

136136
val gossipExtensionsState = GossipExtensionsState(gossipExtensionsConfig)
137+
val partialSubscriptionState = PartialSubscriptionState()
138+
139+
/**
140+
* Local per-topic subscription options that affect outbound subscribe announcements.
141+
* Accessed only on the pubsub event loop.
142+
*/
143+
private val localTopicPartialFlags: MutableMap<Topic, PartialSubFlags> = mutableMapOf()
144+
145+
/**
146+
* Configures the partial-messages flags advertised on this node's subscribe
147+
* announcements for [topic]. Must be called before [subscribe] for the flags
148+
* to take effect on the initial announcement; a subsequent call will affect
149+
* later re-announcements (e.g. on new peer activation).
150+
*
151+
* Per spec, the send-side also applies the coercion
152+
* `supportsSendingPartial := requestsPartial || supportsSendingPartial`.
153+
*/
154+
fun setTopicPartialFlags(topic: Topic, requestsPartial: Boolean, supportsSendingPartial: Boolean) {
155+
runOnEventThread {
156+
if (!requestsPartial && !supportsSendingPartial) {
157+
localTopicPartialFlags -= topic
158+
} else {
159+
localTopicPartialFlags[topic] = PartialSubFlags(
160+
requestsPartial = requestsPartial,
161+
supportsSendingPartial = requestsPartial || supportsSendingPartial
162+
)
163+
}
164+
}
165+
}
137166

138167
private fun setBackOff(peer: PeerHandler, topic: Topic) = setBackOff(peer, topic, params.pruneBackoff.toMillis())
139168
private fun setBackOff(peer: PeerHandler, topic: Topic, delay: Long) {
@@ -161,9 +190,28 @@ open class GossipRouter(
161190
acceptRequestsWhitelist -= peer
162191
pendingRpcParts.popQueue(peer) // discard them
163192
gossipExtensionsState.onPeerDisconnected(peer.peerId)
193+
partialSubscriptionState.onPeerDisconnected(peer.peerId)
164194
super.onPeerDisconnected(peer)
165195
}
166196

197+
override fun enqueueSubscribe(partsQueue: RpcPartsQueue, topic: Topic) {
198+
val flags = localTopicPartialFlags[topic] ?: PartialSubFlags.NONE
199+
partsQueue.addSubscribe(topic, flags.requestsPartial, flags.supportsSendingPartial)
200+
}
201+
202+
override fun handleMessageSubscriptions(peer: PeerHandler, msg: PubsubSubscription) {
203+
super.handleMessageSubscriptions(peer, msg)
204+
if (msg.subscribe) {
205+
partialSubscriptionState.setPeerFlags(
206+
msg.topic,
207+
peer.peerId,
208+
PartialSubFlags(msg.requestsPartial, msg.supportsSendingPartial)
209+
)
210+
} else {
211+
partialSubscriptionState.removePeerFlags(msg.topic, peer.peerId)
212+
}
213+
}
214+
167215
override fun onPeerActive(peer: PeerHandler) {
168216
super.onPeerActive(peer)
169217
eventBroadcaster.notifyConnected(peer.peerId, peer.getRemoteAddress())
@@ -615,6 +663,8 @@ open class GossipRouter(
615663
super.unsubscribe(topic)
616664
mesh[topic]?.copy()?.forEach { prune(it, topic) }
617665
mesh -= topic
666+
localTopicPartialFlags -= topic
667+
partialSubscriptionState.removeTopic(topic)
618668
}
619669

620670
private fun catchingHeartbeat() {
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package io.libp2p.pubsub.gossip
2+
3+
import io.libp2p.core.PeerId
4+
import io.libp2p.pubsub.Topic
5+
6+
data class PartialSubFlags(
7+
val requestsPartial: Boolean,
8+
val supportsSendingPartial: Boolean
9+
) {
10+
companion object {
11+
val NONE = PartialSubFlags(requestsPartial = false, supportsSendingPartial = false)
12+
}
13+
}
14+
15+
/**
16+
* Per-topic, per-peer partial-messages subscription state.
17+
*
18+
* Tracks, for each `(topic, peer)`, the remote peer's `requestsPartial` /
19+
* `supportsSendingPartial` flags as most recently announced via a subscribe
20+
* `SubOpts`. Unsubscribes and peer disconnects drop the corresponding state.
21+
*
22+
* NOT thread-safe: accessed only on the pubsub event loop.
23+
*/
24+
class PartialSubscriptionState {
25+
26+
private val byTopic: MutableMap<Topic, MutableMap<PeerId, PartialSubFlags>> = mutableMapOf()
27+
28+
fun setPeerFlags(topic: Topic, peer: PeerId, flags: PartialSubFlags) {
29+
if (flags == PartialSubFlags.NONE) {
30+
removePeerFlags(topic, peer)
31+
return
32+
}
33+
byTopic.getOrPut(topic) { mutableMapOf() }[peer] = flags
34+
}
35+
36+
fun removePeerFlags(topic: Topic, peer: PeerId) {
37+
val peers = byTopic[topic] ?: return
38+
peers.remove(peer)
39+
if (peers.isEmpty()) byTopic.remove(topic)
40+
}
41+
42+
fun removeTopic(topic: Topic) {
43+
byTopic.remove(topic)
44+
}
45+
46+
fun onPeerDisconnected(peer: PeerId) {
47+
val emptied = mutableListOf<Topic>()
48+
for ((topic, peers) in byTopic) {
49+
peers.remove(peer)
50+
if (peers.isEmpty()) emptied += topic
51+
}
52+
emptied.forEach { byTopic.remove(it) }
53+
}
54+
55+
fun peerFlags(topic: Topic, peer: PeerId): PartialSubFlags =
56+
byTopic[topic]?.get(peer) ?: PartialSubFlags.NONE
57+
58+
fun peerRequestsPartial(topic: Topic, peer: PeerId) =
59+
peerFlags(topic, peer).requestsPartial
60+
61+
fun peerSupportsSendingPartial(topic: Topic, peer: PeerId) =
62+
peerFlags(topic, peer).supportsSendingPartial
63+
64+
internal fun snapshot(): Map<Topic, Map<PeerId, PartialSubFlags>> =
65+
byTopic.mapValues { (_, v) -> v.toMap() }
66+
}

0 commit comments

Comments
 (0)