Skip to content

Commit 4e59996

Browse files
committed
Address review feedback: PartialSubFlags.coerce helper and docs
- `PartialSubFlags.coerce(requestsPartial, supportsSendingPartial)`: single source of truth for the spec coercion rule `supportsSendingPartial := requestsPartial || supportsSendingPartial`. Used from `GossipRouter.setTopicPartialFlags` for the outbound side. AbstractRouter keeps the inline expression for the receive side to avoid a reverse layering dependency (pubsub -> gossip); a comment notes the rule is applied on both sides. - `PartialSubscriptionState.setPeerFlags`: document that passing `PartialSubFlags.NONE` (or any equivalent all-false flags) is treated as a removal. Makes the set-sometimes-deletes invariant explicit for readers. - `AbstractRouter.handleMessageSubscriptions`: add Kdoc now that the method is `protected open`. Documents the "call super" contract for overrides (GossipRouter relies on this to keep peersTopics and partialSubscriptionState in sync) and the flag-normalisation precondition.
1 parent c31900a commit 4e59996

3 files changed

Lines changed: 40 additions & 5 deletions

File tree

libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ abstract class AbstractRouter(
186186
val subscriptions = msg.subscriptionsList.map {
187187
// Per partial-messages spec: flags MUST be ignored on subscribe=false, and the
188188
// receiving side coerces supportsSendingPartial := requestsPartial || supportsSendingPartial.
189+
// The coercion rule is also applied on the outbound side by GossipRouter.
189190
PubsubSubscription(
190191
topic = it.topicid,
191192
subscribe = it.subscribe,
@@ -321,6 +322,19 @@ abstract class AbstractRouter(
321322
}
322323
}
323324

325+
/**
326+
* Applies a single filtered inbound subscription to the router's state.
327+
*
328+
* Called once per `SubOpts` on the pubsub event loop, after
329+
* [SubscriptionFilter.filterIncomingSubscriptions] has run. Subclasses may
330+
* override to react to subscription state changes (for example, to track
331+
* per-topic capability flags). Overrides MUST call `super` so that
332+
* [peersTopics] stays in sync.
333+
*
334+
* [msg] carries the protocol-level flags already normalised by the caller:
335+
* for `subscribe=false` frames, extension flags are zeroed before reaching
336+
* this method.
337+
*/
324338
protected open fun handleMessageSubscriptions(peer: PeerHandler, msg: PubsubSubscription) {
325339
if (msg.subscribe) {
326340
peersTopics.add(peer, msg.topic)

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -153,13 +153,11 @@ open class GossipRouter(
153153
*/
154154
fun setTopicPartialFlags(topic: Topic, requestsPartial: Boolean, supportsSendingPartial: Boolean) {
155155
runOnEventThread {
156-
if (!requestsPartial && !supportsSendingPartial) {
156+
val coerced = PartialSubFlags.coerce(requestsPartial, supportsSendingPartial)
157+
if (coerced == PartialSubFlags.NONE) {
157158
localTopicPartialFlags -= topic
158159
} else {
159-
localTopicPartialFlags[topic] = PartialSubFlags(
160-
requestsPartial = requestsPartial,
161-
supportsSendingPartial = requestsPartial || supportsSendingPartial
162-
)
160+
localTopicPartialFlags[topic] = coerced
163161
}
164162
}
165163
}

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/PartialSubscriptionState.kt

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,21 @@ data class PartialSubFlags(
99
) {
1010
companion object {
1111
val NONE = PartialSubFlags(requestsPartial = false, supportsSendingPartial = false)
12+
13+
/**
14+
* Applies the partial-messages spec coercion
15+
* `supportsSendingPartial := requestsPartial || supportsSendingPartial`.
16+
*
17+
* Per the spec, this rule MUST be applied by both the sender (when
18+
* advertising flags outbound) and the receiver (when parsing inbound
19+
* `SubOpts`). Callers are expected to have already zeroed the flags
20+
* for `subscribe=false` frames before calling this helper.
21+
*/
22+
fun coerce(requestsPartial: Boolean, supportsSendingPartial: Boolean): PartialSubFlags =
23+
PartialSubFlags(
24+
requestsPartial = requestsPartial,
25+
supportsSendingPartial = supportsSendingPartial || requestsPartial
26+
)
1227
}
1328
}
1429

@@ -25,6 +40,14 @@ class PartialSubscriptionState {
2540

2641
private val byTopic: MutableMap<Topic, MutableMap<PeerId, PartialSubFlags>> = mutableMapOf()
2742

43+
/**
44+
* Stores [flags] for `(topic, peer)`.
45+
*
46+
* Passing [PartialSubFlags.NONE] (or any equivalent `PartialSubFlags(false, false)`)
47+
* is treated as a removal: the peer's entry is dropped and, if it was the
48+
* last peer for the topic, the topic entry is GC'd. This keeps the snapshot
49+
* invariant "present ⇔ non-default flags".
50+
*/
2851
fun setPeerFlags(topic: Topic, peer: PeerId, flags: PartialSubFlags) {
2952
if (flags == PartialSubFlags.NONE) {
3053
removePeerFlags(topic, peer)

0 commit comments

Comments
 (0)