@@ -1,5 +1,7 @@
 import numpy as np
 import time
+import sys, os
+import socket
 
 from ring_of_CPUs import SingleCoreComminicator
 
@@ -13,11 +15,20 @@ def print2logandstdo(message, mode='a+'):
         fid.writelines([message + '\n'])
 
 
+def verbose_mpi_out(message, myid, mpi_verbose, mode='a+'):
+    if mpi_verbose:
+        t_now = time.mktime(time.localtime())
+        time_string = time.strftime("%d/%m/%Y %H:%M:%S", time.localtime(t_now))
+
+        with open('mpi_logfile_cpu%03d.txt' % myid, mode) as fid:
+            fid.writelines([time_string + ' - ' + message + '\n'])
+
+
 class RingOfCPUs_multiturn(object):
     def __init__(self, sim_content, N_pieces_per_transfer=1, force_serial = False, comm=None,
                 N_parellel_rings = 1,
                 N_buffer_float_size = 1000000, N_buffer_int_size = 100,
-                verbose = False,
+                verbose = False, mpi_verbose = False, enable_barriers = False,
                 enable_orders_from_master = True):
 
 
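
The helper above gives every MPI rank its own timestamped log file, so debug output from concurrent processes never has to be untangled from a shared stdout. A minimal usage sketch (assuming myid holds the value of comm.Get_rank() on this process; the file name comes from the %03d pattern in the helper):

    verbose_mpi_out('Debug file (cpu %d)' % myid, myid, mpi_verbose, mode='w')  # 'w' starts a fresh log
    verbose_mpi_out('At barrier 1 (cpu %d)' % myid, myid, mpi_verbose)          # default 'a+' appends

Rank 2, for instance, would own mpi_logfile_cpu002.txt, and no other rank writes to it.
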
@@ -30,6 +41,8 @@ def __init__(self, sim_content, N_pieces_per_transfer=1, force_serial = False, c
         self.N_parellel_rings = N_parellel_rings
 
         self.verbose = verbose
+        self.mpi_verbose = mpi_verbose
+        self.enable_barriers = enable_barriers
 
         self.enable_orders_from_master = enable_orders_from_master
 
@@ -45,28 +58,56 @@ def __init__(self, sim_content, N_pieces_per_transfer=1, force_serial = False, c
         if hasattr(sim_content, 'N_parellel_rings'):
             self.N_parellel_rings = sim_content.N_parellel_rings
 
+        if hasattr(sim_content, 'verbose'):
+            self.verbose = sim_content.verbose
+
+        if hasattr(sim_content, 'mpi_verbose'):
+            self.mpi_verbose = sim_content.mpi_verbose
+
+        if hasattr(sim_content, 'enable_barriers'):
+            self.enable_barriers = sim_content.enable_barriers
+
 
         self.sim_content.ring_of_CPUs = self
 
         # choice of the communicator
         if force_serial:
             comm_info = 'Single CPU forced by user.'
             self.comm = SingleCoreComminicator()
+            self.comm_type = 'Serial'
         elif comm is not None:
             comm_info = 'Multiprocessing using communicator provided as argument.'
             self.comm = comm
+            self.comm_type = 'Multip'
         else:
             comm_info = 'Multiprocessing via MPI.'
             from mpi4py import MPI
             self.comm = MPI.COMM_WORLD
+            self.comm_type = 'MPI'
 
+        self.verbose_mpi_out = lambda message: verbose_mpi_out(message, self.comm.Get_rank(),
+                                                               self.mpi_verbose)
+
+        verbose_mpi_out('Debug file (cpu %d)' % (self.comm.Get_rank()), self.comm.Get_rank(),
+                        self.mpi_verbose, mode='w')
+
+        self.verbose_mpi_out('Interpreter at %s (cpu %d)' % (sys.executable, self.comm.Get_rank()))
+        if self.mpi_verbose and self.comm_type == 'MPI':
+            import mpi4py
+            self.verbose_mpi_out('mpi4py version: %s (cpu %d)' % (mpi4py.__version__, self.comm.Get_rank()))
+
+        self.verbose_mpi_out('Running on %s (cpu %d)' % (socket.gethostname(), self.comm.Get_rank()))
+
+        if self.enable_barriers:
+            self.verbose_mpi_out('At barrier 1 (cpu %d)' % self.comm.Get_rank())
+            self.comm.Barrier()
+            self.verbose_mpi_out('After barrier 1 (cpu %d)' % self.comm.Get_rank())
+
         #check if there is only one node
         if self.comm.Get_size() == 1:
             #in case it is forced by user it will be rebound but there is no harm in that
             self.comm = SingleCoreComminicator()
 
-        #~ if self.N_pieces_per_transfer>1:
-            #~ raise ValueError("Not implemented!")
 
         # get info on the grid
         self.N_nodes = self.comm.Get_size()
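
With mpi_verbose enabled, each rank's log records the interpreter path, the mpi4py version, and the host name before the first barrier. Given the format strings above, a rank's log might begin like this (timestamps and values are invented for illustration):

    17/06/2019 14:03:21 - Interpreter at /usr/bin/python (cpu 2)
    17/06/2019 14:03:21 - mpi4py version: 3.0.0 (cpu 2)
    17/06/2019 14:03:21 - Running on node042 (cpu 2)
    17/06/2019 14:03:22 - At barrier 1 (cpu 2)
    17/06/2019 14:03:25 - After barrier 1 (cpu 2)

A rank that hangs between an 'At barrier' line and the matching 'After barrier' line points directly at the stalled synchronization.
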
@@ -96,8 +137,18 @@ def __init__(self, sim_content, N_pieces_per_transfer=1, force_serial = False, c
         self.buf_float = np.zeros(self.N_buffer_float_size, dtype=np.float64)
         self.buf_int = np.array(self.N_buffer_int_size * [0])
 
+        if self.enable_barriers:
+            self.verbose_mpi_out('At barrier 2 (cpu %d)' % self.comm.Get_rank())
+            self.comm.Barrier()
+            self.verbose_mpi_out('After barrier 2 (cpu %d)' % self.comm.Get_rank())
+
         self.sim_content.init_all()
 
+        if self.enable_barriers:
+            self.verbose_mpi_out('At barrier 3 (cpu %d)' % self.comm.Get_rank())
+            self.comm.Barrier()
+            self.verbose_mpi_out('After barrier 3 (cpu %d)' % self.comm.Get_rank())
+
         if self.I_am_the_master:
             print2logandstdo('PyPARIS simulation -- multiturn parallelization') #, mode='w+')
             print2logandstdo(comm_info)
@@ -108,14 +159,17 @@ def __init__(self, sim_content, N_pieces_per_transfer=1, force_serial = False, c
             print2logandstdo('Multi-ring info:')
             print2logandstdo('N_parellel_rings = %d' % self.N_parellel_rings)
             print2logandstdo('N_nodes_per_ring = %d' % self.N_nodes_per_ring)
-            import socket
-            import sys
             print2logandstdo('Running on %s' % socket.gethostname())
-            print2logandstdo('Interpreter at %s' % sys.executable)
+            print2logandstdo('Interpreter at %s' % sys.executable)
 
         self.left = int(np.mod(self.myid - 1, self.N_nodes))
         self.right = int(np.mod(self.myid + 1, self.N_nodes))
 
+        if self.enable_barriers:
+            self.verbose_mpi_out('At barrier 4 (cpu %d)' % self.comm.Get_rank())
+            self.comm.Barrier()
+            self.verbose_mpi_out('After barrier 4 (cpu %d)' % self.comm.Get_rank())
+
         if self.I_am_at_start_ring:
             self.bunches_to_be_treated = deque([])
             self.slices_to_be_treated = []
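
The left/right assignment above wires the ranks into a closed ring with modular arithmetic. A standalone worked example for four nodes (nothing here comes from the patched file itself):

    import numpy as np

    N_nodes = 4
    for myid in range(N_nodes):
        left = int(np.mod(myid - 1, N_nodes))   # rank the buffers arrive from
        right = int(np.mod(myid + 1, N_nodes))  # rank the buffers are sent to
        print(myid, left, right)
    # prints 0 3 1 / 1 0 2 / 2 1 3 / 3 2 0: rank 0 wraps to rank 3, closing the ring.
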
@@ -128,7 +182,13 @@ def __init__(self, sim_content, N_pieces_per_transfer=1, force_serial = False, c
         if self.I_am_the_master:
             list_bunches = sim_content.init_master()
             self.bunches_to_be_treated.extend(list_bunches)
-
+
+        if self.enable_barriers:
+            self.verbose_mpi_out('At barrier 5 (cpu %d)' % self.comm.Get_rank())
+            self.comm.Barrier()
+            self.verbose_mpi_out('After barrier 5 (cpu %d)' % self.comm.Get_rank())
+
+
     def run(self):
 
 
@@ -228,11 +288,34 @@ def run(self):
             sendbuf = ch.combine_float_buffers(list_of_buffers_to_send)
             if len(sendbuf) > self.N_buffer_float_size:
                 raise ValueError('Float buffer (%d) is too small!\n%d required.' % (self.N_buffer_float_size, len(sendbuf)))
+
+            if self.enable_barriers:
+                self.verbose_mpi_out('At barrier L1 (cpu %d)' % self.comm.Get_rank())
+                self.comm.Barrier()
+                self.verbose_mpi_out('After barrier L1 (cpu %d)' % self.comm.Get_rank())
+
+            self.verbose_mpi_out('At Sendrecv, cpu %d/%d, iter %d' % (self.myid, self.N_nodes, iteration))
             self.comm.Sendrecv(sendbuf, dest=self.right, sendtag=self.right,
                                recvbuf=self.buf_float, source=self.left, recvtag=self.myid)
+            self.verbose_mpi_out('After Sendrecv, cpu %d/%d, iter %d' % (self.myid, self.N_nodes, iteration))
+
+            if self.enable_barriers:
+                self.verbose_mpi_out('At barrier L2 (cpu %d)' % self.comm.Get_rank())
+                self.comm.Barrier()
+                self.verbose_mpi_out('After barrier L2 (cpu %d)' % self.comm.Get_rank())
+
             list_received_buffers = ch.split_float_buffers(self.buf_float)
-
-            # Handle orders (for now only to stopping the simulation)
+
+
+            #######################################################
+            # Handle orders (for now only to stop the simulation) #
+            #######################################################
+            if self.enable_barriers:
+                self.verbose_mpi_out('At barrier L3 (cpu %d)' % self.comm.Get_rank())
+                self.comm.Barrier()
+                self.verbose_mpi_out('After barrier L3 (cpu %d)' % self.comm.Get_rank())
+
+
             if self.enable_orders_from_master:
                 if self.I_am_the_master:
                     # send orders
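
The combined Sendrecv used above is what keeps the ring exchange deadlock-free: each rank posts its send to the right neighbour and its receive from the left neighbour in one call, so no rank blocks waiting on a neighbour that is itself blocked sending. A self-contained sketch of the same pattern (buffer length and payload are placeholders, not PyPARIS values):

    # ring_pass_sketch.py -- run with e.g.: mpirun -n 4 python ring_pass_sketch.py
    import numpy as np
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    myid = comm.Get_rank()
    N_nodes = comm.Get_size()
    left = (myid - 1) % N_nodes
    right = (myid + 1) % N_nodes

    sendbuf = np.full(10, float(myid))  # payload: just this rank's id
    recvbuf = np.zeros(10)

    # Post send and receive together; tagging with the receiver's rank
    # mirrors the sendtag=self.right / recvtag=self.myid scheme above.
    comm.Sendrecv(sendbuf, dest=right, sendtag=right,
                  recvbuf=recvbuf, source=left, recvtag=myid)
    print('rank %d received data from rank %d' % (myid, int(recvbuf[0])))
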
@@ -241,16 +324,28 @@ def run(self):
                         raise ValueError('Int buffer is too small!')
                     self.buf_int = 0 * self.buf_int
                     self.buf_int[:len(buforders)] = buforders
+
+                    self.verbose_mpi_out('At Bcast, cpu %d/%d, iter %d' % (self.myid, self.N_nodes, iteration))
                     self.comm.Bcast(self.buf_int, self.master_id)
+                    self.verbose_mpi_out('After Bcast, cpu %d/%d, iter %d' % (self.myid, self.N_nodes, iteration))
+
                 else:
                     # receive orders from the master
+                    self.verbose_mpi_out('At Bcast, cpu %d/%d, iter %d' % (self.myid, self.N_nodes, iteration))
                     self.comm.Bcast(self.buf_int, self.master_id)
+                    self.verbose_mpi_out('After Bcast, cpu %d/%d, iter %d' % (self.myid, self.N_nodes, iteration))
+
                     orders_from_master = ch.buffer_2_list_of_strings(self.buf_int)
 
                 # check if simulation has to be ended
                 if 'stop' in orders_from_master:
                     break
 
+            if self.enable_barriers:
+                self.verbose_mpi_out('At barrier L4 (cpu %d)' % self.comm.Get_rank())
+                self.comm.Barrier()
+                self.verbose_mpi_out('After barrier L4 (cpu %d)' % self.comm.Get_rank())
+
             iteration += 1
 
             # (TEMPORARY!) To stop
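
All of the new switches default to off, so existing runs behave exactly as before. A hypothetical way to turn the diagnostics on from a driver script (sim_content stands for whatever simulation object the caller already constructs; the keyword arguments are the ones added by this patch):

    ring = RingOfCPUs_multiturn(sim_content,
                                mpi_verbose=True,      # one mpi_logfile_cpu%03d.txt per rank
                                enable_barriers=True)  # log and Barrier() at each numbered checkpoint
    ring.run()

Setting the same names as attributes on sim_content works too, since the constructor now picks them up via the hasattr checks.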