-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathtst_var_iget_vars.py
130 lines (115 loc) · 5.07 KB
/
tst_var_iget_vars.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#
# Copyright (C) 2024, Northwestern University and Argonne National Laboratory
# See COPYRIGHT notice in top-level directory.
#
"""
This example program is intended to illustrate the use of the pnetCDF python API.The
program runs in non-blocking mode and makes a request to read an subsampled array of values
from a netCDF variable of an opened netCDF file using iget_var method of `Variable` class. The
library will internally invoke ncmpi_iget_vars in C.
"""
import pnetcdf
from numpy.random import seed, randint
from numpy.testing import assert_array_equal, assert_equal, assert_array_almost_equal
import tempfile, unittest, os, random, sys
import numpy as np
from mpi4py import MPI
from pnetcdf import strerror, strerrno
from utils import validate_nc_file
import io
seed(0)
file_formats = ['NC_64BIT_DATA', 'NC_64BIT_OFFSET', None]
file_name = "tst_var_iget_vars.nc"
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
xdim=9; ydim=10; zdim=size*10
# initial values for netCDF variable
data = randint(0,10, size=(xdim,ydim,zdim)).astype('i4')
# generate reference arrayes for testing
dataref = []
for i in range(size):
dataref.append(data[3:4:1,0:6:2,i*10:(i+1)*10:2])
num_reqs = 10
# initialize a list to store references of variable values
v_datas = []
class VariablesTestCase(unittest.TestCase):
def setUp(self):
if (len(sys.argv) == 2) and os.path.isdir(sys.argv[1]):
self.file_path = os.path.join(sys.argv[1], file_name)
else:
self.file_path = file_name
self._file_format = file_formats.pop(0)
f = pnetcdf.File(filename=self.file_path, mode = 'w', format=self._file_format, comm=comm, info=None)
f.def_dim('xu',-1)
f.def_dim('y',ydim)
f.def_dim('z',zdim)
# define 20 netCDF variables
for i in range(num_reqs * 2):
v = f.def_var(f'data{i}', pnetcdf.NC_INT, ('xu','y','z'))
# initialize variable values
f.enddef()
for i in range(num_reqs * 2):
v = f.variables[f'data{i}']
v[:] = data
f.close()
comm.Barrier()
assert validate_nc_file(os.environ.get('PNETCDF_DIR'), self.file_path) == 0 if os.environ.get('PNETCDF_DIR') is not None else True
f = pnetcdf.File(self.file_path, 'r')
# each process post 10 requests to read an subsampled array of values
req_ids = []
# reinialize the list of returned data references
v_datas.clear()
starts = np.array([3,0,10*rank])
counts = np.array([1,3,5])
strides = np.array([1,2,2])
for i in range(num_reqs):
v = f.variables[f'data{i}']
buff = np.empty(shape = counts, dtype = v.datatype)
# post the request to read one part of the variable
req_id = v.iget_var(buff, start = starts, count = counts, stride = strides)
# track the reqeust ID for each read reqeust
req_ids.append(req_id)
# store the reference of variable values
v_datas.append(buff)
f.end_indep()
# commit those 10 requests to the file at once using wait_all (collective i/o)
req_errs = [None] * num_reqs
f.wait_all(num_reqs, req_ids, req_errs)
# check request error msg for each unsuccessful requests
for i in range(num_reqs):
if strerrno(req_errs[i]) != "NC_NOERR":
print(f"Error on request {i}:", strerror(req_errs[i]))
# post 10 requests to read a subsampled array of values for the last 10 variables w/o tracking req ids
for i in range(num_reqs, num_reqs * 2):
v = f.variables[f'data{i}']
buff = np.empty(shape = counts, dtype = v.datatype)
# post the request to read an subsampled array of values
v.iget_var(buff, start = starts, count = counts, stride = strides)
# store the reference of variable values
v_datas.append(buff)
# commit all pending get requests to the file at once using wait_all (collective i/o)
req_errs = f.wait_all(num = pnetcdf.NC_GET_REQ_ALL)
f.close()
assert validate_nc_file(os.environ.get('PNETCDF_DIR'), self.file_path) == 0 if os.environ.get('PNETCDF_DIR') is not None else True
def tearDown(self):
# remove the temporary files if test file directory not specified
comm.Barrier()
if (rank == 0) and not((len(sys.argv) == 2) and os.path.isdir(sys.argv[1])):
os.remove(self.file_path)
def runTest(self):
"""testing variable iget_vars method for CDF-5/CDF-2/CDF-1 file format"""
# test iget_vars and collective i/o wait_all
for i in range(num_reqs * 2):
assert_array_equal(v_datas[i], dataref[rank])
if __name__ == '__main__':
suite = unittest.TestSuite()
for i in range(len(file_formats)):
suite.addTest(VariablesTestCase())
runner = unittest.TextTestRunner()
output = io.StringIO()
runner = unittest.TextTestRunner(stream=output)
result = runner.run(suite)
if not result.wasSuccessful():
print(output.getvalue())
sys.exit(1)