Skip to content

Commit ae74b90

Browse files
chore: Introduce EventBroker, RequestBroker and MultiRequestBroker (#3644)
* Introduce EventBroker and RequestBroker as decoupling helpers that represent reactive (event-driven) and proactive (request/response) patterns without tight coupling between modules * Address copilot observation. error log if failed listener call exception, handling listener overuse - run out of IDs * Address review observations: no exception to leak, listeners must raise no exception, adding listener now reports error with Result. * Added MultiRequestBroker utility to collect results from many providers * Support an arbitrary number of arguments for RequestBroker's request/provider signature * MultiRequestBroker allows provider procs to throw exceptions, which will be handled during request processing. * MultiRequestBroker supports one zero arg signature and/or multi arg signature * test no exception leaks from RequestBroker and MultiRequestBroker * Embed MultiRequestBroker tests into common * EventBroker: removed all ...Broker typed public procs to simplify EventBroker interface, forger is renamed to dropListener * Make Request's broker type private * MultiRequestBroker: Use explicit returns in generated procs * Updated descriptions of EventBroker and RequestBroker, updated RequestBroker.setProvider, returns error if already set. * Better description for MultiRequestBroker and its usage * Add EventBroker support for ref objects, fix emit variant with event object ctor * Add RequestBroker support for ref objects * Add MultiRequestBroker support for ref objects * Mover brokers under waku/common
1 parent 7eb1fdb commit ae74b90

File tree

8 files changed

+1933
-1
lines changed

8 files changed

+1933
-1
lines changed

tests/common/test_all.nim

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,7 @@ import
99
./test_tokenbucket,
1010
./test_requestratelimiter,
1111
./test_ratelimit_setting,
12-
./test_timed_map
12+
./test_timed_map,
13+
./test_event_broker,
14+
./test_request_broker,
15+
./test_multi_request_broker

tests/common/test_event_broker.nim

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
import chronos
2+
import std/sequtils
3+
import testutils/unittests
4+
5+
import waku/common/broker/event_broker
6+
7+
EventBroker:
8+
type SampleEvent = object
9+
value*: int
10+
label*: string
11+
12+
EventBroker:
13+
type BinaryEvent = object
14+
flag*: bool
15+
16+
EventBroker:
17+
type RefEvent = ref object
18+
payload*: seq[int]
19+
20+
template waitForListeners() =
21+
waitFor sleepAsync(1.milliseconds)
22+
23+
suite "EventBroker":
24+
test "delivers events to all listeners":
25+
var seen: seq[(int, string)] = @[]
26+
27+
discard SampleEvent.listen(
28+
proc(evt: SampleEvent): Future[void] {.async: (raises: []).} =
29+
seen.add((evt.value, evt.label))
30+
)
31+
32+
discard SampleEvent.listen(
33+
proc(evt: SampleEvent): Future[void] {.async: (raises: []).} =
34+
seen.add((evt.value * 2, evt.label & "!"))
35+
)
36+
37+
let evt = SampleEvent(value: 5, label: "hi")
38+
SampleEvent.emit(evt)
39+
waitForListeners()
40+
41+
check seen.len == 2
42+
check seen.anyIt(it == (5, "hi"))
43+
check seen.anyIt(it == (10, "hi!"))
44+
45+
SampleEvent.dropAllListeners()
46+
47+
test "forget removes a single listener":
48+
var counter = 0
49+
50+
let handleA = SampleEvent.listen(
51+
proc(evt: SampleEvent): Future[void] {.async: (raises: []).} =
52+
inc counter
53+
)
54+
55+
let handleB = SampleEvent.listen(
56+
proc(evt: SampleEvent): Future[void] {.async: (raises: []).} =
57+
inc(counter, 2)
58+
)
59+
60+
SampleEvent.dropListener(handleA.get())
61+
let eventVal = SampleEvent(value: 1, label: "one")
62+
SampleEvent.emit(eventVal)
63+
waitForListeners()
64+
check counter == 2
65+
66+
SampleEvent.dropAllListeners()
67+
68+
test "forgetAll clears every listener":
69+
var triggered = false
70+
71+
let handle1 = SampleEvent.listen(
72+
proc(evt: SampleEvent): Future[void] {.async: (raises: []).} =
73+
triggered = true
74+
)
75+
let handle2 = SampleEvent.listen(
76+
proc(evt: SampleEvent): Future[void] {.async: (raises: []).} =
77+
discard
78+
)
79+
80+
SampleEvent.dropAllListeners()
81+
SampleEvent.emit(42, "noop")
82+
SampleEvent.emit(label = "noop", value = 42)
83+
waitForListeners()
84+
check not triggered
85+
86+
let freshHandle = SampleEvent.listen(
87+
proc(evt: SampleEvent): Future[void] {.async: (raises: []).} =
88+
discard
89+
)
90+
check freshHandle.get().id > 0'u64
91+
SampleEvent.dropListener(freshHandle.get())
92+
93+
test "broker helpers operate via typedesc":
94+
var toggles: seq[bool] = @[]
95+
96+
let handle = BinaryEvent.listen(
97+
proc(evt: BinaryEvent): Future[void] {.async: (raises: []).} =
98+
toggles.add(evt.flag)
99+
)
100+
101+
BinaryEvent(flag: true).emit()
102+
waitForListeners()
103+
let binaryEvent = BinaryEvent(flag: false)
104+
BinaryEvent.emit(binaryEvent)
105+
waitForListeners()
106+
107+
check toggles == @[true, false]
108+
BinaryEvent.dropAllListeners()
109+
110+
test "ref typed event":
111+
var counter: int = 0
112+
113+
let handle = RefEvent.listen(
114+
proc(evt: RefEvent): Future[void] {.async: (raises: []).} =
115+
for n in evt.payload:
116+
counter += n
117+
)
118+
119+
RefEvent(payload: @[1, 2, 3]).emit()
120+
waitForListeners()
121+
RefEvent.emit(payload = @[4, 5, 6])
122+
waitForListeners()
123+
124+
check counter == 21 # 1+2+3 + 4+5+6
125+
RefEvent.dropAllListeners()
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
{.used.}
2+
3+
import testutils/unittests
4+
import chronos
5+
import std/sequtils
6+
import std/strutils
7+
8+
import waku/common/broker/multi_request_broker
9+
10+
MultiRequestBroker:
11+
type NoArgResponse = object
12+
label*: string
13+
14+
proc signatureFetch*(): Future[Result[NoArgResponse, string]] {.async.}
15+
16+
MultiRequestBroker:
17+
type ArgResponse = object
18+
id*: string
19+
20+
proc signatureFetch*(
21+
suffix: string, numsuffix: int
22+
): Future[Result[ArgResponse, string]] {.async.}
23+
24+
MultiRequestBroker:
25+
type DualResponse = ref object
26+
note*: string
27+
suffix*: string
28+
29+
proc signatureBase*(): Future[Result[DualResponse, string]] {.async.}
30+
proc signatureWithInput*(
31+
suffix: string
32+
): Future[Result[DualResponse, string]] {.async.}
33+
34+
suite "MultiRequestBroker":
35+
test "aggregates zero-argument providers":
36+
discard NoArgResponse.setProvider(
37+
proc(): Future[Result[NoArgResponse, string]] {.async.} =
38+
ok(NoArgResponse(label: "one"))
39+
)
40+
41+
discard NoArgResponse.setProvider(
42+
proc(): Future[Result[NoArgResponse, string]] {.async.} =
43+
discard catch:
44+
await sleepAsync(1.milliseconds)
45+
ok(NoArgResponse(label: "two"))
46+
)
47+
48+
let responses = waitFor NoArgResponse.request()
49+
check responses.get().len == 2
50+
check responses.get().anyIt(it.label == "one")
51+
check responses.get().anyIt(it.label == "two")
52+
53+
NoArgResponse.clearProviders()
54+
55+
test "aggregates argument providers":
56+
discard ArgResponse.setProvider(
57+
proc(suffix: string, num: int): Future[Result[ArgResponse, string]] {.async.} =
58+
ok(ArgResponse(id: suffix & "-a-" & $num))
59+
)
60+
61+
discard ArgResponse.setProvider(
62+
proc(suffix: string, num: int): Future[Result[ArgResponse, string]] {.async.} =
63+
ok(ArgResponse(id: suffix & "-b-" & $num))
64+
)
65+
66+
let keyed = waitFor ArgResponse.request("topic", 1)
67+
check keyed.get().len == 2
68+
check keyed.get().anyIt(it.id == "topic-a-1")
69+
check keyed.get().anyIt(it.id == "topic-b-1")
70+
71+
ArgResponse.clearProviders()
72+
73+
test "clearProviders resets both provider lists":
74+
discard DualResponse.setProvider(
75+
proc(): Future[Result[DualResponse, string]] {.async.} =
76+
ok(DualResponse(note: "base", suffix: ""))
77+
)
78+
79+
discard DualResponse.setProvider(
80+
proc(suffix: string): Future[Result[DualResponse, string]] {.async.} =
81+
ok(DualResponse(note: "base" & suffix, suffix: suffix))
82+
)
83+
84+
let noArgs = waitFor DualResponse.request()
85+
check noArgs.get().len == 1
86+
87+
let param = waitFor DualResponse.request("-extra")
88+
check param.get().len == 1
89+
check param.get()[0].suffix == "-extra"
90+
91+
DualResponse.clearProviders()
92+
93+
let emptyNoArgs = waitFor DualResponse.request()
94+
check emptyNoArgs.get().len == 0
95+
96+
let emptyWithArgs = waitFor DualResponse.request("-extra")
97+
check emptyWithArgs.get().len == 0
98+
99+
test "request returns empty seq when no providers registered":
100+
let empty = waitFor NoArgResponse.request()
101+
check empty.get().len == 0
102+
103+
test "failed providers will fail the request":
104+
NoArgResponse.clearProviders()
105+
discard NoArgResponse.setProvider(
106+
proc(): Future[Result[NoArgResponse, string]] {.async.} =
107+
err("boom")
108+
)
109+
110+
discard NoArgResponse.setProvider(
111+
proc(): Future[Result[NoArgResponse, string]] {.async.} =
112+
ok(NoArgResponse(label: "survivor"))
113+
)
114+
115+
let filtered = waitFor NoArgResponse.request()
116+
check filtered.isErr()
117+
118+
NoArgResponse.clearProviders()
119+
120+
test "deduplicates identical zero-argument providers":
121+
NoArgResponse.clearProviders()
122+
var invocations = 0
123+
let sharedHandler = proc(): Future[Result[NoArgResponse, string]] {.async.} =
124+
inc invocations
125+
ok(NoArgResponse(label: "dup"))
126+
127+
let first = NoArgResponse.setProvider(sharedHandler)
128+
let second = NoArgResponse.setProvider(sharedHandler)
129+
130+
check first.get().id == second.get().id
131+
check first.get().kind == second.get().kind
132+
133+
let dupResponses = waitFor NoArgResponse.request()
134+
check dupResponses.get().len == 1
135+
check invocations == 1
136+
137+
NoArgResponse.clearProviders()
138+
139+
test "removeProvider deletes registered handlers":
140+
var removedCalled = false
141+
var keptCalled = false
142+
143+
let removable = NoArgResponse.setProvider(
144+
proc(): Future[Result[NoArgResponse, string]] {.async.} =
145+
removedCalled = true
146+
ok(NoArgResponse(label: "removed"))
147+
)
148+
149+
discard NoArgResponse.setProvider(
150+
proc(): Future[Result[NoArgResponse, string]] {.async.} =
151+
keptCalled = true
152+
ok(NoArgResponse(label: "kept"))
153+
)
154+
155+
NoArgResponse.removeProvider(removable.get())
156+
157+
let afterRemoval = (waitFor NoArgResponse.request()).valueOr:
158+
assert false, "request failed"
159+
@[]
160+
check afterRemoval.len == 1
161+
check afterRemoval[0].label == "kept"
162+
check not removedCalled
163+
check keptCalled
164+
165+
NoArgResponse.clearProviders()
166+
167+
test "removeProvider works for argument signatures":
168+
var invoked: seq[string] = @[]
169+
170+
discard ArgResponse.setProvider(
171+
proc(suffix: string, num: int): Future[Result[ArgResponse, string]] {.async.} =
172+
invoked.add("first" & suffix)
173+
ok(ArgResponse(id: suffix & "-one-" & $num))
174+
)
175+
176+
let handle = ArgResponse.setProvider(
177+
proc(suffix: string, num: int): Future[Result[ArgResponse, string]] {.async.} =
178+
invoked.add("second" & suffix)
179+
ok(ArgResponse(id: suffix & "-two-" & $num))
180+
)
181+
182+
ArgResponse.removeProvider(handle.get())
183+
184+
let single = (waitFor ArgResponse.request("topic", 1)).valueOr:
185+
assert false, "request failed"
186+
@[]
187+
check single.len == 1
188+
check single[0].id == "topic-one-1"
189+
check invoked == @["firsttopic"]
190+
191+
ArgResponse.clearProviders()
192+
193+
test "catches exception from providers and report error":
194+
let firstHandler = NoArgResponse.setProvider(
195+
proc(): Future[Result[NoArgResponse, string]] {.async.} =
196+
raise newException(ValueError, "first handler raised")
197+
ok(NoArgResponse(label: "any"))
198+
)
199+
200+
discard NoArgResponse.setProvider(
201+
proc(): Future[Result[NoArgResponse, string]] {.async.} =
202+
ok(NoArgResponse(label: "just ok"))
203+
)
204+
205+
let afterException = waitFor NoArgResponse.request()
206+
check afterException.isErr()
207+
check afterException.error().contains("first handler raised")
208+
209+
NoArgResponse.clearProviders()
210+
211+
test "ref providers returning nil fail request":
212+
DualResponse.clearProviders()
213+
214+
discard DualResponse.setProvider(
215+
proc(): Future[Result[DualResponse, string]] {.async.} =
216+
let nilResponse: DualResponse = nil
217+
ok(nilResponse)
218+
)
219+
220+
let zeroArg = waitFor DualResponse.request()
221+
check zeroArg.isErr()
222+
223+
DualResponse.clearProviders()
224+
225+
discard DualResponse.setProvider(
226+
proc(suffix: string): Future[Result[DualResponse, string]] {.async.} =
227+
let nilResponse: DualResponse = nil
228+
ok(nilResponse)
229+
)
230+
231+
let withInput = waitFor DualResponse.request("-extra")
232+
check withInput.isErr()
233+
234+
DualResponse.clearProviders()

0 commit comments

Comments
 (0)