Skip to content

Commit 1fb4d1e

Browse files
fcecin, NagyZoltanPeter, and Ivansete-status
authored
feat: implement Waku API Health spec (#3689)
* Fix protocol strength metric to consider connected peers only
* Remove polling loop; event-driven node connection health updates
* Remove 10s WakuRelay topic health polling loop; now event-driven
* Change NodeHealthStatus to ConnectionStatus
* Change new nodeState (rest API /health) field to connectionStatus
* Add getSyncProtocolHealthInfo and getSyncNodeHealthReport
* Add ConnectionStatusChangeEvent
* Add RequestHealthReport
* Refactor sync/async protocol health queries in the health monitor
* Add EventRelayTopicHealthChange
* Add EventWakuPeer emitted by PeerManager
* Add Edge support for topics health requests and events
* Rename "RelayTopic" -> "Topic"
* Add RequestContentTopicsHealth sync request
* Add EventContentTopicHealthChange
* Rename RequestTopicsHealth -> RequestShardTopicsHealth
* Remove health check gating from checkApiAvailability
* Add basic health smoke tests
* Other misc improvements, refactors, fixes

Co-authored-by: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
1 parent dd8dc74 commit 1fb4d1e

32 files changed

+1719
-386
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{.push raises: [].}
2+
3+
import system, std/json
4+
import ./json_base_event
5+
import ../../waku/api/types
6+
type
  JsonConnectionStatusChangeEvent* = ref object of JsonEvent
    ## JSON event that adds a `ConnectionStatus` payload to the fields
    ## inherited from `JsonEvent`.
    status*: ConnectionStatus
9+
proc new*(
    T: type JsonConnectionStatusChangeEvent, status: ConnectionStatus
): T =
  ## Builds a `JsonConnectionStatusChangeEvent` whose `eventType` is
  ## `"node_health_change"` and whose `status` is the given value.
  JsonConnectionStatusChangeEvent(eventType: "node_health_change", status: status)
17+
method `$`*(event: JsonConnectionStatusChangeEvent): string =
  ## Renders the event as a string: the event is converted to a JSON node
  ## with `%*` and that node is stringified.
  let asJson = %*event
  $asJson

library/libwaku.nim

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ import
77
./events/json_message_event,
88
./events/json_topic_health_change_event,
99
./events/json_connection_change_event,
10+
./events/json_connection_status_change_event,
1011
../waku/factory/app_callbacks,
1112
waku/factory/waku,
1213
waku/node/waku_node,
14+
waku/node/health_monitor/health_status,
1315
./declare_lib
1416

1517
################################################################################
@@ -61,10 +63,16 @@ proc waku_new(
6163
callEventCallback(ctx, "onConnectionChange"):
6264
$JsonConnectionChangeEvent.new($peerId, peerEvent)
6365

66+
proc onConnectionStatusChange(ctx: ptr FFIContext): ConnectionStatusChangeHandler =
67+
return proc(status: ConnectionStatus) {.async.} =
68+
callEventCallback(ctx, "onConnectionStatusChange"):
69+
$JsonConnectionStatusChangeEvent.new(status)
70+
6471
let appCallbacks = AppCallbacks(
6572
relayHandler: onReceivedMessage(ctx),
6673
topicHealthChangeHandler: onTopicHealthChange(ctx),
6774
connectionChangeHandler: onConnectionChange(ctx),
75+
connectionStatusChangeHandler: onConnectionStatusChange(ctx)
6876
)
6977

7078
ffi.sendRequestToFFIThread(

tests/api/test_all.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
{.used.}
22

3-
import ./test_entry_nodes, ./test_node_conf
3+
import ./test_entry_nodes, ./test_node_conf, ./test_api_send, ./test_api_health

tests/api/test_api_health.nim

Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
{.used.}
2+
3+
import std/[options, sequtils, times]
4+
import chronos, testutils/unittests, stew/byteutils, libp2p/[switch, peerinfo]
5+
import ../testlib/[common, wakucore, wakunode, testasync]
6+
7+
import
8+
waku,
9+
waku/[waku_node, waku_core, waku_relay/protocol, common/broker/broker_context],
10+
waku/node/health_monitor/[topic_health, health_status, protocol_health, health_report],
11+
waku/requests/health_requests,
12+
waku/requests/node_requests,
13+
waku/events/health_events,
14+
waku/common/waku_protocol,
15+
waku/factory/waku_conf
16+
const
  # Upper bound used by every wait/poll loop in this file.
  TestTimeout = chronos.seconds(10)
  # Pubsub topic the service node subscribes to during setup.
  DefaultShard = PubsubTopic("/waku/2/rs/1/0")
  TestContentTopic = ContentTopic("/waku/2/default-content/proto")
20+
proc dummyHandler(
    topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
  ## No-op relay handler: ignores both the topic and the message. Passed to
  ## `wakuRelay.subscribe` wherever a test needs a subscription.
  discard
25+
proc waitForConnectionStatus(
    brokerCtx: BrokerContext, expected: ConnectionStatus
) {.async.} =
  ## Waits until an `EventConnectionStatusChange` carrying `expected` is
  ## delivered on `brokerCtx`. Fails with an assertion if no such event
  ## arrives within `TestTimeout`. The listener is dropped on every exit path.
  let statusSeen = newFuture[void]("waitForConnectionStatus")

  let onStatusChange: EventConnectionStatusChangeListenerProc = proc(
      e: EventConnectionStatusChange
  ) {.async: (raises: []), gcsafe.} =
    # Complete at most once; later events are ignored.
    if (not statusSeen.finished) and e.connectionStatus == expected:
      statusSeen.complete()

  let listener = EventConnectionStatusChange.listen(brokerCtx, onStatusChange).valueOr:
    raiseAssert error

  try:
    let arrivedInTime = await statusSeen.withTimeout(TestTimeout)
    if not arrivedInTime:
      raiseAssert "Timeout waiting for status: " & $expected
  finally:
    EventConnectionStatusChange.dropListener(brokerCtx, listener)
46+
proc waitForShardHealthy(
    brokerCtx: BrokerContext
): Future[EventShardTopicHealthChange] {.async.} =
  ## Returns the first `EventShardTopicHealthChange` delivered on `brokerCtx`
  ## whose health is `MINIMALLY_HEALTHY` or `SUFFICIENTLY_HEALTHY`. Fails with
  ## an assertion if none arrives within `TestTimeout`. The listener is
  ## dropped on every exit path.
  let healthyEvent = newFuture[EventShardTopicHealthChange]("waitForShardHealthy")

  let onHealthChange: EventShardTopicHealthChangeListenerProc = proc(
      e: EventShardTopicHealthChange
  ) {.async: (raises: []), gcsafe.} =
    # Complete at most once; later events are ignored.
    if healthyEvent.finished:
      return
    if e.health == TopicHealth.MINIMALLY_HEALTHY or
        e.health == TopicHealth.SUFFICIENTLY_HEALTHY:
      healthyEvent.complete(e)

  let listener = EventShardTopicHealthChange.listen(brokerCtx, onHealthChange).valueOr:
    raiseAssert error

  try:
    if not await healthyEvent.withTimeout(TestTimeout):
      raiseAssert "Timeout waiting for shard health event"
    return healthyEvent.read()
  finally:
    EventShardTopicHealthChange.dropListener(brokerCtx, listener)
70+
71+
suite "LM API health checking":
72+
var
73+
serviceNode {.threadvar.}: WakuNode
74+
client {.threadvar.}: Waku
75+
servicePeerInfo {.threadvar.}: RemotePeerInfo
76+
77+
asyncSetup:
78+
lockNewGlobalBrokerContext:
79+
serviceNode =
80+
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
81+
(await serviceNode.mountRelay()).isOkOr:
82+
raiseAssert error
83+
serviceNode.mountMetadata(1, @[0'u16]).isOkOr:
84+
raiseAssert error
85+
await serviceNode.mountLibp2pPing()
86+
await serviceNode.start()
87+
88+
servicePeerInfo = serviceNode.peerInfo.toRemotePeerInfo()
89+
serviceNode.wakuRelay.subscribe(DefaultShard, dummyHandler)
90+
91+
lockNewGlobalBrokerContext:
92+
let conf = NodeConfig.init(
93+
mode = WakuMode.Core,
94+
networkingConfig =
95+
NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0),
96+
protocolsConfig = ProtocolsConfig.init(
97+
entryNodes = @[],
98+
clusterId = 1'u16,
99+
autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1),
100+
),
101+
)
102+
103+
client = (await createNode(conf)).valueOr:
104+
raiseAssert error
105+
(await startWaku(addr client)).isOkOr:
106+
raiseAssert error
107+
108+
asyncTeardown:
109+
discard await client.stop()
110+
await serviceNode.stop()
111+
asyncTest "RequestShardTopicsHealth, check PubsubTopic health":
  # Subscribe the client to DefaultShard, dial the service node, then poll
  # the shard health request until it reports a healthy state or
  # TestTimeout elapses.
  client.node.wakuRelay.subscribe(DefaultShard, dummyHandler)
  await client.node.connectToNodes(@[servicePeerInfo])

  let deadline = Moment.now() + TestTimeout
  var becameHealthy = false
  while Moment.now() < deadline:
    let reply = RequestShardTopicsHealth.request(client.brokerCtx, @[DefaultShard]).valueOr:
      raiseAssert "RequestShardTopicsHealth failed"

    if reply.topicHealth.len > 0:
      let shardHealth = reply.topicHealth[0].health
      becameHealthy =
        shardHealth == TopicHealth.MINIMALLY_HEALTHY or
        shardHealth == TopicHealth.SUFFICIENTLY_HEALTHY
      if becameHealthy:
        break
    await sleepAsync(chronos.milliseconds(100))

  check becameHealthy
130+
asyncTest "RequestShardTopicsHealth, check disconnected PubsubTopic":
  # The client subscribes to a shard but never dials a peer in this test, so
  # the shard is expected to be reported as UNHEALTHY.
  const GhostShard = PubsubTopic("/waku/2/rs/1/666")
  client.node.wakuRelay.subscribe(GhostShard, dummyHandler)

  let reply = RequestShardTopicsHealth.request(client.brokerCtx, @[GhostShard]).valueOr:
    raiseAssert "Request failed"

  check:
    reply.topicHealth.len > 0
    reply.topicHealth[0].health == TopicHealth.UNHEALTHY
140+
asyncTest "RequestProtocolHealth, check relay status":
  # After dialing the service node, relay health must reach READY within
  # TestTimeout, while the store protocol must not report READY.
  await client.node.connectToNodes(@[servicePeerInfo])

  var isReady = false
  let start = Moment.now()
  while Moment.now() - start < TestTimeout:
    let relayReq = await RequestProtocolHealth.request(
      client.brokerCtx, WakuProtocol.RelayProtocol
    )
    if relayReq.isOk() and relayReq.get().healthStatus.health == HealthStatus.READY:
      isReady = true
      break
    await sleepAsync(chronos.milliseconds(100))

  check isReady

  let storeReq =
    await RequestProtocolHealth.request(client.brokerCtx, WakuProtocol.StoreProtocol)
  # Assert that the request itself succeeded. Previously a failed request
  # skipped the health assertion below and the test passed without checking
  # anything about the store protocol.
  check storeReq.isOk()
  if storeReq.isOk():
    check storeReq.get().healthStatus.health != HealthStatus.READY
161+
asyncTest "RequestProtocolHealth, check unmounted protocol":
  # The store protocol is expected to be reported as NOT_MOUNTED, with no
  # description attached.
  let storeReq =
    await RequestProtocolHealth.request(client.brokerCtx, WakuProtocol.StoreProtocol)
  check storeReq.isOk()

  let storeHealth = storeReq.get().healthStatus
  check:
    storeHealth.health == HealthStatus.NOT_MOUNTED
    storeHealth.desc.isNone()
170+
asyncTest "RequestConnectionStatus, check connectivity state":
  # Before dialing any peer the client must report Disconnected.
  let beforeConnect = RequestConnectionStatus.request(client.brokerCtx).valueOr:
    raiseAssert "RequestConnectionStatus failed"
  check beforeConnect.connectionStatus == ConnectionStatus.Disconnected

  await client.node.connectToNodes(@[servicePeerInfo])

  # Poll until the status becomes PartiallyConnected or Connected, or until
  # TestTimeout elapses.
  let deadline = Moment.now() + TestTimeout
  var sawConnection = false
  while Moment.now() < deadline:
    let current = RequestConnectionStatus.request(client.brokerCtx).valueOr:
      raiseAssert "RequestConnectionStatus failed"

    sawConnection =
      current.connectionStatus == ConnectionStatus.PartiallyConnected or
      current.connectionStatus == ConnectionStatus.Connected
    if sawConnection:
      break
    await sleepAsync(chronos.milliseconds(100))

  check sawConnection
191+
asyncTest "EventConnectionStatusChange, detect connect and disconnect":
  # Each waiter is created before the action that should trigger its event.
  let sawPartiallyConnected =
    waitForConnectionStatus(client.brokerCtx, ConnectionStatus.PartiallyConnected)
  await client.node.connectToNodes(@[servicePeerInfo])
  await sawPartiallyConnected

  let sawDisconnected =
    waitForConnectionStatus(client.brokerCtx, ConnectionStatus.Disconnected)
  await client.node.disconnectNode(servicePeerInfo)
  await sawDisconnected
203+
asyncTest "EventShardTopicHealthChange, detect health improvement":
  # Subscribe, start waiting for a healthy-shard event, then dial the service
  # node. The received event must refer to DefaultShard.
  client.node.wakuRelay.subscribe(DefaultShard, dummyHandler)

  let pendingHealthyEvent = waitForShardHealthy(client.brokerCtx)
  await client.node.connectToNodes(@[servicePeerInfo])

  let healthyEvent = await pendingHealthyEvent
  check healthyEvent.topic == DefaultShard
213+
asyncTest "RequestHealthReport, check aggregate report":
  # The aggregate report must show the node as READY and contain an entry
  # for the relay protocol.
  let reply = await RequestHealthReport.request(client.brokerCtx)
  check reply.isOk()

  let report = reply.get().healthReport
  check:
    report.nodeHealth == HealthStatus.READY
    report.protocolsHealth.len > 0
    report.protocolsHealth.anyIt(it.protocol == $WakuProtocol.RelayProtocol)
223+
asyncTest "RequestContentTopicsHealth, smoke test":
  # A content topic the client never subscribed to must come back as exactly
  # one entry with health NOT_SUBSCRIBED.
  let fictionalTopic = ContentTopic("/waku/2/this-does-not-exist/proto")

  let reply = RequestContentTopicsHealth.request(client.brokerCtx, @[fictionalTopic])
  check reply.isOk()

  let topicsHealth = reply.get().contentTopicHealth
  check:
    topicsHealth.len == 1
    topicsHealth[0].topic == fictionalTopic
    topicsHealth[0].health == TopicHealth.NOT_SUBSCRIBED
235+
asyncTest "RequestContentTopicsHealth, core mode trivial 1-shard autosharding":
  # Resolve the shard for a content topic, subscribe both nodes to it, dial
  # the service node, then poll the content-topic health until it reports a
  # healthy state or TestTimeout elapses.
  let cTopic = ContentTopic("/waku/2/my-content-topic/proto")

  let shardReq =
    RequestRelayShard.request(client.brokerCtx, none(PubsubTopic), cTopic)
  check shardReq.isOk()
  let targetShard = $shardReq.get().relayShard

  client.node.wakuRelay.subscribe(targetShard, dummyHandler)
  serviceNode.wakuRelay.subscribe(targetShard, dummyHandler)

  await client.node.connectToNodes(@[servicePeerInfo])

  let deadline = Moment.now() + TestTimeout
  var becameHealthy = false
  while Moment.now() < deadline:
    let reply = RequestContentTopicsHealth.request(client.brokerCtx, @[cTopic]).valueOr:
      raiseAssert "Request failed"

    if reply.contentTopicHealth.len > 0:
      let topicHealth = reply.contentTopicHealth[0].health
      becameHealthy =
        topicHealth == TopicHealth.MINIMALLY_HEALTHY or
        topicHealth == TopicHealth.SUFFICIENTLY_HEALTHY
      if becameHealthy:
        break

    await sleepAsync(chronos.milliseconds(100))

  check becameHealthy
265+
266+
# Creates and starts a separate Edge-mode node, then checks that relay is
# reported as NOT_MOUNTED and that a filter client is present on the node.
asyncTest "RequestProtocolHealth, edge mode smoke test":
267+
var edgeWaku: Waku
268+
269+
lockNewGlobalBrokerContext:
270+
let edgeConf = NodeConfig.init(
271+
mode = WakuMode.Edge,
272+
networkingConfig =
273+
NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0),
274+
protocolsConfig = ProtocolsConfig.init(
275+
entryNodes = @[],
276+
clusterId = 1'u16,
277+
messageValidation =
278+
MessageValidation(maxMessageSize: "150 KiB", rlnConfig: none(RlnConfig)),
279+
),
280+
)
281+
282+
edgeWaku = (await createNode(edgeConf)).valueOr:
283+
raiseAssert "Failed to create edge node: " & error
284+
285+
(await startWaku(addr edgeWaku)).isOkOr:
286+
raiseAssert "Failed to start edge waku: " & error
287+
288+
# The request goes through the edge node's own broker context, not the
# suite-level client's.
let relayReq = await RequestProtocolHealth.request(
289+
edgeWaku.brokerCtx, WakuProtocol.RelayProtocol
290+
)
291+
check relayReq.isOk()
292+
check relayReq.get().healthStatus.health == HealthStatus.NOT_MOUNTED
293+
294+
check not edgeWaku.node.wakuFilterClient.isNil()
295+
296+
# NOTE(review): the edge node is only stopped when execution reaches this
# line; if anything above raises, it is left running — consider try/finally.
discard await edgeWaku.stop()

tests/api/test_api_send.nim

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,13 +117,17 @@ proc validate(
117117
check requestId == expectedRequestId
118118

proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig =
  ## Builds the `NodeConfig` used by the send API tests: the given `mode`,
  ## cluster 1 with a single autosharding shard, no entry nodes, and
  ## `p2pReliability` enabled.
  let
    # allocate random ports to avoid port-already-in-use errors
    netConf = NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0)
    protoConf = ProtocolsConfig.init(
      entryNodes = @[],
      clusterId = 1,
      autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1),
    )

  NodeConfig.init(
    mode = mode,
    protocolsConfig = protoConf,
    networkingConfig = netConf,
    p2pReliability = true,
  )
129133

@@ -246,8 +250,9 @@ suite "Waku API - Send":
246250

247251
let sendResult = await node.send(envelope)
248252

249-
check sendResult.isErr() # Depending on implementation, it might say "not healthy"
250-
check sendResult.error().contains("not healthy")
253+
# TODO: The API is not enforcing a health check before the send,
254+
# so currently this test cannot successfully fail to send.
255+
check sendResult.isOk()
251256

252257
(await node.stop()).isOkOr:
253258
raiseAssert "Failed to stop node: " & error

tests/node/test_all.nim

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ import
77
./test_wakunode_peer_exchange,
88
./test_wakunode_store,
99
./test_wakunode_legacy_store,
10-
./test_wakunode_peer_manager
10+
./test_wakunode_peer_manager,
11+
./test_wakunode_health_monitor

0 commit comments

Comments
 (0)