Skip to content

Commit 9f56654

Browse files
committed
implemented multiprocexec
1 parent 37d44ac commit 9f56654

File tree

3 files changed

+135
-3
lines changed

3 files changed

+135
-3
lines changed

multiprocexec.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import multiprocessing as mp
2+
import numpy as np
3+
import os, sys
4+
5+
class mpComm(object):
6+
def __init__(self, pid, N_proc, queue_list,
7+
mutex, barriex, turnstile, turnstile2, cnt):
8+
self._pid = pid
9+
self._N_proc = N_proc
10+
self._queue_list = queue_list
11+
self.mutex = mutex
12+
self.turnstile = turnstile
13+
self.turnstile2 = turnstile2
14+
self.cnt = cnt
15+
16+
def Get_size(self):
17+
return self._N_proc
18+
19+
def Get_rank(self):
20+
return self._pid
21+
22+
def Barrier(self):
23+
self.mutex.acquire()
24+
self.cnt.value += 1
25+
if self.cnt.value == self._N_proc:
26+
self.turnstile2.acquire()
27+
self.turnstile.release()
28+
self.mutex.release()
29+
self.turnstile.acquire()
30+
self.turnstile.release()
31+
#criticalpoint
32+
self.mutex.acquire()
33+
self.cnt.value -= 1
34+
if self.cnt.value == 0:
35+
self.turnstile.acquire()
36+
self.turnstile2.release()
37+
self.mutex.release()
38+
self.turnstile2.acquire()
39+
self.turnstile2.release()
40+
41+
42+
43+
def Sendrecv(self, sendbuf, dest, sendtag, recvbuf, source, recvtag):
44+
self._queue_list[dest].put(sendbuf)
45+
temp = self._queue_list[self._pid].get()
46+
recvbuf[:len(temp)]=temp
47+
48+
def Bcast(self, buf, root):
49+
self.Barrier()
50+
if self._pid==root:
51+
for ii, q in enumerate(self._queue_list):
52+
if ii!=root:
53+
q.put(buf)
54+
else:
55+
temp = self._queue_list[self._pid].get()
56+
buf[:len(temp)]=temp
57+
self.Barrier()
58+
59+
def todo(sim_module_string, pid, N_proc, queue_list,
60+
mutex, barriex, turnstile, turnstile2, cnt):
61+
62+
comm = mpComm(pid, N_proc, queue_list,
63+
mutex, barriex, turnstile, turnstile2, cnt)
64+
65+
BIN = os.path.expanduser("./")
66+
sys.path.append(BIN)
67+
68+
sim_module_strings = sim_module_string.split('.')
69+
if len(sim_module_strings)!=2:
70+
raise(ValueError('\n\nsim_class must be given in the form: module.class.\nNested referencing not implemented.\n\n'))
71+
module_name = sim_module_strings[0]
72+
class_name = sim_module_strings[1]
73+
74+
75+
SimModule = __import__(module_name)
76+
SimClass = getattr(SimModule, class_name)
77+
78+
from ring_of_CPUs import RingOfCPUs
79+
simulation_content = SimClass()
80+
myCPUring = RingOfCPUs(simulation_content, comm=comm)
81+
myCPUring.run()
82+
83+
84+
85+
86+
87+
88+
if __name__=='__main__':
89+
import sys
90+
91+
print repr(sys.argv)
92+
93+
if len(sys.argv)!=4:
94+
raise ValueError('\n\nSyntax must be:\n\t python multiprocexec -n N_proc sim_class=module.class\n\n')
95+
if '-n' not in sys.argv[1] or 'sim_class' not in sys.argv[3]:
96+
raise ValueError('\n\nSyntax must be:\n python multiprocexec -n N_proc sim_class=module.class\n\n')
97+
98+
N_proc = int(sys.argv[2])
99+
100+
sim_module_string = sys.argv[3].split('=')[-1]
101+
102+
queue_list = [mp.Queue() for _ in xrange(N_proc)]
103+
104+
mutex = mp.Semaphore(1)
105+
barrier = mp.Semaphore(0)
106+
turnstile = mp.Semaphore(0)
107+
turnstile2 = mp.Semaphore(1)
108+
cnt = mp.Value('i', 0)
109+
110+
proc_list = []
111+
for pid in xrange(N_proc):
112+
proc_list.append(mp.Process(target=todo,
113+
args=(sim_module_string, pid, N_proc, queue_list,
114+
mutex, barrier, turnstile, turnstile2, cnt)))
115+
for p in proc_list:
116+
p.start()
117+
for p in proc_list:
118+
p.join()
119+

ring_of_CPUs.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,22 @@ def __init__(self, sim_content, N_pieces_per_transfer=1, single_CPU_mode = False
1111
self.N_turns = sim_content.N_turns
1212
self.N_pieces_per_transfer = N_pieces_per_transfer
1313

14+
if hasattr(sim_content, 'N_pieces_per_transfer'):
15+
self.N_pieces_per_transfer = N_pieces_per_transfer
16+
17+
print 'N_pieces_per_transfer = ', N_pieces_per_transfer
18+
1419
self.sim_content.ring_of_CPUs = self
1520

16-
# check if user is forcing simulation mode
21+
# choice of the communicator
1722
if single_CPU_mode:
18-
print '\nSingle_CPU_forced_by_user!\n'
23+
print '\nSingle CPU forced by user!\n'
1924
self.comm = SingleCoreComminicator()
2025
elif comm is not None:
26+
print '\nMultiprocessing using communicator provided as argument.\n'
2127
self.comm = comm
2228
else:
29+
print '\nMultiprocessing via MPI.'
2330
from mpi4py import MPI
2431
self.comm = MPI.COMM_WORLD
2532

test_ring_with_objects/Simulation.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
import sys, os
2+
BIN = os.path.expanduser("../../../")
3+
sys.path.append(BIN)
4+
BIN = os.path.expanduser("../../")
5+
sys.path.append(BIN)
6+
7+
18
import communication_helpers as ch
29
import numpy as np
310
from scipy.constants import c
@@ -190,4 +197,3 @@ def buffer_to_piece(self, buf):
190197

191198

192199

193-

0 commit comments

Comments
 (0)