"""Define the SplineComp class."""
import numpy as np
from openmdao.components.interp_util.interp import InterpND
from openmdao.core.explicitcomponent import ExplicitComponent
from openmdao.components.interp_util.interp import SPLINE_METHODS
[docs]class SplineComp(ExplicitComponent):
"""
Interpolation component that can use any of OpenMDAO's interpolation methods.
Parameters
----------
**kwargs : dict
Interpolator options to pass onward.
Attributes
----------
interp_to_cp : dict
Dictionary of relationship between the interpolated data and its control points.
interps : dict
Dictionary of interpolations for each output.
_n_cp = int
Number of control points.
_spline_cache : list
Cached arguments passed to add_spline. These are processed in setup.
"""
[docs] def __init__(self, **kwargs):
"""
Initialize all attributes.
"""
super().__init__(**kwargs)
self.interp_to_cp = {}
self.interps = {}
self._spline_cache = []
self._n_cp = None
self._no_check_partials = True
def _declare_options(self):
"""
Declare options.
"""
super()._declare_options()
self.options.declare('vec_size', types=int, default=1,
desc='Number of points to evaluate at once.')
self.options.declare('method', values=SPLINE_METHODS, default='akima',
desc='Spline interpolation method to use for all outputs.')
self.options.declare('x_interp_val', types=(list, np.ndarray),
desc='List/array of x interpolated point values.')
self.options.declare('x_cp_val', default=None, types=(list, np.ndarray), allow_none=True,
desc='List/array of x control point values, must be monotonically '
'increasing. Optional alternative to num_cp. Not applicable for '
'bsplines. ')
self.options.declare('num_cp', default=None, types=(int, ), allow_none=True,
desc='Number of spline control points. Optional alternative to '
'x_cp_val. Required for bsplines. If None, num_cp will be a linspace '
'from 0 to 1.')
self.options.declare('interp_options', types=dict, default={},
desc='Dict contains the name and value of options specific to the '
'chosen interpolation method.')
[docs] def add_spline(self, y_cp_name, y_interp_name, y_cp_val=None, y_units=None):
"""
Add a single spline output to this component.
Parameters
----------
y_cp_name : str
Name for the y control points input.
y_interp_name : str
Name of the y interpolated points output.
y_cp_val : list or ndarray
List/array of default y control point values.
y_units : str or None
Units of the y variable.
"""
self._spline_cache.append((y_cp_name, y_interp_name, y_cp_val, y_units))
[docs] def setup(self):
"""
Perform some final setup and checks.
"""
interp_method = self.options['method']
x_cp_val = self.options['x_cp_val']
n_cp = self.options['num_cp']
if x_cp_val is not None:
if interp_method == 'bsplines':
msg = "{}: 'x_cp_val' is not a valid option when using method 'bsplines'. "
msg += "Set 'num_cp' instead."
raise ValueError(msg.format(self.msginfo))
if n_cp is not None:
msg = "{}: It is not valid to set both options 'x_cp_val' and 'num_cp'."
raise ValueError(msg.format(self.msginfo))
grid = np.asarray(x_cp_val)
n_cp = len(grid)
elif n_cp is not None:
grid = np.linspace(0, 1.0, n_cp)
else:
msg = "{}: Either option 'x_cp_val' or 'num_cp' must be set."
raise ValueError(msg.format(self.msginfo))
self._n_cp = n_cp
opts = {}
if 'interp_options' in self.options:
opts = self.options['interp_options']
vec_size = self.options['vec_size']
n_interp = len(self.options['x_interp_val'])
for y_cp_name, y_interp_name, y_cp_val, y_units in self._spline_cache:
self.add_output(y_interp_name, np.ones((vec_size, n_interp)), units=y_units)
if y_cp_val is None:
y_cp_val = np.ones((vec_size, n_cp))
elif len(y_cp_val.shape) < 2:
y_cp_val = y_cp_val.reshape((vec_size, n_cp))
self.add_input(name=y_cp_name, val=y_cp_val, units=y_units)
self.interp_to_cp[y_interp_name] = y_cp_name
row = np.repeat(np.arange(n_interp), n_cp)
col = np.tile(np.arange(n_cp), n_interp)
rows = np.tile(row, vec_size) + \
np.repeat(n_interp * np.arange(vec_size), n_interp * n_cp)
cols = np.tile(col, vec_size) + np.repeat(n_cp * np.arange(vec_size), n_interp * n_cp)
self.declare_partials(y_interp_name, y_cp_name, rows=rows, cols=cols)
# Separate data for each vec_size, but we only need to do sizing, so just pass
# in the first. Most interps aren't vectorized.
cp_val = y_cp_val[0, :]
self.interps[y_interp_name] = InterpND(points=(grid, ), values=cp_val,
method=interp_method,
x_interp=self.options['x_interp_val'],
extrapolate=True, **opts)
# The scipy methods do not support complex step.
if self.options['method'].startswith('scipy'):
self.set_check_partial_options('*', method='fd')
[docs] def compute(self, inputs, outputs):
"""
Perform the interpolation at run time.
Parameters
----------
inputs : Vector
Unscaled, dimensional input variables read via inputs[key].
outputs : Vector
Unscaled, dimensional output variables read via outputs[key].
"""
for out_name, interp in self.interps.items():
values = inputs[self.interp_to_cp[out_name]]
interp._compute_d_dvalues = True
interp._compute_d_dx = False
interp.x_interp = self.options['x_interp_val']
try:
outputs[out_name] = interp._evaluate_spline(values)
except ValueError as err:
msg = "{}: Error interpolating output '{}':\n{}"
raise ValueError(msg.format(self.msginfo, out_name, str(err)))
[docs] def compute_partials(self, inputs, partials):
"""
Collect computed partial derivatives and return them.
Checks if the needed derivatives are cached already based on the
inputs vector. Refreshes the cache by re-computing the current point
if necessary.
Parameters
----------
inputs : Vector
Unscaled, dimensional input variables read via inputs[key].
partials : Jacobian
Sub-jac components written to partials[output_name, input_name].
"""
for out_name, interp in self.interps.items():
cp_name = self.interp_to_cp[out_name]
dy_ddata = interp.spline_gradient()
partials[out_name, cp_name] = dy_ddata.flatten()