
coll/acoll: Bcast/Barrier enhancements and bug fixes. #13222

Merged · 2 commits · May 9, 2025
4 changes: 2 additions & 2 deletions ompi/mca/coll/acoll/README
@@ -8,9 +8,9 @@ $HEADER$

===========================================================================

The collective component, AMD Coll (“acoll”), is a high-performant MPI collective component for the OpenMPI library that is optimized for AMD "Zen"-based processors. “acoll” is optimized for communications within a single node of AMD “Zen”-based processors and provides the following commonly used collective algorithms: boardcast (MPI_Bcast), allreduce (MPI_Allreduce), reduce (MPI_Reduce), gather (MPI_Gather), allgather (MPI_Allgather), and barrier (MPI_Barrier).
The collective component, AMD Coll (“acoll”), is a high-performance MPI collective component for the OpenMPI library that is optimized for AMD "Zen"-based processors. “acoll” targets communications within a single node of AMD “Zen”-based processors and provides the following commonly used collective algorithms: broadcast (MPI_Bcast), allreduce (MPI_Allreduce), reduce (MPI_Reduce), gather (MPI_Gather), allgather (MPI_Allgather), alltoall (MPI_Alltoall), and barrier (MPI_Barrier).

At present, “acoll” has been tested with OpenMPI v5.0.2 and can be built as part of OpenMPI.
At present, “acoll” has been tested with the OpenMPI main branch and can be built as part of OpenMPI.
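For reference, a minimal sketch of building from a git checkout of the main branch (these are the standard OpenMPI build steps, not specific to acoll or to this change; the install prefix is a placeholder):
- ./autogen.pl && ./configure --prefix=<install-dir> && make -j && make install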

To run an application with acoll, use the following command line parameters
- mpirun <common mpi runtime parameters> --mca coll acoll,tuned,libnbc,basic --mca coll_acoll_priority 40 <executable>
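As a concrete illustration, a hypothetical invocation for an executable ./my_app on 64 ranks of a single node (the application name and rank count are placeholders, not part of this change) would be:
- mpirun -np 64 --mca coll acoll,tuned,libnbc,basic --mca coll_acoll_priority 40 ./my_app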
4 changes: 4 additions & 0 deletions ompi/mca/coll/acoll/coll_acoll.h
@@ -26,6 +26,7 @@
#include <xpmem.h>
#endif

#include "opal/mca/accelerator/accelerator.h"
#include "opal/mca/shmem/base/base.h"
#include "opal/mca/shmem/shmem.h"

@@ -40,6 +41,7 @@ extern int mca_coll_acoll_sg_scale;
extern int mca_coll_acoll_node_size;
extern int mca_coll_acoll_force_numa;
extern int mca_coll_acoll_use_dynamic_rules;
extern int mca_coll_acoll_disable_shmbcast;
extern int mca_coll_acoll_mnode_enable;
extern int mca_coll_acoll_bcast_lin0;
extern int mca_coll_acoll_bcast_lin1;
@@ -201,6 +203,7 @@ typedef struct coll_acoll_subcomms {
coll_acoll_data_t *data;
bool initialized_data;
bool initialized_shm_data;
int barrier_algo;
#ifdef HAVE_XPMEM_H
uint64_t xpmem_buf_size;
int without_xpmem;
@@ -233,6 +236,7 @@ struct mca_coll_acoll_module_t {
int log2_node_cnt;
int force_numa;
int use_dyn_rules;
int disable_shmbcast;
// Todo: Use substructure for every API related ones
int use_mnode;
int use_lin0;
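The new module fields above (disable_shmbcast, barrier_algo) point at run-time tunables for turning off the shared-memory bcast path and for choosing between the two new shared-memory barrier variants. Assuming they are exposed as MCA parameters named coll_acoll_disable_shmbcast and coll_acoll_barrier_algo (the parameter names are an assumption; the component registration code is not part of this diff), selecting the flat barrier variant would look like:
- mpirun <common mpi runtime parameters> --mca coll acoll,tuned,libnbc,basic --mca coll_acoll_priority 40 --mca coll_acoll_barrier_algo 1 <executable>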
4 changes: 2 additions & 2 deletions ompi/mca/coll/acoll/coll_acoll_allgather.c
@@ -29,7 +29,7 @@ static inline int log_sg_bcast_intra(void *buff, size_t count, struct ompi_datat
mca_coll_base_module_t *module, ompi_request_t **preq,
int *nreqs)
{
int msb_pos, sub_rank, peer, err;
int msb_pos, sub_rank, peer, err = MPI_SUCCESS;
int i, mask;
int end_sg, end_peer;

@@ -92,7 +92,7 @@ static inline int lin_sg_bcast_intra(void *buff, size_t count, struct ompi_datat
int *nreqs)
{
int peer;
int err;
int err = MPI_SUCCESS;
int sg_end;

sg_end = sg_start + sg_size - 1;
24 changes: 19 additions & 5 deletions ompi/mca/coll/acoll/coll_acoll_allreduce.c
@@ -450,7 +450,21 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
ompi_datatype_type_size(dtype, &dsize);
total_dsize = dsize * count;

if (1 == size) {
/* Disable shm/xpmem based optimizations if: */
/* - the datatype is not a predefined type */
/* - the send or receive buffer is a GPU buffer */
uint64_t flags = 0;
int dev_id;
bool is_opt = true;
if (!OMPI_COMM_CHECK_ASSERT_NO_ACCEL_BUF(comm)) {
if (!ompi_datatype_is_predefined(dtype)
|| (0 < opal_accelerator.check_addr(sbuf, &dev_id, &flags))
|| (0 < opal_accelerator.check_addr(rbuf, &dev_id, &flags))) {
is_opt = false;
}
}

if ((1 == size) && is_opt) {
if (MPI_IN_PLACE != sbuf) {
memcpy((char *) rbuf, sbuf, total_dsize);
}
@@ -486,7 +500,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
if (total_dsize < 32) {
return ompi_coll_base_allreduce_intra_recursivedoubling(sbuf, rbuf, count, dtype, op,
comm, module);
} else if (total_dsize < 512) {
} else if ((total_dsize < 512) && is_opt) {
return mca_coll_acoll_allreduce_small_msgs_h(sbuf, rbuf, count, dtype, op, comm, module,
subc, 1);
} else if (total_dsize <= 2048) {
@@ -505,7 +519,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
}
} else if (total_dsize < 4194304) {
#ifdef HAVE_XPMEM_H
if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1)) {
if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1) && is_opt) {
return mca_coll_acoll_allreduce_xpmem_f(sbuf, rbuf, count, dtype, op, comm, module, subc);
} else {
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype,
@@ -517,7 +531,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
#endif
} else if (total_dsize <= 16777216) {
#ifdef HAVE_XPMEM_H
if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1)) {
if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1) && is_opt) {
mca_coll_acoll_reduce_xpmem_h(sbuf, rbuf, count, dtype, op, comm, module, subc);
return mca_coll_acoll_bcast(rbuf, count, dtype, 0, comm, module);
} else {
@@ -530,7 +544,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
#endif
} else {
#ifdef HAVE_XPMEM_H
if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1)) {
if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1) && is_opt) {
return mca_coll_acoll_allreduce_xpmem_f(sbuf, rbuf, count, dtype, op, comm, module, subc);
} else {
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype,
10 changes: 2 additions & 8 deletions ompi/mca/coll/acoll/coll_acoll_alltoall.c
@@ -529,14 +529,8 @@ int mca_coll_acoll_alltoall
struct ompi_communicator_t *split_comm;

/* Select the right split_comm. */
int pow2_idx = -2;
int tmp_grp_split_f = grp_split_f;
while (tmp_grp_split_f > 0)
{
pow2_idx += 1;
tmp_grp_split_f = tmp_grp_split_f / 2;
}
split_comm = subc->split_comm[pow2_idx];
int comm_idx = grp_split_f > 2 ? opal_cube_dim(grp_split_f/2) : 0;
split_comm = subc->split_comm[comm_idx];

error = mca_coll_acoll_base_alltoall_dispatcher
(sbuf, (grp_split_f * scount), sdtype,
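The replacement above collapses the removed halving loop into a single index computation. As a sanity check, here is a small self-contained sketch (not part of the PR) comparing the two formulas for power-of-two group sizes; the local cube_dim() helper is assumed to behave like opal_cube_dim(), i.e. to return the smallest d with (1 << d) >= value:

#include <assert.h>
#include <stdio.h>

/* Stand-in assumed to match opal_cube_dim(): smallest d with (1 << d) >= value. */
static int cube_dim(int value)
{
    int dim = 0, size = 1;
    while (size < value) {
        ++dim;
        size <<= 1;
    }
    return dim;
}

int main(void)
{
    for (int grp_split_f = 2; grp_split_f <= 1024; grp_split_f *= 2) {
        /* Old index computation (the removed loop). */
        int pow2_idx = -2;
        int tmp_grp_split_f = grp_split_f;
        while (tmp_grp_split_f > 0) {
            pow2_idx += 1;
            tmp_grp_split_f = tmp_grp_split_f / 2;
        }
        /* New index computation from the patch. */
        int comm_idx = grp_split_f > 2 ? cube_dim(grp_split_f / 2) : 0;
        printf("grp_split_f=%4d old=%d new=%d\n", grp_split_f, pow2_idx, comm_idx);
        assert(pow2_idx == comm_idx);
    }
    return 0;
}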
181 changes: 181 additions & 0 deletions ompi/mca/coll/acoll/coll_acoll_barrier.c
@@ -22,6 +22,13 @@
#include "coll_acoll.h"
#include "coll_acoll_utils.h"



#define PROGRESS_COUNT 10000

int mca_coll_acoll_barrier_shm_h(struct ompi_communicator_t *comm, mca_coll_base_module_t *module, coll_acoll_subcomms_t *subc);
int mca_coll_acoll_barrier_shm_f(struct ompi_communicator_t *comm, mca_coll_base_module_t *module, coll_acoll_subcomms_t *subc);

static int mca_coll_acoll_barrier_recv_subc(struct ompi_communicator_t *comm,
mca_coll_base_module_t *module, ompi_request_t **reqs,
int *nreqs, int root)
@@ -106,6 +113,170 @@ static int mca_coll_acoll_barrier_send_subc(struct ompi_communicator_t *comm,
return err;
}

int mca_coll_acoll_barrier_shm_h(struct ompi_communicator_t *comm, mca_coll_base_module_t *module, coll_acoll_subcomms_t *subc)
{
int err = MPI_SUCCESS;
int root = 0;
int rank = ompi_comm_rank(comm);
int size = ompi_comm_size(comm);
mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module;
coll_acoll_init(module, comm, subc->data, subc, root);
coll_acoll_data_t *data = subc->data;

if (NULL == data) {
return -1;
}

int l1_gp_size = data->l1_gp_size;
int *l1_gp = data->l1_gp;
int *l2_gp = data->l2_gp;
int l2_gp_size = data->l2_gp_size;
/* 16 * 1024 + 2 * 64 * size + 8 * 1024 * size */
int offset_barrier = LEADER_SHM_SIZE + 2 * CACHE_LINE_SIZE * size + PER_RANK_SHM_SIZE * size
+ CACHE_LINE_SIZE * size;

volatile int *root_rank_offset = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
+ CACHE_LINE_SIZE * rank);
volatile int *l1_rank_offset = (int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]]
+ offset_barrier + CACHE_LINE_SIZE * rank);

volatile int *leader_shm;
volatile int *my_leader_shm;
leader_shm = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
+ CACHE_LINE_SIZE * root);
my_leader_shm = (int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_barrier
+ CACHE_LINE_SIZE * l1_gp[0]);
int ready;
int count = 0;
if (rank == root) {
ready = *leader_shm;
for (int i = 0; i < l2_gp_size; i++) {
if (l2_gp[i] == root)
continue;
volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
+ CACHE_LINE_SIZE * l2_gp[i]);
while (*val != ready + 1) {
count++;
if (count == PROGRESS_COUNT) {
count = 0;
opal_progress();
}
}
}
ready++;
for (int i = 0; i < l1_gp_size; i++) {
if (l1_gp[i] == root)
continue;
volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
+ CACHE_LINE_SIZE * l1_gp[i]);
while (*val != ready) {
count++;
if (count == PROGRESS_COUNT) {
count = 0;
opal_progress();
}
}
}
*leader_shm = ready;
} else if (rank == l1_gp[0]) {
int val = *l1_rank_offset;
for (int i = 0; i < l1_gp_size; i++) {
if (l1_gp[i] == l1_gp[0])
continue;
volatile int *vali = (int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_barrier
+ CACHE_LINE_SIZE
* l1_gp[i]); // do we need atomic_load here?
while (*vali != val + 1) {
count++;
if (PROGRESS_COUNT == count) {
count = 0;
opal_progress();
}
}
}
val++;
*root_rank_offset = val;
while (*leader_shm != val) {
count++;
if (PROGRESS_COUNT == count) {
count = 0;
opal_progress();
}
}
*l1_rank_offset = val;
} else {

int done = *l1_rank_offset;
done++;
*l1_rank_offset = done;
while (done != *my_leader_shm) {
count++;
if (PROGRESS_COUNT == count) {
count = 0;
opal_progress();
}
}
}
return err;
}


int mca_coll_acoll_barrier_shm_f(struct ompi_communicator_t *comm, mca_coll_base_module_t *module, coll_acoll_subcomms_t *subc)
{
int err = MPI_SUCCESS;
int root = 0;
int rank = ompi_comm_rank(comm);
int size = ompi_comm_size(comm);
mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module;

coll_acoll_init(module, comm, subc->data, subc, root);
coll_acoll_data_t *data = subc->data;

if (NULL == data) {
return -1;
}

/* 16 * 1024 + 2 * 64 * size + 8 * 1024 * size */
int offset_barrier = LEADER_SHM_SIZE + 2 * CACHE_LINE_SIZE * size + PER_RANK_SHM_SIZE * size
+ CACHE_LINE_SIZE * size;

volatile int *root_rank_offset = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
+ CACHE_LINE_SIZE * rank);

volatile int *leader_shm;
leader_shm = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
+ CACHE_LINE_SIZE * root);

int ready = *leader_shm;
int count = 0;
if (rank == root) {
for (int i = 0; i < size; i++) {
if (i == root)
continue;
volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
+ CACHE_LINE_SIZE * i);
while (*val != ready + 1) {
count++;
if (count == PROGRESS_COUNT) {
count = 0;
opal_progress();
}
}
}
(*leader_shm)++;
} else {
int val = ++(*root_rank_offset);
while (*leader_shm != val) {
count++;
if (PROGRESS_COUNT == count) {
count = 0;
opal_progress();
}
}
}
return err;
}

/*
* mca_coll_acoll_barrier_intra
*
@@ -152,6 +323,16 @@ int mca_coll_acoll_barrier_intra(struct ompi_communicator_t *comm, mca_coll_base
}
num_nodes = size > 1 ? subc->num_nodes : 1;

/* Default barrier for the intra-node case - shared-memory hierarchical */
/* ToDo: Need to check how this works with the inter-node case */
if (1 == num_nodes) {
if (0 == subc->barrier_algo) {
return mca_coll_acoll_barrier_shm_h(comm, module, subc);
} else if (1 == subc->barrier_algo) {
return mca_coll_acoll_barrier_shm_f(comm, module, subc);
}
}

reqs = ompi_coll_base_comm_get_reqs(module->base_data, size);
if (NULL == reqs) {
return OMPI_ERR_OUT_OF_RESOURCE;