Skip to content

Commit 14e10c0

Browse files
committed
Add MPI_Barrier to ensure all writes finish before any process starts to read
1 parent 3525303 commit 14e10c0

File tree

1 file changed

+87
-68
lines changed

1 file changed

+87
-68
lines changed

src/drivers/ncmpio/ncmpio_enddef.c

Lines changed: 87 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
* Copyright (C) 2003, Northwestern University and Argonne National Laboratory
33
* See COPYRIGHT notice in top-level directory.
44
*/
5-
/* $Id$ */
65

76
/*
87
* This file implements the corresponding APIs defined in src/dispatchers/file.c
@@ -31,15 +30,21 @@
3130
#include "ncmpio_subfile.h"
3231
#endif
3332

33+
/* Divide the amount of data to be moved into chunks of size MOVE_UNIT each,
34+
* and assign chunks to all processes. If the number of chunks is larger than
35+
* the number of processes, carry out the data movement in multiple rounds.
36+
*/
37+
#define MOVE_UNIT 16777216
3438

35-
/*----< move_file_block() >--------------------------------------------------*/
39+
/*----< move_file_block() >-------------------------------------------------*/
40+
/* Call MPI independent I/O subroutines to move data */
3641
static int
3742
move_file_block(NC *ncp,
38-
MPI_Offset to, /* destination file starting offset */
39-
MPI_Offset from, /* source file starting offset */
43+
MPI_Offset to, /* destination starting file offset */
44+
MPI_Offset from, /* source starting file offset */
4045
MPI_Offset nbytes) /* amount to be moved */
4146
{
42-
int rank, nprocs, mpireturn, err, status=NC_NOERR, min_st;
47+
int rank, nprocs, mpireturn, err, status=NC_NOERR;
4348
void *buf;
4449
size_t num_moves, mv_amnt, p_units;
4550
MPI_Offset off_last, off_from, off_to;
@@ -49,18 +54,18 @@ move_file_block(NC *ncp,
4954
rank = ncp->rank;
5055
nprocs = ncp->nprocs;
5156

57+
/* To make sure all processes finish their I/O before any process starts to
58+
* read, it is necessary to call MPI_Barrier.
59+
*/
60+
if (ncp->nprocs > 1) MPI_Barrier(ncp->comm);
61+
5262
/* moving file blocks must be done in collective mode, ignoring NC_HCOLL */
5363
fh = ncp->collective_fh;
5464

5565
/* make fileview entire file visible */
5666
TRACE_IO(MPI_File_set_view)(fh, 0, MPI_BYTE, MPI_BYTE, "native",
5767
MPI_INFO_NULL);
5868

59-
/* Divide amount nbytes into chunks of size MOVE_UNIT each and assign
60-
* chunks among all processes. If the number of chunks is larger then
61-
* the number of processes, carry out the data move in multiple rounds.
62-
*/
63-
#define MOVE_UNIT 16777216
6469
/* buf will be used as a temporal buffer to move data in chunks, i.e.
6570
* read a chunk and later write to the new location
6671
*/
@@ -116,7 +121,8 @@ move_file_block(NC *ncp,
116121
MPI_BYTE, &mpistatus);
117122
if (mpireturn != MPI_SUCCESS) {
118123
err = ncmpii_error_mpi2nc(mpireturn, "MPI_File_read_at_all");
119-
if (err == NC_EFILE) DEBUG_ASSIGN_ERROR(status, NC_EREAD)
124+
if (status == NC_NOERR && err == NC_EFILE)
125+
DEBUG_ASSIGN_ERROR(status, NC_EREAD)
120126
get_size = chunk_size;
121127
}
122128
else {
@@ -140,14 +146,8 @@ move_file_block(NC *ncp,
140146
ncp->get_size += get_size;
141147
}
142148

143-
if (ncp->nprocs > 1) {
144-
/* MPI_Barrier(ncp->comm); */
145-
/* important, in case new region overlaps old region */
146-
TRACE_COMM(MPI_Allreduce)(&status, &min_st, 1, MPI_INT, MPI_MIN,
147-
ncp->comm);
148-
status = min_st;
149-
}
150-
if (status != NC_NOERR) break;
149+
/* to prevent from one rank's write run faster than other's read */
150+
if (ncp->nprocs > 1) MPI_Barrier(ncp->comm);
151151

152152
/* write to new location at off_to for amount of chunk_size
153153
*
@@ -179,11 +179,12 @@ move_file_block(NC *ncp,
179179
MPI_BYTE, &mpistatus);
180180
else
181181
TRACE_IO(MPI_File_write_at)(fh, off_to, buf,
182-
get_size /* NOT chunk_size */,
183-
MPI_BYTE, &mpistatus);
182+
get_size /* NOT chunk_size */,
183+
MPI_BYTE, &mpistatus);
184184
if (mpireturn != MPI_SUCCESS) {
185185
err = ncmpii_error_mpi2nc(mpireturn, "MPI_File_write_at_all");
186-
if (err == NC_EFILE) DEBUG_ASSIGN_ERROR(status, NC_EWRITE)
186+
if (status == NC_NOERR && err == NC_EFILE)
187+
DEBUG_ASSIGN_ERROR(status, NC_EWRITE)
187188
}
188189
else {
189190
/* update the number of bytes written since file open.
@@ -198,41 +199,15 @@ move_file_block(NC *ncp,
198199
else
199200
ncp->put_size += put_size;
200201
}
201-
if (ncp->nprocs > 1) {
202-
TRACE_COMM(MPI_Allreduce)(&status, &min_st, 1, MPI_INT, MPI_MIN,
203-
ncp->comm);
204-
status = min_st;
205-
}
206-
if (status != NC_NOERR) break;
207202

208203
/* move on to the next round */
209204
mv_amnt = p_units;
210205
off_from -= mv_amnt;
211206
off_to -= mv_amnt;
212207
nbytes -= mv_amnt;
213208
}
214-
NCI_Free(buf);
215-
return status;
216-
}
217209

218-
/*----< move_fixed_vars() >--------------------------------------------------*/
219-
/* move one fixed variable at a time, only when the new begin > old begin */
220-
static int
221-
move_fixed_vars(NC *ncp, NC *old)
222-
{
223-
int i, err, status=NC_NOERR;
224-
225-
/* move starting from the last fixed variable */
226-
for (i=old->vars.ndefined-1; i>=0; i--) {
227-
if (IS_RECVAR(old->vars.value[i])) continue;
228-
229-
MPI_Offset from = old->vars.value[i]->begin;
230-
MPI_Offset to = ncp->vars.value[i]->begin;
231-
if (to > from) {
232-
err = move_file_block(ncp, to, from, ncp->vars.value[i]->len);
233-
if (status == NC_NOERR) status = err;
234-
}
235-
}
210+
NCI_Free(buf);
236211
return status;
237212
}
238213

@@ -1278,40 +1253,84 @@ ncmpio__enddef(void *ncdp,
12781253
}
12791254
#endif
12801255

1281-
if (ncp->old != NULL) {
1256+
if (ncp->old != NULL && ncp->vars.ndefined > 0) {
12821257
/* The current define mode was entered from ncmpi_redef, not from
1283-
* ncmpi_create. We must check if header has been expanded.
1258+
* ncmpi_create. We must check if header extent has grown.
1259+
* This only needs to be done when there are variables defined.
12841260
*/
1261+
int mov_done=0;
1262+
MPI_Offset nbytes;
12851263

12861264
assert(!NC_IsNew(ncp));
12871265
assert(fIsSet(ncp->flags, NC_MODE_DEF));
12881266
assert(ncp->begin_rec >= ncp->old->begin_rec);
12891267
assert(ncp->begin_var >= ncp->old->begin_var);
12901268
assert(ncp->vars.ndefined >= ncp->old->vars.ndefined);
1269+
12911270
/* ncp->numrecs has already sync-ed in ncmpi_redef */
12921271

1293-
if (ncp->vars.ndefined > 0) { /* no. record and non-record variables */
1272+
if (ncp->begin_var > ncp->old->begin_var &&
1273+
ncp->begin_rec > ncp->old->begin_rec &&
1274+
ncp->vars.ndefined == ncp->old->vars.ndefined) {
1275+
/* Data section grows and no new variable has been added. The
1276+
* entire data section must be moved to a higher file offset.
1277+
*/
1278+
/* amount of data section to be moved */
1279+
nbytes = ncp->old->begin_rec - ncp->old->begin_var
1280+
+ ncp->old->recsize * ncp->old->numrecs;
1281+
1282+
err = move_file_block(ncp, ncp->begin_var, ncp->old->begin_var,
1283+
nbytes);
1284+
if (status == NC_NOERR) status = err;
1285+
mov_done = 1;
1286+
}
1287+
else {
1288+
if (ncp->begin_rec > ncp->old->begin_rec) {
1289+
/* beginning of record variable section grows. The entire
1290+
* record variable section must be moved to a higher file
1291+
* offset.
1292+
*/
1293+
if (ncp->recsize == ncp->old->recsize) {
1294+
/* no new record variable has been added */
1295+
1296+
/* amount of data to be moved */
1297+
nbytes = ncp->old->recsize * ncp->old->numrecs;
1298+
1299+
err = move_file_block(ncp, ncp->begin_rec,
1300+
ncp->old->begin_rec, nbytes);
1301+
if (status == NC_NOERR) status = err;
1302+
}
1303+
else {
1304+
/* new record variables have been added. Must move one
1305+
* record at a time
1306+
*/
1307+
err = move_record_vars(ncp, ncp->old);
1308+
if (status == NC_NOERR) status = err;
1309+
}
1310+
mov_done = 1;
1311+
}
1312+
12941313
if (ncp->begin_var > ncp->old->begin_var) {
1295-
/* header extent has increased, shift the entire data section
1296-
* to a higher offset, by moving record variables first
1314+
/* beginning of fix-sized variable section grows. The
1315+
* fix-sized variable section must be moved to a higher
1316+
* file offset.
12971317
*/
1298-
err = move_record_vars(ncp, ncp->old);
1299-
CHECK_ERROR(err)
1318+
/* amount of data to be moved. Note there may be some free
1319+
* space at the end of fix-sized variable section that need not
1320+
* be moved.
1321+
*/
1322+
nbytes = ncp->old->begin_rec - ncp->old->begin_var;
13001323

1301-
/* shift non-record variables */
1302-
/* err = move_vars_r(ncp, ncp->old); */
1303-
err = move_fixed_vars(ncp, ncp->old);
1304-
CHECK_ERROR(err)
1305-
}
1306-
else if (ncp->begin_rec > ncp->old->begin_rec ||
1307-
ncp->recsize > ncp->old->recsize) {
1308-
/* number of non-record variables increases, or
1309-
number of records of record variables increases,
1310-
shift and move all record variables down */
1311-
err = move_record_vars(ncp, ncp->old);
1312-
CHECK_ERROR(err)
1324+
err = move_file_block(ncp, ncp->begin_var, ncp->old->begin_var,
1325+
nbytes);
1326+
if (status == NC_NOERR) status = err;
1327+
mov_done = 1;
13131328
}
13141329
}
1330+
1331+
/* to prevent some ranks run faster than others */
1332+
if (mov_done && ncp->nprocs > 1) MPI_Barrier(ncp->comm);
1333+
13151334
} /* ... ncp->old != NULL */
13161335

13171336
/* first sync header objects in memory across all processes, and then root

0 commit comments

Comments
 (0)