Skip to content

Commit 2cfd980

Browse files
authored
fix: revert the waitAny refactoring (#11000)
This PR fixes a memleak caused by the Lean based `IO.waitAny` implementation by reverting it. This the faulty Lean implementation: ```lean def IO.waitAny (tasks : @& List (Task α)) (h : tasks.length > 0 := by exact Nat.zero_lt_succ _) : BaseIO α := do have : Nonempty α := ⟨tasks[0].get⟩ let promise : IO.Promise α ← IO.Promise.new tasks.forM <| fun t => BaseIO.chainTask (sync := true) t promise.resolve return promise.result!.get ``` In a situation where we call this function repeatedly in a loop with a pair of tasks `[t1, t2]` where `t2` is a long lived task that we pass every time and `t1` is fresh a short lived task, `t2` will accumlate more and more children from `BaseIO.chainTask` that fill memory over time. The old C++ implementation did not have this issue so we are reverting.
1 parent 2497cf0 commit 2cfd980

File tree

6 files changed

+43
-11
lines changed

6 files changed

+43
-11
lines changed

src/Init/System/IO.lean

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,13 @@ Waits for the task to finish, then returns its result.
557557
@[extern "lean_io_wait"] opaque wait (t : Task α) : BaseIO α :=
558558
return t.get
559559

560+
/--
561+
Waits until any of the tasks in the list has finished, then return its result.
562+
-/
563+
@[extern "lean_io_wait_any"] opaque waitAny (tasks : @& List (Task α))
564+
(h : tasks.length > 0 := by exact Nat.zero_lt_succ _) : BaseIO α :=
565+
return tasks[0].get
566+
560567
/--
561568
Returns the number of _heartbeats_ that have occurred during the current thread's execution. The
562569
heartbeat count is the number of “small” memory allocations performed in a thread.

src/Init/Task.lean

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,6 @@ public import Init.System.Promise
1212

1313
public section
1414

15-
/--
16-
Waits until any of the tasks in the list has finished, then return its result.
17-
-/
18-
@[noinline]
19-
def IO.waitAny (tasks : @& List (Task α)) (h : tasks.length > 0 := by exact Nat.zero_lt_succ _) :
20-
BaseIO α := do
21-
have : Nonempty α := ⟨tasks[0].get⟩
22-
let promise : IO.Promise α ← IO.Promise.new
23-
tasks.forM <| fun t => BaseIO.chainTask (sync := true) t promise.resolve
24-
return promise.result!.get
25-
2615
namespace Task
2716

2817
/--

src/include/lean/lean.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1225,6 +1225,8 @@ LEAN_EXPORT bool lean_io_check_canceled_core(void);
12251225
LEAN_EXPORT void lean_io_cancel_core(b_lean_obj_arg t);
12261226
/* primitive for implementing `IO.getTaskState : Task a -> IO TaskState` */
12271227
LEAN_EXPORT uint8_t lean_io_get_task_state_core(b_lean_obj_arg t);
1228+
/* primitive for implementing `IO.waitAny : List (Task a) -> IO (Task a)` */
1229+
LEAN_EXPORT b_lean_obj_res lean_io_wait_any_core(b_lean_obj_arg task_list);
12281230

12291231
/* External objects */
12301232

src/runtime/io.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1554,6 +1554,13 @@ extern "C" LEAN_EXPORT obj_res lean_io_wait(obj_arg t) {
15541554
return lean_task_get_own(t);
15551555
}
15561556

1557+
extern "C" LEAN_EXPORT obj_res lean_io_wait_any(b_obj_arg task_list) {
1558+
object * t = lean_io_wait_any_core(task_list);
1559+
object * v = lean_task_get(t);
1560+
lean_inc(v);
1561+
return v;
1562+
}
1563+
15571564
extern "C" LEAN_EXPORT obj_res lean_io_exit(uint8_t code) {
15581565
exit(code);
15591566
}

src/runtime/object.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -847,6 +847,17 @@ class task_manager {
847847
}
848848
}
849849

850+
object * wait_any_check(object * task_list) {
851+
object * it = task_list;
852+
while (!is_scalar(it)) {
853+
object * head = lean_ctor_get(it, 0);
854+
if (lean_to_task(head)->m_value)
855+
return head;
856+
it = cnstr_get(it, 1);
857+
}
858+
return nullptr;
859+
}
860+
850861
public:
851862
task_manager(unsigned max_std_workers):
852863
m_max_std_workers(max_std_workers) {
@@ -929,6 +940,17 @@ class task_manager {
929940
}
930941
}
931942

943+
object * wait_any(object * task_list) {
944+
if (object * t = wait_any_check(task_list))
945+
return t;
946+
unique_lock<mutex> lock(m_mutex);
947+
while (true) {
948+
if (object * t = wait_any_check(task_list))
949+
return t;
950+
m_task_finished_cv.wait(lock);
951+
}
952+
}
953+
932954
void deactivate_task(lean_task_object * t) {
933955
unique_lock<mutex> lock(m_mutex);
934956
if (object * v = t->m_value) {
@@ -1166,6 +1188,10 @@ extern "C" LEAN_EXPORT uint8_t lean_io_get_task_state_core(b_obj_arg t) {
11661188
return g_task_manager->get_task_state(o);
11671189
}
11681190

1191+
extern "C" LEAN_EXPORT b_obj_res lean_io_wait_any_core(b_obj_arg task_list) {
1192+
return g_task_manager->wait_any(task_list);
1193+
}
1194+
11691195
obj_res lean_promise_new() {
11701196
lean_always_assert(g_task_manager);
11711197

src/runtime/object.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ inline b_obj_res task_get(b_obj_arg t) { return lean_task_get(t); }
287287
inline bool io_check_canceled_core() { return lean_io_check_canceled_core(); }
288288
inline void io_cancel_core(b_obj_arg t) { return lean_io_cancel_core(t); }
289289
inline bool io_get_task_state_core(b_obj_arg t) { return lean_io_get_task_state_core(t); }
290+
inline b_obj_res io_wait_any_core(b_obj_arg task_list) { return lean_io_wait_any_core(task_list); }
290291

291292
// =======================================
292293
// External

0 commit comments

Comments
 (0)