diff --git a/include/converse.h b/include/converse.h
index 26902bb..8fd9f6a 100644
--- a/include/converse.h
+++ b/include/converse.h
@@ -752,6 +752,22 @@ void CmiLock(CmiNodeLock lock);
 void CmiUnlock(CmiNodeLock lock);
 int CmiTryLock(CmiNodeLock lock);
 
+// decrementToEnqueue: a shared counter paired with a message. The message is
+// sent to the PE named in its header when the counter is decremented to zero.
+typedef struct DecrementToEnqueueMsg{
+  unsigned int *counter; // heap-allocated count, updated atomically
+  void *msg;             // message to send; never freed by this API
+} DecrementToEnqueueMsg;
+
+// Allocates a helper whose counter starts at initialCount (must be non-zero).
+DecrementToEnqueueMsg *CmiCreateDecrementToEnqueue(unsigned int initialCount, void *msg);
+// Atomically decrements the counter; the decrement that reaches zero sends msg.
+void CmiDecrementCounter(DecrementToEnqueueMsg *dteMsg);
+// Atomically stores newCount (must be non-zero) into the counter.
+void CmiResetCounter(unsigned int newCount, DecrementToEnqueueMsg *dteMsg);
+// Frees the helper and its counter; the attached message is left untouched.
+void CmiFreeDecrementToEnqueue(DecrementToEnqueueMsg *dteMsg);
+
 // error checking
 
 // do we want asserts to be defaulted to be on or off(right now it is on)
diff --git a/src/convcore.cpp b/src/convcore.cpp
index 9716ad0..abf70cd 100644
--- a/src/convcore.cpp
+++ b/src/convcore.cpp
@@ -664,6 +664,91 @@ int CmiRegisterHandlerEx(CmiHandlerEx h, void *userPtr) {
   return handlerVector->size() - 1;
 }
 
+//decrement to enqueue
+
+// Allocates a DecrementToEnqueueMsg whose counter starts at initialCount and
+// that stores msg for later sending. Aborts when initialCount is zero or an
+// allocation fails. Release the result with CmiFreeDecrementToEnqueue.
+DecrementToEnqueueMsg *CmiCreateDecrementToEnqueue(unsigned int initialCount, void *msg){
+  if(initialCount == 0){
+    CmiAbort("CmiCreateDecrementToEnqueue: initialCount cannot be zero\n");
+  }
+  DecrementToEnqueueMsg *dteMsg = static_cast<DecrementToEnqueueMsg *>(malloc(sizeof(*dteMsg)));
+  if(dteMsg == nullptr){
+    CmiAbort("CmiCreateDecrementToEnqueue: out of memory\n");
+  }
+  dteMsg->counter = static_cast<unsigned int *>(malloc(sizeof(*dteMsg->counter)));
+  if(dteMsg->counter == nullptr){
+    CmiAbort("CmiCreateDecrementToEnqueue: out of memory\n");
+  }
+  *(dteMsg->counter) = initialCount;
+  dteMsg->msg = msg;
+  return dteMsg;
+}
+
+// Atomically decrements the counter. The call that moves it from 1 to 0 sends
+// dteMsg->msg to the destPE recorded in the message header; the message buffer
+// is not freed here. Aborts on a null dteMsg, a null msg at send time, or a
+// counter that is already zero.
+void CmiDecrementCounter(DecrementToEnqueueMsg *dteMsg){
+  if(dteMsg == nullptr){
+    CmiAbort("CmiDecrementCounter: dteMsg is nullptr\n");
+  }
+  if(dteMsg->counter == nullptr){
+    // Tolerant no-op for a helper that has no counter. None of the functions
+    // in this API ever null the pointer, so this cannot detect a helper that
+    // another thread already freed; callers must not decrement after
+    // CmiFreeDecrementToEnqueue.
+    return;
+  }
+  unsigned int oldValue = __atomic_fetch_sub(dteMsg->counter, 1, __ATOMIC_SEQ_CST);
+  if(oldValue == 0){
+    CmiAbort("CmiDecrementCounter: counter already zero, cannot decrement further\n");
+  }
+  if(oldValue == 1){
+    if(dteMsg->msg == nullptr){
+      CmiAbort("CmiDecrementCounter: msg is nullptr\n");
+    }
+    //get dest PE from message header
+    CmiMessageHeader *header = static_cast<CmiMessageHeader *>(dteMsg->msg);
+    int destPE = header->destPE;
+    CmiAssertMsg(
+      destPE >= 0 && destPE < Cmi_npes,
+      "CmiDecrementCounter: destPE out of range"
+    );
+    // enqueue the message without freeing
+    CmiSyncSend(destPE, header->messageSize, dteMsg->msg);
+  }
+}
+
+// Atomically stores newCount into the counter.
+// Aborts on a null dteMsg, a null counter, or a zero newCount.
+void CmiResetCounter(unsigned int newCount, DecrementToEnqueueMsg *dteMsg){
+  if(dteMsg == nullptr){
+    CmiAbort("CmiResetCounter: dteMsg is nullptr\n");
+  }
+  if(dteMsg->counter == nullptr){
+    CmiAbort("CmiResetCounter: counter is nullptr\n");
+  }
+  if(newCount == 0){
+    CmiAbort("CmiResetCounter: newCount cannot be zero\n");
+  }
+  __atomic_store_n(dteMsg->counter, newCount, __ATOMIC_SEQ_CST);
+}
+
+// Frees the counter and the helper struct. The attached msg is not freed
+// here. Aborts on a null dteMsg or a null counter.
+void CmiFreeDecrementToEnqueue(DecrementToEnqueueMsg *dteMsg){
+  if(dteMsg == nullptr){
+    CmiAbort("CmiFreeDecrementToEnqueue: dteMsg is nullptr\n");
+  }
+  if(dteMsg->counter == nullptr){
+    CmiAbort("CmiFreeDecrementToEnqueue: counter is nullptr\n");
+  }
+  free(dteMsg->counter);
+  free(dteMsg);
+}
+
 void CmiNodeBarrier(void) {
   static Barrier nodeBarrier(CmiMyNodeSize());
   int64_t ticket = nodeBarrier.arrive();
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index aee3fd4..0998333 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -2,6 +2,8 @@ add_subdirectory(broadcast)
 add_subdirectory(group)
 add_subdirectory(within-node-bcast)
 add_subdirectory(conds)
+add_subdirectory(decrement_enqueue)
+add_subdirectory(decrement_bcast)
 add_subdirectory(idle)
 add_subdirectory(kNeighbors)
 add_subdirectory(orig-converse/pingpong)
diff --git a/tests/decrement_bcast/CMakeLists.txt b/tests/decrement_bcast/CMakeLists.txt
new file mode 100644
index 0000000..00f0e3a
--- /dev/null
+++ b/tests/decrement_bcast/CMakeLists.txt
@@ -0,0 +1,3 @@
+add_reconverse_executable(decrement_bcast decrement_bcast.cpp)
+# run with 4 PEs by default to exercise multiple PEs; override in ctest if desired
+add_test(NAME decrement_bcast COMMAND decrement_bcast +pe 4)
diff --git a/tests/decrement_bcast/decrement_bcast.cpp b/tests/decrement_bcast/decrement_bcast.cpp
new file mode 100644
index 0000000..c0c245f
--- /dev/null
+++ b/tests/decrement_bcast/decrement_bcast.cpp
@@ -0,0 +1,113 @@
+// Multi-PE reconverse test for CmiCreateDecrementToEnqueue / CmiDecrementCounter
+// Intended to run with an arbitrary number of PEs.
+// PE 0 creates the counter with initialCount = 4 * CmiNumPes() so that every
+// PE (including PE 0) can send 4 decrement messages to PE 0. A count based on
+// CmiMyPe() would be zero on PE 0, which CmiCreateDecrementToEnqueue rejects.
+
+#include "converse.h"
+#include <string.h>
+
+// Number of decrement messages each PE sends to PE 0.
+static const int kMsgsPerPe = 4;
+
+// DecrementToEnqueueMsg created on PE 0. Only the handlers that run on PE 0
+// use it, so nothing has to travel in the broadcast message.
+static DecrementToEnqueueMsg *g_dte = NULL;
+// Handler index of decrement_invoker, stored for use by broadcast_handler.
+static int g_decInvH = -1;
+
+// Handler called when final message is delivered (counter reached zero).
+void exit_handler(void *msg) {
+  (void)msg; // the final message carries no payload
+  CmiPrintf("Exit handler: counter reached zero on PE %d\n", CmiMyPe());
+  CmiFreeDecrementToEnqueue(g_dte);
+  CmiExit(0);
+}
+
+// Handler that will be invoked on PE0 for each incoming decrement-invoker
+// message. It calls CmiDecrementCounter on the global DTE.
+void decrement_invoker(void *msg) {
+  // The payload is not used; the message only triggers the decrement.
+  CmiDecrementCounter(g_dte);
+
+  // Free the incoming message
+  CmiFree(msg);
+}
+
+// Handler invoked on every PE when the broadcast arrives. Each PE sends
+// kMsgsPerPe header-only messages to PE 0; each one triggers decrement_invoker
+// there.
+void broadcast_handler(void *msg) {
+  (void)msg; // the broadcast carries no payload
+
+  const int dest = 0;
+  const int sendSize = (int)sizeof(CmiMessageHeader);
+
+  for (int i = 0; i < kMsgsPerPe; ++i) {
+    char *smsg = (char *)CmiAlloc(sendSize);
+    memset(smsg, 0, sendSize);
+
+    // set handler to the pre-registered decrement_invoker
+    CmiSetHandler(smsg, g_decInvH);
+    CmiMessageHeader *sendHdr = (CmiMessageHeader *)smsg;
+    sendHdr->destPE = dest;
+    sendHdr->messageSize = sendSize;
+
+    // send to PE 0; smsg is not freed again by this handler
+    CmiSyncSendAndFree(dest, sendSize, smsg);
+  }
+}
+
+// Start function: registers the handlers on every PE, lets PE 0 create the
+// DecrementToEnqueueMsg with initialCount = kMsgsPerPe * CmiNumPes(), and
+// then has PE 0 broadcast a header-only message. Every PE reacts by sending
+// kMsgsPerPe messages to PE 0, which trigger the decrements.
+void test_start(int argc, char **argv) {
+  (void)argc; (void)argv;
+  int exitH = CmiRegisterHandler((CmiHandler)exit_handler);
+  int bcastH = CmiRegisterHandler((CmiHandler)broadcast_handler);
+
+  // register the decrement invoker once and store its handler globally so
+  // broadcast_handler can reuse it when composing outgoing messages.
+  g_decInvH = CmiRegisterHandler((CmiHandler)decrement_invoker);
+
+  if (CmiMyPe() == 0) {
+    int initial = kMsgsPerPe * CmiNumPes();
+
+    // create final message that will be sent when counter reaches zero
+    int finalSize = (int)sizeof(CmiMessageHeader);
+    void *finalMsg = CmiAlloc(finalSize);
+    memset(finalMsg, 0, finalSize);
+    CmiSetHandler(finalMsg, exitH);
+    CmiMessageHeader *fhdr = (CmiMessageHeader *)finalMsg;
+    fhdr->destPE = 0;
+    fhdr->messageSize = finalSize;
+
+    g_dte = CmiCreateDecrementToEnqueue((unsigned int)initial, finalMsg);
+  }
+
+  // Barrier so that PEs do not proceed before PE 0 has created g_dte;
+  // CmiDecrementCounter aborts if it is handed a NULL pointer.
+  CmiNodeAllBarrier();
+
+  // Only PE 0 broadcasts, so only PE 0 allocates the broadcast message.
+  if (CmiMyPe() == 0) {
+    int bsize = (int)sizeof(CmiMessageHeader);
+    void *bmsg = CmiAlloc(bsize);
+    memset(bmsg, 0, bsize);
+    CmiMessageHeader *bhdr = (CmiMessageHeader *)bmsg;
+    bhdr->messageSize = bsize;
+    CmiSetHandler(bmsg, bcastH);
+
+    // Broadcast to all PEs
+    CmiSyncBroadcastAllAndFree(bsize, bmsg);
+  }
+
+  // Return from start; scheduler will process incoming messages and the exit
+  // handler will terminate when the counter reaches zero.
+}
+
+int main(int argc, char **argv) {
+  ConverseInit(argc, argv, test_start, 0, 0);
+  return 0;
+}
diff --git a/tests/decrement_enqueue/CMakeLists.txt b/tests/decrement_enqueue/CMakeLists.txt
new file mode 100644
index 0000000..9e3b349
--- /dev/null
+++ b/tests/decrement_enqueue/CMakeLists.txt
@@ -0,0 +1,2 @@
+add_reconverse_executable(decrement_enqueue decrement_enqueue.cpp)
+add_test(NAME decrement_enqueue COMMAND decrement_enqueue +pe 1)
diff --git a/tests/decrement_enqueue/decrement_enqueue.cpp b/tests/decrement_enqueue/decrement_enqueue.cpp
new file mode 100644
index 0000000..5efa73c
--- /dev/null
+++ b/tests/decrement_enqueue/decrement_enqueue.cpp
@@ -0,0 +1,62 @@
+// Simple reconverse test for CmiCreateDecrementToEnqueue / CmiDecrementCounter
+// Intended to run with 1 PE on 1 process.
+
+#include "converse.h"
+#include <string.h>
+
+// Initial counter value, and therefore the number of decrements performed.
+static const unsigned int kInitialCount = 16u;
+
+// Helper under test; created in test_start and released in exit_handler.
+DecrementToEnqueueMsg *dte;
+
+// Handler invoked when the decrement counter reaches zero. Exits the program.
+void exit_handler(void *msg) {
+  (void)msg; // the message carries no payload
+  // free DecrementToEnqueueMsg
+  CmiFreeDecrementToEnqueue(dte);
+  CmiPrintf("Exit handler called, exiting...\n");
+  CmiExit(0);
+}
+
+// Start function invoked by ConverseInit.
+void test_start(int argc, char **argv) {
+  (void)argc; (void)argv;
+  CmiPrintf("Starting decrement_enqueue test...\n");
+
+  // Register before the early return below so that every PE performs the
+  // same registration.
+  int handler = CmiRegisterHandler((CmiHandler)exit_handler);
+
+  // Only PE 0 will create and drive the counter per the test spec.
+  if (CmiMyPe() != 0) return;
+
+  // Create a minimal message consisting only of the message header.
+  int msgSize = (int)sizeof(CmiMessageHeader);
+  void *msg = CmiAlloc(msgSize);
+  memset(msg, 0, msgSize);
+
+  // Set handler, destination and size on the header so CmiDecrementCounter
+  // can inspect them when it enqueues the message.
+  CmiSetHandler(msg, handler);
+  CmiMessageHeader *hdr = (CmiMessageHeader *)msg;
+  hdr->destPE = (CmiUInt4)CmiMyPe();
+  hdr->messageSize = msgSize;
+
+  // Create the decrement-to-enqueue helper.
+  dte = CmiCreateDecrementToEnqueue(kInitialCount, msg);
+
+  // Decrement kInitialCount times; the last call sends the message and the
+  // registered handler will call CmiExit.
+  for (unsigned int i = 0; i < kInitialCount; ++i) {
+    CmiDecrementCounter(dte);
+  }
+
+  // Return from start; scheduler will run and the exit handler will stop it.
+}
+
+int main(int argc, char **argv) {
+  // Start the Converse runtime with our test_start function.
+  ConverseInit(argc, argv, test_start, 0, 0);
+  return 0;
+}