Skip to content

Commit 90d4d3a

Browse files
committed
Implement IDONTWANT suppression for partial-message peers (step 7)
Per §5.2 of the partial-messages spec: when we have requested partial messages for a topic T and a peer supports sending partial for T, skip IDONTWANT to that peer. Sending IDONTWANT is redundant — the peer is expected to deliver partial RPCs instead of full messages. Also extends PubsubProtocol.supportsIDontWant() to include Gossip_V_1_3, since v1.3 is a superset of v1.2 and both IDONTWANT and the Extensions control message must coexist for partial-message sessions.
1 parent 54824ee commit 90d4d3a

4 files changed

Lines changed: 200 additions & 8 deletions

File tree

libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubProtocol.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ enum class PubsubProtocol(val announceStr: ProtocolId) {
2626
* https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md#idontwant-message
2727
*/
2828
fun supportsIDontWant(): Boolean {
29-
return this == Gossip_V_1_2
29+
return this == Gossip_V_1_2 || this == Gossip_V_1_3
3030
}
3131

3232
/**

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -860,9 +860,21 @@ open class GossipRouter(
860860
.flatten()
861861
.distinct()
862862
.minus(setOfNotNull(receivedFrom))
863+
.filterNot { shouldSkipIDontWantForPeer(it, msg.topics) }
863864
.forEach { sendIdontwant(it, msg.messageId) }
864865
}
865866

867+
// §5.2: skip IDONTWANT to peer P for topic T when we requested partial from P and P supports sending partial.
868+
// Sending IDONTWANT would be redundant — P is expected to send partial RPCs instead of full messages.
869+
private fun shouldSkipIDontWantForPeer(peer: PeerHandler, topics: Collection<Topic>): Boolean {
870+
if (!gossipExtensionsState.partialMessagesEnabled()) return false
871+
if (!gossipExtensionsState.peerSupportsPartialMessages(peer.peerId)) return false
872+
return topics.any { topic ->
873+
(localTopicPartialFlags[topic]?.requestsPartial == true) &&
874+
partialSubscriptionState.peerSupportsSendingPartial(topic, peer.peerId)
875+
}
876+
}
877+
866878
private fun enqueuePrune(peer: PeerHandler, topic: Topic) {
867879
val peerQueue = pendingRpcParts.getQueue(peer)
868880
if (peer.getPeerProtocol().supportsBackoffAndPX() && this.protocol.supportsBackoffAndPX()) {

libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/extensions/PartialMessagesEndToEndTest.kt

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ import io.libp2p.pubsub.gossip.Gossip
1010
import io.libp2p.pubsub.gossip.GossipExtension
1111
import io.libp2p.pubsub.gossip.GossipRouter
1212
import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder
13-
import io.libp2p.pubsub.gossip.partialmessages.PartialMessagesPeerFeedback
1413
import io.libp2p.pubsub.gossip.partialmessages.PartialMessagesHandler
14+
import io.libp2p.pubsub.gossip.partialmessages.PartialMessagesPeerFeedback
1515
import io.libp2p.pubsub.gossip.partialmessages.PublishAction
1616
import io.libp2p.pubsub.gossip.partialmessages.PublishActionsFn
1717
import io.libp2p.security.noise.NoiseXXSecureChannel
@@ -166,7 +166,8 @@ class PartialMessagesEndToEndTest {
166166
val peer2Id = host2.peerId
167167

168168
gossip1.publishPartial(
169-
topic, groupId,
169+
topic,
170+
groupId,
170171
PublishActionsFn<ByteArray> { _, _ ->
171172
sequenceOf(peer2Id to PublishAction(partialMessage = partPayload, partsMetadata = metaBitmap))
172173
},
@@ -193,7 +194,8 @@ class PartialMessagesEndToEndTest {
193194
val n1Meta = byteArrayOf(0x01) // node1 has part 0
194195

195196
gossip1.publishPartial(
196-
topic, groupId,
197+
topic,
198+
groupId,
197199
PublishActionsFn<ByteArray> { _, _ ->
198200
sequenceOf(peer2Id to PublishAction(partialMessage = n1Payload, partsMetadata = n1Meta))
199201
},
@@ -205,7 +207,8 @@ class PartialMessagesEndToEndTest {
205207
val n2Meta = byteArrayOf(0x02) // node2 has part 1
206208

207209
gossip2.publishPartial(
208-
topic, groupId,
210+
topic,
211+
groupId,
209212
PublishActionsFn<ByteArray> { _, _ ->
210213
sequenceOf(peer1Id to PublishAction(partialMessage = n2Payload, partsMetadata = n2Meta))
211214
},
@@ -237,15 +240,17 @@ class PartialMessagesEndToEndTest {
237240

238241
// First publish: store nextPeerState = 0x01 ("sent part 0 to peer2").
239242
gossip1.publishPartial(
240-
topic, groupId,
243+
topic,
244+
groupId,
241245
PublishActionsFn<ByteArray> { _, _ ->
242246
sequenceOf(peer2Id to PublishAction(partsMetadata = byteArrayOf(0x03), nextPeerState = byteArrayOf(0x01)))
243247
},
244248
).get(5, TimeUnit.SECONDS)
245249

246250
// Second publish: decide() should observe peerStates[peer2Id] = 0x01.
247251
gossip1.publishPartial(
248-
topic, groupId,
252+
topic,
253+
groupId,
249254
PublishActionsFn<ByteArray> { peerStates, _ ->
250255
statesSeenInSecondCall += peerStates[peer2Id]?.copyOf()
251256
sequenceOf(peer2Id to PublishAction(partsMetadata = byteArrayOf(0x03), nextPeerState = byteArrayOf(0x03)))
@@ -275,7 +280,8 @@ class PartialMessagesEndToEndTest {
275280
waitForOnEventThread(router1) { router1.partialSubscriptionState.peerSupportsSendingPartial(topic, peer2Id) }
276281

277282
gossip1.publishPartial(
278-
topic, groupId,
283+
topic,
284+
groupId,
279285
PublishActionsFn<ByteArray> { _, _ ->
280286
sequenceOf(
281287
peer2Id to PublishAction(
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package io.libp2p.pubsub.gossip.extensions
2+
3+
import io.libp2p.etc.types.millis
4+
import io.libp2p.etc.types.seconds
5+
import io.libp2p.pubsub.PubsubProtocol
6+
import io.libp2p.pubsub.gossip.GossipExtension
7+
import io.libp2p.pubsub.gossip.GossipParams
8+
import io.libp2p.pubsub.gossip.GossipTestsBase
9+
import org.assertj.core.api.Assertions.assertThat
10+
import org.junit.jupiter.api.Test
11+
import pubsub.pb.Rpc
12+
13+
/**
14+
* Tests for Step 7 — IDONTWANT suppression (§5.2).
15+
*
16+
* When we have requested partial messages for topic T and a peer supports
17+
* sending partial for T, the gossip router MUST NOT send IDONTWANT to that peer.
18+
* Sending IDONTWANT would be redundant — the peer is expected to deliver partial
19+
* RPCs instead of full messages.
20+
*/
21+
class PartialMessagesIDontWantSuppressionTest : GossipTestsBase() {
22+
23+
private val topicId = "test-topic"
24+
25+
private fun controlExtensionsWithPartial(): Rpc.RPC =
26+
Rpc.RPC.newBuilder().setControl(
27+
Rpc.ControlMessage.newBuilder().setExtensions(
28+
Rpc.ControlExtensions.newBuilder().setPartialMessages(true)
29+
)
30+
).build()
31+
32+
private fun subscribeRpc(
33+
topic: String = topicId,
34+
requestsPartial: Boolean,
35+
supportsSendingPartial: Boolean,
36+
): Rpc.RPC =
37+
Rpc.RPC.newBuilder().addSubscriptions(
38+
Rpc.RPC.SubOpts.newBuilder()
39+
.setTopicid(topic)
40+
.setSubscribe(true)
41+
.setRequestsPartial(requestsPartial)
42+
.setSupportsSendingPartial(supportsSendingPartial)
43+
).build()
44+
45+
/**
46+
* Creates a 3-router test network (gossipRouter + 3 mock routers) with all
47+
* peers grafted into the mesh. Uses Gossip_V_1_3 with the partial-messages
48+
* extension enabled. mockRouters[0] acts as publisher; [1] and [2] are gossip
49+
* recipients.
50+
*/
51+
private fun startNetwork(): ManyRoutersTest {
52+
val test = ManyRoutersTest(
53+
mockRouterCount = 3,
54+
protocol = PubsubProtocol.Gossip_V_1_3,
55+
params = GossipParams(iDontWantMinMessageSizeThreshold = 5),
56+
enabledGossipExtensions = listOf(GossipExtension.PARTIAL_MESSAGES),
57+
partialMessagesHandler = nopPartialMessagesHandler,
58+
)
59+
test.connectAll()
60+
61+
test.mockRouters.forEach { it.subscribe(topicId) }
62+
test.gossipRouter.subscribe(topicId)
63+
64+
// Two heartbeats to ensure all peers are GRAFTed into the mesh.
65+
test.fuzz.timeController.addTime(2.seconds)
66+
67+
return test
68+
}
69+
70+
@Test
71+
fun `IDONTWANT not sent to peer that supports sending partial when we request partial`() {
72+
val test = startNetwork()
73+
74+
// We (gossipRouter) request partial for this topic.
75+
test.gossipRouter.setTopicPartialFlags(topicId, requestsPartial = true, supportsSendingPartial = false)
76+
77+
// mockRouters[2] (partial-peer): announces support at node level and for this topic.
78+
test.mockRouters[2].sendToSingle(controlExtensionsWithPartial())
79+
test.mockRouters[2].sendToSingle(subscribeRpc(requestsPartial = false, supportsSendingPartial = true))
80+
81+
// Ensure flags are applied before the message arrives.
82+
test.gossipRouter.submitOnEventThread {}.join()
83+
84+
// Publisher sends a message, triggering IDONTWANT emission to mesh peers.
85+
val msg = newMessage(topicId, 0L, "Hello".toByteArray())
86+
test.mockRouters[0].sendToSingle(
87+
Rpc.RPC.newBuilder().addPublish(msg.protobufMessage).build()
88+
)
89+
test.fuzz.timeController.addTime(100.millis)
90+
91+
// mockRouters[2] (partial-peer) MUST NOT receive IDONTWANT.
92+
val iDontWantsToPartialPeer = test.mockRouters[2].inboundMessages
93+
.filter { it.hasControl() }
94+
.flatMap { it.control.idontwantList }
95+
assertThat(iDontWantsToPartialPeer).isEmpty()
96+
97+
// mockRouters[1] (non-partial peer) MUST receive IDONTWANT.
98+
val iDontWantsToNonPartialPeer = test.mockRouters[1].inboundMessages
99+
.filter { it.hasControl() }
100+
.flatMap { it.control.idontwantList }
101+
assertThat(iDontWantsToNonPartialPeer).isNotEmpty()
102+
}
103+
104+
@Test
105+
fun `IDONTWANT still sent when partial extension is disabled`() {
106+
val test = ManyRoutersTest(
107+
mockRouterCount = 2,
108+
protocol = PubsubProtocol.Gossip_V_1_3,
109+
params = GossipParams(iDontWantMinMessageSizeThreshold = 5),
110+
)
111+
test.connectAll()
112+
test.mockRouters.forEach { it.subscribe(topicId) }
113+
test.gossipRouter.subscribe(topicId)
114+
test.fuzz.timeController.addTime(2.seconds)
115+
116+
val msg = newMessage(topicId, 0L, "Hello".toByteArray())
117+
test.mockRouters[0].sendToSingle(
118+
Rpc.RPC.newBuilder().addPublish(msg.protobufMessage).build()
119+
)
120+
test.fuzz.timeController.addTime(100.millis)
121+
122+
val iDontWants = test.mockRouters[1].inboundMessages
123+
.filter { it.hasControl() }
124+
.flatMap { it.control.idontwantList }
125+
assertThat(iDontWants).isNotEmpty()
126+
}
127+
128+
@Test
129+
fun `IDONTWANT still sent when we do not request partial for the topic`() {
130+
val test = startNetwork()
131+
132+
// We do NOT set requestsPartial=true for the topic.
133+
// mockRouters[2] supports sending partial at both node and topic level.
134+
test.mockRouters[2].sendToSingle(controlExtensionsWithPartial())
135+
test.mockRouters[2].sendToSingle(subscribeRpc(requestsPartial = false, supportsSendingPartial = true))
136+
test.gossipRouter.submitOnEventThread {}.join()
137+
138+
val msg = newMessage(topicId, 0L, "Hello".toByteArray())
139+
test.mockRouters[0].sendToSingle(
140+
Rpc.RPC.newBuilder().addPublish(msg.protobufMessage).build()
141+
)
142+
test.fuzz.timeController.addTime(100.millis)
143+
144+
// IDONTWANT IS sent because we never set requestsPartial=true locally.
145+
val iDontWantsToPartialPeer = test.mockRouters[2].inboundMessages
146+
.filter { it.hasControl() }
147+
.flatMap { it.control.idontwantList }
148+
assertThat(iDontWantsToPartialPeer).isNotEmpty()
149+
}
150+
151+
@Test
152+
fun `IDONTWANT still sent when peer did not announce supportsSendingPartial for the topic`() {
153+
val test = startNetwork()
154+
155+
// We request partial, but mockRouters[2] only announces node-level support
156+
// without a SubOpts supportsSendingPartial flag for this topic.
157+
test.gossipRouter.setTopicPartialFlags(topicId, requestsPartial = true, supportsSendingPartial = false)
158+
test.mockRouters[2].sendToSingle(controlExtensionsWithPartial())
159+
// No SubOpts with supportsSendingPartial=true — partialSubscriptionState has nothing.
160+
test.gossipRouter.submitOnEventThread {}.join()
161+
162+
val msg = newMessage(topicId, 0L, "Hello".toByteArray())
163+
test.mockRouters[0].sendToSingle(
164+
Rpc.RPC.newBuilder().addPublish(msg.protobufMessage).build()
165+
)
166+
test.fuzz.timeController.addTime(100.millis)
167+
168+
// IDONTWANT IS sent because the peer has no topic-level supportsSendingPartial.
169+
val iDontWantsToPartialPeer = test.mockRouters[2].inboundMessages
170+
.filter { it.hasControl() }
171+
.flatMap { it.control.idontwantList }
172+
assertThat(iDontWantsToPartialPeer).isNotEmpty()
173+
}
174+
}

0 commit comments

Comments
 (0)