diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f10cf21..7a07b09 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,6 +1,6 @@ target_include_directories(reconverse PRIVATE .) target_sources(reconverse PRIVATE conv-conds.cpp convcore.cpp random.cpp - scheduler.cpp cpuaffinity.cpp collectives.cpp + scheduler.cpp scheduler_helpers.cpp cpuaffinity.cpp collectives.cpp comm_backend/comm_backend_internal.cpp threads.cpp cldb.rand.cpp cldb.cpp cmirdmautils.cpp conv-rdma.cpp conv-topology.cpp msgmgr.cpp queueing.cpp cmishmem.cpp) target_include_directories( diff --git a/src/convcore.cpp b/src/convcore.cpp index 9716ad0..5c84ed8 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -352,6 +352,9 @@ void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, CmiHandlerTable = new std::vector *[Cmi_mynodesize]; CmiNodeQueue = new ConverseNodeQueue(); + //register queues + CmiQueueRegisterInit(); + _smp_mutex = CmiCreateLock(); CmiMemLock_lock = CmiCreateLock(); diff --git a/src/converse_internal.h b/src/converse_internal.h index ec896a4..093783f 100644 --- a/src/converse_internal.h +++ b/src/converse_internal.h @@ -67,9 +67,11 @@ CmiState *CmiGetState(void); void CmiInitState(int pe); ConverseQueue *CmiGetQueue(int pe); void CrnInit(void); - void CmiPushPE(int destPE, int messageSize, void *msg); +//queue reg init +void CmiQueueRegisterInit(void); + // node queue ConverseNodeQueue *CmiGetNodeQueue(); diff --git a/src/scheduler.cpp b/src/scheduler.cpp index c346656..b57d02c 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -1,152 +1,136 @@ #include "scheduler.h" -#include "converse.h" -#include "converse_internal.h" -#include "queue.h" -#include -/** - * The main scheduler loop for the Charm++ runtime. - */ -void CsdScheduler() { - // get pthread level queue - ConverseQueue *queue = CmiGetQueue(CmiMyRank()); - - // get node level queue - ConverseNodeQueue *nodeQueue = CmiGetNodeQueue(); - - int loop_counter = 0; - - while (CmiStopFlag() == 0) { - - CcdRaiseCondition(CcdSCHEDLOOP); +extern std::vector g_handlers; //list of handlers +extern Groups g_groups; //groups of handlers by index - #ifdef CMK_USE_SHMEM - CmiIpcBlock* block = CmiPopIpcBlock(CsvAccess(coreIpcManager_)); - if (block != nullptr) { - CmiDeliverIpcBlockMsg(block); - } - #endif +static inline void releaseIdle() { + if (CmiGetIdle()) { + CmiSetIdle(false); + CcdRaiseCondition(CcdPROCESSOR_END_IDLE); + } +} - // poll node queue - if (!nodeQueue->empty()) { - auto result = nodeQueue->pop(); - if (result) { - void *msg = result.value(); - // process event - CmiHandleMessage(msg); +static inline void setIdle() { + if (!CmiGetIdle()) { + CmiSetIdle(true); + CmiSetIdleTime(CmiWallTimer()); + CcdRaiseCondition(CcdPROCESSOR_BEGIN_IDLE); + } + // if already idle, call still idle and (maybe) long idle + else { + CcdRaiseCondition(CcdPROCESSOR_STILL_IDLE); + if (CmiWallTimer() - CmiGetIdleTime() > 10.0) { + CcdRaiseCondition(CcdPROCESSOR_LONG_IDLE); + } + } +} - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } +//poll converse-level node queue +bool pollConverseNodeQueue() { + ConverseNodeQueue *nodeQueue = CmiGetNodeQueue(); + if (!nodeQueue->empty()) { + auto result = nodeQueue->pop(); + if (result) { + void *msg = result.value(); + // process event + CmiHandleMessage(msg); + releaseIdle(); + return true; } + } + return false; +} - // poll thread queue - else if (!queue->empty()) { - // get next event (guaranteed to be there because only single consumer) - void *msg = queue->pop().value(); +//poll converse-level thread queue +bool pollConverseThreadQueue() { + ConverseQueue *queue = CmiGetQueue(CmiMyRank()); + if (!queue->empty()) { + // get next event (guaranteed to be there because only single consumer) + void *msg = queue->pop().value(); + // process event + CmiHandleMessage(msg); + releaseIdle(); + return true; + } + return false; +} +//poll node priority queue +bool pollNodePrioQueue() { + // Try to acquire lock without blocking + if (CmiTryLock(CsvAccess(CsdNodeQueueLock)) == 0) { + if (!QueueEmpty(CsvAccess(CsdNodeQueue))) { + void *msg = QueueTop(CsvAccess(CsdNodeQueue)); + QueuePop(CsvAccess(CsdNodeQueue)); + CmiUnlock(CsvAccess(CsdNodeQueueLock)); // process event CmiHandleMessage(msg); - - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } + releaseIdle(); + return true; + } else { + CmiUnlock(CsvAccess(CsdNodeQueueLock)); } + } + return false; +} - // poll node prio queue - else { - // Try to acquire lock without blocking - if (CmiTryLock(CsvAccess(CsdNodeQueueLock)) == 0) { - if (!QueueEmpty(CsvAccess(CsdNodeQueue))) { - void* msg = QueueTop(CsvAccess(CsdNodeQueue)); - QueuePop(CsvAccess(CsdNodeQueue)); - CmiUnlock(CsvAccess(CsdNodeQueueLock)); - // process event - CmiHandleMessage(msg); +//poll thread priority queue +bool pollThreadPrioQueue() { + if (!QueueEmpty(CpvAccess(CsdSchedQueue))) { + void *msg = QueueTop(CpvAccess(CsdSchedQueue)); + QueuePop(CpvAccess(CsdSchedQueue)); + // process event + CmiHandleMessage(msg); + releaseIdle(); + return true; + } + return false; +} - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } - else { - CmiUnlock(CsvAccess(CsdNodeQueueLock)); - //empty queue so check thread prio queue - if (!QueueEmpty(CpvAccess(CsdSchedQueue))) { - void *msg = QueueTop(CpvAccess(CsdSchedQueue)); - QueuePop(CpvAccess(CsdSchedQueue)); +bool pollProgress() +{ + if(CmiMyRank() % backend_poll_thread == 0) comm_backend::progress(); + return false; //polling progress doesn't count +} - // process event - CmiHandleMessage(msg); +//will add queue polling functions +//called at node level (before threads created) +void CmiQueueRegisterInit() { + add_handler(pollConverseNodeQueue, 8); + add_handler(pollConverseThreadQueue, 1); + add_handler(pollNodePrioQueue, 16); + add_handler(pollThreadPrioQueue, 1); + add_handler(pollProgress, backend_poll_freq); +} - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } else { - // the processor is idle - // if not already idle, set idle and raise condition - if (!CmiGetIdle()) { - CmiSetIdle(true); - CmiSetIdleTime(CmiWallTimer()); - CcdRaiseCondition(CcdPROCESSOR_BEGIN_IDLE); - } - // if already idle, call still idle and (maybe) long idle - else { - CcdRaiseCondition(CcdPROCESSOR_STILL_IDLE); - if (CmiWallTimer() - CmiGetIdleTime() > 10.0) { - CcdRaiseCondition(CcdPROCESSOR_LONG_IDLE); - } - } - } - } - } - else { - // Could not acquire node queue lock, skip to thread prio queue - if (!QueueEmpty(CpvAccess(CsdSchedQueue))) { - void *msg = QueueTop(CpvAccess(CsdSchedQueue)); - QueuePop(CpvAccess(CsdSchedQueue)); +/** + * The main scheduler loop for the Charm++ runtime. + */ +void CsdScheduler() { - // process event - CmiHandleMessage(msg); + uint64_t loop_counter = 0; - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } else { - // the processor is idle - // if not already idle, set idle and raise condition - if (!CmiGetIdle()) { - CmiSetIdle(true); - CmiSetIdleTime(CmiWallTimer()); - CcdRaiseCondition(CcdPROCESSOR_BEGIN_IDLE); - } - // if already idle, call still idle and (maybe) long idle - else { - CcdRaiseCondition(CcdPROCESSOR_STILL_IDLE); - if (CmiWallTimer() - CmiGetIdleTime() > 10.0) { - CcdRaiseCondition(CcdPROCESSOR_LONG_IDLE); - } - } + while (CmiStopFlag() == 0) { + + CcdRaiseCondition(CcdSCHEDLOOP); + //always deliver shmem messages first + #ifdef CMK_USE_SHMEM + CmiIpcBlock* block = CmiPopIpcBlock(CsvAccess(coreIpcManager_)); + if (block != nullptr) { + CmiDeliverIpcBlockMsg(block); } - } + #endif + //poll queues + unsigned idx = static_cast(loop_counter & 63ULL); + bool workDone = false; + for (auto fn : g_groups[idx]) { + workDone |= fn(); } - if((CmiMyRank() % backend_poll_thread == 0) && (loop_counter++ == (backend_poll_freq - 1))) - { - loop_counter = 0; - comm_backend::progress(); + if(!workDone) { + setIdle(); } - CcdCallBacks(); + loop_counter++; } } @@ -156,106 +140,24 @@ void CsdScheduler() { * are empty, not when the scheduler is stopped. */ void CsdSchedulePoll() { - // get pthread level queue - ConverseQueue *queue = CmiGetQueue(CmiMyRank()); - - // get node level queue - ConverseNodeQueue *nodeQueue = CmiGetNodeQueue(); + uint64_t loop_counter = 0; while(1){ CcdCallBacks(); - CcdRaiseCondition(CcdSCHEDLOOP); - - // poll node queue - if (!nodeQueue->empty()) { - auto result = nodeQueue->pop(); - if (result) { - void *msg = result.value(); - // process event - CmiHandleMessage(msg); - - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } + //poll queues + unsigned idx = static_cast(loop_counter & 63ULL); + bool workDone = false; + for (auto fn : g_groups[idx]) { + workDone |= fn(); } - - // poll thread queue - else if (!queue->empty()) { - // get next event (guaranteed to be there because only single consumer) - void *msg = queue->pop().value(); - - // process event - CmiHandleMessage(msg); - - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } - - // poll node prio queue - else { - // Try to acquire lock without blocking - if (CmiTryLock(CsvAccess(CsdNodeQueueLock)) == 0) { - if (!QueueEmpty(CsvAccess(CsdNodeQueue))) { - void *msg = QueueTop(CsvAccess(CsdNodeQueue)); - QueuePop(CsvAccess(CsdNodeQueue)); - CmiUnlock(CsvAccess(CsdNodeQueueLock)); - // process event - CmiHandleMessage(msg); - - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } - else { - CmiUnlock(CsvAccess(CsdNodeQueueLock)); - if (!QueueEmpty(CpvAccess(CsdSchedQueue))) { - void *msg = QueueTop(CpvAccess(CsdSchedQueue)); - QueuePop(CpvAccess(CsdSchedQueue)); - - // process event - CmiHandleMessage(msg); - - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } else { - comm_backend::progress(); - break; //break when queues are empty - } - } - } - else { - // Could not acquire node queue lock, skip to thread prio queue - if (!QueueEmpty(CpvAccess(CsdSchedQueue))) { - void *msg = QueueTop(CpvAccess(CsdSchedQueue)); - QueuePop(CpvAccess(CsdSchedQueue)); - - // process event - CmiHandleMessage(msg); - - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } else { - comm_backend::progress(); - break; //break when queues are empty - } - } + if(!workDone) { + setIdle(); + break; } + loop_counter++; + } } diff --git a/src/scheduler.h b/src/scheduler.h index a311734..33b878d 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -1,2 +1,24 @@ +#ifndef _SCHEDULER_H_ +#define _SCHEDULER_H_ +#include "converse.h" +#include "converse_internal.h" +#include "queue.h" +#include +#include +#include + +using QueuePollHandlerFn = bool(*)(void); //we need a return value to indicate if work was done + +struct QueuePollHandler { + QueuePollHandlerFn fn; + uint64_t mask{0}; // 64-bit mask: bit i == call at loop index i (0..63) + unsigned period{0}; // 1..64, 0 => disabled + unsigned phase{0}; +}; + +using Groups = std::array, 64>; + +void add_handler(QueuePollHandlerFn fn, unsigned period, unsigned phase = 0); void CsdScheduler(); +#endif diff --git a/src/scheduler_helpers.cpp b/src/scheduler_helpers.cpp new file mode 100644 index 0000000..2dea2b9 --- /dev/null +++ b/src/scheduler_helpers.cpp @@ -0,0 +1,63 @@ +#include "scheduler.h" + +std::vector g_handlers; //list of handlers +Groups g_groups; //groups of handlers by index + +// Build a 64-bit mask for a period n (1..64) with optional phase (0..n-1) +inline uint64_t make_mask_every_n(unsigned n, unsigned phase = 0) { + if (n == 0) return 0ULL; + if (n == 1) return ~0ULL; + if (n > 64) n = 64; // clamp to 64 + uint64_t mask = 0ULL; + for (unsigned pos = 0; pos < 64; ++pos) { + if (((pos + phase) % n) == 0) mask |= (1ULL << pos); + } + return mask; +} + +// Rebuild groups from current handler masks (in-place). +// Single-threaded callers may call this whenever a handler mask changes. +inline void rebuild_groups() { + // Clear all groups + for (auto &v : g_groups) v.clear(); + + // Populate groups from each handler's mask + for (const auto &h : g_handlers) { + uint64_t m = h.mask; + if (m == 0) continue; + for (unsigned bit = 0; bit < 64; ++bit) { + if ((m >> bit) & 1ULL) { + g_groups[bit].push_back(h.fn); + } + } + } +} + +// Set handler period and phase (period: 1..64, 0 disables). +// Rebuilds groups immediately (cheap relative to hot path). +inline void set_frequency(size_t handlerIndex, unsigned period, unsigned phase = 0) { + if (handlerIndex >= g_handlers.size()) return; + QueuePollHandler &h = g_handlers[handlerIndex]; + + if (period == 0) { + h.period = 0; + h.phase = 0; + h.mask = 0ULL; + } else { + if (period > 64) period = 64; + h.period = period; + h.phase = phase % period; + h.mask = make_mask_every_n(h.period, h.phase); + } + rebuild_groups(); +} + +// Add a handler that will poll a queue at given frequency. +void add_handler(QueuePollHandlerFn fn, unsigned period, unsigned phase) +{ + g_handlers.push_back({fn}); + size_t index = g_handlers.size() - 1; + set_frequency(index, period, phase); +} + +