"""Define the BalanceComp class."""
from types import FunctionType
import numpy as np
from openmdao.core.implicitcomponent import ImplicitComponent
from openmdao.utils import cs_safe
from openmdao.utils.array_utils import shape_to_len
[docs]class BalanceComp(ImplicitComponent):
"""
A simple equation balance for solving implicit equations.
Parameters
----------
name : str
The name of the state variable to be created.
eq_units : str or None
Units for the left-hand-side and right-hand-side of the equation to be balanced.
lhs_name : str or None
Optional name for the LHS variable associated with the implicit state variable. If
None, the default will be used: 'lhs:{name}'.
rhs_name : str or None
Optional name for the RHS variable associated with the implicit state variable. If
None, the default will be used: 'rhs:{name}'.
rhs_val : int, float, or np.array
Default value for the RHS of the given state. Must be compatible
with the shape (optionally) given by the val or shape option in kwargs.
use_mult : bool
Specifies whether the LHS multiplier is to be used. If True, then an additional
input `mult_name` is created, with the default value given by `mult_val`, that
multiplies lhs. Default is False.
mult_name : str or None
Optional name for the LHS multiplier variable associated with the implicit state
variable. If None, the default will be used: 'mult:{name}'.
mult_val : int, float, or np.array
Default value for the LHS multiplier of the given state. Must be compatible
with the shape (optionally) given by the val or shape option in kwargs.
normalize : bool
Specifies whether or not the resulting residual should be normalized by a quadratic
function of the RHS.
val : float, int, or np.ndarray
Set initial value for the state.
**kwargs : dict
Additional arguments to be passed for the creation of the implicit state variable.
(see `add_output` method).
Attributes
----------
_state_vars : dict
Cache the data provided during `add_balance`
so everything can be saved until setup is called.
"""
[docs] def initialize(self):
"""
Declare options.
"""
self.options.declare('guess_func', types=FunctionType, allow_none=True, default=None,
recordable=False, desc='A callable function in the form '
'f(inputs, outputs, residuals) that can provide an initial "guess" '
'value of the state variable(s) based on the inputs, outputs and '
'residuals.')
[docs] def __init__(self, name=None, eq_units=None, lhs_name=None, rhs_name=None, rhs_val=0.0,
use_mult=False, mult_name=None, mult_val=1.0, normalize=True, val=None, **kwargs):
r"""
Initialize a BalanceComp, optionally creating a new implicit state variable.
The BalanceComp allows for the creation of one or more implicit state variables,
and computes the residuals for those variables based on the following equation.
.. math::
\mathcal{R}_{name} =
\frac{f_{mult}(x,...) \times f_{lhs}(x,...) - f_{rhs}(x,...)}{f_{norm}(f_{rhs}(x,...))}
Where :math:`f_{lhs}` represents the left-hand-side of the equation,
:math:`f_{rhs}` represents the right-hand-side, and :math:`f_{mult}`
is an optional multiplier on the left hand side. At least one of these
quantities should be a function of the associated state variable. If
use_mult is True the default value of the multiplier is 1. The optional normalization
function :math:`f_{norm}(f_{rhs}(x,...))` is computed as:
.. math::
f_{norm}(f_{rhs}(x,...)) =
\begin{cases}
\left| f_{rhs} \right|, & \text{if normalize and } \left| f_{rhs} \right| \geq 2 \\
0.25 f_{rhs}^2 + 1, & \text{if normalize and } \left| f_{rhs} \right| < 2 \\
1, & \text{if not normalize}
\end{cases}
New state variables, and their associated residuals are created by
calling `add_balance`. As an example, solving the equation
:math:`x**2 = 2` implicitly can be be accomplished as follows:
.. code-block:: python
prob = Problem()
bal = BalanceComp()
bal.add_balance('x', val=1.0)
exec_comp = ExecComp('y=x**2')
prob.model.add_subsystem(name='exec', subsys=exec_comp)
prob.model.add_subsystem(name='balance', subsys=bal)
prob.model.connect('balance.x', 'exec.x')
prob.model.connect('exec.y', 'balance.lhs:x')
prob.model.linear_solver = DirectSolver()
prob.model.nonlinear_solver = NewtonSolver(solve_subsystems=False)
prob.setup()
prob.set_val('exec.x', 2)
prob.run_model()
The arguments to add_balance can be provided on initialization to provide a balance
with a one state/residual without the need to call `add_balance`:
.. code-block:: python
prob = Problem()
bal = BalanceComp('x', val=1.0)
exec_comp = ExecComp('y=x**2')
prob.model.add_subsystem(name='exec', subsys=exec_comp)
prob.model.add_subsystem(name='balance', subsys=bal)
prob.model.connect('balance.x', 'exec.x')
prob.model.connect('exec.y', 'balance.lhs:x')
prob.model.linear_solver = DirectSolver()
prob.model.nonlinear_solver = NewtonSolver(solve_subsystems=False)
prob.setup()
prob.set_val('exec.x', 2)
prob.run_model()
"""
if 'guess_func' in kwargs:
super().__init__(guess_func=kwargs['guess_func'])
kwargs.pop('guess_func')
else:
super().__init__()
self._state_vars = {}
if name is not None:
self.add_balance(name, eq_units, lhs_name, rhs_name, rhs_val,
use_mult, mult_name, mult_val, normalize, val, **kwargs)
self._no_check_partials = True
[docs] def apply_nonlinear(self, inputs, outputs, residuals):
"""
Calculate the residual for each balance.
Parameters
----------
inputs : Vector
Unscaled, dimensional input variables read via inputs[key].
outputs : Vector
Unscaled, dimensional output variables read via outputs[key].
residuals : Vector
Unscaled, dimensional residuals written to via residuals[key].
"""
for name, options in self._state_vars.items():
lhs = inputs[options['lhs_name']]
rhs = inputs[options['rhs_name']]
# set dtype to rhs.dtype to prevent
# "Casting complex values to real discards the imaginary part" warning.
_scale_factor = np.ones((rhs.shape), dtype=rhs.dtype)
if options['normalize']:
# Indices where the rhs is near zero or not near zero
idxs_nz = np.where(cs_safe.abs(rhs) < 2)
idxs_nnz = np.where(cs_safe.abs(rhs) >= 2)
# Compute scaling factors
# scale factor that normalizes by the rhs, except near 0
_scale_factor[idxs_nnz] = 1.0 / cs_safe.abs(rhs[idxs_nnz])
_scale_factor[idxs_nz] = 1.0 / (.25 * rhs[idxs_nz] ** 2 + 1)
if options['use_mult']:
residuals[name] = (inputs[options['mult_name']] * lhs - rhs) * _scale_factor
else:
residuals[name] = (lhs - rhs) * _scale_factor
[docs] def linearize(self, inputs, outputs, jacobian):
"""
Calculate the partials of the residual for each balance.
Parameters
----------
inputs : Vector
Unscaled, dimensional input variables read via inputs[key].
outputs : Vector
Unscaled, dimensional output variables read via outputs[key].
jacobian : Jacobian
Sub-jac components written to jacobian[output_name, input_name].
"""
for name, options in self._state_vars.items():
lhs_name = options['lhs_name']
rhs_name = options['rhs_name']
lhs = inputs[lhs_name]
rhs = inputs[rhs_name]
_scale_factor = np.ones((rhs.shape), dtype=rhs.dtype)
_dscale_drhs = np.zeros((rhs.shape), dtype=rhs.dtype)
if options['normalize']:
# Indices where the rhs is near zero or not near zero
idxs_nz = np.where(cs_safe.abs(rhs) < 2)[0]
idxs_nnz = np.where(cs_safe.abs(rhs) >= 2)[0]
# scale factor that normalizes by the rhs, except near 0
_scale_factor[idxs_nnz] = 1.0 / cs_safe.abs(rhs[idxs_nnz])
_scale_factor[idxs_nz] = 1.0 / (.25 * rhs[idxs_nz] ** 2 + 1)
_dscale_drhs[idxs_nnz] = -np.sign(rhs[idxs_nnz]) / rhs[idxs_nnz]**2
_dscale_drhs[idxs_nz] = -.5 * rhs[idxs_nz] / (.25 * rhs[idxs_nz] ** 2 + 1) ** 2
if options['use_mult']:
mult_name = options['mult_name']
mult = inputs[mult_name]
# Partials of residual wrt mult
deriv = lhs * _scale_factor
jacobian[name, mult_name] = deriv.flatten()
else:
mult = 1.0
# Partials of residual wrt rhs
deriv = (mult * lhs - rhs) * _dscale_drhs - _scale_factor
jacobian[name, rhs_name] = deriv.flatten()
# Partials of residual wrt lhs
deriv = mult * _scale_factor
jacobian[name, lhs_name] = deriv.flatten()
[docs] def guess_nonlinear(self, inputs, outputs, residuals):
"""
Provide initial guess for states.
Override this method to set the initial guess for states.
Parameters
----------
inputs : Vector
Unscaled, dimensional input variables read via inputs[key].
outputs : Vector
Unscaled, dimensional output variables read via outputs[key].
residuals : Vector
Unscaled, dimensional residuals written to via residuals[key].
"""
if self.options['guess_func'] is not None:
self.options['guess_func'](inputs, outputs, residuals)
[docs] def add_balance(self, name, eq_units=None, lhs_name=None, rhs_name=None, rhs_val=0.0,
use_mult=False, mult_name=None, mult_val=1.0, normalize=True, val=None,
**kwargs):
"""
Add a new state variable and associated equation to be balanced.
This will create new inputs `lhs:name`, `rhs:name`, and `mult:name` that will
define the left and right sides of the equation to be balanced, and a
multiplier for the left-hand-side.
Parameters
----------
name : str
The name of the state variable to be created.
eq_units : str or None
Units for the left-hand-side and right-hand-side of the equation to be balanced.
lhs_name : str or None
Optional name for the LHS variable associated with the implicit state variable. If
None, the default will be used: 'lhs:{name}'.
rhs_name : str or None
Optional name for the RHS variable associated with the implicit state variable. If
None, the default will be used: 'rhs:{name}'.
rhs_val : int, float, or np.array
Default value for the RHS. Must be compatible with the shape (optionally)
given by the val or shape option in kwargs.
use_mult : bool
Specifies whether the LHS multiplier is to be used. If True, then an additional
input `mult_name` is created, with the default value given by `mult_val`, that
multiplies lhs. Default is False.
mult_name : str or None
Optional name for the LHS multiplier variable associated with the implicit state
variable. If None, the default will be used: 'mult:{name}'.
mult_val : int, float, or np.array
Default value for the LHS multiplier. Must be compatible with the shape (optionally)
given by the val or shape option in kwargs.
normalize : bool
Specifies whether or not the resulting residual should be normalized by a quadratic
function of the RHS.
val : float, int, or np.ndarray
Set initial value for the state.
**kwargs : dict
Additional arguments to be passed for the creation of the implicit state variable.
(see `add_output` method).
"""
options = {'kwargs': kwargs,
'eq_units': eq_units,
'lhs_name': lhs_name,
'rhs_name': rhs_name,
'rhs_val': rhs_val,
'use_mult': use_mult,
'mult_name': mult_name,
'mult_val': mult_val,
'normalize': normalize}
self._state_vars[name] = options
if val is None:
# If user doesn't specify initial guess for val, we can size problem from initial
# rhs_val.
if 'shape' not in kwargs and np.ndim(rhs_val) > 0:
kwargs['shape'] = rhs_val.shape
else:
options['kwargs']['val'] = val
meta = self.add_output(name, **options['kwargs'])
shape = meta['shape']
for s in ('lhs', 'rhs', 'mult'):
if options['{0}_name'.format(s)] is None:
options['{0}_name'.format(s)] = '{0}:{1}'.format(s, name)
self.add_input(options['lhs_name'],
val=np.ones(shape),
units=options['eq_units'])
self.add_input(options['rhs_name'],
val=options['rhs_val'] * np.ones(shape),
units=options['eq_units'])
if options['use_mult']:
self.add_input(options['mult_name'],
val=options['mult_val'] * np.ones(shape),
units=None)
ar = np.arange(shape_to_len(shape))
self.declare_partials(of=name, wrt=options['lhs_name'], rows=ar, cols=ar, val=1.0)
self.declare_partials(of=name, wrt=options['rhs_name'], rows=ar, cols=ar, val=1.0)
if options['use_mult']:
self.declare_partials(of=name, wrt=options['mult_name'], rows=ar, cols=ar, val=1.0)