1+ #include < stdio.h>
2+ #include < math.h>
3+ #include < time.h>
4+ #include < converse.h>
5+
6+ CpvDeclare (int , bigmsg_index);
7+ CpvDeclare (int , ackmsg_index);
8+ CpvDeclare (int , shortmsg_index);
9+ CpvDeclare (int , stop_index);
10+ CpvDeclare (int , msg_size);
11+ CpvDeclare (int , trial); // increments per trial, gets set to 0 at the start of a new msg size
12+ CpvDeclare (int , round); // increments per msg size
13+ CpvDeclare (int , warmup_flag); // 1 when in warmup round, 0 when not
14+ CpvDeclare (int , recv_count);
15+ CpvDeclare (int , ack_count);
16+ CpvDeclare (double , total_time);
17+ CpvDeclare (double , process_time);
18+ CpvDeclare (double , send_time);
19+
// Number of message sizes benchmarked; must equal the length of msg_sizes[] below.
#define nMSG_SIZE 3
// Number of timed trials recorded for each message size.
#define nTRIALS_PER_SIZE 10
// The decimal place that the output data is rounded to.
#define CALCULATION_PRECISION 0.0001

// Messages sent per trial by each sending PE (set in bigmsg_moduleinit()).
int msg_count;

// Hard coded msg_size values, in bytes.
int msg_sizes[nMSG_SIZE] = {56, 4096, 65536};

// Per-trial timings for the current message size; times are stored in us.
double total_time[nTRIALS_PER_SIZE];
double process_time[nTRIALS_PER_SIZE];
double send_time[nTRIALS_PER_SIZE];
31+
32+
33+
34+ typedef struct myMsg
35+ {
36+ char header[CmiMsgHeaderSizeBytes];
37+ int payload[1 ];
38+ } *message;
39+
40+ // helper functions
41+
42+ double round_to (double val, double precision) {
43+ return round (val / precision) * precision;
44+ }
45+
46+ double get_average (double arr[]) {
47+ double tot = 0 ;
48+ for (int i = 0 ; i < nTRIALS_PER_SIZE; ++i) tot += arr[i];
49+ return (round_to (tot, CALCULATION_PRECISION) / nTRIALS_PER_SIZE);
50+
51+ }
52+
53+ double get_stdev (double arr[]) {
54+ double stdev = 0.0 ;
55+ double avg = get_average (arr);
56+ for (int i = 0 ; i < nTRIALS_PER_SIZE; ++i)
57+ stdev += pow (arr[i] - avg, 2 );
58+ stdev = sqrt (stdev / nTRIALS_PER_SIZE);
59+ return stdev;
60+ }
61+
62+ double get_max (double arr[]) {
63+ double max = arr[0 ];
64+ for (int i = 1 ; i < nTRIALS_PER_SIZE; ++i)
65+ if (arr[i] > arr[0 ]) max = arr[i];
66+ return max;
67+ }
68+
69+
70+ void print_results () {
71+ if (!CpvAccess (warmup_flag)) {
72+ CmiPrintf (" msg_size\n %d\n " , CpvAccess (msg_size));
73+ for (int i = 0 ; i < nTRIALS_PER_SIZE; ++i) {
74+ // DEBUG: print without trial number:
75+ // CmiPrintf("%f\n%f\n%f\n", send_time[i], process_time[i], total_time[i]);
76+
77+ // DEBUG: print with trial number:
78+ // CmiPrintf("%d %f\n %f\n %f\n", i, send_time[i], process_time[i], total_time[i]);
79+ }
80+ // print data:
81+ CmiPrintf (" Format: {#PEs},{msg_size},{averages*3},{stdevs*3},{maxs*3}\n " );
82+ CmiPrintf (" DATA,%d,%d,%f,%f,%f,%f,%f,%f,%f,%f,%f\n " , CmiNumPes (), CpvAccess (msg_size), get_average (send_time), get_average (process_time), get_average (total_time),
83+ get_stdev (send_time), get_stdev (process_time), get_stdev (total_time), get_max (send_time), get_max (process_time), get_max (total_time));
84+
85+
86+ } else {
87+ if (CpvAccess (round) == nMSG_SIZE - 1 ) // if this is the end of the warmup round
88+ CmiPrintf (" Warm up Done!\n " );
89+
90+ // DEBUG: Print what msg_size the warmup round is on
91+ // else // otherwise move to the next msg size
92+ // CmiPrintf("Warming up msg_size %d\n", CpvAccess(msg_size));
93+ }
94+ }
95+
/*
 * Handler for the stop message: releases the message and asks the local
 * Converse scheduler to exit.
 *
 * msg - the delivered stop message. A Converse handler owns the message it is
 *       handed, so it is freed here instead of being leaked at shutdown.
 */
void stop(void *msg)
{
    CmiFree(msg);
    CsdExitScheduler();
}
100+
101+ void send_msg () {
102+ double start_time, crt_time;
103+ struct myMsg *msg;
104+ // CmiPrintf("\nSending msg fron pe%d to pe%d\n",CmiMyPe(), CmiNumPes()/2+CmiMyPe());
105+ CpvAccess (process_time) = 0.0 ;
106+ CpvAccess (send_time) = 0.0 ;
107+ CpvAccess (total_time) = CmiWallTimer ();
108+ for (int k = 0 ; k < msg_count; k++) {
109+ crt_time = CmiWallTimer ();
110+ msg = (message)CmiAlloc (CpvAccess (msg_size));
111+
112+ // Fills payload with ints
113+ for (int i = 0 ; i < (CpvAccess (msg_size) - CmiMsgHeaderSizeBytes) / sizeof (int ); ++i) msg->payload [i] = i;
114+
115+ // DEBUG: Print ints stored in payload
116+ // for (int i = 0; i < (CpvAccess(msg_size) - CmiMsgHeaderSizeBytes) / sizeof(int); ++i) CmiPrintf("%d ", msg->payload[i]);
117+ // CmiPrintf("\n");
118+
119+ CmiSetHandler (msg, CpvAccess (bigmsg_index));
120+ CpvAccess (process_time) = CmiWallTimer () - crt_time + CpvAccess (process_time);
121+ start_time = CmiWallTimer ();
122+ // Send from my pe-i on node-0 to q+i on node-1
123+ CmiSyncSendAndFree (CmiNumPes () / 2 + CmiMyPe (), CpvAccess (msg_size), msg);
124+ CpvAccess (send_time) = CmiWallTimer () - start_time + CpvAccess (send_time);
125+ }
126+ }
127+
128+
129+
130+ void shortmsg_handler (void *vmsg) {
131+ message smsg = (message)vmsg;
132+ CmiFree (smsg);
133+ if (!CpvAccess (warmup_flag)) { // normal round handling
134+ if (CpvAccess (trial) == nTRIALS_PER_SIZE) { // if we have run the current msg size for nTRIALS
135+ CpvAccess (round) = CpvAccess (round) + 1 ;
136+ CpvAccess (trial) = 0 ;
137+ CpvAccess (msg_size) = msg_sizes[CpvAccess (round)];
138+ }
139+ } else { // warmup round handling
140+ if (CpvAccess (round) == nMSG_SIZE - 1 ) { // if this is the end of the warmup round
141+ CpvAccess (round) = 0 ;
142+ CpvAccess (msg_size) = msg_sizes[0 ];
143+ CpvAccess (warmup_flag) = 0 ;
144+ } else { // otherwise warm up the next msg size
145+ CpvAccess (round) = CpvAccess (round) + 1 ;
146+ CpvAccess (msg_size) = msg_sizes[CpvAccess (round)];
147+ }
148+ CpvAccess (trial) = 0 ;
149+ }
150+ send_msg ();
151+ }
152+
153+ void do_work (long start, long end, void *result) {
154+ long tmp=0 ;
155+ for (long i=start; i<=end; i++) {
156+ tmp+=(long )(sqrt (1 +cos (i*1.57 )));
157+ }
158+ *(long *)result = tmp + *(long *)result;
159+ }
160+
161+
162+ void bigmsg_handler (void *vmsg)
163+ {
164+ int i, next;
165+ message msg = (message)vmsg;
166+ // if this is a receiving PE
167+ if (CmiMyPe () >= CmiNumPes () / 2 ) {
168+ CpvAccess (recv_count) = 1 + CpvAccess (recv_count);
169+ long sum = 0 ;
170+ long result = 0 ;
171+ double num_ints = (CpvAccess (msg_size) - CmiMsgHeaderSizeBytes) / sizeof (int );
172+ double exp_avg = (num_ints - 1 ) / 2 ;
173+ for (i = 0 ; i < num_ints; ++i) {
174+ sum += msg->payload [i];
175+ do_work (i,sum,&result);
176+ }
177+ if (result < 0 ) {
178+ CmiPrintf (" Error! in computation" );
179+ }
180+ double calced_avg = sum / num_ints;
181+ if (calced_avg != exp_avg) {
182+ CmiPrintf (" Calculated average of %f does not match expected value of %f, exiting\n " , calced_avg, exp_avg);
183+ message exit_msg = (message) CmiAlloc (CpvAccess (msg_size));
184+ CmiSetHandler (exit_msg, CpvAccess (stop_index));
185+ CmiSyncBroadcastAllAndFree (CpvAccess (msg_size), exit_msg);
186+ return ;
187+ }
188+ // else
189+ // CmiPrintf("Calculation OK\n"); // DEBUG: Computation Check
190+ if (CpvAccess (recv_count) == msg_count) {
191+ CpvAccess (recv_count) = 0 ;
192+
193+ CmiFree (msg);
194+ msg = (message)CmiAlloc (CpvAccess (msg_size));
195+ CmiSetHandler (msg, CpvAccess (ackmsg_index));
196+ CmiSyncSendAndFree (0 , CpvAccess (msg_size), msg);
197+ } else
198+ CmiFree (msg);
199+ } else
200+ CmiPrintf (" \n Error: Only node-1 can be receiving node!!!!\n " );
201+ }
202+
203+ void pe0_ack_handler (void *vmsg)
204+ {
205+ int pe;
206+ message msg = (message)vmsg;
207+ // Pe-0 receives all acks
208+ CpvAccess (ack_count) = 1 + CpvAccess (ack_count);
209+
210+ // DEBUG: Computation Print Check
211+ // CmiPrintf("All %d messages of size %d on trial %d OK\n", MSG_COUNT, CpvAccess(msg_size), CpvAccess(trial));
212+
213+
214+ if (CpvAccess (ack_count) == CmiNumPes ()/2 ) {
215+ CpvAccess (ack_count) = 0 ;
216+ CpvAccess (total_time) = CmiWallTimer () - CpvAccess (total_time);
217+
218+ // DEBUG: Original Print Statement
219+ // CmiPrintf("Received [Trial=%d, msg size=%d] ack on PE-#%d send time=%lf, process time=%lf, total time=%lf\n",
220+ // CpvAccess(trial), CpvAccess(msg_size), CmiMyPe(), CpvAccess(send_time), CpvAccess(process_time), CpvAccess(total_time));
221+
222+ CmiFree (msg);
223+
224+ // store times in arrays
225+ send_time[CpvAccess (trial)] = CpvAccess (send_time) * 1000000.0 ; // convert to microsecs.
226+ process_time[CpvAccess (trial)] = CpvAccess (process_time) * 1000000.0 ;
227+ total_time[CpvAccess (trial)] = CpvAccess (total_time) * 1000000.0 ;
228+
229+ CpvAccess (trial) = CpvAccess (trial) + 1 ;
230+
231+ // print results
232+ if (CpvAccess (warmup_flag) || CpvAccess (trial) == nTRIALS_PER_SIZE) print_results ();
233+
234+ // if this is not the warmup round, and we have finished the final trial, and we are on the final msg size, exit
235+ if (!CpvAccess (warmup_flag) && CpvAccess (trial) == nTRIALS_PER_SIZE && CpvAccess (round) == nMSG_SIZE - 1 )
236+ {
237+ message exit_msg = (message) CmiAlloc (CpvAccess (msg_size));
238+ CmiSetHandler (exit_msg, CpvAccess (stop_index));
239+ CmiSyncBroadcastAllAndFree (CpvAccess (msg_size), exit_msg);
240+ return ;
241+ }
242+ else {
243+ // CmiPrintf("\nSending short msgs from PE-%d", CmiMyPe());
244+ for (pe = 0 ; pe<CmiNumPes () / 2 ; pe++) {
245+ int smsg_size = 4 +CmiMsgHeaderSizeBytes;
246+ message smsg = (message)CmiAlloc (smsg_size);
247+ CmiSetHandler (smsg, CpvAccess (shortmsg_index));
248+ CmiSyncSendAndFree (pe, smsg_size, smsg);
249+ }
250+ }
251+ }
252+ }
253+
254+
255+ void bigmsg_init ()
256+ {
257+ int totalpes = CmiNumPes (); // p=num_pes
258+ int pes_per_node = totalpes/2 ; // q=p/2
259+ if (CmiNumPes ()%2 !=0 ) {
260+ CmiPrintf (" note: this test requires at multiple of 2 pes, skipping test.\n " );
261+ CmiPrintf (" exiting.\n " );
262+ // CsdExitScheduler();
263+ message exit_msg = (message) CmiAlloc (CpvAccess (msg_size));
264+ CmiSetHandler (exit_msg, CpvAccess (stop_index));
265+ CmiSyncBroadcastAllAndFree (CpvAccess (msg_size), exit_msg);
266+ return ;
267+ } else {
268+ if (CmiMyPe () < pes_per_node)
269+ send_msg ();
270+ }
271+ }
272+
273+
274+
275+ void bigmsg_moduleinit (int argc, char **argv)
276+ {
277+ CpvInitialize (int , bigmsg_index);
278+ CpvInitialize (int , ackmsg_index);
279+ CpvInitialize (int , shortmsg_index);
280+ CpvInitialize (int , msg_size);
281+ CpvInitialize (int , trial);
282+ CpvInitialize (int , round);
283+ CpvInitialize (int , warmup_flag);
284+ CpvInitialize (int , recv_count);
285+ CpvInitialize (int , ack_count);
286+ CpvInitialize (double , total_time);
287+ CpvInitialize (double , send_time);
288+ CpvInitialize (double , process_time);
289+ CpvInitialize (int , stop_index);
290+
291+ CpvAccess (bigmsg_index) = CmiRegisterHandler (bigmsg_handler);
292+ CpvAccess (shortmsg_index) = CmiRegisterHandler (shortmsg_handler);
293+ CpvAccess (ackmsg_index) = CmiRegisterHandler (pe0_ack_handler);
294+ CpvAccess (stop_index) = CmiRegisterHandler (stop);
295+ CpvAccess (msg_size) = 16 +CmiMsgHeaderSizeBytes;
296+ CpvAccess (trial) = 0 ;
297+ CpvAccess (round) = 0 ;
298+ CpvAccess (warmup_flag) = 1 ;
299+ msg_count = 100 ; // default msg count
300+ CmiGetArgInt (argv, " -msg_count" , &msg_count);
301+
302+ // Set runtime cpuaffinity
303+ CmiInitCPUAffinity (argv);
304+ // Initialize CPU topology
305+ CmiInitCPUTopology (argv);
306+ // Wait for all PEs of the node to complete topology init
307+ CmiNodeAllBarrier ();
308+
309+ // Update the argc after runtime parameters are extracted out
310+ argc = CmiGetArgc (argv);
311+ if (CmiMyPe () < CmiNumPes ()/2 )
312+ bigmsg_init ();
313+ }
314+
315+ int main (int argc, char **argv)
316+ {
317+ ConverseInit (argc,argv,bigmsg_moduleinit,0 ,0 );
318+ }
0 commit comments