Skip to content

Commit 4fc6044

Browse files
authored
Enable usched and initret and create ConverseExit (#143)
* usched and initret implementation
* reset stop flag
* create ConverseExit
* fix nodereduction cmake
* initialization of csv on rank 0 only
* reduction id
* fix in header
* fix exit thread cleanup
* remove print
* remove unneeded atomics
* remove another
* lock before id gen
* comment clarification
* Revert "lock before id gen" — this reverts commit 3455256.
* fix hang at exit with multiple devices
1 parent 400ec62 commit 4fc6044

File tree

6 files changed

+131
-41
lines changed

6 files changed

+131
-41
lines changed

include/converse.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,11 @@ int CmiGetArgc(char **argv);
539539
int CmiScanf(const char *format, ...);
540540
int CmiError(const char *format, ...);
541541

542-
#define ConverseExit(...) CmiExit(__VA_ARGS__ + 0)
542+
#ifdef __cplusplus
543+
void ConverseExit(int status=0);
544+
#else
545+
void ConverseExit(int status);
546+
#endif
543547
#define CmiMemcpy(dest, src, size) memcpy((dest), (src), (size))
544548

545549
#define setMemoryTypeChare(p) /* empty memory debugging method */

src/collectives.cpp

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,11 @@ void CmiReductionsInit(void) {
229229
CpvAccess(_reduction_counter) = 0;
230230

231231
// SETUP NODE-LEVEL REDUCTIONS
232-
CsvInitialize(CmiNodeReduction *, _node_reduction_info);
233-
CsvInitialize(CmiNodeReductionID, _node_reduction_counter);
232+
if(CmiMyRank() == 0)
233+
{
234+
CsvInitialize(CmiNodeReduction *, _node_reduction_info);
235+
CsvInitialize(CmiNodeReductionID, _node_reduction_counter);
236+
234237

235238
auto noderedinfo =
236239
(CmiNodeReduction *)malloc(CmiMaxReductions * sizeof(CmiNodeReduction));
@@ -239,10 +242,13 @@ void CmiReductionsInit(void) {
239242

240243
// node reduction must be initialized with a valid lock
241244
nodered.lock = CmiCreateLock(); // in non-smp this would just be a nullptr
242-
245+
246+
CsvAccess(_node_reduction_info) = noderedinfo;
247+
CsvAccess(_node_reduction_counter) = 0;
243248
}
244-
CsvAccess(_node_reduction_info) = noderedinfo;
245-
CsvAccess(_node_reduction_counter) = 0;
249+
250+
}
251+
246252
}
247253

248254
// extract reduction ID from message
@@ -428,16 +434,15 @@ static void CmiClearNodeReduction(CmiReductionID id) {
428434

429435
// lock and unlock are used to support SMP
430436
void CmiNodeReduce(void *msg, int size, CmiReduceMergeFn mergeFn) {
437+
const CmiReductionID id = CmiGetNextNodeReductionID();
431438

432439
CmiNodeReduction nodeRed =
433-
CsvAccess(_node_reduction_info)[CmiGetReductionIndex(CmiGetRedID(msg))];
440+
CsvAccess(_node_reduction_info)[CmiGetReductionIndex(id)];
434441
CmiLock(nodeRed.lock);
435442

436-
const CmiReductionID id = CmiGetNextNodeReductionID();
437443
CmiReduction *red = CmiGetCreateNodeReduction(id);
438444
CmiInternalNodeReduce(msg, size, mergeFn, red);
439445

440-
441446
CmiUnlock(nodeRed.lock);
442447
}
443448

src/convcore.cpp

Lines changed: 109 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ int Cmi_mynodesize; // represents the number of PE's/threads on a single
2828
int Cmi_numnodes; // represents the number of physical nodes/systems machine
2929
int Cmi_nodestart;
3030
std::vector<CmiHandlerInfo> **CmiHandlerTable; // array of handler vectors
31+
std::atomic<int> numPEsReadyForExit {0};
3132
ConverseNodeQueue<void *> *CmiNodeQueue;
3233
CpvDeclare(Queue, CsdSchedQueue);
3334
CsvDeclare(Queue, CsdNodeQueue);
@@ -49,6 +50,8 @@ int userDrivenMode;
4950
int _replaySystem = 0;
5051
static int CmiMemoryIs_flag=0;
5152
CsvDeclare(CmiIpcManager*, coreIpcManager_);
53+
int Cmi_usched;
54+
int Cmi_initret;
5255

5356
CmiNodeLock CmiMemLock_lock;
5457
CpvDeclare(int, isHelperOn);
@@ -114,7 +117,7 @@ void CmiCallHandler(int handler, void *msg) {
114117
}
115118
}
116119

117-
void converseRunPe(int rank) {
120+
void converseRunPe(int rank, int everReturn) {
118121
char **CmiMyArgv;
119122
CmiMyArgv = CmiCopyArgs(Cmi_argv);
120123

@@ -146,50 +149,110 @@ void converseRunPe(int rank) {
146149
if (CmiTraceFn)
147150
CmiTraceFn(Cmi_argv);
148151

149-
// call initial function and start scheduler
150-
Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
151-
CsdScheduler();
152+
/*Converse modes:
153+
* usched=0, initret/everReturn=0: normal mode, converse starts scheduler for you
154+
* usched=1, initret/everReturn=0: user-driven scheduling, user calls scheduler
155+
* in startfn, then exits
156+
* usched=1, initret/everReturn=1: ConverseInit returns without calling startfn
157+
* on rank 0, still calls startfn on other ranks (used by namd)
158+
*/
152159

160+
if(!everReturn)
161+
{
162+
Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
163+
if (Cmi_usched == 0) {
164+
CsdScheduler();
165+
}
166+
CmiFreeArgs(CmiMyArgv);
167+
//todo: add interoperate condition
168+
ConverseExit();
169+
}
170+
else CmiFreeArgs(CmiMyArgv);
171+
172+
/*
153173
// Wait for all PEs on this node to finish
154174
CmiNodeBarrier();
155175
156176
// Finalize comm_backend
157177
comm_backend::exitThread();
158-
// free args
159-
CmiFreeArgs(CmiMyArgv);
178+
*/
160179
}
161180

162-
void CmiStartThreads() {
163-
// allocate global arrayss
164-
Cmi_queues = new ConverseQueue<void *> *[Cmi_mynodesize];
165-
CmiHandlerTable = new std::vector<CmiHandlerInfo> *[Cmi_mynodesize];
166-
CmiNodeQueue = new ConverseNodeQueue<void *>();
181+
//function to exit converse and terminate the program
182+
//waits for all threads to call, then does cleanup on rank 0
183+
void ConverseExit(int exitcode)
184+
{
185+
// increment number of PEs ready for exit
186+
std::atomic_fetch_add_explicit(&numPEsReadyForExit, 1, std::memory_order_release);
187+
// we need everyone to spin unlike old converse to be able to exit threads
188+
while (std::atomic_load_explicit(&numPEsReadyForExit, std::memory_order_acquire) != CmiMyNodeSize()) {
189+
// make progress while waiting so network progress can continue
190+
comm_backend::progress();
191+
}
167192

168-
_smp_mutex = CmiCreateLock();
169-
CmiMemLock_lock = CmiCreateLock();
193+
// At this point all threads on the node are ready to exit. We must perform
194+
// the inter-node barrier while per-thread communication contexts are still
195+
// alive so that progress() can drive completion. To coordinate that,
196+
// rank 0 performs the inter-node barrier and then notifies other threads
197+
// (via an atomic) that the barrier is complete. After the notification,
198+
// every thread performs its per-thread cleanup (exitThread). Finally,
199+
// rank 0 waits for all threads to finish exitThread before tearing down
200+
// the global comm backend and exiting the process.
170201

171-
// make sure the queues are allocated before PEs start sending messages around
172-
comm_backend::barrier();
202+
static std::atomic<int> barrier_done{0};
203+
static std::atomic<int> exitThread_done{0};
173204

174-
std::vector<std::thread> threads;
175-
for (int i = 0; i < Cmi_mynodesize; i++) {
176-
std::thread t(converseRunPe, i);
177-
threads.push_back(std::move(t));
205+
if (CmiMyRank() == 0) {
206+
// participate in the global barrier (blocks until other nodes arrive)
207+
comm_backend::barrier();
208+
// let other local threads know barrier has completed
209+
barrier_done.store(1, std::memory_order_release);
210+
} else {
211+
// other threads help make progress until rank 0 finishes the barrier
212+
while (std::atomic_load_explicit(&barrier_done, std::memory_order_acquire) == 0) {
213+
comm_backend::progress();
214+
}
178215
}
179216

180-
for (auto &thread : threads) {
181-
thread.join();
217+
// Now every thread can clean up its thread-local comm state.
218+
comm_backend::exitThread();
219+
220+
// signal we've finished exitThread()
221+
std::atomic_fetch_add_explicit(&exitThread_done, 1, std::memory_order_release);
222+
223+
if (CmiMyRank() == 0) {
224+
// wait for all local threads to complete their per-thread cleanup. Use
225+
// progress() to avoid deadlock if any backend progress is needed.
226+
while (std::atomic_load_explicit(&exitThread_done, std::memory_order_acquire) != CmiMyNodeSize()) {
227+
comm_backend::progress();
228+
}
229+
230+
// safe to tear down global comm backend and process-wide structures now
231+
comm_backend::exit();
232+
delete[] Cmi_queues;
233+
delete CmiNodeQueue;
234+
delete[] CmiHandlerTable;
235+
Cmi_queues = nullptr;
236+
CmiNodeQueue = nullptr;
237+
CmiHandlerTable = nullptr;
238+
exit(exitcode);
239+
} else {
240+
// Non-rank-0 threads block here indefinitely; rank 0 will terminate the
241+
// process once cleanup is done.
242+
while (true) {
243+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
244+
}
245+
}
246+
}
247+
248+
void CmiStartThreads() {
249+
250+
// Create threads for ranks 1 and up, run rank 0 on main thread (like original Converse)
251+
for (int i = 1; i < Cmi_mynodesize; i++) {
252+
std::thread t(converseRunPe, i, 0); // everReturn is 0 for ranks > 0, meaning these ranks will call the start function and not return from ConverseInit
253+
t.detach();
182254
}
183255

184-
// make sure all PEs are done before we free the queues.
185-
comm_backend::barrier();
186-
comm_backend::exit();
187-
delete[] Cmi_queues;
188-
delete CmiNodeQueue;
189-
delete[] CmiHandlerTable;
190-
Cmi_queues = nullptr;
191-
CmiNodeQueue = nullptr;
192-
CmiHandlerTable = nullptr;
193256
}
194257

195258
// argument form: ./prog +pe <N>
@@ -253,6 +316,8 @@ void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched,
253316
Cmi_argv = argv;
254317
Cmi_startfn = fn;
255318
CharmLibInterOperate = 0;
319+
Cmi_usched = usched;
320+
Cmi_initret = initret;
256321

257322
#ifdef CMK_HAS_PARTITION
258323
CmiCreatePartitions(argv);
@@ -266,7 +331,21 @@ void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched,
266331
Cmi_nodestartGlobal = _Cmi_mynode_global * Cmi_mynodesize;
267332
#endif
268333

334+
// allocate global arrays
335+
Cmi_queues = new ConverseQueue<void *> *[Cmi_mynodesize];
336+
CmiHandlerTable = new std::vector<CmiHandlerInfo> *[Cmi_mynodesize];
337+
CmiNodeQueue = new ConverseNodeQueue<void *>();
338+
339+
_smp_mutex = CmiCreateLock();
340+
CmiMemLock_lock = CmiCreateLock();
341+
342+
// make sure the queues are allocated before PEs start sending messages around
343+
comm_backend::barrier();
344+
345+
//launch threads on rank 1+
269346
CmiStartThreads();
347+
//run rank 0 on main thread
348+
converseRunPe(0, Cmi_initret);
270349
}
271350

272351
// CMI STATE

src/converse_internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ typedef struct GroupDef_s {
2727
#define DEBUGF(...) //CmiPrintf(__VA_ARGS__)
2828

2929
void CmiStartThreads(char **argv);
30-
void converseRunPe(int rank);
30+
void converseRunPe(int rank, int everReturn);
3131
void collectiveInit(void);
3232

3333
// HANDLERS

src/scheduler.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,8 @@ void CsdSchedulePoll() {
261261

262262
int CsdScheduler(int maxmsgs){
263263
if (maxmsgs < 0) {
264+
//reset stop flag
265+
CmiGetState()->stopFlag = 0;
264266
CsdScheduler(); //equivalent to CsdScheduleForever in old converse
265267
}
266268
else CsdSchedulePoll(); //not implementing CsdScheduleCount
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
add_reconverse_executable(nodereduction reduction.cpp)
22
add_test(NAME node-reduction-singlenode COMMAND nodereduction +pe 7)
3-
add_test(NAME node-reduction-multinode COMMAND ${RECONVERSE_TEST_LAUNCHER} -n 4 $<TARGET_FILE:reduction> +pe 8)
3+
add_test(NAME node-reduction-multinode COMMAND ${RECONVERSE_TEST_LAUNCHER} -n 4 $<TARGET_FILE:nodereduction> +pe 8)

0 commit comments

Comments (0)