feat: allow chunkwise training and limit the number of processes for AFHMM+SAC #37

Open · wants to merge 9 commits into base: master
359 changes: 159 additions & 200 deletions nilmtk_contrib/disaggregate/afhmm.py
@@ -1,223 +1,182 @@
import cvxpy as cvx
import math
import multiprocessing
import nilmtk.docinherit
import numpy as np
import pandas as pd

from collections import Counter, OrderedDict
from hmmlearn import hmm
from nilmtk.disaggregate import Disaggregator

class AFHMM(Disaggregator):
"""
Additive Factorial Hidden Markov Model (without Signal Aggregate Constraints)
See: http://papers.nips.cc/paper/5526-signal-aggregate-constraints-in-additive-factorial-hmms-with-application-to-energy-disaggregation.pdf
"""
    def __init__(self, params):
        self.MODEL_NAME = 'AFHMM'
        self.means_vector = OrderedDict()
        self.pi_s_vector = OrderedDict()
        self.transmat_vector = OrderedDict()
        self.signal_aggregates = OrderedDict()
        self.time_period = params.get("time_period", 720)
        self.default_num_states = params.get("default_num_states", 2)
        self.save_model_path = params.get("save-model-path", None)
        self.load_model_path = params.get("pretrained-model-path", None)
        self.chunk_wise_training = params.get("chunk_wise_training", False)
        if self.load_model_path:
            self.load_model(self.load_model_path)
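
        # Illustrative construction sketch (hypothetical values): the params
        # dict is the only constructor argument; each key shown below is read
        # above and falls back to a default when absent.
        #
        #   model = AFHMM({
        #       "time_period": 720,           # samples per inference block
        #       "default_num_states": 2,      # HMM states per appliance
        #       "chunk_wise_training": True,  # merge stats across chunks
        #   })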


    @nilmtk.docinherit.doc_inherit
    def partial_fit(self, train_main, train_appliances, **load_kwargs):
        """
        train_main: list of pd.DataFrames with the mains readings
            (unused here: training only needs the appliance signals).
        train_appliances: list of tuples [('appliance1', [df1]), ('appliance2', [df2]), ...]
        """
        for appli_name, df_list in train_appliances:
            # Compute the model parameters for this chunk: learn the means,
            # start probabilities and transition probabilities of each
            # appliance with a simple Gaussian HMM.
            app_df = pd.concat(df_list, axis=0)
            X = app_df.values.reshape((-1, 1))
            learnt_model = hmm.GaussianHMM(self.default_num_states, "full")
            learnt_model.fit(X)
            means = learnt_model.means_.flatten().reshape((-1, 1))
            states = learnt_model.predict(X)
            transmat = learnt_model.transmat_.T
            counter = Counter(states.flatten())
            total = sum(counter.values())
            # Sort by state index so that pi lines up with the HMM states.
            pi = np.array([counter[k]/total for k in sorted(counter.keys())])
            sigagg = (np.mean(X) * self.time_period).reshape((-1,))
            # Merge with the values learnt from previous chunks.
            # Hypothesis 1: chunk size is constant (mean of means).
            # Hypothesis 2: if the appliance is already registered in
            # self.means_vector, then it is also known in all other dicts.
            if appli_name in self.means_vector:
                self.means_vector[appli_name] = (self.means_vector[appli_name] + means) / 2
                self.pi_s_vector[appli_name] = (self.pi_s_vector[appli_name] + pi) / 2
                self.transmat_vector[appli_name] = (self.transmat_vector[appli_name] + transmat) / 2
                self.signal_aggregates[appli_name] = (self.signal_aggregates[appli_name] + sigagg) / 2
            else:
                self.means_vector[appli_name] = means
                self.pi_s_vector[appli_name] = pi
                self.transmat_vector[appli_name] = transmat
                self.signal_aggregates[appli_name] = sigagg

        print("{}: Finished training".format(self.MODEL_NAME))

    def setup_cvx_constraints(self, n_samples, n_appliances):
        cvx_state_vectors = [
            cvx.Variable(
                shape=(n_samples, self.default_num_states),
                name="state_vec-{}".format(i)
            )
            for i in range(n_appliances)
        ]
        constraints = []
        for stv in cvx_state_vectors:
            # State-vector values are ranged.
            constraints += [stv >= 0, stv <= 1]
            # Eq. 6c: the states of each sample sum to 1.
            for t in range(n_samples):
                constraints.append(cvx.sum(stv[t]) == 1)
        # Create variable matrices for each appliance, for each sample.
        cvx_variable_matrices = [
            [
                cvx.Variable(
                    shape=(self.default_num_states, self.default_num_states),
                    name="variable_matrix-{}-{}".format(i, t)
                )
                for t in range(n_samples)
            ]
            for i in range(n_appliances)
        ]
        for i, appli_varmats in enumerate(cvx_variable_matrices):
            for t, varmat in enumerate(appli_varmats):
                # Variable-matrix values are ranged.
                constraints += [varmat >= 0, varmat <= 1]
                # Eq. 6e: each column sums to the current state.
                constraints += [
                    cvx.sum((varmat.T)[l]) == cvx_state_vectors[i][t][l]
                    for l in range(self.default_num_states)
                ]
                # Eq. 6d: each row sums to the previous state; only defined
                # from the second sample onwards (t >= 1).
                if t > 0:
                    constraints += [
                        cvx.sum(varmat[l]) == cvx_state_vectors[i][t-1][l]
                        for l in range(self.default_num_states)
                    ]

        return cvx_state_vectors, constraints, cvx_variable_matrices
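
    # For orientation (sizes hypothetical): with n_samples=720, two
    # appliances and default_num_states=2, this returns two (720, 2) cvxpy
    # state vectors, the constraint list, and two lists of 720 (2, 2)
    # variable matrices; together the constraints encode Eqs. 6c-6e of the
    # paper referenced in the class docstring.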

    def disaggregate_thread(self, test_mains):
        # Number of alternating-minimization epochs. This is an inference
        # parameter, not a training parameter, so it does not live in
        # __init__.
        n_epochs = 6
        n_samples = len(test_mains)
        sigma = 100*np.ones((n_samples, 1))
        cvx_state_vectors, constraints, cvx_varmats = self.setup_cvx_constraints(
            n_samples, len(self.means_vector))
        # Prepare the first terms of the objective function.
        term_1 = 0
        term_2 = 0
        total_appli_energy = np.zeros_like(test_mains)
        for i, (appli_name, means) in enumerate(self.means_vector.items()):
            total_appli_energy += cvx_state_vectors[i]@means
            appli_varmats = cvx_varmats[i]
            transmat = self.transmat_vector[appli_name]
            for varmat in appli_varmats:
                term_1 -= cvx.sum(cvx.multiply(varmat, np.log(transmat)))

            first_hot_state = cvx_state_vectors[i][0]
            start_p = self.pi_s_vector[appli_name]  # start probabilities
            term_2 -= cvx.sum(cvx.multiply(first_hot_state, np.log(start_p)))

        for epoch in range(n_epochs):
            if epoch % 2:
                # Alternating minimization on odd epochs: re-estimate sigma
                # from the residual of the latest solution.
                usage = np.zeros((n_samples,))
                for i, (appli_name, means) in enumerate(self.means_vector.items()):
                    usage += np.sum(s_[i]@means, axis=1)
                sigma = (test_mains.flatten() - usage.flatten()).reshape((-1, 1))
                sigma = np.where(sigma < 1, 1, sigma)
            else:
                # Primary minimization on even epochs.
                term_3 = 0
                term_4 = 0
                for t in range(n_samples):
                    term_3 += .5 * (np.log(sigma[t]**2))
                    term_4 += .5 * ((test_mains[t][0] - total_appli_energy[t][0])**2 / (sigma[t]**2))

                objective = cvx.Minimize(term_1 + term_2 + term_3 + term_4)
                prob = cvx.Problem(objective, constraints)
                prob.solve(solver=cvx.SCS, verbose=False, warm_start=True)
                # Fall back to all-zero states if the solver fails.
                s_ = [
                    np.zeros((n_samples, self.default_num_states)) if i.value is None
                    else i.value
                    for i in cvx_state_vectors
                ]

        prediction_dict = {}
        for i, (appli_name, means) in enumerate(self.means_vector.items()):
            app_usage = np.sum(s_[i]@means, axis=1)
            prediction_dict[appli_name] = app_usage.flatten()

        return pd.DataFrame(prediction_dict, dtype="float32")
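
    # Note: disaggregate_thread is mapped over blocks by the worker
    # processes in disaggregate_chunk below; each time_period-long block is
    # solved as an independent convex program, so the learnt statistics are
    # pickled and shipped to the workers along with the bound method.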

    @nilmtk.docinherit.doc_inherit
    def disaggregate_chunk(self, test_mains):
        # Distribute the test mains across multiple processes and
        # disaggregate the blocks in parallel, one block per task.
        # Use all available CPUs except one, which is left to the OS.
        n_workers = max((1, multiprocessing.cpu_count() - 1))
        predictions_lst = []
        with multiprocessing.Pool(n_workers) as workers:
            for mains_df in test_mains:
                mains_vect = mains_df.values.flatten().reshape((-1, 1))
                n_blocks = int(math.ceil(len(mains_vect)/self.time_period))
                blocks = [
                    mains_vect[b * self.time_period:(b + 1) * self.time_period]
                    for b in range(n_blocks)
                ]
                res_arr = workers.map(self.disaggregate_thread, blocks)
                predictions_lst.append(pd.concat(res_arr, axis=0))

        return predictions_lst
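
# Illustrative end-to-end sketch (data loading assumed, not shown):
#
#   model = AFHMM({"time_period": 720, "default_num_states": 2})
#   model.partial_fit([mains_df], [("fridge", [fridge_df])])
#   predictions = model.disaggregate_chunk([test_mains_df])
#   print(predictions[0].head())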
