11/*
22 * Copyright (C) 2024, Northwestern University and Argonne National Laboratory
33 * See COPYRIGHT notice in top-level directory.
4+ *
5+ * This file contains the implementation of intra-node aggregation feature,
6+ * which is designed for the I/O patterns that contain many noncontiguous
7+ * requests interleaved among processes, and spreading across a wide range of
8+ * file space. It is particularly useful when the number of MPI processes
9+ * allocated to a compute node is large.
10+ *
11+ * This feature is enabled by setting the PnetCDF hint 'nc_num_aggrs_per_node'
12+ * to a positive integral value indicating the desired number of processes per
13+ * compute node to be selected as the intra-node I/O aggregators. Each process
14+ * is assigned a unique aggregator. The non-aggregators send their requests to
15+ * the assigned aggregators, and then the aggregators make MPI-IO requests to
16+ * the file.
17+ *
18+ * Such strategy can effectively reduce communication congestion due to many
19+ * pending asynchronous messages produced in the collective write inside of
20+ * MPI-IO.
21+ *
22+ * The concept of intra-node request aggregation is based on the paper:
23+ * Q. Kang, S. Lee, K. Hou, R. Ross, A. Agrawal, A. Choudhary, and W. Liao.
24+ * Improving MPI Collective I/O for High Volume Non-Contiguous Requests With
25+ * Intra-Node Aggregation. IEEE Transactions on Parallel and Distributed
26+ * Systems (TPDS), 31(11):2682-2695, November 2020.
427 */
528
629#ifdef HAVE_CONFIG_H
4871 ((*(b) < *(c)) ? (b) : ((*(a) < *(c)) ? (c) : (a))) : \
4972 ((*(b) > *(c)) ? (b) : ((*(a) < *(c)) ? (a) : (c))))
5073
74+ /*----< qsort_off_len_buf() >------------------------------------------------*/
75+ /* Sort three arrays of offsets, lengths, and buffer addresses based on the
76+ * increasing order of offsets. This code is based on the qsort routine from
77+ * Bentley & McIlroy's "Engineering a Sort Function".
78+ */
5179static void
5280qsort_off_len_buf (MPI_Aint num ,
5381#ifdef HAVE_MPI_LARGE_COUNT
@@ -125,7 +153,7 @@ qsort_off_len_buf(MPI_Aint num,
125153 if ((r = pb - pa ) > 1 )
126154 qsort_off_len_buf (r , offsets , lengths , bufAddr );
127155 if ((r = pd - pc ) > 1 ) {
128- /* Iterate rather than recurse to save stack space */
156+ /* Iterate rather than recursively call self to save stack space */
129157 lengths = lengths + (num - r );
130158 bufAddr = bufAddr + (num - r );
131159 offsets = pn - r ;
@@ -161,10 +189,18 @@ ncmpio_intra_node_aggr_init(NC *ncp)
161189 ncp -> num_nonaggrs = 0 ; /* number of non-aggregators assigned */
162190 ncp -> nonaggr_ranks = NULL ; /* ranks of assigned non-aggregators */
163191
192+ #ifdef PNETCDF_PROFILING
193+ ncp -> aggr_time = 0.0 ;
194+ #endif
195+
164196 if (ncp -> num_aggrs_per_node == 0 || ncp -> num_aggrs_per_node == ncp -> nprocs )
165197 /* disable intra-node aggregation */
166198 return NC_NOERR ;
167199
200+ #ifdef PNETCDF_PROFILING
201+ double timing = MPI_Wtime ();
202+ #endif
203+
168204 /* allocate space for storing the rank IDs of non-aggregators assigned to
169205 * this rank. Note ncp->nonaggr_ranks[] will be freed when closing the
170206 * file, if allocated.
@@ -368,6 +404,10 @@ ncmpio_intra_node_aggr_init(NC *ncp)
368404 * of processes are allocated on the same node.
369405 */
370406
407+ #ifdef PNETCDF_PROFILING
408+ ncp -> aggr_time = MPI_Wtime () - timing ;
409+ #endif
410+
371411 return NC_NOERR ;
372412}
373413
@@ -791,7 +831,9 @@ intra_node_aggregation(NC *ncp,
791831 MPI_Datatype recvTypes , fileType = MPI_BYTE ;
792832 MPI_File fh ;
793833 MPI_Request * req = NULL ;
794-
834+ #ifdef PNETCDF_PROFILING
835+ double timing = MPI_Wtime ();
836+ #endif
795837#ifdef HAVE_MPI_LARGE_COUNT
796838 MPI_Count bufLen ;
797839 MPI_Type_size_c (bufType , & bufLen );
@@ -1190,6 +1232,10 @@ intra_node_aggregation(NC *ncp,
11901232 NCI_Free (lengths );
11911233 }
11921234
1235+ #ifdef PNETCDF_PROFILING
1236+ ncp -> aggr_time += MPI_Wtime () - timing ;
1237+ #endif
1238+
11931239 if (ncp -> rank != ncp -> my_aggr ) /* non-aggregator writes nothing */
11941240 buf_count = 0 ;
11951241
@@ -1221,6 +1267,7 @@ intra_node_aggregation(NC *ncp,
12211267/* This is a collective call */
12221268int
12231269ncmpio_intra_node_aggregation_nreqs (NC * ncp ,
1270+ int reqMode ,
12241271 int num_reqs ,
12251272 NC_req * put_list ,
12261273 MPI_Offset newnumrecs )
@@ -1234,6 +1281,12 @@ ncmpio_intra_node_aggregation_nreqs(NC *ncp,
12341281 int * lengths = NULL ;
12351282#endif
12361283 MPI_Datatype bufType = MPI_BYTE ;
1284+ #ifdef PNETCDF_PROFILING
1285+ double timing = MPI_Wtime ();
1286+ #endif
1287+
1288+ /* currently supports write requests only */
1289+ if (fIsSet (reqMode , NC_REQ_RD )) return NC_NOERR ;
12371290
12381291 assert (ncp -> my_aggr >= 0 );
12391292
@@ -1260,6 +1313,10 @@ ncmpio_intra_node_aggregation_nreqs(NC *ncp,
12601313 if (put_list != NULL )
12611314 NCI_Free (put_list );
12621315
1316+ #ifdef PNETCDF_PROFILING
1317+ ncp -> aggr_time += MPI_Wtime () - timing ;
1318+ #endif
1319+
12631320 err = intra_node_aggregation (ncp , num_pairs , offsets , lengths , bufLen ,
12641321 bufType , NULL );
12651322 if (status == NC_NOERR ) status = err ;
@@ -1291,6 +1348,7 @@ ncmpio_intra_node_aggregation_nreqs(NC *ncp,
12911348/* This is a collective call */
12921349int
12931350ncmpio_intra_node_aggregation (NC * ncp ,
1351+ int reqMode ,
12941352 NC_var * varp ,
12951353 const MPI_Offset * start ,
12961354 const MPI_Offset * count ,
@@ -1307,6 +1365,12 @@ ncmpio_intra_node_aggregation(NC *ncp,
13071365 MPI_Aint * offsets = NULL ;
13081366 int * lengths = NULL ;
13091367#endif
1368+ #ifdef PNETCDF_PROFILING
1369+ double timing = MPI_Wtime ();
1370+ #endif
1371+
1372+ /* currently supports write requests only */
1373+ if (fIsSet (reqMode , NC_REQ_RD )) return NC_NOERR ;
13101374
13111375 if (buf == NULL ) /* zero-length request */
13121376 return intra_node_aggregation (ncp , 0 , NULL , NULL , 0 , MPI_BYTE , NULL );
@@ -1326,6 +1390,10 @@ ncmpio_intra_node_aggregation(NC *ncp,
13261390 }
13271391 status = err ;
13281392
1393+ #ifdef PNETCDF_PROFILING
1394+ ncp -> aggr_time += MPI_Wtime () - timing ;
1395+ #endif
1396+
13291397 err = intra_node_aggregation (ncp , num_pairs , offsets , lengths , bufCount ,
13301398 bufType , buf );
13311399 if (status == NC_NOERR ) status = err ;
0 commit comments