Skip to content

Commit eb9ddd1

Browse files
Annnnyafwyzard
authored and committed
added configuration to run mpi in local and remote mode
1 parent 7d5cf67 commit eb9ddd1

File tree

4 files changed

+81
-44
lines changed

4 files changed

+81
-44
lines changed

HeterogeneousCore/MPICore/plugins/MPIController.cc

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ class MPIController : public edm::one::EDProducer<edm::one::WatchRuns, edm::one:
6363
MPI_Comm comm_ = MPI_COMM_NULL;
6464
MPIChannel channel_;
6565
edm::EDPutTokenT<MPIToken> token_;
66+
bool run_local_;
6667
};
6768

6869
MPIController::MPIController(edm::ParameterSet const& config)
@@ -71,30 +72,45 @@ MPIController::MPIController(edm::ParameterSet const& config)
7172
// make sure that MPI is initialised
7273
MPIService::required();
7374

75+
// get from parameter set whether remote mpi is used
76+
run_local_ = config.getUntrackedParameter<bool>("run_local", false);
77+
78+
edm::LogAbsolute("MPI") << "Running controller in " << (run_local_ ? "local" : "remote") << " mode.";
79+
7480
// FIXME move into the MPIService ?
7581
// make sure the EDM MPI types are available
7682
EDM_MPI_build_types();
7783

78-
// look up the "server" port
79-
char port[MPI_MAX_PORT_NAME];
80-
MPI_Lookup_name("server", MPI_INFO_NULL, port);
81-
edm::LogAbsolute("MPI") << "Trying to connect to the MPI server on port " << port;
82-
83-
// connect to the server
84-
int size;
85-
MPI_Comm_connect(port, MPI_INFO_NULL, 0, MPI_COMM_SELF, &comm_);
86-
MPI_Comm_remote_size(comm_, &size);
87-
edm::LogAbsolute("MPI") << "Client connected to " << size << (size == 1 ? " server" : " servers");
88-
if (size > 1) {
89-
throw cms::Exception("UnsupportedFeature")
90-
<< "MPIController supports only a single follower, but it was connected to " << size << " followers";
84+
if (!run_local_) {
85+
// look up the "server" port
86+
char port[MPI_MAX_PORT_NAME];
87+
MPI_Lookup_name("server", MPI_INFO_NULL, port);
88+
edm::LogAbsolute("MPI") << "Trying to connect to the MPI server on port " << port;
89+
90+
// connect to the server
91+
int size;
92+
MPI_Comm_connect(port, MPI_INFO_NULL, 0, MPI_COMM_SELF, &comm_);
93+
MPI_Comm_remote_size(comm_, &size);
94+
edm::LogAbsolute("MPI") << "Client connected to " << size << (size == 1 ? " server" : " servers");
95+
if (size > 1) {
96+
throw cms::Exception("UnsupportedFeature")
97+
<< "MPIController supports only a single follower, but it was connected to " << size << " followers";
98+
}
99+
channel_ = MPIChannel(comm_, 0);
100+
} else {
101+
comm_ = MPI_COMM_WORLD;
102+
channel_ = MPIChannel(comm_, 0);
103+
int world_rank;
104+
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
105+
std::cout << "MPI Controller Rank: " << world_rank << " in MPI_COMM_WORLD" << std::endl;
91106
}
92-
channel_ = MPIChannel(comm_, 0);
93107
}
94108

95109
MPIController::~MPIController() {
96110
// close the intercommunicator
97-
MPI_Comm_disconnect(&comm_);
111+
if (!run_local_) {
112+
MPI_Comm_disconnect(&comm_);
113+
}
98114
}
99115

100116
void MPIController::beginJob() {
@@ -223,6 +239,7 @@ void MPIController::fillDescriptions(edm::ConfigurationDescriptions& description
223239
"Runs, LuminosityBlocks and Events from the current process to the remote one.");
224240

225241
edm::ParameterSetDescription desc;
242+
desc.addOptionalUntracked<bool>("run_local", false);
226243
descriptions.addWithDefaultLabel(desc);
227244
}
228245

HeterogeneousCore/MPICore/plugins/MPIReceiver.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ class MPIReceiver : public edm::global::EDProducer<> {
5050
MPIToken token = event.get(upstream_);
5151

5252
// Receive the number of products
53-
int numProducts;
54-
token.channel()->receiveProduct(instance_, numProducts);
55-
edm::LogVerbatim("MPIReceiver") << "Received number of products: " << numProducts;
53+
// int numProducts;
54+
// token.channel()->receiveProduct(instance_, numProducts);
55+
// edm::LogVerbatim("MPIReceiver") << "Received number of products: " << numProducts;
5656

5757
for (auto const& entry : products_) {
5858
std::unique_ptr<edm::WrapperBase> wrapper(

HeterogeneousCore/MPICore/plugins/MPISender.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ class MPISender : public edm::global::EDProducer<> {
8383
// read the MPIToken used to establish the communication channel
8484
MPIToken token = event.get(upstream_);
8585

86-
int numProducts = static_cast<int>(products_.size());
87-
token.channel()->sendProduct(instance_, numProducts);
86+
// int numProducts = static_cast<int>(products_.size());
87+
// token.channel()->sendProduct(instance_, numProducts);
8888

8989
for (auto const& entry : products_) {
9090
// read the products to be sent over the MPI channel

HeterogeneousCore/MPICore/plugins/MPISource.cc

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class MPISource : public edm::ProducerSourceBase {
5858
MPI_Comm comm_ = MPI_COMM_NULL;
5959
MPIChannel channel_;
6060
edm::EDPutTokenT<MPIToken> token_;
61+
bool run_local_;
6162

6263
edm::ProcessHistory history_;
6364
};
@@ -76,22 +77,37 @@ MPISource::MPISource(edm::ParameterSet const& config, edm::InputSourceDescriptio
7677
// make sure the EDM MPI types are available
7778
EDM_MPI_build_types();
7879

79-
// open a server-side port
80-
MPI_Open_port(MPI_INFO_NULL, port_);
81-
82-
// publish the port under the name "server"
83-
MPI_Info port_info;
84-
MPI_Info_create(&port_info);
85-
MPI_Info_set(port_info, "ompi_global_scope", "true");
86-
MPI_Info_set(port_info, "ompi_unique", "true");
87-
MPI_Publish_name("server", port_info, port_);
88-
89-
// create an intercommunicator and accept a client connection
90-
edm::LogAbsolute("MPI") << "waiting for a connection to the MPI server at port " << port_;
91-
MPI_Comm_accept(port_, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &comm_);
92-
channel_ = MPIChannel(comm_, 0);
80+
// get from parameter set whether remote mpi is used
81+
run_local_ = config.getUntrackedParameter<bool>("run_local", false);
82+
83+
edm::LogAbsolute("MPI") << "Running source in " << (run_local_ ? "local" : "remote") << " mode.";
84+
85+
if (run_local_) {
86+
int world_rank;
87+
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
88+
std::cout << "MPI Source Rank: " << world_rank << " in MPI_COMM_WORLD" << std::endl;
89+
comm_ = MPI_COMM_WORLD;
90+
channel_ = MPIChannel(comm_, 1);
91+
} else {
92+
// open a server-side port
93+
MPI_Open_port(MPI_INFO_NULL, port_);
94+
95+
// publish the port under the name "server"
96+
MPI_Info port_info;
97+
MPI_Info_create(&port_info);
98+
MPI_Info_set(port_info, "ompi_global_scope", "true");
99+
MPI_Info_set(port_info, "ompi_unique", "true");
100+
MPI_Publish_name("server", port_info, port_);
101+
102+
// create an intercommunicator and accept a client connection
103+
edm::LogAbsolute("MPI") << "waiting for a connection to the MPI server at port " << port_;
104+
105+
MPI_Comm_accept(port_, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &comm_);
106+
edm::LogAbsolute("MPI") << "connection accept ";
107+
channel_ = MPIChannel(comm_, 0);
108+
}
93109

94-
// wait for a client to connect
110+
// // wait for a client to connect
95111
MPI_Status status;
96112
EDM_MPI_Empty_t buffer;
97113
MPI_Recv(&buffer, 1, EDM_MPI_Empty, MPI_ANY_SOURCE, EDM_MPI_Connect, comm_, &status);
@@ -100,15 +116,17 @@ MPISource::MPISource(edm::ParameterSet const& config, edm::InputSourceDescriptio
100116

101117
MPISource::~MPISource() {
102118
// close the intercommunicator
103-
MPI_Comm_disconnect(&comm_);
104-
105-
// unpublish and close the port
106-
MPI_Info port_info;
107-
MPI_Info_create(&port_info);
108-
MPI_Info_set(port_info, "ompi_global_scope", "true");
109-
MPI_Info_set(port_info, "ompi_unique", "true");
110-
MPI_Unpublish_name("server", port_info, port_);
111-
MPI_Close_port(port_);
119+
if (!run_local_) {
120+
MPI_Comm_disconnect(&comm_);
121+
122+
// unpublish and close the port
123+
MPI_Info port_info;
124+
MPI_Info_create(&port_info);
125+
MPI_Info_set(port_info, "ompi_global_scope", "true");
126+
MPI_Info_set(port_info, "ompi_unique", "true");
127+
MPI_Unpublish_name("server", port_info, port_);
128+
MPI_Close_port(port_);
129+
}
112130
}
113131

114132
//MPISource::ItemTypeInfo MPISource::getNextItemType() {
@@ -259,6 +277,8 @@ void MPISource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
259277
desc.setComment("Comunicate with another cmsRun process over MPI.");
260278
edm::ProducerSourceBase::fillDescription(desc);
261279

280+
desc.addOptionalUntracked<bool>("run_local");
281+
262282
descriptions.add("source", desc);
263283
}
264284

0 commit comments

Comments (0)