Skip to content

Commit 35bf19d

Browse files
committed
Move data section in chunks of 16 MB per process
1 parent be79ba8 commit 35bf19d

File tree

2 files changed

+72
-52
lines changed

2 files changed

+72
-52
lines changed

sneak_peek.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ This is essentially a placeholder for the next release note ...
66
+ none
77

88
* New optimization
9-
+ none
9+
+ When file header extent size grows, moving the data section to a higher
10+
file offset has changed to be done in chunks of 16 MB per process.
11+
See [PR #174](https://github.com/Parallel-NetCDF/PnetCDF/pull/174),
1012

1113
* New Limitations
1214
+ none

src/drivers/ncmpio/ncmpio_enddef.c

Lines changed: 69 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -39,75 +39,85 @@ move_file_block(NC *ncp,
3939
MPI_Offset from, /* source file starting offset */
4040
MPI_Offset nbytes) /* amount to be moved */
4141
{
42-
int rank, bufcount, mpireturn, err, status=NC_NOERR, min_st;
42+
int rank, nprocs, mpireturn, err, status=NC_NOERR, min_st;
4343
void *buf;
44-
size_t chunk_size;
44+
size_t num_moves, mv_amnt, p_units;
45+
MPI_Offset off_last, off_from, off_to;
4546
MPI_Status mpistatus;
4647
MPI_File fh;
4748

4849
rank = ncp->rank;
50+
nprocs = ncp->nprocs;
4951

5052
/* moving file blocks must be done in collective mode, ignoring NC_HCOLL */
5153
fh = ncp->collective_fh;
5254

53-
/* Divide amount nbytes among all processes. If the divided amount,
54-
* chunk_size, is larger then MOVE_UNIT, set chunk_size to be the move unit
55-
* size per process (make sure it is <= NC_MAX_INT, as MPI read/write APIs
56-
* use 4-byte int in their count argument.)
57-
*/
58-
#define MOVE_UNIT 67108864
59-
chunk_size = nbytes / ncp->nprocs;
60-
if (nbytes % ncp->nprocs) chunk_size++;
61-
if (chunk_size > MOVE_UNIT) {
62-
/* move data in multiple rounds, MOVE_UNIT per process at a time */
63-
chunk_size = MOVE_UNIT;
64-
}
55+
/* make fileview entire file visible */
56+
TRACE_IO(MPI_File_set_view)(fh, 0, MPI_BYTE, MPI_BYTE, "native",
57+
MPI_INFO_NULL);
6558

59+
/* Divide amount nbytes into chunks of size MOVE_UNIT each and assign
60+
* chunks among all processes. If the number of chunks is larger then
61+
* the number of processes, carry out the data move in multiple rounds.
62+
*/
63+
#define MOVE_UNIT 16777216
6664
/* buf will be used as a temporal buffer to move data in chunks, i.e.
67-
* read a chunk and later write to the new location */
68-
buf = NCI_Malloc(chunk_size);
65+
* read a chunk and later write to the new location
66+
*/
67+
buf = NCI_Malloc(MOVE_UNIT);
6968
if (buf == NULL) DEBUG_RETURN_ERROR(NC_ENOMEM)
7069

71-
/* make fileview entire file visible */
72-
TRACE_IO(MPI_File_set_view)(fh, 0, MPI_BYTE, MPI_BYTE, "native",
73-
MPI_INFO_NULL);
70+
p_units = MOVE_UNIT * nprocs;
71+
num_moves = nbytes / p_units;
72+
if (nbytes % p_units) num_moves++;
73+
off_last = (num_moves - 1) * p_units + rank * MOVE_UNIT;
74+
off_from = from + off_last;
75+
off_to = to + off_last;
76+
mv_amnt = nbytes % p_units;
77+
if (mv_amnt == 0 && nbytes > 0) mv_amnt = p_units;
7478

75-
/* move the variable starting from its tail toward its beginning */
79+
/* move the data section starting from its tail toward its beginning */
7680
while (nbytes > 0) {
77-
int get_size=0;
81+
int chunk_size, get_size=0;
7882

79-
/* calculate how much to move at each time. chunk_size has been
80-
* checked, must be < NC_MAX_INT
81-
*/
82-
bufcount = (int)chunk_size;
83-
if (nbytes < (MPI_Offset)ncp->nprocs * chunk_size) {
84-
/* handle the last group of chunks */
85-
MPI_Offset rem_chunks = nbytes / chunk_size;
86-
if (rank > rem_chunks) /* these processes do not read/write */
87-
bufcount = 0;
88-
else if (rank == rem_chunks) /* this process reads/writes less */
89-
/* make bufcount < chunk_size */
90-
bufcount = (int)(nbytes % chunk_size);
91-
nbytes = 0;
83+
if (mv_amnt == p_units) {
84+
/* each rank moves amount of chunk_size */
85+
chunk_size = MOVE_UNIT;
9286
}
9387
else {
94-
nbytes -= chunk_size*ncp->nprocs;
88+
/* when total move amount is less than p_units */
89+
size_t num_chunks = mv_amnt / MOVE_UNIT;
90+
if (mv_amnt % MOVE_UNIT) num_chunks++;
91+
if (rank < num_chunks) {
92+
chunk_size = MOVE_UNIT;
93+
if (rank == num_chunks - 1 && mv_amnt % MOVE_UNIT > 0)
94+
chunk_size = mv_amnt % MOVE_UNIT;
95+
assert(chunk_size > 0);
96+
}
97+
else
98+
chunk_size = 0;
9599
}
96100

101+
/* each rank moves data of size chunk_size from off_from to off_to */
102+
97103
/* explicitly initialize mpistatus object to 0. For zero-length read,
98104
* MPI_Get_count may report incorrect result for some MPICH version,
99105
* due to the uninitialized MPI_Status object passed to MPI-IO calls.
100106
* Thus we initialize it above to work around.
101107
*/
102108
memset(&mpistatus, 0, sizeof(MPI_Status));
103109

104-
/* read the original data @ from+nbytes+rank*chunk_size */
105-
TRACE_IO(MPI_File_read_at_all)(fh, from+nbytes+rank*chunk_size,
106-
buf, bufcount, MPI_BYTE, &mpistatus);
110+
/* read the original data at off_from for amount of chunk_size */
111+
if (ncp->nprocs > 1)
112+
TRACE_IO(MPI_File_read_at_all)(fh, off_from, buf, chunk_size,
113+
MPI_BYTE, &mpistatus);
114+
else
115+
TRACE_IO(MPI_File_read_at)(fh, off_from, buf, chunk_size,
116+
MPI_BYTE, &mpistatus);
107117
if (mpireturn != MPI_SUCCESS) {
108118
err = ncmpii_error_mpi2nc(mpireturn, "MPI_File_read_at_all");
109119
if (err == NC_EFILE) DEBUG_ASSIGN_ERROR(status, NC_EREAD)
110-
get_size = bufcount;
120+
get_size = chunk_size;
111121
}
112122
else {
113123
/* for zero-length read, MPI_Get_count may report incorrect result
@@ -116,8 +126,8 @@ move_file_block(NC *ncp,
116126
* work around. See MPICH ticket:
117127
* https://trac.mpich.org/projects/mpich/ticket/2332
118128
*
119-
* Note we cannot set bufcount to get_size, as the actual size
120-
* read from a file may be less than bufcount. Because we are
129+
* Note we cannot set chunk_size to get_size, as the actual size
130+
* read from a file may be less than chunk_size. Because we are
121131
* moving whatever read to a new file offset, we must use the
122132
* amount actually read to call MPI_File_write_at_all below.
123133
*
@@ -139,7 +149,7 @@ move_file_block(NC *ncp,
139149
}
140150
if (status != NC_NOERR) break;
141151

142-
/* write to new location @ to+nbytes+rank*chunk_size
152+
/* write to new location at off_to for amount of chunk_size
143153
*
144154
* Ideally, we should write the amount of get_size returned from a call
145155
* to MPI_Get_count in the below MPI write. This is in case some
@@ -152,7 +162,7 @@ move_file_block(NC *ncp,
152162
* the correct value due to an internal error that fails to initialize
153163
* the MPI_Status object. Therefore, the solution can be either to
154164
* explicitly initialize the status object to zeros, or to just use
155-
* bufcount for write. Note that the latter will write the variables
165+
* chunk_size for write. Note that the latter will write the variables
156166
* that have not been written before. Below uses the former option.
157167
*/
158168

@@ -164,12 +174,12 @@ move_file_block(NC *ncp,
164174
memset(&mpistatus, 0, sizeof(MPI_Status));
165175

166176
if (ncp->nprocs > 1)
167-
TRACE_IO(MPI_File_write_at_all)(fh, to+nbytes+rank*chunk_size,
168-
buf, get_size /* NOT bufcount */,
177+
TRACE_IO(MPI_File_write_at_all)(fh, off_to, buf,
178+
get_size /* NOT chunk_size */,
169179
MPI_BYTE, &mpistatus);
170180
else
171-
TRACE_IO(MPI_File_write_at)(fh, to+nbytes+rank*chunk_size,
172-
buf, get_size /* NOT bufcount */,
181+
TRACE_IO(MPI_File_write_at)(fh, off_to, buf,
182+
get_size /* NOT chunk_size */,
173183
MPI_BYTE, &mpistatus);
174184
if (mpireturn != MPI_SUCCESS) {
175185
err = ncmpii_error_mpi2nc(mpireturn, "MPI_File_write_at_all");
@@ -184,15 +194,22 @@ move_file_block(NC *ncp,
184194
int put_size;
185195
mpireturn = MPI_Get_count(&mpistatus, MPI_BYTE, &put_size);
186196
if (mpireturn != MPI_SUCCESS || put_size == MPI_UNDEFINED)
187-
ncp->put_size += get_size; /* or bufcount */
197+
ncp->put_size += get_size; /* or chunk_size */
188198
else
189199
ncp->put_size += put_size;
190200
}
191201
if (ncp->nprocs > 1) {
192-
TRACE_COMM(MPI_Allreduce)(&status, &min_st, 1, MPI_INT, MPI_MIN, ncp->comm);
202+
TRACE_COMM(MPI_Allreduce)(&status, &min_st, 1, MPI_INT, MPI_MIN,
203+
ncp->comm);
193204
status = min_st;
194205
}
195206
if (status != NC_NOERR) break;
207+
208+
/* move on to the next round */
209+
mv_amnt = p_units;
210+
off_from -= mv_amnt;
211+
off_to -= mv_amnt;
212+
nbytes -= mv_amnt;
196213
}
197214
NCI_Free(buf);
198215
return status;
@@ -1174,8 +1191,9 @@ ncmpio__enddef(void *ncdp,
11741191

11751192
if (ncp->vars.ndefined > 0) { /* no. record and non-record variables */
11761193
if (ncp->begin_var > ncp->old->begin_var) {
1177-
/* header size increases, shift the entire data part down */
1178-
/* shift record variables first */
1194+
/* header extent has increased, shift the entire data section
1195+
* to a higher offset, by moving record variables first
1196+
*/
11791197
err = move_record_vars(ncp, ncp->old);
11801198
CHECK_ERROR(err)
11811199

0 commit comments

Comments
 (0)