Skip to content

Commit 94e5b66

Browse files
feat: add AsyncStream, AsyncWrite and AsyncRead type classes (#10370)
This PR adds async type classes for streams.
1 parent 8443600 commit 94e5b66

File tree

3 files changed

+81
-2
lines changed

3 files changed

+81
-2
lines changed

src/Std/Internal/Async.lean

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@ public import Std.Internal.Async.Select
1515
public import Std.Internal.Async.Process
1616
public import Std.Internal.Async.System
1717
public import Std.Internal.Async.Signal
18+
public import Std.Internal.Async.IO
1819

1920
public section

src/Std/Internal/Async/IO.lean

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/-
2+
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
3+
Released under Apache 2.0 license as described in the file LICENSE.
4+
Authors: Sofia Rodrigues
5+
-/
6+
module
7+
8+
prelude
9+
public import Std.Internal.Async.Select
10+
11+
public section
12+
13+
namespace Std
14+
namespace Internal
15+
namespace Async
16+
namespace IO
17+
18+
open Std.Internal.IO.Async
19+
20+
/-!
21+
This module provides buffered asynchronous I/O operations for efficient reading and writing.
22+
-/
23+
24+
/--
25+
Interface for asynchronous reading operations.
26+
-/
27+
class AsyncRead (α : Type) (β : Type) where
28+
read : α → Async β
29+
30+
/--
31+
Interface for asynchronous writing operations.
32+
-/
33+
class AsyncWrite (α : Type) (β : Type) where
34+
write : α → β → Async Unit
35+
36+
writeAll : α → Array β → Async Unit :=
37+
fun socket data => data.forM (write socket)
38+
39+
flush : α → Async Unit :=
40+
fun _ => pure ()
41+
42+
/--
43+
Interface for asynchronous streaming with selector-based iteration.
44+
-/
45+
class AsyncStream (α : Type) (β : outParam Type) where
46+
next : α → Selector β
47+
48+
stop : α → IO Unit :=
49+
fun _ => pure ()
50+
51+
end IO
52+
end Async
53+
end Internal
54+
end Std

src/Std/Sync/Channel.lean

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,13 @@ public import Init.System.Promise
1010
public import Init.Data.Queue
1111
public import Std.Sync.Mutex
1212
public import Std.Internal.Async.Select
13+
public import Std.Internal.Async.IO
1314

1415
public section
1516

17+
open Std.Internal.Async.IO
18+
open Std.Internal.IO.Async
19+
1620
/-!
1721
This module contains the implementation of `Std.Channel`. `Std.Channel` is a multi-producer
1822
multi-consumer FIFO channel that offers both bounded and unbounded buffering as well as synchronous
@@ -24,7 +28,6 @@ for cleaner code.
2428
-/
2529

2630
namespace Std
27-
2831
namespace CloseableChannel
2932

3033
/--
@@ -753,6 +756,17 @@ partial def forAsync (f : α → BaseIO Unit) (ch : CloseableChannel α)
753756
| none => return .pure ()
754757
| some v => do f v; ch.forAsync f prio
755758

759+
instance [Inhabited α] : AsyncStream (CloseableChannel α) (Option α) where
760+
next channel := channel.recvSelector
761+
762+
instance [Inhabited α] : AsyncRead (CloseableChannel α) (Option α) where
763+
read receiver := Internal.IO.Async.Async.ofIOTask receiver.recv
764+
765+
instance [Inhabited α] : AsyncWrite (CloseableChannel α) α where
766+
write receiver x := do
767+
let task ← receiver.send x
768+
Async.ofAsyncTask <| task.map (Except.mapError (IO.userError ∘ toString))
769+
756770
/--
757771
This function is a no-op and just a convenient way to expose the synchronous API of the channel.
758772
-/
@@ -804,7 +818,6 @@ instance [MonadLiftT BaseIO m] : ForIn m (Sync α) α where
804818
forIn ch b f := private ch.forIn f b
805819

806820
end Sync
807-
808821
end CloseableChannel
809822

810823
/--
@@ -893,6 +906,17 @@ partial def forAsync [Inhabited α] (f : α → BaseIO Unit) (ch : Channel α)
893906
(prio : Task.Priority := .default) : BaseIO (Task Unit) := do
894907
BaseIO.bindTask (prio := prio) (← ch.recv) fun v => do f v; ch.forAsync f prio
895908

909+
instance [Inhabited α] : AsyncStream (Channel α) α where
910+
next channel := channel.recvSelector
911+
912+
instance [Inhabited α] : AsyncRead (Channel α) α where
913+
read receiver := Internal.IO.Async.Async.ofIOTask receiver.recv
914+
915+
instance [Inhabited α] : AsyncWrite (Channel α) α where
916+
write receiver x := do
917+
let task ← receiver.send x
918+
Async.ofTask task
919+
896920
@[inherit_doc CloseableChannel.sync, inline]
897921
def sync (ch : Channel α) : Channel.Sync α := ch
898922

0 commit comments

Comments
 (0)