Skip to content

Commit 1171864

Browse files
committed
Implement IHAVE replacement with onEmitGossip for partial-message peers (step 9)
§5.3 of the partial-messages spec: during the gossipsub heartbeat lazy-push, peers where we support sending partial AND they requested partial are excluded from IHAVE targets. Instead, handler.onEmitGossip is invoked once per locally-initiated (topic, groupId) group with the collected partial peers so the client can drive per-part delivery via publishPartial. - Add onEmitGossip(topic, partialPeers) to PartialMessagesAdapter and implement it in PartialMessagesAdapterImpl: iterates locally-initiated groups for the topic and calls handler.onEmitGossip for each. - Modify GossipRouter.emitGossip: when the adapter is set and we advertise supportsSendingPartial for the topic, partition the selected gossip targets into fullPeers (IHAVE) and partialPeers (onEmitGossip). Only fullPeers receive IHAVE. - Add PartialMessagesEmitGossipTest covering: IHAVE suppression for partial peers, onEmitGossip invocation for locally-initiated groups, no call for peer-initiated groups, and fallback to normal IHAVE when extension is off or flags unset.
1 parent f26fc37 commit 1171864

3 files changed

Lines changed: 350 additions & 2 deletions

File tree

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -818,9 +818,22 @@ open class GossipRouter(
818818
val peers = (getTopicPeers(topic) - excludePeers)
819819
.filter { score.score(it.peerId) >= scoreParams.gossipThreshold && !isDirect(it) }
820820

821-
peers.shuffled(random)
821+
val selected = peers.shuffled(random)
822822
.take(max((params.gossipFactor * peers.size).toInt(), params.DLazy))
823-
.forEach { enqueueIhave(it, shuffledMessageIds, topic) }
823+
824+
// §5.3: partition gossip targets into full-message peers and partial-capable peers.
825+
// Skip IHAVE for partial peers; call onEmitGossip for locally-initiated groups instead.
826+
val adapter = partialMessages
827+
if (adapter != null && localTopicPartialFlags[topic]?.supportsSendingPartial == true) {
828+
val (partialPeers, fullPeers) = selected.partition { peer ->
829+
gossipExtensionsState.peerSupportsPartialMessages(peer.peerId) &&
830+
partialSubscriptionState.peerRequestsPartial(topic, peer.peerId)
831+
}
832+
fullPeers.forEach { enqueueIhave(it, shuffledMessageIds, topic) }
833+
adapter.onEmitGossip(topic, partialPeers.map { it.peerId })
834+
} else {
835+
selected.forEach { enqueueIhave(it, shuffledMessageIds, topic) }
836+
}
824837
}
825838

826839
private fun graft(peer: PeerHandler, topic: Topic) {

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/partialmessages/PartialMessagesAdapter.kt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,16 @@ internal interface PartialMessagesAdapter {
1818
fun onHeartbeat()
1919
fun onIncomingRpc(topic: Topic, from: PeerId, rpc: Rpc.PartialMessagesExtension)
2020

21+
/**
22+
* Called from [io.libp2p.pubsub.gossip.GossipRouter.emitGossip] for each topic where we
23+
* support sending partial and at least one gossip candidate requested partial.
24+
*
25+
* Iterates all locally-initiated groups under [topic] and invokes
26+
* [PartialMessagesHandler.onEmitGossip] once per group with [partialPeers] as the
27+
* gossip-target set. No-op if [partialPeers] is empty or there are no locally-initiated groups.
28+
*/
29+
fun onEmitGossip(topic: Topic, partialPeers: Collection<PeerId>)
30+
2131
/**
2232
* Executes the client's [PublishActionsFn], updates group state, and enqueues
2333
* outbound [Rpc.PartialMessagesExtension] RPCs via [enqueueFn].
@@ -58,6 +68,15 @@ internal class PartialMessagesAdapterImpl<PeerState>(
5868
handler.onIncomingRpc(from, groupState.peerStates, rpc, feedback)
5969
}
6070

71+
override fun onEmitGossip(topic: Topic, partialPeers: Collection<PeerId>) {
72+
if (partialPeers.isEmpty()) return
73+
for ((groupId, groupState) in stateStore.groupsForTopic(topic)) {
74+
if (!groupState.peerInitiated) {
75+
handler.onEmitGossip(topic, groupId.bytes, partialPeers, groupState.peerStates, feedback)
76+
}
77+
}
78+
}
79+
6180
@Suppress("UNCHECKED_CAST")
6281
override fun publishPartial(
6382
topic: Topic,
Lines changed: 316 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
package io.libp2p.pubsub.gossip.extensions
2+
3+
import io.libp2p.core.PeerId
4+
import io.libp2p.etc.types.seconds
5+
import io.libp2p.pubsub.PubsubProtocol
6+
import io.libp2p.pubsub.Topic
7+
import io.libp2p.pubsub.gossip.GossipExtension
8+
import io.libp2p.pubsub.gossip.GossipParams
9+
import io.libp2p.pubsub.gossip.GossipTestsBase
10+
import io.libp2p.pubsub.gossip.partialmessages.PartialMessagesHandler
11+
import io.libp2p.pubsub.gossip.partialmessages.PartialMessagesPeerFeedback
12+
import io.libp2p.pubsub.gossip.partialmessages.PublishActionsFn
13+
import org.assertj.core.api.Assertions.assertThat
14+
import org.junit.jupiter.api.Test
15+
import pubsub.pb.Rpc
16+
17+
/**
18+
* Tests for Step 9 — IHAVE replacement with onEmitGossip (§5.3).
19+
*
20+
* When we support sending partial messages for a topic and a peer has requested
21+
* partial delivery, the gossip router MUST NOT send IHAVE to that peer during the
22+
* heartbeat lazy-push. Instead, the router calls handler.onEmitGossip once per
23+
* locally-initiated group under that topic.
24+
*/
25+
class PartialMessagesEmitGossipTest : GossipTestsBase() {
26+
27+
private val topicId = "test-topic"
28+
private val groupIdBytes = byteArrayOf(1, 2, 3)
29+
30+
// D=0 keeps all mock routers out of the mesh, making them IHAVE gossip candidates.
31+
// DLazy=6 ensures all eligible peers are selected in the lazy-push.
32+
// floodPublishMaxMessageSizeThreshold=0 disables flood publish so IHAVE is the
33+
// sole mechanism for notifying non-mesh peers about messages.
34+
private val testParams = GossipParams(
35+
D = 0,
36+
DLow = 0,
37+
DHigh = 0,
38+
DLazy = 6,
39+
floodPublishMaxMessageSizeThreshold = 0,
40+
)
41+
42+
private fun controlExtensionsWithPartial(): Rpc.RPC =
43+
Rpc.RPC.newBuilder().setControl(
44+
Rpc.ControlMessage.newBuilder().setExtensions(
45+
Rpc.ControlExtensions.newBuilder().setPartialMessages(true)
46+
)
47+
).build()
48+
49+
private fun subscribeRpc(
50+
requestsPartial: Boolean,
51+
supportsSendingPartial: Boolean,
52+
): Rpc.RPC =
53+
Rpc.RPC.newBuilder().addSubscriptions(
54+
Rpc.RPC.SubOpts.newBuilder()
55+
.setTopicid(topicId)
56+
.setSubscribe(true)
57+
.setRequestsPartial(requestsPartial)
58+
.setSupportsSendingPartial(supportsSendingPartial)
59+
).build()
60+
61+
/**
62+
* Creates a 2-mock-router test network where:
63+
* - mockRouters[0] is a plain non-partial peer
64+
* - mockRouters[1] is a partial-capable peer that has requested partial delivery
65+
* - gossipRouter supports sending partial for [topicId]
66+
* - D=0 so no mesh, all peers are lazy-push (IHAVE) candidates
67+
*/
68+
private fun startNetwork(
69+
handler: PartialMessagesHandler<*> = nopPartialMessagesHandler,
70+
): ManyRoutersTest {
71+
val test = ManyRoutersTest(
72+
mockRouterCount = 2,
73+
protocol = PubsubProtocol.Gossip_V_1_3,
74+
params = testParams,
75+
enabledGossipExtensions = listOf(GossipExtension.PARTIAL_MESSAGES),
76+
partialMessagesHandler = handler,
77+
)
78+
test.connectAll()
79+
test.mockRouters.forEach { it.subscribe(topicId) }
80+
test.gossipRouter.subscribe(topicId)
81+
test.gossipRouter.setTopicPartialFlags(
82+
topicId,
83+
requestsPartial = false,
84+
supportsSendingPartial = true,
85+
)
86+
87+
// mockRouters[1] is the partial peer: node-level support + topic-level request.
88+
test.mockRouters[1].sendToSingle(controlExtensionsWithPartial())
89+
test.mockRouters[1].sendToSingle(subscribeRpc(requestsPartial = true, supportsSendingPartial = true))
90+
91+
test.gossipRouter.submitOnEventThread {}.join()
92+
return test
93+
}
94+
95+
// ── IHAVE suppression ────────────────────────────────────────────────────
96+
97+
@Test
98+
fun `IHAVE not sent to partial peer when we support sending partial`() {
99+
val test = startNetwork()
100+
101+
test.gossipRouter.publish(newMessage(topicId, 0L, "data".toByteArray()))
102+
test.fuzz.timeController.addTime(2.seconds)
103+
104+
// Non-partial peer MUST receive IHAVE.
105+
val ihaveToNormal = test.mockRouters[0].inboundMessages
106+
.filter { it.hasControl() }
107+
.flatMap { it.control.ihaveList }
108+
assertThat(ihaveToNormal).isNotEmpty()
109+
110+
// Partial peer MUST NOT receive IHAVE.
111+
val ihaveToPartial = test.mockRouters[1].inboundMessages
112+
.filter { it.hasControl() }
113+
.flatMap { it.control.ihaveList }
114+
assertThat(ihaveToPartial).isEmpty()
115+
}
116+
117+
@Test
118+
fun `IHAVE still sent to partial peer when partial extension disabled`() {
119+
val test = ManyRoutersTest(
120+
mockRouterCount = 1,
121+
protocol = PubsubProtocol.Gossip_V_1_3,
122+
params = testParams,
123+
)
124+
test.connectAll()
125+
test.mockRouters[0].subscribe(topicId)
126+
test.gossipRouter.subscribe(topicId)
127+
test.gossipRouter.submitOnEventThread {}.join()
128+
129+
test.gossipRouter.publish(newMessage(topicId, 0L, "data".toByteArray()))
130+
test.fuzz.timeController.addTime(2.seconds)
131+
132+
val ihaves = test.mockRouters[0].inboundMessages
133+
.filter { it.hasControl() }
134+
.flatMap { it.control.ihaveList }
135+
assertThat(ihaves).isNotEmpty()
136+
}
137+
138+
@Test
139+
fun `IHAVE still sent to partial peer when we do not support sending partial for the topic`() {
140+
val test = ManyRoutersTest(
141+
mockRouterCount = 2,
142+
protocol = PubsubProtocol.Gossip_V_1_3,
143+
params = testParams,
144+
enabledGossipExtensions = listOf(GossipExtension.PARTIAL_MESSAGES),
145+
partialMessagesHandler = nopPartialMessagesHandler,
146+
)
147+
test.connectAll()
148+
test.mockRouters.forEach { it.subscribe(topicId) }
149+
test.gossipRouter.subscribe(topicId)
150+
// We do NOT call setTopicPartialFlags — supportsSendingPartial stays false.
151+
test.mockRouters[1].sendToSingle(controlExtensionsWithPartial())
152+
test.mockRouters[1].sendToSingle(subscribeRpc(requestsPartial = true, supportsSendingPartial = true))
153+
test.gossipRouter.submitOnEventThread {}.join()
154+
155+
test.gossipRouter.publish(newMessage(topicId, 0L, "data".toByteArray()))
156+
test.fuzz.timeController.addTime(2.seconds)
157+
158+
// IHAVE IS sent because we don't support sending partial for this topic.
159+
val ihaveToPartialPeer = test.mockRouters[1].inboundMessages
160+
.filter { it.hasControl() }
161+
.flatMap { it.control.ihaveList }
162+
assertThat(ihaveToPartialPeer).isNotEmpty()
163+
}
164+
165+
@Test
166+
fun `IHAVE still sent when peer supports partial but did not request it`() {
167+
val test = ManyRoutersTest(
168+
mockRouterCount = 2,
169+
protocol = PubsubProtocol.Gossip_V_1_3,
170+
params = testParams,
171+
enabledGossipExtensions = listOf(GossipExtension.PARTIAL_MESSAGES),
172+
partialMessagesHandler = nopPartialMessagesHandler,
173+
)
174+
test.connectAll()
175+
test.mockRouters.forEach { it.subscribe(topicId) }
176+
test.gossipRouter.subscribe(topicId)
177+
test.gossipRouter.setTopicPartialFlags(
178+
topicId,
179+
requestsPartial = false,
180+
supportsSendingPartial = true,
181+
)
182+
// mockRouters[1] supports sending partial but did NOT request it.
183+
test.mockRouters[1].sendToSingle(controlExtensionsWithPartial())
184+
test.mockRouters[1].sendToSingle(subscribeRpc(requestsPartial = false, supportsSendingPartial = true))
185+
test.gossipRouter.submitOnEventThread {}.join()
186+
187+
test.gossipRouter.publish(newMessage(topicId, 0L, "data".toByteArray()))
188+
test.fuzz.timeController.addTime(2.seconds)
189+
190+
// IHAVE IS sent because the peer didn't request partial.
191+
val ihaveToPartialPeer = test.mockRouters[1].inboundMessages
192+
.filter { it.hasControl() }
193+
.flatMap { it.control.ihaveList }
194+
assertThat(ihaveToPartialPeer).isNotEmpty()
195+
}
196+
197+
// ── onEmitGossip callback ─────────────────────────────────────────────────
198+
199+
@Test
200+
fun `onEmitGossip called for locally-initiated group with partial-capable gossip peers`() {
201+
data class Call(val topic: Topic, val groupId: ByteArray, val peers: List<PeerId>)
202+
val calls = mutableListOf<Call>()
203+
204+
val handler = object : PartialMessagesHandler<Unit> {
205+
override fun onIncomingRpc(
206+
from: PeerId,
207+
peerStates: Map<PeerId, Unit>,
208+
rpc: Rpc.PartialMessagesExtension,
209+
feedback: PartialMessagesPeerFeedback,
210+
) {}
211+
212+
override fun onEmitGossip(
213+
topic: Topic,
214+
groupId: ByteArray,
215+
gossipPeers: Collection<PeerId>,
216+
peerStates: Map<PeerId, Unit>,
217+
feedback: PartialMessagesPeerFeedback,
218+
) {
219+
calls += Call(topic, groupId, gossipPeers.toList())
220+
}
221+
}
222+
223+
val test = startNetwork(handler)
224+
val partialPeerId = test.routers[1].peerId
225+
226+
// Create a locally-initiated group via publishPartial.
227+
test.gossipRouter.publishPartial(topicId, groupIdBytes, PublishActionsFn<Unit> { _, _ -> emptySequence() })
228+
229+
// Publish a message to put an entry in the mcache; emitGossip only runs when
230+
// mcache is non-empty.
231+
test.gossipRouter.publish(newMessage(topicId, 0L, "data".toByteArray()))
232+
233+
test.fuzz.timeController.addTime(2.seconds)
234+
235+
assertThat(calls).isNotEmpty()
236+
val call = calls.first { it.topic == topicId && it.groupId.contentEquals(groupIdBytes) }
237+
assertThat(call.peers).contains(partialPeerId)
238+
}
239+
240+
@Test
241+
fun `onEmitGossip not called for peer-initiated groups`() {
242+
data class Call(val topic: Topic, val groupId: ByteArray)
243+
val calls = mutableListOf<Call>()
244+
245+
val handler = object : PartialMessagesHandler<Unit> {
246+
override fun onIncomingRpc(
247+
from: PeerId,
248+
peerStates: Map<PeerId, Unit>,
249+
rpc: Rpc.PartialMessagesExtension,
250+
feedback: PartialMessagesPeerFeedback,
251+
) {}
252+
253+
override fun onEmitGossip(
254+
topic: Topic,
255+
groupId: ByteArray,
256+
gossipPeers: Collection<PeerId>,
257+
peerStates: Map<PeerId, Unit>,
258+
feedback: PartialMessagesPeerFeedback,
259+
) {
260+
calls += Call(topic, groupId)
261+
}
262+
}
263+
264+
val test = startNetwork(handler)
265+
val peerGroupId = byteArrayOf(9, 8, 7)
266+
267+
// Simulate an inbound partial RPC that creates a peer-initiated group.
268+
val inboundRpc = Rpc.RPC.newBuilder().setPartial(
269+
Rpc.PartialMessagesExtension.newBuilder()
270+
.setTopicID(topicId)
271+
.setGroupID(com.google.protobuf.ByteString.copyFrom(peerGroupId))
272+
.setPartsMetadata(com.google.protobuf.ByteString.copyFrom(byteArrayOf(0x01)))
273+
).build()
274+
test.mockRouters[1].sendToSingle(inboundRpc)
275+
test.gossipRouter.submitOnEventThread {}.join()
276+
277+
// There are no locally-initiated groups; only a peer-initiated one.
278+
test.gossipRouter.publish(newMessage(topicId, 0L, "data".toByteArray()))
279+
test.fuzz.timeController.addTime(2.seconds)
280+
281+
// handler.onEmitGossip MUST NOT be called for the peer-initiated group.
282+
assertThat(calls.filter { it.groupId.contentEquals(peerGroupId) }).isEmpty()
283+
}
284+
285+
@Test
286+
fun `onEmitGossip not called when there are no locally-initiated groups`() {
287+
val calls = mutableListOf<Unit>()
288+
289+
val handler = object : PartialMessagesHandler<Unit> {
290+
override fun onIncomingRpc(
291+
from: PeerId,
292+
peerStates: Map<PeerId, Unit>,
293+
rpc: Rpc.PartialMessagesExtension,
294+
feedback: PartialMessagesPeerFeedback,
295+
) {}
296+
297+
override fun onEmitGossip(
298+
topic: Topic,
299+
groupId: ByteArray,
300+
gossipPeers: Collection<PeerId>,
301+
peerStates: Map<PeerId, Unit>,
302+
feedback: PartialMessagesPeerFeedback,
303+
) {
304+
calls += Unit
305+
}
306+
}
307+
308+
val test = startNetwork(handler)
309+
310+
// No publishPartial call → no locally-initiated groups exist.
311+
test.gossipRouter.publish(newMessage(topicId, 0L, "data".toByteArray()))
312+
test.fuzz.timeController.addTime(2.seconds)
313+
314+
assertThat(calls).isEmpty()
315+
}
316+
}

0 commit comments

Comments
 (0)