Skip to content

Commit ce9132b

Browse files
committed
Add MPI_Barrier to ensure all writes finish before any process starts to read
1 parent 35bf19d commit ce9132b

File tree

1 file changed

+87
-68
lines changed

1 file changed

+87
-68
lines changed

src/drivers/ncmpio/ncmpio_enddef.c

Lines changed: 87 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
* Copyright (C) 2003, Northwestern University and Argonne National Laboratory
33
* See COPYRIGHT notice in top-level directory.
44
*/
5-
/* $Id$ */
65

76
/*
87
* This file implements the corresponding APIs defined in src/dispatchers/file.c
@@ -31,15 +30,21 @@
3130
#include "ncmpio_subfile.h"
3231
#endif
3332

33+
/* Divide the amount of data to be moved into chunks of size MOVE_UNIT each,
34+
* and assign chunks to all processes. If the number of chunks is larger than
35+
* the number of processes, carry out the data movement in multiple rounds.
36+
*/
37+
#define MOVE_UNIT 16777216
3438

35-
/*----< move_file_block() >--------------------------------------------------*/
39+
/*----< move_file_block() >-------------------------------------------------*/
40+
/* Call MPI independent I/O subroutines to move data */
3641
static int
3742
move_file_block(NC *ncp,
38-
MPI_Offset to, /* destination file starting offset */
39-
MPI_Offset from, /* source file starting offset */
43+
MPI_Offset to, /* destination starting file offset */
44+
MPI_Offset from, /* source starting file offset */
4045
MPI_Offset nbytes) /* amount to be moved */
4146
{
42-
int rank, nprocs, mpireturn, err, status=NC_NOERR, min_st;
47+
int rank, nprocs, mpireturn, err, status=NC_NOERR;
4348
void *buf;
4449
size_t num_moves, mv_amnt, p_units;
4550
MPI_Offset off_last, off_from, off_to;
@@ -49,18 +54,18 @@ move_file_block(NC *ncp,
4954
rank = ncp->rank;
5055
nprocs = ncp->nprocs;
5156

57+
/* To make sure all processes finish their I/O before any process starts to
58+
* read, it is necessary to call MPI_Barrier.
59+
*/
60+
if (ncp->nprocs > 1) MPI_Barrier(ncp->comm);
61+
5262
/* moving file blocks must be done in collective mode, ignoring NC_HCOLL */
5363
fh = ncp->collective_fh;
5464

5565
/* make fileview entire file visible */
5666
TRACE_IO(MPI_File_set_view)(fh, 0, MPI_BYTE, MPI_BYTE, "native",
5767
MPI_INFO_NULL);
5868

59-
/* Divide amount nbytes into chunks of size MOVE_UNIT each and assign
60-
* chunks among all processes. If the number of chunks is larger then
61-
* the number of processes, carry out the data move in multiple rounds.
62-
*/
63-
#define MOVE_UNIT 16777216
6469
/* buf will be used as a temporal buffer to move data in chunks, i.e.
6570
* read a chunk and later write to the new location
6671
*/
@@ -116,7 +121,8 @@ move_file_block(NC *ncp,
116121
MPI_BYTE, &mpistatus);
117122
if (mpireturn != MPI_SUCCESS) {
118123
err = ncmpii_error_mpi2nc(mpireturn, "MPI_File_read_at_all");
119-
if (err == NC_EFILE) DEBUG_ASSIGN_ERROR(status, NC_EREAD)
124+
if (status == NC_NOERR && err == NC_EFILE)
125+
DEBUG_ASSIGN_ERROR(status, NC_EREAD)
120126
get_size = chunk_size;
121127
}
122128
else {
@@ -140,14 +146,8 @@ move_file_block(NC *ncp,
140146
ncp->get_size += get_size;
141147
}
142148

143-
if (ncp->nprocs > 1) {
144-
/* MPI_Barrier(ncp->comm); */
145-
/* important, in case new region overlaps old region */
146-
TRACE_COMM(MPI_Allreduce)(&status, &min_st, 1, MPI_INT, MPI_MIN,
147-
ncp->comm);
148-
status = min_st;
149-
}
150-
if (status != NC_NOERR) break;
149+
/* to prevent from one rank's write run faster than other's read */
150+
if (ncp->nprocs > 1) MPI_Barrier(ncp->comm);
151151

152152
/* write to new location at off_to for amount of chunk_size
153153
*
@@ -179,11 +179,12 @@ move_file_block(NC *ncp,
179179
MPI_BYTE, &mpistatus);
180180
else
181181
TRACE_IO(MPI_File_write_at)(fh, off_to, buf,
182-
get_size /* NOT chunk_size */,
183-
MPI_BYTE, &mpistatus);
182+
get_size /* NOT chunk_size */,
183+
MPI_BYTE, &mpistatus);
184184
if (mpireturn != MPI_SUCCESS) {
185185
err = ncmpii_error_mpi2nc(mpireturn, "MPI_File_write_at_all");
186-
if (err == NC_EFILE) DEBUG_ASSIGN_ERROR(status, NC_EWRITE)
186+
if (status == NC_NOERR && err == NC_EFILE)
187+
DEBUG_ASSIGN_ERROR(status, NC_EWRITE)
187188
}
188189
else {
189190
/* update the number of bytes written since file open.
@@ -198,41 +199,15 @@ move_file_block(NC *ncp,
198199
else
199200
ncp->put_size += put_size;
200201
}
201-
if (ncp->nprocs > 1) {
202-
TRACE_COMM(MPI_Allreduce)(&status, &min_st, 1, MPI_INT, MPI_MIN,
203-
ncp->comm);
204-
status = min_st;
205-
}
206-
if (status != NC_NOERR) break;
207202

208203
/* move on to the next round */
209204
mv_amnt = p_units;
210205
off_from -= mv_amnt;
211206
off_to -= mv_amnt;
212207
nbytes -= mv_amnt;
213208
}
214-
NCI_Free(buf);
215-
return status;
216-
}
217209

218-
/*----< move_fixed_vars() >--------------------------------------------------*/
219-
/* move one fixed variable at a time, only when the new begin > old begin */
220-
static int
221-
move_fixed_vars(NC *ncp, NC *old)
222-
{
223-
int i, err, status=NC_NOERR;
224-
225-
/* move starting from the last fixed variable */
226-
for (i=old->vars.ndefined-1; i>=0; i--) {
227-
if (IS_RECVAR(old->vars.value[i])) continue;
228-
229-
MPI_Offset from = old->vars.value[i]->begin;
230-
MPI_Offset to = ncp->vars.value[i]->begin;
231-
if (to > from) {
232-
err = move_file_block(ncp, to, from, ncp->vars.value[i]->len);
233-
if (status == NC_NOERR) status = err;
234-
}
235-
}
210+
NCI_Free(buf);
236211
return status;
237212
}
238213

@@ -1177,40 +1152,84 @@ ncmpio__enddef(void *ncdp,
11771152
}
11781153
#endif
11791154

1180-
if (ncp->old != NULL) {
1155+
if (ncp->old != NULL && ncp->vars.ndefined > 0) {
11811156
/* The current define mode was entered from ncmpi_redef, not from
1182-
* ncmpi_create. We must check if header has been expanded.
1157+
* ncmpi_create. We must check if header extent has grown.
1158+
* This only needs to be done when there are variables defined.
11831159
*/
1160+
int mov_done=0;
1161+
MPI_Offset nbytes;
11841162

11851163
assert(!NC_IsNew(ncp));
11861164
assert(fIsSet(ncp->flags, NC_MODE_DEF));
11871165
assert(ncp->begin_rec >= ncp->old->begin_rec);
11881166
assert(ncp->begin_var >= ncp->old->begin_var);
11891167
assert(ncp->vars.ndefined >= ncp->old->vars.ndefined);
1168+
11901169
/* ncp->numrecs has already sync-ed in ncmpi_redef */
11911170

1192-
if (ncp->vars.ndefined > 0) { /* no. record and non-record variables */
1171+
if (ncp->begin_var > ncp->old->begin_var &&
1172+
ncp->begin_rec > ncp->old->begin_rec &&
1173+
ncp->vars.ndefined == ncp->old->vars.ndefined) {
1174+
/* Data section grows and no new variable has been added. The
1175+
* entire data section must be moved to a higher file offset.
1176+
*/
1177+
/* amount of data section to be moved */
1178+
nbytes = ncp->old->begin_rec - ncp->old->begin_var
1179+
+ ncp->old->recsize * ncp->old->numrecs;
1180+
1181+
err = move_file_block(ncp, ncp->begin_var, ncp->old->begin_var,
1182+
nbytes);
1183+
if (status == NC_NOERR) status = err;
1184+
mov_done = 1;
1185+
}
1186+
else {
1187+
if (ncp->begin_rec > ncp->old->begin_rec) {
1188+
/* beginning of record variable section grows. The entire
1189+
* record variable section must be moved to a higher file
1190+
* offset.
1191+
*/
1192+
if (ncp->recsize == ncp->old->recsize) {
1193+
/* no new record variable has been added */
1194+
1195+
/* amount of data to be moved */
1196+
nbytes = ncp->old->recsize * ncp->old->numrecs;
1197+
1198+
err = move_file_block(ncp, ncp->begin_rec,
1199+
ncp->old->begin_rec, nbytes);
1200+
if (status == NC_NOERR) status = err;
1201+
}
1202+
else {
1203+
/* new record variables have been added. Must move one
1204+
* record at a time
1205+
*/
1206+
err = move_record_vars(ncp, ncp->old);
1207+
if (status == NC_NOERR) status = err;
1208+
}
1209+
mov_done = 1;
1210+
}
1211+
11931212
if (ncp->begin_var > ncp->old->begin_var) {
1194-
/* header extent has increased, shift the entire data section
1195-
* to a higher offset, by moving record variables first
1213+
/* beginning of fix-sized variable section grows. The
1214+
* fix-sized variable section must be moved to a higher
1215+
* file offset.
11961216
*/
1197-
err = move_record_vars(ncp, ncp->old);
1198-
CHECK_ERROR(err)
1217+
/* amount of data to be moved. Note there may be some free
1218+
* space at the end of fix-sized variable section that need not
1219+
* be moved.
1220+
*/
1221+
nbytes = ncp->old->begin_rec - ncp->old->begin_var;
11991222

1200-
/* shift non-record variables */
1201-
/* err = move_vars_r(ncp, ncp->old); */
1202-
err = move_fixed_vars(ncp, ncp->old);
1203-
CHECK_ERROR(err)
1204-
}
1205-
else if (ncp->begin_rec > ncp->old->begin_rec ||
1206-
ncp->recsize > ncp->old->recsize) {
1207-
/* number of non-record variables increases, or
1208-
number of records of record variables increases,
1209-
shift and move all record variables down */
1210-
err = move_record_vars(ncp, ncp->old);
1211-
CHECK_ERROR(err)
1223+
err = move_file_block(ncp, ncp->begin_var, ncp->old->begin_var,
1224+
nbytes);
1225+
if (status == NC_NOERR) status = err;
1226+
mov_done = 1;
12121227
}
12131228
}
1229+
1230+
/* to prevent some ranks run faster than others */
1231+
if (mov_done && ncp->nprocs > 1) MPI_Barrier(ncp->comm);
1232+
12141233
} /* ... ncp->old != NULL */
12151234

12161235
/* first sync header objects in memory across all processes, and then root

0 commit comments

Comments
 (0)