Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .hadolint.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
ignored:
- DL3008 # Pin versions in apt-get install
- DL3018 # Pin versions in apk add
- DL3008 # Pin versions in apt-get install
- DL3018 # Pin versions in apk add
26 changes: 26 additions & 0 deletions tasks/klimov_m_torus/common/include/common.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once

#include <tuple>
#include <vector>

#include "task/include/task.hpp"

namespace klimov_m_torus {

// A point-to-point transfer request: which rank sends, which rank receives,
// and the integer payload to be routed across the torus.
struct TransferRequest {
int sender{};  // MPI rank that owns the payload
int receiver{};  // MPI rank that must end up with the payload
std::vector<int> data;  // payload forwarded hop-by-hop along the route
};

// Outcome of a transfer as observed by the receiving rank.
struct TransferResult {
std::vector<int> received_data;  // payload delivered to the receiver
std::vector<int> route;  // ranks visited, in order, from sender to receiver
};

// Type aliases wiring this task into the ppc::task framework.
using InType = TransferRequest;   // task input type
using OutType = TransferResult;   // task output type
using TestParam = std::tuple<int>;  // presumably a test parameterization — see test suite
using BaseTask = ppc::task::Task<InType, OutType>;

} // namespace klimov_m_torus
9 changes: 9 additions & 0 deletions tasks/klimov_m_torus/info.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"student": {
"first_name": "Михаил",
"group_number": "3823Б1ПР2",
"last_name": "Климов",
"middle_name": "Дмитриевич",
"task_number": "2"
}
}
47 changes: 47 additions & 0 deletions tasks/klimov_m_torus/mpi/include/ops_mpi.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#pragma once

#include <utility>
#include <vector>

#include "klimov_m_torus/common/include/common.hpp"
#include "task/include/task.hpp"

namespace klimov_m_torus {

// MPI task that delivers an integer payload from a sender rank to a receiver
// rank over a 2-D torus topology, forwarding the message hop-by-hop along a
// dimension-ordered (columns first, then rows) route.
class TorusMeshCommunicator : public BaseTask {
public:
// Identifies this implementation as the MPI variant of the task.
static constexpr ppc::task::TypeOfTask GetStaticTypeOfTask() {
return ppc::task::TypeOfTask::kMPI;
}

// Stores the transfer request as the task input.
explicit TorusMeshCommunicator(const InType &in);

private:
bool ValidationImpl() override;
bool PreProcessingImpl() override;
bool RunImpl() override;
bool PostProcessingImpl() override;

// Chooses a near-square rows x cols factorization of total_processes.
static std::pair<int, int> CalculateGridSize(int total_processes);
// Wraps (row, col) onto the torus and converts to a linear rank.
static int CombineCoordinates(int row, int col, int rows, int cols);
// Converts a linear rank back to (row, col) for a grid with `cols` columns.
static std::pair<int, int> SplitRank(int rank, int cols);
// Builds the hop-by-hop rank route from `from` to `to`.
static std::vector<int> BuildMessageRoute(int rows, int cols, int from, int to);

// Rank 0 broadcasts the sender/receiver pair to all ranks.
void DistributeSenderReceiver(int &src, int &dst);
// The source rank broadcasts its payload length to all ranks.
void DistributeDataLength(int src, int &len) const;
// Prepares the outgoing buffer (real data on the source rank only).
[[nodiscard]] std::vector<int> AssembleSendBuffer(int src, int len) const;
// Moves the payload along `route`; fills `output` on participating ranks.
void RelayMessage(int src, int dst, const std::vector<int> &route, const std::vector<int> &buffer,
std::vector<int> &output) const;
// The destination rank records the payload and route into the task output.
void SaveFinalResult(int dst, const std::vector<int> &output, const std::vector<int> &route);

InType local_request_{};   // snapshot of the task input, taken in PreProcessing
OutType local_response_{};  // result staged before publishing to GetOutput()

int current_rank_{0};  // this process's rank in MPI_COMM_WORLD
int total_ranks_{0};   // communicator size

int grid_rows_{1};  // torus grid height
int grid_cols_{1};  // torus grid width
};

} // namespace klimov_m_torus
237 changes: 237 additions & 0 deletions tasks/klimov_m_torus/mpi/src/ops_mpi.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
#include "klimov_m_torus/mpi/include/ops_mpi.hpp"

#include <mpi.h>

#include <algorithm>
#include <cmath>
#include <iterator>
#include <utility>
#include <vector>

#include "klimov_m_torus/common/include/common.hpp"

namespace klimov_m_torus {

namespace {

// Helper functions, moved into an anonymous namespace to reduce the complexity of RelayMessage

// Trivial case: source and destination coincide, so the payload never leaves
// the node. Only the source rank keeps a copy of the buffer; every other rank
// leaves `output` untouched.
void HandleSameNode(int current_rank, int src, const std::vector<int> &buffer, std::vector<int> &output) {
  if (current_rank != src) {
    return;
  }
  output.assign(buffer.begin(), buffer.end());
}

// Executed on the source rank: keep a local copy of the payload and, if the
// route contains at least one more hop, push the length (tag 0) and then the
// data (tag 1) to the next node on the route.
void HandleSourceNode(int current_rank, int src, const std::vector<int> &route, const std::vector<int> &buffer,
                      std::vector<int> &output) {
  output = buffer;
  const bool must_forward = (current_rank == src) && (route.size() > 1);
  if (!must_forward) {
    return;
  }
  const int next_hop = route[1];
  const int send_len = static_cast<int>(output.size());
  MPI_Send(&send_len, 1, MPI_INT, next_hop, 0, MPI_COMM_WORLD);
  if (send_len > 0) {
    MPI_Send(output.data(), send_len, MPI_INT, next_hop, 1, MPI_COMM_WORLD);
  }
}

// Executed on every non-source rank that lies on the route: receive the
// payload from the previous hop and, unless this rank is the final
// destination (or the route ends here), forward it unchanged to the next hop.
void HandleIntermediateNode(int current_rank, int dst, const std::vector<int> &route, int my_pos,
                            std::vector<int> &output) {
  const int prev_hop = route[my_pos - 1];
  int payload_len = 0;
  // Length first (tag 0), then the data itself (tag 1) — mirrors the sender.
  MPI_Recv(&payload_len, 1, MPI_INT, prev_hop, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  output.resize(payload_len);
  if (payload_len > 0) {
    MPI_Recv(output.data(), payload_len, MPI_INT, prev_hop, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  }

  const bool reached_destination = (current_rank == dst);
  const bool has_next_hop = (my_pos + 1) < static_cast<int>(route.size());
  if (reached_destination || !has_next_hop) {
    return;
  }
  const int next_hop = route[my_pos + 1];
  MPI_Send(&payload_len, 1, MPI_INT, next_hop, 0, MPI_COMM_WORLD);
  if (payload_len > 0) {
    MPI_Send(output.data(), payload_len, MPI_INT, next_hop, 1, MPI_COMM_WORLD);
  }
}

} // namespace

// Constructs the MPI torus task: tags the task type, stores the transfer
// request as the task input, and clears the output slot.
TorusMeshCommunicator::TorusMeshCommunicator(const InType &in) {
SetTypeOfTask(GetStaticTypeOfTask());
GetInput() = in;
GetOutput() = {};
}

// Picks a near-square rows x cols factorization of total_processes: start at
// floor(sqrt(n)) and walk down to the nearest divisor, falling back to a
// 1 x n strip. Both dimensions are clamped to at least 1.
std::pair<int, int> TorusMeshCommunicator::CalculateGridSize(int total_processes) {
  auto rows = static_cast<int>(std::sqrt(static_cast<double>(total_processes)));
  for (; rows > 1; --rows) {
    if (total_processes % rows == 0) {
      break;
    }
  }
  rows = std::max(rows, 1);
  const int cols = std::max(total_processes / rows, 1);
  return {rows, cols};
}

// Maps (possibly out-of-range) torus coordinates to a linear rank, wrapping
// both indices into [0, rows) and [0, cols) via a Euclidean modulo.
int TorusMeshCommunicator::CombineCoordinates(int row, int col, int rows, int cols) {
  const int r = ((row % rows) + rows) % rows;
  const int c = ((col % cols) + cols) % cols;
  return (r * cols) + c;
}

// Inverse of CombineCoordinates: recovers (row, col) from a linear rank.
std::pair<int, int> TorusMeshCommunicator::SplitRank(int rank, int cols) {
  return {rank / cols, rank % cols};
}

// Builds the hop-by-hop rank sequence from `from` to `to` using
// dimension-ordered routing on a torus: columns are traversed first, then
// rows, always taking the direction with fewer wrap-around steps (ties go
// forward). Returns a route starting at `from` and ending at `to`;
// degenerates to {from} when the grid is empty.
//
// The shortest-direction computation was duplicated for rows and columns in
// the original; it is factored into one local lambda here.
std::vector<int> TorusMeshCommunicator::BuildMessageRoute(int rows, int cols, int from, int to) {
  std::vector<int> route;
  if (rows <= 0 || cols <= 0) {
    route.push_back(from);
    return route;
  }

  auto [src_row, src_col] = SplitRank(from, cols);
  auto [dst_row, dst_col] = SplitRank(to, cols);

  // For one torus dimension of extent `size`, returns {step, moves}: the
  // direction (+1 / -1) and number of hops of the shorter wrap-around path.
  auto shortest_path = [](int diff, int size) -> std::pair<int, int> {
    const int forward = (diff >= 0) ? diff : diff + size;
    const int backward = (diff <= 0) ? -diff : size - diff;
    return (forward <= backward) ? std::pair<int, int>{1, forward} : std::pair<int, int>{-1, backward};
  };

  int cur_row = src_row;
  int cur_col = src_col;
  route.push_back(from);

  auto [col_step, col_moves] = shortest_path(dst_col - src_col, cols);
  for (int i = 0; i < col_moves; ++i) {
    cur_col += col_step;
    route.push_back(CombineCoordinates(cur_row, cur_col, rows, cols));
  }

  auto [row_step, row_moves] = shortest_path(dst_row - src_row, rows);
  for (int i = 0; i < row_moves; ++i) {
    cur_row += row_step;
    route.push_back(CombineCoordinates(cur_row, cur_col, rows, cols));
  }

  return route;
}

bool TorusMeshCommunicator::ValidationImpl() {
int initialized = 0;
MPI_Initialized(&initialized);
if (initialized == 0) {
return false;
}

MPI_Comm_rank(MPI_COMM_WORLD, &current_rank_);
MPI_Comm_size(MPI_COMM_WORLD, &total_ranks_);

int valid = 0;
if (current_rank_ == 0) {
const auto &req = GetInput();
if (req.sender >= 0 && req.receiver >= 0 && req.sender < total_ranks_ && req.receiver < total_ranks_) {
valid = 1;
}
}
MPI_Bcast(&valid, 1, MPI_INT, 0, MPI_COMM_WORLD);
return valid != 0;
}

// Caches this rank's id and the communicator size, derives the 2-D grid
// shape from the process count, and snapshots the request locally.
bool TorusMeshCommunicator::PreProcessingImpl() {
  MPI_Comm_rank(MPI_COMM_WORLD, &current_rank_);
  MPI_Comm_size(MPI_COMM_WORLD, &total_ranks_);

  const auto grid = CalculateGridSize(total_ranks_);
  grid_rows_ = grid.first;
  grid_cols_ = grid.second;

  local_request_ = GetInput();
  local_response_ = OutType{};
  return true;
}

// Orchestrates one transfer: agree on endpoints and payload length, build
// the torus route, relay the payload along it, and record the result on the
// receiving rank.
bool TorusMeshCommunicator::RunImpl() {
  int src = 0;
  int dst = 0;
  DistributeSenderReceiver(src, dst);

  int payload_len = 0;
  DistributeDataLength(src, payload_len);

  const std::vector<int> outgoing = AssembleSendBuffer(src, payload_len);
  const std::vector<int> route = BuildMessageRoute(grid_rows_, grid_cols_, src, dst);

  std::vector<int> incoming;
  RelayMessage(src, dst, route, outgoing, incoming);

  SaveFinalResult(dst, incoming, route);
  return true;
}

// Rank 0 reads the endpoints from the task input; both values are then
// broadcast so every rank works with the same sender/receiver pair.
void TorusMeshCommunicator::DistributeSenderReceiver(int &src, int &dst) {
  if (current_rank_ == 0) {
    const auto &request = GetInput();
    src = request.sender;
    dst = request.receiver;
  }
  MPI_Bcast(&src, 1, MPI_INT, 0, MPI_COMM_WORLD);
  MPI_Bcast(&dst, 1, MPI_INT, 0, MPI_COMM_WORLD);
}

// The source rank announces its payload length; everyone else receives it
// through the broadcast rooted at `src`.
void TorusMeshCommunicator::DistributeDataLength(int src, int &len) const {
  const bool is_source = (current_rank_ == src);
  if (is_source) {
    len = static_cast<int>(local_request_.data.size());
  }
  MPI_Bcast(&len, 1, MPI_INT, src, MPI_COMM_WORLD);
}

// Builds the outgoing payload buffer of the agreed length `len`. Only the
// source rank fills it with real data; other ranks get a zero-filled
// placeholder.
//
// Fixes two latent defects in the original: (1) std::ranges::copy wrote
// data.size() elements into a buffer of size `len`, overrunning it whenever
// len < data.size(); now at most `len` elements are copied. (2) a negative
// `len` fed straight into the vector constructor; it is clamped to 0.
std::vector<int> TorusMeshCommunicator::AssembleSendBuffer(int src, int len) const {
  std::vector<int> buffer(std::max(len, 0));
  if (current_rank_ == src && !buffer.empty()) {
    const auto count = std::min(buffer.size(), local_request_.data.size());
    std::copy_n(local_request_.data.begin(), count, buffer.begin());
  }
  return buffer;
}

// Dispatches this rank's role in the transfer: trivial self-delivery when
// src == dst, sending when this rank is the source, relaying/receiving when
// it lies on the route. Ranks not on the route leave `output` empty.
void TorusMeshCommunicator::RelayMessage(int src, int dst, const std::vector<int> &route,
                                         const std::vector<int> &buffer, std::vector<int> &output) const {
  const auto pos_it = std::ranges::find(route, current_rank_);
  const bool participates = (pos_it != route.end());

  if (src == dst) {
    HandleSameNode(current_rank_, src, buffer, output);
    return;
  }
  if (current_rank_ == src) {
    HandleSourceNode(current_rank_, src, route, buffer, output);
    return;
  }
  if (participates) {
    const auto my_pos = static_cast<int>(pos_it - route.begin());
    HandleIntermediateNode(current_rank_, dst, route, my_pos, output);
  }
}

// Publishes the outcome: the destination rank reports the delivered payload
// and the route taken; every other rank publishes an empty result.
void TorusMeshCommunicator::SaveFinalResult(int dst, const std::vector<int> &output, const std::vector<int> &route) {
  if (current_rank_ != dst) {
    GetOutput() = OutType{};
    return;
  }
  local_response_.received_data = output;
  local_response_.route = route;
  GetOutput() = local_response_;
}

// No per-run cleanup is required; the result was already published in
// SaveFinalResult.
bool TorusMeshCommunicator::PostProcessingImpl() {
return true;
}

} // namespace klimov_m_torus
Loading
Loading