Han gatherv noncontiguous datatype fix #12439

Merged
1 change: 1 addition & 0 deletions ompi/mca/coll/han/coll_han.h
@@ -331,6 +331,7 @@ typedef struct mca_coll_han_module_t {
int *cached_topo;
bool is_mapbycore;
bool are_ppn_imbalanced;
bool is_heterogeneous;

/* To be able to fallback when the cases are not supported */
struct mca_coll_han_collectives_fallback_s fallback;
17 changes: 8 additions & 9 deletions ompi/mca/coll/han/coll_han_gatherv.c
@@ -149,11 +149,8 @@ int mca_coll_han_gatherv_intra(const void *sbuf, int scount, struct ompi_datatyp
root_low_rank, low_comm,
low_comm->c_coll->coll_gatherv_module);

size_t rdsize;
char *tmp_rbuf = rbuf;

ompi_datatype_type_size(rdtype, &rdsize);

up_rcounts = calloc(up_size, sizeof(int));
up_displs = malloc(up_size * sizeof(int));
up_peer_ub = calloc(up_size, sizeof(int));
@@ -210,7 +207,9 @@ int mca_coll_han_gatherv_intra(const void *sbuf, int scount, struct ompi_datatyp
}

if (need_bounce_buf) {
bounce_buf = malloc(rdsize * total_up_rcounts);
ptrdiff_t rsize, rgap;
rsize = opal_datatype_span(&rdtype->super, total_up_rcounts, &rgap);
bounce_buf = malloc(rsize);
if (!bounce_buf) {
err = OMPI_ERR_OUT_OF_RESOURCE;
goto root_out;
@@ -222,7 +221,7 @@ int mca_coll_han_gatherv_intra(const void *sbuf, int scount, struct ompi_datatyp
: 0;
}

tmp_rbuf = bounce_buf;
tmp_rbuf = bounce_buf - rgap;
}

/* Up Gatherv */
@@ -231,7 +230,8 @@ int mca_coll_han_gatherv_intra(const void *sbuf, int scount, struct ompi_datatyp

/* Use a temp buffer to reorder the output buffer if needed */
if (need_bounce_buf) {
ptrdiff_t offset = 0;
ptrdiff_t offset = 0, rdext;
ompi_datatype_type_extent(rdtype, &rdext);

for (int i = 0; i < w_size; ++i) {
up_peer = topo[2 * i];
@@ -242,10 +242,9 @@ int mca_coll_han_gatherv_intra(const void *sbuf, int scount, struct ompi_datatyp
w_peer = topo[2 * i + 1];

ompi_datatype_copy_content_same_ddt(rdtype, (size_t) rcounts[w_peer],
(char *) rbuf
+ (size_t) displs[w_peer] * rdsize,
(char *) rbuf + (size_t) displs[w_peer] * rdext,
bounce_buf + offset);
offset += rdsize * (size_t) rcounts[w_peer];
offset += rdext * (size_t) rcounts[w_peer];
}
}

33 changes: 23 additions & 10 deletions ompi/mca/coll/han/coll_han_scatterv.c
@@ -55,6 +55,12 @@
* to send the data in the correct order even if the processes are NOT mapped by core.
* 2. In the send buffer, other than the root's node, data destined to the same node are continuous
* - it is ok if data to different nodes has gap.
*
* Limitation:
* The node leader acts as a broker between the Root and the node followers, but it cannot match the
* exact type signature of the followers; instead it forwards the intermediate data from the Root in
* packed form as MPI_BYTE. This works for Gatherv but NOT for Scatterv if the Root has a different
* architecture, e.g. a different endianness or integer representation.
*/
int mca_coll_han_scatterv_intra(const void *sbuf, const int *scounts, const int *displs,
struct ompi_datatype_t *sdtype, void *rbuf, int rcount,
@@ -94,6 +100,14 @@ int mca_coll_han_scatterv_intra(const void *sbuf, const int *scounts, const int
return han_module->previous_scatterv(sbuf, scounts, displs, sdtype, rbuf, rcount, rdtype,
root, comm, han_module->previous_scatterv_module);
}
if (han_module->is_heterogeneous) {
OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
"han cannot handle scatterv with this communicator (heterogeneous). Fall "
"back on another component\n"));
HAN_LOAD_FALLBACK_COLLECTIVE(han_module, comm, scatterv);
return han_module->previous_scatterv(sbuf, scounts, displs, sdtype, rbuf, rcount, rdtype,
root, comm, han_module->previous_scatterv_module);
}

w_rank = ompi_comm_rank(comm);
w_size = ompi_comm_size(comm);
@@ -125,7 +139,6 @@ int mca_coll_han_scatterv_intra(const void *sbuf, const int *scounts, const int
int need_bounce_buf = 0, total_up_scounts = 0, *up_displs = NULL, *up_scounts = NULL,
*up_peer_lb = NULL, *up_peer_ub = NULL;
char *reorder_sbuf = (char *) sbuf, *bounce_buf = NULL;
size_t sdsize;

low_scounts = malloc(low_size * sizeof(int));
low_displs = malloc(low_size * sizeof(int));
@@ -144,8 +157,6 @@ int mca_coll_han_scatterv_intra(const void *sbuf, const int *scounts, const int
low_scounts[low_peer] = scounts[w_peer];
}

ompi_datatype_type_size(sdtype, &sdsize);
Member:

Same comment as for gather except that you should use unpack to go from a packed buffer into the local type.
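
For illustration only, a minimal sketch of the pack/unpack pattern being suggested, written against the public MPI_Unpack API rather than HAN's internal routines; the function and variable names are hypothetical, not the actual HAN code.

#include <mpi.h>

/* Illustrative: a node follower receives a packed byte stream forwarded by its
 * leader and unpacks it into its own receive datatype.  Because MPI_Unpack
 * applies the local type map, the follower's datatype does not have to match
 * the leader's; only the type signature does. */
static void unpack_from_leader(const void *packed_buf, int packed_bytes,
                               void *rbuf, int rcount, MPI_Datatype rdtype,
                               MPI_Comm comm)
{
    int position = 0;
    MPI_Unpack(packed_buf, packed_bytes, &position, rbuf, rcount, rdtype, comm);
}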

Contributor Author:

Currently inside ompi we need an extra data size check in order to use MPI_PACKED - it is currently possible that the total byte size (datatype size x count) can exceed INT_MAX.

I think we should switch to explicit pack/unpack after large count support.
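
A rough sketch of the kind of overflow guard described above, using the public MPI_Type_size API; this is illustrative only, and a real implementation would fall back to another algorithm when the check fails.

#include <limits.h>
#include <mpi.h>

/* Illustrative: before re-expressing 'count' elements of 'dtype' as a count of
 * MPI_PACKED/MPI_BYTE, check that the byte total still fits in the int count
 * argument used by the pre-large-count MPI API. */
static int byte_count_fits_in_int(MPI_Datatype dtype, size_t count)
{
    int type_size;
    MPI_Type_size(dtype, &type_size);
    return (size_t) type_size * count <= (size_t) INT_MAX;
}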

Contributor Author:

@bosilca I took time to come up with possible optimizations following our discussion on Monday. I was trying to take advantage of the type map, but soon realized that this is not generally useful between node leader and follower.

For scatterv, the invariant is actually between Root and other processes:

The type signature implied by sendcount[i], sendtype at the root must be equal to the type signature implied by recvcount, recvtype at MPI process i (however, the type maps may be different). This implies that the amount of data sent must be equal to the amount of data received, pairwise between each MPI process and the root. Distinct type maps between sender and receiver are still allowed.

When we focus on the node leader and its local neighbors, this information is not helpful, since they do not need to match in their respective recvcount, recvtype.

There is a simplification (rather than optimization) opportunity though, if and only if the node leader's recvtype's type map size is the same as its local followers'. In that case we don't need MPI_BYTE - we can safely use recvtype instead. This wouldn't bring better performance though.
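
As a hypothetical illustration of that simplification (this helper is not part of the PR), the node leader could check over the low communicator whether every local process reports the same datatype size, reusing the max(x)/max(-x) trick the topology code already uses to detect imbalanced ppn.

#include <mpi.h>

/* Hypothetical helper, not in this PR: returns 1 if all processes on the node
 * report the same datatype size, in which case the leader could forward the
 * data typed as 'dtype' instead of as raw MPI_BYTE. */
static int node_type_sizes_match(MPI_Datatype dtype, MPI_Comm low_comm)
{
    int size, minmax[2];
    MPI_Type_size(dtype, &size);
    minmax[0] = size;
    minmax[1] = -size;  /* MAX of -size equals -(MIN of size) */
    MPI_Allreduce(MPI_IN_PLACE, minmax, 2, MPI_INT, MPI_MAX, low_comm);
    return minmax[0] == -minmax[1];
}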

Member:

Because the typemap provided by the different processes must match when complemented with the count, we could use MPI_Get_elements to build any datatype used in the communication, as long as we know the size in bytes we are supposed to handle.
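
For reference, a minimal illustration of the MPI_Get_elements API mentioned above (not HAN code): after a typed receive it reports how many primitive elements actually arrived, which is the kind of information a leader would need to rebuild an equivalent datatype.

#include <mpi.h>

/* Illustrative: count the primitive elements delivered by a typed receive.
 * MPI_Get_elements reports basic elements even when the incoming message
 * fills only part of the last copy of 'type'. */
static int recv_and_count_elements(void *buf, int count, MPI_Datatype type,
                                   int src, int tag, MPI_Comm comm)
{
    MPI_Status status;
    int n_elements;
    MPI_Recv(buf, count, type, src, tag, comm, &status);
    MPI_Get_elements(&status, type, &n_elements);
    return n_elements;  /* number of basic (predefined) elements received */
}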


up_scounts = calloc(up_size, sizeof(int));
up_displs = malloc(up_size * sizeof(int));
up_peer_ub = calloc(up_size, sizeof(int));
@@ -201,11 +212,14 @@ int mca_coll_han_scatterv_intra(const void *sbuf, const int *scounts, const int
}

if (need_bounce_buf) {
bounce_buf = malloc(sdsize * total_up_scounts);
ptrdiff_t ssize, sgap;
ssize = opal_datatype_span(&sdtype->super, total_up_scounts, &sgap);
bounce_buf = malloc(ssize);
if (!bounce_buf) {
err = OMPI_ERR_OUT_OF_RESOURCE;
goto root_out;
}
reorder_sbuf = bounce_buf - sgap;

/* Calculate displacements for the inter-node scatterv */
for (up_peer = 0; up_peer < up_size; ++up_peer) {
@@ -214,7 +228,8 @@ int mca_coll_han_scatterv_intra(const void *sbuf, const int *scounts, const int
}

/* Use a temp buffer to reorder the send buffer if needed */
ptrdiff_t offset = 0;
ptrdiff_t offset = 0, sdext;
ompi_datatype_type_extent(sdtype, &sdext);

for (int i = 0; i < w_size; ++i) {
up_peer = topo[2 * i];
Expand All @@ -225,13 +240,11 @@ int mca_coll_han_scatterv_intra(const void *sbuf, const int *scounts, const int
w_peer = topo[2 * i + 1];

ompi_datatype_copy_content_same_ddt(sdtype, (size_t) scounts[w_peer],
bounce_buf + offset,
reorder_sbuf + offset,
(char *) sbuf
+ (size_t) displs[w_peer] * sdsize);
offset += sdsize * (size_t) scounts[w_peer];
+ (size_t) displs[w_peer] * sdext);
offset += sdext * (size_t) scounts[w_peer];
}

reorder_sbuf = bounce_buf;
}

/* Up Iscatterv */
23 changes: 16 additions & 7 deletions ompi/mca/coll/han/coll_han_topo.c
@@ -92,12 +92,19 @@ mca_coll_han_topo_init(struct ompi_communicator_t *comm,
}
assert(up_comm != NULL && low_comm != NULL);

int up_rank = ompi_comm_rank(up_comm);
int low_rank = ompi_comm_rank(low_comm);
int low_size = ompi_comm_size(low_comm);

ompi_proc_t *up_proc = NULL;

int *topo = (int *)malloc(sizeof(int) * size * num_topo_level);
int is_imbalanced = 1;
int ranks_non_consecutive = 0;
int is_imbalanced = 1, ranks_non_consecutive = 0, is_heterogeneous = 0;

if (0 != up_rank) {
up_proc = ompi_comm_peer_lookup(up_comm, 0);
is_heterogeneous = up_proc->super.proc_convertor->remoteArch != opal_local_arch;
}

/* node leaders translate the node-local ranks to global ranks and check whether they are placed consecutively */
if (0 == low_rank) {
@@ -116,15 +123,16 @@ mca_coll_han_topo_init(struct ompi_communicator_t *comm,
}
}

int reduce_vals[] = {ranks_non_consecutive, low_size, -low_size};
int reduce_vals[] = {ranks_non_consecutive, low_size, -low_size, is_heterogeneous};

up_comm->c_coll->coll_allreduce(MPI_IN_PLACE, &reduce_vals, 3,
up_comm->c_coll->coll_allreduce(MPI_IN_PLACE, &reduce_vals, 4,
MPI_INT, MPI_MAX, up_comm,
up_comm->c_coll->coll_allreduce_module);

/* is the distribution of processes balanced per node? */
is_imbalanced = (reduce_vals[1] == -reduce_vals[2]) ? 0 : 1;
ranks_non_consecutive = reduce_vals[0];
is_heterogeneous = reduce_vals[3];

if ( ranks_non_consecutive && !is_imbalanced ) {
/* kick off up_comm allgather to collect non-consecutive rank information at node leaders */
@@ -136,12 +144,13 @@ mca_coll_han_topo_init(struct ompi_communicator_t *comm,
}


/* broadcast balanced and consecutive properties from node leaders to remaining ranks */
int bcast_vals[] = {is_imbalanced, ranks_non_consecutive};
low_comm->c_coll->coll_bcast(bcast_vals, 2, MPI_INT, 0,
/* broadcast balanced, consecutive and homogeneity properties from node leaders to remaining ranks */
int bcast_vals[] = {is_imbalanced, ranks_non_consecutive, is_heterogeneous};
low_comm->c_coll->coll_bcast(bcast_vals, 3, MPI_INT, 0,
low_comm, low_comm->c_coll->coll_bcast_module);
is_imbalanced = bcast_vals[0];
ranks_non_consecutive = bcast_vals[1];
han_module->is_heterogeneous = bcast_vals[2];

/* error out if the rank distribution is not balanced */
if (is_imbalanced) {