|
| 1 | +# ___________________________________________________________________________ |
| 2 | +# |
| 3 | +# Pyomo: Python Optimization Modeling Objects |
| 4 | +# Copyright (c) 2008-2025 |
| 5 | +# National Technology and Engineering Solutions of Sandia, LLC |
| 6 | +# Under the terms of Contract DE-NA0003525 with National Technology and |
| 7 | +# Engineering Solutions of Sandia, LLC, the U.S. Government retains certain |
| 8 | +# rights in this software. |
| 9 | +# This software is distributed under the 3-clause BSD License. |
| 10 | +# ___________________________________________________________________________ |
| 11 | + |
| 12 | +from pyomo.common.dependencies import numpy as np |
| 13 | +import pyomo.environ as pyo |
| 14 | + |
| 15 | + |
| 16 | +def _f_cubic(x, alpha, s=None): |
| 17 | + """ |
| 18 | + Cubic function: |
| 19 | + y = a1 + a2 * x + a3 * x^2 + a4 * x^3 |
| 20 | +
|
| 21 | + Optionally if s is provided it is a segment index. |
| 22 | + y = a1[s] + a2[s] * x + a3[s] * x^2 + a4[s] * x^3 |
| 23 | +
|
| 24 | + This is used to write constraints more compactly. |
| 25 | +
|
| 26 | + Args: |
| 27 | + x: x variable, numeric, numpy array, or Pyomo component |
| 28 | + alpha: cubic parameters, numeric or Pyomo component |
| 29 | + s: optional segment index |
| 30 | +
|
| 31 | + Returns: |
| 32 | + Pyomo expression, numpy array, or float |
| 33 | + """ |
| 34 | + if s is None: |
| 35 | + return alpha[1] + alpha[2] * x + alpha[3] * x**2 + alpha[4] * x**3 |
| 36 | + return alpha[s, 1] + alpha[s, 2] * x + alpha[s, 3] * x**2 + alpha[s, 4] * x**3 |
| 37 | + |
| 38 | + |
| 39 | +def _fx_cubic(x, alpha, s=None): |
| 40 | + """ |
| 41 | + Cubic function first derivative: |
| 42 | + dy/dx = a2 + 2 * a3 * x + 3 * a4 * x^2 |
| 43 | +
|
| 44 | + Optionally if s is provided it is a segment index. |
| 45 | + dy/dx = a2[s] + 2 * a3[s] * x + 3 * a4[s] * x^2 |
| 46 | +
|
| 47 | + This is used to write constraints more compactly. |
| 48 | +
|
| 49 | + Args: |
| 50 | + x: x variable, numeric, numpy array, or Pyomo component |
| 51 | + alpha: cubic parameters, numeric or Pyomo component |
| 52 | + s: optional segment index |
| 53 | +
|
| 54 | + Returns: |
| 55 | + Pyomo expression, numpy array, or float |
| 56 | + """ |
| 57 | + if s is None: |
| 58 | + return alpha[2] + 2 * alpha[3] * x + 3 * alpha[4] * x**2 |
| 59 | + return alpha[s, 2] + 2 * alpha[s, 3] * x + 3 * alpha[s, 4] * x**2 |
| 60 | + |
| 61 | + |
| 62 | +def _fxx_cubic(x, alpha, s=None): |
| 63 | + """ |
| 64 | + Cubic function second derivative: |
| 65 | + d2y/dx2 = 2 * a3 + 6 * a4 * x |
| 66 | +
|
| 67 | + Optionally if s is provided it is a segment index. |
| 68 | + d2y/dx2 = 2 * a3[s] + 6 * a4[s] * x |
| 69 | +
|
| 70 | + This is used to write constraints more compactly. |
| 71 | +
|
| 72 | + Args: |
| 73 | + x: x variable, numeric, numpy array, or Pyomo component |
| 74 | + alpha: cubic parameters, numeric or Pyomo component |
| 75 | + s: optional segment index |
| 76 | +
|
| 77 | + Returns: |
| 78 | + Pyomo expression, numpy array, or float |
| 79 | + """ |
| 80 | + if s is None: |
| 81 | + return 2 * alpha[3] + 6 * alpha[4] * x |
| 82 | + return 2 * alpha[s, 3] + 6 * alpha[s, 4] * x |
| 83 | + |
| 84 | + |
| 85 | +class CsplineParameters: |
| 86 | + def __init__(self, model=None, fptr=None): |
| 87 | + """Cubic spline parameters class. This can be used to read and |
| 88 | + write parameters or calculate cubic spline function values and |
| 89 | + derivatives for testing. |
| 90 | + """ |
| 91 | + if model is not None and fptr is not None: |
| 92 | + raise ValueError("Please specify at most one of model or fptr.") |
| 93 | + if model is not None: |
| 94 | + self.get_parameters_from_model(model) |
| 95 | + elif fptr is not None: |
| 96 | + self.get_parameters_from_file(fptr) |
| 97 | + else: |
| 98 | + self.knots = np.array([]) |
| 99 | + self.a1 = np.array([]) |
| 100 | + self.a2 = np.array([]) |
| 101 | + self.a3 = np.array([]) |
| 102 | + self.a4 = np.array([]) |
| 103 | + |
| 104 | + @property |
| 105 | + def n_knots(self): |
| 106 | + """Number of knots""" |
| 107 | + return len(self.knots) |
| 108 | + |
| 109 | + @property |
| 110 | + def n_segments(self): |
| 111 | + """Number of segments""" |
| 112 | + return len(self.knots) - 1 |
| 113 | + |
| 114 | + @property |
| 115 | + def valid(self): |
| 116 | + """Ensure that the number of knots and cubic parameters is valid""" |
| 117 | + return ( |
| 118 | + len(self.a1) == self.n_segments |
| 119 | + and len(self.a2) == self.n_segments |
| 120 | + and len(self.a3) == self.n_segments |
| 121 | + and len(self.a4) == self.n_segments |
| 122 | + ) |
| 123 | + |
| 124 | + def get_parameters_from_model(self, m): |
| 125 | + """Read parameters from a Pyomo model used to calculate them""" |
| 126 | + self.knots = [pyo.value(x) for x in m.x.values()] |
| 127 | + self.a1 = [None] * len(m.seg_idx) |
| 128 | + self.a2 = [None] * len(m.seg_idx) |
| 129 | + self.a3 = [None] * len(m.seg_idx) |
| 130 | + self.a4 = [None] * len(m.seg_idx) |
| 131 | + for s in m.seg_idx: |
| 132 | + self.a1[s - 1] = pyo.value(m.alpha[s, 1]) |
| 133 | + self.a2[s - 1] = pyo.value(m.alpha[s, 2]) |
| 134 | + self.a3[s - 1] = pyo.value(m.alpha[s, 3]) |
| 135 | + self.a4[s - 1] = pyo.value(m.alpha[s, 4]) |
| 136 | + self.knots = np.array(self.knots) |
| 137 | + self.a1 = np.array(self.a1) |
| 138 | + self.a2 = np.array(self.a2) |
| 139 | + self.a3 = np.array(self.a3) |
| 140 | + self.a4 = np.array(self.a4) |
| 141 | + |
| 142 | + def get_parameters_from_file(self, fptr): |
| 143 | + """Read parameters from a file""" |
| 144 | + # line 1: number of segments |
| 145 | + ns = int(fptr.readline()) |
| 146 | + # Make param lists |
| 147 | + self.knots = [None] * (ns + 1) |
| 148 | + self.a1 = [None] * ns |
| 149 | + self.a2 = [None] * ns |
| 150 | + self.a3 = [None] * ns |
| 151 | + self.a4 = [None] * ns |
| 152 | + # Read params |
| 153 | + for i in range(ns + 1): |
| 154 | + self.knots[i] = float(fptr.readline()) |
| 155 | + for a in [self.a1, self.a2, self.a3, self.a4]: |
| 156 | + for i in range(ns): |
| 157 | + a[i] = float(fptr.readline()) |
| 158 | + self.knots = np.array(self.knots) |
| 159 | + self.a1 = np.array(self.a1) |
| 160 | + self.a2 = np.array(self.a2) |
| 161 | + self.a3 = np.array(self.a3) |
| 162 | + self.a4 = np.array(self.a4) |
| 163 | + |
| 164 | + def write_parameters(self, fptr): |
| 165 | + """Write parameters to a file""" |
| 166 | + assert self.valid |
| 167 | + fptr.write(f"{self.n_segments}\n") |
| 168 | + for l in [self.knots, self.a1, self.a2, self.a3, self.a4]: |
| 169 | + for x in l: |
| 170 | + fptr.write(f"{x}\n") |
| 171 | + |
| 172 | + def segment(self, x): |
| 173 | + """Get the spline segment containing x. |
| 174 | +
|
| 175 | + Args: |
| 176 | + x: location, float or numpy array |
| 177 | +
|
| 178 | + Returns: |
| 179 | + segment(s) containing x, if x is a numpy array a numpy |
| 180 | + array of integers is returned otherwise return an integer |
| 181 | + """ |
| 182 | + s = np.searchsorted(self.knots, x) |
| 183 | + # If x is past the last knot, use the last segment |
| 184 | + # this could happen just due to round-off even if |
| 185 | + # you don't intend to extrapolate |
| 186 | + s[s >= self.n_segments] = self.n_segments - 1 |
| 187 | + return s |
| 188 | + |
| 189 | + def f(self, x): |
| 190 | + """Get f(x) |
| 191 | +
|
| 192 | + Args: |
| 193 | + x: location, numpy array float |
| 194 | +
|
| 195 | + Returns: |
| 196 | + f(x) numpy array if x is numpy array or float |
| 197 | + """ |
| 198 | + s = self.segment(x) |
| 199 | + return self.a1[s] + self.a2[s] * x + self.a3[s] * x**2 + self.a4[s] * x**3 |
| 200 | + |
| 201 | + |
| 202 | +def cubic_parameters_model( |
| 203 | + x_data, |
| 204 | + y_data, |
| 205 | + x_knots=None, |
| 206 | + end_point_constraint=True, |
| 207 | + objective_form=False, |
| 208 | + name="cubic spline parameters model", |
| 209 | +): |
| 210 | + """Create a Pyomo model to calculate parameters for a cubic spline. By default |
| 211 | + this creates a square linear model, but optionally it can leave off the endpoint |
| 212 | + second derivative constraints and add an objective function for fitting data |
| 213 | + instead. The purpose of the alternative least squares form is to allow the spline |
| 214 | + to be constrained in other ways that don't require a perfect data match. The knots |
| 215 | + don't need to be the same as the x data to allow, for example, additional segments |
| 216 | + for extrapolation. This is not the most computationally efficient way to calculate |
| 217 | + parameters, but since it is used to precalculate parameters, speed is not important. |
| 218 | +
|
| 219 | + Args: |
| 220 | + x_data: sorted list of x data |
| 221 | + y_data: list of y data |
| 222 | + x_knots: optional sorted list of knots (default is to use x_data) |
| 223 | + end_point_constraint: if True add constraint that second derivative = 0 at |
| 224 | + endpoints (default=True) |
| 225 | + objective_form: if True write a least squares objective rather than constraints |
| 226 | + to match data (default=False) |
| 227 | + name: optional model name |
| 228 | +
|
| 229 | + Returns: |
| 230 | + Pyomo ConcreteModel |
| 231 | + """ |
| 232 | + n_data = len(x_data) |
| 233 | + assert n_data == len(y_data) |
| 234 | + if x_knots is None: |
| 235 | + n_knots = n_data |
| 236 | + x_knots = x_data |
| 237 | + else: |
| 238 | + n_knots = len(x_knots) |
| 239 | + |
| 240 | + m = pyo.ConcreteModel(name=name) |
| 241 | + # Sets of indexes for knots, segments, and data |
| 242 | + m.knt_idx = pyo.RangeSet(n_knots) |
| 243 | + m.seg_idx = pyo.RangeSet(n_knots - 1) |
| 244 | + m.dat_idx = pyo.RangeSet(n_data) |
| 245 | + |
| 246 | + m.x_data = pyo.Param(m.dat_idx, initialize={i + 1: x for i, x in enumerate(x_data)}) |
| 247 | + m.y_data = pyo.Param(m.dat_idx, initialize={i + 1: x for i, x in enumerate(y_data)}) |
| 248 | + m.x = pyo.Param(m.knt_idx, initialize={i + 1: x for i, x in enumerate(x_knots)}) |
| 249 | + m.alpha = pyo.Var(m.seg_idx, {1, 2, 3, 4}, initialize=1) |
| 250 | + |
| 251 | + # f_s(x) = f_s+1(x) |
| 252 | + @m.Constraint(m.seg_idx) |
| 253 | + def y_eqn(blk, s): |
| 254 | + if s == m.seg_idx.last(): |
| 255 | + return pyo.Constraint.Skip |
| 256 | + return _f_cubic(m.x[s + 1], m.alpha, s) == _f_cubic(m.x[s + 1], m.alpha, s + 1) |
| 257 | + |
| 258 | + # f'_s(x) = f'_s+1(x) |
| 259 | + @m.Constraint(m.seg_idx) |
| 260 | + def yx_eqn(blk, s): |
| 261 | + if s == m.seg_idx.last(): |
| 262 | + return pyo.Constraint.Skip |
| 263 | + return _fx_cubic(m.x[s + 1], m.alpha, s) == _fx_cubic( |
| 264 | + m.x[s + 1], m.alpha, s + 1 |
| 265 | + ) |
| 266 | + |
| 267 | + # f"_s(x) = f"_s+1(x) |
| 268 | + @m.Constraint(m.seg_idx) |
| 269 | + def yxx_eqn(blk, s): |
| 270 | + if s == m.seg_idx.last(): |
| 271 | + return pyo.Constraint.Skip |
| 272 | + return _fxx_cubic(m.x[s + 1], m.alpha, s) == _fxx_cubic( |
| 273 | + m.x[s + 1], m.alpha, s + 1 |
| 274 | + ) |
| 275 | + |
| 276 | + # Identify segments used to predict y_data at each x_data. We use search in |
| 277 | + # instead of a dict lookup, since we don't want to require the data to be at |
| 278 | + # the knots, even though that is almost always the case. |
| 279 | + idx = np.searchsorted(x_knots, x_data) |
| 280 | + |
| 281 | + if end_point_constraint: |
| 282 | + add_endpoint_second_derivative_constraints(m) |
| 283 | + |
| 284 | + # Expression for difference between data and prediction |
| 285 | + @m.Expression(m.dat_idx) |
| 286 | + def ydiff(blk, d): |
| 287 | + s = idx[d - 1] + 1 |
| 288 | + if s >= m.seg_idx.last(): |
| 289 | + s -= 1 |
| 290 | + return m.y_data[d] - _f_cubic(m.x_data[d], m.alpha, s) |
| 291 | + |
| 292 | + if objective_form: |
| 293 | + # least squares objective |
| 294 | + m.obj = pyo.Objective(expr=sum(m.ydiff[d] ** 2 for d in m.dat_idx)) |
| 295 | + else: |
| 296 | + |
| 297 | + @m.Constraint(m.dat_idx) |
| 298 | + def match_data(blk, d): |
| 299 | + return m.ydiff[d] == 0 |
| 300 | + |
| 301 | + return m |
| 302 | + |
| 303 | + |
| 304 | +def add_endpoint_second_derivative_constraints(m): |
| 305 | + """Usually cubic splines use the endpoint constraints that the second |
| 306 | + derivative is zero. This function adds those constraints to a model |
| 307 | + """ |
| 308 | + |
| 309 | + @m.Constraint([m.seg_idx.first(), m.seg_idx.last()]) |
| 310 | + def yxx_endpoint_eqn(blk, s): |
| 311 | + if s == m.seg_idx.last(): |
| 312 | + j = m.knt_idx.last() |
| 313 | + else: |
| 314 | + j = s |
| 315 | + return _fxx_cubic(m.x[j], m.alpha, s) == 0 |
0 commit comments