Skip to content

Commit cf50a95

Browse files
committed
feat: async type classes
1 parent 290971d commit cf50a95

File tree

4 files changed

+88
-2
lines changed

4 files changed

+88
-2
lines changed

src/Std/Internal/Async.lean

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,6 @@ public import Std.Internal.Async.DNS
1414
public import Std.Internal.Async.Select
1515
public import Std.Internal.Async.Process
1616
public import Std.Internal.Async.System
17+
public import Std.Internal.Async.IO
1718

1819
public section

src/Std/Internal/Async/Basic.lean

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -893,6 +893,13 @@ Converts `AsyncTask` into `Async`.
893893
protected def ofAsyncTask (task : AsyncTask α) : Async α := do
894894
pure (f := BaseIO) (MaybeTask.ofTask task)
895895

896+
/--
897+
Converts `AsyncTask` into `Async`.
898+
-/
899+
@[inline]
900+
protected def ofETask (task : ETask IO.Error α) : Async α := do
901+
pure (f := BaseIO) (MaybeTask.ofTask task)
902+
896903
/--
897904
Converts `IO (Task α)` into `Async`.
898905
-/

src/Std/Internal/Async/IO.lean

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/-
2+
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
3+
Released under Apache 2.0 license as described in the file LICENSE.
4+
Authors: Sofia Rodrigues
5+
-/
6+
module
7+
8+
prelude
9+
public import Std.Internal.Async.Select
10+
11+
public section
12+
13+
namespace Std
14+
namespace Internal
15+
namespace Async
16+
namespace IO
17+
18+
open Std.Internal.IO.Async
19+
20+
/-!
21+
This module provides buffered asynchronous I/O operations for efficient reading and writing.
22+
-/
23+
24+
/--
25+
Interface for asynchronous reading operations
26+
-/
27+
class AsyncRead (α : Type) (β : outParam Type) where
28+
read : α → Async β
29+
30+
/--
31+
Interface for asynchronous writing operations
32+
-/
33+
class AsyncWrite (α : Type) (β : outParam Type) where
34+
write : α → β → Async Unit
35+
36+
writeAll : α → Array β → Async Unit :=
37+
fun socket data => data.forM (write socket)
38+
39+
flush : α → Async Unit :=
40+
fun _ => pure ()
41+
42+
/--
43+
Interface for asynchronous streaming with selector-based iteration
44+
-/
45+
class AsyncStream (α : Type) (β : outParam Type) where
46+
next : α → Selector β
47+
48+
stop : α → IO Unit :=
49+
fun _ => pure ()
50+
51+
end IO
52+
end Async
53+
end Internal
54+
end Std

src/Std/Sync/Channel.lean

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,13 @@ public import Init.System.Promise
1010
public import Init.Data.Queue
1111
public import Std.Sync.Mutex
1212
public import Std.Internal.Async.Select
13+
public import Std.Internal.Async.IO
1314

1415
public section
1516

17+
open Std.Internal.Async.IO
18+
open Std.Internal.IO.Async
19+
1620
/-!
1721
This module contains the implementation of `Std.Channel`. `Std.Channel` is a multi-producer
1822
multi-consumer FIFO channel that offers both bounded and unbounded buffering as well as synchronous
@@ -24,7 +28,6 @@ for cleaner code.
2428
-/
2529

2630
namespace Std
27-
2831
namespace CloseableChannel
2932

3033
/--
@@ -752,6 +755,17 @@ partial def forAsync (f : α → BaseIO Unit) (ch : CloseableChannel α)
752755
| none => return .pure ()
753756
| some v => do f v; ch.forAsync f prio
754757

758+
instance [Inhabited α] : AsyncStream (CloseableChannel α) (Option α) where
759+
next channel := channel.recvSelector
760+
761+
instance [Inhabited α] : AsyncRead (CloseableChannel α) (Option α) where
762+
read receiver := Internal.IO.Async.Async.ofIOTask receiver.recv
763+
764+
instance [Inhabited α] : AsyncWrite (CloseableChannel α) α where
765+
write receiver x := do
766+
let task ← receiver.send x
767+
Async.ofAsyncTask <| task.map (Except.mapError (IO.userError ∘ toString))
768+
755769
/--
756770
This function is a no-op and just a convenient way to expose the synchronous API of the channel.
757771
-/
@@ -803,7 +817,6 @@ instance [MonadLiftT BaseIO m] : ForIn m (Sync α) α where
803817
forIn ch b f := private ch.forIn f b
804818

805819
end Sync
806-
807820
end CloseableChannel
808821

809822
/--
@@ -892,6 +905,17 @@ partial def forAsync [Inhabited α] (f : α → BaseIO Unit) (ch : Channel α)
892905
(prio : Task.Priority := .default) : BaseIO (Task Unit) := do
893906
BaseIO.bindTask (prio := prio) (← ch.recv) fun v => do f v; ch.forAsync f prio
894907

908+
instance [Inhabited α] : AsyncStream (Channel α) α where
909+
next channel := channel.recvSelector
910+
911+
instance [Inhabited α] : AsyncRead (Channel α) α where
912+
read receiver := Internal.IO.Async.Async.ofIOTask receiver.recv
913+
914+
instance [Inhabited α] : AsyncWrite (Channel α) α where
915+
write receiver x := do
916+
let task ← receiver.send x
917+
Async.ofTask task
918+
895919
@[inherit_doc CloseableChannel.sync, inline]
896920
def sync (ch : Channel α) : Channel.Sync α := ch
897921

0 commit comments

Comments
 (0)