Skip to content

Commit df99764

Browse files
committed
test: add test
1 parent 3351efa commit df99764

File tree

1 file changed

+222
-0
lines changed

1 file changed

+222
-0
lines changed

tests/lean/run/broadcast.lean

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
import Std.Internal.Async
2+
import Std.Sync
3+
4+
open Std.Internal.IO Async
5+
6+
-- Test tryRecv with empty channel
7+
def tryRecvEmpty : Async Unit := do
8+
let channel ← Std.Broadcast.new (capacity := 4) (α := Nat)
9+
let subs ← channel.subscribe
10+
11+
let result ← subs.tryRecv
12+
assert! result.isNone
13+
14+
#eval tryRecvEmpty.block
15+
16+
-- Test tryRecv with messages available
17+
def tryRecvWithMessages : Async Unit := do
18+
let channel ← Std.Broadcast.new (capacity := 4)
19+
let subs ← channel.subscribe
20+
21+
discard <| await (← channel.send 42)
22+
discard <| await (← channel.send 100)
23+
24+
let msg1 ← subs.tryRecv
25+
let msg2 ← subs.tryRecv
26+
let msg3 ← subs.tryRecv
27+
28+
assert! msg1 == some 42
29+
assert! msg2 == some 100
30+
assert! msg3.isNone
31+
32+
#eval tryRecvWithMessages.block
33+
34+
-- Test unsubscribe functionality
35+
def testUnsubscribe : Async Unit := do
36+
let channel ← Std.Broadcast.new (capacity := 4)
37+
let subs1 ← channel.subscribe
38+
let subs2 ← channel.subscribe
39+
40+
-- Send before unsubscribe
41+
discard <| await (← channel.send 1)
42+
43+
-- Unsubscribe subs1
44+
subs1.unsubscribe
45+
46+
-- Send after unsubscribe
47+
discard <| await (← channel.send 2)
48+
49+
-- subs1 should not receive the second message
50+
let msg1 ← await (← subs1.recv)
51+
let result ← subs1.tryRecv
52+
53+
-- subs2 should receive both messages
54+
let msg2 ← await (← subs2.recv)
55+
let msg3 ← await (← subs2.recv)
56+
57+
assert! msg1 == none
58+
assert! result.isNone -- No more messages for unsubscribed
59+
assert! msg2 == some 1
60+
assert! msg3 == some 2
61+
62+
#eval testUnsubscribe.block
63+
64+
def testUnsubscribeUnblock : Async Unit := do
65+
let channel ← Std.Broadcast.new (capacity := 4)
66+
67+
let subs1 ← channel.subscribe
68+
let subs2 ← channel.subscribe
69+
70+
-- Add 4 messages, so it reaches the limit.
71+
for i in [0:4] do
72+
assert! (← channel.trySend i)
73+
74+
-- Mark subs1 messages as read
75+
for i in [0:10] do
76+
if i < 4 then
77+
assert! (← subs1.tryRecv) = some i
78+
else
79+
assert! (← subs1.tryRecv) = none
80+
81+
-- Mark 2 messages as read so it cleans 2 messages
82+
assert! (← subs2.tryRecv).isSome
83+
assert! (← subs2.tryRecv).isSome
84+
85+
assert! (← channel.trySend 5)
86+
assert! (← channel.trySend 5)
87+
assert! not (← channel.trySend 6)
88+
89+
-- It unsubscribe and mark all subs2 messages as read.
90+
subs2.unsubscribe
91+
92+
-- Create a new subscriber to verify channel still works
93+
let subs3 ← channel.subscribe
94+
95+
-- Send one more message that the new subscriber should receive
96+
assert! (← channel.trySend 8)
97+
98+
-- subs1 should be able to receive the messages sent after it last read:
99+
-- the two 5's and the 8
100+
let subs1Msg1 ← subs1.tryRecv
101+
let subs1Msg2 ← subs1.tryRecv
102+
let subs1Msg3 ← subs1.tryRecv
103+
let subs1Msg4 ← subs1.tryRecv -- should be none
104+
105+
assert! subs1Msg1 == some 5
106+
assert! subs1Msg2 == some 5
107+
assert! subs1Msg3 == some 8
108+
assert! subs1Msg4.isNone
109+
110+
-- The new subscriber should only get the most recent message
111+
let msg ← subs3.tryRecv
112+
assert! msg == some 8
113+
114+
-- No more messages should be available for the new subscriber
115+
let noMsg ← subs3.tryRecv
116+
assert! noMsg.isNone
117+
118+
-- Verify unsubscribed subs2 can't receive anything
119+
let subs2NoMsg ← subs2.tryRecv
120+
assert! subs2NoMsg.isNone
121+
122+
#eval testUnsubscribeUnblock.block
123+
124+
def unsubscribedCannotReceive : Async Unit := do
125+
let channel ← Std.Broadcast.new
126+
127+
let subs1 ← channel.subscribe
128+
let subs2 ← channel.subscribe
129+
130+
discard <| await (← channel.send 1)
131+
discard <| await (← channel.send 2)
132+
133+
let msg1 ← await (← subs1.recv)
134+
let msg2 ← await (← subs1.recv)
135+
let msg3 ← await (← subs2.recv)
136+
let msg4 ← await (← subs2.recv)
137+
138+
assert! msg1 == some 1
139+
assert! msg2 == some 2
140+
141+
assert! msg3 == some 1
142+
assert! msg4 == some 2
143+
144+
#eval unsubscribedCannotReceive.block
145+
146+
def fullBuffer : Async Unit := do
147+
let channel ← Std.Broadcast.new (capacity := 4)
148+
149+
let subs1 ← channel.subscribe
150+
151+
for i in [0:5] do
152+
if not (← channel.trySend i) then
153+
assert! i == 4
154+
155+
#eval fullBuffer.block
156+
157+
def noSubscribers : Async Unit := do
158+
let channel ← Std.Broadcast.new (capacity := 4)
159+
160+
assert! not (← channel.trySend 0)
161+
162+
#eval noSubscribers.block
163+
164+
-- Test unsubscribe during message consumption
165+
def testUnsubscribeDuringConsumption : Async Unit := do
166+
let channel ← Std.Broadcast.new (capacity := 4)
167+
let subs1 ← channel.subscribe
168+
let subs2 ← channel.subscribe
169+
170+
-- Send several messages
171+
for i in [0:4] do
172+
discard <| await (← channel.send i)
173+
174+
-- subs1 reads first message then unsubscribes
175+
let msg1 ← await (← subs1.recv)
176+
subs1.unsubscribe
177+
178+
-- subs2 should still be able to read all messages
179+
let msgs2 ← [0, 1, 2, 3].mapM (fun _ => subs2.recv >>= fun r => await r)
180+
181+
assert! msg1 == some 0
182+
assert! msgs2 == [some 0, some 1, some 2, some 3]
183+
184+
-- subs1 should have no more messages available
185+
let result ← subs1.tryRecv
186+
assert! result.isNone
187+
188+
-- Test mixed send and trySend operations
189+
def testMixedSendOperations : Async Unit := do
190+
let channel ← Std.Broadcast.new (capacity := 3)
191+
let subs ← channel.subscribe
192+
193+
-- Use trySend
194+
assert! (← channel.trySend 1)
195+
196+
-- Use regular send
197+
discard <| await (← channel.send 2)
198+
199+
-- Use trySend again
200+
assert! (← channel.trySend 3)
201+
202+
-- Buffer should be full now
203+
assert! not (← channel.trySend 4)
204+
205+
-- Verify all messages received correctly
206+
let msgs ← [1, 2, 3].mapM (fun _ => subs.recv >>= fun r => await r)
207+
assert! msgs == [some 1, some 2, some 3]
208+
209+
#eval testMixedSendOperations.block
210+
211+
-- Test recv on closed channel with no pending messages
212+
def testRecvOnClosedEmpty : Async Unit := do
213+
let channel ← Std.Broadcast.new (capacity := 4) (α := Nat)
214+
let subs ← channel.subscribe
215+
216+
channel.close
217+
218+
-- tryRecv should return none immediately
219+
let result ← subs.tryRecv
220+
assert! result.isNone
221+
222+
#eval testRecvOnClosedEmpty.block

0 commit comments

Comments
 (0)