Skip to content

Commit 767d575

Browse files
committed
add distributed reductions
1 parent 28b40e1 commit 767d575

File tree

3 files changed

+107
-3
lines changed

3 files changed

+107
-3
lines changed

src/DistributedComputations/distributed_fields.jl

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,3 +94,87 @@ function reconstruct_global_field(field::DistributedField)
9494

9595
return global_field
9696
end
97+
98+
function maybe_all_reduce!(op, f::ReducedDistributedField, dims)
99+
reduced_dims = reduced_dimensions(f)
100+
101+
if any(reduced_dims .∈ tuple(dims...))
102+
all_reduce!(op, interior(f), architecture(f))
103+
end
104+
105+
return f
106+
end
107+
108+
function maybe_all_reduce!(op, f::ReducedDistributedField, ::Colon)
109+
all_reduce!(op, interior(f), architecture(f))
110+
return f
111+
end
112+
113+
# Allocating and in-place reductions
114+
for (reduction, all_reduce_op) in zip((:sum, :maximum, :minimum, :all, :any, :prod),
115+
( +, max, min, &, |, *))
116+
117+
reduction! = Symbol(reduction, '!')
118+
119+
@eval begin
120+
121+
# In-place
122+
function Base.$(reduction!)(f::Function,
123+
r::ReducedAbstractField,
124+
a::DistributedField;
125+
condition = nothing,
126+
mask = get_neutral_mask(Base.$(reduction!)),
127+
dims = :,
128+
kwargs...)
129+
130+
operand = condition_operand(f, a, condition, mask)
131+
132+
Base.$(reduction!)(identity,
133+
interior(r),
134+
operand;
135+
dims,
136+
kwargs...)
137+
138+
return maybe_all_reduce!(all_reduce_op, r, dims)
139+
end
140+
141+
function Base.$(reduction!)(r::ReducedAbstractField,
142+
a::DistributedField;
143+
condition = nothing,
144+
mask = get_neutral_mask(Base.$(reduction!)),
145+
dims = :,
146+
kwargs...)
147+
148+
Base.$(reduction!)(identity,
149+
interior(r),
150+
condition_operand(a, condition, mask);
151+
dims,
152+
kwargs...)
153+
154+
return maybe_all_reduce!(all_reduce_op, r, dims)
155+
end
156+
157+
# Allocating
158+
function Base.$(reduction)(f::Function,
159+
c::DistributedField;
160+
condition = nothing,
161+
mask = get_neutral_mask(Base.$(reduction!)),
162+
dims = :)
163+
164+
conditioned_c = condition_operand(f, c, condition, mask)
165+
T = filltype(Base.$(reduction!), c)
166+
loc = reduced_location(location(c); dims)
167+
r = Field(loc, c.grid, T; indices=indices(c))
168+
initialize_reduced_field!(Base.$(reduction!), identity, r, conditioned_c)
169+
Base.$(reduction!)(identity, interior(r), conditioned_c, init=false)
170+
171+
maybe_all_reduce!(all_reduce_op, r, dims)
172+
173+
if dims isa Colon
174+
return CUDA.@allowscalar first(r)
175+
else
176+
return r
177+
end
178+
end
179+
end
180+
end

src/Fields/field.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -722,7 +722,7 @@ for reduction in (:sum, :maximum, :minimum, :all, :any, :prod)
722722
end
723723
end
724724

725-
# Improve me! We can should both the extrama in one single reduction instead of two
725+
# Improve me! We can should both the extrema in one single reduction instead of two
726726
Base.extrema(c::AbstractField; kwargs...) = (minimum(c; kwargs...), maximum(c; kwargs...))
727727
Base.extrema(f, c::AbstractField; kwargs...) = (minimum(f, c; kwargs...), maximum(f, c; kwargs...))
728728

test/test_distributed_models.jl

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ include_corners ? view(f.data, :, :, right_halo_indices(instantiate(LZ), instant
7474
interior_indices(instantiate(LY), instantiate(topology(f, 2)), f.grid.Ny),
7575
right_halo_indices(instantiate(LZ), instantiate(topology(f, 3)), f.grid.Nz, f.grid.Hz))
7676

77-
7877
function southwest_halo(f::AbstractField)
7978
Nx, Ny, _ = size(f.grid)
8079
Hx, Hy, _ = halo_size(f.grid)
@@ -409,7 +408,7 @@ end
409408

410409
@testset "Distributed MPI Oceananigans" begin
411410
@info "Testing distributed MPI Oceananigans..."
412-
411+
#=
413412
@testset "Multi architectures rank connectivity" begin
414413
@info " Testing multi architecture rank connectivity..."
415414
test_triply_periodic_rank_connectivity_with_411_ranks()
@@ -463,8 +462,28 @@ end
463462
@test child_architecture(architecture(cpuosg)) == CPU()
464463
end
465464
end
465+
=#
466+
@testset "Distributed reductions" begin
467+
child_arch = get(ENV, "TEST_ARCHITECTURE", "CPU") == "GPU" ? GPU() : CPU()
466468

469+
for partition in [Partition(1, 4), Partition(2, 2), Partition(4, 1)]
470+
@info "Time-stepping a distributed NonhydrostaticModel with partition $partition..."
471+
arch = Distributed(child_arch; partition)
472+
grid = RectilinearGrid(arch, topology=(Periodic, Periodic, Periodic), size=(8, 8, 1), extent=(1, 2, 3))
473+
c = CenterField(grid)
474+
set!(c, arch.local_rank+1)
475+
476+
c_reduced = Field{Nothing, Nothing, Nothing}(grid)
477+
478+
N = grid.Nx * grid.Ny # local rank grid size
479+
@test sum(c) == 1*N + 2*N + 3*N + 4*N
480+
481+
sum!(c_reduced, c)
482+
@test CUDA.@allowscalar c_reduced.data[1, 1, 1] == 1*N + 2*N + 3*N + 4*N
483+
end
484+
end
467485

486+
#=
468487
# Only test on CPU because we do not have a GPU pressure solver yet
469488
@testset "Time stepping NonhydrostaticModel" begin
470489
child_arch = get(ENV, "TEST_ARCHITECTURE", "CPU") == "GPU" ? GPU() : CPU()
@@ -502,5 +521,6 @@ end
502521
@test model isa ShallowWaterModel
503522
@test model.clock.time ≈ 2
504523
end
524+
=#
505525
end
506526

0 commit comments

Comments
 (0)