-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathtst_var_bput_vars.py
131 lines (115 loc) · 5.23 KB
/
tst_var_bput_vars.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#
# Copyright (C) 2024, Northwestern University and Argonne National Laboratory
# See COPYRIGHT notice in top-level directory.
#
"""
This example program is intended to illustrate the use of the pnetCDF python API.
The program runs in non-blocking mode and makes requests to write a subsampled (strided) array of
values to netCDF variables of an opened netCDF file using the bput_var method of the `Variable`
class. This method is the buffered version of iput_var and requires the user to first attach an
internal buffer, large enough to hold all pending requests, using the attach_buff method of the
`File` class. The library will internally invoke ncmpi_bput_vars and ncmpi_buffer_attach in C.
"""
import pnetcdf
from numpy.random import seed, randint
from numpy.testing import assert_array_equal, assert_equal, assert_array_almost_equal
import tempfile, unittest, os, random, sys
import numpy as np
from mpi4py import MPI
from pnetcdf import strerror, strerrno
from utils import validate_nc_file
import io
# Fixed RNG seed: every process draws the same pseudo-random values below.
seed(0)

# File formats handed to pnetcdf.File, one per test case.
file_formats = ['NC_64BIT_DATA', 'NC_64BIT_OFFSET', None]
file_name = "tst_var_bput_vars.nc"

# MPI context shared by the whole test module.
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Variable extents; the z extent scales with the process count (10 per rank).
xdim = 9
ydim = 10
zdim = size * 10

# Initial (all-zero) contents of every netCDF variable.
data = np.zeros((xdim, ydim, zdim), dtype='i4')

# Values written through bput_var, overwriting part of the initial contents.
datam = randint(0, 10, size=(1, 3, 5)).astype('i4')

# Expected file contents used in the verification phase.
datares1 = data.copy()
datares2 = data.copy()
for i in range(size):
    # Mirror the strided region that process i writes: row 3, every other
    # y in [0, 6), every other z within that process's 10-wide slab.
    datares1[3:4, 0:6:2, 10 * i:10 * (i + 1):2] = datam

# Number of put requests posted per batch.
num_reqs = 10
class VariablesTestCase(unittest.TestCase):
    """Round-trip test for buffered, non-blocking strided writes (``bput_var``).

    ``setUp`` creates the file and writes all variables, ``runTest`` reads
    them back and compares against the expected array, and ``tearDown``
    removes the file unless an output directory was given on the command line.
    """

    def setUp(self):
        """Create the test file and populate it through ``bput_var`` requests."""
        # Write into the directory given as the sole CLI argument, if any.
        if (len(sys.argv) == 2) and os.path.isdir(sys.argv[1]):
            self.file_path = os.path.join(sys.argv[1], file_name)
        else:
            self.file_path = file_name
        # Each test case consumes the next entry of the module-level format list.
        self._file_format = file_formats.pop(0)
        f = pnetcdf.File(filename=self.file_path, mode='w', format=self._file_format, comm=comm, info=None)
        f.def_dim('x', xdim)
        # NOTE(review): size -1 presumably marks 'xu' as the unlimited dimension - confirm.
        f.def_dim('xu', -1)
        f.def_dim('y', ydim)
        f.def_dim('z', zdim)

        # estimate the memory buffer size of all requests and attach buffer for buffered put requests
        buffsize = num_reqs * datam.nbytes
        f.attach_buff(buffsize)
        self.assertEqual(f.inq_buff_size(), buffsize)

        # define 20 netCDF variables
        for i in range(num_reqs * 2):
            f.def_var(f'data{i}', pnetcdf.NC_INT, ('xu', 'y', 'z'))

        # initialize variable values
        f.enddef()
        for i in range(num_reqs * 2):
            v = f.variables[f'data{i}']
            v[:] = data

        # each process posts 10 requests to write a subsampled array of values
        req_ids = []
        starts = np.array([3, 0, 10 * rank])
        counts = np.array([1, 3, 5])
        strides = np.array([1, 2, 2])
        for i in range(num_reqs):
            v = f.variables[f'data{i}']
            # post the request to write a subsampled array of values
            req_id = v.bput_var(datam, start=starts, count=counts, stride=strides)
            # track the request ID for each write request
            req_ids.append(req_id)

        # NOTE(review): presumably leaves independent data mode ahead of the
        # collective wait_all below - confirm against the pnetcdf API.
        f.end_indep()

        # all processes commit those 10 requests to the file at once using wait_all (collective i/o)
        req_errs = [None] * num_reqs
        f.wait_all(num_reqs, req_ids, req_errs)

        # report the error message of each unsuccessful request
        for i in range(num_reqs):
            if strerrno(req_errs[i]) != "NC_NOERR":
                print(f"Error on request {i}:", strerror(req_errs[i]))

        # post 10 requests to write subsampled arrays of values for the last 10 variables w/o tracking req ids
        for i in range(num_reqs, num_reqs * 2):
            v = f.variables[f'data{i}']
            # post the request to write a subsampled array of values
            v.bput_var(datam, start=starts, count=counts, stride=strides)

        # all processes commit all pending requests to the file at once using wait_all (collective i/o)
        f.wait_all(num=pnetcdf.NC_PUT_REQ_ALL)

        # release the internal buffer
        f.detach_buff()
        f.close()

        # Validate the file only when PNETCDF_DIR is set in the environment.
        pnetcdf_dir = os.environ.get('PNETCDF_DIR')
        if pnetcdf_dir is not None:
            self.assertEqual(validate_nc_file(pnetcdf_dir, self.file_path), 0)

    def runTest(self):
        """testing variable bput vars for CDF-5/CDF-2/CDF-1 file format"""
        f = pnetcdf.File(self.file_path, 'r')
        try:
            # test bput vars and collective i/o wait_all
            for i in range(num_reqs * 2):
                v = f.variables[f'data{i}']
                assert_array_equal(v[:], datares1)
        finally:
            # Close explicitly so the handle is released before tearDown
            # removes the file, even when a comparison above fails.
            f.close()

    def tearDown(self):
        """Remove the test file unless an output directory was specified."""
        # wait for every process to finish with the file before deleting it
        comm.Barrier()
        if (rank == 0) and not ((len(sys.argv) == 2) and os.path.isdir(sys.argv[1])):
            os.remove(self.file_path)
if __name__ == '__main__':
    # Queue one test case per file format; each case's setUp takes the next
    # format from the list when the suite actually runs.
    suite = unittest.TestSuite()
    for _ in file_formats:
        suite.addTest(VariablesTestCase())

    # Capture the runner's report so a passing run stays silent; the report
    # is printed only when something failed.
    output = io.StringIO()
    runner = unittest.TextTestRunner(stream=output)
    result = runner.run(suite)
    if not result.wasSuccessful():
        print(output.getvalue())
        sys.exit(1)