Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/Init/System/IO.lean
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,13 @@ Waits for the task to finish, then returns its result.
@[extern "lean_io_wait"] opaque wait (t : Task α) : BaseIO α :=
return t.get

/--
Waits until any of the tasks in the list has finished, then return its result.
-/
@[extern "lean_io_wait_any"] opaque waitAny (tasks : @& List (Task α))
(h : tasks.length > 0 := by exact Nat.zero_lt_succ _) : BaseIO α :=
return tasks[0].get

/--
Returns the number of _heartbeats_ that have occurred during the current thread's execution. The
heartbeat count is the number of “small” memory allocations performed in a thread.
Expand Down
11 changes: 0 additions & 11 deletions src/Init/Task.lean
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,6 @@ public import Init.System.Promise

public section

/--
Waits until any of the tasks in the list has finished, then return its result.
-/
@[noinline]
def IO.waitAny (tasks : @& List (Task α)) (h : tasks.length > 0 := by exact Nat.zero_lt_succ _) :
BaseIO α := do
have : Nonempty α := ⟨tasks[0].get⟩
let promise : IO.Promise α ← IO.Promise.new
tasks.forM <| fun t => BaseIO.chainTask (sync := true) t promise.resolve
return promise.result!.get

namespace Task

/--
Expand Down
2 changes: 2 additions & 0 deletions src/include/lean/lean.h
Original file line number Diff line number Diff line change
Expand Up @@ -1225,6 +1225,8 @@ LEAN_EXPORT bool lean_io_check_canceled_core(void);
LEAN_EXPORT void lean_io_cancel_core(b_lean_obj_arg t);
/* primitive for implementing `IO.getTaskState : Task a -> IO TaskState` */
LEAN_EXPORT uint8_t lean_io_get_task_state_core(b_lean_obj_arg t);
/* primitive for implementing `IO.waitAny : List (Task a) -> IO (Task a)` */
LEAN_EXPORT b_lean_obj_res lean_io_wait_any_core(b_lean_obj_arg task_list);

/* External objects */

Expand Down
7 changes: 7 additions & 0 deletions src/runtime/io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1554,6 +1554,13 @@ extern "C" LEAN_EXPORT obj_res lean_io_wait(obj_arg t) {
return lean_task_get_own(t);
}

extern "C" LEAN_EXPORT obj_res lean_io_wait_any(b_obj_arg task_list) {
object * t = lean_io_wait_any_core(task_list);
object * v = lean_task_get(t);
lean_inc(v);
return v;
}

extern "C" LEAN_EXPORT obj_res lean_io_exit(uint8_t code) {
exit(code);
}
Expand Down
26 changes: 26 additions & 0 deletions src/runtime/object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,17 @@ class task_manager {
}
}

object * wait_any_check(object * task_list) {
object * it = task_list;
while (!is_scalar(it)) {
object * head = lean_ctor_get(it, 0);
if (lean_to_task(head)->m_value)
return head;
it = cnstr_get(it, 1);
}
return nullptr;
}

public:
task_manager(unsigned max_std_workers):
m_max_std_workers(max_std_workers) {
Expand Down Expand Up @@ -929,6 +940,17 @@ class task_manager {
}
}

object * wait_any(object * task_list) {
if (object * t = wait_any_check(task_list))
return t;
unique_lock<mutex> lock(m_mutex);
while (true) {
if (object * t = wait_any_check(task_list))
return t;
m_task_finished_cv.wait(lock);
}
}

void deactivate_task(lean_task_object * t) {
unique_lock<mutex> lock(m_mutex);
if (object * v = t->m_value) {
Expand Down Expand Up @@ -1166,6 +1188,10 @@ extern "C" LEAN_EXPORT uint8_t lean_io_get_task_state_core(b_obj_arg t) {
return g_task_manager->get_task_state(o);
}

extern "C" LEAN_EXPORT b_obj_res lean_io_wait_any_core(b_obj_arg task_list) {
return g_task_manager->wait_any(task_list);
}

obj_res lean_promise_new() {
lean_always_assert(g_task_manager);

Expand Down
1 change: 1 addition & 0 deletions src/runtime/object.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ inline b_obj_res task_get(b_obj_arg t) { return lean_task_get(t); }
inline bool io_check_canceled_core() { return lean_io_check_canceled_core(); }
inline void io_cancel_core(b_obj_arg t) { return lean_io_cancel_core(t); }
inline bool io_get_task_state_core(b_obj_arg t) { return lean_io_get_task_state_core(t); }
inline b_obj_res io_wait_any_core(b_obj_arg task_list) { return lean_io_wait_any_core(task_list); }

// =======================================
// External
Expand Down
Loading