Skip to content

Commit 1d57a17

Browse files
committed
Updated naming convention
1 parent b1384ba commit 1d57a17

8 files changed

Lines changed: 147 additions & 121 deletions

File tree

libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,6 @@ abstract class AbstractRouter(
149149
subscribedTopics.forEach {
150150
partsQueue.addSubscribe(it)
151151
}
152-
153-
// TODO end control extension
154-
155152
flushPending(peer)
156153
}
157154

@@ -188,7 +185,9 @@ abstract class AbstractRouter(
188185
processControl(msg.control, peer)
189186
}
190187

191-
if (protocol.supportsExtensions()) {
188+
// TODO we need to handle the existence of extension messages more generically (https://github.com/libp2p/jvm-libp2p/issues/441)
189+
190+
if (protocol.supportsExtensions() && (msg.hasTestExtension() || msg.hasPartial())) {
192191
processExtensions(msg, peer)
193192
}
194193

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import io.libp2p.pubsub.PubsubApiImpl
1212
import io.libp2p.pubsub.PubsubProtocol
1313
import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder
1414
import io.netty.channel.ChannelHandler
15+
import org.slf4j.LoggerFactory
1516
import java.util.concurrent.CompletableFuture
1617

1718
class Gossip @JvmOverloads constructor(
@@ -21,6 +22,8 @@ class Gossip @JvmOverloads constructor(
2122
) :
2223
ProtocolBinding<Unit>, ConnectionHandler, PubsubApi by api {
2324

25+
private val logger = LoggerFactory.getLogger(Gossip::class.java)
26+
2427
fun updateTopicScoreParams(scoreParams: Map<String, GossipTopicScoreParams>) {
2528
router.score.updateTopicParams(scoreParams)
2629
}
@@ -62,6 +65,7 @@ class Gossip @JvmOverloads constructor(
6265
}
6366

6467
override fun initChannel(ch: P2PChannel, selectedProtocol: String): CompletableFuture<out Unit> {
68+
logger.trace("Gossip initChannel - selected protocol: {}", selectedProtocol)
6569
router.addPeerWithDebugHandler(ch as Stream, debugGossipHandler)
6670
return CompletableFuture.completedFuture(Unit)
6771
}

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipExtensionsState.kt

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,33 +6,33 @@ import pubsub.pb.Rpc
66
class GossipExtensionsState {
77

88
/*
9-
Tracks the peers that we have already sent an extension control message
9+
Tracks the peers that we have already sent a control extensions message
1010
*/
11-
private val outgoingExtensionControlMsgPeers: MutableSet<PeerId> = mutableSetOf()
11+
private val outgoingControlExtensionsMsgPeers: MutableSet<PeerId> = mutableSetOf()
1212

1313
/*
14-
Tracks peers that already sent us an extension control message
14+
Tracks peers that already sent us a control extensions message
1515
*/
1616
private val peerExtensionSupportMap: MutableMap<PeerId, Rpc.ControlExtensions> = mutableMapOf()
1717

1818
fun onPeerDisconnected(peer: PeerId) {
19-
outgoingExtensionControlMsgPeers.remove(peer)
19+
outgoingControlExtensionsMsgPeers.remove(peer)
2020
peerExtensionSupportMap.remove(peer)
2121
}
2222

23-
fun onExtensionControlMessage(ctrlExtensions: Rpc.ControlExtensions, receivedFrom: PeerId) {
23+
fun onControlExtensionsMessage(ctrlExtensions: Rpc.ControlExtensions, receivedFrom: PeerId) {
2424
peerExtensionSupportMap[receivedFrom] = ctrlExtensions
2525
}
2626

2727
fun registerControlExtensionMessageSentToPeers(peerId: PeerId) {
28-
outgoingExtensionControlMsgPeers.add(peerId)
28+
outgoingControlExtensionsMsgPeers.add(peerId)
2929
}
3030

3131
fun peerSupportedExtensions(peerId: PeerId) = peerExtensionSupportMap[peerId]
3232

33-
fun hasReceivedExtensionControlFrom(peer: PeerId) =
33+
fun hasReceivedControlExtensionsFrom(peer: PeerId) =
3434
peerExtensionSupportMap.contains(peer)
3535

36-
fun hasSentExtensionControlTo(peer: PeerId) =
37-
outgoingExtensionControlMsgPeers.contains(peer)
36+
fun hasSentControlExtensionsTo(peer: PeerId) =
37+
outgoingControlExtensionsMsgPeers.contains(peer)
3838
}

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ open class GossipRouter(
167167
super.onPeerActive(peer)
168168
eventBroadcaster.notifyConnected(peer.peerId, peer.getRemoteAddress())
169169
heartbeatTask.hashCode() // force lazy initialization
170-
sendExtensionsControl(peer)
170+
sendControlExtensions(peer)
171171
}
172172

173173
override fun notifyUnseenMessage(peer: PeerHandler, msg: PubsubMessage) {
@@ -400,35 +400,56 @@ open class GossipRouter(
400400
) {
401401
logger.trace("Received control extension {}", ctrlExtensions.toString())
402402

403-
if (gossipExtensionsState.hasReceivedExtensionControlFrom(receivedFrom.peerId)) {
403+
if (gossipExtensionsState.hasReceivedControlExtensionsFrom(receivedFrom.peerId)) {
404404
// TODO Should disconnect peers that send control extension multiple times (https://github.com/libp2p/jvm-libp2p/issues/437)
405405
logger.trace(
406406
"Received another control extension message from peer {}",
407407
receivedFrom.peerId
408408
)
409409
return
410410
} else {
411-
gossipExtensionsState.onExtensionControlMessage(ctrlExtensions, receivedFrom.peerId)
411+
gossipExtensionsState.onControlExtensionsMessage(ctrlExtensions, receivedFrom.peerId)
412412
}
413413
}
414414

415415
override fun processExtensions(msg: Rpc.RPC, receivedFrom: PeerHandler) {
416416
val peerSupportedExtensions =
417417
gossipExtensionsState.peerSupportedExtensions(receivedFrom.peerId)
418-
if (peerSupportedExtensions == null) {
418+
419+
// TODO Revisit this logic as part of adding feature flags (https://github.com/libp2p/jvm-libp2p/issues/441)
420+
421+
when {
422+
msg.hasTestExtension() && checkPeerExtensionSupport(
423+
peerSupportedExtensions,
424+
Rpc.ControlExtensions::hasTestExtension
425+
) ->
426+
processTestExtensionMessage(msg.testExtension, receivedFrom)
427+
428+
msg.hasPartial() && checkPeerExtensionSupport(
429+
peerSupportedExtensions,
430+
Rpc.ControlExtensions::hasPartialMessages
431+
) ->
432+
processPartialMessageExtension(msg.partial, receivedFrom)
433+
}
434+
}
435+
436+
private fun checkPeerExtensionSupport(
437+
peerSavedPreferences: Rpc.ControlExtensions?,
438+
checkSupportFunction: (Rpc.ControlExtensions) -> Boolean
439+
): Boolean {
440+
if (peerSavedPreferences == null) {
441+
return false
442+
}
443+
444+
if (!checkSupportFunction.invoke(peerSavedPreferences)) {
419445
logger.trace(
420-
"Ignoring extension messages from peer {} - did it send an extension control message?",
421-
receivedFrom.peerId
446+
"Ignoring extension messages from peer {} - did it send an control extensions message?",
447+
peerSavedPreferences
422448
)
423-
} else {
424-
when {
425-
peerSupportedExtensions.hasTestExtension() && msg.hasTestExtension() ->
426-
processTestExtensionMessage(msg.testExtension, receivedFrom)
427-
428-
peerSupportedExtensions.hasPartialMessages() && msg.hasPartial() ->
429-
processPartialMessageExtension(msg.partial, receivedFrom)
430-
}
449+
return false
431450
}
451+
452+
return true
432453
}
433454

434455
private fun processTestExtensionMessage(
@@ -582,7 +603,7 @@ open class GossipRouter(
582603
lastPublished -= topic
583604
}
584605

585-
activePeers.forEach { sendExtensionsControl(it) }
606+
activePeers.forEach { sendControlExtensions(it) }
586607
}
587608

588609
override fun unsubscribe(topic: Topic) {
@@ -783,23 +804,25 @@ open class GossipRouter(
783804
send(peer, iDontWant)
784805
}
785806

786-
private fun sendExtensionsControl(peer: PeerHandler) {
807+
private fun sendControlExtensions(peer: PeerHandler) {
787808
if (!this.protocol.supportsExtensions()) {
788809
logger.trace(
789-
"Protocol does not support extensions. Won't send extensions control message."
810+
"Protocol does not support extensions. Won't send control extensions message."
790811
)
791812
return
792813
}
793814

794-
if (gossipExtensionsState.hasSentExtensionControlTo(peer.peerId)) {
815+
if (gossipExtensionsState.hasSentControlExtensionsTo(peer.peerId)) {
795816
logger.trace(
796-
"Already sent extension control msg to peer {}. Won't send another one.",
817+
"Already sent control extensions msg to peer {}. Won't send another one.",
797818
peer.peerId
798819
)
799820
return
800821
}
801822

802-
pendingRpcParts.getQueue(peer).addExtensionsControl(
823+
logger.trace("Sending control extensions message to peer {}", peer.peerId)
824+
825+
pendingRpcParts.getQueue(peer).addControlExtensions(
803826
Rpc.ControlExtensions.newBuilder()
804827
.setTestExtension(true)
805828
.setPartialMessages(true)

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRpcPartsQueue.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ interface GossipRpcPartsQueue : RpcPartsQueue {
2828
fun addPrune(topic: Topic, backoffSeconds: Long, backoffPeers: List<PeerId>)
2929

3030
// TODO Need to check if we should handle when control extension and extension messages could be separated by split (https://github.com/libp2p/jvm-libp2p/issues/440)
31-
fun addExtensionsControl(ctrlMessage: Rpc.ControlExtensions)
31+
fun addControlExtensions(ctrlMessage: Rpc.ControlExtensions)
3232
}
3333

3434
/**
@@ -110,7 +110,7 @@ open class DefaultGossipRpcPartsQueue(
110110
addPart(PrunePart(topic, backoffSeconds, backoffPeers))
111111
}
112112

113-
override fun addExtensionsControl(ctrlMessage: Rpc.ControlExtensions) {
113+
override fun addControlExtensions(ctrlMessage: Rpc.ControlExtensions) {
114114
addPart(ControlExtensionPart(ctrlMessage))
115115
}
116116

0 commit comments

Comments
 (0)