Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions include/converse.h
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,17 @@ void CmiLock(CmiNodeLock lock);
void CmiUnlock(CmiNodeLock lock);
int CmiTryLock(CmiNodeLock lock);

//decrementToEnqueue
typedef struct DecrementToEnqueueMsg{
unsigned int *counter;
void *msg;
} DecrementToEnqueueMsg;

DecrementToEnqueueMsg *CmiCreateDecrementToEnqueue(unsigned int initialCount, void *msg);
void CmiDecrementCounter(DecrementToEnqueueMsg *dteMsg);
void CmiResetCounter(unsigned int newCount, DecrementToEnqueueMsg *dteMsg);
void CmiFreeDecrementToEnqueue(DecrementToEnqueueMsg *dteMsg);

// error checking

// do we want asserts to be defaulted to be on or off(right now it is on)
Expand Down
65 changes: 65 additions & 0 deletions src/convcore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,71 @@ int CmiRegisterHandlerEx(CmiHandlerEx h, void *userPtr) {
return handlerVector->size() - 1;
}

//decrement to enqueue

DecrementToEnqueueMsg *CmiCreateDecrementToEnqueue(unsigned int initialCount, void *msg){
if(initialCount == 0){
CmiAbort("CmiCreateDecrementToEnqueue: initialCount cannot be zero\n");
}
DecrementToEnqueueMsg *dteMsg = static_cast<DecrementToEnqueueMsg *>(malloc(sizeof(DecrementToEnqueueMsg)));
dteMsg->counter = static_cast<unsigned int *>(malloc(sizeof(unsigned int)));
*(dteMsg->counter) = initialCount;
dteMsg->msg = msg;
return dteMsg;
}

void CmiDecrementCounter(DecrementToEnqueueMsg *dteMsg){
if(dteMsg == nullptr){
CmiAbort("CmiDecrementCounter: dteMsg is nullptr\n");
}
if(dteMsg->counter == nullptr){
// In concurrent scenarios the counter pointer may have been freed by
// another thread (race between decrements). Instead of aborting, be
// tolerant and return silently since the counter has already been
// consumed.
return;
}
unsigned int oldValue = __atomic_fetch_sub(dteMsg->counter, 1, __ATOMIC_SEQ_CST);
if(oldValue == 0){
CmiAbort("CmiDecrementCounter: counter already zero, cannot decrement further\n");
}
if(oldValue == 1){
//get dest PE from message header
CmiMessageHeader *header = static_cast<CmiMessageHeader *>(dteMsg->msg);
int destPE = header->destPE;
CmiAssertMsg(
destPE >= 0 && destPE < Cmi_npes,
"CmiDecrementCounter: destPE out of range"
);
// enqueue the message without freeing
CmiSyncSend(destPE, header->messageSize, dteMsg->msg);
}
}

void CmiResetCounter(unsigned int newCount, DecrementToEnqueueMsg *dteMsg){
if(dteMsg == nullptr){
CmiAbort("CmiResetCounter: dteMsg is nullptr\n");
}
if(dteMsg->counter == nullptr){
CmiAbort("CmiResetCounter: counter is nullptr\n");
}
if(newCount == 0){
CmiAbort("CmiResetCounter: newCount cannot be zero\n");
}
__atomic_store_n(dteMsg->counter, newCount, __ATOMIC_SEQ_CST);
}

void CmiFreeDecrementToEnqueue(DecrementToEnqueueMsg *dteMsg){
if(dteMsg == nullptr){
CmiAbort("CmiFreeDecrementToEnqueue: dteMsg is nullptr\n");
}
if(dteMsg->counter == nullptr){
CmiAbort("CmiFreeDecrementToEnqueue: counter is nullptr\n");
}
free(dteMsg->counter);
free(dteMsg);
}

void CmiNodeBarrier(void) {
static Barrier nodeBarrier(CmiMyNodeSize());
int64_t ticket = nodeBarrier.arrive();
Expand Down
2 changes: 2 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ add_subdirectory(broadcast)
add_subdirectory(group)
add_subdirectory(within-node-bcast)
add_subdirectory(conds)
add_subdirectory(decrement_enqueue)
add_subdirectory(decrement_bcast)
add_subdirectory(idle)
add_subdirectory(kNeighbors)
add_subdirectory(orig-converse/pingpong)
Expand Down
3 changes: 3 additions & 0 deletions tests/decrement_bcast/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
add_reconverse_executable(decrement_bcast decrement_bcast.cpp)
# run with 4 PEs by default to exercise multiple PEs; override in ctest if desired
add_test(NAME decrement_bcast COMMAND decrement_bcast +pe 4)
126 changes: 126 additions & 0 deletions tests/decrement_bcast/decrement_bcast.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Multi-PE reconverse test for CmiCreateDecrementToEnqueue / CmiDecrementCounter
// Intended to run with an arbitrary number of PEs.
// Assumption: the counter is created on PE 0 with initialCount = 4 * CmiNumPes()
// (the user requested "4*CmiMyPe()" but that would be zero on PE 0; to make the
// test meaningful across arbitrary PEs we initialize to 4 * number of PEs so
// each PE can send 4 messages to PE 0).

#include "converse.h"
#include <string.h>

// Global pointer to the DecrementToEnqueueMsg created on PE 0. Other PEs do not
// need direct access to its fields; PE 0 will own and use this pointer when
// decrementing. Making it global simplifies the broadcast message.
static DecrementToEnqueueMsg *g_dte = NULL;
static int g_decInvH = -1;

// Handler called when final message is delivered (counter reached zero).
void exit_handler(void *msg) {
CmiPrintf("Exit handler: counter reached zero on PE %d\n", CmiMyPe());
CmiFreeDecrementToEnqueue(g_dte);
CmiExit(0);
}

// Handler that will be invoked on PE0 for each incoming decrement-invoker
// message. It calls CmiDecrementCounter on the global DTE.
void decrement_invoker(void *msg) {
(void)msg; // incoming message payload is not used

//CmiPrintf("decrement_invoker: PE %d decrementing counter\n", CmiMyPe());

// Call the decrement operation on the global DTE
CmiDecrementCounter(g_dte);

// Free the incoming message
CmiFree(msg);
}

// Handler invoked on every PE when the broadcast arrives. Each PE will send 4
// small messages to PE 0; those messages will trigger decrement_invoker on PE0.
void broadcast_handler(void *msg) {
// The broadcast carries no extra payload for this test; simply send 4
// messages to PE 0. PE 0 owns the global DTE (g_dte) and will perform the
// decrements when it receives these messages.
(void)msg; // unused

int dest = 0;

// Send 4 messages to PE 0. The messages themselves carry no useful payload
// other than the header and are used only to trigger the decrement handler
// on PE 0.
//CmiPrintf("broadcast_handler: PE %d sending 4 decrement messages to PE 0\n", CmiMyPe());
for (int i = 0; i < 4; ++i) {
int sendSize = (int)sizeof(CmiMessageHeader);
char *smsg = (char *)CmiAlloc(sendSize);
memset(smsg, 0, sendSize);

// set handler to the pre-registered decrement_invoker
CmiSetHandler(smsg, g_decInvH);
CmiMessageHeader *sendHdr = (CmiMessageHeader *)smsg;
sendHdr->destPE = dest;
sendHdr->messageSize = sendSize;

// send to PE 0 and free the buffer if the send copies it; use SyncSendAndFree
CmiSyncSendAndFree(dest, sendSize, smsg);
}
}

// Start function: PE 0 creates the DecrementToEnqueueMsg with initialCount =
// 4 * CmiNumPes(), then broadcasts a message carrying the dte pointer. All
// PEs will receive the broadcast and send 4 messages to PE 0 which trigger
// decrements.
void test_start(int argc, char **argv) {
(void)argc; (void)argv;
int exitH = CmiRegisterHandler((CmiHandler)exit_handler);
int bcastH = CmiRegisterHandler((CmiHandler)broadcast_handler);

// register the decrement invoker once and store its handler globally so
// broadcast_handler can reuse it when composing outgoing messages.
g_decInvH = CmiRegisterHandler((CmiHandler)decrement_invoker);

int numPes = CmiNumPes();
int initial = 4 * numPes; // 4 messages per PE

if (CmiMyPe() == 0) {
// create final message that will be sent when counter reaches zero
int finalSize = (int)sizeof(CmiMessageHeader);
void *finalMsg = CmiAlloc(finalSize);
memset(finalMsg, 0, finalSize);
CmiSetHandler(finalMsg, exitH);
CmiMessageHeader *fhdr = (CmiMessageHeader *)finalMsg;
fhdr->destPE = 0;
fhdr->messageSize = finalSize;

g_dte = CmiCreateDecrementToEnqueue((unsigned int)initial, finalMsg);
// Ensure stores to g_dte and its internals are visible to other PEs/threads.
//CmiMemoryWriteFence();
//CmiPrintf("[PE %d] created g_dte=%p, counter=%p (initial=%d)\n", CmiMyPe(), (void*)g_dte, (void*)(g_dte?g_dte->counter:NULL), initial);
}

// Ensure that PE 0 has finished creating g_dte before anyone reacts to the
// broadcast. This prevents races where receivers send messages that reach
// PE 0 before g_dte is initialized, which would cause CmiDecrementCounter to
// see a null counter.
CmiNodeAllBarrier();
//CmiPrintf("[PE %d] passed node barrier\n", CmiMyPe());

// Build a small broadcast message. Receivers will consult the global g_dte
// (which is valid on PE 0) and send messages to PE 0 to trigger decrements.
int bsize = (int)sizeof(CmiMessageHeader);
void *bmsg = CmiAlloc(bsize);
memset(bmsg, 0, bsize);
CmiMessageHeader *bhdr = (CmiMessageHeader *)bmsg;
bhdr->messageSize = bsize;
CmiSetHandler(bmsg, bcastH);

// Broadcast to all PEs
if (CmiMyPe() == 0) CmiSyncBroadcastAllAndFree(bsize, bmsg);

// Return from start; scheduler will process incoming messages and the exit
// handler will terminate when the counter reaches zero.
}

int main(int argc, char **argv) {
ConverseInit(argc, argv, test_start, 0, 0);
return 0;
}
2 changes: 2 additions & 0 deletions tests/decrement_enqueue/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
add_reconverse_executable(decrement_enqueue decrement_enqueue.cpp)
add_test(NAME decrement_enqueue COMMAND decrement_enqueue +pe 1)
53 changes: 53 additions & 0 deletions tests/decrement_enqueue/decrement_enqueue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Simple reconverse test for CmiCreateDecrementToEnqueue / CmiDecrementCounter
// Intended to run with 1 PE on 1 process.

#include "converse.h"
#include <string.h>

DecrementToEnqueueMsg *dte;

// Handler invoked when the decrement counter reaches zero. Exits the program.
void exit_handler(void *msg) {
// free DecrementToEnqueueMsg
CmiFreeDecrementToEnqueue(dte);
CmiPrintf("Exit handler called, exiting...\n");
CmiExit(0);
}

// Start function invoked by ConverseInit.
void test_start(int argc, char **argv) {
CmiPrintf("Starting decrement_enqueue test...\n");
// Only PE 0 will create and drive the counter per the test spec.
if (CmiMyPe() != 0) return;

int handler = CmiRegisterHandler((CmiHandler)exit_handler);

// Create a minimal message consisting only of the message header.
int msgSize = (int)sizeof(CmiMessageHeader);
void *msg = CmiAlloc(msgSize);
memset(msg, 0, msgSize);

// Set handler, destination and size on the header so CmiDecrementCounter
// can inspect them when it enqueues the message.
CmiSetHandler(msg, handler);
CmiMessageHeader *hdr = (CmiMessageHeader *)msg;
hdr->destPE = (CmiUInt4)CmiMyPe();
hdr->messageSize = msgSize;

// Create the decrement-to-enqueue helper with initial count 16.
dte = CmiCreateDecrementToEnqueue(16u, msg);

// Decrement 16 times; on the 16th call the message will be sent and the
// registered handler will call CmiExit.
for (int i = 0; i < 16; ++i) {
CmiDecrementCounter(dte);
}

// Return from start; scheduler will run and the exit handler will stop it.
}

int main(int argc, char **argv) {
// Start the Converse runtime with our test_start function.
ConverseInit(argc, argv, test_start, 0, 0);
return 0;
}