Skip to content

Commit d661597

Browse files
committed
simplfy model, but crashing
1 parent 3e46bdd commit d661597

6 files changed

Lines changed: 127 additions & 145 deletions

File tree

examples/tasking.jl

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,7 @@ function test_driver()
6262
push!(init_output_vars, Legate.add_output(task, b))
6363
push!(init_output_vars, Legate.add_output(task, c))
6464
Legate.default_alignment(task, Vector{Legate.Variable}(), init_output_vars)
65-
66-
Legate.submit_task(rt, task)
67-
sleep(1)
65+
Legate.submit_task(rt, task)
6866

6967
# 2. Compute Task (3 args)
7068
task2 = Legate.create_julia_task(rt, lib, my_task)
@@ -74,9 +72,7 @@ function test_driver()
7472
push!(input_vars, Legate.add_input(task2, b))
7573
push!(output_vars, Legate.add_output(task2, c))
7674
Legate.default_alignment(task2, input_vars, output_vars)
77-
7875
Legate.submit_task(rt, task2)
79-
sleep(1)
8076

8177
# 3. Arbitrary Arg Task (4 args: 2 in, 2 out)
8278
task3 = Legate.create_julia_task(rt, lib, my_4arg_task)
@@ -87,9 +83,7 @@ function test_driver()
8783
push!(out_vars_4, Legate.add_output(task3, b))
8884
push!(out_vars_4, Legate.add_output(task3, d))
8985
Legate.default_alignment(task3, in_vars_4, out_vars_4)
90-
9186
Legate.submit_task(rt, task3)
92-
sleep(1)
9387

9488
# 4. Scalar Arg Task (2 args + scalar)
9589
task4 = Legate.create_julia_task(rt, lib, my_scalar_task)
@@ -99,10 +93,7 @@ function test_driver()
9993
push!(out_vars_s, Legate.add_output(task4, a))
10094
Legate.add_scalar(task4, Legate.Scalar(2.5f0))
10195
Legate.default_alignment(task4, in_vars_s, out_vars_s)
102-
10396
Legate.submit_task(rt, task4)
104-
# Background workers should handle this now.
105-
sleep(2)
10697

10798
# --- VERIFICATION OF ASYNC EXECUTION ---
10899
@info "Submitting 100 bulk tasks to verify asynchronous polling..."

lib/legate_jl_wrapper/src/task.cpp

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,10 +149,11 @@ struct UFISlot {
149149

150150
static UFISlot g_ufi_slots[MAX_UFI_SLOTS];
151151
static std::mutex g_slot_mutex; // Protects slot allocation
152-
static std::condition_variable g_slot_cv;
152+
static std::condition_variable g_slot_cv; // Signaled when a slot is released
153+
static std::deque<int> g_pending_queue; // Queue of slot IDs waiting for Julia
154+
static std::mutex g_pending_mutex; // Protects g_pending_queue
153155

154156
static std::atomic<int> g_active_calls{0};
155-
156157
static std::atomic<int> g_max_task_id_seen{0};
157158
static std::atomic<int> g_work_sequence{1};
158159
static std::atomic<uint64_t> g_task_sequence{0};
@@ -189,8 +190,6 @@ JULIA_LEGATE_UFI_EXPORT void* legate_get_slot_request_ptr(int slot_id) {
189190
return static_cast<void*>(&g_ufi_slots[slot_id].request);
190191
}
191192

192-
193-
194193
JULIA_LEGATE_UFI_EXPORT int legate_get_active_call_count() {
195194
return g_active_calls.load();
196195
}
@@ -199,6 +198,15 @@ JULIA_LEGATE_UFI_EXPORT int legate_get_max_task_id_seen() {
199198
return g_max_task_id_seen.load();
200199
}
201200

201+
// Returns the next slot ID that is ready for processing, or -1 if none.
202+
JULIA_LEGATE_UFI_EXPORT int legate_pop_pending_slot() {
203+
std::lock_guard<std::mutex> lock(g_pending_mutex);
204+
if (g_pending_queue.empty()) return -1;
205+
int slot_id = g_pending_queue.front();
206+
g_pending_queue.pop_front();
207+
return slot_id;
208+
}
209+
202210
JULIA_LEGATE_UFI_EXPORT int legate_get_active_slot_count() {
203211
int count = 0;
204212
for (int i = 0; i < MAX_UFI_SLOTS; ++i) {
@@ -371,6 +379,12 @@ inline void JuliaTaskInterface(legate::TaskContext context, bool is_gpu) {
371379

372380
// Release lock before signaling Julia to prevent deadlock
373381
lock.unlock();
382+
383+
// Push to the pending queue for Julia to pop
384+
{
385+
std::lock_guard<std::mutex> qlock(g_pending_mutex);
386+
g_pending_queue.push_back(slot_id);
387+
}
374388
}
375389

376390
// Wait for Julia to signal completion
@@ -459,6 +473,7 @@ void wrap_ufi(jlcxx::Module& mod) {
459473
mod.method("legate_get_slot_work_available_ptr", &ufi::legate_get_slot_work_available_ptr);
460474
mod.method("legate_get_max_slots", &ufi::legate_get_max_slots);
461475
mod.method("legate_get_slot_request_ptr", &ufi::legate_get_slot_request_ptr);
476+
mod.method("legate_pop_pending_slot", &ufi::legate_pop_pending_slot);
462477

463478
mod.method("legate_get_active_call_count", &ufi::legate_get_active_call_count);
464479
mod.method("legate_get_max_task_id_seen", &ufi::legate_get_max_task_id_seen);

src/Legate.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ end
9999
# Expose C++ types to the main Legate namespace for use in other files
100100
# Note: TaskRequest and TaskRequestPrivate are defined in ufi.jl, not C++
101101
using .LegateInternal: Library, Variable, Constraint, LocalTaskID, GlobalTaskID,
102-
AutoTask, ManualTask, StoreTarget, Shape, Scalar, Slice,
102+
AutoTask as AutoTaskImpl, ManualTask as ManualTaskImpl, StoreTarget, Shape, Scalar as ScalarImpl, Slice,
103103
PhysicalStore, PhysicalArray, LogicalStoreImpl, LogicalArrayImpl,
104104
LegateType, Domain, Runtime
105105

src/api/tasks.jl

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ Create an auto task in the runtime.
99
- `id`: The local task identifier.
1010
"""
1111
function create_task(rt::CxxPtr{Runtime}, lib::Library, id::LocalTaskID)
12-
LegateInternal.create_auto_task(rt, lib, id)
12+
impl = LegateInternal.create_auto_task(rt, lib, id)
13+
return AutoTask(impl)
1314
end
1415
function create_task(rt::CxxPtr{Runtime}, lib::Library, id::LocalTaskID, domain::Domain)
15-
LegateInternal.create_manual_task(rt, lib, id, domain)
16+
impl = LegateInternal.create_manual_task(rt, lib, id, domain)
17+
return ManualTask(impl)
1618
end
1719

1820
"""
@@ -21,17 +23,16 @@ end
2123
2224
Submit an manual/auto task to the runtime.
2325
"""
24-
submit_task(rt::CxxPtr{Runtime}, task::AutoTask) = begin
26+
function submit_task(rt::CxxPtr{Runtime}, task::AutoTask)
2527
# Update High Water Mark for UFI tracking
26-
# MAX_SUBMITTED_TASK_ID bridges the gap until the first variant starts in C++
27-
Threads.atomic_max!(MAX_SUBMITTED_TASK_ID, LAST_CREATED_TASK_ID[])
28+
Threads.atomic_max!(MAX_SUBMITTED_TASK_ID, Int(LAST_CREATED_TASK_ID[]))
2829

29-
# Use @threadcall to avoid blocking the Julia thread while Legate processes the submission.
30-
@threadcall((:legate_submit_auto_task, Legate.WRAPPER_LIB_PATH), Cvoid, (Ptr{Cvoid}, Ptr{Cvoid}), rt.cpp_object, task.cpp_object)
30+
@threadcall((:legate_submit_auto_task, Legate.WRAPPER_LIB_PATH), Cvoid, (Ptr{Cvoid}, Ptr{Cvoid}), rt.cpp_object, task.impl.cpp_object)
3131
end
32-
submit_task(rt::CxxPtr{Runtime}, task::ManualTask) = begin
32+
function submit_task(rt::CxxPtr{Runtime}, task::ManualTask)
3333
Threads.atomic_max!(MAX_SUBMITTED_TASK_ID, Int(LAST_CREATED_TASK_ID[]))
34-
@threadcall((:legate_submit_manual_task, Legate.WRAPPER_LIB_PATH), Cvoid, (Ptr{Cvoid}, Ptr{Cvoid}), rt.cpp_object, task.cpp_object)
34+
35+
@threadcall((:legate_submit_manual_task, Legate.WRAPPER_LIB_PATH), Cvoid, (Ptr{Cvoid}, Ptr{Cvoid}), rt.cpp_object, task.impl.cpp_object)
3536
end
3637

3738
"""
@@ -73,7 +74,7 @@ end
7374
Add a constraint to the task.
7475
"""
7576
function add_constraint(task::AutoTask, c::Constraint)
76-
LegateInternal.add_constraint(task, c)
77+
LegateInternal.add_constraint(task.impl, c)
7778
end
7879

7980
"""
@@ -84,30 +85,29 @@ Add a logical array/store as an input to the task.
8485
"""
8586
function add_input(
8687
task::Union{AutoTask,ManualTask},
87-
item::Union{LogicalArray,LogicalStore},
88-
)
89-
LegateInternal.add_input(task, item.handle)
88+
item::LogicalArray{T, N},
89+
) where {T, N}
90+
push!(task.arg_types, Array{T, N})
91+
LegateInternal.add_input(task.impl, item.handle)
9092
end
9193

92-
"""
93-
add_output(AutoTask, LogicalArray) -> Variable
94-
add_output(ManualTask, LogicalStore) -> Variable
95-
96-
Add a logical array/store as an output of the task.
97-
"""
9894
function add_output(
9995
task::Union{AutoTask,ManualTask},
100-
item::Union{LogicalArray,LogicalStore},
101-
)
102-
LegateInternal.add_output(task, item.handle)
96+
item::LogicalArray{T, N},
97+
) where {T, N}
98+
push!(task.arg_types, Array{T, N})
99+
LegateInternal.add_output(task.impl, item.handle)
103100
end
104101

105-
"""
106-
add_scalar(AutoTask, scalar::Scalar)
107-
add_scalar(ManualTask, scalar::Scalar)
102+
function add_scalar(task::Union{AutoTask,ManualTask}, scalar::ScalarImpl)
103+
# Note: We don't easily have the Julia type here unless we wrap Scalar.
104+
# For now, we rely on the MTW in ufi_poll if this is missing.
105+
# But often Julia tasks are created via wrap_task which does MTW.
106+
LegateInternal.add_scalar(task.impl, scalar)
107+
end
108108

109-
Add a scalar argument to the task.
110-
"""
111-
function add_scalar(task::Union{AutoTask,ManualTask}, scalar::Scalar)
112-
LegateInternal.add_scalar(task, scalar)
109+
# Specialized add_scalar to capture type for precompile
110+
function add_scalar(task::Union{AutoTask,ManualTask}, x::T) where {T<:SUPPORTED_TYPES}
111+
push!(task.arg_types, T)
112+
LegateInternal.add_scalar(task.impl, Scalar(x))
113113
end

src/api/types.jl

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,24 @@ GlobalTaskID
3838
3939
Represents an automatically scheduled task. Supports adding inputs, outputs, scalars, and constraints.
4040
"""
41-
AutoTask
41+
mutable struct AutoTask
42+
impl::AutoTaskImpl
43+
task_id::UInt32
44+
arg_types::Vector{DataType}
45+
AutoTask(impl) = new(impl, 0, DataType[])
46+
end
4247

4348
"""
4449
ManualTask
4550
4651
Represents a manually scheduled task. Supports adding inputs, outputs, and scalars.
4752
"""
48-
ManualTask
53+
mutable struct ManualTask
54+
impl::ManualTaskImpl
55+
task_id::UInt32
56+
arg_types::Vector{DataType}
57+
ManualTask(impl) = new(impl, 0, DataType[])
58+
end
4959

5060
"""
5161
StoreTarget

0 commit comments

Comments
 (0)