-
Notifications
You must be signed in to change notification settings - Fork 104
Expand file tree
/
Copy pathWaterLilyMPIExt.jl
More file actions
304 lines (245 loc) · 12.4 KB
/
WaterLilyMPIExt.jl
File metadata and controls
304 lines (245 loc) · 12.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
"""
WaterLilyMPIExt — Julia package extension
==========================================
Activated automatically when ImplicitGlobalGrid and MPI are loaded alongside
WaterLily. Provides MPI-aware overrides for global reductions, halo exchange,
and boundary conditions at MPI-subdomain interfaces.
Uses the `AbstractParMode` dispatch pattern: serial WaterLily dispatches all
hooks through `par_mode[]` (defaults to `Serial()`). This extension defines
`Parallel <: AbstractParMode` and adds new dispatch methods — no method
overwriting, so precompilation works normally.
Functions with MPI-specific behavior (via dispatch on `::Parallel`):
- `_wallBC_L!` — zero L at physical walls only (skip MPI-internal) + halo on L
- `_exitBC!`   — global reductions for inflow/outflow mass flux
- `_divisible` — same coarsening threshold as serial (N>4)
Halo exchange uses a cached `_has_neighbors` flag to skip all exchange
when no MPI neighbors exist (e.g. np=1 non-periodic), eliminating the
overhead of IGG's `update_halo!` and buffer copies in that case.
"""
module WaterLilyMPIExt
using WaterLily
import WaterLily: @loop
using ImplicitGlobalGrid
using MPI
using StaticArrays
# ── MPI parallel mode ────────────────────────────────────────────────────────
"""
    Parallel(comm)

Parallel-mode tag for WaterLily's `AbstractParMode` dispatch. It carries the
MPI communicator that this extension passes to its MPI calls.
"""
struct Parallel <: WaterLily.AbstractParMode
    comm::MPI.Comm
end

# Communicator of the currently active parallel mode. The type assertion
# raises a `TypeError` if `WaterLily.par_mode[]` does not hold a `Parallel`.
function _comm()
    mode = WaterLily.par_mode[]::Parallel
    return mode.comm
end
# ── Global coordinate offset ──────────────────────────────────────────────────
"""
    _global_offset(Val(N), T, ::Parallel) → SVector{N,T}

Return this rank's origin in the global WaterLily index space.

Along dimension `d` the offset is

    coords[d] * (nxyz[d] - overlaps[d])

where `coords`, `nxyz` and `overlaps` are read from the ImplicitGlobalGrid
global grid. Each entry is converted to `T`.
"""
function WaterLily._global_offset(::Val{N}, ::Type{T}, ::Parallel) where {N,T}
    grid = ImplicitGlobalGrid.global_grid()
    origin = ntuple(N) do d
        stride = grid.nxyz[d] - grid.overlaps[d]
        T(grid.coords[d] * stride)
    end
    return SVector{N,T}(origin)
end
# ── MPI initialization ───────────────────────────────────────────────────────
"""
    init_waterlily_mpi(global_dims; perdir=()) → (local_dims, rank, comm)

Set up the MPI domain decomposition used by WaterLily.

Steps performed:
1. Initialize MPI unless it is already initialized.
2. Ask `MPI.Dims_create` for a process topology over the `N` dimensions.
3. Split `global_dims` across that topology (`global_dims .÷ topology`);
   an error is raised when the split is not exact.
4. Initialize ImplicitGlobalGrid with overlaps of 4 and halo widths of 2.
   Dimensions listed in `perdir` are flagged periodic.
5. Store `Parallel(comm)` in `WaterLily.par_mode[]` and cache whether this
   rank has any MPI neighbor.

Returns the tuple `(local_dims, rank, comm)`, where `rank` and `comm` are the
rank id and communicator reported by `init_global_grid`.
"""
function WaterLily.init_waterlily_mpi(global_dims::NTuple{N}; perdir=()) where N
    if !MPI.Initialized()
        MPI.Init()
    end
    nprocs = MPI.Comm_size(MPI.COMM_WORLD)

    # Process topology over the N active dimensions
    mpi_dims = Tuple(Int.(MPI.Dims_create(nprocs, zeros(Int, N))))

    # Interior cells owned by each rank; the split has to be exact
    local_dims = global_dims .÷ mpi_dims
    if !all(global_dims .== local_dims .* mpi_dims)
        error("Global dims $global_dims not evenly divisible by MPI topology " *
              "$mpi_dims with $nprocs ranks")
    end

    # IGG always expects three dimensions: fill the unused ones with `filler`
    pad3(f, filler) = ntuple(d -> d <= N ? f(d) : filler, 3)
    igg_local = pad3(d -> local_dims[d] + 4, 1)   # interior + 2 halo cells per side
    igg_mpi   = pad3(d -> mpi_dims[d], 1)
    igg_per   = pad3(d -> d in perdir ? 1 : 0, 0)

    me, dims, np, _, comm = init_global_grid(
        igg_local...;
        dimx = igg_mpi[1], dimy = igg_mpi[2], dimz = igg_mpi[3],
        overlaps = (4, 4, 4),
        halowidths = (2, 2, 2),
        periodx = igg_per[1], periody = igg_per[2], periodz = igg_per[3],
        init_MPI = false,   # MPI has already been initialized above
    )

    WaterLily.par_mode[] = Parallel(comm)
    _init_has_neighbors!()

    # Only rank 0 reports the decomposition
    if me == 0
        times(v) = join(string.(v), "×")
        topo = times(dims[1:N])
        loc  = times(local_dims)
        glob = times(global_dims)
        @info "WaterLily MPI: $(np) ranks, topology=$(topo), " *
              "local=$(loc), global=$(glob)"
    end
    return local_dims, me, comm
end
# ── Dimension helpers ─────────────────────────────────────────────────────────
# Number of active spatial dimensions (nxyz > 1) in the IGG grid.
# `count` with a predicate tallies the matching entries directly, so no
# temporary array is built the way broadcasting `nxyz .> 1` would.
_ndims_active() = count(>(1), ImplicitGlobalGrid.global_grid().nxyz)
# Cached flag: true when this rank has an MPI neighbor in any active dimension.
# It starts as `false` and is filled in by `_init_has_neighbors!`.
const _has_neighbors = Ref(false)

# Scan both sides of every active dimension in the IGG neighbor table
# and store the result in `_has_neighbors`. A non-negative entry is treated
# as an existing neighbor rank. Returns the stored flag.
function _init_has_neighbors!()
    grid = ImplicitGlobalGrid.global_grid()
    active = 1:_ndims_active()
    found = any(grid.neighbors[side, d] >= 0 for d in active, side in 1:2)
    _has_neighbors[] = found
    return found
end
# ── Scalar halo exchange (fine grid — via IGG) ───────────────────────────────
# Halo exchange through IGG's `update_halo!`, restricted to the active
# dimensions. Arrays with fewer than three dimensions are first reshaped with
# trailing singleton dimensions so that `update_halo!` receives a 3D array.
function _scalar_halo_igg!(arr::AbstractArray)
    active = ntuple(identity, _ndims_active())
    missing_dims = 3 - ndims(arr)
    if missing_dims > 0
        padded = reshape(arr, size(arr)..., ntuple(_ -> 1, missing_dims)...)
        return update_halo!(padded; dims=active)
    end
    return update_halo!(arr; dims=active)
end
# ── Direct MPI halo exchange (any array size) ────────────────────────────────
#
# IGG pre-allocates MPI send/recv buffers sized for the registered fine grid.
# Calling update_halo! on coarse multigrid arrays produces garbage. This
# function performs a direct MPI halo exchange using Isend/Irecv! with
# buffers sized for the actual array; they are allocated on first use and
# cached per element type, slab shape and dimension. It exchanges
# 2-cell-wide slabs in each active spatial dimension.
# MPI send/recv buffers keyed by (eltype, slab_shape, dim_tag). Entries are
# allocated on first request and reused afterwards.
const _mpi_bufs = Dict{Tuple, NTuple{4,Array}}()

"""
    _get_mpi_bufs(T, slab_shape, dim) → NTuple{4,Array{T}}

Return the four cached buffers for element type `T`, slab shape `slab_shape`
and dimension tag `dim`. On the first request for a key, four zero-filled
arrays of shape `slab_shape` are allocated and stored; later requests with the
same key return the same arrays.
"""
function _get_mpi_bufs(::Type{T}, slab_shape::Tuple, dim::Int) where T
    bufs = get!(_mpi_bufs, (T, slab_shape, dim)) do
        ntuple(_ -> zeros(T, slab_shape), 4)
    end
    # The cache's value type is abstract (`NTuple{4,Array}`); asserting the
    # concrete type here lets callers infer concrete buffer types.
    return bufs::NTuple{4,Array{T,length(slab_shape)}}
end
# View of `arr` restricted to the index range `r` along dimension `dim`;
# every other dimension is taken whole. Writes to the view modify `arr`.
function _slab(arr::AbstractArray, dim::Int, r::UnitRange)
    idx = ntuple(ndims(arr)) do k
        k == dim ? r : Colon()
    end
    return view(arr, idx...)
end
# Reusable request slots: one per-dimension exchange posts at most 4 requests
const _mpi_reqs = MPI.Request[MPI.REQUEST_NULL for _ in 1:4]

"""
    _scalar_halo_mpi!(arr)

Exchange 2-cell-wide halo slabs of `arr` with the neighboring ranks, one
active dimension at a time, using non-blocking MPI sends and receives.

For each dimension with at least one neighbor, slabs `3:4` and `n-3:n-2`
(where `n` is the array extent in that dimension) are copied into send
buffers, all requests are posted and waited on together, and the received
data is written into slabs `1:2` and `n-1:n`. Dimensions without any neighbor
are skipped.
"""
function _scalar_halo_mpi!(arr::AbstractArray{T}) where T
    grid = ImplicitGlobalGrid.global_grid()
    comm = _comm()
    sz = size(arr)
    for dim in 1:_ndims_active()
        left = grid.neighbors[1, dim]
        right = grid.neighbors[2, dim]
        has_left = left >= 0
        has_right = right >= 0
        has_left || has_right || continue

        n = sz[dim]
        slab_shape = ntuple(i -> i == dim ? 2 : sz[i], ndims(arr))
        send_left, recv_left, send_right, recv_right = _get_mpi_bufs(T, slab_shape, dim)

        # Stage the outgoing slabs on both sides
        copyto!(send_left, _slab(arr, dim, 3:4))
        copyto!(send_right, _slab(arr, dim, n-3:n-2))

        # Tags: `dim*10` marks data travelling rightwards, `dim*10+1` leftwards.
        # Everything is posted before waiting so the transfers can overlap.
        rightward = dim * 10
        leftward = dim * 10 + 1
        nreqs = 0
        if has_right
            nreqs += 1
            _mpi_reqs[nreqs] = MPI.Isend(send_right, comm; dest=right, tag=rightward)
            nreqs += 1
            _mpi_reqs[nreqs] = MPI.Irecv!(recv_right, comm; source=right, tag=leftward)
        end
        if has_left
            nreqs += 1
            _mpi_reqs[nreqs] = MPI.Isend(send_left, comm; dest=left, tag=leftward)
            nreqs += 1
            _mpi_reqs[nreqs] = MPI.Irecv!(recv_left, comm; source=left, tag=rightward)
        end
        MPI.Waitall(MPI.RequestSet(_mpi_reqs[1:nreqs]))

        # Copy the received slabs into the halo cells
        if has_left
            copyto!(_slab(arr, dim, 1:2), recv_left)
        end
        if has_right
            copyto!(_slab(arr, dim, n-1:n), recv_right)
        end
    end
end
# ── Unified scalar halo exchange ─────────────────────────────────────────────
# True when the extents of `arr` in the active dimensions equal the IGG grid's
# `nxyz` in those dimensions, i.e. `arr` has the size IGG was initialized with.
function _is_fine(arr::AbstractArray)
    grid = ImplicitGlobalGrid.global_grid()
    active = 1:_ndims_active()
    return size(arr)[active] == Tuple(grid.nxyz[active])
end
# Halo exchange for one scalar array. Does nothing when this rank has no MPI
# neighbors. Arrays matching the IGG grid size go through IGG's exchange;
# all other sizes use the direct MPI exchange.
function _do_scalar_halo!(arr::AbstractArray)
    if !_has_neighbors[]
        return nothing
    end
    exchange! = _is_fine(arr) ? _scalar_halo_igg! : _scalar_halo_mpi!
    return exchange!(arr)
end
# ── Vector (velocity-shaped) halo exchange ────────────────────────────────────
# Scratch arrays cached by (eltype, dims), so each shape is allocated once.
const _halo_bufs = Dict{Tuple, Array}()

"""
    _get_halo_buf(T, dims) → Array{T,N}

Return the cached scratch array with element type `T` and size `dims`,
allocating it (uninitialized) on the first request for that key. Repeated
calls with the same arguments return the same array.
"""
function _get_halo_buf(::Type{T}, dims::NTuple{N,Int}) where {T,N}
    buf = get!(() -> Array{T}(undef, dims), _halo_bufs, (T, dims))
    # The cache's value type is the abstract `Array`; asserting the concrete
    # type keeps callers type-stable.
    return buf::Array{T,N}
end
# Halo exchange for an array whose last dimension indexes components.
# Each component is copied into a cached scratch array, exchanged as a scalar
# field, and copied back. Does nothing when this rank has no MPI neighbors.
function _do_velocity_halo!(u::AbstractArray{T,N}) where {T,N}
    if !_has_neighbors[]
        return nothing
    end
    spatial = ntuple(_ -> Colon(), N - 1)          # every non-component dimension
    scratch = _get_halo_buf(T, size(u)[1:N-1])     # one buffer shared by all components
    for comp in 1:size(u, N)
        component = view(u, spatial..., comp)
        copyto!(scratch, component)
        _do_scalar_halo!(scratch)
        copyto!(component, scratch)
    end
end
# ── Dispatch hooks for Parallel ──────────────────────────────────────────────
# Global reductions use the communicator carried by the dispatched `Parallel`
# mode, instead of looking the mode up again through the global `par_mode[]`.
WaterLily._global_allreduce(x, p::Parallel) = MPI.Allreduce(x, MPI.SUM, p.comm)
WaterLily._global_min(a, b, p::Parallel) = MPI.Allreduce(min(a, b), MPI.MIN, p.comm)
# Halo hooks forward to the scalar / velocity exchange routines
WaterLily._scalar_halo!(x, ::Parallel) = _do_scalar_halo!(x)
WaterLily._velocity_halo!(u, ::Parallel) = _do_velocity_halo!(u)
# Communication hooks: in parallel, MPI halo handles periodicity, so `perdir`
# is not consulted here
WaterLily._comm!(a, perdir, ::Parallel) = _do_scalar_halo!(a)
WaterLily._velocity_comm!(a, perdir, ::Parallel) = _do_velocity_halo!(a)
# ── MPI-aware exitBC! ────────────────────────────────────────────────────────
"""
    _exitBC!(u, u⁰, Δt, ::Parallel)

MPI version of the convective exit boundary condition on the first velocity
component.

Ranks without a left neighbor in dimension 1 contribute the inflow sum, and
ranks without a right neighbor contribute the exit-plane length and sum.
Every rank takes part in the three `Allreduce` calls, including ranks that
contribute zero. Exit ranks apply the convective update and then subtract the
global flux imbalance. A velocity halo exchange is performed at the end.
"""
function WaterLily._exitBC!(u, u⁰, Δt, ::Parallel)
    grid = ImplicitGlobalGrid.global_grid()
    comm = _comm()
    N, _ = WaterLily.size_u(u)
    is_inflow = grid.neighbors[1, 1] < 0
    is_exit = grid.neighbors[2, 1] < 0

    exitR = WaterLily.slice(N .- 2, N[1] - 1, 1, 3)
    # NOTE(review): the inflow plane is sampled at index 2 — confirm that this
    # is the intended plane for the 2-cell halo layout.
    inflowR = WaterLily.slice(N .- 2, 2, 1, 3)
    nothing_here = zero(eltype(u))
    global_sum(x) = MPI.Allreduce(x, MPI.SUM, comm)

    # Exit-plane size summed over all ranks
    global_exit_len = global_sum(is_exit ? length(exitR) : 0)

    # Mean inflow velocity, normalized by the global exit-plane size
    U = global_sum(is_inflow ? sum(@view(u[inflowR, 1])) : nothing_here) / global_exit_len

    # Convective exit on the ranks without a right neighbor in x
    if is_exit
        @loop u[I, 1] = u⁰[I, 1] - U * Δt * (u⁰[I, 1] - u⁰[I - WaterLily.δ(1, I), 1]) over I ∈ exitR
    end

    # Global mass-flux imbalance between exit and inflow
    global_exit_sum = global_sum(is_exit ? sum(@view(u[exitR, 1])) : nothing_here)
    ∮u = global_exit_sum / global_exit_len - U
    if is_exit
        @loop u[I, 1] -= ∮u over I ∈ exitR
    end

    _do_velocity_halo!(u)
end
# ── MPI-aware wallBC_L! ──────────────────────────────────────────────────────
"""
    _wallBC_L!(L, perdir, ::Parallel)

Zero component `j` of `L` on the planes at index 3 and `N[j]-1` along
dimension `j`, for every non-periodic `j` and only on the sides where this
rank has no MPI neighbor. A halo exchange on `L` follows.
"""
function WaterLily._wallBC_L!(L, perdir, ::Parallel)
    grid = ImplicitGlobalGrid.global_grid()
    N, n = WaterLily.size_u(L)
    for j in 1:n
        if !(j in perdir)
            at_left_wall = grid.neighbors[1, j] < 0    # no neighbor: physical left wall
            at_right_wall = grid.neighbors[2, j] < 0   # no neighbor: physical right wall
            if at_left_wall
                @loop L[I,j] = zero(eltype(L)) over I ∈ WaterLily.slice(N, 3, j)
            end
            if at_right_wall
                @loop L[I,j] = zero(eltype(L)) over I ∈ WaterLily.slice(N, N[j]-1, j)
            end
        end
    end
    _do_velocity_halo!(L)
end
# ── MPI-aware divisible ───────────────────────────────────────────────────────
# Coarsening criterion in parallel mode: `N` must be even and larger than 4,
# the same threshold as serial. Coarse-level comm cost is negligible thanks to
# `_has_neighbors` short-circuiting (no exchange when no MPI neighbors exist)
# and tiny array sizes at the coarsest levels.
function WaterLily._divisible(N, ::Parallel)
    return N > 4 && mod(N, 2) == 0
end
end # module WaterLilyMPIExt