From c521d78020bfe249fbf8ee6f9b76b689f9523cdb Mon Sep 17 00:00:00 2001 From: Guillaume Levasseur Date: Fri, 11 Sep 2020 10:56:47 +0200 Subject: [PATCH 1/9] fix: AFHMM & AFHMM+SAC: the parameter chunk_wise_training is no longer hardcoded. Fixes: https://github.com/nilmtk/nilmtk-contrib/issues/36 --- nilmtk_contrib/disaggregate/afhmm.py | 2 +- nilmtk_contrib/disaggregate/afhmm_sac.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/nilmtk_contrib/disaggregate/afhmm.py b/nilmtk_contrib/disaggregate/afhmm.py index ad16433..c6a08a5 100644 --- a/nilmtk_contrib/disaggregate/afhmm.py +++ b/nilmtk_contrib/disaggregate/afhmm.py @@ -21,7 +21,7 @@ def __init__(self, params): self.default_num_states = params.get('default_num_states',2) self.save_model_path = params.get('save-model-path', None) self.load_model_path = params.get('pretrained-model-path',None) - self.chunk_wise_training = False + self.chunk_wise_training = params.get("chunk_wise_training", False) if self.load_model_path: self.load_model(self.load_model_path) diff --git a/nilmtk_contrib/disaggregate/afhmm_sac.py b/nilmtk_contrib/disaggregate/afhmm_sac.py index 1e87b27..44b5bc8 100644 --- a/nilmtk_contrib/disaggregate/afhmm_sac.py +++ b/nilmtk_contrib/disaggregate/afhmm_sac.py @@ -24,7 +24,7 @@ def __init__(self, params): self.default_num_states = params.get('default_num_states',2) self.save_model_path = params.get('save-model-path', None) self.load_model_path = params.get('pretrained-model-path',None) - self.chunk_wise_training = False + self.chunk_wise_training = params.get("chunk_wise_training", False) if self.load_model_path: self.load_model(self.load_model_path) @@ -289,4 +289,4 @@ def disaggregate_chunk(self, test_mains_list): return predictions_lst - \ No newline at end of file + From 8289c4067282ceefaf836ba67d80420cfadc4141 Mon Sep 17 00:00:00 2001 From: Guillaume Levasseur Date: Fri, 11 Sep 2020 11:03:30 +0200 Subject: [PATCH 2/9] doc: AFHMM & AFHMM+SAC: update docstring class with real 
name and paper --- nilmtk_contrib/disaggregate/afhmm.py | 5 ++++- nilmtk_contrib/disaggregate/afhmm_sac.py | 6 ++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/nilmtk_contrib/disaggregate/afhmm.py b/nilmtk_contrib/disaggregate/afhmm.py index c6a08a5..ab9d46f 100644 --- a/nilmtk_contrib/disaggregate/afhmm.py +++ b/nilmtk_contrib/disaggregate/afhmm.py @@ -8,7 +8,10 @@ from multiprocessing import Process, Manager class AFHMM(Disaggregator): - + """ + Additive Factorial Hidden Markov Model (without Signal Aggregate Constraints) + See: http://papers.nips.cc/paper/5526-signal-aggregate-constraints-in-additive-factorial-hmms-with-application-to-energy-disaggregation.pdf + """ def __init__(self, params): self.model = [] self.MODEL_NAME = 'AFHMM' diff --git a/nilmtk_contrib/disaggregate/afhmm_sac.py b/nilmtk_contrib/disaggregate/afhmm_sac.py index 44b5bc8..2efdc77 100644 --- a/nilmtk_contrib/disaggregate/afhmm_sac.py +++ b/nilmtk_contrib/disaggregate/afhmm_sac.py @@ -8,8 +8,10 @@ from multiprocessing import Process, Manager class AFHMM_SAC(Disaggregator): - """1 dimensional baseline Mean algorithm.""" - + """ + Additive Factorial Hidden Markov Model with Signal Aggregate Constraints + See: http://papers.nips.cc/paper/5526-signal-aggregate-constraints-in-additive-factorial-hmms-with-application-to-energy-disaggregation.pdf + """ def __init__(self, params): self.model = [] self.MIN_CHUNK_LENGTH = 100 From 14f331f692c1f061a207b0c80cc0404d2b28df39 Mon Sep 17 00:00:00 2001 From: Guillaume Levasseur Date: Fri, 11 Sep 2020 11:08:25 +0200 Subject: [PATCH 3/9] refactor: AFHMM: clean __init__ declarations. Use .get() instead of double variable declarations. AFHMM+SAC: inherit from AFHMM to remove duplicate code. 
--- nilmtk_contrib/disaggregate/afhmm.py | 10 ++++------ nilmtk_contrib/disaggregate/afhmm_sac.py | 23 ++++------------------- 2 files changed, 8 insertions(+), 25 deletions(-) diff --git a/nilmtk_contrib/disaggregate/afhmm.py b/nilmtk_contrib/disaggregate/afhmm.py index ab9d46f..404d781 100644 --- a/nilmtk_contrib/disaggregate/afhmm.py +++ b/nilmtk_contrib/disaggregate/afhmm.py @@ -13,17 +13,15 @@ class AFHMM(Disaggregator): See: http://papers.nips.cc/paper/5526-signal-aggregate-constraints-in-additive-factorial-hmms-with-application-to-energy-disaggregation.pdf """ def __init__(self, params): - self.model = [] self.MODEL_NAME = 'AFHMM' self.models = [] self.num_appliances = 0 self.appliances = [] self.signal_aggregates = OrderedDict() - self.time_period = 720 - self.time_period = params.get('time_period', self.time_period) - self.default_num_states = params.get('default_num_states',2) - self.save_model_path = params.get('save-model-path', None) - self.load_model_path = params.get('pretrained-model-path',None) + self.time_period = params.get("time_period", 720) + self.default_num_states = params.get("default_num_states", 2) + self.save_model_path = params.get("save-model-path", None) + self.load_model_path = params.get("pretrained-model-path", None) self.chunk_wise_training = params.get("chunk_wise_training", False) if self.load_model_path: self.load_model(self.load_model_path) diff --git a/nilmtk_contrib/disaggregate/afhmm_sac.py b/nilmtk_contrib/disaggregate/afhmm_sac.py index 2efdc77..65732c0 100644 --- a/nilmtk_contrib/disaggregate/afhmm_sac.py +++ b/nilmtk_contrib/disaggregate/afhmm_sac.py @@ -6,34 +6,19 @@ import cvxpy as cvx from hmmlearn import hmm from multiprocessing import Process, Manager +from nilmtk_contrib.disaggregate import AFHMM -class AFHMM_SAC(Disaggregator): + +class AFHMM_SAC(AFHMM): """ Additive Factorial Hidden Markov Model with Signal Aggregate Constraints See: 
http://papers.nips.cc/paper/5526-signal-aggregate-constraints-in-additive-factorial-hmms-with-application-to-energy-disaggregation.pdf """ def __init__(self, params): - self.model = [] - self.MIN_CHUNK_LENGTH = 100 + super().__init__(params) self.MODEL_NAME = 'AFHMM_SAC' - self.default_num_states = 2 - self.models = [] - self.num_appliances = 0 - self.appliances = [] - self.time_period = 720 - self.signal_aggregates = OrderedDict() - self.time_period = params.get('time_period', self.time_period) - self.default_num_states = params.get('default_num_states',2) - self.save_model_path = params.get('save-model-path', None) - self.load_model_path = params.get('pretrained-model-path',None) - self.chunk_wise_training = params.get("chunk_wise_training", False) - if self.load_model_path: - self.load_model(self.load_model_path) - - def partial_fit(self, train_main, train_appliances, **load_kwargs): - self.models = [] self.num_appliances = 0 self.appliances = [] From 0885f26e5a37a417aca57b079fa74ee3ad50e3bc Mon Sep 17 00:00:00 2001 From: Guillaume Levasseur Date: Fri, 11 Sep 2020 11:11:04 +0200 Subject: [PATCH 4/9] style: AFHMM & AFHMM+SAC: clean commented print statements. Fix comment typo. Remove commented code from 1 year ago. 
--- nilmtk_contrib/disaggregate/afhmm.py | 59 +++++------ nilmtk_contrib/disaggregate/afhmm_sac.py | 119 +++++++---------------- 2 files changed, 61 insertions(+), 117 deletions(-) diff --git a/nilmtk_contrib/disaggregate/afhmm.py b/nilmtk_contrib/disaggregate/afhmm.py index 404d781..124fd37 100644 --- a/nilmtk_contrib/disaggregate/afhmm.py +++ b/nilmtk_contrib/disaggregate/afhmm.py @@ -13,10 +13,10 @@ class AFHMM(Disaggregator): See: http://papers.nips.cc/paper/5526-signal-aggregate-constraints-in-additive-factorial-hmms-with-application-to-energy-disaggregation.pdf """ def __init__(self, params): - self.MODEL_NAME = 'AFHMM' + self.MODEL_NAME = 'AFHMM' self.models = [] self.num_appliances = 0 - self.appliances = [] + self.appliances = [] self.signal_aggregates = OrderedDict() self.time_period = params.get("time_period", 720) self.default_num_states = params.get("default_num_states", 2) @@ -26,9 +26,7 @@ def __init__(self, params): if self.load_model_path: self.load_model(self.load_model_path) - def partial_fit(self, train_main, train_appliances, **load_kwargs): - self.models = [] self.num_appliances = 0 self.appliances = [] @@ -49,9 +47,8 @@ def partial_fit(self, train_main, train_appliances, **load_kwargs): train_main = train_main.values.flatten().reshape((-1,1)) for appliance_name, power in train_appliances: - #print (appliance_name) - # Learning the pi's and transistion probabliites for each appliance using a simple HMM - self.appliances.append(appliance_name) + # Learning the pi's and transistion probablities for each appliance using a simple HMM + self.appliances.append(appliance_name) X = power.values.reshape((-1,1)) learnt_model[appliance_name] = hmm.GaussianHMM(self.default_num_states, "full") # Fit @@ -87,9 +84,6 @@ def partial_fit(self, train_main, train_appliances, **load_kwargs): print ("Finished Training") def disaggregate_thread(self, test_mains,index,d): - - # A threads that does disaggregation - means_vector = self.means_vector pi_s_vector = 
self.pi_s_vector means_vector = self.means_vector @@ -99,7 +93,7 @@ def disaggregate_thread(self, test_mains,index,d): flag = 0 for epoch in range(6): - # The alernative Minimization + # The alernative minimization if epoch%2==1: usage = np.zeros((len(test_mains))) for appliance_id in range(self.num_appliances): @@ -108,14 +102,16 @@ def disaggregate_thread(self, test_mains,index,d): sigma = (test_mains.flatten() - usage.flatten()).reshape((-1,1)) sigma = np.where(sigma<1,1,sigma) else: - if flag==0: constraints = [] cvx_state_vectors = [] cvx_variable_matrices = [] delta = cvx.Variable(shape=(len(test_mains),1), name='delta_t') for appliance_id in range(self.num_appliances): - state_vector = cvx.Variable(shape=(len(test_mains), self.default_num_states), name='state_vec-%s'%(appliance_id)) + state_vector = cvx.Variable( + shape=(len(test_mains), self.default_num_states), + name='state_vec-%s'%(appliance_id) + ) cvx_state_vectors.append(state_vector) # Enforcing that their values are ranged constraints+=[cvx_state_vectors[appliance_id]>=0] @@ -126,7 +122,10 @@ def disaggregate_thread(self, test_mains,index,d): # Creating Variable matrices for every appliance appliance_variable_matrix = [] for t in range(len(test_mains)): - matrix = cvx.Variable(shape=(self.default_num_states, self.default_num_states), name='variable_matrix-%s-%d'%(appliance_id,t)) + matrix = cvx.Variable( + shape=(self.default_num_states, self.default_num_states), + name='variable_matrix-%s-%d'%(appliance_id,t) + ) appliance_variable_matrix.append(matrix) cvx_variable_matrices.append(appliance_variable_matrix) # Enforcing that their values are ranged @@ -140,17 +139,16 @@ def disaggregate_thread(self, test_mains,index,d): # Constraint 6d for t in range(1,len(test_mains)): # 6d for i in range(self.default_num_states): - constraints+=[cvx.sum(cvx_variable_matrices[appliance_id][t][i]) == cvx_state_vectors[appliance_id][t-1][i]] + constraints+=[ + cvx.sum(cvx_variable_matrices[appliance_id][t][i]) == 
cvx_state_vectors[appliance_id][t-1][i] + ] - - total_observed_reading = np.zeros((test_mains.shape)) - # TOtal observed reading equals the sum of each appliance + # Total observed reading equals the sum of each appliance for appliance_id in range(self.num_appliances): - total_observed_reading+=cvx_state_vectors[appliance_id]@means_vector[appliance_id] + total_observed_reading+=cvx_state_vectors[appliance_id]@means_vector[appliance_id] # Loss function to be minimized - term_1 = 0 term_2 = 0 for appliance_id in range(self.num_appliances): @@ -171,13 +169,13 @@ def disaggregate_thread(self, test_mains,index,d): term_3 = 0 term_4 = 0 - for t in range(len(test_mains)): - term_4+= .5 * ((test_mains[t][0] - total_observed_reading[t][0])**2 / (sigma[t]**2)) - term_3+= .5 * (np.log(sigma[t]**2)) + term_4+= .5 * ((test_mains[t][0] - total_observed_reading[t][0])**2 / (sigma[t]**2)) + term_3+= .5 * (np.log(sigma[t]**2)) + expression = term_1 + term_2 + term_3 + term_4 expression = cvx.Minimize(expression) - prob = cvx.Problem(expression, constraints,) + prob = cvx.Problem(expression, constraints,) prob.solve(solver=cvx.SCS,verbose=False,warm_start=True) s_ = [i.value for i in cvx_state_vectors] @@ -188,19 +186,16 @@ def disaggregate_thread(self, test_mains,index,d): prediction_dict[app_name] = app_usage.flatten() # Store the result in the index corresponding to the thread. 
- d[index] = pd.DataFrame(prediction_dict,dtype='float32') def disaggregate_chunk(self, test_mains_list): - - # Sistributes the test mains across multiple threads and runs them in parallel + # Distributes the test mains across multiple threads and runs them in parallel manager = Manager() d = manager.dict() - predictions_lst = [] - for test_mains in test_mains_list: + for test_mains in test_mains_list: test_mains_big = test_mains.values.flatten().reshape((-1,1)) - self.arr_of_results = [] + self.arr_of_results = [] threads = [] for test_block in range(int(math.ceil(len(test_mains_big)/self.time_period))): test_mains = test_mains_big[test_block*(self.time_period):(test_block+1)*self.time_period] @@ -217,8 +212,6 @@ def disaggregate_chunk(self, test_mains_list): self.arr_of_results.append(d[i]) prediction = pd.concat(self.arr_of_results,axis=0) predictions_lst.append(prediction) - - return predictions_lst - + return predictions_lst diff --git a/nilmtk_contrib/disaggregate/afhmm_sac.py b/nilmtk_contrib/disaggregate/afhmm_sac.py index 65732c0..40a2c24 100644 --- a/nilmtk_contrib/disaggregate/afhmm_sac.py +++ b/nilmtk_contrib/disaggregate/afhmm_sac.py @@ -34,32 +34,22 @@ def partial_fit(self, train_main, train_appliances, **load_kwargs): train_app_tmp.append((app_name,df_list)) train_appliances = train_app_tmp - - learnt_model = OrderedDict() - means_vector = [] - one_hot_states_vector = [] - pi_s_vector = [] - transmat_vector = [] - states_vector = [] train_main = train_main.values.flatten().reshape((-1,1)) - + for appliance_name, power in train_appliances: #print (appliance_name) self.appliances.append(appliance_name) - X = power.values.reshape((-1,1)) - learnt_model[appliance_name] = hmm.GaussianHMM(self.default_num_states, "full") # Fit learnt_model[appliance_name].fit(X) - means = learnt_model[appliance_name].means_.flatten().reshape((-1,1)) states = learnt_model[appliance_name].predict(X) transmat = learnt_model[appliance_name].transmat_ @@ -70,17 +60,16 @@ def 
partial_fit(self, train_main, train_appliances, **load_kwargs): for i in keys: total+=counter[i] - - pi = [] + pi = [] for i in keys: pi.append(counter[i]/total) - + pi = np.array(pi) nb_classes = self.default_num_states targets = states.reshape(-1) - + means_vector.append(means) pi_s_vector.append(pi) transmat_vector.append(transmat.T) @@ -92,46 +81,7 @@ def partial_fit(self, train_main, train_appliances, **load_kwargs): self.pi_s_vector = pi_s_vector self.means_vector = means_vector self.transmat_vector = transmat_vector - -# print(transmat_vector) -# print (means_vector) -# print (states_vector) -# print (pi_s_vector) print ("Finished Training") -# print (self.signal_aggregates) -# print (np.log(transmat)) -# print(pi) -# print (np.log(pi)) - #print (np.sum(transmat_vector[0],axis=1)) - #print (np.sum(transmat_vector[0],axis=0)) - #print (states.shape) - #print (one_hot_targets.shape) - - # one_hot_states_vector = np.array(one_hot_states_vector) - - # # print (transmat_vector[0]) - # # print (np.sum(transmat_vector[0],axis=0)) - # # print (np.sum(transmat_vector[0],axis=1)) - # appliance_variable_matrix = [] - - # #print (len(states_vector)) - # #variable_matrix = np.zeros((len(appliance_states),self.default_num_states,self.default_num_states)) - - # for appliance_states in states_vector: - - # variable_matrix = np.zeros((len(appliance_states),self.default_num_states,self.default_num_states)) - - # for i in range(1,len(appliance_states)): - # current_state = appliance_states[i] - # previous_state = appliance_states[i-1] - # variable_matrix[i,current_state, previous_state] = 1 - # appliance_variable_matrix.append(variable_matrix) - - # appliance_variable_matrix = np.array(appliance_variable_matrix) - # term_1_list = [] - - # term_2_list = [] - def disaggregate_thread(self, test_mains,index,d): means_vector = self.means_vector @@ -150,7 +100,6 @@ def disaggregate_thread(self, test_mains,index,d): sigma = (test_mains.flatten() - usage.flatten()).reshape((-1,1)) 
sigma = np.where(sigma<1,1,sigma) else: - if flag==0: constraints = [] cvx_state_vectors = [] @@ -158,7 +107,11 @@ def disaggregate_thread(self, test_mains,index,d): delta = cvx.Variable(shape=(len(test_mains),1), name='delta_t') for appliance_id in range(self.num_appliances): - state_vector = cvx.Variable(shape=(len(test_mains), self.default_num_states), name='state_vec-%s'%(appliance_id)) + state_vector = cvx.Variable( + shape=(len(test_mains), + self.default_num_states), + name='state_vec-%s'%(appliance_id) + ) cvx_state_vectors.append(state_vector) # Enforcing that their values are ranged constraints+=[cvx_state_vectors[appliance_id]>=0] @@ -166,41 +119,48 @@ def disaggregate_thread(self, test_mains,index,d): # Enforcing that sum of states equals 1 for t in range(len(test_mains)): # 6c constraints+=[cvx.sum(cvx_state_vectors[appliance_id][t])==1] + # Creating Variable matrices for every appliance appliance_variable_matrix = [] for t in range(len(test_mains)): - matrix = cvx.Variable(shape=(self.default_num_states, self.default_num_states), name='variable_matrix-%s-%d'%(appliance_id,t)) + matrix = cvx.Variable( + shape=(self.default_num_states, self.default_num_states), + name='variable_matrix-%s-%d'%(appliance_id,t) + ) appliance_variable_matrix.append(matrix) + cvx_variable_matrices.append(appliance_variable_matrix) # Enforcing that their values are ranged for t in range(len(test_mains)): constraints+=[cvx_variable_matrices[appliance_id][t]>=0] constraints+=[cvx_variable_matrices[appliance_id][t]<=1] + # Constraint 6e for t in range(0,len(test_mains)): # 6e for i in range(self.default_num_states): - constraints+=[cvx.sum(((cvx_variable_matrices[appliance_id][t]).T)[i]) == cvx_state_vectors[appliance_id][t][i]] + constraints += [ + cvx.sum(((cvx_variable_matrices[appliance_id][t]).T)[i]) == cvx_state_vectors[appliance_id][t][i] + ] + # Constraint 6d for t in range(1,len(test_mains)): # 6d for i in range(self.default_num_states): - 
constraints+=[cvx.sum(cvx_variable_matrices[appliance_id][t][i]) == cvx_state_vectors[appliance_id][t-1][i]] - + constraints += [ + cvx.sum(cvx_variable_matrices[appliance_id][t][i]) == cvx_state_vectors[appliance_id][t-1][i] + ] for appliance_id in range(self.num_appliances): appliance_usage = cvx_state_vectors[appliance_id]@means_vector[appliance_id] total_appliance_usage = cvx.sum(appliance_usage) - constraints+=[total_appliance_usage <= self.signal_aggregates[self.appliances[appliance_id]]] - + constraints += [ + total_appliance_usage <= self.signal_aggregates[self.appliances[appliance_id]] + ] # Second order cone constraints - total_observed_reading = np.zeros((test_mains.shape)) - #print (len(cvx_state_vectors)) for appliance_id in range(self.num_appliances): - total_observed_reading+=cvx_state_vectors[appliance_id]@means_vector[appliance_id] + total_observed_reading += cvx_state_vectors[appliance_id]@means_vector[appliance_id] flag=1 - - term_1 = 0 term_2 = 0 @@ -216,20 +176,19 @@ def disaggregate_thread(self, test_mains,index,d): # The expression involving start states first_one_hot_states = one_hot_states[0] term_2-= cvx.sum(cvx.multiply(first_one_hot_states,np.log(pi))) - + flag = 1 expression = 0 term_3 = 0 term_4 = 0 - for t in range(len(test_mains)): - term_4+= .5 * ((test_mains[t][0] - total_observed_reading[t][0])**2 / (sigma[t]**2)) + term_4+= .5 * ((test_mains[t][0] - total_observed_reading[t][0])**2 / (sigma[t]**2)) term_3+= .5 * (np.log(sigma[t]**2)) + expression = term_1 + term_2 + term_3 + term_4 expression = cvx.Minimize(expression) prob = cvx.Problem(expression, constraints) - prob.solve(solver=cvx.SCS,verbose=False, warm_start=True) s_ = [i.value for i in cvx_state_vectors] @@ -241,22 +200,15 @@ def disaggregate_thread(self, test_mains,index,d): d[index] = pd.DataFrame(prediction_dict,dtype='float32') - - - - - - def disaggregate_chunk(self, test_mains_list): - - # Sistributes the test mains across multiple threads and runs them in parallel 
+ # Distributes the test mains across multiple threads and runs them in parallel manager = Manager() d = manager.dict() - predictions_lst = [] - for test_mains in test_mains_list: + for test_mains in test_mains_list: test_mains_big = test_mains.values.flatten().reshape((-1,1)) - self.arr_of_results = [] + self.arr_of_results = [] + st = time.time() threads = [] for test_block in range(int(math.ceil(len(test_mains_big)/self.time_period))): test_mains = test_mains_big[test_block*(self.time_period):(test_block+1)*self.time_period] @@ -273,7 +225,6 @@ def disaggregate_chunk(self, test_mains_list): self.arr_of_results.append(d[i]) prediction = pd.concat(self.arr_of_results,axis=0) predictions_lst.append(prediction) - + return predictions_lst - From d93ed0ac3be6ee5cd6e4d7e90b3fafeadbd6126f Mon Sep 17 00:00:00 2001 From: Guillaume Levasseur Date: Tue, 15 Sep 2020 18:17:39 +0200 Subject: [PATCH 5/9] fix: AFHMM & AFHMM+SAC: add safeguard for when the optimizer does not find any constraint and returns None. 
Fixes: https://github.com/nilmtk/nilmtk-contrib/issues/40 --- nilmtk_contrib/disaggregate/afhmm.py | 6 +++++- nilmtk_contrib/disaggregate/afhmm_sac.py | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/nilmtk_contrib/disaggregate/afhmm.py b/nilmtk_contrib/disaggregate/afhmm.py index 124fd37..42244ac 100644 --- a/nilmtk_contrib/disaggregate/afhmm.py +++ b/nilmtk_contrib/disaggregate/afhmm.py @@ -177,7 +177,11 @@ def disaggregate_thread(self, test_mains,index,d): expression = cvx.Minimize(expression) prob = cvx.Problem(expression, constraints,) prob.solve(solver=cvx.SCS,verbose=False,warm_start=True) - s_ = [i.value for i in cvx_state_vectors] + s_ = [ + np.zeros((len(test_mains), self.default_num_states)) if i.value is None + else i.value + for i in cvx_state_vectors + ] prediction_dict = {} for appliance_id in range(self.num_appliances): diff --git a/nilmtk_contrib/disaggregate/afhmm_sac.py b/nilmtk_contrib/disaggregate/afhmm_sac.py index 40a2c24..29e41f7 100644 --- a/nilmtk_contrib/disaggregate/afhmm_sac.py +++ b/nilmtk_contrib/disaggregate/afhmm_sac.py @@ -190,7 +190,11 @@ def disaggregate_thread(self, test_mains,index,d): expression = cvx.Minimize(expression) prob = cvx.Problem(expression, constraints) prob.solve(solver=cvx.SCS,verbose=False, warm_start=True) - s_ = [i.value for i in cvx_state_vectors] + s_ = [ + np.zeros((len(test_mains), self.default_num_states)) if i.value is None + else i.value + for i in cvx_state_vectors + ] prediction_dict = {} for appliance_id in range(self.num_appliances): From f1403e8199a2ae5346810a976b0e6c6431ed2c39 Mon Sep 17 00:00:00 2001 From: Guillaume Levasseur Date: Wed, 7 Oct 2020 09:03:16 +0200 Subject: [PATCH 6/9] refactor: AFHMM & AFHMM+SAC: clean up imports --- nilmtk_contrib/disaggregate/afhmm.py | 13 ++++++++----- nilmtk_contrib/disaggregate/afhmm_sac.py | 10 +++++----- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/nilmtk_contrib/disaggregate/afhmm.py 
b/nilmtk_contrib/disaggregate/afhmm.py index 42244ac..7257dbf 100644 --- a/nilmtk_contrib/disaggregate/afhmm.py +++ b/nilmtk_contrib/disaggregate/afhmm.py @@ -1,11 +1,14 @@ -from collections import Counter, OrderedDict +import cvxpy as cvx import math -import pandas as pd +import multiprocessing +import nilmtk.docinherit import numpy as np -from nilmtk.disaggregate import Disaggregator -import cvxpy as cvx +import pandas as pd + +from collections import Counter, OrderedDict from hmmlearn import hmm -from multiprocessing import Process, Manager +from nilmtk.disaggregate import Disaggregator + class AFHMM(Disaggregator): """ diff --git a/nilmtk_contrib/disaggregate/afhmm_sac.py b/nilmtk_contrib/disaggregate/afhmm_sac.py index 29e41f7..e8c3780 100644 --- a/nilmtk_contrib/disaggregate/afhmm_sac.py +++ b/nilmtk_contrib/disaggregate/afhmm_sac.py @@ -1,11 +1,11 @@ -from collections import Counter, OrderedDict +import cvxpy as cvx import math -import pandas as pd +import multiprocessing import numpy as np -from nilmtk.disaggregate import Disaggregator -import cvxpy as cvx +import pandas as pd + +from collections import Counter, OrderedDict from hmmlearn import hmm -from multiprocessing import Process, Manager from nilmtk_contrib.disaggregate import AFHMM From 5de20f7ee8c419b1eff2095fd7beb9c88e5fabe6 Mon Sep 17 00:00:00 2001 From: Guillaume Levasseur Date: Wed, 7 Oct 2020 09:29:20 +0200 Subject: [PATCH 7/9] feat: AFHMM & AFHMM+SAC: limit number of processes for disaggregation Fixes: https://github.com/nilmtk/nilmtk-contrib/issues/27 --- nilmtk_contrib/disaggregate/afhmm.py | 38 ++++++++++-------------- nilmtk_contrib/disaggregate/afhmm_sac.py | 30 ------------------- 2 files changed, 15 insertions(+), 53 deletions(-) diff --git a/nilmtk_contrib/disaggregate/afhmm.py b/nilmtk_contrib/disaggregate/afhmm.py index 7257dbf..87a4f06 100644 --- a/nilmtk_contrib/disaggregate/afhmm.py +++ b/nilmtk_contrib/disaggregate/afhmm.py @@ -195,30 +195,22 @@ def disaggregate_thread(self, 
test_mains,index,d): # Store the result in the index corresponding to the thread. d[index] = pd.DataFrame(prediction_dict,dtype='float32') - def disaggregate_chunk(self, test_mains_list): - # Distributes the test mains across multiple threads and runs them in parallel - manager = Manager() - d = manager.dict() + @nilmtk.docinherit.doc_inherit + def disaggregate_chunk(self, test_mains): + # Distributes the test mains across multiple threads and disaggregate in parallel + # Use all available CPUs except one for the OS. + n_workers = max(( 1, multiprocessing.cpu_count() - 1 )) predictions_lst = [] - for test_mains in test_mains_list: - test_mains_big = test_mains.values.flatten().reshape((-1,1)) - self.arr_of_results = [] - threads = [] - for test_block in range(int(math.ceil(len(test_mains_big)/self.time_period))): - test_mains = test_mains_big[test_block*(self.time_period):(test_block+1)*self.time_period] - t = Process(target=self.disaggregate_thread, args=(test_mains,test_block,d)) - threads.append(t) - - for t in threads: - t.start() - - for t in threads: - t.join() - - for i in range(len(threads)): - self.arr_of_results.append(d[i]) - prediction = pd.concat(self.arr_of_results,axis=0) - predictions_lst.append(prediction) + with multiprocessing.Pool(n_workers) as workers: + for mains_df in test_mains: + mains_vect = mains_df.values.flatten().reshape(( -1, 1 )) + n_blocks = int(math.ceil(len(mains_vect)/self.time_period)) + blocks = [ + mains_vect[b * self.time_period:(b + 1) * self.time_period] + for b in range(n_blocks) + ] + res_arr = workers.map(self.disaggregate_thread, blocks) + predictions_lst.append(pd.concat(res_arr, axis=0)) return predictions_lst diff --git a/nilmtk_contrib/disaggregate/afhmm_sac.py b/nilmtk_contrib/disaggregate/afhmm_sac.py index e8c3780..2bb02e3 100644 --- a/nilmtk_contrib/disaggregate/afhmm_sac.py +++ b/nilmtk_contrib/disaggregate/afhmm_sac.py @@ -1,6 +1,4 @@ import cvxpy as cvx -import math -import multiprocessing import numpy as np 
import pandas as pd @@ -204,31 +202,3 @@ def disaggregate_thread(self, test_mains,index,d): d[index] = pd.DataFrame(prediction_dict,dtype='float32') - def disaggregate_chunk(self, test_mains_list): - # Distributes the test mains across multiple threads and runs them in parallel - manager = Manager() - d = manager.dict() - predictions_lst = [] - for test_mains in test_mains_list: - test_mains_big = test_mains.values.flatten().reshape((-1,1)) - self.arr_of_results = [] - st = time.time() - threads = [] - for test_block in range(int(math.ceil(len(test_mains_big)/self.time_period))): - test_mains = test_mains_big[test_block*(self.time_period):(test_block+1)*self.time_period] - t = Process(target=self.disaggregate_thread, args=(test_mains,test_block,d)) - threads.append(t) - - for t in threads: - t.start() - - for t in threads: - t.join() - - for i in range(len(threads)): - self.arr_of_results.append(d[i]) - prediction = pd.concat(self.arr_of_results,axis=0) - predictions_lst.append(prediction) - - return predictions_lst - From 4dab5d0469399620880882e2deae2df9b81f9b98 Mon Sep 17 00:00:00 2001 From: Guillaume Levasseur Date: Wed, 7 Oct 2020 10:09:17 +0200 Subject: [PATCH 8/9] feat: AFHMM & AFHMM+SAC: implement chunkwise training. 
Fixes https://github.com/nilmtk/nilmtk-contrib/issues/42 --- nilmtk_contrib/disaggregate/afhmm.py | 92 ++++++++++-------------- nilmtk_contrib/disaggregate/afhmm_sac.py | 67 ----------------- 2 files changed, 36 insertions(+), 123 deletions(-) diff --git a/nilmtk_contrib/disaggregate/afhmm.py b/nilmtk_contrib/disaggregate/afhmm.py index 87a4f06..fde8bb6 100644 --- a/nilmtk_contrib/disaggregate/afhmm.py +++ b/nilmtk_contrib/disaggregate/afhmm.py @@ -18,8 +18,9 @@ class AFHMM(Disaggregator): def __init__(self, params): self.MODEL_NAME = 'AFHMM' self.models = [] - self.num_appliances = 0 - self.appliances = [] + self.means_vector = OrderedDict() + self.pi_s_vector = OrderedDict() + self.transmat_vector = OrderedDict() self.signal_aggregates = OrderedDict() self.time_period = params.get("time_period", 720) self.default_num_states = params.get("default_num_states", 2) @@ -29,62 +30,41 @@ def __init__(self, params): if self.load_model_path: self.load_model(self.load_model_path) + @nilmtk.docinherit.doc_inherit def partial_fit(self, train_main, train_appliances, **load_kwargs): - self.models = [] - self.num_appliances = 0 - self.appliances = [] - train_main = pd.concat(train_main, axis=0) - train_app_tmp = [] - for app_name, df_list in train_appliances: - df_list = pd.concat(df_list, axis=0) - train_app_tmp.append((app_name,df_list)) - - # All the initializations required by the model - train_appliances = train_app_tmp - learnt_model = OrderedDict() - means_vector = [] - one_hot_states_vector = [] - pi_s_vector = [] - transmat_vector = [] - states_vector = [] - train_main = train_main.values.flatten().reshape((-1,1)) - - for appliance_name, power in train_appliances: - # Learning the pi's and transistion probablities for each appliance using a simple HMM - self.appliances.append(appliance_name) - X = power.values.reshape((-1,1)) - learnt_model[appliance_name] = hmm.GaussianHMM(self.default_num_states, "full") - # Fit - learnt_model[appliance_name].fit(X) - means = 
learnt_model[appliance_name].means_.flatten().reshape((-1,1)) - states = learnt_model[appliance_name].predict(X) - transmat = learnt_model[appliance_name].transmat_ + """ + train_main: pd.DataFrame It will contain the mains reading. + train_appliances: list of tuples [('appliance1', [ df1 ]),('appliance2', [ df2 ]),...] + """ + for appli_name, df_list in train_appliances: + # Compute model parameters for this chunk. + app_df = pd.concat(df_list, axis=0) + X = app_df.values.reshape(( -1, 1 )) + learnt_model = hmm.GaussianHMM(self.default_num_states, "full") + learnt_model.fit(X) + means = learnt_model.means_.flatten().reshape(( -1, 1 )) + states = learnt_model.predict(X) + transmat = learnt_model.transmat_.T counter = Counter(states.flatten()) - total = 0 - keys = list(counter.keys()) - keys.sort() - - for i in keys: - total+=counter[i] - pi = [] - - for i in keys: - pi.append(counter[i]/total) - pi = np.array(pi) - nb_classes = self.default_num_states - targets = states.reshape(-1) - means_vector.append(means) - pi_s_vector.append(pi) - transmat_vector.append(transmat.T) - states_vector.append(states) - self.num_appliances+=1 - self.signal_aggregates[appliance_name] = (np.mean(X)*self.time_period).reshape((-1,)) - - self.means_vector = means_vector - self.pi_s_vector = pi_s_vector - self.means_vector = means_vector - self.transmat_vector = transmat_vector - print ("Finished Training") + total = sum(counter.values()) + pi = np.array([ v/total for v in counter.values() ]) + sigagg = (np.mean(X) * self.time_period).reshape(( -1, )) + # Merge with previous values. + # Hypothesis 1: chunk size is constant. (mean of means) + # Hypothesis 2: if the appliance is already registered in + # self.means_vector, then it is also known in all other dicts. 
+ if appli_name in self.means_vector: + self.means_vector[appli_name] = (self.means_vector[appli_name] + means) / 2 + self.pi_s_vector[appli_name] = (self.pi_s_vector[appli_name] + pi) / 2 + self.transmat_vector[appli_name] = (self.transmat_vector[appli_name] + transmat) / 2 + self.signal_aggregates[appli_name] = (self.signal_aggregates[appli_name] + sigagg) / 2 + else: + self.means_vector[appli_name] = means + self.pi_s_vector[appli_name] = pi + self.transmat_vector[appli_name] = transmat + self.signal_aggregates[appli_name] = sigagg + + print ("{}: Finished training".format(self.MODEL_NAME)) def disaggregate_thread(self, test_mains,index,d): means_vector = self.means_vector diff --git a/nilmtk_contrib/disaggregate/afhmm_sac.py b/nilmtk_contrib/disaggregate/afhmm_sac.py index 2bb02e3..a11ed03 100644 --- a/nilmtk_contrib/disaggregate/afhmm_sac.py +++ b/nilmtk_contrib/disaggregate/afhmm_sac.py @@ -2,8 +2,6 @@ import numpy as np import pandas as pd -from collections import Counter, OrderedDict -from hmmlearn import hmm from nilmtk_contrib.disaggregate import AFHMM @@ -16,71 +14,6 @@ def __init__(self, params): super().__init__(params) self.MODEL_NAME = 'AFHMM_SAC' - def partial_fit(self, train_main, train_appliances, **load_kwargs): - self.models = [] - self.num_appliances = 0 - self.appliances = [] - ''' - train_main :- pd.DataFrame It will contain the mains reading. - train_appliances :- list of tuples [('appliance1',df1),('appliance2',df2),...] 
- ''' - train_main = pd.concat(train_main, axis=0) - train_app_tmp = [] - - for app_name, df_list in train_appliances: - df_list = pd.concat(df_list, axis=0) - train_app_tmp.append((app_name,df_list)) - - train_appliances = train_app_tmp - learnt_model = OrderedDict() - means_vector = [] - one_hot_states_vector = [] - pi_s_vector = [] - transmat_vector = [] - states_vector = [] - - train_main = train_main.values.flatten().reshape((-1,1)) - - for appliance_name, power in train_appliances: - #print (appliance_name) - self.appliances.append(appliance_name) - X = power.values.reshape((-1,1)) - learnt_model[appliance_name] = hmm.GaussianHMM(self.default_num_states, "full") - # Fit - learnt_model[appliance_name].fit(X) - means = learnt_model[appliance_name].means_.flatten().reshape((-1,1)) - states = learnt_model[appliance_name].predict(X) - transmat = learnt_model[appliance_name].transmat_ - counter = Counter(states.flatten()) - total = 0 - keys = list(counter.keys()) - keys.sort() - - for i in keys: - total+=counter[i] - - pi = [] - for i in keys: - pi.append(counter[i]/total) - - pi = np.array(pi) - - nb_classes = self.default_num_states - targets = states.reshape(-1) - - means_vector.append(means) - pi_s_vector.append(pi) - transmat_vector.append(transmat.T) - states_vector.append(states) - self.num_appliances+=1 - self.signal_aggregates[appliance_name] = (np.mean(X)*self.time_period).reshape((-1,)) - - self.means_vector = means_vector - self.pi_s_vector = pi_s_vector - self.means_vector = means_vector - self.transmat_vector = transmat_vector - print ("Finished Training") - def disaggregate_thread(self, test_mains,index,d): means_vector = self.means_vector pi_s_vector = self.pi_s_vector From a0610f4fcbb01f9469d577afaaf0a037a894c324 Mon Sep 17 00:00:00 2001 From: Guillaume Levasseur Date: Wed, 7 Oct 2020 10:12:30 +0200 Subject: [PATCH 9/9] refactor: AFHMM & AFHMM+SAC: split disaggregate_thread function into the constraints setup and the actual minimization --- 
nilmtk_contrib/disaggregate/afhmm.py | 180 +++++++++++------------ nilmtk_contrib/disaggregate/afhmm_sac.py | 129 +--------------- 2 files changed, 91 insertions(+), 218 deletions(-) diff --git a/nilmtk_contrib/disaggregate/afhmm.py b/nilmtk_contrib/disaggregate/afhmm.py index fde8bb6..9b8e486 100644 --- a/nilmtk_contrib/disaggregate/afhmm.py +++ b/nilmtk_contrib/disaggregate/afhmm.py @@ -66,114 +66,100 @@ def partial_fit(self, train_main, train_appliances, **load_kwargs): print ("{}: Finished training".format(self.MODEL_NAME)) - def disaggregate_thread(self, test_mains,index,d): - means_vector = self.means_vector - pi_s_vector = self.pi_s_vector - means_vector = self.means_vector - transmat_vector = self.transmat_vector - - sigma = 100*np.ones((len(test_mains),1)) - flag = 0 - - for epoch in range(6): - # The alernative minimization - if epoch%2==1: - usage = np.zeros((len(test_mains))) - for appliance_id in range(self.num_appliances): - app_usage= np.sum(s_[appliance_id]@means_vector[appliance_id],axis=1) - usage+=app_usage - sigma = (test_mains.flatten() - usage.flatten()).reshape((-1,1)) - sigma = np.where(sigma<1,1,sigma) + def setup_cvx_constraints(self, n_samples, n_appliances): + cvx_state_vectors = [ + cvx.Variable( + shape=( n_samples, self.default_num_states ), + name="state_vec-{}".format(i) + ) + for i in range(n_appliances) + ] + constraints = [] + for stv in cvx_state_vectors: + # State vector values are ranged. + constraints += [ stv >= 0, stv <= 1 ] + # Sum of states equals 1. + for t in range(n_samples): + constraints.append(cvx.sum(stv[t]) == 1) + # Create variable matrices for each appliance, for each sample. 
+ cvx_variable_matrices = [ + [ + cvx.Variable( + shape=( self.default_num_states, self.default_num_states ), + name="variable_matrix-{}-{}".format(i, t) + ) + for t in range(n_samples) + ] + for i in range(n_appliances) + ] + for i, appli_varmats in enumerate(cvx_variable_matrices): + for t, varmat in enumerate(appli_varmats): + # Assign range constraints to variable matrices. + constraints += [ varmat >= 0, varmat <= 1 ] + # Assign equality constraints with state vectors. + constraints += [ + cvx.sum(varmat[l]) == cvx_state_vectors[i][t-1][l] + for l in range(self.default_num_states) + ] + constraints += [ + cvx.sum((varmat.T)[l]) == cvx_state_vectors[i][t][l] + for l in range(self.default_num_states) + ] + + return cvx_state_vectors, constraints, cvx_variable_matrices + + def disaggregate_thread(self, test_mains): + n_epochs = 6 # don't put in __init__, those are inference epochs! + n_samples = len(test_mains) + sigma = 100*np.ones(( n_samples, 1 )) + cvx_state_vectors, constraints, cvx_varmats = self.setup_cvx_constraints( + n_samples, len(self.means_vector)) + # Preparing first terms of the objective function. + term_1 = 0 + term_2 = 0 + total_appli_energy = np.zeros_like(test_mains) + for i, (appli_name, means) in enumerate(self.means_vector.items()): + total_appli_energy += cvx_state_vectors[i]@means + appli_varmats = cvx_varmats[i] + transmat = self.transmat_vector[appli_name] + for varmat in appli_varmats: + term_1 -= cvx.sum(cvx.multiply(varmat, np.log(transmat))) + + first_hot_state = cvx_state_vectors[i][0] + transition_p = self.pi_s_vector[appli_name] + term_2 -= cvx.sum(cvx.multiply(first_hot_state, np.log(transition_p))) + + for epoch in range(n_epochs): + if epoch % 2: + # Alternative minimization on odd epochs.
+ usage = np.zeros(( n_samples, )) + for i, (appli_name, means) in enumerate(self.means_vector.items()): + usage += np.sum(s_[i]@means, axis=1) + sigma = (test_mains.flatten() - usage.flatten()).reshape(( -1, 1 )) + sigma = np.where(sigma < 1, 1, sigma) else: - if flag==0: - constraints = [] - cvx_state_vectors = [] - cvx_variable_matrices = [] - delta = cvx.Variable(shape=(len(test_mains),1), name='delta_t') - for appliance_id in range(self.num_appliances): - state_vector = cvx.Variable( - shape=(len(test_mains), self.default_num_states), - name='state_vec-%s'%(appliance_id) - ) - cvx_state_vectors.append(state_vector) - # Enforcing that their values are ranged - constraints+=[cvx_state_vectors[appliance_id]>=0] - constraints+=[cvx_state_vectors[appliance_id]<=1] - # Enforcing that sum of states equals 1 - for t in range(len(test_mains)): # 6c - constraints+=[cvx.sum(cvx_state_vectors[appliance_id][t])==1] - # Creating Variable matrices for every appliance - appliance_variable_matrix = [] - for t in range(len(test_mains)): - matrix = cvx.Variable( - shape=(self.default_num_states, self.default_num_states), - name='variable_matrix-%s-%d'%(appliance_id,t) - ) - appliance_variable_matrix.append(matrix) - cvx_variable_matrices.append(appliance_variable_matrix) - # Enforcing that their values are ranged - for t in range(len(test_mains)): - constraints+=[cvx_variable_matrices[appliance_id][t]>=0] - constraints+=[cvx_variable_matrices[appliance_id][t]<=1] - # Constraint 6e - for t in range(0,len(test_mains)): # 6e - for i in range(self.default_num_states): - constraints+=[cvx.sum(((cvx_variable_matrices[appliance_id][t]).T)[i]) == cvx_state_vectors[appliance_id][t][i]] - # Constraint 6d - for t in range(1,len(test_mains)): # 6d - for i in range(self.default_num_states): - constraints+=[ - cvx.sum(cvx_variable_matrices[appliance_id][t][i]) == cvx_state_vectors[appliance_id][t-1][i] - ] - - total_observed_reading = np.zeros((test_mains.shape)) - # Total observed reading 
equals the sum of each appliance - for appliance_id in range(self.num_appliances): - total_observed_reading+=cvx_state_vectors[appliance_id]@means_vector[appliance_id] - - # Loss function to be minimized - term_1 = 0 - term_2 = 0 - for appliance_id in range(self.num_appliances): - # First loop is over appliances - variable_matrix = cvx_variable_matrices[appliance_id] - transmat = transmat_vector[appliance_id] - # Next loop is over different time-stamps - for matrix in variable_matrix: - term_1-=cvx.sum(cvx.multiply(matrix,np.log(transmat))) - one_hot_states = cvx_state_vectors[appliance_id] - pi = pi_s_vector[appliance_id] - # The expression involving start states - first_one_hot_states = one_hot_states[0] - term_2-= cvx.sum(cvx.multiply(first_one_hot_states,np.log(pi))) - flag = 1 - - expression = 0 + # Primary minimization on even epochs. term_3 = 0 term_4 = 0 + for t in range(n_samples): + term_3 += .5 * (np.log(sigma[t]**2)) + term_4 += .5 * ((test_mains[t][0] - total_appli_energy[t][0])**2 / (sigma[t]**2)) - for t in range(len(test_mains)): - term_4+= .5 * ((test_mains[t][0] - total_observed_reading[t][0])**2 / (sigma[t]**2)) - term_3+= .5 * (np.log(sigma[t]**2)) - - expression = term_1 + term_2 + term_3 + term_4 - expression = cvx.Minimize(expression) - prob = cvx.Problem(expression, constraints,) - prob.solve(solver=cvx.SCS,verbose=False,warm_start=True) + objective = cvx.Minimize(term_1 + term_2 + term_3 + term_4) + prob = cvx.Problem(objective, constraints) + prob.solve(solver=cvx.SCS, verbose=False, warm_start=True) s_ = [ - np.zeros((len(test_mains), self.default_num_states)) if i.value is None + np.zeros((n_samples, self.default_num_states)) if i.value is None else i.value for i in cvx_state_vectors ] prediction_dict = {} - for appliance_id in range(self.num_appliances): - app_name = self.appliances[appliance_id] - app_usage= np.sum(s_[appliance_id]@means_vector[appliance_id],axis=1) - prediction_dict[app_name] = app_usage.flatten() + for i, 
(appli_name, means) in enumerate(self.means_vector.items()): + app_usage = np.sum(s_[i]@means, axis=1) + prediction_dict[appli_name] = app_usage.flatten() - # Store the result in the index corresponding to the thread. - d[index] = pd.DataFrame(prediction_dict,dtype='float32') + return pd.DataFrame(prediction_dict, dtype="float32") @nilmtk.docinherit.doc_inherit def disaggregate_chunk(self, test_mains): diff --git a/nilmtk_contrib/disaggregate/afhmm_sac.py b/nilmtk_contrib/disaggregate/afhmm_sac.py index a11ed03..e1abf80 100644 --- a/nilmtk_contrib/disaggregate/afhmm_sac.py +++ b/nilmtk_contrib/disaggregate/afhmm_sac.py @@ -1,6 +1,4 @@ import cvxpy as cvx -import numpy as np -import pandas as pd from nilmtk_contrib.disaggregate import AFHMM @@ -14,124 +12,13 @@ def __init__(self, params): super().__init__(params) self.MODEL_NAME = 'AFHMM_SAC' - def disaggregate_thread(self, test_mains,index,d): - means_vector = self.means_vector - pi_s_vector = self.pi_s_vector - means_vector = self.means_vector - transmat_vector = self.transmat_vector - sigma = 100*np.ones((len(test_mains),1)) - flag = 0 - for epoch in range(6): - if epoch%2==1: - # The alernative Minimization - usage = np.zeros((len(test_mains))) - for appliance_id in range(self.num_appliances): - app_usage= np.sum(s_[appliance_id]@means_vector[appliance_id],axis=1) - usage+=app_usage - sigma = (test_mains.flatten() - usage.flatten()).reshape((-1,1)) - sigma = np.where(sigma<1,1,sigma) - else: - if flag==0: - constraints = [] - cvx_state_vectors = [] - cvx_variable_matrices = [] - delta = cvx.Variable(shape=(len(test_mains),1), name='delta_t') + def setup_cvx_constraints(self, n_samples, n_appliances): + cvx_state_vectors, constraints, cvx_variable_matrices = super().setup_cvx_constraints(n_samples, n_appliances) + # Constraints on signal aggregates. 
+ for i, (appli_name, signal_aggregate) in enumerate(self.signal_aggregates.items()): + appliance_usage = cvx_state_vectors[i]@self.means_vector[appli_name] + total_appliance_usage = cvx.sum(appliance_usage) + constraints.append(total_appliance_usage <= signal_aggregate) - for appliance_id in range(self.num_appliances): - state_vector = cvx.Variable( - shape=(len(test_mains), - self.default_num_states), - name='state_vec-%s'%(appliance_id) - ) - cvx_state_vectors.append(state_vector) - # Enforcing that their values are ranged - constraints+=[cvx_state_vectors[appliance_id]>=0] - constraints+=[cvx_state_vectors[appliance_id]<=1] - # Enforcing that sum of states equals 1 - for t in range(len(test_mains)): # 6c - constraints+=[cvx.sum(cvx_state_vectors[appliance_id][t])==1] - - # Creating Variable matrices for every appliance - appliance_variable_matrix = [] - for t in range(len(test_mains)): - matrix = cvx.Variable( - shape=(self.default_num_states, self.default_num_states), - name='variable_matrix-%s-%d'%(appliance_id,t) - ) - appliance_variable_matrix.append(matrix) - - cvx_variable_matrices.append(appliance_variable_matrix) - # Enforcing that their values are ranged - for t in range(len(test_mains)): - constraints+=[cvx_variable_matrices[appliance_id][t]>=0] - constraints+=[cvx_variable_matrices[appliance_id][t]<=1] - - # Constraint 6e - for t in range(0,len(test_mains)): # 6e - for i in range(self.default_num_states): - constraints += [ - cvx.sum(((cvx_variable_matrices[appliance_id][t]).T)[i]) == cvx_state_vectors[appliance_id][t][i] - ] - - # Constraint 6d - for t in range(1,len(test_mains)): # 6d - for i in range(self.default_num_states): - constraints += [ - cvx.sum(cvx_variable_matrices[appliance_id][t][i]) == cvx_state_vectors[appliance_id][t-1][i] - ] - - for appliance_id in range(self.num_appliances): - appliance_usage = cvx_state_vectors[appliance_id]@means_vector[appliance_id] - total_appliance_usage = cvx.sum(appliance_usage) - constraints += [ - 
total_appliance_usage <= self.signal_aggregates[self.appliances[appliance_id]] - ] - - # Second order cone constraints - total_observed_reading = np.zeros((test_mains.shape)) - for appliance_id in range(self.num_appliances): - total_observed_reading += cvx_state_vectors[appliance_id]@means_vector[appliance_id] - flag=1 - term_1 = 0 - term_2 = 0 - - for appliance_id in range(self.num_appliances): - # First loop is over appliances - variable_matrix = cvx_variable_matrices[appliance_id] - transmat = transmat_vector[appliance_id] - # Next loop is over different time-stamps - for matrix in variable_matrix: - term_1-=cvx.sum(cvx.multiply(matrix,np.log(transmat))) - one_hot_states = cvx_state_vectors[appliance_id] - pi = pi_s_vector[appliance_id] - # The expression involving start states - first_one_hot_states = one_hot_states[0] - term_2-= cvx.sum(cvx.multiply(first_one_hot_states,np.log(pi))) - - flag = 1 - - expression = 0 - term_3 = 0 - term_4 = 0 - for t in range(len(test_mains)): - term_4+= .5 * ((test_mains[t][0] - total_observed_reading[t][0])**2 / (sigma[t]**2)) - term_3+= .5 * (np.log(sigma[t]**2)) - - expression = term_1 + term_2 + term_3 + term_4 - expression = cvx.Minimize(expression) - prob = cvx.Problem(expression, constraints) - prob.solve(solver=cvx.SCS,verbose=False, warm_start=True) - s_ = [ - np.zeros((len(test_mains), self.default_num_states)) if i.value is None - else i.value - for i in cvx_state_vectors - ] - - prediction_dict = {} - for appliance_id in range(self.num_appliances): - app_name = self.appliances[appliance_id] - app_usage= np.sum(s_[appliance_id]@means_vector[appliance_id],axis=1) - prediction_dict[app_name] = app_usage.flatten() - - d[index] = pd.DataFrame(prediction_dict,dtype='float32') + return cvx_state_vectors, constraints, cvx_variable_matrices