-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathtest_gather.py
More file actions
113 lines (80 loc) · 2.64 KB
/
test_gather.py
File metadata and controls
113 lines (80 loc) · 2.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import numpy as np
import pyfms
def test_gather_2d():
pyfms.fms.init(ndomain=2)
for convert in [True, False]:
nx, ny = 12, 24
global_indices = [0, nx - 1, 0, ny - 1]
# define domain
layout = pyfms.mpp_domains.define_layout(global_indices, pyfms.mpp.npes())
domain = pyfms.mpp_domains.define_domains(global_indices, layout)
is_root_pe = pyfms.mpp.pe() == pyfms.mpp.root_pe()
# data to send
global_data = np.array(
[[i * 100 + j for j in range(ny)] for i in range(nx)], dtype=np.float64
)
send = global_data[domain.isc : domain.iec + 1, domain.jsc : domain.jec + 1]
if not convert:
global_data = global_data.T
send = send.T
rbuf_shape = None
if is_root_pe:
if convert:
rbuf_shape = [nx, ny]
else:
rbuf_shape = [ny, nx]
pelist = pyfms.mpp.get_current_pelist(pyfms.mpp.npes())
gathered = pyfms.mpp.gather(
send,
rbuf_shape=rbuf_shape,
domain=domain,
pelist=pelist,
convert_cf_order=convert,
)
if pyfms.mpp.pe() == pyfms.mpp.root_pe():
assert np.all(global_data == gathered)
else:
assert gathered is None
pyfms.fms.end()
def test_gather_1d():
sbuf_size = 5
def buffer(ipe):
return [ipe * 10 + i for i in range(sbuf_size)]
pyfms.fms.init()
pe = pyfms.mpp.pe()
npes = pyfms.mpp.npes()
is_root_pe = pyfms.mpp.pe() == pyfms.mpp.root_pe()
send = np.array(buffer(pe), dtype=np.float64)
if is_root_pe:
rbuf_size = sbuf_size * npes
else:
rbuf_size = None
receive = pyfms.mpp.gather(np.array(send), rbuf_size=rbuf_size)
if is_root_pe:
answers = []
for ipe in range(npes):
answers += buffer(ipe)
np.testing.assert_array_equal(receive, answers)
else:
assert receive is None
pyfms.fms.end()
def test_gatherv_1d():
def buffer(ipe):
return [ipe * 10 + i for i in range(ipe + 2)]
pyfms.fms.init()
pe = pyfms.mpp.pe()
is_root_pe = pe == pyfms.mpp.root_pe()
sbuf = np.array(buffer(pe), dtype=np.float64)
if is_root_pe:
rsize = [ipe + 2 for ipe in range(pyfms.mpp.npes())]
else:
rsize = None
receive = pyfms.mpp.gatherv(sbuf, ssize=pe + 2, rsize=rsize)
if is_root_pe:
answers = []
for ipe in range(pyfms.mpp.npes()):
answers += buffer(ipe)
np.testing.assert_array_equal(receive, answers)
else:
assert receive is None
pyfms.fms.end()