-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathtst_var_iget_varn.py
206 lines (175 loc) · 7.91 KB
/
tst_var_iget_varn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
#
# Copyright (C) 2024, Northwestern University and Argonne National Laboratory
# See COPYRIGHT notice in top-level directory.
#
"""
This program tests iget_varn() method of `Variable` class, by making a
nonblocking request to read a list of subarrays of a netCDF variable of an
opened netCDF file. The library will internally invoke ncmpi_iget_varn() in C.
"""
import pnetcdf
from numpy.random import seed
from numpy.testing import assert_array_equal
import unittest, os, random, sys
import numpy as np
from mpi4py import MPI
from pnetcdf import strerror, strerrno
from utils import validate_nc_file
import io
seed(0)
file_formats = ['NC_64BIT_DATA', 'NC_64BIT_OFFSET', None]
file_name = "tst_var_iget_varn.nc"
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
xdim=4; ydim=10
# max number of subarrays requested among all put requests from all ranks
MAX_NUM_REQS = 6
NDIMS = 2
# data to store in the NetCDF variable
data = np.array([[3, 3, 3, 1, 1, 0, 0, 2, 1, 1],
[0, 2, 2, 2, 3, 1, 1, 2, 2, 2],
[1, 1, 2, 3, 3, 3, 0, 0, 1, 1],
[0, 0, 0, 2, 1, 1, 1, 3, 3, 3]], np.float32)
starts = np.zeros((MAX_NUM_REQS, NDIMS), dtype=np.int64)
counts = np.zeros((MAX_NUM_REQS, NDIMS), dtype=np.int64)
#initialize variable values
if rank == 0:
# number of subarrays requested for each iget_var
num_subarray_reqs = 4
starts[0][0] = 0; starts[0][1] = 5; counts[0][0] = 1; counts[0][1] = 2
starts[1][0] = 1; starts[1][1] = 0; counts[1][0] = 1; counts[1][1] = 1
starts[2][0] = 2; starts[2][1] = 6; counts[2][0] = 1; counts[2][1] = 2
starts[3][0] = 3; starts[3][1] = 0; counts[3][0] = 1; counts[3][1] = 3
# rank 0 is reading the following locations: ("-" means skip)
# - - - - - 0 0 - - -
# 0 - - - - - - - - -
# - - - - - - 0 0 - -
# 0 0 0 - - - - - - -
elif rank == 1:
num_subarray_reqs = 6
starts[0][0] = 0; starts[0][1] = 3; counts[0][0] = 1; counts[0][1] = 2
starts[1][0] = 0; starts[1][1] = 8; counts[1][0] = 1; counts[1][1] = 2
starts[2][0] = 1; starts[2][1] = 5; counts[2][0] = 1; counts[2][1] = 2
starts[3][0] = 2; starts[3][1] = 0; counts[3][0] = 1; counts[3][1] = 2
starts[4][0] = 2; starts[4][1] = 8; counts[4][0] = 1; counts[4][1] = 2
starts[5][0] = 3; starts[5][1] = 4; counts[5][0] = 1; counts[5][1] = 3
# rank 1 is reading the following locations: ("-" means skip)
# - - - 1 1 - - - 1 1
# - - - - - 1 1 - - -
# 1 1 - - - - - - 1 1
# - - - - 1 1 1 - - -
elif rank == 2:
num_subarray_reqs = 5
starts[0][0] = 0; starts[0][1] = 7; counts[0][0] = 1; counts[0][1] = 1
starts[1][0] = 1; starts[1][1] = 1; counts[1][0] = 1; counts[1][1] = 3
starts[2][0] = 1; starts[2][1] = 7; counts[2][0] = 1; counts[2][1] = 3
starts[3][0] = 2; starts[3][1] = 2; counts[3][0] = 1; counts[3][1] = 1
starts[4][0] = 3; starts[4][1] = 3; counts[4][0] = 1; counts[4][1] = 1
# rank 2 is reading the following locations: ("-" means skip)
# - - - - - - - 2 - -
# - 2 2 2 - - - 2 2 2
# - - 2 - - - - - - -
# - - - 2 - - - - - -
elif rank == 3:
num_subarray_reqs = 4
starts[0][0] = 0; starts[0][1] = 0; counts[0][0] = 1; counts[0][1] = 3
starts[1][0] = 1; starts[1][1] = 4; counts[1][0] = 1; counts[1][1] = 1
starts[2][0] = 2; starts[2][1] = 3; counts[2][0] = 1; counts[2][1] = 3
starts[3][0] = 3; starts[3][1] = 7; counts[3][0] = 1; counts[3][1] = 3
# rank 3 is reading the following locations: ("-" means skip)
# 3 3 3 - - - - - - -
# - - - - 3 - - - - -
# - - - 3 3 3 - - - -
# - - - - - - - 3 3 3
else:
num_subarray_reqs = 0
# obtain the buffer size of returned array
buf_len = 0
for i in range(num_subarray_reqs):
w_req_len = np.prod(counts[i,:])
buf_len += w_req_len
# generate reference array for comparing at the testing phase
dataref = np.full((buf_len,), rank, np.float32)
# total number of subarray requests for this test program
num_reqs = 10
# initialize a list to store references of variable values
v_datas = []
class VariablesTestCase(unittest.TestCase):
def setUp(self):
if (len(sys.argv) == 2) and os.path.isdir(sys.argv[1]):
self.file_path = os.path.join(sys.argv[1], file_name)
else:
self.file_path = file_name
self._file_format = file_formats.pop(0)
f = pnetcdf.File(filename=self.file_path, mode = 'w',
format=self._file_format, comm=comm, info=None)
dx = f.def_dim('x',xdim)
dy = f.def_dim('y',ydim)
# define 20 netCDF variables
for i in range(num_reqs * 2):
v = f.def_var(f'data{i}', pnetcdf.NC_FLOAT, (dx, dy))
f.enddef()
# initialize and write variable values
for i in range(num_reqs * 2):
v = f.variables[f'data{i}']
v[:] = data
f.close()
if os.environ.get('PNETCDF_DIR') is not None:
assert validate_nc_file(os.environ.get('PNETCDF_DIR'), self.file_path) == 0
f = pnetcdf.File(self.file_path, 'r')
# each process post 10 requests to read a list of subarrays from the variable
req_ids = []
v_datas.clear()
for i in range(num_reqs):
v = f.variables[f'data{i}']
buff = np.empty(shape = buf_len, dtype = v.datatype)
# post the request to read multiple slices (subarrays) of the variable
req_id = v.iget_varn(buff, num_subarray_reqs, starts, counts)
# track the reqeust ID for each read reqeust
req_ids.append(req_id)
# store the reference of variable values
v_datas.append(buff)
# commit those 10 requests to the file at once using wait_all (collective i/o)
req_errs = [None] * num_reqs
f.wait_all(num_reqs, req_ids, req_errs)
# check request error msg for each unsuccessful requests
for i in range(num_reqs):
if strerrno(req_errs[i]) != "NC_NOERR":
print(f"Error on request {i}:", strerror(req_errs[i]))
# post 10 requests to read an array of values for the last 10
# variables w/o tracking req ids
for i in range(num_reqs, num_reqs * 2):
v = f.variables[f'data{i}']
buff = np.empty(buf_len, dtype = v.datatype)
# post the request to read a list of subarrays from the variable
v.iget_varn(buff, num_subarray_reqs, starts, counts)
# store the reference of variable values
v_datas.append(buff)
# commit all pending get requests to the file at once using wait_all
# (collective i/o)
req_errs = f.wait_all(num = pnetcdf.NC_GET_REQ_ALL)
f.close()
if os.environ.get('PNETCDF_DIR') is not None:
assert validate_nc_file(os.environ.get('PNETCDF_DIR'), self.file_path) == 0
def tearDown(self):
# remove the temporary files if the test file directory is not specified
comm.Barrier()
if (rank == 0) and not((len(sys.argv) == 2) and os.path.isdir(sys.argv[1])):
os.remove(self.file_path)
def runTest(self):
"""testing variable iget varn method for CDF-5/CDF-2/CDF-1 file format"""
# test iget_varn and collective i/o wait_all
for i in range(num_reqs * 2):
assert_array_equal(v_datas[i], dataref)
if __name__ == '__main__':
suite = unittest.TestSuite()
for i in range(len(file_formats)):
suite.addTest(VariablesTestCase())
runner = unittest.TextTestRunner()
output = io.StringIO()
runner = unittest.TextTestRunner(stream=output)
result = runner.run(suite)
if not result.wasSuccessful():
print(output.getvalue())
sys.exit(1)