"""Define the DictionaryJacobian class."""
import numpy as np
from openmdao.jacobians.jacobian import Jacobian
from openmdao.jacobians.subjac import SUBJAC_META_DEFAULTS
[docs]class DictionaryJacobian(Jacobian):
"""
A Jacobian that stores nonzero subjacobians in a dictionary.
Parameters
----------
system : System
Parent system to this jacobian.
Attributes
----------
_has_children : bool
True if the system has children, False otherwise.
"""
[docs] def __init__(self, system):
"""
Initialize all attributes.
"""
super().__init__(system)
self._has_children = bool(system._subsystems_allprocs)
self._setup(system)
def _setup(self, system):
self._subjacs = self._get_subjacs(system)
def _apply(self, system, d_inputs, d_outputs, d_residuals, mode):
"""
Compute matrix-vector product.
Parameters
----------
system : System
System that is updating this jacobian.
d_inputs : Vector
inputs linear vector.
d_outputs : Vector
outputs linear vector.
d_residuals : Vector
residuals linear vector.
mode : str
'fwd' or 'rev'.
"""
fwd = mode == 'fwd'
d_out_names = d_outputs._names
d_res_names = d_residuals._names
d_inp_names = d_inputs._names
if not d_out_names and not d_inp_names:
return
abs_resids = d_residuals._abs_get_val
abs_outs = d_outputs._abs_get_val
abs_ins = d_inputs._abs_get_val
subjacs_info = self._subjacs_info
subjacs = self._get_subjacs(system)
randgen = self._randgen
key_owners = system._get_subjac_owners()
with system._unscaled_context(outputs=[d_outputs], residuals=[d_residuals]):
for abs_key in self._get_ordered_subjac_keys(system):
if abs_key not in subjacs_info:
continue
res_name, other_name = abs_key
ofvec = abs_resids(res_name) if res_name in d_res_names else None
if other_name in d_out_names:
wrtvec = abs_outs(other_name)
elif other_name in d_inp_names:
wrtvec = abs_ins(other_name)
else:
wrtvec = None
if self._has_children and abs_key in system._cross_keys and abs_key in key_owners:
wrtowner = key_owners[abs_key]
if system.comm.rank == wrtowner:
system.comm.bcast(wrtvec, root=wrtowner)
else:
wrtvec = system.comm.bcast(None, root=wrtowner)
if fwd:
left_vec = ofvec
right_vec = wrtvec
if left_vec is not None and right_vec is not None:
subjacs[abs_key].apply_fwd(d_inputs, d_outputs, d_residuals, randgen)
else: # rev
left_vec = wrtvec
right_vec = ofvec
if left_vec is not None and right_vec is not None:
subjacs[abs_key].apply_rev(d_inputs, d_outputs, d_residuals, randgen)
if abs_key in key_owners:
owner = key_owners[abs_key]
if owner == system.comm.rank:
system.comm.bcast(left_vec, root=owner)
elif owner is not None:
left_vec = system.comm.bcast(None, root=owner)
if left_vec is not None:
if fwd:
if res_name in d_res_names:
d_residuals._abs_set_val(res_name, left_vec)
else: # rev
if other_name in d_out_names:
d_outputs._abs_set_val(other_name, left_vec)
elif other_name in d_inp_names:
d_inputs._abs_set_val(other_name, left_vec)
class _CheckingJacobian(DictionaryJacobian):
"""
A special type of Jacobian that we use only inside of check_partials.
It checks during set_col to make sure that any user specified rows/cols don't mask any
nonzero values found in the column being set.
"""
def __init__(self, system, uncovered_threshold=1.0E-16):
self._uncovered_threshold = uncovered_threshold
super().__init__(system)
def _setup(self, system):
self._subjacs_info = self._subjacs_info.copy()
self._setup_index_maps(system)
self._subjacs = self._get_subjacs(system)
def __iter__(self):
for key, _ in self.items():
yield key
def items(self):
for key, subjac in self._subjacs.items():
meta = subjac.info
if self._is_explicitcomp and key[0] == key[1]:
continue
if 'directional' in meta:
yield key, np.atleast_2d(meta['val']).T
else:
yield key, subjac.todense()
def _setup_index_maps(self, system):
super()._setup_index_maps(system)
from openmdao.core.component import Component
if isinstance(system, Component):
local_opts = system._get_check_partial_options()
else:
local_opts = None
for of, start, end, _, _ in system._get_jac_ofs():
nrows = end - start
for wrt, wstart, wend, vec, _, _ in system._get_jac_wrts():
if local_opts:
loc_wrt = wrt.rsplit('.', 1)[-1]
directional = (loc_wrt in local_opts and
local_opts[loc_wrt]['directional'])
else:
directional = False
key = (of, wrt)
if vec is not None and key not in self._subjacs_info:
ncols = wend - wstart
# create subjacs_info objects for matrix_free systems that don't have them.
# Note that this is our own copy of _subjacs_info so when this instance is
# destroyed, any extra allocated subjacs will be garbage collected.
self._subjacs_info[key] = subjac = SUBJAC_META_DEFAULTS.copy()
subjac['val'] = np.zeros((nrows, 1 if directional else ncols))
subjac['shape'] = subjac['val'].shape
elif directional:
shape = self._subjacs_info[key]['val'].shape
if shape[-1] != 1:
self._subjacs_info[key] = meta = self._subjacs_info[key].copy()
if len(shape) > 1:
meta['val'] = np.atleast_2d(meta['val'][:, 0]).T
else:
meta['val'] = np.atleast_1d(meta['val'])
def set_col(self, system, icol, column):
"""
Set a column of the jacobian.
The column is assumed to be the same size as a column of the jacobian.
If the column has any nonzero values that are outside of specified sparsity patterns for
any of the subjacs, the information will be saved in subjacs_info so we can report it
during the derivative test.
Parameters
----------
system : System
The system that owns this jacobian.
icol : int
Column index.
column : ndarray
Column value.
"""
if self._col_mapper is None:
self._setup_index_maps(system)
wrt, loc_idx = self._col_mapper.index2key_rel(icol) # local col index into subjacs
# If we are doing a directional derivative, then the sparsity will be violated.
# Skip sparsity check if that is the case.
options = system._get_check_partial_options()
loc_wrt = wrt.rpartition('.')[2]
directional = (options is not None and loc_wrt in options and
options[loc_wrt]['directional'])
subjacs = self._get_subjacs(system)
for of, start, end, _, _ in system._get_jac_ofs():
key = (of, wrt)
if key in subjacs:
subjac = subjacs[key]
info = subjac.info
if info['diagonal']:
subjac.set_col(loc_idx, column[start:end], self._uncovered_threshold)
if directional:
info['directional'] = True
elif info['cols'] is not None:
subjac.set_col(loc_idx, column[start:end], self._uncovered_threshold)
if directional:
info['directional'] = True
continue
else:
subjac.set_col(loc_idx, column[start:end], self._uncovered_threshold)
[docs]class ExplicitDictionaryJacobian(Jacobian):
"""
A DictionaryJacobian that is a collection of sub-Jacobians.
It is intended to be used with ExplicitComponents only because dr/do is assumed to be -I.
Parameters
----------
system : System
System that is updating this jacobian. Must be an ExplicitComponent.
"""
[docs] def __init__(self, system):
"""
Initialize all attributes.
"""
super().__init__(system)
self._subjacs = self._get_subjacs(system)
def _get_subjacs(self, system=None):
"""
Get the subjacs for the current system, creating them if necessary based on _subjacs_info.
Parameters
----------
system : System
System that is updating this jacobian.
Returns
-------
dict
Dictionary of subjacs keyed by absolute names.
"""
if not self._initialized:
if not self._is_explicitcomp:
msginfo = system.msginfo if system else ''
raise RuntimeError(f"{msginfo}: ExplicitDictionaryJacobian is only intended to be "
"used with ExplicitComponents.")
self._subjacs = {}
for key, meta, dtype in self._subjacs_info_iter(system):
# only keep dr/di subjacs. dr/do matrix is always -I
if key[1] in self._input_slices:
self._subjacs[key] = self.create_subjac(key, meta, dtype)
self._initialized = True
return self._subjacs
def _apply(self, system, d_inputs, d_outputs, d_residuals, mode):
"""
Compute matrix-vector product.
Parameters
----------
system : System
System that is updating this jacobian.
d_inputs : Vector
inputs linear vector.
d_outputs : Vector
outputs linear vector.
d_residuals : Vector
residuals linear vector.
mode : str
'fwd' or 'rev'.
"""
randgen = self._randgen
d_inp_names = d_inputs._names
with system._unscaled_context(outputs=(d_outputs,), residuals=(d_residuals,)):
dresids = d_residuals.asarray()
if mode == 'fwd':
if d_outputs._names:
# apply dr/do of -I
dresids -= d_outputs.asarray()
if d_inp_names:
for key, subjac in self._get_subjacs(system).items():
if key[1] in d_inp_names:
subjac.apply_fwd(d_inputs, d_outputs, d_residuals, randgen)
else: # rev
if d_outputs._names:
# apply dr/do of -I
doutarr = d_outputs.asarray()
doutarr -= dresids
if d_inp_names:
for key, subjac in self._get_subjacs(system).items():
if key[1] in d_inp_names:
subjac.apply_rev(d_inputs, d_outputs, d_residuals, randgen)
[docs] def todense(self):
"""
Return a dense version of the jacobian.
Returns
-------
ndarray
Dense version of the jacobian.
"""
if self._subjacs:
lst = [-np.eye(self.shape[0])]
drdi_shape = (self.shape[0], self.shape[1] - self.shape[0])
J_dr_di = np.zeros(drdi_shape)
lst.append(J_dr_di)
for subjac in self._subjacs.values():
J_dr_di[subjac.row_slice, subjac.col_slice] = subjac.todense()
return np.hstack(lst)
return -np.eye(self.shape[0])