Skip to content

Commit 0586c6e

Browse files
committed
Add support for Extension Control Messages (gossipsub v1.3)
1 parent 6b0d806 commit 0586c6e

9 files changed

Lines changed: 243 additions & 1 deletion

File tree

libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,11 @@ abstract class AbstractRouter(
139139
*/
140140
protected abstract fun processControl(ctrl: Rpc.ControlMessage, receivedFrom: PeerHandler)
141141

142+
/**
143+
* Processes Gossipsub extensions messages
144+
*/
145+
protected abstract fun processExtensions(msg: Rpc.RPC, receivedFrom: PeerHandler)
146+
142147
override fun onPeerActive(peer: PeerHandler) {
143148
val partsQueue = pendingRpcParts.getQueue(peer)
144149
subscribedTopics.forEach {
@@ -180,6 +185,10 @@ abstract class AbstractRouter(
180185
processControl(msg.control, peer)
181186
}
182187

188+
if (protocol.supportsExtensions()) {
189+
processExtensions(msg, peer)
190+
}
191+
183192
val (msgSubscribed, nonSubscribed) = msg.publishList
184193
.partition { rpcMsg -> rpcMsg.topicIDsList.any { it in subscribedTopics } }
185194

libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubProtocol.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ enum class PubsubProtocol(val announceStr: ProtocolId) {
77
Gossip_V_1_0("/meshsub/1.0.0"),
88
Gossip_V_1_1("/meshsub/1.1.0"),
99
Gossip_V_1_2("/meshsub/1.2.0"),
10+
Gossip_V_1_3("/meshsub/1.3.0"),
1011
Floodsub("/floodsub/1.0.0");
1112

1213
companion object {
@@ -27,4 +28,11 @@ enum class PubsubProtocol(val announceStr: ProtocolId) {
2728
fun supportsIDontWant(): Boolean {
2829
return this == Gossip_V_1_2
2930
}
31+
32+
/**
33+
* https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.3.md#the-extensions-control-message
34+
*/
35+
fun supportsExtensions(): Boolean {
36+
return this == Gossip_V_1_3
37+
}
3038
}

libp2p/src/main/kotlin/io/libp2p/pubsub/flood/FloodRouter.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ class FloodRouter(executor: ScheduledExecutorService = Executors.newSingleThread
3636
// NOP
3737
}
3838

39+
override fun processExtensions(msg: Rpc.RPC, receivedFrom: PeerHandler) {
40+
// NOP
41+
}
42+
3943
private fun broadcast(msg: PubsubMessage, receivedFrom: PeerHandler?): CompletableFuture<Unit> {
4044
val peers = msg.topics
4145
.map { getTopicPeers(it) }

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,14 @@ class Gossip @JvmOverloads constructor(
3131

3232
override val protocolDescriptor =
3333
when (router.protocol) {
34+
PubsubProtocol.Gossip_V_1_3 -> {
35+
ProtocolDescriptor(
36+
PubsubProtocol.Gossip_V_1_3.announceStr,
37+
PubsubProtocol.Gossip_V_1_2.announceStr,
38+
PubsubProtocol.Gossip_V_1_1.announceStr,
39+
PubsubProtocol.Gossip_V_1_0.announceStr
40+
)
41+
}
3442
PubsubProtocol.Gossip_V_1_2 -> {
3543
ProtocolDescriptor(
3644
PubsubProtocol.Gossip_V_1_2.announceStr,

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ open class GossipRouter(
132132
private val acceptRequestsWhitelist = mutableMapOf<PeerHandler, AcceptRequestsWhitelistEntry>()
133133
override val pendingRpcParts = PendingRpcPartsMap<GossipRpcPartsQueue> { DefaultGossipRpcPartsQueue(params) }
134134

135+
private val peerExtensionSupportMap = mutableMapOf<PeerId, Rpc.ControlExtensions>()
136+
135137
private fun setBackOff(peer: PeerHandler, topic: Topic) = setBackOff(peer, topic, params.pruneBackoff.toMillis())
136138
private fun setBackOff(peer: PeerHandler, topic: Topic, delay: Long) {
137139
backoffExpireTimes[peer.peerId to topic] = currentTimeSupplier() + delay
@@ -384,6 +386,74 @@ open class GossipRouter(
384386
ctrl.run {
385387
(graftList + pruneList + ihaveList + iwantList + idontwantList)
386388
}.forEach { processControlMessage(it, receivedFrom) }
389+
390+
if (protocol.supportsExtensions() && ctrl.hasExtensions()) {
391+
processControlExtensions(ctrl.extensions, receivedFrom);
392+
}
393+
}
394+
395+
private fun processControlExtensions(
396+
ctrlExtensions: Rpc.ControlExtensions,
397+
receivedFrom: PeerHandler
398+
) {
399+
logger.info("Received control extension {}", ctrlExtensions.toString())
400+
401+
if (peerExtensionSupportMap[receivedFrom.peerId] != null) {
402+
//TODO Should downscore peers that send control extension multiple times? (https://github.com/libp2p/jvm-libp2p/issues/437)
403+
logger.trace(
404+
"Received another control extension message from peer {}",
405+
receivedFrom.peerId
406+
)
407+
return
408+
} else {
409+
peerExtensionSupportMap[receivedFrom.peerId] = ctrlExtensions
410+
}
411+
}
412+
413+
override fun processExtensions(msg: Rpc.RPC, receivedFrom: PeerHandler) {
414+
val peerSupportedExtensions = peerExtensionSupportMap[receivedFrom.peerId]
415+
if (peerSupportedExtensions == null) {
416+
logger.info(
417+
"Ignoring extension messages from peer {} - did it send an extension control message?",
418+
receivedFrom.peerId
419+
)
420+
} else {
421+
when {
422+
peerSupportedExtensions.hasTestExtension() && msg.hasTestExtension() ->
423+
processTestExtensionMessage(msg.testExtension, receivedFrom)
424+
425+
peerSupportedExtensions.hasPartialMessages() && msg.hasPartial() ->
426+
processPartialMessageExtension(msg.partial, receivedFrom)
427+
}
428+
}
429+
}
430+
431+
private fun processTestExtensionMessage(
432+
testExtensionMessage: Rpc.TestExtension,
433+
receivedFrom: PeerHandler
434+
) {
435+
logger.trace(
436+
"Processing test extension message {} from {}",
437+
testExtensionMessage.toByteArray(),
438+
receivedFrom.peerId
439+
)
440+
441+
val response =
442+
Rpc.RPC.newBuilder().setTestExtension(Rpc.TestExtension.newBuilder().build()).build();
443+
444+
send(receivedFrom, response);
445+
}
446+
447+
private fun processPartialMessageExtension(
448+
partialMessagesExtension: Rpc.PartialMessagesExtension,
449+
receivedFrom: PeerHandler
450+
) {
451+
logger.trace(
452+
"Processing partial message extension message {} from {}",
453+
partialMessagesExtension.toString(),
454+
receivedFrom.peerId
455+
)
456+
//TODO: implement partial message handling (https://github.com/libp2p/jvm-libp2p/issues/435)
387457
}
388458

389459
override fun broadcastInbound(msgs: List<PubsubMessage>, receivedFrom: PeerHandler) {

libp2p/src/main/proto/rpc.proto

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@ message RPC {
1616
}
1717

1818
optional ControlMessage control = 3;
19+
20+
// Canonical Extensions
1921
optional PartialMessagesExtension partial = 10;
22+
23+
// Experimental Extensions
24+
optional TestExtension testExtension = 6492434;
2025
}
2126

2227
message Message {
@@ -62,6 +67,10 @@ message ControlIDontWant {
6267

6368
message ControlExtensions {
6469
optional bool partialMessages = 10;
70+
71+
// Experimental extensions must use field numbers larger than 0x200000 to be
72+
// encoded with at least 4 bytes
73+
optional bool testExtension = 6492434;
6574
}
6675

6776
message PeerInfo {
@@ -103,3 +112,5 @@ message TopicDescriptor {
103112
}
104113
}
105114
}
115+
116+
message TestExtension {}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
@file:Suppress("ktlint:standard:class-naming")
2+
3+
package io.libp2p.pubsub.gossip
4+
5+
import io.libp2p.pubsub.PubsubProtocol
6+
import org.junit.jupiter.api.Test
7+
8+
class GossipV1_3Tests : GossipTestsBase() {
9+
10+
@Test
11+
fun selfSanityTest() {
12+
val test = TwoRoutersTest(protocol = PubsubProtocol.Gossip_V_1_3)
13+
14+
test.mockRouter.subscribe("topic1")
15+
val msg = newMessage("topic1", 0L, "Hello".toByteArray())
16+
test.gossipRouter.publish(msg)
17+
test.mockRouter.waitForMessage { it.publishCount > 0 }
18+
}
19+
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package io.libp2p.pubsub.gossip.extensions
2+
3+
import io.libp2p.pubsub.PubsubProtocol
4+
import io.libp2p.pubsub.gossip.GossipTestsBase
5+
import org.junit.jupiter.api.Test
6+
import org.junit.jupiter.api.assertThrows
7+
import pubsub.pb.Rpc
8+
import java.util.concurrent.TimeoutException
9+
10+
class GossipExtensionsMessageHandlingTest : GossipTestsBase() {
11+
12+
@Test
13+
fun `extension messages sent to peer prior to gossip v1_3 are ignored`() {
14+
val test = TwoRoutersTest(
15+
protocol = PubsubProtocol.Gossip_V_1_2
16+
)
17+
18+
val rpcMessageWithControlExtensionAndTestExtensionMessages = Rpc.RPC.newBuilder()
19+
.setControl(
20+
Rpc.ControlMessage.newBuilder()
21+
.setExtensions(Rpc.ControlExtensions.newBuilder().setTestExtension(true))
22+
.build()
23+
)
24+
.setTestExtension(Rpc.TestExtension.newBuilder().build())
25+
.build()
26+
test.mockRouter.sendToSingle(rpcMessageWithControlExtensionAndTestExtensionMessages)
27+
28+
assertNoResponseFromTestExtension(test)
29+
}
30+
31+
@Test
32+
fun `extension messages sent to peer prior to sending extension control messages are ignored`() {
33+
val test = TwoRoutersTest(
34+
protocol = PubsubProtocol.Gossip_V_1_2
35+
)
36+
37+
val rpcMessageWithTestExtension =
38+
Rpc.RPC.newBuilder().setTestExtension(testExtensionMessage).build()
39+
test.mockRouter.sendToSingle(rpcMessageWithTestExtension)
40+
41+
assertNoResponseFromTestExtension(test)
42+
}
43+
44+
@Test
45+
fun `extension message flow with extension control message before actual extension message`() {
46+
val test = TwoRoutersTest(
47+
protocol = PubsubProtocol.Gossip_V_1_3
48+
)
49+
50+
val rpcMessageWithControl = Rpc.RPC.newBuilder().setControl(
51+
Rpc.ControlMessage.newBuilder().setExtensions(controlExtensionMessage())
52+
).build()
53+
test.mockRouter.sendToSingle(rpcMessageWithControl)
54+
55+
val rpcMessageWithTestExtension =
56+
Rpc.RPC.newBuilder().setTestExtension(testExtensionMessage).build()
57+
test.mockRouter.sendToSingle(rpcMessageWithTestExtension)
58+
59+
test.mockRouter.waitForMessage { it.hasTestExtension() }
60+
}
61+
62+
@Test
63+
fun `extension message flow with extension control and extension message in the same rpc message`() {
64+
val test = TwoRoutersTest(
65+
protocol = PubsubProtocol.Gossip_V_1_3
66+
)
67+
68+
val rpcMessageWithControlExtensionAndTestExtensionMessages = Rpc.RPC.newBuilder()
69+
.setControl(
70+
Rpc.ControlMessage.newBuilder()
71+
.setExtensions(Rpc.ControlExtensions.newBuilder().setTestExtension(true))
72+
.build()
73+
)
74+
.setTestExtension(Rpc.TestExtension.newBuilder().build())
75+
.build()
76+
test.mockRouter.sendToSingle(rpcMessageWithControlExtensionAndTestExtensionMessages)
77+
78+
test.mockRouter.waitForMessage { it.hasTestExtension() }
79+
}
80+
81+
companion object {
82+
val testExtensionControlEnabledMessage: Rpc.RPC = Rpc.RPC.newBuilder().setControl(
83+
Rpc.ControlMessage.newBuilder()
84+
.setExtensions(Rpc.ControlExtensions.newBuilder().setTestExtension(true).build())
85+
.build()
86+
).build();
87+
88+
fun controlExtensionMessage(testExtensionEnabled: Boolean = false): Rpc.ControlExtensions {
89+
return Rpc.ControlExtensions.newBuilder().setTestExtension(testExtensionEnabled).build()
90+
}
91+
92+
val testExtensionMessage: Rpc.TestExtension = Rpc.TestExtension.newBuilder().build()
93+
94+
fun assertNoResponseFromTestExtension(test: TwoRoutersTest) {
95+
assertThrows<TimeoutException> {
96+
test.mockRouter.waitForMessage(
97+
{ it.hasTestExtension() },
98+
500L
99+
);
100+
}
101+
}
102+
}
103+
}

libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/MockRouter.kt

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import java.util.concurrent.ScheduledExecutorService
88
import java.util.concurrent.TimeUnit
99
import java.util.concurrent.TimeoutException
1010

11+
private const val DEFAULT_WAIT_FOR_MESSAGE_TIMEOUT_IN_MILLIS = 5000L
12+
1113
open class MockRouter(executor: ScheduledExecutorService) : AbstractRouter(
1214
protocol = PubsubProtocol.Floodsub,
1315
executor = executor,
@@ -26,9 +28,16 @@ open class MockRouter(executor: ScheduledExecutorService) : AbstractRouter(
2628
}
2729

2830
fun waitForMessage(predicate: (Rpc.RPC) -> Boolean): Rpc.RPC {
31+
return waitForMessage(predicate, DEFAULT_WAIT_FOR_MESSAGE_TIMEOUT_IN_MILLIS)
32+
}
33+
34+
fun waitForMessage(
35+
predicate: (Rpc.RPC) -> Boolean,
36+
timeoutInMillis: Long = DEFAULT_WAIT_FOR_MESSAGE_TIMEOUT_IN_MILLIS
37+
): Rpc.RPC {
2938
var cnt = 0
3039
while (true) {
31-
val msg = inboundMessages.poll(5, TimeUnit.SECONDS)
40+
val msg = inboundMessages.poll(timeoutInMillis, TimeUnit.MILLISECONDS)
3241
?: throw TimeoutException("No matching message received among $cnt")
3342
if (predicate(msg)) return msg
3443
cnt++
@@ -47,4 +56,5 @@ open class MockRouter(executor: ScheduledExecutorService) : AbstractRouter(
4756
override fun broadcastOutbound(msg: PubsubMessage): CompletableFuture<Unit> = CompletableFuture.completedFuture(null)
4857
override fun broadcastInbound(msgs: List<PubsubMessage>, receivedFrom: PeerHandler) {}
4958
override fun processControl(ctrl: Rpc.ControlMessage, receivedFrom: PeerHandler) {}
59+
override fun processExtensions(msg: Rpc.RPC, receivedFrom: PeerHandler) {}
5060
}

0 commit comments

Comments
 (0)