Skip to content

Commit 6adf574

Browse files
committed
Add PartialMessagesHandler API and GroupState container (step 2)
Introduces the public partial-messages API surface and the internal state management layer required before any routing logic lands: Public API (io.libp2p.pubsub.gossip.partialmessages): - PartialMessagesHandler<PeerState> — onIncomingRpc + onEmitGossip; PartialMessagesPeerFeedback passed per-call (resolves open question from design doc §9) - PublishAction<PeerState> / PublishActionsFn<PeerState> - PartialMessagesPeerFeedback interface + FeedbackKind enum Internal state management: - GroupId — content-equality ByteArray wrapper for use as map key - GroupState<PeerState> — per-(topic,groupId) container with mutable TTL and app-opaque peerStates - PartialGroupStateStore<PeerState> — TTL countdown, GC on ttl≤0 or empty peerStates, DoS caps (255/topic, 8/topic/peer, matching go-libp2p defaults), onPeerDisconnected, onTopicUnsubscribed - PartialMessagesAdapter (internal interface) / PartialMessagesAdapterImpl<PeerState> — erases PeerState at the GossipRouter boundary via a single @Suppress("UNCHECKED_CAST") in the builder Wiring: - GossipRouterBuilder: partialMessagesHandler field; build-time error if PARTIAL_MESSAGES extension enabled without a handler - GossipRouter: internal var partialMessages: PartialMessagesAdapter? No routing changes in this step.
1 parent 80b8135 commit 6adf574

12 files changed

Lines changed: 620 additions & 5 deletions

File tree

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import io.libp2p.core.pubsub.ValidationResult
77
import io.libp2p.etc.types.*
88
import io.libp2p.etc.util.P2PService
99
import io.libp2p.pubsub.*
10+
import io.libp2p.pubsub.gossip.partialmessages.PartialMessagesAdapter
1011
import org.slf4j.LoggerFactory
1112
import pubsub.pb.Rpc
1213
import java.time.Duration
@@ -135,6 +136,7 @@ open class GossipRouter(
135136

136137
val gossipExtensionsState = GossipExtensionsState(gossipExtensionsConfig)
137138
val partialSubscriptionState = PartialSubscriptionState()
139+
internal var partialMessages: PartialMessagesAdapter? = null
138140

139141
/**
140142
* Local per-topic subscription options that affect outbound subscribe announcements.

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipRouterBuilder.kt

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import io.libp2p.core.pubsub.ValidationResult
55
import io.libp2p.etc.types.lazyVar
66
import io.libp2p.pubsub.*
77
import io.libp2p.pubsub.gossip.*
8+
import io.libp2p.pubsub.gossip.partialmessages.*
89
import java.util.*
910
import java.util.concurrent.Executors
1011
import java.util.concurrent.ScheduledExecutorService
@@ -40,6 +41,13 @@ open class GossipRouterBuilder(
4041
},
4142
val gossipRouterEventListeners: MutableList<GossipRouterEventListener> = mutableListOf(),
4243
val enabledGossipExtensions: List<GossipExtension> = mutableListOf(),
44+
45+
/**
46+
* Client-supplied handler for the partial-messages extension.
47+
* Required when [GossipExtension.PARTIAL_MESSAGES] is enabled; a build-time
48+
* error is thrown if the extension is enabled without a handler.
49+
*/
50+
var partialMessagesHandler: PartialMessagesHandler<*>? = null,
4351
) {
4452

4553
var seenCache: SeenCache<Optional<ValidationResult>> by lazyVar { TTLSeenCache(SimpleSeenCache(), params.seenTTL, currentTimeSupplier) }
@@ -73,12 +81,28 @@ open class GossipRouterBuilder(
7381
)
7482

7583
router.eventBroadcaster.listeners += gossipRouterEventListeners
84+
router.partialMessages = buildPartialMessagesAdapter()
7685
return router
7786
}
7887

88+
@Suppress("UNCHECKED_CAST")
89+
private fun buildPartialMessagesAdapter(): PartialMessagesAdapter? {
90+
val handler = partialMessagesHandler ?: return null
91+
return PartialMessagesAdapterImpl(
92+
handler = handler as PartialMessagesHandler<Any?>,
93+
stateStore = PartialGroupStateStore(),
94+
feedback = NopPartialMessagesFeedback,
95+
)
96+
}
97+
7998
open fun build(): GossipRouter {
8099
if (disposed) throw RuntimeException("The builder was already used")
81100
disposed = true
101+
if (enabledGossipExtensions.contains(GossipExtension.PARTIAL_MESSAGES) && partialMessagesHandler == null) {
102+
throw IllegalStateException(
103+
"GossipExtension.PARTIAL_MESSAGES is enabled but no partialMessagesHandler was provided"
104+
)
105+
}
82106
return createGossipRouter()
83107
}
84108

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package io.libp2p.pubsub.gossip.partialmessages
2+
3+
import io.libp2p.core.PeerId
4+
import io.libp2p.pubsub.Topic
5+
import org.slf4j.LoggerFactory
6+
7+
private val logger = LoggerFactory.getLogger(PartialGroupStateStore::class.java)
8+
9+
const val DEFAULT_GROUP_TTL_HEARTBEATS = 5
10+
const val DEFAULT_PEER_INITIATED_GROUP_LIMIT_PER_TOPIC = 255
11+
const val DEFAULT_PEER_INITIATED_GROUP_LIMIT_PER_TOPIC_PER_PEER = 8
12+
13+
/**
14+
* Stable, value-based identity for a partial-messages group ID.
15+
*
16+
* Wraps a raw [ByteArray] so it can be used as a [HashMap] key with
17+
* content equality rather than reference equality.
18+
*/
19+
class GroupId(val bytes: ByteArray) {
20+
override fun equals(other: Any?): Boolean =
21+
other is GroupId && bytes.contentEquals(other.bytes)
22+
override fun hashCode(): Int = bytes.contentHashCode()
23+
override fun toString(): String = bytes.joinToString("") { "%02x".format(it) }
24+
}
25+
26+
fun ByteArray.toGroupId(): GroupId = GroupId(this)
27+
28+
/**
29+
* Per-(topic, groupId) state container.
30+
*
31+
* [peerStates] is mutable and updated as parts arrive.
32+
* [ttlInHeartbeats] is decremented each heartbeat and reset on [PartialGroupStateStore.resetTtl].
33+
* [initiatingPeer] is non-null iff [peerInitiated] is true.
34+
*
35+
* NOT thread-safe: accessed only on the pubsub event loop.
36+
*/
37+
class GroupState<PeerState>(
38+
var ttlInHeartbeats: Int,
39+
val peerInitiated: Boolean,
40+
val initiatingPeer: PeerId?
41+
) {
42+
val peerStates: MutableMap<PeerId, PeerState> = mutableMapOf()
43+
}
44+
45+
/**
46+
* Stores and manages per-(topic, groupId) [GroupState] entries for the partial-messages
47+
* extension.
48+
*
49+
* DoS caps (matching go-libp2p defaults):
50+
* - [peerInitiatedGroupLimitPerTopic]: max peer-initiated groups across all peers per topic.
51+
* - [peerInitiatedGroupLimitPerTopicPerPeer]: max peer-initiated groups per (topic, peer).
52+
*
53+
* NOT thread-safe: all access must be serialised on the pubsub event loop.
54+
*/
55+
class PartialGroupStateStore<PeerState>(
56+
val groupTtlHeartbeats: Int = DEFAULT_GROUP_TTL_HEARTBEATS,
57+
val peerInitiatedGroupLimitPerTopic: Int = DEFAULT_PEER_INITIATED_GROUP_LIMIT_PER_TOPIC,
58+
val peerInitiatedGroupLimitPerTopicPerPeer: Int = DEFAULT_PEER_INITIATED_GROUP_LIMIT_PER_TOPIC_PER_PEER
59+
) {
60+
private val groups: HashMap<Topic, HashMap<GroupId, GroupState<PeerState>>> = hashMapOf()
61+
62+
fun getGroup(topic: Topic, groupId: GroupId): GroupState<PeerState>? =
63+
groups[topic]?.get(groupId)
64+
65+
/**
66+
* Returns the group for (topic, groupId), creating it as a locally-initiated group
67+
* if absent. Resets the TTL if the group already exists.
68+
*/
69+
fun getOrCreateLocalGroup(topic: Topic, groupId: GroupId): GroupState<PeerState> {
70+
val topicGroups = groups.getOrPut(topic) { hashMapOf() }
71+
val existing = topicGroups[groupId]
72+
if (existing != null) {
73+
existing.ttlInHeartbeats = groupTtlHeartbeats
74+
return existing
75+
}
76+
return GroupState<PeerState>(
77+
ttlInHeartbeats = groupTtlHeartbeats,
78+
peerInitiated = false,
79+
initiatingPeer = null
80+
).also { topicGroups[groupId] = it }
81+
}
82+
83+
/**
84+
* Returns the group for (topic, groupId), creating it as a peer-initiated group if absent.
85+
* Returns null and drops the RPC if either DoS cap would be exceeded.
86+
*/
87+
fun getOrCreatePeerGroup(topic: Topic, groupId: GroupId, peer: PeerId): GroupState<PeerState>? {
88+
val topicGroups = groups.getOrPut(topic) { hashMapOf() }
89+
val existing = topicGroups[groupId]
90+
if (existing != null) return existing
91+
92+
val totalPeerInitiated = topicGroups.values.count { it.peerInitiated }
93+
if (totalPeerInitiated >= peerInitiatedGroupLimitPerTopic) {
94+
logger.debug(
95+
"Dropping peer-initiated group {} from {}: per-topic cap {} reached for topic {}",
96+
groupId,
97+
peer,
98+
peerInitiatedGroupLimitPerTopic,
99+
topic
100+
)
101+
return null
102+
}
103+
104+
val peerTotal = topicGroups.values.count { it.initiatingPeer == peer }
105+
if (peerTotal >= peerInitiatedGroupLimitPerTopicPerPeer) {
106+
logger.debug(
107+
"Dropping peer-initiated group {} from {}: per-peer cap {} reached for topic {}",
108+
groupId,
109+
peer,
110+
peerInitiatedGroupLimitPerTopicPerPeer,
111+
topic
112+
)
113+
return null
114+
}
115+
116+
return GroupState<PeerState>(
117+
ttlInHeartbeats = groupTtlHeartbeats,
118+
peerInitiated = true,
119+
initiatingPeer = peer
120+
).also { topicGroups[groupId] = it }
121+
}
122+
123+
/** Resets the TTL for (topic, groupId). Called by publishPartial. */
124+
fun resetTtl(topic: Topic, groupId: GroupId) {
125+
groups[topic]?.get(groupId)?.let { it.ttlInHeartbeats = groupTtlHeartbeats }
126+
}
127+
128+
/** Returns a read-only snapshot of all groups for [topic]. */
129+
fun groupsForTopic(topic: Topic): Map<GroupId, GroupState<PeerState>> =
130+
groups[topic] ?: emptyMap()
131+
132+
/**
133+
* Decrements TTLs and garbage-collects expired groups (TTL ≤ 0) and
134+
* groups whose peerStates map has become empty.
135+
*/
136+
fun onHeartbeat() {
137+
val topicIter = groups.entries.iterator()
138+
while (topicIter.hasNext()) {
139+
val (_, topicGroups) = topicIter.next()
140+
val groupIter = topicGroups.entries.iterator()
141+
while (groupIter.hasNext()) {
142+
val (_, group) = groupIter.next()
143+
group.ttlInHeartbeats--
144+
if (group.ttlInHeartbeats <= 0 || group.peerStates.isEmpty()) {
145+
groupIter.remove()
146+
}
147+
}
148+
if (topicGroups.isEmpty()) topicIter.remove()
149+
}
150+
}
151+
152+
/**
153+
* Removes [peer] from all group peerStates; garbage-collects groups that
154+
* become empty as a result.
155+
*/
156+
fun onPeerDisconnected(peer: PeerId) {
157+
val topicIter = groups.entries.iterator()
158+
while (topicIter.hasNext()) {
159+
val (_, topicGroups) = topicIter.next()
160+
val groupIter = topicGroups.entries.iterator()
161+
while (groupIter.hasNext()) {
162+
val (_, group) = groupIter.next()
163+
group.peerStates.remove(peer)
164+
if (group.peerStates.isEmpty()) groupIter.remove()
165+
}
166+
if (topicGroups.isEmpty()) topicIter.remove()
167+
}
168+
}
169+
170+
/** Drops all group state for [topic] (called when we unsubscribe). */
171+
fun onTopicUnsubscribed(topic: Topic) {
172+
groups.remove(topic)
173+
}
174+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package io.libp2p.pubsub.gossip.partialmessages
2+
3+
import io.libp2p.core.PeerId
4+
import io.libp2p.pubsub.Topic
5+
6+
/**
7+
* Type-erased view of the partial-messages subsystem used by [io.libp2p.pubsub.gossip.GossipRouter].
8+
*
9+
* All methods are called on the pubsub event thread.
10+
*/
11+
internal interface PartialMessagesAdapter {
12+
fun onPeerDisconnected(peer: PeerId)
13+
fun onTopicUnsubscribed(topic: Topic)
14+
fun onHeartbeat()
15+
}
16+
17+
/**
18+
* Bridges [GossipRouter] (which has no [PeerState] type parameter) to the typed
19+
* [PartialMessagesHandler] and [PartialGroupStateStore].
20+
*
21+
* Created once in [io.libp2p.pubsub.gossip.builders.GossipRouterBuilder] with an
22+
* unchecked cast that is safe because [PeerState] is captured and used consistently
23+
* throughout the lifetime of this object.
24+
*/
25+
internal class PartialMessagesAdapterImpl<PeerState>(
26+
val handler: PartialMessagesHandler<PeerState>,
27+
val stateStore: PartialGroupStateStore<PeerState>,
28+
val feedback: PartialMessagesPeerFeedback
29+
) : PartialMessagesAdapter {
30+
31+
override fun onPeerDisconnected(peer: PeerId) = stateStore.onPeerDisconnected(peer)
32+
override fun onTopicUnsubscribed(topic: Topic) = stateStore.onTopicUnsubscribed(topic)
33+
override fun onHeartbeat() = stateStore.onHeartbeat()
34+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package io.libp2p.pubsub.gossip.partialmessages
2+
3+
import io.libp2p.core.PeerId
4+
import io.libp2p.pubsub.Topic
5+
import pubsub.pb.Rpc
6+
7+
/**
8+
* Client-supplied handler for the partial-messages extension.
9+
*
10+
* Both callbacks run on the pubsub event thread and MUST be fast and non-blocking.
11+
* Dispatch heavy work (decoding, KZG validation) to a separate executor.
12+
*
13+
* @param PeerState opaque per-(topic, groupId, peerId) state that the library
14+
* stores and passes back; the library never interprets it.
15+
*/
16+
interface PartialMessagesHandler<PeerState> {
17+
18+
/**
19+
* Called on every inbound [Rpc.PartialMessagesExtension] RPC.
20+
*
21+
* Any of [rpc].partialMessage and [rpc].partsMetadata may be absent; all
22+
* four combinations are valid wire messages.
23+
*
24+
* [peerStates] reflects the current state for this (topic, groupId) pair across
25+
* all peers. The map is a live view — do not retain a reference outside this call.
26+
*/
27+
fun onIncomingRpc(
28+
from: PeerId,
29+
peerStates: Map<PeerId, PeerState>,
30+
rpc: Rpc.PartialMessagesExtension,
31+
feedback: PartialMessagesPeerFeedback
32+
)
33+
34+
/**
35+
* Called once per locally-initiated group during the gossipsub heartbeat for
36+
* gossip targets that are partial-capable on [topic].
37+
*
38+
* The client typically responds by calling [io.libp2p.pubsub.gossip.Gossip.publishPartial]
39+
* for the same (topic, groupId).
40+
*
41+
* [peerStates] reflects the current state for this group across all peers.
42+
* The map is a live view — do not retain a reference outside this call.
43+
*/
44+
fun onEmitGossip(
45+
topic: Topic,
46+
groupId: ByteArray,
47+
gossipPeers: Collection<PeerId>,
48+
peerStates: Map<PeerId, PeerState>,
49+
feedback: PartialMessagesPeerFeedback
50+
)
51+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.libp2p.pubsub.gossip.partialmessages
2+
3+
import io.libp2p.core.PeerId
4+
import io.libp2p.pubsub.Topic
5+
6+
enum class FeedbackKind { USEFUL, INVALID, IGNORED }
7+
8+
interface PartialMessagesPeerFeedback {
9+
fun reportFeedback(topic: Topic, peer: PeerId, kind: FeedbackKind)
10+
}
11+
12+
internal object NopPartialMessagesFeedback : PartialMessagesPeerFeedback {
13+
override fun reportFeedback(topic: Topic, peer: PeerId, kind: FeedbackKind) {}
14+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package io.libp2p.pubsub.gossip.partialmessages
2+
3+
import io.libp2p.core.PeerId
4+
5+
/**
6+
* Encodes what the library should send to one peer for a single
7+
* [io.libp2p.pubsub.gossip.Gossip.publishPartial] call.
8+
*
9+
* [nextPeerState] is applied atomically by the library per peer after the
10+
* send; null means "leave the existing state unchanged".
11+
*/
12+
data class PublishAction<PeerState>(
13+
val partialMessage: ByteArray? = null,
14+
val partsMetadata: ByteArray? = null,
15+
val nextPeerState: PeerState? = null,
16+
val error: Throwable? = null
17+
)
18+
19+
/**
20+
* Decision function supplied by the client to [io.libp2p.pubsub.gossip.Gossip.publishPartial].
21+
*
22+
* [decide] is called on the pubsub event thread with the current peer state map
23+
* and a predicate for checking whether a peer requested partial for the topic.
24+
* It must return a sequence of (peerId, action) pairs — one per peer that
25+
* should receive an outbound [pubsub.pb.Rpc.PartialMessagesExtension] RPC.
26+
*/
27+
fun interface PublishActionsFn<PeerState> {
28+
fun decide(
29+
peerStates: Map<PeerId, PeerState>,
30+
peerRequestsPartial: (PeerId) -> Boolean
31+
): Sequence<Pair<PeerId, PublishAction<PeerState>>>
32+
}

0 commit comments

Comments
 (0)