Skip to content

Commit dad4323

Browse files
committed
Add mixed-peer interop test and simulator scenario for partial messages (step 10)
Wire partial-messages support through SimGossipRouter/Builder so the simulator can create networks with both partial-capable and non-partial peers. New tests: - PartialMessagesMixedPeerTest: 3-node E2E test (A partial, B partial, C full-only) verifying full-message suppression for partial peers, partial RPC delivery, and that non-partial senders still reach partial-capable peers unconditionally. - PartialMessagesSimTest: simulator scenario with a 4-peer star topology confirming full-message suppression and non-partial propagation in the gossip mesh. Refactoring: - GossipRouterBuilder: move router.partialMessages assignment from createGossipRouter() to build() so subclasses inherit the wiring without duplicating it; expose buildGossipExtensionsConfig() as protected. - SimGossipRouter: accept gossipExtensionsConfig as constructor parameter (defaults to GossipExtensionsConfig() for backward compatibility). - SimGossipRouterBuilder: pass buildGossipExtensionsConfig() to SimGossipRouter.
1 parent e28e453 commit dad4323

6 files changed

Lines changed: 438 additions & 7 deletions

File tree

docs/partial-messages.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ Mirror this checklist in issue #435.
377377
- [ ] **Step 7** — Routing: IDONTWANT suppression (§5.2).
378378
- [ ] **Step 8** — Heartbeat tick + TTL GC + cleanup hooks (§6.4).
379379
- [ ] **Step 9** — Routing: IHAVE replacement with `onEmitGossip` (§5.3).
380-
- [ ] **Step 10** — Simulator scenario + mixed-peer interop test (partial +
380+
- [x] **Step 10** — Simulator scenario + mixed-peer interop test (partial +
381381
non-partial nodes on the same topic).
382382

383383
---

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipRouterBuilder.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ open class GossipRouterBuilder(
8181
)
8282

8383
router.eventBroadcaster.listeners += gossipRouterEventListeners
84-
router.partialMessages = buildPartialMessagesAdapter()
8584
return router
8685
}
8786

@@ -103,10 +102,12 @@ open class GossipRouterBuilder(
103102
"GossipExtension.PARTIAL_MESSAGES is enabled but no partialMessagesHandler was provided"
104103
)
105104
}
106-
return createGossipRouter()
105+
val router = createGossipRouter()
106+
router.partialMessages = buildPartialMessagesAdapter()
107+
return router
107108
}
108109

109-
private fun buildGossipExtensionsConfig(): GossipExtensionsConfig {
110+
protected fun buildGossipExtensionsConfig(): GossipExtensionsConfig {
110111
return GossipExtensionsConfig(
111112
partialMessagesEnabled = enabledGossipExtensions.contains(GossipExtension.PARTIAL_MESSAGES),
112113
testExtensionEnabled = enabledGossipExtensions.contains(GossipExtension.TEST_EXTENSION)
Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
package io.libp2p.pubsub.gossip.extensions
2+
3+
import com.google.protobuf.ByteString
4+
import io.libp2p.core.PeerId
5+
import io.libp2p.core.dsl.host
6+
import io.libp2p.core.mux.StreamMuxerProtocol
7+
import io.libp2p.core.pubsub.ValidationResult
8+
import io.libp2p.core.pubsub.Validator
9+
import io.libp2p.pubsub.PubsubProtocol
10+
import io.libp2p.pubsub.Topic
11+
import io.libp2p.pubsub.gossip.Gossip
12+
import io.libp2p.pubsub.gossip.GossipExtension
13+
import io.libp2p.pubsub.gossip.GossipRouter
14+
import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder
15+
import io.libp2p.pubsub.gossip.partialmessages.PartialMessagesHandler
16+
import io.libp2p.pubsub.gossip.partialmessages.PartialMessagesPeerFeedback
17+
import io.libp2p.pubsub.gossip.partialmessages.PublishAction
18+
import io.libp2p.pubsub.gossip.partialmessages.PublishActionsFn
19+
import io.libp2p.security.noise.NoiseXXSecureChannel
20+
import io.libp2p.transport.tcp.TcpTransport
21+
import io.netty.buffer.Unpooled
22+
import org.assertj.core.api.Assertions.assertThat
23+
import org.junit.jupiter.api.AfterEach
24+
import org.junit.jupiter.api.BeforeEach
25+
import org.junit.jupiter.api.Test
26+
import pubsub.pb.Rpc
27+
import java.util.concurrent.CompletableFuture
28+
import java.util.concurrent.CopyOnWriteArrayList
29+
import java.util.concurrent.TimeUnit
30+
import java.util.concurrent.TimeoutException
31+
import io.libp2p.core.pubsub.Topic as ApiTopic
32+
33+
/**
34+
* Mixed-peer interop test (Step 10).
35+
*
36+
* Three real libp2p hosts on the same topic:
37+
* - nodeA: partial-capable (requests + supports partial)
38+
* - nodeB: partial-capable (requests + supports partial)
39+
* - nodeC: non-partial (Gossip v1.3 without PARTIAL_MESSAGES extension)
40+
*
41+
* Topology: A—B and A—C (star, A is the hub).
42+
*
43+
* Key assertions:
44+
* 1. Full message from A is suppressed for B (partial), delivered to C (non-partial).
45+
* 2. Partial RPC from A reaches B but not C.
46+
* 3. Full message from C (non-partial sender) is received by A even though A is partial-capable;
47+
* non-partial senders cannot honour the partial-request and send full messages unconditionally.
48+
*/
49+
class PartialMessagesMixedPeerTest {
50+
51+
private val topic = "mixed-peer-topic"
52+
private val groupId = "group-mixed".toByteArray()
53+
54+
private val nodeBPartialRpcs = CopyOnWriteArrayList<Rpc.PartialMessagesExtension>()
55+
private val nodeBFullMessages = CopyOnWriteArrayList<ByteArray>()
56+
private val nodeCFullMessages = CopyOnWriteArrayList<ByteArray>()
57+
private val nodeAFullMessages = CopyOnWriteArrayList<ByteArray>()
58+
59+
private fun bHandler(): PartialMessagesHandler<ByteArray> =
60+
object : PartialMessagesHandler<ByteArray> {
61+
override fun onIncomingRpc(
62+
from: PeerId,
63+
peerStates: Map<PeerId, ByteArray>,
64+
rpc: Rpc.PartialMessagesExtension,
65+
feedback: PartialMessagesPeerFeedback,
66+
) {
67+
nodeBPartialRpcs += rpc
68+
}
69+
70+
override fun onEmitGossip(
71+
topic: Topic,
72+
groupId: ByteArray,
73+
gossipPeers: Collection<PeerId>,
74+
peerStates: Map<PeerId, ByteArray>,
75+
feedback: PartialMessagesPeerFeedback,
76+
) {}
77+
}
78+
79+
private fun buildPartialRouter(handler: PartialMessagesHandler<ByteArray>) =
80+
GossipRouterBuilder(
81+
protocol = PubsubProtocol.Gossip_V_1_3,
82+
enabledGossipExtensions = listOf(GossipExtension.PARTIAL_MESSAGES),
83+
partialMessagesHandler = handler,
84+
).build()
85+
86+
private fun buildNonPartialRouter() =
87+
GossipRouterBuilder(
88+
protocol = PubsubProtocol.Gossip_V_1_3,
89+
).build()
90+
91+
private val routerA by lazy { buildPartialRouter(nopPartialMessagesHandler) }
92+
private val routerB by lazy { buildPartialRouter(bHandler()) }
93+
private val routerC by lazy { buildNonPartialRouter() }
94+
95+
private val gossipA by lazy { Gossip(routerA) }
96+
private val gossipB by lazy { Gossip(routerB) }
97+
private val gossipC by lazy { Gossip(routerC) }
98+
99+
private fun buildHost(gossip: Gossip) = host {
100+
identity { random() }
101+
transports { add(::TcpTransport) }
102+
network { listen("/ip4/127.0.0.1/tcp/0") }
103+
secureChannels { add(::NoiseXXSecureChannel) }
104+
muxers { +StreamMuxerProtocol.Mplex }
105+
protocols { +gossip }
106+
}
107+
108+
private val hostA by lazy { buildHost(gossipA) }
109+
private val hostB by lazy { buildHost(gossipB) }
110+
private val hostC by lazy { buildHost(gossipC) }
111+
112+
@BeforeEach
113+
fun setUp() {
114+
hostA.start().get(5, TimeUnit.SECONDS)
115+
hostB.start().get(5, TimeUnit.SECONDS)
116+
hostC.start().get(5, TimeUnit.SECONDS)
117+
}
118+
119+
@AfterEach
120+
fun tearDown() {
121+
hostA.stop().get(5, TimeUnit.SECONDS)
122+
hostB.stop().get(5, TimeUnit.SECONDS)
123+
hostC.stop().get(5, TimeUnit.SECONDS)
124+
}
125+
126+
/**
127+
* Connects the three hosts and waits for all handshakes to settle:
128+
* - ControlExtensions exchanged between A↔B and A↔C
129+
* - SubOpts with partial flags from B→A and A→B
130+
*/
131+
private fun connectMixedNetwork(): Triple<PeerId, PeerId, PeerId> {
132+
// Partial flags must be set before subscribing so they are included in the
133+
// SubOpts sent on peer activation (onPeerActive → enqueueSubscribe reads them).
134+
routerA.setTopicPartialFlags(topic, requestsPartial = true, supportsSendingPartial = true)
135+
routerB.setTopicPartialFlags(topic, requestsPartial = true, supportsSendingPartial = true)
136+
137+
gossipA.subscribe(
138+
Validator { msg ->
139+
nodeAFullMessages += msg.data.array().copyOf()
140+
CompletableFuture.completedFuture(ValidationResult.Valid)
141+
},
142+
ApiTopic(topic),
143+
)
144+
gossipB.subscribe(
145+
Validator { msg ->
146+
nodeBFullMessages += msg.data.array().copyOf()
147+
CompletableFuture.completedFuture(ValidationResult.Valid)
148+
},
149+
ApiTopic(topic),
150+
)
151+
gossipC.subscribe(
152+
Validator { msg ->
153+
nodeCFullMessages += msg.data.array().copyOf()
154+
CompletableFuture.completedFuture(ValidationResult.Valid)
155+
},
156+
ApiTopic(topic),
157+
)
158+
159+
hostA.network.connect(hostB.peerId, hostB.listenAddresses().first()).get(10, TimeUnit.SECONDS)
160+
hostA.network.connect(hostC.peerId, hostC.listenAddresses().first()).get(10, TimeUnit.SECONDS)
161+
162+
val peerAId = hostA.peerId
163+
val peerBId = hostB.peerId
164+
val peerCId = hostC.peerId
165+
166+
// Wait for ControlExtensions handshake between A and B (both partial-capable).
167+
waitForOnEventThread(routerA) { routerA.gossipExtensionsState.peerSupportsPartialMessages(peerBId) }
168+
waitForOnEventThread(routerB) { routerB.gossipExtensionsState.peerSupportsPartialMessages(peerAId) }
169+
// Wait for A to have received ControlExtensions from C (non-partial — partialMessages=false).
170+
waitForOnEventThread(routerA) { routerA.gossipExtensionsState.hasReceivedControlExtensionsFrom(peerCId) }
171+
172+
// Wait for partial SubOpts: B→A and A→B.
173+
waitForOnEventThread(routerA) { routerA.partialSubscriptionState.peerRequestsPartial(topic, peerBId) }
174+
waitForOnEventThread(routerB) { routerB.partialSubscriptionState.peerRequestsPartial(topic, peerAId) }
175+
176+
return Triple(peerAId, peerBId, peerCId)
177+
}
178+
179+
private fun waitFor(predicate: () -> Boolean) {
180+
repeat(100) {
181+
if (predicate()) return
182+
Thread.sleep(100)
183+
}
184+
throw TimeoutException("Timed out waiting for condition")
185+
}
186+
187+
private fun waitForOnEventThread(router: GossipRouter, predicate: () -> Boolean) {
188+
waitFor { router.submitOnEventThread { predicate() }.get(1, TimeUnit.SECONDS) }
189+
}
190+
191+
// ── Test 1: full-message suppression ────────────────────────────────────
192+
193+
@Test
194+
fun `full message from partial node reaches non-partial peer but not partial peer`() {
195+
connectMixedNetwork()
196+
197+
val payload = "hello mixed network".toByteArray()
198+
gossipA.createPublisher(hostA.privKey, 0L)
199+
.publish(Unpooled.wrappedBuffer(payload), ApiTopic(topic))
200+
.get(5, TimeUnit.SECONDS)
201+
202+
// C (non-partial) MUST receive the full message.
203+
waitFor { nodeCFullMessages.isNotEmpty() }
204+
assertThat(nodeCFullMessages.first()).isEqualTo(payload)
205+
206+
// B (partial, requested suppression) MUST NOT receive the full message.
207+
Thread.sleep(500)
208+
assertThat(nodeBFullMessages).isEmpty()
209+
}
210+
211+
// ── Test 2: partial RPC delivery ────────────────────────────────────────
212+
213+
@Test
214+
fun `partial RPC from partial node reaches partial peer`() {
215+
val (_, peerBId, _) = connectMixedNetwork()
216+
217+
val partPayload = byteArrayOf(0xCA.toByte(), 0xFE.toByte())
218+
val partMeta = byteArrayOf(0x01)
219+
220+
gossipA.publishPartial(
221+
topic,
222+
groupId,
223+
PublishActionsFn<ByteArray> { _, _ ->
224+
sequenceOf(peerBId to PublishAction(partialMessage = partPayload, partsMetadata = partMeta))
225+
},
226+
).get(5, TimeUnit.SECONDS)
227+
228+
waitFor { nodeBPartialRpcs.isNotEmpty() }
229+
val rpc = nodeBPartialRpcs.single()
230+
assertThat(rpc.topicID).isEqualTo(topic)
231+
assertThat(rpc.groupID).isEqualTo(ByteString.copyFrom(groupId))
232+
assertThat(rpc.partialMessage.toByteArray()).isEqualTo(partPayload)
233+
assertThat(rpc.partsMetadata.toByteArray()).isEqualTo(partMeta)
234+
}
235+
236+
// ── Test 3: non-partial sender can still deliver to partial-capable nodes ─
237+
238+
@Test
239+
fun `non-partial node sends full message received by partial-capable direct peer`() {
240+
// C (non-partial) publishes. A (partial-capable, directly connected to C) receives
241+
// the full message because:
242+
// - Suppression is OUTBOUND only (A suppresses when it would send TO B).
243+
// - A still RECEIVES full messages from peers that don't support partial.
244+
connectMixedNetwork()
245+
246+
val payload = "from non-partial node C".toByteArray()
247+
gossipC.createPublisher(hostC.privKey, 0L)
248+
.publish(Unpooled.wrappedBuffer(payload), ApiTopic(topic))
249+
.get(5, TimeUnit.SECONDS)
250+
251+
waitFor { nodeAFullMessages.isNotEmpty() }
252+
assertThat(nodeAFullMessages.first()).isEqualTo(payload)
253+
}
254+
255+
// ── Helper: no-op handler for nodeA ─────────────────────────────────────
256+
257+
private val nopPartialMessagesHandler: PartialMessagesHandler<ByteArray> =
258+
object : PartialMessagesHandler<ByteArray> {
259+
override fun onIncomingRpc(
260+
from: PeerId,
261+
peerStates: Map<PeerId, ByteArray>,
262+
rpc: Rpc.PartialMessagesExtension,
263+
feedback: PartialMessagesPeerFeedback,
264+
) {}
265+
266+
override fun onEmitGossip(
267+
topic: Topic,
268+
groupId: ByteArray,
269+
gossipPeers: Collection<PeerId>,
270+
peerStates: Map<PeerId, ByteArray>,
271+
feedback: PartialMessagesPeerFeedback,
272+
) {}
273+
}
274+
}

tools/simulator/src/main/kotlin/io/libp2p/simulate/gossip/router/SimGossipRouter.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ class SimGossipRouter(
2424
seenMessages: SeenCache<Optional<ValidationResult>>,
2525
messageValidator: PubsubRouterMessageValidator,
2626
val serializeToBytes: Boolean,
27-
additionalHeartbeatDelay: Duration
27+
additionalHeartbeatDelay: Duration,
28+
gossipExtensionsConfig: GossipExtensionsConfig = GossipExtensionsConfig(),
2829
) : GossipRouter(
2930
params,
3031
scoreParams,
@@ -33,7 +34,7 @@ class SimGossipRouter(
3334
name,
3435
mCache,
3536
score,
36-
gossipExtensionsConfig = GossipExtensionsConfig(),
37+
gossipExtensionsConfig = gossipExtensionsConfig,
3738
subscriptionTopicSubscriptionFilter,
3839
protocol,
3940
executor,

tools/simulator/src/main/kotlin/io/libp2p/simulate/gossip/router/SimGossipRouterBuilder.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ class SimGossipRouterBuilder : GossipRouterBuilder() {
2727
seenMessages = seenCache,
2828
messageValidator = messageValidator,
2929
serializeToBytes = serializeMessagesToBytes,
30-
additionalHeartbeatDelay = additionalHeartbeatDelay
30+
additionalHeartbeatDelay = additionalHeartbeatDelay,
31+
gossipExtensionsConfig = buildGossipExtensionsConfig(),
3132
)
3233

3334
router.eventBroadcaster.listeners += gossipRouterEventListeners

0 commit comments

Comments
 (0)