Skip to content

Commit f26fc37

Browse files
committed
Wire heartbeat TTL GC and cleanup hooks for partial messages (step 8)
Three one-line additions to GossipRouter that call the already-implemented PartialMessagesAdapter hooks at the right lifecycle points: - heartbeat() → partialMessages?.onHeartbeat() Decrements TTL on all live groups each heartbeat and garbage-collects groups whose TTL has reached zero or whose peerStates map is empty. - onPeerDisconnected() → partialMessages?.onPeerDisconnected(peer.peerId) Removes the disconnected peer from every group's peerStates; GCs groups that become empty as a result. - unsubscribe() → partialMessages?.onTopicUnsubscribed(topic) Drops all group state for the topic when we leave it. TTL is already reset on every publishPartial call via getOrCreateLocalGroup.
1 parent 90d4d3a commit f26fc37

2 files changed

Lines changed: 206 additions & 0 deletions

File tree

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ open class GossipRouter(
215215
pendingRpcParts.popQueue(peer) // discard them
216216
gossipExtensionsState.onPeerDisconnected(peer.peerId)
217217
partialSubscriptionState.onPeerDisconnected(peer.peerId)
218+
partialMessages?.onPeerDisconnected(peer.peerId)
218219
super.onPeerDisconnected(peer)
219220
}
220221

@@ -698,6 +699,7 @@ open class GossipRouter(
698699
mesh -= topic
699700
localTopicPartialFlags -= topic
700701
partialSubscriptionState.removeTopic(topic)
702+
partialMessages?.onTopicUnsubscribed(topic)
701703
}
702704

703705
private fun catchingHeartbeat() {
@@ -800,6 +802,7 @@ open class GossipRouter(
800802
}
801803

802804
mCache.shift()
805+
partialMessages?.onHeartbeat()
803806

804807
flushAllPending()
805808
} catch (t: Exception) {
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
package io.libp2p.pubsub.gossip.extensions
2+
3+
import com.google.protobuf.ByteString
4+
import io.libp2p.core.PeerId
5+
import io.libp2p.etc.types.seconds
6+
import io.libp2p.pubsub.PubsubProtocol
7+
import io.libp2p.pubsub.gossip.GossipExtension
8+
import io.libp2p.pubsub.gossip.GossipTestsBase
9+
import io.libp2p.pubsub.gossip.partialmessages.PartialGroupStateStore
10+
import io.libp2p.pubsub.gossip.partialmessages.PartialMessagesAdapterImpl
11+
import io.libp2p.pubsub.gossip.partialmessages.PublishActionsFn
12+
import io.libp2p.pubsub.gossip.partialmessages.toGroupId
13+
import org.assertj.core.api.Assertions.assertThat
14+
import org.junit.jupiter.api.Test
15+
import pubsub.pb.Rpc
16+
17+
/**
18+
* Tests for Step 8 — heartbeat tick + TTL GC + cleanup hooks (§6.4).
19+
*
20+
* Verifies that the three wiring points added in GossipRouter actually invoke
21+
* the partial-messages adapter at the right times:
22+
* - heartbeat → TTL decrement and GC of expired groups
23+
* - onPeerDisconnected → peer state removed from all groups
24+
* - unsubscribe → all group state for the topic dropped
25+
*/
26+
class PartialMessagesLifecycleTest : GossipTestsBase() {
27+
28+
private val topicId = "test-topic"
29+
private val groupId = "group-1".toByteArray()
30+
31+
private fun newTest() =
32+
TwoRoutersTest(
33+
protocol = PubsubProtocol.Gossip_V_1_3,
34+
enabledGossipExtensions = listOf(GossipExtension.PARTIAL_MESSAGES),
35+
partialMessagesHandler = nopPartialMessagesHandler,
36+
)
37+
38+
@Suppress("UNCHECKED_CAST")
39+
private fun TwoRoutersTest.store(): PartialGroupStateStore<Any?> =
40+
(gossipRouter.partialMessages as PartialMessagesAdapterImpl<Any?>).stateStore
41+
42+
/**
43+
* Seeds a group with one peer-state entry so peerStates is non-empty and
44+
* the group survives heartbeats until its TTL expires (not GC'd immediately
45+
* by the peerStates-empty condition).
46+
*/
47+
private fun TwoRoutersTest.seedGroup(peer: PeerId) {
48+
gossipRouter.submitOnEventThread {
49+
val group = store().getOrCreateLocalGroup(topicId, groupId.toGroupId())
50+
group.peerStates[peer] = "sentinel"
51+
}.join()
52+
}
53+
54+
private fun controlExtensionsWithPartial(): Rpc.RPC =
55+
Rpc.RPC.newBuilder()
56+
.setControl(
57+
Rpc.ControlMessage.newBuilder()
58+
.setExtensions(Rpc.ControlExtensions.newBuilder().setPartialMessages(true))
59+
)
60+
.build()
61+
62+
private fun partialRpc(): Rpc.RPC =
63+
Rpc.RPC.newBuilder()
64+
.setPartial(
65+
Rpc.PartialMessagesExtension.newBuilder()
66+
.setTopicID(topicId)
67+
.setGroupID(ByteString.copyFrom(groupId))
68+
)
69+
.build()
70+
71+
// ── Heartbeat GC ──────────────────────────────────────────────────────────
72+
73+
@Test
74+
fun `heartbeat GCs peer-initiated group whose peerStates is empty`() {
75+
val test = newTest()
76+
77+
// Peer-initiated group via inbound RPC; nopHandler sets no peerStates → empty
78+
test.mockRouter.sendToSingle(controlExtensionsWithPartial())
79+
test.mockRouter.sendToSingle(partialRpc())
80+
test.gossipRouter.submitOnEventThread {}.join()
81+
82+
val store = test.store()
83+
assertThat(store.getGroup(topicId, groupId.toGroupId())).isNotNull()
84+
85+
// One heartbeat fires; peerStates.isEmpty() triggers immediate GC
86+
test.fuzz.timeController.addTime(2.seconds)
87+
88+
assertThat(store.getGroup(topicId, groupId.toGroupId())).isNull()
89+
}
90+
91+
@Test
92+
fun `heartbeat decrements TTL and GCs group after TTL expires`() {
93+
val test = newTest()
94+
val mockPeerId = test.router2.peerId
95+
test.seedGroup(mockPeerId)
96+
97+
val store = test.store()
98+
assertThat(store.getGroup(topicId, groupId.toGroupId())).isNotNull()
99+
100+
// DEFAULT_GROUP_TTL_HEARTBEATS = 5; advance 6 s to fire 6 heartbeats
101+
test.fuzz.timeController.addTime(6.seconds)
102+
103+
assertThat(store.getGroup(topicId, groupId.toGroupId())).isNull()
104+
}
105+
106+
@Test
107+
fun `publishPartial resets TTL so group survives past initial TTL`() {
108+
val test = newTest()
109+
val mockPeerId = test.router2.peerId
110+
test.seedGroup(mockPeerId)
111+
112+
val store = test.store()
113+
114+
// Advance 4 heartbeats: TTL goes 5→4→3→2→1; group still alive
115+
test.fuzz.timeController.addTime(4.seconds)
116+
assertThat(store.getGroup(topicId, groupId.toGroupId())).isNotNull()
117+
118+
// publishPartial with empty actions still calls getOrCreateLocalGroup → resets TTL
119+
test.gossipRouter.publishPartial(topicId, groupId, PublishActionsFn<Any?> { _, _ -> emptySequence() })
120+
121+
// Advance 4 more heartbeats: TTL goes 5→4→3→2→1; group still alive
122+
test.fuzz.timeController.addTime(4.seconds)
123+
assertThat(store.getGroup(topicId, groupId.toGroupId())).isNotNull()
124+
}
125+
126+
// ── Peer disconnect ───────────────────────────────────────────────────────
127+
128+
@Test
129+
fun `peer disconnect removes peer from group state and GCs now-empty group`() {
130+
val test = newTest()
131+
val mockPeerId = test.router2.peerId
132+
test.seedGroup(mockPeerId)
133+
134+
val store = test.store()
135+
assertThat(store.getGroup(topicId, groupId.toGroupId())?.peerStates).containsKey(mockPeerId)
136+
137+
test.connection.disconnect()
138+
test.gossipRouter.submitOnEventThread {}.join()
139+
140+
// peerStates is now empty → group GC'd immediately in onPeerDisconnected
141+
assertThat(store.getGroup(topicId, groupId.toGroupId())).isNull()
142+
}
143+
144+
@Test
145+
fun `peer disconnect leaves group alive when other peers still have state`() {
146+
val test = newTest()
147+
val mockPeerId = test.router2.peerId
148+
val otherPeerId = PeerId.random()
149+
150+
test.gossipRouter.submitOnEventThread {
151+
val group = test.store().getOrCreateLocalGroup(topicId, groupId.toGroupId())
152+
group.peerStates[mockPeerId] = "mock-state"
153+
group.peerStates[otherPeerId] = "other-state"
154+
}.join()
155+
156+
test.connection.disconnect()
157+
test.gossipRouter.submitOnEventThread {}.join()
158+
159+
// Group survives because otherPeerId still has state
160+
val group = test.store().getGroup(topicId, groupId.toGroupId())
161+
assertThat(group).isNotNull()
162+
assertThat(group?.peerStates).doesNotContainKey(mockPeerId)
163+
assertThat(group?.peerStates).containsKey(otherPeerId)
164+
}
165+
166+
// ── Unsubscribe ───────────────────────────────────────────────────────────
167+
168+
@Test
169+
fun `unsubscribing from topic drops all group state for that topic`() {
170+
val test = newTest()
171+
test.gossipRouter.subscribe(topicId)
172+
val mockPeerId = test.router2.peerId
173+
test.seedGroup(mockPeerId)
174+
175+
val store = test.store()
176+
assertThat(store.groupsForTopic(topicId)).isNotEmpty()
177+
178+
test.gossipRouter.unsubscribe(topicId)
179+
test.gossipRouter.submitOnEventThread {}.join()
180+
181+
assertThat(store.groupsForTopic(topicId)).isEmpty()
182+
}
183+
184+
@Test
185+
fun `unsubscribing from one topic does not affect groups on other topics`() {
186+
val otherTopic = "other-topic"
187+
val test = newTest()
188+
test.gossipRouter.subscribe(topicId)
189+
val mockPeerId = test.router2.peerId
190+
191+
test.gossipRouter.submitOnEventThread {
192+
val store = test.store()
193+
store.getOrCreateLocalGroup(topicId, groupId.toGroupId()).peerStates[mockPeerId] = "s1"
194+
store.getOrCreateLocalGroup(otherTopic, groupId.toGroupId()).peerStates[mockPeerId] = "s2"
195+
}.join()
196+
197+
test.gossipRouter.unsubscribe(topicId)
198+
test.gossipRouter.submitOnEventThread {}.join()
199+
200+
assertThat(test.store().groupsForTopic(topicId)).isEmpty()
201+
assertThat(test.store().groupsForTopic(otherTopic)).isNotEmpty()
202+
}
203+
}

0 commit comments

Comments
 (0)