Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extended Parmest Capability for weighted SSE objective #3535

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 207 additions & 10 deletions pyomo/contrib/parmest/parmest.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,110 @@ def _experiment_instance_creation_callback(

def SSE(model):
"""
Sum of squared error between `experiment_output` model and data values
Sum of squared error between the model prediction of measured variables and data values,
assuming Gaussian i.i.d errors.
"""
expr = sum((y - y_hat) ** 2 for y, y_hat in model.experiment_outputs.items())
expr = (1 / 2) * sum((y - y_hat) ** 2 for y_hat, y in model.experiment_outputs.items())
return expr

def SSE_weighted(model):
"""
Weighted sum of squared error between the model prediction of measured variables and data values,
assuming Gaussian i.i.d errors.
"""
expr = (1 / 2) * sum(
((y - y_hat) / model.measurement_error[y_hat]) ** 2
for y_hat, y in model.experiment_outputs.items()
)
return expr

# Calculate the sensitivity of measured variables to parameters using central finite difference
def compute_jacobian(model):
"""
Compute the Jacobian matrix using central finite difference.

Arguments:
model: Pyomo model containing experiment_outputs and measurement_error.

Returns:
J: Jacobian matrix
"""
# get the measured variables
y_hat_list = [y_hat for y_hat, y in model.experiment_outputs.items()]

# get the parameters
params = [k for k, v in model.unknown_parameters.items()]
param_values = [p.value for p in params]

# get the number of parameters and measured variables
n_params = len(param_values)
n_outputs = len(y_hat_list)

# compute the sensitivity of measured variables to the parameters (Jacobian)
J = np.zeros((n_outputs, n_params))

perturbation = 1e-6 # Small perturbation for finite differences
for i, param in enumerate(params):
# store original value of the parameter
orig_value = param_values[i]

# Forward perturbation
param.fix(orig_value + perturbation)

# solve model
solver = pyo.SolverFactory('ipopt')
solver.solve(model)

# forward perturbation measured variables
y_hat_plus = [pyo.value(y_hat) for y_hat, y in model.experiment_outputs.items()]

# Backward perturbation
param.fix(orig_value - perturbation)

# resolve model
solver.solve(model)

# backward perturbation measured variables
y_hat_minus = [pyo.value(y_hat) for y_hat, y in model.experiment_outputs.items()]

# Restore original parameter value
param.fix(orig_value)

# Central difference approximation for the Jacobian
J[:, i] = [(y_hat_plus[w] - y_hat_minus[w]) / (2 * perturbation) for w in range(len(y_hat_plus))]

return J

# compute the Fisher information matrix of the estimated parameters
def compute_FIM(model):
"""
Calculate the Fisher information matrix using the Jacobian and model measurement errors.

Arguments:
model: Pyomo model containing experiment_outputs and measurement_error.

Returns:
FIM: Fisher information matrix.
"""

# extract the measured variables and measurement errors
y_hat_list = [y_hat for y_hat, y in model.experiment_outputs.items()]
error_list = [model.measurement_error[y_hat] for y_hat, y in model.experiment_outputs.items()]

# check if error list is consistent
if len(error_list) == 0 or len(y_hat_list) == 0:
raise ValueError("Experiment outputs and measurement errors cannot be empty.")

# create the weight matrix W (inverse of variance)
W = np.diag([1 / (err**2) for err in error_list])

# calculate Jacobian matrix
J = compute_jacobian(model)

# calculate the FIM
FIM = J.T @ W @ J

return FIM

class Estimator(object):
"""
Expand Down Expand Up @@ -428,6 +527,8 @@ def _create_parmest_model(self, experiment_number):
# custom functions
if self.obj_function == 'SSE':
second_stage_rule = SSE
elif self.obj_function == 'SSE_weighted':
second_stage_rule = SSE_weighted
else:
# A custom function uses model.experiment_outputs as data
second_stage_rule = self.obj_function
Expand Down Expand Up @@ -578,10 +679,58 @@ def _Q_opt(
the constant cancels out. (was scaled by 1/n because it computes an
expected value.)
'''
cov = 2 * sse / (n - l) * inv_red_hes
cov = pd.DataFrame(
cov, index=thetavals.keys(), columns=thetavals.keys()
)
if self.obj_function == 'SSE': # covariance calculation for measurements in the same unit
# get the model
model = self.exp_list[0].get_labeled_model()

# get the measurement error
meas_error = [model.measurement_error[y_hat] for y_hat, y in model.experiment_outputs.items()]

# check if the user specifies the measurement error
if all(item is None for item in meas_error):
cov = sse / (n - l) * inv_red_hes # covariance matrix
cov = pd.DataFrame(
cov, index=thetavals.keys(), columns=thetavals.keys()
)
else:
cov = meas_error[0] * inv_red_hes # covariance matrix
cov = pd.DataFrame(
cov, index=thetavals.keys(), columns=thetavals.keys()
)
elif self.obj_function == 'SSE_weighted': # covariance calculation for measurements in diff. units
# Store the FIM of all the experiments
FIM_all_exp = []
for experiment in self.exp_list: # loop through the experiments
# get a copy of the model
model = experiment.get_labeled_model().clone()

# fix the parameter values to the optimal values estimated
params = [k for k, v in model.unknown_parameters.items()]
for param in params:
param.fix(thetavals[param.name])

# resolve the model
solver = pyo.SolverFactory("ipopt")
solver.solve(model)

# compute the FIM
FIM_all_exp.append(compute_FIM(model))

# Total FIM of experiments
FIM_total = np.sum(FIM_all_exp, axis=0)

# covariance matrix
cov = np.linalg.inv(FIM_total)
cov = pd.DataFrame(
cov, index=thetavals.keys(), columns=thetavals.keys()
)
# elif self.obj_function == 'SSE_weighted':
# cov = inv_red_hes
# cov = pd.DataFrame(
# cov, index=thetavals.keys(), columns=thetavals.keys()
# )
else:
raise NotImplementedError('Covariance calculation is only supported for SSE and SSE_weighted objectives')

thetavals = pd.Series(thetavals)

Expand Down Expand Up @@ -1726,10 +1875,58 @@ def _Q_opt(
the constant cancels out. (was scaled by 1/n because it computes an
expected value.)
'''
cov = 2 * sse / (n - l) * inv_red_hes
cov = pd.DataFrame(
cov, index=thetavals.keys(), columns=thetavals.keys()
)
if self.obj_function == 'SSE': # covariance calculation for measurements in the same unit
# get the model
model = self.exp_list[0].get_labeled_model()

# get the measurement error
meas_error = [model.measurement_error[y_hat] for y_hat, y in model.experiment_outputs.items()]

# check if the user specifies the measurement error
if all(item is None for item in meas_error):
cov = sse / (n - l) * inv_red_hes # covariance matrix
cov = pd.DataFrame(
cov, index=thetavals.keys(), columns=thetavals.keys()
)
else:
cov = meas_error[0] * inv_red_hes # covariance matrix
cov = pd.DataFrame(
cov, index=thetavals.keys(), columns=thetavals.keys()
)
elif self.obj_function == 'SSE_weighted': # covariance calculation for measurements in diff. units
# Store the FIM of all the experiments
FIM_all_exp = []
for experiment in self.exp_list: # loop through the experiments
# get a copy of the model
model = experiment.get_labeled_model().clone()

# fix the parameter values to the optimal values estimated
params = [k for k, v in model.unknown_parameters.items()]
for param in params:
param.fix(thetavals[param.name])

# resolve the model
solver = pyo.SolverFactory("ipopt")
solver.solve(model)

# compute the FIM
FIM_all_exp.append(compute_FIM(model))

# Total FIM of experiments
FIM_total = np.sum(FIM_all_exp, axis=0)

# covariance matrix
cov = np.linalg.inv(FIM_total)
cov = pd.DataFrame(
cov, index=thetavals.keys(), columns=thetavals.keys()
)
# elif self.obj_function == 'SSE_weighted':
# cov = inv_red_hes
# cov = pd.DataFrame(
# cov, index=thetavals.keys(), columns=thetavals.keys()
# )
else:
raise NotImplementedError('Covariance calculation is only supported for SSE and SSE_weighted objectives')

thetavals = pd.Series(thetavals)

Expand Down