Skip to content

Commit 161a1c0

Browse files
feat: add Std.Notify type (#10368)
This PR adds `Notify` that is a structure that is similar to `CondVar` but it's used for concurrency. The main difference between `Std.Sync.Notify` and `Std.Condvar` is that depends on a `Std.Mutex` and blocks the entire thread that the `Task` is using while waiting. If I try to use it with async and a lot of `Task`s like this: ```lean def condvar : Async Unit := do let condvar ← Std.Condvar.new let mutex ← Std.Mutex.new false for i in [0:threads] do background do IO.println s!"start {i + 1}" await =<< (show IO (ETask _ _) from IO.asTask (mutex.atomically (condvar.wait mutex))) IO.println s!"end {i + 1}" IO.sleep 2000 condvar.notifyAll ``` It causes some weird behavior because some tasks start running and get notified, while others don’t, because `condvar.wait` blocks the `Task` entire task and right now afaik it blocks an entire thread and cannot be paused while doing blocking operations like that. `Notify` uses `Promise`s so it’s better suited for concurrency. The `Task` is not blocked while waiting for a notification which makes it simpler for use cases that just involve notifying: ```lean def notify : Async Unit := do let notify ← Std.Notify.new for i in [0:threads] do background do IO.println s!"start {i}" notify.wait IO.println s!"end {i}" IO.sleep 2000 notify.notify ``` This PR depends on: #10366, #10367 and #10370.
1 parent 781e3c6 commit 161a1c0

File tree

3 files changed

+249
-0
lines changed

3 files changed

+249
-0
lines changed

src/Std/Sync.lean

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,6 @@ public import Std.Sync.Mutex
1212
public import Std.Sync.RecursiveMutex
1313
public import Std.Sync.Barrier
1414
public import Std.Sync.SharedMutex
15+
public import Std.Sync.Notify
1516

1617
@[expose] public section

src/Std/Sync/Notify.lean

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/-
2+
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
3+
Released under Apache 2.0 license as described in the file LICENSE.
4+
Authors: Sofia Rodrigues
5+
-/
6+
module
7+
8+
prelude
9+
public import Init.System.Promise
10+
public import Init.Data.Queue
11+
public import Std.Sync.Mutex
12+
public import Std.Internal.Async.Select
13+
14+
public section
15+
16+
/-!
17+
This module contains the implementation of `Std.Notify`. `Std.Notify` provides a lightweight
18+
notification primitive for signaling between tasks or threads. It supports both synchronous
19+
and asynchronous waiting, and is useful for cases where you want to notify one or more waiters
20+
that an event has occurred.
21+
22+
Unlike a channel, `Std.Notify` does not buffer messages or carry data. It's simply a trigger.
23+
If no one is waiting, notifications are lost. If one or more waiters are present, exactly one
24+
will be woken up per notification.
25+
-/
26+
27+
namespace Std
28+
open Std.Internal.IO.Async
29+
30+
inductive Notify.Consumer (α : Type) where
31+
| normal (promise : IO.Promise α)
32+
| select (finished : Waiter α)
33+
34+
def Notify.Consumer.resolve (c : Consumer α) (x : α) : BaseIO Bool := do
35+
match c with
36+
| .normal promise =>
37+
promise.resolve x
38+
return true
39+
| .select waiter =>
40+
let lose := return false
41+
let win promise := do
42+
promise.resolve (.ok x)
43+
return true
44+
waiter.race lose win
45+
46+
/--
47+
The central state structure for an a `Notify`.
48+
-/
49+
structure Notify.State where
50+
51+
/--
52+
Consumers that are blocked waiting for a notification.
53+
--/
54+
consumers : Std.Queue (Notify.Consumer Unit)
55+
56+
/--
57+
A notify is a synchronization primitive that allows multiple consumers to wait
58+
until notify is called.
59+
-/
60+
structure Notify where
61+
state : Std.Mutex Notify.State
62+
63+
namespace Notify
64+
65+
/--
66+
Create a new notify.
67+
-/
68+
def new : BaseIO Notify := do
69+
return { state := ← Std.Mutex.new { consumers := ∅ } }
70+
71+
/--
72+
Notify all currently waiting consumers.
73+
-/
74+
def notify (x : Notify) : BaseIO Unit := do
75+
x.state.atomically do
76+
let mut st ← get
77+
78+
let mut remainingConsumers := st.consumers
79+
st := { st with consumers := ∅ }
80+
81+
while true do
82+
if let some (consumer, rest) := remainingConsumers.dequeue? then
83+
remainingConsumers := rest
84+
discard <| consumer.resolve ()
85+
else
86+
break
87+
88+
set st
89+
90+
/--
91+
Notify exactly one waiting consumer (if any). Returns true if a consumer
92+
was notified, false if no consumers were waiting.
93+
-/
94+
def notifyOne (x : Notify) : BaseIO Bool := do
95+
x.state.atomically do
96+
let mut st ← get
97+
98+
if let some (consumer, rest) := st.consumers.dequeue? then
99+
st := { st with consumers := rest }
100+
set st
101+
consumer.resolve ()
102+
else
103+
return false
104+
105+
/--
106+
Wait to be notified. Returns a task that completes when notify is called.
107+
Note: if notify was called before wait, this will wait for the next notify call.
108+
-/
109+
def wait (x : Notify) : IO (AsyncTask Unit) :=
110+
x.state.atomically do
111+
let promise ← IO.Promise.new
112+
modify fun st => { st with consumers := st.consumers.enqueue (.normal promise) }
113+
IO.bindTask promise.result? fun
114+
| some res => pure <| Task.pure (.ok res)
115+
| none => throw (IO.userError "notify dropped")
116+
117+
/--
118+
Creates a selector that waits for notifications
119+
-/
120+
def selector (notify : Notify) : Selector Unit := {
121+
tryFn := do
122+
return none
123+
124+
registerFn := fun waiter => do
125+
notify.state.atomically do
126+
modify fun st => { st with consumers := st.consumers.enqueue (.select waiter) }
127+
128+
unregisterFn := do
129+
notify.state.atomically do
130+
let st ← get
131+
132+
let consumers ← st.consumers.filterM fun
133+
| .normal _ => return true
134+
| .select waiter => return !(← waiter.checkFinished)
135+
136+
set { st with consumers }
137+
}
138+
139+
end Notify
140+
end Std

tests/lean/run/sync_notify.lean

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import Std.Internal.Async
2+
import Std.Sync
3+
4+
open Std.Internal.IO Async
5+
6+
-- Test basic wait and notifyOne functionality
7+
def testBasicWaitNotifyOne : Async Unit := do
8+
let notify ← Std.Notify.new
9+
let waitTask ← notify.wait
10+
11+
assert! (← waitTask.getState) = .waiting
12+
discard <| notify.notifyOne
13+
await waitTask
14+
assert! (← waitTask.getState) = .finished
15+
16+
#eval testBasicWaitNotifyOne.block
17+
18+
-- Test multiple waiters with notifyOne (only one should be notified)
19+
def testMultipleWaitersNotifyOne : Async Unit := do
20+
let notify ← Std.Notify.new
21+
let task1 ← notify.wait
22+
let task2 ← notify.wait
23+
let task3 ← notify.wait
24+
25+
assert! (← task1.getState) = .waiting
26+
assert! (← task2.getState) = .waiting
27+
assert! (← task3.getState) = .waiting
28+
29+
discard <| notify.notifyOne
30+
31+
IO.sleep 100
32+
33+
let states ← [task1, task2, task3].mapM (fun t => t.getState)
34+
let finishedCount := states.filter (· == .finished) |>.length
35+
let waitingCount := states.filter (· == .waiting) |>.length
36+
37+
assert! finishedCount == 1
38+
assert! waitingCount == 2
39+
40+
discard <| notify.notifyOne
41+
42+
#eval testMultipleWaitersNotifyOne.block
43+
44+
-- Test multiple waiters with notify (all should be notified)
45+
def testMultipleWaitersNotifyAll : Async Unit := do
46+
let notify ← Std.Notify.new
47+
let task1 ← notify.wait
48+
let task2 ← notify.wait
49+
let task3 ← notify.wait
50+
51+
assert! (← task1.getState) = .waiting
52+
assert! (← task2.getState) = .waiting
53+
assert! (← task3.getState) = .waiting
54+
55+
discard <| notify.notify
56+
57+
await task1
58+
await task2
59+
await task3
60+
61+
assert! (← task1.getState) = .finished
62+
assert! (← task2.getState) = .finished
63+
assert! (← task3.getState) = .finished
64+
65+
#eval testMultipleWaitersNotifyAll.block
66+
67+
-- Test sequential notification
68+
def testSequentialNotification : Async Unit := do
69+
let notify ← Std.Notify.new
70+
let task1 ← notify.wait
71+
let task2 ← notify.wait
72+
let task3 ← notify.wait
73+
74+
discard <| notify.notifyOne
75+
await task1
76+
assert! (← task1.getState) = .finished
77+
assert! (← task2.getState) = .waiting
78+
assert! (← task3.getState) = .waiting
79+
80+
discard <| notify.notifyOne
81+
await task2
82+
assert! (← task2.getState) = .finished
83+
assert! (← task3.getState) = .waiting
84+
85+
discard <| notify.notifyOne
86+
await task3
87+
assert! (← task3.getState) = .finished
88+
89+
#eval testSequentialNotification.block
90+
91+
def testReuseAfterCompletion : Async Unit := do
92+
let notify ← Std.Notify.new
93+
94+
let task1 ← notify.wait
95+
discard <| notify.notifyOne
96+
await task1
97+
assert! (← task1.getState) = .finished
98+
99+
let task2 ← notify.wait
100+
let task3 ← notify.wait
101+
discard <| notify.notify
102+
await task2
103+
await task3
104+
105+
assert! (← task2.getState) = .finished
106+
assert! (← task3.getState) = .finished
107+
108+
#eval testReuseAfterCompletion.block

0 commit comments

Comments
 (0)