Skip to content

Commit 74e0140

Browse files
committed
feat: zerocopy for hypercube
1 parent 812c664 commit 74e0140

File tree

4 files changed

+17
-32
lines changed

4 files changed

+17
-32
lines changed

sim

200 Bytes
Binary file not shown.

src/allGather/allGather.cc

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,13 @@ void AllGather::local_buff_done(CkDataMsg *m) {
7171
}
7272

7373
void AllGather::startGather() {
74+
for (int i = 0; i < k; i++) {
75+
store[k * thisIndex + i] = data[i];
76+
}
77+
CkNcpyBuffer src(data, k*sizeof(long int), dum_dum, CK_BUFFER_UNREG);
78+
7479
switch (type) {
7580
case allGatherType::ALL_GATHER_DEFAULT: {
76-
for (int i = 0; i < k; i++) {
77-
store[k * thisIndex + i] = data[i];
78-
}
79-
CkNcpyBuffer src(data, k*sizeof(long int), dum_dum, CK_BUFFER_UNREG);
8081
#ifdef TIMESTAMP
8182
thisProxy[(thisIndex + 1) % n].recvDefault(
8283
thisIndex, src, (timeStamp + alpha + beta * k * 8));
@@ -87,17 +88,11 @@ void AllGather::startGather() {
8788
} break;
8889
case allGatherType::ALL_GATHER_HYPERCUBE: {
8990
hyperCubeIndx.push_back(thisIndex);
90-
for (int i = 0; i < k; i++) {
91-
hyperCubeStore.push_back(data[i]);
92-
}
91+
hyperCubeStore.push_back(src);
9392
thisProxy(thisIndex).Hypercube();
9493
} break;
9594
case allGatherType::ALL_GATHER_FLOODING: {
96-
for (int i = 0; i < k; i++) {
97-
store[k * thisIndex + i] = data[i];
98-
}
9995
recvFloodMsg[thisIndex] = true;
100-
CkNcpyBuffer src(data, k*sizeof(long int), dum_dum, CK_BUFFER_UNREG);
10196
for (int i = 0; i < n; i++) {
10297
if (graph[thisIndex][i] == 1) {
10398
#ifdef TIMESTAMP

src/allGather/allGather.ci

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,28 +15,27 @@ module allGather {
1515
serial { HypercubeToSend = thisIndex ^ ((int)pow(2, iter)); }
1616
if (HypercubeToSend < n) {
1717
serial {
18-
long int data[hyperCubeStore.size()];
19-
for(int i = 0; i < hyperCubeStore.size(); i++) {
20-
data[i] = hyperCubeStore[i];
21-
}
18+
CkNcpyBuffer data[hyperCubeStore.size()];
2219
int dataIndx[hyperCubeIndx.size()];
23-
for(int i = 0; i < hyperCubeIndx.size(); i++) {
20+
int numBuffers = hyperCubeStore.size();
21+
for(int i = 0; i < numBuffers; i++) {
22+
data[i] = hyperCubeStore[i];
2423
dataIndx[i] = hyperCubeIndx[i];
2524
}
2625
#ifdef TIMESTAMP
27-
thisProxy(HypercubeToSend).recvHypercube(iter, data, hyperCubeStore.size(), dataIndx, hyperCubeIndx.size(), (timeStamp + alpha + beta * hyperCubeStore.size() * 8));
26+
thisProxy(HypercubeToSend).recvHypercube(iter, data, dataIndx, numBuffers, (timeStamp + alpha + beta * hyperCubeStore.size() * 8));
2827
timeStamp += alpha;
2928
#else
30-
thisProxy(HypercubeToSend).recvHypercube(iter, data, hyperCubeStore.size(), dataIndx, hyperCubeIndx.size(), 0.0);
29+
thisProxy(HypercubeToSend).recvHypercube(iter, data, dataIndx, numBuffers, 0.0);
3130
#endif
3231
}
33-
when recvHypercube[iter](int ref, long int data[size], int size, int dataIndx[indxSize], int indxSize, double recvTime) {
32+
when recvHypercube[iter](int ref, CkNcpyBuffer data[size], int dataIndx[size], int size, double recvTime) {
3433
serial {
3534
for(int m = 0; m < size; m++) {
3635
hyperCubeStore.emplace_back(data[m]);
37-
}
38-
for(int m = 0; m < indxSize; m++) {
3936
hyperCubeIndx.emplace_back(dataIndx[m]);
37+
CkNcpyBuffer dst(store + dataIndx[m] * k, k * sizeof(long int), zero_copy_callback, CK_BUFFER_UNREG);
38+
dst.get(data[m]);
4039
}
4140
#ifdef TIMESTAMP
4241
timeStamp = std::max(recvTime, timeStamp);
@@ -45,17 +44,8 @@ module allGather {
4544
}
4645
}
4746
}
48-
serial {
49-
for(int m = 0; m < hyperCubeIndx.size(); m++) {
50-
int currIndx = hyperCubeIndx[m];
51-
for(int j = 0; j < k; j++) {
52-
store[k * currIndx + j] = hyperCubeStore[m * k + j];
53-
}
54-
}
55-
lib_done_callback.send(msg);
56-
}
5747
};
58-
entry void recvHypercube(int ref, long int data[size], int size, int dataIndx[indxSize], int indxSize, double recvTime);
48+
entry void recvHypercube(int ref, CkNcpyBuffer data[size], int dataIndx[size], int size, double recvTime);
5949
entry void Flood(int sender, CkNcpyBuffer data, double recvTime);
6050
};
6151
};

src/allGather/allGather.hh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ private:
3434
std::map<int, bool> recvFloodMsg{};
3535
int randCounter{};
3636
std::vector<int> hyperCubeIndx{};
37-
std::vector<long int> hyperCubeStore{};
37+
std::vector<CkNcpyBuffer> hyperCubeStore{};
3838
allGatherMsg *msg = new allGatherMsg;
3939
long int* data;
4040
CkCallback zero_copy_callback;

0 commit comments

Comments
 (0)