Skip to content

Commit 5de20f7

Browse files
committed
feat: AFHMM & AFHMM+SAC: limit number of processes for disaggregation
Fixes: #27
1 parent f1403e8 commit 5de20f7

File tree

2 files changed

+15
-53
lines changed

2 files changed

+15
-53
lines changed

nilmtk_contrib/disaggregate/afhmm.py

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -195,30 +195,22 @@ def disaggregate_thread(self, test_mains,index,d):
195195
# Store the result in the index corresponding to the thread.
196196
d[index] = pd.DataFrame(prediction_dict,dtype='float32')
197197

198-
def disaggregate_chunk(self, test_mains_list):
199-
# Distributes the test mains across multiple threads and runs them in parallel
200-
manager = Manager()
201-
d = manager.dict()
198+
@nilmtk.docinherit.doc_inherit
199+
def disaggregate_chunk(self, test_mains):
200+
# Distributes the test mains across multiple worker processes and disaggregates them in parallel
201+
# Use all available CPUs except one for the OS.
202+
n_workers = max(( 1, multiprocessing.cpu_count() - 1 ))
202203
predictions_lst = []
203-
for test_mains in test_mains_list:
204-
test_mains_big = test_mains.values.flatten().reshape((-1,1))
205-
self.arr_of_results = []
206-
threads = []
207-
for test_block in range(int(math.ceil(len(test_mains_big)/self.time_period))):
208-
test_mains = test_mains_big[test_block*(self.time_period):(test_block+1)*self.time_period]
209-
t = Process(target=self.disaggregate_thread, args=(test_mains,test_block,d))
210-
threads.append(t)
211-
212-
for t in threads:
213-
t.start()
214-
215-
for t in threads:
216-
t.join()
217-
218-
for i in range(len(threads)):
219-
self.arr_of_results.append(d[i])
220-
prediction = pd.concat(self.arr_of_results,axis=0)
221-
predictions_lst.append(prediction)
204+
with multiprocessing.Pool(n_workers) as workers:
205+
for mains_df in test_mains:
206+
mains_vect = mains_df.values.flatten().reshape(( -1, 1 ))
207+
n_blocks = int(math.ceil(len(mains_vect)/self.time_period))
208+
blocks = [
209+
mains_vect[b * self.time_period:(b + 1) * self.time_period]
210+
for b in range(n_blocks)
211+
]
212+
res_arr = workers.map(self.disaggregate_thread, blocks)
213+
predictions_lst.append(pd.concat(res_arr, axis=0))
222214

223215
return predictions_lst
224216

nilmtk_contrib/disaggregate/afhmm_sac.py

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
import cvxpy as cvx
2-
import math
3-
import multiprocessing
42
import numpy as np
53
import pandas as pd
64

@@ -204,31 +202,3 @@ def disaggregate_thread(self, test_mains,index,d):
204202

205203
d[index] = pd.DataFrame(prediction_dict,dtype='float32')
206204

207-
def disaggregate_chunk(self, test_mains_list):
208-
# Distributes the test mains across multiple threads and runs them in parallel
209-
manager = Manager()
210-
d = manager.dict()
211-
predictions_lst = []
212-
for test_mains in test_mains_list:
213-
test_mains_big = test_mains.values.flatten().reshape((-1,1))
214-
self.arr_of_results = []
215-
st = time.time()
216-
threads = []
217-
for test_block in range(int(math.ceil(len(test_mains_big)/self.time_period))):
218-
test_mains = test_mains_big[test_block*(self.time_period):(test_block+1)*self.time_period]
219-
t = Process(target=self.disaggregate_thread, args=(test_mains,test_block,d))
220-
threads.append(t)
221-
222-
for t in threads:
223-
t.start()
224-
225-
for t in threads:
226-
t.join()
227-
228-
for i in range(len(threads)):
229-
self.arr_of_results.append(d[i])
230-
prediction = pd.concat(self.arr_of_results,axis=0)
231-
predictions_lst.append(prediction)
232-
233-
return predictions_lst
234-

0 commit comments

Comments (0)