Skip to content

Commit 54824ee

Browse files
committed
Implement full-message suppression for partial-message peers (step 6)
1 parent f5a5a75 commit 54824ee

4 files changed

Lines changed: 181 additions & 3 deletions

File tree

docs/partial-messages.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ Mirror this checklist in issue #435.
373373
didn't request" MUST.
374374
- [x] **Step 5** — End-to-end integration test with a trivial bitmap-based
375375
handler. Exercises Steps 1-4 before any routing changes.
376-
- [ ] **Step 6** — Routing: full-message suppression (§5.1).
376+
- [x] **Step 6** — Routing: full-message suppression (§5.1).
377377
- [ ] **Step 7** — Routing: IDONTWANT suppression (§5.2).
378378
- [ ] **Step 8** — Heartbeat tick + TTL GC + cleanup hooks (§6.4).
379379
- [ ] **Step 9** — Routing: IHAVE replacement with `onEmitGossip` (§5.3).

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,7 @@ open class GossipRouter(
579579
.plus(peersFromMesh)
580580
.distinct()
581581
.minus(receivedFrom)
582+
.filterNot { peerRequestsPartialForMessage(it, pubMsg.topics) }
582583
.filterNot { peerDoesNotWantMessage(it, pubMsg.messageId) }
583584
.forEach { submitPublishMessage(it, pubMsg) }
584585
mCache += pubMsg
@@ -603,6 +604,7 @@ open class GossipRouter(
603604
return if (peers.isNotEmpty()) {
604605
iDontWant(msg)
605606
val publishedMessages = peers
607+
.filterNot { peerRequestsPartialForMessage(it, msg.topics) }
606608
.filterNot { peerDoesNotWantMessage(it, msg.messageId) }
607609
.map { submitPublishMessage(it, msg) }
608610
if (publishedMessages.isEmpty()) {
@@ -832,6 +834,12 @@ open class GossipRouter(
832834
}
833835
}
834836

837+
private fun peerRequestsPartialForMessage(peer: PeerHandler, topics: Collection<Topic>): Boolean {
838+
if (!gossipExtensionsState.partialMessagesEnabled()) return false
839+
if (!gossipExtensionsState.peerSupportsPartialMessages(peer.peerId)) return false
840+
return topics.any { partialSubscriptionState.peerRequestsPartial(it, peer.peerId) }
841+
}
842+
835843
private fun peerDoesNotWantMessage(peer: PeerHandler, messageId: MessageId): Boolean {
836844
return peerIDontWant[peer]?.messageIdsAndTimeReceived?.contains(messageId) == true
837845
}

libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipTestsBase.kt

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,20 @@ abstract class GossipTestsBase {
5555
val mockRouterCount: Int = 10,
5656
val params: GossipParams = GossipParams(),
5757
val scoreParams: GossipScoreParams = GossipScoreParams(),
58-
val protocol: PubsubProtocol = PubsubProtocol.Gossip_V_1_1
58+
val protocol: PubsubProtocol = PubsubProtocol.Gossip_V_1_1,
59+
val enabledGossipExtensions: List<GossipExtension> = listOf(),
60+
val partialMessagesHandler: PartialMessagesHandler<*>? = null,
5961
) {
6062
val fuzz = DeterministicFuzz()
61-
val gossipRouterBuilderFactory = { GossipRouterBuilder(protocol = protocol, params = params, scoreParams = scoreParams) }
63+
val gossipRouterBuilderFactory = {
64+
GossipRouterBuilder(
65+
protocol = protocol,
66+
params = params,
67+
scoreParams = scoreParams,
68+
enabledGossipExtensions = enabledGossipExtensions,
69+
partialMessagesHandler = partialMessagesHandler,
70+
)
71+
}
6272
val router0 = fuzz.createTestRouter(createGossipFuzzRouterFactory(gossipRouterBuilderFactory))
6373
val routers = (0 until mockRouterCount).map { fuzz.createTestRouter(createMockFuzzRouterFactory()) }
6474
val connections = mutableListOf<SemiduplexConnection>()
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
package io.libp2p.pubsub.gossip.extensions
2+
3+
import io.libp2p.pubsub.PubsubProtocol
4+
import io.libp2p.pubsub.gossip.GossipExtension
5+
import io.libp2p.pubsub.gossip.GossipTestsBase
6+
import org.assertj.core.api.Assertions.assertThat
7+
import org.junit.jupiter.api.Test
8+
import pubsub.pb.Rpc
9+
10+
/**
11+
* Tests for Step 6 — full-message suppression (§5.1).
12+
*
13+
* When a peer supports partial messages (ControlExtensions handshake) AND has
14+
* requested partial delivery for a topic (SubOpts), the gossip router MUST NOT
15+
* send the full message to that peer in either [broadcastOutbound] or
16+
* [broadcastInbound]. The client is responsible for pushing parts via
17+
* [Gossip.publishPartial].
18+
*/
19+
class PartialMessagesFullMsgSuppressionTest : GossipTestsBase() {
20+
21+
private val topicId = "test-topic"
22+
23+
private fun newTest() = TwoRoutersTest(
24+
protocol = PubsubProtocol.Gossip_V_1_3,
25+
enabledGossipExtensions = listOf(GossipExtension.PARTIAL_MESSAGES),
26+
partialMessagesHandler = nopPartialMessagesHandler,
27+
)
28+
29+
private fun controlExtensionsWithPartial(): Rpc.RPC =
30+
Rpc.RPC.newBuilder().setControl(
31+
Rpc.ControlMessage.newBuilder().setExtensions(
32+
Rpc.ControlExtensions.newBuilder().setPartialMessages(true)
33+
)
34+
).build()
35+
36+
private fun subscribeRpc(
37+
topic: String = topicId,
38+
requestsPartial: Boolean,
39+
supportsSendingPartial: Boolean,
40+
): Rpc.RPC =
41+
Rpc.RPC.newBuilder().addSubscriptions(
42+
Rpc.RPC.SubOpts.newBuilder()
43+
.setTopicid(topic)
44+
.setSubscribe(true)
45+
.setRequestsPartial(requestsPartial)
46+
.setSupportsSendingPartial(supportsSendingPartial)
47+
).build()
48+
49+
private fun TwoRoutersTest.flushRouter() =
50+
gossipRouter.submitOnEventThread {}.join()
51+
52+
// ── broadcastOutbound ────────────────────────────────────────────────────
53+
54+
@Test
55+
fun `broadcastOutbound - full message NOT sent to peer that requested partial`() {
56+
val test = newTest()
57+
58+
test.mockRouter.subscribe(topicId)
59+
test.mockRouter.sendToSingle(controlExtensionsWithPartial())
60+
test.mockRouter.sendToSingle(subscribeRpc(requestsPartial = true, supportsSendingPartial = true))
61+
test.flushRouter()
62+
63+
val msg = newMessage(topicId, 0L, "Hello".toByteArray())
64+
test.gossipRouter.publish(msg)
65+
test.flushRouter()
66+
67+
assertThat(test.mockRouter.inboundMessages.none { it.publishCount > 0 }).isTrue()
68+
}
69+
70+
@Test
71+
fun `broadcastOutbound - full message still sent when partial extension disabled`() {
72+
val test = TwoRoutersTest(protocol = PubsubProtocol.Gossip_V_1_3)
73+
74+
test.mockRouter.subscribe(topicId)
75+
test.flushRouter()
76+
77+
val msg = newMessage(topicId, 0L, "Hello".toByteArray())
78+
test.gossipRouter.publish(msg)
79+
test.mockRouter.waitForMessage { it.publishCount > 0 }
80+
}
81+
82+
@Test
83+
fun `broadcastOutbound - full message still sent when peer did not request partial`() {
84+
val test = newTest()
85+
86+
test.mockRouter.subscribe(topicId)
87+
test.mockRouter.sendToSingle(controlExtensionsWithPartial())
88+
// No requestsPartial flag — peer supports sending but did not request
89+
test.mockRouter.sendToSingle(subscribeRpc(requestsPartial = false, supportsSendingPartial = true))
90+
test.flushRouter()
91+
92+
val msg = newMessage(topicId, 0L, "Hello".toByteArray())
93+
test.gossipRouter.publish(msg)
94+
test.mockRouter.waitForMessage { it.publishCount > 0 }
95+
}
96+
97+
@Test
98+
fun `broadcastOutbound - full message still sent when peer supports partial at node level but no topic sub flag`() {
99+
val test = newTest()
100+
101+
test.mockRouter.subscribe(topicId)
102+
// ControlExtensions: peer supports partial, but no SubOpts requestsPartial
103+
test.mockRouter.sendToSingle(controlExtensionsWithPartial())
104+
test.flushRouter()
105+
106+
val msg = newMessage(topicId, 0L, "Hello".toByteArray())
107+
test.gossipRouter.publish(msg)
108+
test.mockRouter.waitForMessage { it.publishCount > 0 }
109+
}
110+
111+
// ── broadcastInbound ─────────────────────────────────────────────────────
112+
113+
@Test
114+
fun `broadcastInbound - forwarded full message NOT sent to peer that requested partial`() {
115+
val test = ManyRoutersTest(
116+
mockRouterCount = 2,
117+
protocol = PubsubProtocol.Gossip_V_1_3,
118+
enabledGossipExtensions = listOf(GossipExtension.PARTIAL_MESSAGES),
119+
partialMessagesHandler = nopPartialMessagesHandler,
120+
)
121+
test.connectAll()
122+
123+
// Subscribe mock routers first so gossipRouter.subscribe grafts them immediately.
124+
test.routers.forEach { it.router.subscribe(topicId) }
125+
test.gossipRouter.subscribe(topicId)
126+
127+
// mockRouters[1] announces partial support and requests partial for the topic.
128+
test.mockRouters[1].sendToSingle(controlExtensionsWithPartial())
129+
test.mockRouters[1].sendToSingle(subscribeRpc(requestsPartial = true, supportsSendingPartial = true))
130+
131+
// mockRouters[0] sends a full message that gossipRouter would normally forward.
132+
test.mockRouters[0].sendToSingle(
133+
Rpc.RPC.newBuilder().addPublish(newProtoMessage(topicId, 0L, "Hello".toByteArray())).build()
134+
)
135+
test.fuzz.timeController.addTime(100)
136+
137+
assertThat(test.mockRouters[1].inboundMessages.none { it.publishCount > 0 }).isTrue()
138+
}
139+
140+
@Test
141+
fun `broadcastInbound - forwarded full message IS sent to non-partial peer (sanity)`() {
142+
val test = ManyRoutersTest(
143+
mockRouterCount = 2,
144+
protocol = PubsubProtocol.Gossip_V_1_3,
145+
enabledGossipExtensions = listOf(GossipExtension.PARTIAL_MESSAGES),
146+
partialMessagesHandler = nopPartialMessagesHandler,
147+
)
148+
test.connectAll()
149+
150+
test.routers.forEach { it.router.subscribe(topicId) }
151+
test.gossipRouter.subscribe(topicId)
152+
153+
// mockRouters[1] does NOT request partial — should receive the forwarded message.
154+
test.mockRouters[0].sendToSingle(
155+
Rpc.RPC.newBuilder().addPublish(newProtoMessage(topicId, 0L, "Hello".toByteArray())).build()
156+
)
157+
158+
test.mockRouters[1].waitForMessage { it.publishCount > 0 }
159+
}
160+
}

0 commit comments

Comments
 (0)