Skip to content

Commit aba879e

Browse files
committed
Implement outbound publishPartial (step 4)
Adds the outbound path for the partial-messages extension: - GossipRpcPartsQueue: addPartialMessage queues a PartialMessagePart; takeMerged caps at 1 per RPC (proto field is optional, not repeated). - PartialMessagesAdapter: publishPartial invokes the client's PublishActionsFn, enforces the spec MUST (omit partialMessage when peer supports but did not request), updates nextPeerState atomically, and calls back via enqueueFn. - GossipRouter: publishPartial looks up PeerHandler by PeerId, routes through GossipRpcPartsQueue (not a direct send), and flushes pending. - Gossip facade: publishPartial submits to the event thread and returns CompletableFuture<Unit>. Tests: PartialMessagesOutboundRpcTest (5 wire-level) and 6 new unit tests in PartialMessagesAdapterImplTest.
1 parent 3265c33 commit aba879e

7 files changed

Lines changed: 404 additions & 2 deletions

File tree

docs/partial-messages.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ Mirror this checklist in issue #435.
367367
- [x] **Step 3** — Inbound `RPC.partial` dispatch: replace the stub at
368368
`GossipRouter.kt:476` with the full flow (validate caps, create/update
369369
group state, call `onIncomingRpc`).
370-
- [ ] **Step 4** — Outbound `publishPartial(...)` on the `Gossip` facade;
370+
- [x] **Step 4** — Outbound `publishPartial(...)` on the `Gossip` facade;
371371
route through `GossipRpcPartsQueue` (do **not** bypass — PR #433 got
372372
this wrong). Enforce the "omit `partialMessage` when peer supports but
373373
didn't request" MUST.

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ import io.libp2p.core.multistream.ProtocolDescriptor
1010
import io.libp2p.core.pubsub.PubsubApi
1111
import io.libp2p.pubsub.PubsubApiImpl
1212
import io.libp2p.pubsub.PubsubProtocol
13+
import io.libp2p.pubsub.Topic
1314
import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder
15+
import io.libp2p.pubsub.gossip.partialmessages.PublishActionsFn
1416
import io.netty.channel.ChannelHandler
1517
import org.slf4j.LoggerFactory
1618
import java.util.concurrent.CompletableFuture
@@ -32,6 +34,20 @@ class Gossip @JvmOverloads constructor(
3234
return router.score.getCachedScore(peerId)
3335
}
3436

37+
/**
38+
* Queues outbound [pubsub.pb.Rpc.PartialMessagesExtension] RPCs for [topic]/[groupId]
39+
* by invoking the client's [actionsFn] on the current group state.
40+
*
41+
* Submits to the pubsub event thread; the returned future completes when the RPCs
42+
* have been enqueued and flushed.
43+
*/
44+
fun publishPartial(
45+
topic: Topic,
46+
groupId: ByteArray,
47+
actionsFn: PublishActionsFn<*>
48+
): CompletableFuture<Unit> =
49+
router.submitOnEventThread { router.publishPartial(topic, groupId, actionsFn) }
50+
3551
override val protocolDescriptor =
3652
when (router.protocol) {
3753
PubsubProtocol.Gossip_V_1_3 -> {

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import io.libp2p.etc.types.*
88
import io.libp2p.etc.util.P2PService
99
import io.libp2p.pubsub.*
1010
import io.libp2p.pubsub.gossip.partialmessages.PartialMessagesAdapter
11+
import io.libp2p.pubsub.gossip.partialmessages.PublishActionsFn
12+
import io.libp2p.pubsub.gossip.partialmessages.toGroupId
1113
import org.slf4j.LoggerFactory
1214
import pubsub.pb.Rpc
1315
import java.time.Duration
@@ -164,6 +166,28 @@ open class GossipRouter(
164166
}
165167
}
166168

169+
/**
170+
* Queues outbound [pubsub.pb.Rpc.PartialMessagesExtension] RPCs for [topic]/[groupId]
171+
* by invoking the client's [actionsFn] on the current group state.
172+
*
173+
* Must be called on the pubsub event thread.
174+
*/
175+
fun publishPartial(topic: Topic, groupId: ByteArray, actionsFn: PublishActionsFn<*>) {
176+
val adapter = partialMessages ?: return
177+
val gid = groupId.toGroupId()
178+
179+
fun peerRequestsPartial(peerId: PeerId) =
180+
partialSubscriptionState.peerRequestsPartial(topic, peerId)
181+
182+
fun enqueue(peerId: PeerId, partialMessage: ByteArray?, partsMetadata: ByteArray?) {
183+
val peerHandler = activePeers.find { it.peerId == peerId } ?: return
184+
pendingRpcParts.getQueue(peerHandler).addPartialMessage(topic, groupId, partialMessage, partsMetadata)
185+
}
186+
187+
adapter.publishPartial(topic, gid, actionsFn, ::peerRequestsPartial, ::enqueue)
188+
flushAllPending()
189+
}
190+
167191
private fun setBackOff(peer: PeerHandler, topic: Topic) = setBackOff(peer, topic, params.pruneBackoff.toMillis())
168192
private fun setBackOff(peer: PeerHandler, topic: Topic, delay: Long) {
169193
backoffExpireTimes[peer.peerId to topic] = currentTimeSupplier() + delay

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRpcPartsQueue.kt

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ interface GossipRpcPartsQueue : RpcPartsQueue {
2929

3030
// TODO Need to check if we should handle when control extension and extension messages could be separated by split (https://github.com/libp2p/jvm-libp2p/issues/440)
3131
fun addControlExtensions(ctrlMessage: Rpc.ControlExtensions)
32+
33+
fun addPartialMessage(topic: Topic, groupId: ByteArray, partialMessage: ByteArray?, partsMetadata: ByteArray?)
3234
}
3335

3436
/**
@@ -90,6 +92,23 @@ open class DefaultGossipRpcPartsQueue(
9092
}
9193
}
9294

95+
// Not a data class: ByteArray fields break equals/hashCode in data classes.
96+
protected class PartialMessagePart(
97+
val topic: Topic,
98+
val groupId: ByteArray,
99+
val partialMessage: ByteArray?,
100+
val partsMetadata: ByteArray?
101+
) : AbstractPart {
102+
override fun appendToBuilder(builder: Rpc.RPC.Builder) {
103+
val pmBuilder = Rpc.PartialMessagesExtension.newBuilder()
104+
.setTopicID(topic)
105+
.setGroupID(groupId.toProtobuf())
106+
partialMessage?.let { pmBuilder.setPartialMessage(it.toProtobuf()) }
107+
partsMetadata?.let { pmBuilder.setPartsMetadata(it.toProtobuf()) }
108+
builder.setPartial(pmBuilder.build())
109+
}
110+
}
111+
93112
override fun addIHave(messageId: MessageId, topic: Topic) {
94113
addPart(IHavePart(messageId, topic))
95114
}
@@ -114,6 +133,10 @@ open class DefaultGossipRpcPartsQueue(
114133
addPart(ControlExtensionPart(ctrlMessage))
115134
}
116135

136+
override fun addPartialMessage(topic: Topic, groupId: ByteArray, partialMessage: ByteArray?, partsMetadata: ByteArray?) {
137+
addPart(PartialMessagePart(topic, groupId, partialMessage, partsMetadata))
138+
}
139+
117140
override fun takeMerged(): List<Rpc.RPC> {
118141
val ret = mutableListOf<Rpc.RPC>()
119142
var partIdx = 0
@@ -126,10 +149,12 @@ open class DefaultGossipRpcPartsQueue(
126149
var iWantCount = params.maxIWantMessageIds ?: Int.MAX_VALUE
127150
var graftCount = params.maxGraftMessages ?: Int.MAX_VALUE
128151
var pruneCount = params.maxPruneMessages ?: Int.MAX_VALUE
152+
// proto field `partial` is optional (not repeated): at most 1 per RPC
153+
var partialCount = 1
129154

130155
while (partIdx < parts.size &&
131156
publishCount > 0 && subscriptionCount > 0 && iHaveCount > 0 &&
132-
iWantCount > 0 && graftCount > 0 && pruneCount > 0
157+
iWantCount > 0 && graftCount > 0 && pruneCount > 0 && partialCount > 0
133158
) {
134159
val part = parts[partIdx++]
135160
when (part) {
@@ -139,6 +164,7 @@ open class DefaultGossipRpcPartsQueue(
139164
is IWantPart -> iWantCount--
140165
is GraftPart -> graftCount--
141166
is PrunePart -> pruneCount--
167+
is PartialMessagePart -> partialCount--
142168
}
143169

144170
part.appendToBuilder(builder)

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/partialmessages/PartialMessagesAdapter.kt

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ package io.libp2p.pubsub.gossip.partialmessages
22

33
import io.libp2p.core.PeerId
44
import io.libp2p.pubsub.Topic
5+
import org.slf4j.LoggerFactory
56
import pubsub.pb.Rpc
67

8+
private val logger = LoggerFactory.getLogger(PartialMessagesAdapterImpl::class.java)
9+
710
/**
811
* Type-erased view of the partial-messages subsystem used by [io.libp2p.pubsub.gossip.GossipRouter].
912
*
@@ -14,6 +17,21 @@ internal interface PartialMessagesAdapter {
1417
fun onTopicUnsubscribed(topic: Topic)
1518
fun onHeartbeat()
1619
fun onIncomingRpc(topic: Topic, from: PeerId, rpc: Rpc.PartialMessagesExtension)
20+
21+
/**
22+
* Executes the client's [PublishActionsFn], updates group state, and enqueues
23+
* outbound [Rpc.PartialMessagesExtension] RPCs via [enqueueFn].
24+
*
25+
* [peerRequestsPartial] is used to enforce the spec MUST: omit [PublishAction.partialMessage]
26+
* when the peer supports but did not request partial messages.
27+
*/
28+
fun publishPartial(
29+
topic: Topic,
30+
groupId: GroupId,
31+
actionsFn: PublishActionsFn<*>,
32+
peerRequestsPartial: (PeerId) -> Boolean,
33+
enqueueFn: (PeerId, ByteArray?, ByteArray?) -> Unit
34+
)
1735
}
1836

1937
/**
@@ -39,4 +57,30 @@ internal class PartialMessagesAdapterImpl<PeerState>(
3957
val groupState = stateStore.getOrCreatePeerGroup(topic, groupId, from) ?: return
4058
handler.onIncomingRpc(from, groupState.peerStates, rpc, feedback)
4159
}
60+
61+
@Suppress("UNCHECKED_CAST")
62+
override fun publishPartial(
63+
topic: Topic,
64+
groupId: GroupId,
65+
actionsFn: PublishActionsFn<*>,
66+
peerRequestsPartial: (PeerId) -> Boolean,
67+
enqueueFn: (PeerId, ByteArray?, ByteArray?) -> Unit
68+
) {
69+
val typedFn = actionsFn as PublishActionsFn<PeerState>
70+
val groupState = stateStore.getOrCreateLocalGroup(topic, groupId)
71+
for ((peerId, action) in typedFn.decide(groupState.peerStates, peerRequestsPartial)) {
72+
if (action.error != null) {
73+
logger.debug("Skipping partial publish to {}: {}", peerId, action.error.message)
74+
continue
75+
}
76+
// Spec MUST: omit partialMessage if peer supports but didn't request
77+
val effectivePartialMessage = if (peerRequestsPartial(peerId)) action.partialMessage else null
78+
if (effectivePartialMessage != null || action.partsMetadata != null) {
79+
enqueueFn(peerId, effectivePartialMessage, action.partsMetadata)
80+
}
81+
if (action.nextPeerState != null) {
82+
groupState.peerStates[peerId] = action.nextPeerState
83+
}
84+
}
85+
}
4286
}
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
package io.libp2p.pubsub.gossip.extensions
2+
3+
import com.google.protobuf.ByteString
4+
import io.libp2p.core.PeerId
5+
import io.libp2p.pubsub.PubsubProtocol
6+
import io.libp2p.pubsub.gossip.GossipExtension
7+
import io.libp2p.pubsub.gossip.GossipTestsBase
8+
import io.libp2p.pubsub.gossip.partialmessages.PublishAction
9+
import io.libp2p.pubsub.gossip.partialmessages.PublishActionsFn
10+
import org.assertj.core.api.Assertions.assertThat
11+
import org.junit.jupiter.api.Test
12+
import pubsub.pb.Rpc
13+
14+
private const val TIMEOUT_MS = 500L
15+
16+
class PartialMessagesOutboundRpcTest : GossipTestsBase() {
17+
18+
private val topicId = "test-topic"
19+
private val groupIdBytes = "group-1".toByteArray()
20+
21+
private fun newTest() = TwoRoutersTest(
22+
protocol = PubsubProtocol.Gossip_V_1_3,
23+
enabledGossipExtensions = listOf(GossipExtension.PARTIAL_MESSAGES),
24+
partialMessagesHandler = nopPartialMessagesHandler,
25+
)
26+
27+
private fun controlExtensionsWithPartial(): Rpc.RPC =
28+
Rpc.RPC.newBuilder().setControl(
29+
Rpc.ControlMessage.newBuilder().setExtensions(
30+
Rpc.ControlExtensions.newBuilder().setPartialMessages(true)
31+
)
32+
).build()
33+
34+
private fun subscribeRpc(
35+
topic: String,
36+
requestsPartial: Boolean,
37+
supportsSendingPartial: Boolean
38+
): Rpc.RPC =
39+
Rpc.RPC.newBuilder().addSubscriptions(
40+
Rpc.RPC.SubOpts.newBuilder()
41+
.setTopicid(topic)
42+
.setSubscribe(true)
43+
.setRequestsPartial(requestsPartial)
44+
.setSupportsSendingPartial(supportsSendingPartial)
45+
).build()
46+
47+
private fun TwoRoutersTest.flushRouter() =
48+
gossipRouter.submitOnEventThread {}.join()
49+
50+
private fun TwoRoutersTest.peerIdOfMockRouter(): PeerId = router2.peerId
51+
52+
@Test
53+
fun `publishPartial delivers partial RPC to peer that requested partial`() {
54+
val test = newTest()
55+
56+
test.mockRouter.sendToSingle(controlExtensionsWithPartial())
57+
test.mockRouter.sendToSingle(subscribeRpc(topicId, requestsPartial = true, supportsSendingPartial = true))
58+
test.flushRouter()
59+
60+
val payload = byteArrayOf(1, 2, 3)
61+
val meta = byteArrayOf(0xAA.toByte())
62+
val peerId = test.peerIdOfMockRouter()
63+
64+
val actionsFn = PublishActionsFn<Unit> { _, _ ->
65+
sequenceOf(peerId to PublishAction(partialMessage = payload, partsMetadata = meta))
66+
}
67+
68+
test.gossipRouter.publishPartial(topicId, groupIdBytes, actionsFn)
69+
70+
val received = test.mockRouter.waitForMessage({ it.hasPartial() }, TIMEOUT_MS)
71+
assertThat(received.partial.topicID).isEqualTo(topicId)
72+
assertThat(received.partial.groupID).isEqualTo(ByteString.copyFrom(groupIdBytes))
73+
assertThat(received.partial.partialMessage.toByteArray()).isEqualTo(payload)
74+
assertThat(received.partial.partsMetadata.toByteArray()).isEqualTo(meta)
75+
}
76+
77+
@Test
78+
fun `publishPartial omits partialMessage when peer supports but did not request`() {
79+
val test = newTest()
80+
81+
// Peer supports sending partial but did NOT request partial messages
82+
test.mockRouter.sendToSingle(controlExtensionsWithPartial())
83+
test.mockRouter.sendToSingle(subscribeRpc(topicId, requestsPartial = false, supportsSendingPartial = true))
84+
test.flushRouter()
85+
86+
val payload = byteArrayOf(1, 2, 3)
87+
val meta = byteArrayOf(0xAA.toByte())
88+
val peerId = test.peerIdOfMockRouter()
89+
90+
val actionsFn = PublishActionsFn<Unit> { _, _ ->
91+
sequenceOf(peerId to PublishAction(partialMessage = payload, partsMetadata = meta))
92+
}
93+
94+
test.gossipRouter.publishPartial(topicId, groupIdBytes, actionsFn)
95+
96+
val received = test.mockRouter.waitForMessage({ it.hasPartial() }, TIMEOUT_MS)
97+
// partsMetadata is present; partialMessage MUST be absent (spec MUST)
98+
assertThat(received.partial.hasPartialMessage()).isFalse()
99+
assertThat(received.partial.partsMetadata.toByteArray()).isEqualTo(meta)
100+
}
101+
102+
@Test
103+
fun `publishPartial sends nothing when actionsFn returns empty sequence`() {
104+
val test = newTest()
105+
106+
test.mockRouter.sendToSingle(controlExtensionsWithPartial())
107+
test.mockRouter.sendToSingle(subscribeRpc(topicId, requestsPartial = true, supportsSendingPartial = true))
108+
test.flushRouter()
109+
110+
val actionsFn = PublishActionsFn<Unit> { _, _ -> emptySequence() }
111+
112+
test.gossipRouter.publishPartial(topicId, groupIdBytes, actionsFn)
113+
test.flushRouter()
114+
115+
assertThat(test.mockRouter.inboundMessages.none { it.hasPartial() }).isTrue()
116+
}
117+
118+
@Test
119+
fun `publishPartial sends nothing when adapter is not configured`() {
120+
val test = TwoRoutersTest(
121+
protocol = PubsubProtocol.Gossip_V_1_3,
122+
enabledGossipExtensions = listOf(),
123+
)
124+
test.flushRouter()
125+
126+
val peerId = test.peerIdOfMockRouter()
127+
val actionsFn = PublishActionsFn<Unit> { _, _ ->
128+
sequenceOf(peerId to PublishAction(partsMetadata = byteArrayOf(1)))
129+
}
130+
131+
test.gossipRouter.publishPartial(topicId, groupIdBytes, actionsFn)
132+
test.flushRouter()
133+
134+
assertThat(test.mockRouter.inboundMessages.none { it.hasPartial() }).isTrue()
135+
}
136+
137+
@Test
138+
fun `publishPartial two groups produce two separate RPCs`() {
139+
val test = newTest()
140+
141+
test.mockRouter.sendToSingle(controlExtensionsWithPartial())
142+
test.mockRouter.sendToSingle(subscribeRpc(topicId, requestsPartial = true, supportsSendingPartial = true))
143+
test.flushRouter()
144+
145+
val peerId = test.peerIdOfMockRouter()
146+
val groupA = "group-a".toByteArray()
147+
val groupB = "group-b".toByteArray()
148+
149+
test.gossipRouter.publishPartial(
150+
topicId,
151+
groupA,
152+
PublishActionsFn<Unit> { _, _ -> sequenceOf(peerId to PublishAction(partsMetadata = byteArrayOf(1))) }
153+
)
154+
test.gossipRouter.publishPartial(
155+
topicId,
156+
groupB,
157+
PublishActionsFn<Unit> { _, _ -> sequenceOf(peerId to PublishAction(partsMetadata = byteArrayOf(2))) }
158+
)
159+
160+
val rpc1 = test.mockRouter.waitForMessage({ it.hasPartial() }, TIMEOUT_MS)
161+
val rpc2 = test.mockRouter.waitForMessage({ it.hasPartial() }, TIMEOUT_MS)
162+
163+
val groupIds = setOf(
164+
rpc1.partial.groupID.toByteArray().toList(),
165+
rpc2.partial.groupID.toByteArray().toList()
166+
)
167+
assertThat(groupIds).containsExactlyInAnyOrder(
168+
groupA.toList(),
169+
groupB.toList()
170+
)
171+
}
172+
}

0 commit comments

Comments
 (0)