"""Define the ExplicitComponent class."""
import numpy as np
from itertools import chain
from openmdao.jacobians.dictionary_jacobian import DictionaryJacobian
from openmdao.utils.coloring import _ColSparsityJac
from openmdao.core.component import Component
from openmdao.vectors.vector import _full_slice
from openmdao.utils.class_util import overrides_method
from openmdao.recorders.recording_iteration_stack import Recording
from openmdao.core.constants import INT_DTYPE, _UNDEFINED
from openmdao.utils.general_utils import is_undefined
_tuplist = (tuple, list)
[docs]class ExplicitComponent(Component):
"""
Class to inherit from when all output variables are explicit.
Parameters
----------
**kwargs : dict of keyword arguments
Keyword arguments that will be mapped into the Component options.
Attributes
----------
_has_compute_partials : bool
If True, the instance overrides compute_partials.
_vjp_hash : int or None
Hash value for the last set of inputs to the compute_primal function.
_vjp_fun : function or None
The vector-Jacobian product function.
"""
[docs] def __init__(self, **kwargs):
"""
Store some bound methods so we can detect runtime overrides.
"""
super().__init__(**kwargs)
self._has_compute_partials = overrides_method('compute_partials', self, ExplicitComponent)
self.options.undeclare('assembled_jac_type')
self._vjp_hash = None
self._vjp_fun = None
@property
def nonlinear_solver(self):
"""
Get the nonlinear solver for this system.
"""
return self._nonlinear_solver
@nonlinear_solver.setter
def nonlinear_solver(self, solver):
"""
Raise an exception.
"""
raise RuntimeError(f"{self.msginfo}: Explicit components don't support nonlinear solvers.")
@property
def linear_solver(self):
"""
Get the linear solver for this system.
"""
return self._linear_solver
@linear_solver.setter
def linear_solver(self, solver):
"""
Raise an exception.
"""
raise RuntimeError(f"{self.msginfo}: Explicit components don't support linear solvers.")
def _configure(self):
"""
Configure this system to assign children settings and detect if matrix_free.
"""
if is_undefined(self.matrix_free):
self.matrix_free = overrides_method('compute_jacvec_product', self, ExplicitComponent)
def _jac_wrt_iter(self, wrt_matches=None):
"""
Iterate over (name, start, end, vec, slice, dist_sizes) for each column var in the jacobian.
Parameters
----------
wrt_matches : set or None
Only include row vars that are contained in this set. This will determine what
the actual offsets are, i.e. the offsets will be into a reduced jacobian
containing only the matching columns.
Yields
------
str
Absolute name of 'wrt' variable.
int
Starting index.
int
Ending index.
Vector
The _inputs vector.
slice
A full slice.
ndarray or None
Distributed sizes if var is distributed else None
"""
start = end = 0
local_ins = self._var_abs2meta['input']
toidx = self._var_allprocs_abs2idx
sizes = self._var_sizes['input']
for wrt, meta in self._var_abs2meta['input'].items():
if wrt_matches is None or wrt in wrt_matches:
end += meta['size']
vec = self._inputs if wrt in local_ins else None
dist_sizes = sizes[:, toidx[wrt]] if meta['distributed'] else None
yield wrt, start, end, vec, _full_slice, dist_sizes
start = end
def _setup_residuals(self):
"""
Prevent the user from implementing setup_residuals for explicit components.
"""
if overrides_method('setup_residuals', self, ExplicitComponent):
raise RuntimeError(f'{self.msginfo}: Class overrides setup_residuals but '
'is an ExplicitComponent. setup_residuals may only be '
'overridden by ImplicitComponents.')
def _setup_partials(self):
"""
Call setup_partials in components.
"""
super()._setup_partials()
if self.matrix_free:
return
# Note: These declare calls are outside of setup_partials so that users do not have to
# call the super version of setup_partials. This is still in the final setup.
for out_abs, meta in self._var_abs2meta['output'].items():
size = meta['size']
if size > 0:
# ExplicitComponent jacobians have -1 on the diagonal.
arange = np.arange(size, dtype=INT_DTYPE)
self._subjacs_info[out_abs, out_abs] = {
'rows': arange,
'cols': arange,
'shape': (size, size),
'val': np.full(size, -1.),
'dependent': True,
}
def _setup_jacobians(self, recurse=True):
"""
Set and populate jacobian.
Parameters
----------
recurse : bool
If True, setup jacobians in all descendants. (ignored)
"""
if self._has_approx and self._use_derivatives:
self._set_approx_partials_meta()
[docs] def add_output(self, name, val=1.0, shape=None, units=None, res_units=None, desc='',
lower=None, upper=None, ref=1.0, ref0=0.0, res_ref=None, tags=None,
shape_by_conn=False, copy_shape=None, compute_shape=None, distributed=None,
primal_name=None):
"""
Add an output variable to the component.
For ExplicitComponent, res_ref defaults to the value in res unless otherwise specified.
Parameters
----------
name : str
Name of the variable in this component's namespace.
val : float or list or tuple or ndarray
The initial value of the variable being added in user-defined units. Default is 1.0.
shape : int or tuple or list or None
Shape of this variable, only required if val is not an array.
Default is None.
units : str or None
Units in which the output variables will be provided to the component during execution.
Default is None, which means it has no units.
res_units : str or None
Units in which the residuals of this output will be given to the user when requested.
Default is None, which means it has no units.
desc : str
Description of the variable.
lower : float or list or tuple or ndarray or None
Lower bound(s) in user-defined units. It can be (1) a float, (2) an array_like
consistent with the shape arg (if given), or (3) an array_like matching the shape of
val, if val is array_like. A value of None means this output has no lower bound.
Default is None.
upper : float or list or tuple or ndarray or None
Upper bound(s) in user-defined units. It can be (1) a float, (2) an array_like
consistent with the shape arg (if given), or (3) an array_like matching the shape of
val, if val is array_like. A value of None means this output has no upper bound.
Default is None.
ref : float
Scaling parameter. The value in the user-defined units of this output variable when
the scaled value is 1. Default is 1.
ref0 : float
Scaling parameter. The value in the user-defined units of this output variable when
the scaled value is 0. Default is 0.
res_ref : float
Scaling parameter. The value in the user-defined res_units of this output's residual
when the scaled value is 1. Default is None, which means residual scaling matches
output scaling.
tags : str or list of strs
User defined tags that can be used to filter what gets listed when calling
list_inputs and list_outputs and also when listing results from case recorders.
shape_by_conn : bool
If True, shape this output to match its connected input(s).
copy_shape : str or None
If a str, that str is the name of a variable. Shape this output to match that of
the named variable.
compute_shape : function or None
If a function, that function is called to determine the shape of this output.
distributed : bool
If True, this variable is a distributed variable, so it can have different sizes/values
across MPI processes.
primal_name : str or None
Valid python name to represent the variable in compute_primal if 'name' is not a valid
python name.
Returns
-------
dict
Metadata for added variable.
"""
if res_ref is None:
res_ref = ref
return super().add_output(name, val=val, shape=shape, units=units,
res_units=res_units, desc=desc,
lower=lower, upper=upper,
ref=ref, ref0=ref0, res_ref=res_ref,
tags=tags, shape_by_conn=shape_by_conn,
copy_shape=copy_shape, compute_shape=compute_shape,
distributed=distributed, primal_name=primal_name)
def _approx_subjac_keys_iter(self):
is_output = self._outputs._contains_abs
for abs_key, meta in self._subjacs_info.items():
if 'method' in meta and not is_output(abs_key[1]):
method = meta['method']
if (method is not None and method in self._approx_schemes):
yield abs_key
def _compute_wrapper(self):
"""
Call compute based on the value of the "run_root_only" option.
"""
with self._call_user_function('compute'):
if self._run_root_only():
if self.comm.rank == 0:
if self._discrete_inputs or self._discrete_outputs:
self.compute(self._inputs, self._outputs,
self._discrete_inputs, self._discrete_outputs)
else:
self.compute(self._inputs, self._outputs)
self.comm.bcast([self._outputs.asarray(), self._discrete_outputs], root=0)
else:
new_outs, new_disc_outs = self.comm.bcast(None, root=0)
self._outputs.set_val(new_outs)
if new_disc_outs:
for name, val in new_disc_outs.items():
self._discrete_outputs[name] = val
else:
if self._discrete_inputs or self._discrete_outputs:
self.compute(self._inputs, self._outputs,
self._discrete_inputs, self._discrete_outputs)
else:
self.compute(self._inputs, self._outputs)
def _apply_nonlinear(self):
"""
Compute residuals. The model is assumed to be in a scaled state.
"""
outputs = self._outputs
residuals = self._residuals
with self._unscaled_context(outputs=[outputs], residuals=[residuals]):
residuals.set_vec(outputs)
# Sign of the residual is minus the sign of the output vector.
residuals *= -1.0
self._compute_wrapper()
residuals += outputs
outputs -= residuals
self.iter_count_apply += 1
def _solve_nonlinear(self):
"""
Compute outputs. The model is assumed to be in a scaled state.
"""
with Recording(self.pathname + '._solve_nonlinear', self.iter_count, self):
with self._unscaled_context(outputs=[self._outputs], residuals=[self._residuals]):
self._residuals.set_val(0.0)
self._compute_wrapper()
# Iteration counter is incremented in the Recording context manager at exit.
def _compute_jacvec_product_wrapper(self, inputs, d_inputs, d_resids, mode,
discrete_inputs=None):
"""
Call compute_jacvec_product based on the value of the "run_root_only" option.
Parameters
----------
inputs : Vector
Nonlinear input vector.
d_inputs : Vector
Linear input vector.
d_resids : Vector
Linear residual vector.
mode : str
Indicates direction of derivative computation, either 'fwd' or 'rev'.
discrete_inputs : dict or None
Mapping of variable name to discrete value.
"""
if self._run_root_only():
if self.comm.rank == 0:
if discrete_inputs:
self.compute_jacvec_product(inputs, d_inputs, d_resids, mode, discrete_inputs)
else:
self.compute_jacvec_product(inputs, d_inputs, d_resids, mode)
if mode == 'fwd':
self.comm.bcast(d_resids.asarray(), root=0)
else: # rev
self.comm.bcast(d_inputs.asarray(), root=0)
else:
new_vals = self.comm.bcast(None, root=0)
if mode == 'fwd':
d_resids.set_val(new_vals)
else: # rev
d_inputs.set_val(new_vals)
else:
dochk = self._problem_meta['checking'] and mode == 'rev' and self.comm.size > 1
if dochk:
nzdresids = self._get_dist_nz_dresids()
if discrete_inputs:
self.compute_jacvec_product(inputs, d_inputs, d_resids, mode, discrete_inputs)
else:
self.compute_jacvec_product(inputs, d_inputs, d_resids, mode)
if dochk:
self._check_consistent_serial_dinputs(nzdresids)
def _apply_linear(self, jac, mode, scope_out=None, scope_in=None):
"""
Compute jac-vec product. The model is assumed to be in a scaled state.
Parameters
----------
jac : Jacobian or None
If None, use local jacobian, else use jac.
mode : str
'fwd' or 'rev'.
scope_out : set or None
Set of absolute output names in the scope of this mat-vec product.
If None, all are in the scope.
scope_in : set or None
Set of absolute input names in the scope of this mat-vec product.
If None, all are in the scope.
"""
J = self._jacobian if jac is None else jac
with self._matvec_context(scope_out, scope_in, mode) as vecs:
d_inputs, d_outputs, d_residuals = vecs
if not self.matrix_free:
# if we're not matrix free, we can skip the rest because
# compute_jacvec_product does nothing.
# Jacobian and vectors are all scaled, unitless
J._apply(self, d_inputs, d_outputs, d_residuals, mode)
return
# Jacobian and vectors are all unscaled, dimensional
with self._unscaled_context(outputs=[self._outputs], residuals=[d_residuals]):
# set appropriate vectors to read_only to help prevent user error
if mode == 'fwd':
d_inputs.read_only = True
else: # rev
d_residuals.read_only = True
try:
# handle identity subjacs (output_or_resid wrt itself)
if J is None or isinstance(J, DictionaryJacobian):
if d_outputs._names:
rflat = d_residuals._abs_get_val
oflat = d_outputs._abs_get_val
subjacs_empty = len(self._subjacs_info) == 0
# 'val' in the code below is a reference to the part of the
# output or residual array corresponding to the variable 'v'
if mode == 'fwd':
for v in d_outputs._names:
if subjacs_empty or (v, v) not in self._subjacs_info:
val = rflat(v)
val -= oflat(v)
else: # rev
for v in d_outputs._names:
if subjacs_empty or (v, v) not in self._subjacs_info:
val = oflat(v)
val -= rflat(v)
# We used to negate the residual here, and then re-negate after the hook
with self._call_user_function('compute_jacvec_product'):
self._compute_jacvec_product_wrapper(self._inputs, d_inputs, d_residuals,
mode, self._discrete_inputs)
finally:
d_inputs.read_only = d_residuals.read_only = False
def _solve_linear(self, mode, scope_out=_UNDEFINED, scope_in=_UNDEFINED):
"""
Apply inverse jac product. The model is assumed to be in a scaled state.
Parameters
----------
mode : str
'fwd' or 'rev'.
scope_out : set, None, or _UNDEFINED
Outputs relevant to possible lower level calls to _apply_linear on Components.
scope_in : set, None, or _UNDEFINED
Inputs relevant to possible lower level calls to _apply_linear on Components.
"""
d_outputs = self._doutputs
d_residuals = self._dresiduals
if mode == 'fwd':
if self._has_resid_scaling:
with self._unscaled_context(outputs=[d_outputs], residuals=[d_residuals]):
d_outputs.set_vec(d_residuals)
else:
d_outputs.set_vec(d_residuals)
# ExplicitComponent jacobian defined with -1 on diagonal.
d_outputs *= -1.0
else: # rev
if self._has_resid_scaling:
with self._unscaled_context(outputs=[d_outputs], residuals=[d_residuals]):
d_residuals.set_vec(d_outputs)
else:
d_residuals.set_vec(d_outputs)
# ExplicitComponent jacobian defined with -1 on diagonal.
d_residuals *= -1.0
def _compute_partials_wrapper(self):
"""
Call compute_partials based on the value of the "run_root_only" option.
"""
with self._call_user_function('compute_partials'):
if self._run_root_only():
if self.comm.rank == 0:
if self._discrete_inputs:
self.compute_partials(self._inputs, self._jacobian, self._discrete_inputs)
else:
self.compute_partials(self._inputs, self._jacobian)
self.comm.bcast(list(self._jacobian.items()), root=0)
else:
for key, val in self.comm.bcast(None, root=0):
self._jacobian[key] = val
else:
if self._discrete_inputs:
self.compute_partials(self._inputs, self._jacobian, self._discrete_inputs)
else:
self.compute_partials(self._inputs, self._jacobian)
def _linearize(self, jac=None, sub_do_ln=False):
"""
Compute jacobian / factorization. The model is assumed to be in a scaled state.
Parameters
----------
jac : Jacobian or None
Ignored.
sub_do_ln : bool
Flag indicating if the children should call linearize on their linear solvers.
"""
if self.matrix_free or not (self._has_compute_partials or self._approx_schemes):
return
self._check_first_linearize()
with self._unscaled_context(outputs=[self._outputs], residuals=[self._residuals]):
# Computing the approximation before the call to compute_partials allows users to
# override FD'd values.
for approximation in self._approx_schemes.values():
approximation.compute_approximations(self, jac=self._jacobian)
if self._has_compute_partials:
# We used to negate the jacobian here, and then re-negate after the hook.
self._compute_partials_wrapper()
[docs] def compute(self, inputs, outputs, discrete_inputs=None, discrete_outputs=None):
"""
Compute outputs given inputs. The model is assumed to be in an unscaled state.
An inherited component may choose to either override this function or to define a
compute_primal function.
Parameters
----------
inputs : Vector
Unscaled, dimensional input variables read via inputs[key].
outputs : Vector
Unscaled, dimensional output variables read via outputs[key].
discrete_inputs : dict-like or None
If not None, dict-like object containing discrete input values.
discrete_outputs : dict-like or None
If not None, dict-like object containing discrete output values.
"""
global _tuplist
if self.compute_primal is None:
return
returns = \
self.compute_primal(*self._get_compute_primal_invals(inputs, discrete_inputs))
if not isinstance(returns, _tuplist):
returns = (returns,)
if not discrete_outputs:
outputs.set_vals(returns)
else:
outputs.set_vals(returns[:outputs.nvars()])
self._discrete_outputs.set_vals(returns[outputs.nvars():])
[docs] def compute_partials(self, inputs, partials, discrete_inputs=None):
"""
Compute sub-jacobian parts. The model is assumed to be in an unscaled state.
Parameters
----------
inputs : Vector
Unscaled, dimensional input variables read via inputs[key].
partials : Jacobian
Sub-jac components written to partials[output_name, input_name]..
discrete_inputs : dict or None
If not None, dict containing discrete input values.
"""
pass
[docs] def compute_jacvec_product(self, inputs, d_inputs, d_outputs, mode, discrete_inputs=None):
r"""
Compute jac-vector product. The model is assumed to be in an unscaled state.
If mode is:
'fwd': d_inputs \|-> d_outputs
'rev': d_outputs \|-> d_inputs
Parameters
----------
inputs : Vector
Unscaled, dimensional input variables read via inputs[key].
d_inputs : Vector
See inputs; product must be computed only if var_name in d_inputs.
d_outputs : Vector
See outputs; product must be computed only if var_name in d_outputs.
mode : str
Either 'fwd' or 'rev'.
discrete_inputs : dict or None
If not None, dict containing discrete input values.
"""
pass
[docs] def is_explicit(self):
"""
Return True if this is an explicit component.
Returns
-------
bool
True if this is an explicit component.
"""
return True
def _get_compute_primal_invals(self, inputs=None, discrete_inputs=None):
"""
Yield the inputs expected by the compute_primal method.
Parameters
----------
inputs : Vector
Unscaled, dimensional input variables Vector.
discrete_inputs : dict or None
If not None, dict containing discrete input values.
Yields
------
any
Inputs expected by the compute_primal method.
"""
if inputs is None:
inputs = self._inputs
if discrete_inputs is None:
discrete_inputs = self._discrete_inputs
yield from inputs.values()
if discrete_inputs:
yield from discrete_inputs.values()
def _get_compute_primal_argnames(self):
"""
Return the expected argnames for the compute_primal method.
Returns
-------
list
List of argnames expected by the compute_primal method.
"""
if self._valid_name_map:
return [self._valid_name_map.get(n, n) for n in chain(self._var_rel_names['input'],
self._discrete_inputs)]
else:
return list(chain(self._var_rel_names['input'], self._discrete_inputs))
[docs] def compute_fd_sparsity(self, method='fd', num_full_jacs=2, perturb_size=1e-9):
"""
Use finite difference to compute a sparsity matrix.
Parameters
----------
method : str
The type of finite difference to perform. Valid options are 'fd' for forward difference,
or 'cs' for complex step.
num_full_jacs : int
Number of times to repeat jacobian computation using random perturbations.
perturb_size : float
Size of the random perturbation.
Returns
-------
coo_matrix
The sparsity matrix.
"""
jac = _ColSparsityJac(self)
for _ in self._perturbation_iter(num_full_jacs, perturb_size,
(self._inputs,), (self._outputs, self._residuals)):
self._apply_nonlinear()
self.compute_fd_jac(jac=jac, method=method)
return jac.get_sparsity()