Skip to content

Commit 6f31228

Browse files
committed
Merge branch 'feature/simMPIwithMP'
2 parents 41c436f + 9af66f4 commit 6f31228

File tree

5 files changed

+342
-6
lines changed

5 files changed

+342
-6
lines changed

multiprocexec.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import multiprocessing as mp
2+
import numpy as np
3+
import os, sys
4+
5+
class mpComm(object):
6+
def __init__(self, pid, N_proc, queue_list,
7+
mutex, barriex, turnstile, turnstile2, cnt):
8+
self._pid = pid
9+
self._N_proc = N_proc
10+
self._queue_list = queue_list
11+
self.mutex = mutex
12+
self.turnstile = turnstile
13+
self.turnstile2 = turnstile2
14+
self.cnt = cnt
15+
16+
def Get_size(self):
17+
return self._N_proc
18+
19+
def Get_rank(self):
20+
return self._pid
21+
22+
def Barrier(self):
23+
self.mutex.acquire()
24+
self.cnt.value += 1
25+
if self.cnt.value == self._N_proc:
26+
self.turnstile2.acquire()
27+
self.turnstile.release()
28+
self.mutex.release()
29+
self.turnstile.acquire()
30+
self.turnstile.release()
31+
#criticalpoint
32+
self.mutex.acquire()
33+
self.cnt.value -= 1
34+
if self.cnt.value == 0:
35+
self.turnstile.acquire()
36+
self.turnstile2.release()
37+
self.mutex.release()
38+
self.turnstile2.acquire()
39+
self.turnstile2.release()
40+
41+
42+
43+
def Sendrecv(self, sendbuf, dest, sendtag, recvbuf, source, recvtag):
44+
self._queue_list[dest].put(sendbuf)
45+
temp = self._queue_list[self._pid].get()
46+
recvbuf[:len(temp)]=temp
47+
48+
def Bcast(self, buf, root):
49+
self.Barrier()
50+
if self._pid==root:
51+
for ii, q in enumerate(self._queue_list):
52+
if ii!=root:
53+
q.put(buf)
54+
else:
55+
temp = self._queue_list[self._pid].get()
56+
buf[:len(temp)]=temp
57+
self.Barrier()
58+
59+
def todo(sim_module_string, pid, N_proc, queue_list,
60+
mutex, barriex, turnstile, turnstile2, cnt):
61+
62+
comm = mpComm(pid, N_proc, queue_list,
63+
mutex, barriex, turnstile, turnstile2, cnt)
64+
65+
BIN = os.path.expanduser("./")
66+
sys.path.append(BIN)
67+
68+
sim_module_strings = sim_module_string.split('.')
69+
if len(sim_module_strings)!=2:
70+
raise(ValueError('\n\nsim_class must be given in the form: module.class.\nNested referencing not implemented.\n\n'))
71+
module_name = sim_module_strings[0]
72+
class_name = sim_module_strings[1]
73+
74+
75+
SimModule = __import__(module_name)
76+
SimClass = getattr(SimModule, class_name)
77+
78+
from ring_of_CPUs import RingOfCPUs
79+
simulation_content = SimClass()
80+
myCPUring = RingOfCPUs(simulation_content, comm=comm)
81+
myCPUring.run()
82+
83+
84+
85+
86+
87+
88+
if __name__=='__main__':
89+
import sys
90+
91+
print repr(sys.argv)
92+
93+
if len(sys.argv)!=4:
94+
raise ValueError('\n\nSyntax must be:\n\t python multiprocexec -n N_proc sim_class=module.class\n\n')
95+
if '-n' not in sys.argv[1] or 'sim_class' not in sys.argv[3]:
96+
raise ValueError('\n\nSyntax must be:\n python multiprocexec -n N_proc sim_class=module.class\n\n')
97+
98+
N_proc = int(sys.argv[2])
99+
100+
sim_module_string = sys.argv[3].split('=')[-1]
101+
102+
queue_list = [mp.Queue() for _ in xrange(N_proc)]
103+
104+
mutex = mp.Semaphore(1)
105+
barrier = mp.Semaphore(0)
106+
turnstile = mp.Semaphore(0)
107+
turnstile2 = mp.Semaphore(1)
108+
cnt = mp.Value('i', 0)
109+
110+
proc_list = []
111+
for pid in xrange(N_proc):
112+
proc_list.append(mp.Process(target=todo,
113+
args=(sim_module_string, pid, N_proc, queue_list,
114+
mutex, barrier, turnstile, turnstile2, cnt)))
115+
for p in proc_list:
116+
p.start()
117+
for p in proc_list:
118+
p.join()
119+

ring_of_CPUs.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,30 @@
1-
from mpi4py import MPI
2-
31
import numpy as np
42

53
import communication_helpers as ch
64

75
class RingOfCPUs(object):
8-
def __init__(self, sim_content, N_pieces_per_transfer=1, single_CPU_mode = False):
6+
def __init__(self, sim_content, N_pieces_per_transfer=1, single_CPU_mode = False, comm=None):
97

108
self.sim_content = sim_content
119
self.N_turns = sim_content.N_turns
1210
self.N_pieces_per_transfer = N_pieces_per_transfer
1311

12+
if hasattr(sim_content, 'N_pieces_per_transfer'):
13+
self.N_pieces_per_transfer = N_pieces_per_transfer
14+
15+
print 'N_pieces_per_transfer = ', N_pieces_per_transfer
16+
1417
self.sim_content.ring_of_CPUs = self
1518

16-
# check if user is forcing simulation mode
19+
# choice of the communicator
1720
if single_CPU_mode:
18-
print '\nSingle_CPU_forced_by_user!\n'
21+
print '\nSingle CPU forced by user!\n'
1922
self.comm = SingleCoreComminicator()
23+
elif comm is not None:
24+
print '\nMultiprocessing using communicator provided as argument.\n'
25+
self.comm = comm
2026
else:
27+
print '\nMultiprocessing via MPI.'
2128
from mpi4py import MPI
2229
self.comm = MPI.COMM_WORLD
2330

@@ -63,6 +70,9 @@ def __init__(self, sim_content, N_pieces_per_transfer=1, single_CPU_mode = False
6370

6471
def run(self):
6572
if self.I_am_the_master:
73+
with open('logfile.txt', 'a+') as fid:
74+
import socket
75+
fid.writelines(['Running on %s\n'%socket.gethostname()])
6676
import time
6777
t_last_turn = time.mktime(time.localtime())
6878
while True: #(it will be stopped with a break)
@@ -106,6 +116,8 @@ def run(self):
106116

107117
t_now = time.mktime(time.localtime())
108118
print 'Turn %d, %d s'%(self.i_turn,t_now-t_last_turn)
119+
with open('logfile.txt', 'a+') as fid:
120+
fid.writelines(['Turn %d, %d s\n'%(self.i_turn,t_now-t_last_turn)])
109121
t_last_turn = t_now
110122

111123
# prepare next turn
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
import sys, os
2+
BIN = os.path.expanduser("../../../")
3+
sys.path.append(BIN)
4+
BIN = os.path.expanduser("../../")
5+
sys.path.append(BIN)
6+
7+
import multiprocessing as mp
8+
import numpy as np
9+
10+
class mpComm(object):
11+
def __init__(self, pid, N_proc, queue_list,
12+
mutex, barriex, turnstile, turnstile2, cnt):
13+
self._pid = pid
14+
self._N_proc = N_proc
15+
self._queue_list = queue_list
16+
self.mutex = mutex
17+
self.turnstile = turnstile
18+
self.turnstile2 = turnstile2
19+
self.cnt = cnt
20+
21+
def Get_size(self):
22+
return self._N_proc
23+
24+
def Get_rank(self):
25+
return self._pid
26+
27+
def Barrier(self):
28+
self.mutex.acquire()
29+
self.cnt.value += 1
30+
if self.cnt.value == self._N_proc:
31+
self.turnstile2.acquire()
32+
self.turnstile.release()
33+
self.mutex.release()
34+
self.turnstile.acquire()
35+
self.turnstile.release()
36+
#criticalpoint
37+
self.mutex.acquire()
38+
self.cnt.value -= 1
39+
if self.cnt.value == 0:
40+
self.turnstile.acquire()
41+
self.turnstile2.release()
42+
self.mutex.release()
43+
self.turnstile2.acquire()
44+
self.turnstile2.release()
45+
46+
47+
48+
def Sendrecv(self, sendbuf, dest, sendtag, recvbuf, source, recvtag):
49+
self._queue_list[dest].put(sendbuf)
50+
temp = self._queue_list[self._pid].get()
51+
recvbuf[:len(temp)]=temp
52+
53+
def Bcast(self, buf, root):
54+
self.Barrier()
55+
if self._pid==root:
56+
for ii, q in enumerate(self._queue_list):
57+
if ii!=root:
58+
q.put(buf)
59+
else:
60+
temp = self._queue_list[self._pid].get()
61+
buf[:len(temp)]=temp
62+
self.Barrier()
63+
64+
def todo(pid, N_proc, queue_list,
65+
mutex, barriex, turnstile, turnstile2, cnt):
66+
67+
comm = mpComm(pid, N_proc, queue_list,
68+
mutex, barriex, turnstile, turnstile2, cnt)
69+
70+
from ring_of_CPUs import RingOfCPUs
71+
from Simulation_with_eclouds import Simulation
72+
simulation_content = Simulation()
73+
74+
myCPUring = RingOfCPUs(simulation_content, N_pieces_per_transfer=1, comm=comm)
75+
76+
myCPUring.run()
77+
78+
79+
if __name__=='__main__':
80+
N_proc = 2
81+
82+
queue_list = [mp.Queue() for _ in xrange(N_proc)]
83+
84+
mutex = mp.Semaphore(1)
85+
barrier = mp.Semaphore(0)
86+
turnstile = mp.Semaphore(0)
87+
turnstile2 = mp.Semaphore(1)
88+
cnt = mp.Value('i', 0)
89+
90+
proc_list = []
91+
for pid in xrange(N_proc):
92+
proc_list.append(mp.Process(target=todo,
93+
args=(pid, N_proc, queue_list,
94+
mutex, barrier, turnstile, turnstile2, cnt)))
95+
for p in proc_list:
96+
p.start()
97+
for p in proc_list:
98+
p.join()
99+
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import sys, os
2+
BIN = os.path.expanduser("../../")
3+
sys.path.append(BIN)
4+
BIN = os.path.expanduser("../")
5+
sys.path.append(BIN)
6+
7+
import multiprocessing as mp
8+
import numpy as np
9+
10+
class mpComm(object):
11+
def __init__(self, pid, N_proc, queue_list,
12+
mutex, barriex, turnstile, turnstile2, cnt):
13+
self._pid = pid
14+
self._N_proc = N_proc
15+
self._queue_list = queue_list
16+
self.mutex = mutex
17+
self.turnstile = turnstile
18+
self.turnstile2 = turnstile2
19+
self.cnt = cnt
20+
21+
def Get_size(self):
22+
return self._N_proc
23+
24+
def Get_rank(self):
25+
return self._pid
26+
27+
def Barrier(self):
28+
self.mutex.acquire()
29+
self.cnt.value += 1
30+
if self.cnt.value == self._N_proc:
31+
self.turnstile2.acquire()
32+
self.turnstile.release()
33+
self.mutex.release()
34+
self.turnstile.acquire()
35+
self.turnstile.release()
36+
#criticalpoint
37+
self.mutex.acquire()
38+
self.cnt.value -= 1
39+
if self.cnt.value == 0:
40+
self.turnstile.acquire()
41+
self.turnstile2.release()
42+
self.mutex.release()
43+
self.turnstile2.acquire()
44+
self.turnstile2.release()
45+
46+
47+
48+
def Sendrecv(self, sendbuf, dest, sendtag, recvbuf, source, recvtag):
49+
self._queue_list[dest].put(sendbuf)
50+
temp = self._queue_list[self._pid].get()
51+
recvbuf[:len(temp)]=temp
52+
53+
def Bcast(self, buf, root):
54+
self.Barrier()
55+
if self._pid==root:
56+
for ii, q in enumerate(self._queue_list):
57+
if ii!=root:
58+
q.put(buf)
59+
else:
60+
temp = self._queue_list[self._pid].get()
61+
buf[:len(temp)]=temp
62+
self.Barrier()
63+
64+
def todo(pid, N_proc, queue_list,
65+
mutex, barriex, turnstile, turnstile2, cnt):
66+
67+
comm = mpComm(pid, N_proc, queue_list,
68+
mutex, barriex, turnstile, turnstile2, cnt)
69+
70+
from ring_of_CPUs import RingOfCPUs
71+
from Simulation import Simulation
72+
#~ from Simulation_with_eclouds import Simulation
73+
simulation_content = Simulation()
74+
75+
myCPUring = RingOfCPUs(simulation_content, N_pieces_per_transfer=5, comm=comm)
76+
77+
myCPUring.run()
78+
79+
80+
if __name__=='__main__':
81+
N_proc = 2
82+
83+
queue_list = [mp.Queue() for _ in xrange(N_proc)]
84+
85+
mutex = mp.Semaphore(1)
86+
barrier = mp.Semaphore(0)
87+
turnstile = mp.Semaphore(0)
88+
turnstile2 = mp.Semaphore(1)
89+
cnt = mp.Value('i', 0)
90+
91+
proc_list = []
92+
for pid in xrange(N_proc):
93+
proc_list.append(mp.Process(target=todo,
94+
args=(pid, N_proc, queue_list,
95+
mutex, barrier, turnstile, turnstile2, cnt)))
96+
for p in proc_list:
97+
p.start()
98+
for p in proc_list:
99+
p.join()
100+

test_ring_with_objects/Simulation.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
import sys, os
2+
BIN = os.path.expanduser("../../../")
3+
sys.path.append(BIN)
4+
BIN = os.path.expanduser("../../")
5+
sys.path.append(BIN)
6+
7+
18
import communication_helpers as ch
29
import numpy as np
310
from scipy.constants import c
@@ -190,4 +197,3 @@ def buffer_to_piece(self, buf):
190197

191198

192199

193-

0 commit comments

Comments
 (0)