Skip to content

Commit 400ec62

Browse files
ritvikrao and jszaday authored
POSIX Shared Memory implementation (#115)
* add shared memory functions
* fix build for reconverse
* fix build
* newlines
* remove import
* remove cmishmem.h
* CldHandlerIndex is Cpv variable
* cthissuspendable
* cmake change
* small fix
* fix scheduling and freeing
* Update src/cmishm.cpp

  Co-authored-by: Justin Szaday <[email protected]>
* fix pointers in whichBin
* change uintptr
* c compatible
* c compilation fix
* avoid cpv calls in destructor

---------

Co-authored-by: Justin Szaday <[email protected]>
1 parent cfb685d commit 400ec62

File tree

15 files changed

+1000
-44
lines changed

15 files changed

+1000
-44
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ option(SPANTREE "whether to enable spanning tree collectives" OFF) #should turn
5252
# Reconverse uses SMP always, but turn on this option for backwards compatibility (see pingpong_multipairs for example)
5353
option(CMK_SMP "whether to enable SMP support" ON)
5454
option(CMK_CPV_IS_SMP "whether to enable SMP support for cpvs" ON)
55+
option(CMK_USE_SHMEM "whether to use POSIX shared memory for IPC" OFF)
5556

5657
option(RECONVERSE_ATOMIC_QUEUE "whether to use atomic queue" ON)
5758

include/converse.h

Lines changed: 126 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#ifdef __cplusplus
77
#include <atomic>
88
#include <queue>
9+
#include <utility>
910
#else
1011
#include <stdatomic.h>
1112
#endif
@@ -279,6 +280,8 @@ void CthFree(CthThread t);
279280

280281
void CthResume(CthThread t);
281282

283+
int CthIsSuspendable(CthThread t);
284+
282285
void CthSuspend(void);
283286

284287
void CthAwaken(CthThread th);
@@ -546,15 +549,25 @@ void CmiInitCPUTopology(char **argv);
546549
void CmiInitCPUAffinity(char **argv);
547550
int CmiOnCore(void);
548551

549-
void __CmiEnforceMsgHelper(const char *expr, const char *fileName, int lineNum,
550-
const char *msg, ...);
552+
#define __CMK_STRING(x) #x
553+
#define __CMK_XSTRING(x) __CMK_STRING(x)
551554

552-
#define CmiEnforce(condition) \
553-
do { \
554-
if (!(condition)) { \
555-
__CmiEnforceMsgHelper(#condition, __FILE__, __LINE__, ""); \
556-
} \
557-
} while (0)
555+
void __CmiEnforceHelper(const char* expr, const char* fileName, const char* lineNum);
556+
void __CmiEnforceMsgHelper(const char* expr, const char* fileName,
557+
const char* lineNum, const char* msg, ...);
558+
559+
#define CmiEnforce(expr) \
560+
((void)(CMI_LIKELY(expr) ? 0 \
561+
: (__CmiEnforceHelper(__CMK_STRING(expr), __FILE__, \
562+
__CMK_XSTRING(__LINE__)), \
563+
0)))
564+
565+
#define CmiEnforceMsg(expr, ...) \
566+
((void)(CMI_LIKELY(expr) \
567+
? 0 \
568+
: (__CmiEnforceMsgHelper(__CMK_STRING(expr), __FILE__, \
569+
__CMK_XSTRING(__LINE__), __VA_ARGS__), \
570+
0)))
558571

559572
double getCurrentTime(void);
560573
double CmiWallTimer(void);
@@ -1196,4 +1209,109 @@ void CmiInterSyncNodeSendAndFreeFn(int destNode, int partition, int messageSize,
11961209

11971210
/* end of variables and functions for partition */
11981211

1212+
#define CMI_IPC_CUTOFF_ARG "ipccutoff"
1213+
#define CMI_IPC_CUTOFF_DESC "max message size for cmi-shmem (in bytes)"
1214+
#define CMI_IPC_POOL_SIZE_ARG "ipcpoolsize"
1215+
#define CMI_IPC_POOL_SIZE_DESC "size of cmi-shmem pool (in bytes)"
1216+
1217+
struct CmiIpcManager;
1218+
1219+
#ifndef __cplusplus
1220+
typedef struct CmiIpcManager CmiIpcManager;
1221+
#endif
1222+
1223+
#ifdef __cplusplus
1224+
namespace cmi {
1225+
namespace ipc {
1226+
// recommended cutoff for block sizes
1227+
CpvExtern(size_t, kRecommendedCutoff);
1228+
// used to represent an empty linked list
1229+
constexpr auto nil = uintptr_t(0);
1230+
// used to represent the tail of a linked list
1231+
constexpr auto max = UINTPTR_MAX;
1232+
// used to indicate a message bound for a node
1233+
constexpr auto nodeDatagram = UINT16_MAX;
1234+
// default number of attempts to alloc before timing out
1235+
constexpr auto defaultTimeout = 4;
1236+
} // namespace ipc
1237+
} // namespace cmi
1238+
1239+
// alignas is used for padding here, rather than for alignment of the
1240+
// CmiIpcBlock itself.
1241+
struct alignas(ALIGN_BYTES) CmiIpcBlock {
1242+
public:
1243+
// "home" rank of the block
1244+
int src;
1245+
int dst;
1246+
uintptr_t orig;
1247+
uintptr_t next;
1248+
size_t size;
1249+
1250+
CmiIpcBlock(size_t size_, uintptr_t orig_)
1251+
: orig(orig_), next(cmi::ipc::nil), size(size_) {}
1252+
};
1253+
1254+
// Status codes that accompany the result of an ipc block allocation.
// The numeric values are spelled out; they match the implicit numbering.
enum CmiIpcAllocStatus {
  CMI_IPC_OUT_OF_MEMORY = 0,
  CMI_IPC_REMOTE_DESTINATION = 1,
  CMI_IPC_SUCCESS = 2,
  CMI_IPC_TIMEOUT = 3
};
1260+
1261+
// sets up ipc environment
1262+
void CmiIpcInit(char** argv);
1263+
1264+
// creates an ipc manager, waking the thread when it's done
1265+
// ( this must be called in the same order on all pes! )
1266+
CmiIpcManager* CmiMakeIpcManager(CthThread th);
1267+
1268+
// push/pop blocks from the manager's send/recv queue
1269+
bool CmiPushIpcBlock(CmiIpcManager*, CmiIpcBlock*);
1270+
CmiIpcBlock* CmiPopIpcBlock(CmiIpcManager*);
1271+
1272+
// tries to allocate a block, returning null if unsuccessful
1273+
// (fails when other PEs are contending resources)
1274+
// second value of pair indicates failure cause
1275+
std::pair<CmiIpcBlock*, CmiIpcAllocStatus> CmiAllocIpcBlock(CmiIpcManager*, int node, std::size_t size);
1276+
1277+
// frees a block -- enabling it to be used again
1278+
void CmiFreeIpcBlock(CmiIpcManager*, CmiIpcBlock*);
1279+
1280+
// currently a no-op but may be eventually usable
1281+
// intended to "capture" blocks from remote pes
1282+
inline void CmiCacheIpcBlock(CmiIpcBlock*) { return; }
1283+
1284+
// identifies whether a void* is the payload of a block
1285+
// belonging to the given node
1286+
CmiIpcBlock* CmiIsIpcBlock(CmiIpcManager*, void*, int node);
1287+
1288+
// if (init) is true -- initializes the
1289+
// memory segment for use as a message
1290+
void* CmiIpcBlockToMsg(CmiIpcBlock*, bool init);
1291+
1292+
// equivalent to calling above with (init = false)
1293+
inline void* CmiIpcBlockToMsg(CmiIpcBlock* block) {
1294+
auto res = (char*)block + sizeof(CmiIpcBlock) + sizeof(CmiChunkHeader);
1295+
return (void*)res;
1296+
}
1297+
1298+
// Steps back over the chunk header that precedes msg and asks
// CmiIsIpcBlock whether that address is a block of the calling node.
inline CmiIpcBlock* CmiMsgToIpcBlock(CmiIpcManager* manager, void* msg) {
  char* const chunk = (char*)msg - sizeof(CmiChunkHeader);
  return CmiIsIpcBlock(manager, chunk, CmiMyNode());
}
1301+
1302+
CmiIpcBlock* CmiMsgToIpcBlock(CmiIpcManager*, char* msg, std::size_t len, int node,
1303+
int rank = cmi::ipc::nodeDatagram,
1304+
int timeout = cmi::ipc::defaultTimeout);
1305+
1306+
// deliver a block as a message
1307+
void CmiDeliverIpcBlockMsg(CmiIpcBlock*);
1308+
1309+
inline const std::size_t& CmiRecommendedIpcBlockCutoff(void) {
1310+
using namespace cmi::ipc;
1311+
return CpvAccess(kRecommendedCutoff);
1312+
}
1313+
#endif /* __cplusplus */
1314+
1315+
CsvExtern(CmiIpcManager*, coreIpcManager_);
1316+
11991317
#endif // CONVERSE_H

src/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ target_include_directories(reconverse PRIVATE .)
22
target_sources(reconverse PRIVATE conv-conds.cpp convcore.cpp random.cpp
33
scheduler.cpp cpuaffinity.cpp collectives.cpp
44
comm_backend/comm_backend_internal.cpp threads.cpp cldb.rand.cpp cldb.cpp cmirdmautils.cpp
5-
conv-rdma.cpp conv-topology.cpp msgmgr.cpp queueing.cpp)
5+
conv-rdma.cpp conv-topology.cpp msgmgr.cpp queueing.cpp cmishmem.cpp)
66
target_include_directories(
77
reconverse PRIVATE $<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}>
88
$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)

src/cldb.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66

77
typedef char *BitVector;
88

9-
/*
109
CpvDeclare(int, CldHandlerIndex);
10+
/*
1111
CpvDeclare(int, CldNodeHandlerIndex);
1212
CpvDeclare(BitVector, CldPEBitVector);
1313
CpvDeclare(int, CldBalanceHandlerIndex);

src/cldb.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
#define MAXMSGBFRSIZE 100000
77

8-
extern thread_local int CldHandlerIndex;
8+
CpvExtern(int, CldHandlerIndex);
99
extern thread_local int CldNodeHandlerIndex;
1010
extern thread_local int CldBalanceHandlerIndex;
1111
extern thread_local int CldRelocatedMessages;

src/cldb.rand.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn) {
4040
pfn(&msg);
4141
ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
4242
}
43-
CldSwitchHandler((char *)msg, CldHandlerIndex);
43+
CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
4444
CmiSetInfo(msg, infofn);
4545

4646
CmiSyncMulticastAndFree(grp, len, msg);
@@ -59,7 +59,7 @@ void CldEnqueueWithinNode(void *msg, int infofn) {
5959
pfn(&msg);
6060
ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
6161
}
62-
CldSwitchHandler((char *)msg, CldHandlerIndex);
62+
CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
6363
CmiSetInfo(msg, infofn);
6464

6565
CmiWithinNodeBroadcast(len, (char *)msg);
@@ -75,7 +75,7 @@ void CldEnqueueMulti(int npes, const int *pes, void *msg, int infofn) {
7575
pfn(&msg);
7676
ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
7777
}
78-
CldSwitchHandler((char *)msg, CldHandlerIndex);
78+
CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
7979
CmiSetInfo(msg, infofn);
8080

8181
CmiSyncListSendAndFree(npes, pes, len, msg);
@@ -108,7 +108,7 @@ void CldEnqueue(int pe, void *msg, int infofn) {
108108
pfn(&msg);
109109
ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
110110
}
111-
CldSwitchHandler((char *)msg, CldHandlerIndex);
111+
CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
112112
CmiSetInfo(msg, infofn);
113113
if (pe == CLD_BROADCAST) {
114114
CmiSyncBroadcastAndFree(len, msg);
@@ -153,7 +153,8 @@ void CldNodeEnqueue(int node, void *msg, int infofn) {
153153
}
154154

155155
void CldModuleInit(char **argv) {
156-
CldHandlerIndex = CmiRegisterHandler((CmiHandler)CldHandler);
156+
CpvInitialize(int, CldHandlerIndex);
157+
CpvAccess(CldHandlerIndex) = CmiRegisterHandler((CmiHandler)CldHandler);
157158
CldNodeHandlerIndex = CmiRegisterHandler((CmiHandler)CldNodeHandler);
158159
CldRelocatedMessages = 0;
159160
CldLoadBalanceMessages = 0;

src/cmi-shmem-common.h

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
#ifndef CMI_SHMEM_COMMON_HH
2+
#define CMI_SHMEM_COMMON_HH
3+
4+
#include "converse_internal.h"
5+
6+
#include <array>
7+
#include <limits>
8+
#include <map>
9+
#include <memory>
10+
#include <vector>
11+
12+
13+
namespace cmi {
14+
namespace ipc {
15+
CpvDeclare(std::size_t, kRecommendedCutoff);
16+
}
17+
} // namespace cmi
18+
19+
CpvStaticDeclare(std::size_t, kSegmentSize);
20+
constexpr std::size_t kDefaultSegmentSize = 8 * 1024 * 1024;
21+
22+
// number of entries in kCutOffPoints
constexpr std::size_t kNumCutOffPoints = 25;
// block-size cut-off points in bytes: successive powers of two,
// from 64 up to 1073741824
const std::array<std::size_t, kNumCutOffPoints> kCutOffPoints = {
    64,        128,       256,       512,       1024,
    2048,      4096,      8192,      16384,     32768,
    65536,     131072,    262144,    524288,    1048576,
    2097152,   4194304,   8388608,   16777216,  33554432,
    67108864,  134217728, 268435456, 536870912, 1073741824};
28+
29+
struct ipc_metadata_;
30+
CsvStaticDeclare(CmiNodeLock, sleeper_lock);
31+
32+
using sleeper_map_t = std::vector<CthThread>;
33+
CsvStaticDeclare(sleeper_map_t, sleepers);
34+
35+
// the data each pe shares with its peers
36+
// contains pool of free blocks, heap, and receive queue
37+
struct ipc_shared_ {
38+
std::array<std::atomic<std::uintptr_t>, kNumCutOffPoints> free;
39+
std::atomic<std::uintptr_t> queue;
40+
std::atomic<std::uintptr_t> heap;
41+
std::uintptr_t max;
42+
43+
ipc_shared_(std::uintptr_t begin, std::uintptr_t end)
44+
: queue(cmi::ipc::max), heap(begin), max(end) {
45+
for (auto& f : this->free) {
46+
f.store(cmi::ipc::max);
47+
}
48+
}
49+
};
50+
51+
// shared data for each pe
52+
struct ipc_metadata_ {
53+
// maps ranks to shared segments
54+
std::map<int, ipc_shared_*> shared;
55+
// physical node rank
56+
int mine;
57+
// key of this instance
58+
std::size_t key;
59+
// base constructor
60+
ipc_metadata_(std::size_t key_) : mine(CmiMyNode()), key(key_) {}
61+
// virtual destructor may be needed
62+
virtual ~ipc_metadata_() {}
63+
};
64+
65+
inline std::size_t whichBin_(std::size_t size);
66+
67+
inline static void initIpcShared_(ipc_shared_* shared) {
68+
auto begin = (std::uintptr_t)(sizeof(ipc_shared_) +
69+
(sizeof(ipc_shared_) % ALIGN_BYTES));
70+
CmiAssert(begin != cmi::ipc::nil);
71+
auto end = begin + CpvAccess(kSegmentSize);
72+
new (shared) ipc_shared_(begin, end);
73+
}
74+
75+
inline static ipc_shared_* makeIpcShared_(void) {
76+
auto* shared = (ipc_shared_*)(::operator new(sizeof(ipc_shared_) +
77+
CpvAccess(kSegmentSize)));
78+
initIpcShared_(shared);
79+
return shared;
80+
}
81+
82+
inline void initSegmentSize_(char** argv) {
83+
using namespace cmi::ipc;
84+
CpvInitialize(std::size_t, kRecommendedCutoff);
85+
CpvInitialize(std::size_t, kSegmentSize);
86+
87+
CmiInt8 value;
88+
auto flag =
89+
CmiGetArgLongDesc(argv, "++" CMI_IPC_POOL_SIZE_ARG, &value, CMI_IPC_POOL_SIZE_DESC);
90+
CpvAccess(kSegmentSize) = flag ? (std::size_t)value : kDefaultSegmentSize;
91+
CmiEnforceMsg(CpvAccess(kSegmentSize), "segment size must be non-zero!");
92+
if (CmiGetArgLongDesc(argv, "++" CMI_IPC_CUTOFF_ARG, &value, CMI_IPC_CUTOFF_DESC)) {
93+
auto bin = whichBin_((std::size_t)value);
94+
CmiEnforceMsg(bin < kNumCutOffPoints, "ipc cutoff out of range!");
95+
CpvAccess(kRecommendedCutoff) = kCutOffPoints[bin];
96+
} else {
97+
auto max = CpvAccess(kSegmentSize) / kNumCutOffPoints;
98+
auto bin = (std::intptr_t)whichBin_(max) - 1;
99+
CpvAccess(kRecommendedCutoff) = kCutOffPoints[(bin >= 0) ? bin : 0];
100+
}
101+
}
102+
103+
inline static void printIpcStartupMessage_(const char* implName) {
104+
using namespace cmi::ipc;
105+
CmiPrintf("Converse> %s pool init'd with %luB segment and %luB cutoff.\n",
106+
implName, CpvAccess(kSegmentSize),
107+
CpvAccess(kRecommendedCutoff));
108+
}
109+
110+
inline static void initSleepers_(void) {
111+
if (CmiMyRank() == 0) {
112+
CsvInitialize(sleeper_map_t, sleepers);
113+
CsvAccess(sleepers).resize(CmiMyNodeSize());
114+
CsvInitialize(CmiNodeLock, sleeper_lock);
115+
CsvAccess(sleeper_lock) = CmiCreateLock();
116+
}
117+
}
118+
119+
// Stores th in the calling rank's slot of the sleeper table,
// holding sleeper_lock while the slot is written.
inline static void putSleeper_(CthThread th) {
  const auto rank = CmiMyRank();
  CmiLock(CsvAccess(sleeper_lock));
  CsvAccess(sleepers)[rank] = th;
  CmiUnlock(CsvAccess(sleeper_lock));
}
124+
125+
static void awakenSleepers_(void);
126+
127+
using ipc_manager_ptr_ = std::unique_ptr<CmiIpcManager>;
128+
using ipc_manager_map_ = std::vector<ipc_manager_ptr_>;
129+
CsvStaticDeclare(ipc_manager_map_, managers_);
130+
131+
#endif

0 commit comments

Comments
 (0)