Skip to content

Commit 5ca5331

Browse files
authored
Awaitable parallel loops (#86)
* Rename sync(Weave) to syncRoot(Weave) to make clear that it is not composable * Introduce fine-grained awaitable for-loop * fix comment in capture section * make parallel reduction compile standalone * Add yet-to-be-proper sync on awaitable loops * Sometimes the task you try to split is not the current task anymore * update changelog * update histogram and logsumexp to use the awaitable loops * Fighting your way through recursive imports, and static early symbol resolution * Allox sync on not iterated loop (i.e. iterations = 0) * Well, it seems like awaitable loops are not enough to describe the data dependencies of GEMM :sad_face: * fix lazyFLowvar symbol resolution * Fix LazyFlowVar with reduction and awaitable loops * mention that awaitable might still change [skip ci]
1 parent 5a5f2cb commit 5ca5331

21 files changed

+564
-173
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ exit(Weave)
135135
- `init(Weave)`, `exit(Weave)` to start and stop the runtime. Forgetting this will give you nil pointer exceptions on spawn.
136136
- `spawn fnCall(args)` which spawns a function that may run on another thread and gives you an awaitable Flowvar handle.
137137
- `sync(Flowvar)` will await a Flowvar and block until you receive a result.
138-
- `sync(Weave)` is a global barrier for the main thread on the main task. Allowing nestable barriers for any thread is work-in-progress.
138+
- `syncRoot(Weave)` is a global barrier for the main thread on the main task.
139139
- `parallelFor`, `parallelForStrided`, `parallelForStaged`, `parallelForStagedStrided` are described above and in the experimental section.
140140
- `loadBalance(Weave)` gives the runtime the opportunity to distribute work. Insert this within long computation as due to Weave design, it's busy workers that are also in charge of load balancing. This is done automatically when using `parallelFor`.
141141
- `isSpawned` allows you to build speculative algorithm where a thread is spawned only if certain conditions are valid. See the `nqueens` benchmark for an example.

benchmarks/bouncing_producer_consumer/weave_bpc.nim

+1-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ proc main() =
124124
let start = wtime_msec()
125125

126126
bpc_produce(NumTasksPerDepth, Depth)
127-
sync(Weave)
127+
syncRoot(Weave)
128128

129129
let stop = wtime_msec()
130130

benchmarks/histogram_2D/weave_histogram.nim

+2-1
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ proc generateHistogramWeaveStaged[T](matrix: Matrix[T], hist: Histogram): T =
216216
# Parallel reduction
217217
parallelForStaged i in 1 ..< matrix.ld-1:
218218
captures: {maxAddr, lockAddr, hist, matrix, boxes}
219+
awaitable: histoLoop
219220
prologue:
220221
let threadHist = newHistogram(boxes)
221222
var threadMax = T(-Inf)
@@ -239,7 +240,7 @@ proc generateHistogramWeaveStaged[T](matrix: Matrix[T], hist: Histogram): T =
239240
lockAddr[].release()
240241
wv_free(threadHist.buffer)
241242

242-
sync(Weave)
243+
sync(histoLoop)
243244
lock.deinitLock()
244245
return max
245246

benchmarks/logsumexp/weave_logsumexp.nim

+4-2
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ proc maxWeaveStaged[T: SomeFloat](M: Matrix[T]) : T =
249249

250250
parallelForStaged i in 0 ..< M.nrows:
251251
captures:{maxAddr, lockAddr, M}
252+
awaitable: maxLoop
252253
prologue:
253254
var localMax = T(-Inf)
254255
loop:
@@ -260,7 +261,7 @@ proc maxWeaveStaged[T: SomeFloat](M: Matrix[T]) : T =
260261
maxAddr[] = max(maxAddr[], localMax)
261262
lockAddr[].release()
262263

263-
sync(Weave)
264+
sync(maxLoop)
264265
lock.deinitLock()
265266

266267
proc logsumexpWeaveStaged[T: SomeFloat](M: Matrix[T]): T =
@@ -279,6 +280,7 @@ proc logsumexpWeaveStaged[T: SomeFloat](M: Matrix[T]): T =
279280

280281
parallelForStaged i in 0 ..< M.nrows:
281282
captures:{lseAddr, lockAddr, alpha, M}
283+
awaitable: logSumExpLoop
282284
prologue:
283285
var localLSE = 0.T
284286
loop:
@@ -290,7 +292,7 @@ proc logsumexpWeaveStaged[T: SomeFloat](M: Matrix[T]): T =
290292
lseAddr[] += localLSE
291293
lockAddr[].release()
292294

293-
sync(Weave)
295+
sync(logSumExpLoop)
294296
result = alpha + ln(lse)
295297
lock.deinitLock()
296298

benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_packing_weave.nim

+4-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ proc pack_A_mc_kc*[T; ukernel: static MicroKernel](
5959
# Packing B
6060
#
6161
# ############################################################
62-
6362
proc pack_B_kc_nc*[T; ukernel: static MicroKernel](
6463
packedB: ptr UncheckedArray[T],
6564
kc, nc: int,
@@ -70,6 +69,8 @@ proc pack_B_kc_nc*[T; ukernel: static MicroKernel](
7069
## Concretely the outer dimension of packed matrices
7170
## is k so that C[i, j] = A[i, k] * B[k, j]
7271
## does not require strided access
72+
mixin packingLoop
73+
7374
let buffer{.restrict.} = assume_aligned packedB
7475
const NR = ukernel.extract_nr()
7576
let unroll_stop = nc.round_step_down(NR)
@@ -91,3 +92,5 @@ proc pack_B_kc_nc*[T; ukernel: static MicroKernel](
9192
offBuf[k*NR + j] = B[k, unroll_stop+j]
9293
for j in remainder ..< NR: # Pad with 0 if packing over the edge
9394
offBuf[k*NR + j] = 0.T
95+
96+
syncRoot(Weave)

benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_weave.nim

+1-3
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,6 @@ proc gemm_impl[T; ukernel: static MicroKernel](
161161
# First time writing to C, we scale it, otherwise accumulate
162162
let beta = if pc == 0: beta else: 1.T
163163

164-
sync(Weave) # TODO: this cannot be nested
165164
# ####################################
166165
# 3. for ic = 0,...,m−1 in steps of mc
167166
parallelFor icb in 0 ..< tiles.ic_num_tasks:
@@ -180,7 +179,7 @@ proc gemm_impl[T; ukernel: static MicroKernel](
180179
alpha, packA, tiles.b, # αA[ic:ic+mc, pc:pc+kc] * B[pc:pc+kc, jc:jc+nc] +
181180
beta, vC.stride(ic, 0) # βC[ic:ic+mc, jc:jc+nc]
182181
)
183-
sync(Weave) # TODO: this cannot be nested
182+
syncRoot(Weave)
184183

185184
# ############################################################
186185
#
@@ -253,7 +252,6 @@ proc gemm_strided*[T: SomeNumber](
253252
if hasAvx512f(): dispatch(x86_AVX512)
254253
elif hasSse2(): dispatch(x86_SSE2)
255254
dispatch(x86_Generic)
256-
sync(Weave)
257255

258256
# ############################################################
259257
#

benchmarks/matrix_transposition/weave_transposes.nim

+4-4
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ template runBench(transposeName: typed, reorderCompute, isSequential: bool): unt
204204
for _ in 0 ..< nrounds:
205205
transposeName(M, N, bufIn, bufOut)
206206
if not isSequential:
207-
sync(Weave)
207+
syncRoot(Weave)
208208
when not defined(windows):
209209
let stop = wtime_msec()
210210
mxnTime = stop - start
@@ -215,7 +215,7 @@ template runBench(transposeName: typed, reorderCompute, isSequential: bool): unt
215215
for _ in 0 ..< nrounds:
216216
transposeName(N, M, bufIn, bufOut)
217217
if not isSequential:
218-
sync(Weave)
218+
syncRoot(Weave)
219219
when not defined(windows):
220220
let stop = wtime_msec()
221221
nxmTime = stop - start
@@ -238,7 +238,7 @@ template runBench(transposeName: typed, reorderCompute, isSequential: bool): unt
238238
for _ in 0 ..< nrounds:
239239
transposeName(N, M, bufIn, bufOut)
240240
if not isSequential:
241-
sync(Weave)
241+
syncRoot(Weave)
242242
when not defined(windows):
243243
let stop = wtime_msec()
244244
nxmTime = stop - start
@@ -249,7 +249,7 @@ template runBench(transposeName: typed, reorderCompute, isSequential: bool): unt
249249
for _ in 0 ..< nrounds:
250250
transposeName(M, N, bufIn, bufOut)
251251
if not isSequential:
252-
sync(Weave)
252+
syncRoot(Weave)
253253
when not defined(windows):
254254
let stop = wtime_msec()
255255
mxnTime = stop - start

benchmarks/single_task_producer/weave_spc.nim

+1-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ proc main() =
115115

116116
# spc_produce_seq(NumTasksTotal)
117117
spc_produce(NumTasksTotal)
118-
sync(Weave)
118+
syncRoot(Weave)
119119

120120
let stop = wtime_msec()
121121

changelog.md

+47
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,52 @@
11
# Changelog
22

3+
### v0.3.0 - unreleased
4+
5+
`sync(Weave)` has been renamed `syncRoot(Weave)` to highlight that it is only valid on the root task in the main thread. In particular, a procedure that uses syncRoot should not be called in a multithreaded section. This is a breaking change. In the future such changes will have a deprecation path but the library is only 2 weeks old at the moment.
6+
7+
`parallelFor`, `parallelForStrided`, `parallelForStaged`, `parallelForStagedStrided`
8+
now support an "awaitable" statement to allow fine-grain sync.
9+
10+
Fine-grained data-dependencies are under research (for example, launch a task when the first 50 iterations are done out of a 100-iteration loop), so "awaitable" may change
11+
to have an unified syntax for delayed tasks depending on a task, a whole loop or a subset of it.
12+
If possible, it is recommended to use "awaitable" instead of `syncRoot()` to allow composable parallelism; `syncRoot()` can only be called in a serial section of the code.
13+
14+
Weave can now be compiled with Microsoft Visual Studio in C++ mode.
15+
16+
"LastVictim" and "LastThief" WV_Target policies have been added.
17+
The default is still "Random", pass "-d:WV_Target=LastVictim" to explore performance on your workload
18+
19+
"StealEarly" has been implemented, the default is not to steal early,
20+
pass "-d:WV_StealEarly=2" for example to allow workers to initiate a steal request
21+
when 2 tasks or less are left in their queue.
22+
23+
#### Performance
24+
25+
Weave has been thoroughly tested and tuned on state-of-the-art matrix multiplication implementation
26+
against competing pure Assembly, hand-tuned BLAS implementations to reach High-performance Computing scalability standards.
27+
28+
3 cases can trigger loop splitting in Weave:
29+
- loadBalance(Weave),
30+
- sharing work to idle child threads
31+
- incoming thieves
32+
The first 2 were not working properly and resulted in pathological performance cases.
33+
This has been fixed.
34+
35+
Fixed strided loop iteration rounding
36+
Fixed compilation with metrics
37+
38+
Executing a loop now counts as a single task for the adaptive steal policy.
39+
This prevents short loops from hindering steal-half strategy as it depends
40+
on the number of tasks executed per steal requests interval.
41+
42+
#### Internals
43+
- Weave uses explicit finite state machines in several places.
44+
- The memory pool now has the same interface as malloc/free; in the past
45+
freeing a block required passing a threadID as this avoided an expensive getThreadID syscall.
46+
The new solution uses assembly code to get the address of the current thread's thread-local storage
47+
as an unique threadID.
48+
- Weave memory subsystem now supports LLVM AddressSanitizer to detect memory bugs.
49+
Spurious (?) errors from Nim and Weave were not removed and are left as a future task.
350

451
### v0.2.0 - December 2019
552

weave.nim

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@
66
# at your option. This file may not be copied, modified, or distributed except according to those terms.
77

88
import
9-
weave/[parallel_tasks, parallel_for, parallel_for_staged, runtime, runtime_fsm],
9+
weave/[parallel_tasks, parallel_for, parallel_for_staged, runtime, runtime_fsm, await_fsm],
1010
weave/datatypes/flowvars
1111

1212
export
1313
Flowvar, Weave,
14-
spawn, sync,
14+
spawn, sync, syncRoot,
1515
parallelFor, parallelForStrided, parallelForStaged, parallelForStagedStrided,
1616
init, exit,
1717
loadBalance,

weave.nimble

+2-2
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,12 @@ task test, "Run Weave tests":
4242
test "", "weave/parallel_tasks.nim"
4343
test "", "weave/parallel_for.nim"
4444
test "", "weave/parallel_for_staged.nim"
45-
# test "", "weave/parallel_reduce.nim"
45+
test "", "weave/parallel_reduce.nim"
4646

4747
test "-d:WV_LazyFlowvar", "weave/parallel_tasks.nim"
4848
test "-d:WV_LazyFlowvar", "weave/parallel_for.nim"
4949
test "-d:WV_LazyFlowvar", "weave/parallel_for_staged.nim"
50-
# test "-d:WV_LazyFlowvar", "weave/parallel_reduce.nim" # Experimental
50+
test "-d:WV_LazyFlowvar", "weave/parallel_reduce.nim"
5151

5252
test "", "benchmarks/dfs/weave_dfs.nim"
5353
test "", "benchmarks/fibonacci/weave_fib.nim"

weave/await_fsm.nim

+45-12
Original file line numberDiff line numberDiff line change
@@ -50,19 +50,8 @@ setTerminalState(awaitFSA, AW_Exit)
5050

5151
# -------------------------------------------
5252

53-
EagerFV:
54-
template isFutReady(): untyped =
55-
fv.chan[].tryRecv(parentResult)
56-
LazyFV:
57-
template isFutReady(): untyped =
58-
if fv.lfv.hasChannel:
59-
ascertain: not fv.lfv.lazy.chan.isNil
60-
fv.lfv.lazy.chan[].tryRecv(parentResult)
61-
else:
62-
fv.lfv.isReady
63-
6453
implEvent(awaitFSA, AWE_FutureReady):
65-
isFutReady()
54+
isFutReady(fv)
6655

6756
behavior(awaitFSA):
6857
# In AW_Steal we might recv tasks and steal requests which get stuck in our queues
@@ -184,3 +173,47 @@ behavior(awaitFSA):
184173

185174
synthesize(awaitFSA):
186175
proc forceFuture*[T](fv: Flowvar[T], parentResult: var T)
176+
177+
# -------------------------------------------
178+
179+
EagerFV:
180+
proc forceComplete*[T](fv: Flowvar[T], parentResult: var T) {.inline.} =
181+
## From the parent thread awaiting on the result, force its computation
182+
## by eagerly processing only the child tasks spawned by the awaited task
183+
fv.forceFuture(parentResult)
184+
recycleChannel(fv)
185+
186+
LazyFV:
187+
template forceComplete*[T](fv: Flowvar[T], parentResult: var T) =
188+
fv.forceFuture(parentResult)
189+
# Reclaim memory
190+
if not fv.lfv.hasChannel:
191+
ascertain: fv.lfv.isReady
192+
parentResult = cast[ptr T](fv.lfv.lazy.buf.addr)[]
193+
else:
194+
ascertain: not fv.lfv.lazy.chan.isNil
195+
recycleChannel(fv)
196+
197+
# Public
198+
# -------------------------------------------
199+
200+
type Dummy* = object
201+
## A dummy return type (Flowvar[Dummy])
202+
## for waitable for-loops
203+
# Do we add a dummy field to avoid a size of 0?
204+
205+
proc sync*[T](fv: FlowVar[T]): T {.inline.} =
206+
## Blocks the current thread until the flowvar is available
207+
## and returned.
208+
## The thread is not idle and will complete pending tasks.
209+
fv.forceComplete(result)
210+
211+
template sync*(fv: FlowVar[Dummy]) =
212+
## Blocks the current thread until the full loop task
213+
## associated with the dummy has finished
214+
## The thread is not idle and will complete pending tasks.
215+
# This must be a template to avoid recursive dependency
216+
# as forceFuture is in await_fsm and await_fsm depends
217+
# on this module.
218+
var dummy: Dummy
219+
forceComplete(fv, dummy)

weave/channels/channels_spsc_single.nim

+6-2
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@ func tryRecv*[T](chan: var ChannelSPSCSingle, dst: var T): bool {.inline.} =
7070
## Returns true if successful (channel was not empty)
7171
##
7272
## ⚠ Use only in the consumer thread that reads from the channel.
73-
preCondition: sizeof(T) == chan.itemsize.int
73+
preCondition: (sizeof(T) == chan.itemsize.int) or
74+
# Support dummy object
75+
(sizeof(T) == 0 and chan.itemsize == 1)
7476

7577
let full = chan.full.load(moAcquire)
7678
if not full:
@@ -84,7 +86,9 @@ func trySend*[T](chan: var ChannelSPSCSingle, src: sink T): bool {.inline.} =
8486
## Returns true if successful (channel was empty)
8587
##
8688
## ⚠ Use only in the producer thread that writes from the channel.
87-
preCondition: sizeof(T) == chan.itemsize.int
89+
preCondition: (sizeof(T) == chan.itemsize.int) or
90+
# Support dummy object
91+
(sizeof(T) == 0 and chan.itemsize == 1)
8892

8993
let full = chan.full.load(moAcquire)
9094
if full:

weave/config.nim

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ template debug*(body: untyped): untyped =
105105
block: {.noSideEffect, gcsafe.}: body
106106

107107
template debugSplit*(body: untyped): untyped =
108-
when defined(WV_DebugSplit):
108+
when defined(WV_DebugSplit) or defined(WV_Debug):
109109
block: {.noSideEffect, gcsafe.}: body
110110

111111
template StealAdaptative*(body: untyped): untyped =

0 commit comments

Comments
 (0)