Skip to content

Commit 5386bed

Browse files
authored
Merge pull request #8 from lazydynamics/6-threading-scheduler
Add threaded scheduling to `TickedScheduler`
2 parents c159f51 + bad1e95 commit 5386bed

File tree

6 files changed

+138
-9
lines changed

6 files changed

+138
-9
lines changed

Project.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
name = "EnvironmentEngine"
22
uuid = "12b47476-e81a-4b1e-bd5d-7199c2c66cc7"
33
authors = ["wouterwln"]
4-
version = "0.0.1-DEV"
4+
version = "0.0.1"
55

66
[deps]
7+
Static = "aedffcd0-7271-4cad-89d0-dc628f76c6d3"
78
Unitful = "1986cc42-f94f-5a68-af5c-568840ba703d"
89

910
[compat]
1011
Aqua = "0.8"
1112
InteractiveUtils = "1.10.0"
1213
JET = "0.9, 0.10"
14+
Static = "1.2.0"
1315
Test = "1.10"
1416
TestItemRunner = "1.1"
1517
Unitful = "1.24.0"

docs/make.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ DocMeta.setdocmeta!(EnvironmentEngine, :DocTestSetup, :(using EnvironmentEngine)
55

66
makedocs(;
77
modules = [EnvironmentEngine],
8-
authors = "lazydynamics and contributors",
8+
authors = "Lazy Dynamics and contributors",
99
sitename = "EnvironmentEngine.jl",
1010
format = Documenter.HTML(;
1111
canonical = "https://lazydynamics.github.io/EnvironmentEngine.jl", edit_link = "main", assets = String[]

src/implementations/schedulers/ticked.jl

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
using Unitful
2+
using Base.Threads
3+
using Static
24

35
export TickedScheduler
46

@@ -12,20 +14,26 @@ A scheduler that processes jobs at regular tick intervals.
1214
- `jobs::Vector{Job}`: The jobs to be scheduled
1315
- `ticker::Ticker{T}`: The ticker that manages timing
1416
"""
15-
struct TickedScheduler{C <: Clock, T, V} <: Scheduler
17+
struct TickedScheduler{C <: Clock, T, V, B} <: Scheduler
1618
clock::C
1719
jobs::V
1820
ticker::Ticker{T}
21+
threading::B
1922
end
2023

2124
"""
2225
TickedScheduler(clock::Clock, tick_period::T) where {T}
2326
2427
Create a new TickedScheduler with the specified clock and tick period.
2528
"""
26-
function TickedScheduler(clock::Clock, tick_period::T) where {T}
29+
function TickedScheduler(clock::Clock, tick_period::T; threading::Bool = false) where {T}
2730
tick_period = convert(Quantity{Float64}, tick_period)
28-
TickedScheduler(clock, Job[], Ticker(tick_period, convert(typeof(tick_period), now(clock)), zero(tick_period)))
31+
TickedScheduler(
32+
clock,
33+
Job[],
34+
Ticker(tick_period, convert(typeof(tick_period), now(clock)), zero(tick_period)),
35+
static(threading)
36+
)
2937
end
3038

3139
"""
@@ -47,17 +55,28 @@ function update!(scheduler::TickedScheduler)
4755
advance_to!(scheduler.ticker, current_time)
4856

4957
while can_tick(scheduler.ticker)
50-
foreach(scheduler.jobs) do job
51-
progress!(job, scheduler.ticker.period)
52-
end
58+
_process_jobs!(scheduler)
5359
consume_tick!(scheduler.ticker)
5460
end
5561
end
5662

63+
function _process_jobs!(scheduler::TickedScheduler{C, T, V, <:False}) where {C, T, V}
64+
foreach(scheduler.jobs) do job
65+
progress!(job, scheduler.ticker.period)
66+
end
67+
end
68+
69+
function _process_jobs!(scheduler::TickedScheduler{C, T, V, <:True}) where {C, T, V}
70+
tasks = map(scheduler.jobs) do job
71+
@spawn progress!(job, scheduler.ticker.period)
72+
end
73+
wait.(tasks)
74+
end
75+
5776
function get_period(scheduler::TickedScheduler)
5877
return scheduler.ticker.period
5978
end
6079

6180
function compile(scheduler::TickedScheduler)
62-
return TickedScheduler(scheduler.clock, (scheduler.jobs...,), scheduler.ticker)
81+
return TickedScheduler(scheduler.clock, (scheduler.jobs...,), scheduler.ticker, scheduler.threading)
6382
end

test/implementations/scheduler/tickedscheduler_tests.jl

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,81 @@
5555
@test order == [1, 2]
5656
end
5757
end
58+
59+
@testitem "TickedScheduler with threading enabled" setup = [MockJobs, MockClocks] begin
60+
import EnvironmentEngine: TickedScheduler, update!, set_time!, advance_time!, schedule!
61+
using Unitful
62+
using Base.Threads
63+
if Base.Threads.nthreads() > 1
64+
@testset "Threading preserves execution count" begin
65+
clock = MockClocks.MockClock()
66+
67+
# Test both modes
68+
scheduler_seq = TickedScheduler(clock, 1.0ms; threading = false)
69+
scheduler_par = TickedScheduler(clock, 1.0ms; threading = true)
70+
71+
# Create counting jobs
72+
jobs_seq = [MockJobs.CountingJob() for _ in 1:3]
73+
jobs_par = [MockJobs.CountingJob() for _ in 1:3]
74+
75+
# Schedule jobs
76+
for job in jobs_seq
77+
schedule!(scheduler_seq, job)
78+
end
79+
for job in jobs_par
80+
schedule!(scheduler_par, job)
81+
end
82+
83+
# Advance time and update
84+
advance_time!(clock, 10.0u"ms")
85+
update!(scheduler_seq)
86+
update!(scheduler_par)
87+
88+
# Both should execute the same number of times
89+
seq_total = sum(job.counter[] for job in jobs_seq)
90+
par_total = sum(job.counter[] for job in jobs_par)
91+
@test seq_total == par_total
92+
@test seq_total > 0
93+
end
94+
95+
@testset "All jobs execute per tick" begin
96+
clock = MockClocks.MockClock()
97+
scheduler = TickedScheduler(clock, 1.0u"ms"; threading = true)
98+
99+
jobs = [MockJobs.CountingJob() for _ in 1:5]
100+
for job in jobs
101+
schedule!(scheduler, job)
102+
end
103+
104+
advance_time!(clock, 1.0u"ms")
105+
update!(scheduler)
106+
107+
# All jobs should have executed exactly once
108+
for job in jobs
109+
@test job.counter[] == 1
110+
end
111+
end
112+
113+
@testset "Multiple ticks work correctly" begin
114+
clock = MockClocks.MockClock()
115+
scheduler = TickedScheduler(clock, 1.0u"ms"; threading = true)
116+
117+
jobs = [MockJobs.CountingJob() for _ in 1:3]
118+
for job in jobs
119+
schedule!(scheduler, job)
120+
end
121+
122+
# Advance time for 3 ticks
123+
advance_time!(clock, 3.0u"ms")
124+
update!(scheduler)
125+
126+
# All jobs should have executed 3 times
127+
for job in jobs
128+
@test job.counter[] == 3
129+
end
130+
end
131+
else
132+
multiple_threads = false
133+
@test_broken multiple_threads
134+
end
135+
end

test/integration/integration_tests.jl

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,3 +91,20 @@
9191
@test all(obs == 1 for obs in test_state.observations_received)
9292
end
9393
end
94+
95+
@testitem "Multithread TimedJob scheduling" begin
96+
import EnvironmentEngine: TimedJob
97+
using Base.Threads: Atomic, atomic_add!
98+
clock = VirtualClock()
99+
scheduler = TickedScheduler(clock, 5.0ms; threading = true)
100+
101+
counter = Atomic{Int}(0)
102+
every(scheduler, 1ms) do dt
103+
atomic_add!(counter, 1)
104+
end
105+
106+
advance_time!(clock, 1.0s)
107+
update!(scheduler)
108+
109+
@test counter[] == 1000
110+
end

test/mocks/job.jl

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,24 @@
22
import EnvironmentEngine: Job, progress!
33
using Unitful
44
import Unitful: 𝐓
5+
using Base.Threads: Atomic
6+
import Base.Threads: atomic_add!
7+
58
struct MockJob <: Job
69
f
710
end
811

912
function progress!(job::MockJob, dt::Quantity{<:Number, 𝐓})
1013
job.f()
1114
end
15+
16+
struct CountingJob <: Job
17+
counter::Atomic{Int}
18+
end
19+
20+
CountingJob() = CountingJob(Atomic{Int}(0))
21+
22+
function progress!(job::CountingJob, dt::Quantity{<:Number, 𝐓})
23+
atomic_add!(job.counter, 1)
24+
end
1225
end

0 commit comments

Comments
 (0)