Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 46 additions & 39 deletions include/mesh/mesh.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -742,44 +742,50 @@ void mpm::Mesh<Tdim>::transfer_halo_particles() {
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);

if (mpi_size > 1) {
// Stable count buffers + properly sized requests
const std::size_t nghost = this->ghost_cells_.size();
std::vector<unsigned> send_counts(nghost, 0);
std::vector<MPI_Request> send_requests;
send_requests.reserve(ghost_cells_.size());
send_requests.resize(nghost);

unsigned i = 0;
unsigned np = 0;
// Collect particles
std::vector<mpm::Index> remove_pids;
// Iterate through the ghost cells and send particles

// Iterate through the ghost cells and send counts (nonblocking)
std::size_t i = 0;
for (auto citr = this->ghost_cells_.cbegin();
citr != this->ghost_cells_.cend(); ++citr, ++i) {

// Send number of particles to receiver rank
auto particle_ids = (*citr)->particles();
unsigned nparticles = particle_ids.size();
MPI_Isend(&nparticles, 1, MPI_UNSIGNED, (*citr)->rank(), 1,
const auto& particle_ids = (*citr)->particles();
send_counts[i] = static_cast<unsigned>(particle_ids.size());

MPI_Isend(&send_counts[i], 1, MPI_UNSIGNED, (*citr)->rank(), 1,
MPI_COMM_WORLD, &send_requests[i]);
}

// Iterate through the ghost cells and send particles
// Iterate through the ghost cells and send payloads (blocking, unchanged)
for (auto citr = this->ghost_cells_.cbegin();
citr != this->ghost_cells_.cend(); ++citr, ++i) {
citr != this->ghost_cells_.cend(); ++citr) {
// Send number of particles to receiver rank
auto particle_ids = (*citr)->particles();
const auto& particle_ids = (*citr)->particles();
for (auto& id : particle_ids) {
// Create a vector of serialized particle
std::vector<uint8_t> buffer = map_particles_[id]->serialize();
MPI_Send(buffer.data(), buffer.size(), MPI_UINT8_T, (*citr)->rank(), 0,
MPI_COMM_WORLD);
++np;
MPI_Send(buffer.data(), static_cast<int>(buffer.size()), MPI_UINT8_T,
(*citr)->rank(), 0, MPI_COMM_WORLD);

// Particles to be removed from the current rank
remove_pids.emplace_back(id);
}
(*citr)->clear_particle_ids();
}

// Remove all sent particles
this->remove_particles(remove_pids);
// Send complete
for (unsigned i = 0; i < this->ghost_cells_.size(); ++i)
MPI_Wait(&send_requests[i], MPI_STATUS_IGNORE);

// Wait for all count sends to complete
for (std::size_t k = 0; k < nghost; ++k)
MPI_Wait(&send_requests[k], MPI_STATUS_IGNORE);

// Particle id
mpm::Index pid = 0;
Expand All @@ -792,53 +798,53 @@ void mpm::Mesh<Tdim>::transfer_halo_particles() {
citr != this->local_ghost_cells_.cend(); ++citr) {
std::vector<unsigned> neighbour_ranks =
ghost_cells_neighbour_ranks_[(*citr)->id()];
// Total number of particles
// Total number of particles (one count per neighbour rank)
std::vector<unsigned> nrank_particles(neighbour_ranks.size(), 0);
for (unsigned i = 0; i < neighbour_ranks.size(); ++i)
MPI_Recv(&nrank_particles[i], 1, MPI_UNSIGNED, neighbour_ranks[i], 1,
for (unsigned ir = 0; ir < neighbour_ranks.size(); ++ir)
MPI_Recv(&nrank_particles[ir], 1, MPI_UNSIGNED, neighbour_ranks[ir], 1,
MPI_COMM_WORLD, MPI_STATUS_IGNORE);

// Receive number of particles
unsigned nrecv_particles =
std::accumulate(nrank_particles.begin(), nrank_particles.end(), 0);
std::accumulate(nrank_particles.begin(), nrank_particles.end(), 0u);

for (unsigned j = 0; j < nrecv_particles; ++j) {
// Retrieve information about the incoming message
MPI_Status status;
MPI_Probe(MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &status);

// Get buffer size
int size;
int size = 0;
MPI_Get_count(&status, MPI_UINT8_T, &size);

// Allocate the buffer now that we know how many elements there are
std::vector<uint8_t> buffer;
buffer.resize(size);
std::vector<uint8_t> buffer(static_cast<std::size_t>(size));

// Finally receive the message
MPI_Recv(buffer.data(), size, MPI_UINT8_T, MPI_ANY_SOURCE, 0,
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// Receive from the probed source+tag
MPI_Recv(buffer.data(), size, MPI_UINT8_T, status.MPI_SOURCE,
status.MPI_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

uint8_t* bufptr = const_cast<uint8_t*>(&buffer[0]);
uint8_t* bufptr = const_cast<uint8_t*>(buffer.data());
int position = 0;

// Get particle type
int ptype;
MPI_Unpack(bufptr, buffer.size(), &position, &ptype, 1, MPI_INT,
MPI_COMM_WORLD);
unsigned ptype = 0;
MPI_Unpack(bufptr, static_cast<int>(buffer.size()), &position, &ptype,
1, MPI_UNSIGNED, MPI_COMM_WORLD);
std::string particle_type = mpm::ParticleTypeName.at(ptype);

// Get materials material id
int nmaterials = 0;
MPI_Unpack(bufptr, buffer.size(), &position, &nmaterials, 1,
MPI_UNSIGNED, MPI_COMM_WORLD);
// Get materials material ids
unsigned nmaterials = 0;
MPI_Unpack(bufptr, static_cast<int>(buffer.size()), &position,
&nmaterials, 1, MPI_UNSIGNED, MPI_COMM_WORLD);

// Vector of materials
std::vector<std::shared_ptr<mpm::Material<Tdim>>> materials;
materials.reserve(nmaterials);
materials.reserve(static_cast<std::size_t>(nmaterials));
for (unsigned k = 0; k < nmaterials; ++k) {
int mat_id;
MPI_Unpack(bufptr, buffer.size(), &position, &mat_id, 1, MPI_UNSIGNED,
MPI_COMM_WORLD);
unsigned mat_id = 0;
MPI_Unpack(bufptr, static_cast<int>(buffer.size()), &position,
&mat_id, 1, MPI_UNSIGNED, MPI_COMM_WORLD);
materials.emplace_back(materials_.at(mat_id));
}

Expand All @@ -848,6 +854,7 @@ void mpm::Mesh<Tdim>::transfer_halo_particles() {
const Eigen::Matrix<double, Tdim, 1>&>::instance()
->create(particle_type, static_cast<mpm::Index>(pid),
pcoordinates);

particle->deserialize(buffer, materials);
// Add particle to mesh
this->add_particle(particle, true);
Expand Down