"""Define the Group class."""
import sys
from collections import Counter, defaultdict
from collections.abc import Iterable
from itertools import product, chain
from numbers import Number
import inspect
from difflib import get_close_matches
import numpy as np
import networkx as nx
from openmdao.core.configinfo import _ConfigInfo
from openmdao.core.system import System, collect_errors
from openmdao.core.component import Component, _DictValues
from openmdao.core.implicitcomponent import ImplicitComponent
from openmdao.core.constants import _UNDEFINED, INT_DTYPE, _SetupStatus
from openmdao.vectors.vector import _full_slice
from openmdao.proc_allocators.default_allocator import DefaultAllocator, ProcAllocationError
from openmdao.jacobians.jacobian import SUBJAC_META_DEFAULTS
from openmdao.jacobians.dictionary_jacobian import DictionaryJacobian
from openmdao.recorders.recording_iteration_stack import Recording
from openmdao.solvers.nonlinear.nonlinear_runonce import NonlinearRunOnce
from openmdao.solvers.linear.linear_runonce import LinearRunOnce
from openmdao.solvers.linear.direct import DirectSolver
from openmdao.utils.array_utils import array_connection_compatible, _flatten_src_indices, \
shape_to_len, ValueRepeater
from openmdao.utils.general_utils import common_subpath, \
convert_src_inds, shape2tuple, get_connection_owner, ensure_compatible, \
meta2src_iter, get_rev_conns
from openmdao.utils.units import is_compatible, unit_conversion, _has_val_mismatch, _find_unit, \
_is_unitless, simplify_unit
from openmdao.utils.graph_utils import get_out_of_order_nodes, get_sccs_topo
from openmdao.utils.mpi import MPI, check_mpi_exceptions, multi_proc_exception_check
import openmdao.utils.coloring as coloring_mod
from openmdao.utils.indexer import indexer, Indexer
from openmdao.utils.relevance import get_relevance
from openmdao.utils.om_warnings import issue_warning, UnitsWarning, UnusedOptionWarning, \
PromotionWarning, MPIWarning, DerivativesWarning
from openmdao.utils.class_util import overrides_method
from openmdao.utils.jax_utils import jax
from openmdao.core.total_jac import _TotalJacInfo
# regex to check for valid names.
import re
namecheck_rgx = re.compile('[a-zA-Z][_a-zA-Z0-9]*')
# use a class with slots instead of a namedtuple so that we can
# change index after creation if needed.
class _SysInfo(object):
__slots__ = ['system', 'index']
def __init__(self, system, index):
self.system = system
self.index = index
def __iter__(self):
yield self.system
yield self.index
class _PromotesInfo(object):
__slots__ = ['src_indices', 'flat', 'src_shape', 'promoted_from', 'prom']
def __init__(self, src_indices=None, flat=None, src_shape=None, promoted_from='', prom=None):
self.flat = flat
self.src_shape = src_shape
if src_indices is not None:
if isinstance(src_indices, Indexer):
self.src_indices = src_indices
self.src_indices.set_src_shape(self.src_shape)
else:
self.src_indices = indexer(src_indices, src_shape=self.src_shape, flat_src=flat)
else:
self.src_indices = None
self.promoted_from = promoted_from # pathname of promoting system
self.prom = prom # local promoted name of input
def __iter__(self):
yield self.src_indices
yield self.flat
yield self.src_shape
def __repr__(self): # pragma no cover
return (f"_PromotesInfo(src_indices={self.src_indices}, flat={self.flat}, "
f"src_shape={self.src_shape}, promoted_from={self.promoted_from}, "
f"prom={self.prom})")
def prom_path(self):
if self.promoted_from is None or self.prom is None:
return ''
return '.'.join((self.promoted_from, self.prom)) if self.promoted_from else self.prom
def copy(self):
return _PromotesInfo(self.src_indices.copy(), self.flat, self.src_shape, self.promoted_from,
self.prom)
def set_src_shape(self, shape):
if self.src_indices is not None:
self.src_indices.set_src_shape(shape)
self.src_shape = shape
def compare(self, other):
"""
Compare attributes in the two objects.
Two attributes are considered mismatched only if neither is None and their values
are unequal.
Returns
-------
list
List of unequal atrribute names.
"""
mismatches = []
if self.flat != other.flat:
if self.flat is not None and other.flat is not None:
mismatches.append('flat_src_indices')
if self.src_shape != other.src_shape:
if self.src_shape is not None and other.src_shape is not None:
mismatches.append('src_shape')
self_srcinds = None if self.src_indices is None else self.src_indices.as_array()
other_srcinds = None if other.src_indices is None else other.src_indices.as_array()
if isinstance(self_srcinds, np.ndarray) and isinstance(other_srcinds, np.ndarray):
if (self_srcinds.shape != other_srcinds.shape or
not np.all(self_srcinds == other_srcinds)):
mismatches.append('src_indices')
return mismatches
[docs]class Group(System):
"""
Class used to group systems together; instantiate or inherit.
Parameters
----------
**kwargs : dict
Dict of arguments available here and in all descendants of this Group.
Attributes
----------
_mpi_proc_allocator : ProcAllocator
Object used to allocate MPI processes to subsystems.
_proc_info : dict of subsys_name: (min_procs, max_procs, weight, proc_group)
Information used to determine MPI process allocation to subsystems.
_subgroups_myproc : list
List of local subgroups, (sorted by name if Problem option allow_post_setup_reorder is
True).
_manual_connections : dict
Dictionary of input_name: (output_name, src_indices) connections.
_group_inputs : dict
Mapping of promoted names to certain metadata (src_indices, units).
_static_group_inputs : dict
Group inputs added outside of setup/configure.
_pre_config_group_inputs : dict
Group inputs added inside of setup but before configure.
_static_manual_connections : dict
Dictionary that stores all explicit connections added outside of setup.
_conn_abs_in2out : {'abs_in': 'abs_out'}
Dictionary containing all explicit & implicit continuous var connections owned
by this system only. The data is the same across all processors.
_conn_discrete_in2out : {'abs_in': 'abs_out'}
Dictionary containing all explicit & implicit discrete var connections owned
by this system only. The data is the same across all processors.
_transfers : dict of dict of Transfers
First key is mode, second is subname where
mode is 'fwd' or 'rev' and subname is the subsystem name
or subname can be None for the full, simultaneous transfer.
_discrete_transfers : dict of discrete transfer metadata
Key is system pathname or None for the full, simultaneous transfer.
_setup_procs_finished : bool
Flag to check if setup_procs is complete
_contains_parallel_group : bool
If True, this Group contains a ParallelGroup. Only used to determine if a parallel
group or distributed component is below a DirectSolver so that we can raise an exception.
_order_set : bool
Flag to check if set_order has been called.
_auto_ivc_warnings : list
List of Auto IVC warnings to be raised later.
_shapes_graph : nx.Graph
Dynamic shape dependency graph, or None.
_pre_components : set of str or None
Set of pathnames of components that are executed prior to the optimization loop. Empty
unless the 'group_by_pre_opt_post' option is True in the Problem.
_post_components : set of str or None
Set of pathnames of components that are executed after the optimization loop. Empty
unless the 'group_by_pre_opt_post' option is True in the Problem.
_iterated_components : set of str or ContainsAll
Set of pathnames of components that are executed in the optimization loop if
'group_by_pre_opt_post' is True in the Problem.
_fd_rev_xfer_correction_dist : dict
If this group is using finite difference to compute derivatives,
this is the set of inputs that are upstream of a distributed response
within this group, keyed by active response. These determine if contributions
from all ranks will be added together to get the correct input values when derivatives
in the larger model are being solved using reverse mode.
_is_explicit : bool or None
True if neither this Group nor any of its descendants contains implicit systems or cycles.
_ivcs : dict
Dict containing metadata for each independent variable.
_bad_conn_vars : set
Set of variables involved in invalid connections.
_sys_graph_cache : dict
Cache for the system graph.
"""
[docs] def __init__(self, **kwargs):
"""
Set the solvers to nonlinear and linear block Gauss--Seidel by default.
"""
self._mpi_proc_allocator = DefaultAllocator()
self._proc_info = {}
super().__init__(**kwargs)
self._subgroups_myproc = None
self._manual_connections = {}
self._group_inputs = {}
self._pre_config_group_inputs = {}
self._static_group_inputs = {}
self._static_manual_connections = {}
self._conn_abs_in2out = {}
self._conn_discrete_in2out = {}
self._transfers = {}
self._discrete_transfers = {}
self._setup_procs_finished = False
self._contains_parallel_group = False
self._order_set = False
self._shapes_graph = None
self._pre_components = None
self._post_components = None
self._iterated_components = None
self._fd_rev_xfer_correction_dist = {}
self._is_explicit = None
self._ivcs = {}
self._bad_conn_vars = None
self._sys_graph_cache = None
# TODO: we cannot set the solvers with property setters at the moment
# because our lint check thinks that we are defining new attributes
# called nonlinear_solver and linear_solver without documenting them.
if not self._nonlinear_solver:
self._nonlinear_solver = NonlinearRunOnce()
if not self._linear_solver:
self._linear_solver = LinearRunOnce()
self.options.declare('auto_order', types=bool, default=False,
desc='If True the order of subsystems is determined automatically '
'based on the dependency graph. It will not break or reorder '
'cycles.')
[docs] def setup(self):
"""
Build this group.
This method should be overidden by your Group's method. The reason for using this
method to add subsystem is to save memory and setup time when using your Group
while running under MPI. This avoids the creation of systems that will not be
used in the current process.
You may call 'add_subsystem' to add systems to this group. You may also issue connections,
and set the linear and nonlinear solvers for this group level. You cannot safely change
anything on children systems; use the 'configure' method instead.
Available attributes:
name
pathname
comm
options
"""
pass
def _get_matvec_scope(self, excl_sub=None):
"""
Find the input and output variables that are needed for a particular matvec product.
Parameters
----------
excl_sub : <System>
A subsystem whose variables should be excluded from the matvec product.
Returns
-------
(set, set)
Sets of output and input variables.
"""
if excl_sub is None:
cache_key = None
else:
cache_key = excl_sub.pathname
try:
iovars, excl = self._scope_cache[cache_key]
# Make sure they're the same subsystem instance before returning
if excl is excl_sub:
return iovars
except KeyError:
pass
if excl_sub is None:
# A value of None will be interpreted as 'all outputs'.
scope_out = None
# All inputs connected to an output in this system
scope_in = frozenset(self._conn_global_abs_in2out).intersection(
self._var_allprocs_abs2meta['input'])
else:
# Empty for the excl_sub
scope_out = frozenset()
# All inputs connected to an output in this system but not in excl_sub
# allins is used to filter out discrete variables that might be found in
# self._conn_global_abs_in2out.
allins = self._var_allprocs_abs2meta['input']
exvars = excl_sub._var_allprocs_abs2idx
scope_in = frozenset(abs_in for abs_in, abs_out in self._conn_global_abs_in2out.items()
if abs_out not in exvars and abs_in in allins)
# Use the pathname as the dict key instead of the object itself. When
# the object is used as the key, memory leaks result from multiple
# calls to setup().
self._scope_cache[cache_key] = ((scope_out, scope_in), excl_sub)
return scope_out, scope_in
def _compute_root_scale_factors(self):
"""
Compute scale factors for all variables.
Returns
-------
dict
Mapping of each absolute var name to its corresponding scaling factor tuple.
"""
# make this a defaultdict to handle the case of access using unconnected inputs
scale_factors = defaultdict(lambda: {
'input': (0.0, 1.0),
})
for abs_name, meta in self._var_allprocs_abs2meta['output'].items():
ref0 = meta['ref0']
res_ref = meta['res_ref']
a0 = ref0
a1 = meta['ref'] - ref0
scale_factors[abs_name] = {
'output': (a0, a1),
'residual': (0.0, 1.0 if res_ref is None else res_ref),
}
# Input scaling for connected inputs is added here.
# This is a combined scale factor that includes the scaling of the connected source
# and the unit conversion between the source output and each target input.
if self._has_input_scaling:
abs2meta_in = self._var_abs2meta['input']
allprocs_meta_out = self._var_allprocs_abs2meta['output']
for abs_in, abs_out in self._conn_global_abs_in2out.items():
if abs_in not in abs2meta_in:
# we only perform scaling on local, non-discrete arrays, so skip
continue
meta_in = abs2meta_in[abs_in]
meta_out = allprocs_meta_out[abs_out]
ref = meta_out['ref']
ref0 = meta_out['ref0']
src_indices = meta_in['src_indices']
if src_indices is not None:
if not (np.ndim(ref) == 0 and np.ndim(ref0) == 0):
# TODO: if either ref or ref0 are not scalar and the output is
# distributed, we need to do a scatter
# to obtain the values needed due to global src_indices
if meta_out['distributed']:
raise RuntimeError("{}: vector scalers with distrib vars "
"not supported yet.".format(self.msginfo))
if not src_indices._flat_src:
src_indices = _flatten_src_indices(src_indices.as_array(),
meta_in['shape'],
meta_out['global_shape'],
meta_out['global_size'])
if np.ndim(ref) > 0:
ref = ref[src_indices]
else: # ref is scalar so ref0 must be an array
ref = np.full(ref0.shape, ref)
if np.ndim(ref0) > 0:
ref0 = ref0[src_indices]
else: # ref0 is scalar so ref must be an array
ref0 = np.full(ref.shape, ref0)
# Compute scaling arrays for inputs using a0 and a1
# Example:
# Let x, x_src, x_tgt be the dimensionless variable,
# variable in source units, and variable in target units, resp.
# x_src = a0 + a1 x
# x_tgt = b0 + b1 x
# x_tgt = g(x_src) = d0 + d1 x_src
# b0 + b1 x = d0 + d1 a0 + d1 a1 x
# b0 = d0 + d1 a0
# b0 = g(a0)
# b1 = d0 + d1 a1 - d0
# b1 = g(a1) - g(0)
units_in = meta_in['units']
units_out = meta_out['units']
if units_in is None or units_out is None or units_in == units_out:
a0 = ref0
a1 = ref - ref0
# No unit conversion, only scaling. Just send the scale factors.
scale_factors[abs_in] = {
'input': (a0, a1),
}
else:
factor, offset = unit_conversion(units_out, units_in)
a0 = ref0
a1 = ref - ref0
# Send both unit scaling and solver scaling. Linear input vectors need to
# treat them differently in reverse mode.
scale_factors[abs_in] = {
'input': (a0, a1, factor, offset),
}
# For adder allocation check.
a0 = (ref0 + offset) * factor
# Check whether we need to allocate an adder for the input vector.
if np.any(np.asarray(a0)):
self._has_input_adder = True
return scale_factors
def _configure(self):
"""
Configure our model recursively to assign any children settings.
Highest system's settings take precedence.
"""
# reset group_inputs back to what it was just after self.setup() in case _configure
# is called multiple times.
self._group_inputs = self._pre_config_group_inputs.copy()
for n, lst in self._group_inputs.items():
self._group_inputs[n] = lst.copy()
self.matrix_free = False
self._has_guess = overrides_method('guess_nonlinear', self, Group)
for subsys in self._sorted_sys_iter():
subsys._configure()
subsys._setup_var_data()
self._has_guess |= subsys._has_guess
self._has_bounds |= subsys._has_bounds
self.matrix_free |= subsys.matrix_free
self._problem_meta['setup_status'] = _SetupStatus.POST_CONFIGURE
self.configure()
# if our configure() has added or promoted any variables, we have to call
# _setup_var_data again on any modified systems and their ancestors (only those that
# are our descendents).
self._problem_meta['config_info']._update_modified_systems(self)
def _reset_setup_vars(self):
"""
Reset all the stuff that gets initialized in setup.
"""
super()._reset_setup_vars()
self._setup_procs_finished = False
def _setup_procs(self, pathname, comm, prob_meta):
"""
Execute first phase of the setup process.
Distribute processors, assign pathnames, and call setup on the group. This method recurses
downward through the model.
Parameters
----------
pathname : str
Global name of the system, including the path.
comm : MPI.Comm or <FakeComm>
MPI communicator object.
prob_meta : dict
Problem level metadata.
"""
super()._setup_procs(pathname, comm, prob_meta)
nproc = comm.size
if self._num_par_fd > 1:
info = self._coloring_info
if comm.size > 1:
# if approx_totals has been declared, or there is an approx coloring, setup par FD
if self._owns_approx_jac or info.dynamic or info.static is not None:
comm = self._setup_par_fd_procs(comm)
else:
msg = "%s: num_par_fd = %d but FD is not active." % (self.msginfo,
self._num_par_fd)
raise RuntimeError(msg)
elif not MPI:
msg = f"MPI is not active but num_par_fd = {self._num_par_fd}. No parallel " \
f"finite difference will be performed."
issue_warning(msg, prefix=self.msginfo, category=MPIWarning)
self.comm = comm
self._subsystems_allprocs = self._static_subsystems_allprocs.copy()
self._manual_connections = self._static_manual_connections.copy()
self._group_inputs = self._static_group_inputs.copy()
# copy doesn't copy the internal list so we have to do it manually (we don't want
# a full deepcopy either because we want the internal metadata dicts to be shared)
for n, lst in self._group_inputs.items():
self._group_inputs[n] = lst.copy()
# Call setup function for this group.
self.setup()
self._setup_check()
# need to save these because _setup_var_data can be called multiple times
# during the config process and we don't want to wipe out any group_inputs
# that were added during self.setup()
self._pre_config_group_inputs = self._group_inputs.copy()
for n, lst in self._pre_config_group_inputs.items():
self._pre_config_group_inputs[n] = lst.copy()
if MPI:
allsubs = list(self._subsystems_allprocs.values())
proc_info = [self._proc_info[s.name] for s, _ in allsubs]
# Call the load balancing algorithm
try:
sub_inds, sub_comm = self._mpi_proc_allocator(proc_info, len(allsubs), comm)
except ProcAllocationError as err:
if err.sub_inds is None:
raise RuntimeError("%s: %s" % (self.msginfo, err.msg))
else:
raise RuntimeError("%s: MPI process allocation failed: %s for the following "
"subsystems: %s" %
(self.msginfo, err.msg,
[allsubs[i].system.name for i in err.sub_inds]))
self._subsystems_myproc = [allsubs[ind].system for ind in sub_inds]
# Define local subsystems
if (self._mpi_proc_allocator.parallel and
not (np.sum([minp for minp, _, _, _ in proc_info]) <= comm.size)):
# reorder the subsystems_allprocs based on which procs they live on. If we don't
# do this, we can get ordering mismatches in some of our data structures.
new_allsubs = {}
seen = set()
gathered = self.comm.allgather(sub_inds)
for inds in gathered:
for ind in inds:
if ind not in seen:
sinfo = allsubs[ind]
sinfo.index = len(new_allsubs)
new_allsubs[sinfo.system.name] = sinfo
seen.add(ind)
self._subsystems_allprocs = new_allsubs
else:
sub_comm = comm
self._subsystems_myproc = [s for s, _ in self._subsystems_allprocs.values()]
# need to set pathname correctly even for non-local subsystems
for s, _ in self._subsystems_allprocs.values():
s.pathname = '.'.join((self.pathname, s.name)) if self.pathname else s.name
# Perform recursion
for subsys in self._subsystems_myproc:
subsys._setup_procs(subsys.pathname, sub_comm, prob_meta)
# build a list of local subgroups to speed up later loops
self._subgroups_myproc = [s for s in self._subsystems_myproc if isinstance(s, Group)]
if prob_meta['allow_post_setup_reorder']:
self._subgroups_myproc.sort(key=lambda x: x.name)
if nproc > 1 and self._mpi_proc_allocator.parallel:
self._problem_meta['parallel_groups'].append(self.pathname)
allpars = self.comm.allgather(self._problem_meta['parallel_groups'])
full = set()
for p in allpars:
full.update(p)
self._problem_meta['parallel_groups'] = sorted(full)
if self._problem_meta['parallel_groups']:
prefix = self.pathname + '.' if self.pathname else ''
for par in self._problem_meta['parallel_groups']:
if par.startswith(prefix) and par != prefix:
self._contains_parallel_group = True
break
self._setup_procs_finished = True
[docs] def is_explicit(self):
"""
Return True if this Group contains only explicit systems and has no cycles.
Returns
-------
bool
True if this is an explicit component.
"""
if self._is_explicit is None:
self._is_explicit = True
for subsys in self._subsystems_myproc:
if not subsys.is_explicit():
self._is_explicit = False
break
# check for cycles
if self._is_explicit:
self._is_explicit = nx.is_directed_acyclic_graph(self.compute_sys_graph())
return self._is_explicit
def _configure_check(self):
"""
Do any error checking on i/o and connections.
"""
for subsys in self._subsystems_myproc:
subsys._configure_check()
super()._configure_check()
def _list_states(self):
"""
Return list of all local states at and below this system.
Returns
-------
list
List of all states.
"""
states = []
for subsys in self._sorted_sys_iter():
states.extend(subsys._list_states())
return states
def _list_states_allprocs(self):
"""
Return list of all states at and below this system across all procs.
Returns
-------
list
List of all states.
"""
if MPI and self.comm.size > 1:
all_states = set()
byproc = self.comm.allgather(self._list_states())
for proc_states in byproc:
all_states.update(proc_states)
return all_states
else:
return self._list_states()
def _setup(self, comm, prob_meta):
"""
Perform setup for this system and its descendant systems.
This is only called on the top-level model.
Parameters
----------
comm : MPI.Comm or <FakeComm> or None
The global communicator.
prob_meta : dict
Problem level metadata dictionary.
"""
# save a ref to the problem level options.
self._problem_meta = prob_meta
self._initial_condition_cache = {}
self._sys_graph_cache = None
# reset any coloring if a Coloring object was not set explicitly
if self._coloring_info.dynamic or self._coloring_info.static is not None:
self._coloring_info.coloring = None
self.pathname = ''
self.comm = comm
self._pre_components = None
self._post_components = None
# Besides setting up the processors, this method also builds the model hierarchy.
self._setup_procs(self.pathname, comm, self._problem_meta)
prob_meta['config_info'] = _ConfigInfo()
try:
# Recurse model from the bottom to the top for configuring.
self._configure()
finally:
prob_meta['config_info'] = None
prob_meta['setup_status'] = _SetupStatus.POST_CONFIGURE
self._configure_check()
self._setup_var_data()
# have to do this again because we are passed the point in _setup_var_data when this happens
self._has_output_scaling = False
self._has_output_adder = False
self._has_resid_scaling = False
self._has_bounds = False
for subsys in self.system_iter(include_self=True, recurse=True):
subsys._apply_output_solver_options()
self._has_output_scaling |= subsys._has_output_scaling
self._has_output_adder |= subsys._has_output_adder
self._has_resid_scaling |= subsys._has_resid_scaling
self._has_bounds |= subsys._has_bounds
# promoted names must be known to determine implicit connections so this must be
# called after _setup_var_data, and _setup_var_data will have to be partially redone
# after auto_ivcs have been added, but auto_ivcs can't be added until after we know all of
# the connections.
self._setup_global_connections()
self._setup_dynamic_shapes()
self._setup_jax()
self._top_level_post_connections()
self._setup_var_sizes()
self._top_level_post_sizes()
# determine which connections are managed by which group, and check validity of connections
self._setup_connections()
def _check_required_connections(self):
conns = self._conn_global_abs_in2out
abs2prom = self._var_allprocs_abs2prom['input']
abs2meta_in = self._var_allprocs_abs2meta['input']
discrete_in = self._var_allprocs_discrete['input']
desvar = self.get_design_vars()
for abs_tgt, src in conns.items():
if src.startswith('_auto_ivc.'):
prom_tgt = abs2prom[abs_tgt]
# Ignore inputs that are declared as design vars.
if abs_tgt in discrete_in or (desvar and prom_tgt in desvar):
continue
if abs2meta_in[abs_tgt]['require_connection']:
promoted_as = f', promoted as "{prom_tgt}",' if prom_tgt != abs_tgt else ''
self._collect_error(f'{self.msginfo}: Input "{abs_tgt}"{promoted_as} '
'requires a connection but is not connected.',
ident=(prom_tgt, abs_tgt))
def _get_dataflow_graph(self):
"""
Return a graph of all variables and components in the model.
Each component is connected to each of its input and output variables, and those variables
are connected to other variables based on the connections in the model.
This results in a smaller graph (fewer edges) than would be the case for a pure variable
graph where all inputs to a particular component would have to be connected to all outputs
from that component.
This should only be called on the top level Group.
Returns
-------
networkx.DiGraph
Graph of all variables and components in the model.
"""
assert self.pathname == '', "call _get_dataflow_graph on the top level Group only."
graph = nx.DiGraph()
comp_seen = set()
# locate any components that don't have any inputs or outputs and add them to the graph
for subsys in self.system_iter(recurse=True, typ=Component):
if not subsys._var_abs2meta['input'] and not subsys._var_abs2meta['output']:
graph.add_node(subsys.pathname, local=True)
comp_seen.add(subsys.pathname)
if self.comm.size > 1:
allemptycomps = self.comm.allgather(comp_seen)
for compset in allemptycomps:
for comp in compset:
if comp not in comp_seen:
graph.add_node(comp, local=False)
comp_seen.add(comp)
for direction in ('input', 'output'):
isout = direction == 'output'
allvmeta = self._var_allprocs_abs2meta[direction]
vmeta = self._var_abs2meta[direction]
for vname in self._var_allprocs_abs2prom[direction]:
if vname in allvmeta:
local = vname in vmeta
else: # var is discrete
local = vname in self._var_discrete[direction]
graph.add_node(vname, type_=direction, local=local)
comp = vname.rpartition('.')[0]
if comp not in comp_seen:
graph.add_node(comp, local=local)
comp_seen.add(comp)
if isout:
graph.add_edge(comp, vname)
else:
graph.add_edge(vname, comp)
for tgt, src in self._conn_global_abs_in2out.items():
# connect the variables src and tgt
graph.add_edge(src, tgt)
return graph
def _check_alias_overlaps(self, responses):
"""
Check for overlapping indices in aliased responses.
If the responses contain aliases, the returned response dict will
be a copy with the alias keys removed and any missing alias sources
added.
This may only be called on the top level Group.
Parameters
----------
responses : dict
Dictionary of response metadata. Keys don't matter.
Returns
-------
dict
Dictionary of response metadata with alias keys removed.
"""
assert self.pathname == '', "call _check_alias_overlaps on the top level System only."
aliases = set()
srcdict = {}
discrete_outs = self._var_allprocs_discrete['output']
# group all aliases by source so we can compute overlaps for each source individually
for meta in responses.values():
src = meta['source']
if src not in discrete_outs:
if meta['alias']:
aliases.add(meta['alias'])
if src in srcdict:
srcdict[src].append(meta)
else:
srcdict[src] = [meta]
abs2meta_out = self._var_allprocs_abs2meta['output']
# loop over any sources having multiple aliases to ensure no overlap of indices
for src, metalist in srcdict.items():
if len(metalist) == 1:
continue
size = abs2meta_out[src]['global_size']
shape = abs2meta_out[src]['global_shape']
mat = np.zeros(size, dtype=np.ushort)
for meta in metalist:
indices = meta['indices']
if indices is None:
mat[:] += 1
else:
indices.set_src_shape(shape)
mat[indices.flat()] += 1
if np.any(mat > 1):
matching_aliases = sorted(m['alias'] for m in metalist if m['alias'])
raise RuntimeError(f"{self.msginfo}: Indices for aliases {matching_aliases} are "
f"overlapping constraint/objective '{src}'.")
return responses
def _get_var_offsets(self):
"""
Compute global offsets for variables.
Returns
-------
dict
Arrays of global offsets keyed by vec_name and deriv direction.
"""
if self._var_offsets is None:
offsets = self._var_offsets = {}
for type_ in ['input', 'output']:
vsizes = self._var_sizes[type_]
if vsizes.size > 0:
csum = np.empty(vsizes.size, dtype=INT_DTYPE)
csum[0] = 0
csum[1:] = np.cumsum(vsizes)[:-1]
offsets[type_] = csum.reshape(vsizes.shape)
else:
offsets[type_] = np.zeros(0, dtype=INT_DTYPE).reshape((1, 0))
return self._var_offsets
def _get_jac_col_scatter(self):
"""
Return source and target indices for a scatter from output vector to total jacobian column.
If the transfer involves remote or distributed variables, the indices will be global.
Otherwise they will be converted to local.
This is only called on the top level system.
Returns
-------
ndarray
Source indices.
ndarray
Target indices.
int
Size of jacobian column.
bool
True if remote or distributed vars are present.
"""
myrank = self.comm.rank
nranks = self.comm.size
owns = self._owning_rank
abs2idx = self._var_allprocs_abs2idx
abs2meta = self._var_abs2meta['output']
sizes = self._var_sizes['output']
global_offsets = self._get_var_offsets()['output']
oflist = list(self._jac_of_iter())
tsize = oflist[-1][2]
toffset = myrank * tsize
has_dist_data = False
sinds = []
tinds = []
for name, tstart, tend, jinds, dist_sizes in oflist:
vind = abs2idx[name]
if dist_sizes is None:
if name in abs2meta:
owner = myrank
else:
owner = owns[name]
has_dist_data |= nranks > 1
voff = global_offsets[owner, vind]
if jinds is _full_slice:
vsize = sizes[owner, vind]
sinds.append(range(voff, voff + vsize))
else:
sinds.append(jinds + voff)
tinds.append(range(tstart + toffset, tend + toffset))
assert len(sinds[-1]) == len(tinds[-1])
else: # 'name' refers to a distributed variable
has_dist_data |= nranks > 1
dtstart = dtend = tstart
dsstart = dsend = 0
for rnk, sz in enumerate(dist_sizes):
dsend += sz
if sz > 0:
voff = global_offsets[rnk, vind]
if jinds is _full_slice:
dtend += sz
sinds.append(range(voff, voff + sz))
tinds.append(range(toffset + dtstart, toffset + dtend))
elif jinds.size > 0: # jinds is a flat array
subinds = jinds[jinds >= dsstart]
subinds = subinds[subinds < dsend]
if subinds.size > 0:
dtend += subinds.size
sinds.append(subinds + (voff - dsstart))
tinds.append(range(toffset + dtstart, toffset + dtend))
dtstart = dtend
dsstart = dsend
assert (len(sinds) == 0 and len(tinds) == 0) or len(sinds[-1]) == len(tinds[-1])
sarr = np.array(list(chain(*sinds)), dtype=INT_DTYPE)
tarr = np.array(list(chain(*tinds)), dtype=INT_DTYPE)
if nranks > 1:
# do an allreduce to see if any procs have distrib/remote vars
has_dist_data = bool(self.comm.allreduce(int(has_dist_data)))
if not has_dist_data:
# convert global indices back to local so we can use them to transfer between two
# local arrays
sysoffset = np.sum(sizes[:myrank, :])
sarr -= sysoffset
tarr -= toffset
return sarr, tarr, tsize, has_dist_data
def _final_setup(self):
"""
Perform final setup for this system and its descendant systems.
This part of setup is called automatically at the start of run_model or run_driver.
"""
self._setup_residuals()
if self._use_derivatives:
self._setup_partials()
self._setup_vectors(self._get_root_vectors())
self._fd_rev_xfer_correction_dist = {}
desvars = self.get_design_vars(get_sizes=False)
responses = self._check_alias_overlaps(self.get_responses(get_sizes=False))
self._dataflow_graph = self._get_dataflow_graph()
# figure out if we can remove any edges based on zero partials we find
# in components. By default all component connected outputs
# are also connected to all connected inputs from the same component.
self._missing_partials = {}
if not self._owns_approx_jac: # don't check for missing partials when doing FD
self._get_missing_partials(self._missing_partials)
if self._missing_partials:
self._update_dataflow_graph(responses)
self._problem_meta['relevance'] = get_relevance(self, responses, desvars)
# Transfers do not require recursion, but they have to be set up after the vector setup.
self._setup_transfers()
# Same situation with solvers, partials, and Jacobians.
# If we're updating, we just need to re-run setup on these, but no recursion necessary.
self._setup_solvers()
self._setup_solver_print()
if self._use_derivatives:
self._setup_jacobians()
self._setup_recording()
self.set_initial_values()
def _update_dataflow_graph(self, responses):
"""
Update the dataflow graph based on missing partials.
Parameters
----------
responses : dict
Dictionary of response metadata.
"""
resps = set(meta2src_iter(responses.values()))
missing_responses = set()
for pathname, missing in self._missing_partials.items():
inputs = [n for n, _ in self._dataflow_graph.in_edges(pathname)]
outputs = [n for _, n in self._dataflow_graph.out_edges(pathname)]
self._dataflow_graph.remove_node(pathname)
for output in outputs:
found = False
for inp in inputs:
if (output, inp) not in missing:
self._dataflow_graph.add_edge(inp, output)
found = True
if not found and output in resps:
missing_responses.add(output)
if missing_responses:
msg = (f"Constraints or objectives [{', '.join(sorted(missing_responses))}] cannot"
" be impacted by the design variables of the problem because no partials "
"were defined for them in their parent component(s).")
if self._problem_meta['singular_jac_behavior'] == 'error':
raise RuntimeError(msg)
else:
issue_warning(msg, category=DerivativesWarning)
[docs] def set_initial_values(self):
"""
Set all input and output variables to their declared initial values.
"""
for abs_name, meta in self._var_abs2meta['input'].items():
self._inputs.set_var(abs_name, meta['val'])
for abs_name, meta in self._var_abs2meta['output'].items():
self._outputs.set_var(abs_name, meta['val'])
def _get_root_vectors(self):
"""
Get the root vectors for the nonlinear and linear vectors for the model.
Returns
-------
dict of dict of Vector
Root vectors: first key is 'input', 'output', or 'residual'; second key is vec_name.
"""
# save root vecs as an attribute so that we can reuse the nonlinear scaling vecs in the
# linear root vec
self._root_vecs = root_vectors = {'input': {}, 'output': {}, 'residual': {}}
force_alloc_complex = self._problem_meta['force_alloc_complex']
# Check for complex step to set vectors up appropriately.
# If any subsystem needs complex step, then we need to allocate it everywhere.
nl_alloc_complex = force_alloc_complex
if not nl_alloc_complex:
for sub in self.system_iter(include_self=True, recurse=True):
nl_alloc_complex |= 'cs' in sub._approx_schemes
if nl_alloc_complex:
break
# Linear vectors allocated complex only if subsolvers require derivatives.
if nl_alloc_complex and self._use_derivatives:
from openmdao.error_checking.check_config import check_allocate_complex_ln
ln_alloc_complex = check_allocate_complex_ln(self, force_alloc_complex)
else:
ln_alloc_complex = False
if self._has_input_scaling or self._has_output_scaling or self._has_resid_scaling:
self._scale_factors = self._compute_root_scale_factors()
else:
self._scale_factors = None
if self._vector_class is None:
self._vector_class = self._local_vector_class
vectypes = ('nonlinear', 'linear') if self._use_derivatives else ('nonlinear',)
# If any proc's local systems need a complex vector, then all procs need it.
if self.comm.size > 1:
all_nl_alloc_complex = self.comm.allgather(nl_alloc_complex)
if np.any(all_nl_alloc_complex):
nl_alloc_complex = True
all_ln_alloc_complex = self.comm.allgather(ln_alloc_complex)
if np.any(all_ln_alloc_complex):
ln_alloc_complex = True
for vec_name in vectypes:
if vec_name == 'nonlinear':
alloc_complex = nl_alloc_complex
else:
alloc_complex = ln_alloc_complex
for key in ['input', 'output', 'residual']:
root_vectors[key][vec_name] = self._vector_class(vec_name, key, self,
alloc_complex=alloc_complex)
if self._use_derivatives:
root_vectors['input']['linear']._scaling_nl_vec = \
root_vectors['input']['nonlinear']._scaling
return root_vectors
def _get_all_promotes(self):
"""
Create the top level mapping of all promoted names to absolute names for all local systems.
This includes all buried promoted names.
Returns
-------
dict
Mapping of all promoted names to absolute names.
"""
iotypes = ('input', 'output')
if self.comm.size > 1:
prom2abs = {'input': defaultdict(set), 'output': defaultdict(set)}
rem_prom2abs = {'input': defaultdict(set), 'output': defaultdict(set)}
myrank = self.comm.rank
vars_to_gather = self._vars_to_gather
for s in self.system_iter(recurse=True, include_self=True):
prefix = s.pathname + '.' if s.pathname else ''
for typ in iotypes:
# use abs2prom to determine locality since prom2abs is for allprocs
sys_abs2prom = s._var_abs2prom[typ]
t_remprom2abs = rem_prom2abs[typ]
t_prom2abs = prom2abs[typ]
for prom, alist in s._var_allprocs_prom2abs_list[typ].items():
abs_names = [n for n in alist if n in sys_abs2prom]
t_prom2abs[prefix + prom].update(abs_names)
t_remprom2abs[prefix + prom].update(n for n in abs_names
if n in vars_to_gather
and vars_to_gather[n] == myrank)
all_proms = self.comm.gather(rem_prom2abs, root=0)
if myrank == 0:
for typ in iotypes:
t_prom2abs = prom2abs[typ]
for rankproms in all_proms:
for prom, absnames in rankproms[typ].items():
t_prom2abs[prom].update(absnames)
for prom, absnames in t_prom2abs.items():
t_prom2abs[prom] = sorted(absnames) # sort to keep order same on all procs
self.comm.bcast(prom2abs, root=0)
else:
prom2abs = self.comm.bcast(None, root=0)
else: # serial
prom2abs = {'input': defaultdict(list), 'output': defaultdict(list)}
for s in self.system_iter(recurse=True, include_self=True):
prefix = s.pathname + '.' if s.pathname else ''
for typ in iotypes:
t_prom2abs = prom2abs[typ]
for prom, abslist in s._var_allprocs_prom2abs_list[typ].items():
t_prom2abs[prefix + prom] = abslist
return prom2abs
def _top_level_post_connections(self):
# this is called on the top level group after all connections are known
self._problem_meta['vars_to_gather'] = self._vars_to_gather
self._resolve_group_input_defaults()
self._setup_auto_ivcs()
self._problem_meta['prom2abs'] = self._get_all_promotes()
self._check_prom_masking()
self._check_order()
def _check_prom_masking(self):
"""
Raise exception if any promoted variable name masks an absolute variable name.
Only called on the top level group.
"""
prom2abs_in = self._var_allprocs_prom2abs_list['input']
prom2abs_out = self._var_allprocs_prom2abs_list['output']
abs2meta = self._var_allprocs_abs2meta
for absname in chain(abs2meta['input'], abs2meta['output']):
if absname in prom2abs_in:
for name in prom2abs_in[absname]:
if name != absname:
raise RuntimeError(f"{self.msginfo}: Absolute variable name '{absname}'"
" is masked by a matching promoted name. Try"
" promoting to a different name. This can be caused"
" by promoting '*' at group level or promoting using"
" dotted names.")
elif absname in prom2abs_out:
if absname != prom2abs_out[absname][0]:
raise RuntimeError(f"{self.msginfo}: Absolute variable name '{absname}' is"
" masked by a matching promoted name. Try"
" promoting to a different name. This can be caused"
" by promoting '*' at group level or promoting using"
" dotted names.")
def _check_order(self, reorder=True, recurse=True, out_of_order=None):
"""
Check if auto ordering is needed, optionally reordering subsystems if appropriate.
Parameters
----------
reorder : bool
If True, reorder the subsystems based on the computed order. Otherwise
just return the out-of-order connections.
recurse : bool
If True, call this method on all subgroups.
out_of_order : dict or None
Lists of out-of-order connections keyed by group pathname. Out of order connections
are keyed by target system name and have values that are lists of source system names.
If incoming value of out_of_order is None, then a new dict is created and returned.
Returns
-------
dict
Lists of out-of-order connections keyed by group pathname.
"""
if out_of_order is None:
out_of_order = {}
if self.options['auto_order'] or not reorder:
G = self.compute_sys_graph()
orders = {name: i for i, name in enumerate(self._subsystems_allprocs)}
strongcomps, new_out_of_order = get_out_of_order_nodes(G, orders)
if new_out_of_order:
# group targets with all of their sources
tgts = {}
for u, v in new_out_of_order:
if v not in tgts:
tgts[v] = []
tgts[v].append(u)
for t in tgts:
tgts[t] = sorted(tgts[t])
out_of_order[self.pathname] = tgts
if reorder:
self._set_auto_order(strongcomps, orders)
if recurse:
for s in self._subgroups_myproc:
s._check_order(reorder, recurse, out_of_order)
return out_of_order
def _set_auto_order(self, strongcomps, orders):
"""
Set the order of the subsystems based on the dependency graph.
Parameters
----------
strongcomps : list of list of str
List of sets of subsystem names. Each list contains subsystems that are strongly
connected. Sets containing 2 or more subsystems indicate a cycle.
orders : dict
Dictionary mapping subsystem names to their index in the current ordering.
"""
new_order = []
for strongcomp in strongcomps:
if len(strongcomp) > 1:
# never change the internal order in a cycle
order_list = [(name, orders[name]) for name in strongcomp]
new_order.extend([name for name, _ in sorted(order_list, key=lambda x: x[1])])
else:
for s in strongcomp:
new_order.append(s)
if self._problem_meta['allow_post_setup_reorder']:
self.set_order(new_order)
else:
issue_warning(f"{self.msginfo}: A new execution order {new_order} is recommended, but "
"auto ordering has been disabled because the Problem option "
"'allow_post_setup_reorder' is False. It is recommended to either set "
"`allow_post_setup_reorder` to True or to manually set the execution "
"order to the recommended order using `set_order`.")
def _check_nondist_sizes(self):
# verify that nondistributed variables have same size across all procs
abs2idx = self._var_allprocs_abs2idx
for io in ('input', 'output'):
sizes = self._var_sizes[io]
for abs_name, meta in self._var_allprocs_abs2meta[io].items():
if not meta['distributed']:
vsizes = sizes[:, abs2idx[abs_name]]
unique = set(vsizes)
unique.discard(0)
if len(unique) > 1:
# sizes differ, now find which procs don't agree
rnklist = []
for sz in unique:
rnklist.append((sz, [i for i, s in enumerate(vsizes) if s == sz]))
msg = ', '.join([f"rank(s) {r} have size {s}" for s, r in rnklist])
self._collect_error(f"{self.msginfo}: Size of {io} '{abs_name}' "
f"differs between processes ({msg}).",
ident=('size', abs_name))
def _top_level_post_sizes(self):
# this runs after the variable sizes are known
self._check_nondist_sizes()
self._setup_global_shapes()
self._resolve_ambiguous_input_meta()
all_abs2meta_out = self._var_allprocs_abs2meta['output']
conns = self._conn_global_abs_in2out
self._resolve_src_indices()
if self.comm.size > 1 and not self._bad_conn_vars:
abs2idx = self._var_allprocs_abs2idx
all_abs2meta = self._var_allprocs_abs2meta
all_abs2meta_in = all_abs2meta['input']
# the code below is to handle the case where src_indices were not specified
# for a distributed input or an input connected to a distributed auto_ivc
# output. This update can't happen until sizes are known.
dist_ins = (n for n, m in all_abs2meta_in.items() if m['distributed'] or
(conns[n].startswith('_auto_ivc.') and
all_abs2meta_out[conns[n]]['distributed']))
dcomp_names = set(d.rpartition('.')[0] for d in dist_ins)
if dcomp_names:
added_src_inds = []
for comp in self.system_iter(recurse=True, typ=Component):
if comp.pathname in dcomp_names:
added_src_inds.extend(
comp._update_dist_src_indices(conns, all_abs2meta, abs2idx,
self._var_sizes))
updated = set()
for alist in self.comm.allgather(added_src_inds):
updated.update(alist)
for a in updated:
all_abs2meta_in[a]['has_src_indices'] = True
abs2meta_in = self._var_abs2meta['input']
allprocs_abs2meta_in = self._var_allprocs_abs2meta['input']
allprocs_abs2meta_out = self._var_allprocs_abs2meta['output']
if self.comm.size > 1:
for abs_in, abs_out in sorted(conns.items()):
if abs_out not in allprocs_abs2meta_out:
continue # discrete var
in_dist = allprocs_abs2meta_in[abs_in]['distributed']
out_dist = allprocs_abs2meta_out[abs_out]['distributed']
# check that src_indices match for dist->serial connection
# FIXME: this transfers src_indices from all ranks to the owning rank so we could
# run into memory issues if src_indices are large. Maybe try something like
# computing a hash in each rank and comparing those?
if out_dist and not in_dist:
# all non-distributed inputs must have src_indices if they connect to a
# distributed output.
owner = self._owning_rank[abs_in]
if abs_in in abs2meta_in: # input is local
src_inds = abs2meta_in[abs_in]['src_indices']
if src_inds is not None:
shaped = src_inds.shaped_instance()
if shaped is None:
self._collect_error(f"For connection from '{abs_out}' to '{abs_in}'"
f", src_indices {src_inds} have no source "
"shape.", ident=(abs_out, abs_in))
continue
else:
src_inds = shaped
else:
src_inds = None
if self.comm.rank == owner:
baseline = None
err = 0
for sinds in self.comm.gather(src_inds, root=owner):
if sinds is not None:
if baseline is None:
baseline = sinds.as_array()
else:
if not np.all(sinds.as_array() == baseline):
err = 1
break
if baseline is None: # no src_indices were set
err = -1
self.comm.bcast(err, root=owner)
else:
self.comm.gather(src_inds, root=owner)
err = self.comm.bcast(None, root=owner)
if err == 1:
self._collect_error(f"{self.msginfo}: Can't connect distributed output "
f"'{abs_out}' to non-distributed input '{abs_in}' "
"because src_indices differ on different ranks.",
ident=(abs_out, abs_in))
elif err == -1:
self._collect_error(f"{self.msginfo}: Can't connect distributed output "
f"'{abs_out}' to non-distributed input '{abs_in}' "
"without specifying src_indices.",
ident=(abs_out, abs_in))
@collect_errors
def _resolve_src_indices(self):
"""
Populate the promotes info list for each absolute input.
This is called only at the top level of the system tree.
"""
all_abs2meta_out = self._var_allprocs_abs2meta['output']
all_abs2meta_in = self._var_allprocs_abs2meta['input']
conns = self._conn_global_abs_in2out
for tgt, plist in self._problem_meta['abs_in2prom_info'].items():
src = conns[tgt]
smeta = all_abs2meta_out[src]
tmeta = all_abs2meta_in[tgt]
if not smeta['distributed'] and tmeta['distributed']:
root_shape = self._get_full_dist_shape(src, smeta['shape'])
else:
root_shape = smeta['global_shape']
# plist is a list of (pinfo, shape, use_tgt) tuples, one for each level in the
# system tree corresponding to an absolute input name, e.g., a plist for the
# input 'abc.def.ghi.x' would look like [tup0, tup1, tup2, tup3] corresponding to
# the ['', 'abc', 'abc.def', 'abc.def.ghi'] levels in the tree.
# After this routine runs, all pinfo entries will have src_indices wrt the root
# shape.
# use a _PromotesInfo for the top level even though there really isn't a promote there
current_pinfo = _PromotesInfo(src_shape=root_shape,
prom=self._var_allprocs_abs2prom['input'][tgt])
if plist[0] is None: # no top level pinfo
plist[0] = current_pinfo
for i, pinfo in enumerate(plist):
if pinfo is None:
pass
elif current_pinfo.src_indices is None:
try:
if pinfo.src_shape is None:
pinfo.set_src_shape(root_shape)
elif pinfo.src_indices is not None and \
not array_connection_compatible(root_shape, pinfo.src_shape):
self._collect_error(f"When connecting '{src}' to "
f"'{pinfo.prom_path()}': Promoted src_shape of "
f"{pinfo.src_shape} for "
f"'{pinfo.prom_path()}' differs from src_shape "
f"{root_shape} for '{current_pinfo.prom_path()}'.",
ident=(src, tgt))
except Exception:
type_exc, exc, tb = sys.exc_info()
self._collect_error(f"When connecting '{src}' to "
f"'{pinfo.prom_path()}': {exc}",
exc_type=type_exc, tback=tb, ident=(src, tgt))
current_pinfo = pinfo
continue
elif pinfo.src_indices is None:
pinfo.src_indices = current_pinfo.src_indices
if pinfo.src_shape is None:
pinfo.set_src_shape(current_pinfo.src_shape)
current_pinfo = pinfo
else: # both have src_indices
try:
if pinfo.src_shape is None:
pinfo.set_src_shape(current_pinfo.src_indices.indexed_src_shape)
sinds = convert_src_inds(current_pinfo.src_indices, current_pinfo.src_shape,
pinfo.src_indices, pinfo.src_shape)
except Exception:
type_exc, exc, tb = sys.exc_info()
self._collect_error(f"When connecting '{conns[tgt]}' to "
f"'{pinfo.prom_path()}': input "
f"'{current_pinfo.prom_path()}' src_indices are "
f"{current_pinfo.src_indices} and indexing into those "
f"failed using src_indices {pinfo.src_indices} from "
f"input '{pinfo.prom_path()}'. Error was: "
f"{exc}", exc_type=type_exc, tback=tb,
ident=(conns[tgt], tgt))
continue
# final src_indices are wrt original full sized source and are flat,
# so use val.shape and flat_src=True
# It would be nice if we didn't have to convert these and could just keep
# them in their original form and stack them to get the final result. We can
# do this when doing a get_val, but it doesn't work when doing a set_val.
src_indices = indexer(sinds, src_shape=root_shape, flat_src=True)
current_pinfo = _PromotesInfo(src_indices=src_indices, src_shape=root_shape,
flat=True, promoted_from=pinfo.promoted_from,
prom=pinfo.prom)
plist[i] = current_pinfo
with multi_proc_exception_check(self.comm):
self._resolve_src_inds()
def _resolve_src_inds(self):
abs2prom = self._var_abs2prom['input']
tree_level = self.pathname.count('.') + 1 if self.pathname else 0
abs_in2prom_info = self._problem_meta['abs_in2prom_info']
seen = set()
for tgt in self._var_abs2meta['input']:
if tgt in abs_in2prom_info:
prom = abs2prom[tgt]
if prom in seen:
continue
seen.add(prom)
plist = abs_in2prom_info[tgt]
pinfo = plist[tree_level]
if pinfo is not None:
inds, flat, shape = pinfo
if inds is not None:
self._var_prom2inds[prom] = [shape, inds, flat]
for s in self._subsystems_myproc:
s._resolve_src_inds()
def _setup_var_data(self):
"""
Compute the list of abs var names, abs/prom name maps, and metadata dictionaries.
"""
if self._var_allprocs_prom2abs_list is None:
old_prom2abs = {}
else:
old_prom2abs = self._var_allprocs_prom2abs_list['input']
super()._setup_var_data()
var_discrete = self._var_discrete
allprocs_discrete = self._var_allprocs_discrete
abs2meta = self._var_abs2meta
abs2prom = self._var_abs2prom
allprocs_abs2meta = {'input': {}, 'output': {}}
self._ivcs = {}
allprocs_prom2abs_list = self._var_allprocs_prom2abs_list
for n, lst in self._group_inputs.items():
lst[0]['path'] = self.pathname # used for error reporting
self._group_inputs[n] = lst.copy() # must copy the list manually
self._has_distrib_vars = False
self._has_fd_group = self._owns_approx_jac
abs_in2prom_info = self._problem_meta['abs_in2prom_info']
# sort the subsystems alphabetically in order to make the ordering
# of vars in vectors and other data structures independent of the
# execution order.
for subsys in self._sorted_sys_iter():
self._has_output_scaling |= subsys._has_output_scaling
self._has_output_adder |= subsys._has_output_adder
self._has_resid_scaling |= subsys._has_resid_scaling
self._has_distrib_vars |= subsys._has_distrib_vars
if len(subsys._subsystems_allprocs) > 0:
self._has_fd_group |= subsys._has_fd_group
var_maps = subsys._get_promotion_maps()
sub_prefix = subsys.name + '.'
for io in ['input', 'output']:
abs2meta[io].update(subsys._var_abs2meta[io])
allprocs_abs2meta[io].update(subsys._var_allprocs_abs2meta[io])
subprom2prom = var_maps[io]
allprocs_discrete[io].update(subsys._var_allprocs_discrete[io])
var_discrete[io].update({sub_prefix + k: v for k, v in
subsys._var_discrete[io].items()})
sub_loc_proms = subsys._var_abs2prom[io]
for sub_prom, sub_abs in subsys._var_allprocs_prom2abs_list[io].items():
if sub_prom in subprom2prom:
prom_name, _, pinfo, _ = subprom2prom[sub_prom]
if pinfo is not None and io == 'input':
pinfo = pinfo.copy()
pinfo.promoted_from = subsys.pathname
pinfo.prom = sub_prom
tree_level = subsys.pathname.count('.') + 1
for abs_in in sub_abs:
if abs_in not in abs_in2prom_info:
# need a level for each system including '', so we still
# add 1 to abs_in.count('.') which includes the var name
abs_in2prom_info[abs_in] = [None] * (abs_in.count('.') + 1)
abs_in2prom_info[abs_in][tree_level] = pinfo
else:
prom_name = sub_prefix + sub_prom
if prom_name not in allprocs_prom2abs_list[io]:
allprocs_prom2abs_list[io][prom_name] = []
allprocs_prom2abs_list[io][prom_name].extend(sub_abs)
for abs_name in sub_abs:
if abs_name in sub_loc_proms:
abs2prom[io][abs_name] = prom_name
if isinstance(subsys, Group):
# propagate any subsystem 'set_input_defaults' info up to this Group
subprom2prom = var_maps['input']
for sub_prom, metalist in subsys._group_inputs.items():
if sub_prom in subprom2prom:
key = subprom2prom[sub_prom][0]
else:
key = sub_prefix + sub_prom
if key not in self._group_inputs:
self._group_inputs[key] = [{'path': self.pathname, 'prom': key,
'auto': True}]
self._group_inputs[key].extend(metalist)
# If running in parallel, allgather
if self.comm.size > 1 and self._mpi_proc_allocator.parallel:
if self._gather_full_data():
raw = (allprocs_discrete, allprocs_prom2abs_list, allprocs_abs2meta,
self._has_output_scaling, self._has_output_adder,
self._has_resid_scaling, self._group_inputs, self._has_distrib_vars,
self._has_fd_group)
else:
raw = (
{'input': {}, 'output': {}},
{'input': {}, 'output': {}},
{'input': {}, 'output': {}},
False,
False,
False,
{},
False,
False,
)
gathered = self.comm.allgather(raw)
# start with a fresh dict to keep order the same in all procs
old_abs2meta = allprocs_abs2meta
allprocs_abs2meta = {'input': {}, 'output': {}}
for io in ['input', 'output']:
allprocs_prom2abs_list[io] = {}
myrank = self.comm.rank
for rank, (proc_discrete, proc_prom2abs_list, proc_abs2meta,
oscale, oadd, rscale, ginputs, has_dist_vars,
has_fd_group) in enumerate(gathered):
self._has_output_scaling |= oscale
self._has_output_adder |= oadd
self._has_resid_scaling |= rscale
self._has_distrib_vars |= has_dist_vars
self._has_fd_group |= has_fd_group
if rank != myrank:
for p, mlist in ginputs.items():
if p not in self._group_inputs:
self._group_inputs[p] = []
self._group_inputs[p].extend(mlist)
for io in ['input', 'output']:
allprocs_abs2meta[io].update(proc_abs2meta[io])
allprocs_discrete[io].update(proc_discrete[io])
for prom_name, abs_names_list in proc_prom2abs_list[io].items():
if prom_name not in allprocs_prom2abs_list[io]:
allprocs_prom2abs_list[io][prom_name] = []
allprocs_prom2abs_list[io][prom_name].extend(abs_names_list)
for io in ('input', 'output'):
if allprocs_abs2meta[io]:
# update new allprocs_abs2meta with our local version (now that we have a
# consistent order for our dict), so that the 'size' metadata will
# accurately reflect this proc's var size instead of one from some other proc.
allprocs_abs2meta[io].update(old_abs2meta[io])
self._var_allprocs_abs2meta = allprocs_abs2meta
for prom_name, abs_list in allprocs_prom2abs_list['output'].items():
if len(abs_list) > 1:
self._collect_error("{}: Output name '{}' refers to "
"multiple outputs: {}.".format(self.msginfo, prom_name,
sorted(abs_list)))
for io in ('input', 'output'):
a2p = self._var_allprocs_abs2prom[io]
for prom, abslist in self._var_allprocs_prom2abs_list[io].items():
for abs_name in abslist:
a2p[abs_name] = prom
if self._group_inputs:
p2abs_in = self._var_allprocs_prom2abs_list['input']
extra = [gin for gin in self._group_inputs if gin not in p2abs_in]
if extra:
# make sure that we don't have a leftover group input default entry from a previous
# execution of _setup_var_data before promoted names were updated.
ex = set()
for e in extra:
if e in old_prom2abs:
del self._group_inputs[e] # clean up old key using old promoted name
else:
ex.add(e)
if ex:
self._collect_error(f"{self.msginfo}: The following group inputs, passed to "
f"set_input_defaults(), could not be found: {sorted(ex)}.")
if self._var_discrete['input'] or self._var_discrete['output']:
self._discrete_inputs = _DictValues(self._var_discrete['input'])
self._discrete_outputs = _DictValues(self._var_discrete['output'])
else:
self._discrete_inputs = self._discrete_outputs = {}
self._vars_to_gather = self._find_vars_to_gather()
# create mapping of indep var names to their metadata
self._ivcs = self.get_indep_vars(local=False)
def _resolve_group_input_defaults(self, show_warnings=False):
"""
Resolve any ambiguities in group input defaults throughout the model.
Only called at the model level.
Parameters
----------
show_warnings : bool
Bool to show or hide the auto_ivc warnings.
"""
skip = set(('path', 'use_tgt', 'prom', 'src_shape', 'src_indices', 'auto'))
prom2abs_in = self._var_allprocs_prom2abs_list['input']
abs_in2prom_info = self._problem_meta['abs_in2prom_info']
abs2meta_in = self._var_allprocs_abs2meta['input']
self._auto_ivc_warnings = []
for prom, metalist in self._group_inputs.items():
if prom not in prom2abs_in:
# this error was already collected in setup_var_data, so just continue here
continue
try:
paths = [(i, m['path']) for i, m in enumerate(metalist) if not m['auto']]
top_origin = paths[0][1]
top_prom = metalist[paths[0][0]]['prom']
except KeyError:
issue_warning("No auto IVCs found", prefix=self.msginfo, category=PromotionWarning)
allmeta = set()
for meta in metalist:
allmeta.update(meta)
fullmeta = {n: _UNDEFINED for n in allmeta - skip}
for key in sorted(fullmeta):
for submeta in metalist:
if submeta['auto']:
continue
if key in submeta:
if fullmeta[key] is _UNDEFINED:
origin = submeta['path']
origin_prom = submeta['prom']
val = fullmeta[key] = submeta[key]
if origin != top_origin:
msg = (f"Group '{top_origin}' did not set a default "
f"'{key}' for input '{top_prom}', so the value of "
f"({val}) from group '{origin}' will be used.")
if show_warnings:
issue_warning(msg, category=PromotionWarning)
else:
self._auto_ivc_warnings.append(msg)
else:
eq = submeta[key] == val
if isinstance(eq, np.ndarray):
eq = np.all(eq)
if not eq:
# first, see if origin is an ancestor
if not origin or submeta['path'].startswith(origin + '.'):
msg = (f"Groups '{origin}' and '{submeta['path']}' "
f"called set_input_defaults for the input "
f"'{origin_prom}' with conflicting '{key}'. "
f"The value ({val}) from '{origin}' will be "
"used.")
if show_warnings:
issue_warning(msg, category=PromotionWarning)
else:
self._auto_ivc_warnings.append(msg)
else: # origin is not an ancestor, so we have an ambiguity
if origin_prom != submeta['prom']:
prm = f"('{origin_prom}' / '{submeta['prom']}')"
else:
prm = f"'{origin_prom}'"
common = common_subpath((origin, submeta['path']))
if common:
sub = self._get_subsystem(common)
if sub is not None:
for a in prom2abs_in[prom]:
if a in sub._var_abs2prom['input']:
prom = sub._var_abs2prom['input'][a]
break
gname = f"Group named '{common}'" if common else 'model'
self._collect_error(f"{self.msginfo}: The subsystems {origin} "
f"and {submeta['path']} called "
f"set_input_defaults for promoted input "
f"{prm} with conflicting values for "
f"'{key}'. Call <group>.set_input_defaults("
f"'{prom}', {key}=?), where <group> is the "
f"{gname} to remove the ambiguity.")
# update all metadata dicts with any missing metadata that was filled in elsewhere
# and update src_shape and use_tgt in abs_in2prom_info
for meta in metalist:
tree_level = meta['path'].count('.') + 1 if meta['path'] else 0
prefix = meta['path'] + '.' if meta['path'] else ''
src_shape = None
if 'val' in meta:
abs_in = prom2abs_in[prom][0]
if abs_in in abs2meta_in: # it's a continuous variable
src_shape = np.asarray(meta['val']).shape
elif 'src_shape' in meta:
src_shape = meta['src_shape']
if src_shape is not None:
# Now update the global promotes info dict
for tgt in prom2abs_in[prom]:
if tgt in abs_in2prom_info and tgt.startswith(prefix):
pinfo = abs_in2prom_info[tgt][tree_level]
if pinfo is not None:
p2 = abs_in2prom_info[tgt][tree_level + 1]
if p2 is not None:
# src_shape from a set_input_defaults call actually
# must match the promoted src_shape from one level
# deeper in the tree.
if p2.src_shape is not None and p2.src_shape != src_shape:
self._collect_error(f"{self.msginfo}: src_shape {src_shape}"
f" set by set_input_defaults('{prom}', "
f"...) in group '{meta['path']}' "
"conflicts with src_shape of "
f"{pinfo.src_shape} for promoted input "
f"'{pinfo.prom_path()}")
p2.set_src_shape(src_shape)
else:
abs_in2prom_info[tgt][tree_level] = \
_PromotesInfo(src_shape=src_shape, prom=prom,
promoted_from=self.pathname)
else:
# check for discrete targets
for tgt in prom2abs_in[prom]:
if tgt in self._discrete_inputs:
for key, val in meta.items():
# for discretes we can only set the value (not units/src_shape)
if key == 'val':
self._discrete_inputs[tgt] = val
elif key in ('units', 'src_shape'):
self._collect_error(f"{self.msginfo}: Cannot set '{key}={val}'"
f" for discrete variable '{tgt}'.")
meta.update(fullmeta)
def _find_vars_to_gather(self):
"""
Return a mapping of var pathname to owning rank.
The mapping will contain ONLY systems that are remote on at least one proc.
Distributed systems are not included.
Returns
-------
dict
The mapping of variable pathname to owning rank.
"""
remote_vars = {}
if self.comm.size > 1:
myproc = self.comm.rank
nprocs = self.comm.size
for io in ('input', 'output'):
abs2prom = self._var_abs2prom[io]
abs2meta = self._var_allprocs_abs2meta[io]
# var order must be same on all procs
sorted_names = sorted(self._var_allprocs_abs2prom[io])
locality = np.zeros((nprocs, len(sorted_names)), dtype=bool)
for i, name in enumerate(sorted_names):
if name in abs2prom:
locality[myproc, i] = True
my_loc = locality[myproc, :].copy()
self.comm.Allgather(my_loc, locality)
for i, name in enumerate(sorted_names):
nzs = np.nonzero(locality[:, i])[0]
if name in abs2meta and abs2meta[name]['distributed']:
pass
elif 0 < nzs.size < nprocs:
remote_vars[name] = nzs[0]
return remote_vars
@collect_errors
def _setup_var_sizes(self):
"""
Compute the arrays of variable sizes for all variables/procs on this system.
"""
self._var_offsets = None
abs2idx = self._var_allprocs_abs2idx = {}
all_abs2meta = self._var_allprocs_abs2meta
self._var_sizes = {
'input': np.zeros((self.comm.size, len(all_abs2meta['input'])), dtype=INT_DTYPE),
'output': np.zeros((self.comm.size, len(all_abs2meta['output'])), dtype=INT_DTYPE),
}
for subsys in self._sorted_sys_iter():
subsys._setup_var_sizes()
iproc = self.comm.rank
for io, sizes in self._var_sizes.items():
abs2meta = self._var_abs2meta[io]
for i, name in enumerate(self._var_allprocs_abs2meta[io]):
abs2idx[name] = i
if name in abs2meta:
sz = abs2meta[name]['size']
sizes[iproc, i] = 0 if sz is None else sz
if self.comm.size > 1:
my_sizes = sizes[iproc, :].copy()
self.comm.Allgather(my_sizes, sizes)
if self.comm.size > 1:
if (self._has_distrib_vars or self._contains_parallel_group or
not np.all(self._var_sizes['output']) or
not np.all(self._var_sizes['input'])):
if self._distributed_vector_class is not None:
self._vector_class = self._distributed_vector_class
else:
raise RuntimeError("{}: Distributed vectors are required but no distributed "
"vector type has been set.".format(self.msginfo))
else:
self._vector_class = self._local_vector_class
self._compute_owning_ranks()
def _compute_owning_ranks(self):
abs2meta = self._var_allprocs_abs2meta
abs2discrete = self._var_allprocs_discrete
if self.comm.size > 1:
owns = self._owning_rank
self._owned_sizes = self._var_sizes['output'].copy()
abs2idx = self._var_allprocs_abs2idx
for io in ('input', 'output'):
sizes = self._var_sizes[io]
for name, meta in abs2meta[io].items():
dist = meta['distributed']
i = abs2idx[name]
for rank in range(self.comm.size):
if sizes[rank, i] > 0:
owns[name] = rank
if not dist and io == 'output':
self._owned_sizes[rank + 1:, i] = 0 # zero out all dups
break
if abs2discrete[io]:
prefix = self.pathname + '.' if self.pathname else ''
for rank, names in enumerate(self.comm.allgather(self._var_discrete[io])):
if prefix:
toadd = {prefix + n for n in names}.difference(owns)
else:
toadd = set(names).difference(owns)
for n in toadd:
owns[n] = rank
else:
self._owned_sizes = self._var_sizes['output']
def _owned_size(self, abs_name):
"""
Return the size of the variable on the owning rank.
Parameters
----------
abs_name : str
The absolute name of the variable.
Returns
-------
int
The size of the variable on this rank, 0 if this is not the owning rank.
"""
return self._owned_sizes[self.comm.rank, self._var_allprocs_abs2idx[abs_name]]
def _setup_global_connections(self, parent_conns=None):
"""
Compute dict of all connections between this system's inputs and outputs.
Parameters
----------
parent_conns : dict
Dictionary of connections passed down from parent group.
"""
global_abs_in2out = defaultdict(set)
allprocs_prom2abs_list_in = self._var_allprocs_prom2abs_list['input']
allprocs_prom2abs_list_out = self._var_allprocs_prom2abs_list['output']
allprocs_discrete_in = self._var_allprocs_discrete['input']
allprocs_discrete_out = self._var_allprocs_discrete['output']
abs_in2prom_info = self._problem_meta['abs_in2prom_info']
pathname = self.pathname
abs_in2out = {}
new_conns = {}
prefix = pathname + '.' if pathname else ''
path_len = len(prefix)
if parent_conns is not None:
for abs_in, abs_out in parent_conns.items():
if abs_in.startswith(prefix) and abs_out.startswith(prefix):
global_abs_in2out[abs_in].add(abs_out)
in_subsys, _, _ = abs_in[path_len:].partition('.')
out_subsys, _, _ = abs_out[path_len:].partition('.')
# if connection is contained in a subgroup, add to conns
# to pass down to subsystems.
if in_subsys == out_subsys:
if in_subsys not in new_conns:
new_conns[in_subsys] = {abs_in: abs_out}
else:
new_conns[in_subsys][abs_in] = abs_out
# Add implicit connections (only ones owned by this group)
for prom_name, out_list in allprocs_prom2abs_list_out.items():
if prom_name in allprocs_prom2abs_list_in: # names match ==> a connection
abs_out = out_list[0]
out_subsys, _, _ = abs_out[path_len:].partition('.')
for abs_in in allprocs_prom2abs_list_in[prom_name]:
in_subsys, _, _ = abs_in[path_len:].partition('.')
global_abs_in2out[abs_in].add(abs_out)
if out_subsys == in_subsys:
in_subsys, _, _ = abs_in[path_len:].partition('.')
out_subsys, _, _ = abs_out[path_len:].partition('.')
# if connection is contained in a subgroup, add to conns
# to pass down to subsystems.
if in_subsys == out_subsys:
if in_subsys not in new_conns:
new_conns[in_subsys] = {abs_in: abs_out}
else:
new_conns[in_subsys][abs_in] = abs_out
else: # this group will handle the transfer
abs_in2out[abs_in] = abs_out
src_ind_inputs = set()
abs2meta = self._var_abs2meta['input']
allprocs_abs2meta_in = self._var_allprocs_abs2meta['input']
self._bad_conn_vars = set()
# Add explicit connections (only ones declared by this group)
for prom_in, (prom_out, src_indices, flat) in self._manual_connections.items():
# throw an exception if either output or input doesn't exist
# (not traceable to a connect statement, so provide context)
if not (prom_out in allprocs_prom2abs_list_out or prom_out in allprocs_discrete_out):
if (prom_out in allprocs_prom2abs_list_in or prom_out in allprocs_discrete_in):
msg = f"{self.msginfo}: Attempted to connect from '{prom_out}' to " + \
f"'{prom_in}', but '{prom_out}' is an input. " + \
"All connections must be from an output to an input."
else:
guesses = get_close_matches(prom_out, list(allprocs_prom2abs_list_out.keys()) +
list(allprocs_discrete_out.keys()))
msg = f"{self.msginfo}: Attempted to connect from '{prom_out}' to " + \
f"'{prom_in}', but '{prom_out}' doesn't exist. Perhaps you meant " + \
f"to connect to one of the following outputs: {guesses}."
self._bad_conn_vars.update((prom_in, prom_out))
self._collect_error(msg)
continue
if not (prom_in in allprocs_prom2abs_list_in or prom_in in allprocs_discrete_in):
if (prom_in in allprocs_prom2abs_list_out or prom_in in allprocs_discrete_out):
msg = f"{self.msginfo}: Attempted to connect from '{prom_out}' to " + \
f"'{prom_in}', but '{prom_in}' is an output. " + \
"All connections must be from an output to an input."
else:
guesses = get_close_matches(prom_in, list(allprocs_prom2abs_list_in.keys()) +
list(allprocs_discrete_in.keys()))
msg = f"{self.msginfo}: Attempted to connect from '{prom_out}' to " + \
f"'{prom_in}', but '{prom_in}' doesn't exist. Perhaps you meant " + \
f"to connect to one of the following inputs: {guesses}."
self._bad_conn_vars.update((prom_in, prom_out))
self._collect_error(msg)
continue
# Throw an exception if output and input are in the same system
# (not traceable to a connect statement, so provide context)
# and check if src_indices is defined in both connect and add_input.
abs_out = allprocs_prom2abs_list_out[prom_out][0]
out_comp, _, _ = abs_out.rpartition('.')
out_subsys, _, _ = abs_out[path_len:].partition('.')
for abs_in in allprocs_prom2abs_list_in[prom_in]:
in_comp, _, _ = abs_in.rpartition('.')
if out_comp == in_comp:
self._collect_error(
f"{self.msginfo}: Output and input are in the same System for connection "
f"from '{prom_out}' to '{prom_in}'.")
self._bad_conn_vars.update((prom_in, prom_out))
continue
if src_indices is not None:
a2m = allprocs_abs2meta_in[abs_in]
if (a2m['shape_by_conn'] or a2m['compute_shape']):
self._collect_error(
f"{self.msginfo}: Setting of 'src_indices' along with 'shape_by_conn', "
f"'copy_shape', or 'compute_shape' for variable '{abs_in}' "
"is unsupported.")
self._bad_conn_vars.update((prom_in, prom_out))
continue
if abs_in in abs2meta:
if abs_in not in abs_in2prom_info:
abs_in2prom_info[abs_in] = [None] * (abs_in.count('.') + 1)
# place a _PromotesInfo at the top level to handle the src_indices
if abs_in2prom_info[abs_in][0] is None:
try:
abs_in2prom_info[abs_in][0] = _PromotesInfo(src_indices=src_indices,
flat=flat, prom=abs_in)
except Exception:
type_exc, exc, tb = sys.exc_info()
self._collect_error(
f"When connecting from '{prom_out}' to '{prom_in}': {exc}",
exc_type=type_exc, tback=tb, ident=(abs_out, abs_in))
self._bad_conn_vars.update((prom_in, prom_out))
continue
meta = abs2meta[abs_in]
meta['manual_connection'] = True
meta['src_indices'] = src_indices
meta['flat_src_indices'] = flat
src_ind_inputs.add(abs_in)
if abs_in in abs_in2out:
self._collect_error(
f"{self.msginfo}: Input '{abs_in}' cannot be connected to '{abs_out}' "
f"because it's already connected to '{abs_in2out[abs_in]}'.",
ident=(abs_out, abs_in))
self._bad_conn_vars.update((prom_in, prom_out))
continue
abs_in2out[abs_in] = abs_out
# if connection is contained in a subgroup, add to conns to pass down to subsystems.
if abs_in[path_len:].partition('.')[0] == out_subsys:
if out_subsys not in new_conns:
new_conns[out_subsys] = {abs_in: abs_out}
else:
new_conns[out_subsys][abs_in] = abs_out
# Compute global_abs_in2out by first adding this group's contributions,
# then adding contributions from systems above/below, then allgathering.
for tgt, src in abs_in2out.items():
global_abs_in2out[tgt].add(src)
for subgroup in self._subgroups_myproc:
if subgroup.name in new_conns:
subgroup._setup_global_connections(parent_conns=new_conns[subgroup.name])
else:
subgroup._setup_global_connections()
for tgt, src in subgroup._conn_global_abs_in2out.items():
global_abs_in2out[tgt].add(src)
dup_info = [(n, srcs) for n, srcs in global_abs_in2out.items() if len(srcs) > 1]
if dup_info:
dup = ["%s from %s" % (tgt, sorted(srcs)) for tgt, srcs in dup_info]
dupstr = ', '.join(dup)
self._collect_error(f"{self.msginfo}: The following inputs have multiple "
f"connections: {dupstr}.", ident=dupstr)
if self.comm.size > 1 and self._mpi_proc_allocator.parallel:
# If running in parallel, allgather
if self._gather_full_data():
raw = (global_abs_in2out, src_ind_inputs)
else:
raw = ({}, ())
gathered = self.comm.allgather(raw)
all_src_ind_ins = set()
for myproc_global_abs_in2out, src_ind_ins in gathered:
for tgt, srcs in myproc_global_abs_in2out.items():
global_abs_in2out[tgt].update(srcs)
all_src_ind_ins.update(src_ind_ins)
src_ind_inputs = all_src_ind_ins
for inp in src_ind_inputs:
allprocs_abs2meta_in[inp]['has_src_indices'] = True
self._conn_global_abs_in2out = {name: srcs.pop()
for name, srcs in global_abs_in2out.items()}
[docs] def get_indep_vars(self, local):
"""
Return a dict of independant variables contained in this group or any of its subgroups.
Parameters
----------
local : bool
If True, return only the variables local to the current process.
Returns
-------
dict
Dict of all independant variables in this group and their corresponding metadata.
"""
abs2meta = self._var_abs2meta['output'] if local else self._var_allprocs_abs2meta['output']
return {n: meta for n, meta in abs2meta.items() if 'openmdao:indep_var' in meta['tags']}
[docs] def get_boundary_vars(self, io, local):
"""
Return a set of inputs or outputs connected to variables outside of this Group.
Parameters
----------
io : str
Either 'input' or 'output'.
local : bool
If True, return only the variables local to the current process.
Returns
-------
set
Set of absolute inputs or outputs connected to variables outside of this Group.
"""
assert io in ('input', 'output'), \
f"io must be either 'input' or 'output', but '{io}' was given."
# _var_*_abs2prom contains both continuous and discrete variables
vnames = self._var_abs2prom[io] if local else self._var_allprocs_abs2prom[io]
return set(vnames).difference(self._conn_global_abs_in2out)
def _setup_jax(self, from_group=False):
if jax is None:
return
# recurse down the tree and setup
# jax anywhere below where it's active, then return.
for subsys in self._subsystems_myproc:
if isinstance(subsys, Group) or subsys.options['derivs_method'] == 'jax':
subsys._setup_jax(from_group)
def _setup_dynamic_shapes(self):
"""
Dynamically add shape/size metadata for variables.
This only happens if the user has set shape_by_conn, copy_shape, or compute_shape
for a variable.
"""
def get_group_input_shape(prom, gshapes):
"""
Get the shape of the given promoted group input.
Parameters
----------
prom : str
Promoted name of the group input.
gshapes : dict
Mapping of group input name to shape.
Returns
-------
tuple or None
If the shape of the variable is known, return the shape.
Otherwise, return None.
"""
if prom in gshapes:
return gshapes[prom]
if prom in self._group_inputs:
for d in self._group_inputs[prom]:
if 'src_shape' in d:
return d['src_shape']
elif 'val' in d:
return np.asarray(d['val']).shape
def compute_var_meta(graph, to_var, shapes, func):
"""
Compute shape info for the given variable using the given function.
Parameters
----------
graph : nx.DiGraph
Graph containing all variables with shape info.
to_var : str
Name of variable to compute shape info for.
shapes : dict
Mapping of variable name to shape.
func : function
Function to use to compute the shape.
Returns
-------
tuple or None
If the shape of the variable is known, return the shape.
Otherwise, return None.
"""
compname = to_var.rpartition('.')[0]
try:
from_shape = func(shapes)
except KeyError as err:
abs_name = f"{compname}.{err.args[0]}"
self._collect_error(f"{self.msginfo}: Can't compute shape of variable '{to_var}': "
f"variable '{abs_name}' doesn't exist.")
return
except Exception as err:
self._collect_error(f"{self.msginfo}: Error occurred while computing the shape "
f"of variable '{to_var}': {err}")
return
else:
graph.nodes[to_var]['shape'] = from_shape
return from_shape
def copy_var_meta(graph, from_var, to_var, distrib_sizes):
"""
Copy shape info from from_var's metadata to to_var's metadata in the graph.
Parameters
----------
graph : nx.DiGraph
Graph containing all variables with shape info.
from_var : str
Name of variable to copy shape info from.
to_var : str
Name of variable to copy shape info to.
distrib_sizes : dict
Mapping of distributed variable name to sizes in each rank.
Returns
-------
tuple or None
If the shape of the variable is known, return the shape.
Otherwise, return None.
"""
if to_var.startswith('#'):
return
nprocs = self.comm.size
from_meta = graph.nodes[from_var]
from_dist = nprocs > 1 and from_meta['distributed']
from_shape = from_meta['shape']
from_io = from_meta['io']
to_meta = graph.nodes[to_var]
to_dist = nprocs > 1 and to_meta['distributed']
to_io = to_meta['io']
# known dist output to/from non-distributed input. We don't allow this case because
# non-distributed variables must have the same value on all procs and the only way
# this is possible is if the src_indices on each proc are identical, but that's not
# possible if we assume 'always local' transfer (see POEM 46).
if from_dist and not to_dist:
if from_io == 'output':
self._collect_error(
f"{self.msginfo}: dynamic sizing of non-distributed {to_io} '{to_var}' "
f"from distributed {from_io} '{from_var}' is not supported.")
return
else: # serial_out <- dist_in
# all input rank sizes must be the same
if not np.all(distrib_sizes[from_var] == distrib_sizes[from_var][0]):
if from_io == 'output':
ident = (from_var, to_var)
else:
ident = (to_var, from_var)
self._collect_error(
f"{self.msginfo}: dynamic sizing of non-distributed {to_io} '{to_var}' "
f"from distributed {from_io} '{from_var}' is not supported because not "
f"all {from_var} ranks are the same size "
f"(sizes={distrib_sizes[from_var]}).", ident=ident)
return
to_meta['shape'] = from_shape
if from_var in distrib_sizes:
distrib_sizes[to_var] = distrib_sizes[from_var]
return from_shape
def get_unresolved_knowns(graph, nodes=None):
"""
Return all unresolved nodes with known shape.
Unresolved means that the node has known shape and at least one successor
with unknown shape.
Parameters
----------
graph : nx.DiGraph
Graph containing all variables with shape info.
nodes : list of str or None
List of nodes to check. If None, check all nodes in the graph.
Returns
-------
set of str
Set of nodes with known shape but at least one successor with unknown shape.
"""
gnodes = graph.nodes
if nodes is None:
nodes = graph.nodes()
unresolved = set()
for node in nodes:
if gnodes[node]['shape'] is not None: # node has known shape
for succ in graph.successors(node):
if gnodes[succ]['shape'] is None:
unresolved.add(node)
break
return unresolved
def get_actives(graph, knowns):
"""
Return all active single edges and active multi nodes.
Active edges are those that are connected on one end to a known shape variable
and on the other end to an unknown shape variable. Active nodes are those that
have unknown shape but are connected to a known shape variable.
Single edges correspond to 'shape_by_conn' and 'copy_shape' connections.
Multi nodes are variables that have 'compute_shape' set to True so they
connect to multiple nodes of the opposite io type in a component. For example
a 'compute_shape' output variable will connect to all inputs in the component and
each of those edges will be labeled as 'multi'. So a multi node is a node that
has 'multi' incoming edges.
Parameters
----------
graph : nx.DiGraph
Graph containing all variables with shape info.
knowns : list of str
List of nodes with known shape.
Returns
-------
active_single_edges : set of (str, str)
Set of active 'single' edges (for copy_shape and shape_by_conn).
active_multi_nodes : set of str
Set of active nodes with 'multi' edges (for compute_shape).
"""
active_single_edges = set()
active_multi_nodes = set()
for known in knowns:
for succ in graph.successors(known):
if nodes[succ]['shape'] is None:
if edges[known, succ]['multi']:
active_multi_nodes.add(succ)
else:
active_single_edges.add((known, succ))
return active_single_edges, active_multi_nodes
def is_unresolved(graph, node):
"""
Return True if the given node is unresolved.
Unresolved means that the node has at least one successor with unknown shape.
Parameters
----------
graph : nx.DiGraph
Graph containing all variables with shape info.
node : str
Node to check.
Returns
-------
bool
True if the node is unresolved.
"""
for s in graph.successors(node):
if graph.nodes[s]['shape'] is None:
return True
return False
def meta2node_data(meta):
"""
Return a dict containing select metadata for the given variable.
Parameters
----------
meta : dict
Metadata for the variable.
Returns
-------
dict
Dict containing select metadata for the variable.
"""
return {
'distributed': meta['distributed'],
'shape': meta['shape'],
'compute_shape': meta['compute_shape'],
'shape_by_conn': meta['shape_by_conn'],
'copy_shape': meta['copy_shape'],
}
all_abs2prom_in = self._var_allprocs_abs2prom['input']
nprocs = self.comm.size
conn = self._conn_global_abs_in2out
rev_conn = None
self._shapes_graph = graph = nx.DiGraph()
knowns = set()
dist_sz = {} # local distrib sizes
my_abs2meta_out = self._var_abs2meta['output']
my_abs2meta_in = self._var_abs2meta['input']
all_abs2meta_out = self._var_allprocs_abs2meta['output']
all_abs2meta_in = self._var_allprocs_abs2meta['input']
grp_shapes = {}
compute_shape_functs = {}
component_io = defaultdict(list)
# find all variables that have an unknown shape (across all procs) and connect them
# to other unknown and known shape variables to form a directed graph.
for io in ('input', 'output'):
for name, meta in self._var_allprocs_abs2meta[io].items():
compname = name.rpartition('.')[0]
component_io[compname, io].append(name)
if meta['shape_by_conn']:
graph.add_node(name, io=io, **meta2node_data(meta))
if name in conn: # it's a connected input
abs_from = conn[name]
if abs_from not in graph:
from_meta = all_abs2meta_out[abs_from]
graph.add_node(abs_from, io='output', **meta2node_data(from_meta))
graph.add_edge(abs_from, name, multi=False)
else:
if rev_conn is None:
rev_conn = get_rev_conns(self._conn_global_abs_in2out)
if name in rev_conn: # connected output
for inp in rev_conn[name]:
inmeta = all_abs2meta_in[inp]
graph.add_node(inp, io='input', **meta2node_data(inmeta))
graph.add_edge(inp, name, multi=False)
elif not meta['compute_shape'] and not meta['copy_shape']:
# check to see if we can get shape from _group_inputs
fail = True
if io == 'input':
prom = all_abs2prom_in[name]
grp_shape = get_group_input_shape(prom, grp_shapes)
if grp_shape is not None:
# use '#' to designate this as an entry that's not a variable
gnode = f"#{prom}"
graph.add_node(gnode, io='input', shape=grp_shape,
distributed=False, shape_by_conn=None,
compute_shape=None)
graph.add_edge(gnode, name, multi=False)
grp_shapes[prom] = grp_shape
fail = False
else: # see if there are any connected inputs with known shape
for n in self._var_allprocs_prom2abs_list['input'][prom]:
if n != name:
m = all_abs2meta_in[n]
if not (m['distributed'] or m['has_src_indices']
or m['shape_by_conn'] or m['compute_shape']
or m['copy_shape']):
fail = False
graph.add_node(n, io='input', known_count=0,
**meta2node_data(all_abs2meta_in[n]))
graph.add_edge(n, name, multi=False)
break
if fail and name not in self._bad_conn_vars:
self._collect_error(
f"{self.msginfo}: 'shape_by_conn' was set for "
f"unconnected variable '{name}'.")
if meta['copy_shape']:
# variable whose shape is being copied must be on the same component, and
# name stored in 'copy_shape' entry must be the relative name.
abs_from = name.rpartition('.')[0] + '.' + meta['copy_shape']
if abs_from in all_abs2meta_in or abs_from in all_abs2meta_out:
a2m = all_abs2meta_in if abs_from in all_abs2meta_in else all_abs2meta_out
if name not in graph:
graph.add_node(name, io=io, **meta2node_data(meta))
if abs_from not in graph:
from_io = 'input' if abs_from in all_abs2meta_in else 'output'
from_meta = a2m[abs_from]
graph.add_node(abs_from, io=from_io, **meta2node_data(from_meta))
graph.add_edge(abs_from, name, multi=False)
else:
self._collect_error(f"{self.msginfo}: Can't copy shape of variable "
f"'{abs_from}'. Variable doesn't exist or is not "
"continuous.")
elif meta['compute_shape']:
compute_shape_functs[name] = meta['compute_shape']
if name not in graph:
graph.add_node(name, shape=meta['shape'], io=io,
compute_shape=meta['compute_shape'],
distributed=meta['distributed'])
# store known distributed size info needed for computing shapes
if nprocs > 1:
my_abs2meta = my_abs2meta_in if name in my_abs2meta_in else my_abs2meta_out
if name in my_abs2meta:
sz = my_abs2meta[name]['size']
if sz is not None:
dist_sz[name] = sz
else:
dist_sz[name] = 0
# loop over any 'compute_shape' variables and add edges to the graph
for name in compute_shape_functs:
comp_name = name.rpartition('.')[0]
# get 'opposite' io variables to use as inputs to compute_shape function
io = 'input' if name in all_abs2meta_out else 'output'
for abs_name in component_io[comp_name, io]:
meta = self._var_allprocs_abs2meta[io][abs_name]
if abs_name not in graph:
graph.add_node(abs_name, io=io, **meta2node_data(meta))
graph.add_edge(abs_name, name, multi=True)
if graph.order() == 0:
# we don't have any shape_by_conn or copy_shape variables, so we're done
return
if nprocs > 1:
distrib_sizes = defaultdict(lambda: np.zeros(nprocs, dtype=INT_DTYPE))
for rank, dsz in enumerate(self.comm.allgather(dist_sz)):
for n, sz in dsz.items():
distrib_sizes[n][rank] = sz
else:
distrib_sizes = {}
knowns = {n for n, d in graph.nodes(data=True) if d['shape'] is not None}
all_knowns = knowns.copy()
nodes = graph.nodes
edges = graph.edges
# connected_components needs an undirected graph, so create a temporary one here
for comps in nx.connected_components(nx.Graph(graph)):
# treat all knowns initially as unresolved
unresolved_knowns = all_knowns.intersection(comps)
if not unresolved_knowns:
# no knowns in this component, so we fail.
continue
progress = 1
while progress:
progress = 0
unresolved_knowns = get_unresolved_knowns(graph, unresolved_knowns)
active_single_edges, active_multi_nodes = get_actives(graph, unresolved_knowns)
for k, u in active_single_edges:
shp = copy_var_meta(graph, k, u, distrib_sizes)
if shp is not None:
if is_unresolved(graph, u):
unresolved_knowns.add(u)
all_knowns.add(u)
progress += 1
for mnode in active_multi_nodes:
for k, _, data in graph.in_edges(mnode, data=True):
if nodes[k]['shape'] is None and data['multi']:
break
else:
# all 'compute_shape' preds are known so compute shape
shapes = {
n.rpartition('.')[-1]: nodes[n]['shape']
for n in graph.predecessors(mnode)
}
shp = compute_var_meta(graph, mnode, shapes, nodes[mnode]['compute_shape'])
if shp is not None:
if is_unresolved(graph, mnode):
unresolved_knowns.add(mnode)
all_knowns.add(mnode)
progress += 1
# now perform a consistency check on all computed/copied shapes
mismatches = set()
for u, v, data in graph.edges(data=True):
if not data['multi']:
ushape = nodes[u]['shape']
vshape = nodes[v]['shape']
if ushape != vshape and ushape is not None and vshape is not None:
udist = nodes[u]['distributed']
vdist = nodes[v]['distributed']
if not (udist ^ vdist):
mismatches.add(tuple(sorted((u, v))))
if mismatches:
for u, v in mismatches:
self._collect_error(f"{self.msginfo}: Shape mismatch, {nodes[u]['shape']} vs. "
f"{nodes[v]['shape']} for variables '{u}' and '{v}' during "
"dynamic shape determination.")
# update variable metadata based on graph shapes
for node, data in graph.nodes(data=True):
if node.startswith('#'):
continue
io = data['io']
allmeta = self._var_allprocs_abs2meta[io][node]
shape = data['shape']
size = shape_to_len(shape)
allmeta['shape'] = shape
allmeta['size'] = size
try:
meta = self._var_abs2meta[io][node]
except KeyError:
pass # node is not local, so no need to update local metadata
else:
meta['shape'] = shape
meta['size'] = size
# Passing None into shape arguments as an alias for () is deprecated (Numpy 1.20)
shape = shape if shape is not None else ()
meta['val'] = np.full(shape, meta['val'], dtype=float)
# save graph info for possible later plotting
self._shapes_graph = graph
unresolved = set(graph.nodes()) - all_knowns
if unresolved:
unresolved = sorted(unresolved)
self._collect_error(f"{self.msginfo}: Failed to resolve shapes for {unresolved}. "
"To see the dynamic shape dependency graph, "
"do 'openmdao view_dyn_shapes <your_py_file>'.")
@collect_errors
@check_mpi_exceptions
def _setup_connections(self):
"""
Compute dict of all connections owned by this Group.
Also, check shapes of connected variables.
"""
abs_in2out = self._conn_abs_in2out = {}
self._conn_discrete_in2out = {}
global_abs_in2out = self._conn_global_abs_in2out
pathname = self.pathname
allprocs_discrete_in = self._var_allprocs_discrete['input']
allprocs_discrete_out = self._var_allprocs_discrete['output']
for subsys in self._sorted_sys_iter():
subsys._setup_connections()
path_dot = pathname + '.' if pathname else ''
path_len = len(path_dot)
allprocs_abs2meta_in = self._var_allprocs_abs2meta['input']
allprocs_abs2meta_out = self._var_allprocs_abs2meta['output']
abs2meta_in = self._var_abs2meta['input']
abs2meta_out = self._var_abs2meta['output']
nproc = self.comm.size
# Check input/output units here, and set _has_input_scaling
# to True for this Group if units are defined and different, or if
# ref or ref0 are defined for the output.
for abs_in, abs_out in global_abs_in2out.items():
# Check that they are in different subsystems of this system.
out_subsys = abs_out[path_len:].partition('.')[0]
in_subsys = abs_in[path_len:].partition('.')[0]
if out_subsys != in_subsys:
if abs_in in allprocs_discrete_in:
self._conn_discrete_in2out[abs_in] = abs_out
elif abs_out in allprocs_discrete_out:
self._collect_error(
f"{self.msginfo}: Can't connect discrete output '{abs_out}' "
f"to continuous input '{abs_in}'.", ident=(abs_out, abs_in))
continue
else:
abs_in2out[abs_in] = abs_out
if nproc > 1 and self._vector_class is None:
# check for any cross-process data transfer. If found, use
# self._problem_meta['distributed_vector_class'] as our vector class.
if (abs_in not in abs2meta_in or abs_out not in abs2meta_out or
abs2meta_in[abs_in]['distributed'] or
abs2meta_out[abs_out]['distributed']):
self._vector_class = self._distributed_vector_class
# if connected output has scaling then we need input scaling
if not self._has_input_scaling and not (abs_in in allprocs_discrete_in or
abs_out in allprocs_discrete_out):
out_units = allprocs_abs2meta_out[abs_out]['units']
in_units = allprocs_abs2meta_in[abs_in]['units']
# if units are defined and different, or if a connected output has any scaling,
# we need input scaling.
self._has_input_scaling = self._has_output_scaling or self._has_resid_scaling or \
(in_units and out_units and in_units != out_units)
# check compatability for any discrete connections
for abs_in, abs_out in self._conn_discrete_in2out.items():
in_type = self._var_allprocs_discrete['input'][abs_in]['type']
try:
out_type = self._var_allprocs_discrete['output'][abs_out]['type']
except KeyError:
self._collect_error(
f"{self.msginfo}: Can't connect continuous output '{abs_out}' "
f"to discrete input '{abs_in}'.", ident=(abs_out, abs_in))
continue
if not issubclass(in_type, out_type):
self._collect_error(
f"{self.msginfo}: Type '{out_type.__name__}' of output '{abs_out}' is "
f"incompatible with type '{in_type.__name__}' of input '{abs_in}'.",
ident=(abs_out, abs_in))
# check unit/shape compatibility, but only for connections that are
# either owned by (implicit) or declared by (explicit) this Group.
# This way, we don't repeat the error checking in multiple groups.
for abs_in, abs_out in abs_in2out.items():
all_meta_out = allprocs_abs2meta_out[abs_out]
all_meta_in = allprocs_abs2meta_in[abs_in]
# check unit compatibility
out_units = all_meta_out['units']
in_units = all_meta_in['units']
if out_units:
if not in_units:
if not _is_unitless(out_units):
msg = f"Output '{abs_out}' with units of '{out_units}' " + \
f"is connected to input '{abs_in}' which has no units."
issue_warning(msg, prefix=self.msginfo, category=UnitsWarning)
elif not is_compatible(in_units, out_units):
self._collect_error(
f"{self.msginfo}: Output units of '{out_units}' for '{abs_out}' "
f"are incompatible with input units of '{in_units}' for '{abs_in}'.",
ident=(abs_out, abs_in))
continue
elif in_units is not None:
if not _is_unitless(in_units):
msg = f"Input '{abs_in}' with units of '{in_units}' is " + \
f"connected to output '{abs_out}' which has no units."
issue_warning(msg, prefix=self.msginfo, category=UnitsWarning)
# check shape compatibility
if abs_in in abs2meta_in:
meta_in = abs2meta_in[abs_in]
# get output shape from allprocs meta dict, since it may
# be distributed (we want global shape)
out_shape = all_meta_out['global_shape']
# get input shape and src_indices from the local meta dict
# (input is always local)
if meta_in['distributed']:
# if output is non-distributed and input is distributed, make output shape the
# full distributed shape, i.e., treat it in this regard as a distributed output
out_shape = self._get_full_dist_shape(abs_out, all_meta_out['shape'])
in_shape = meta_in['shape']
src_indices = meta_in['src_indices']
if src_indices is None and out_shape != in_shape:
# out_shape != in_shape is allowed if there's no ambiguity in storage order
if (in_shape is None or out_shape is None or
not array_connection_compatible(in_shape, out_shape)):
with np.printoptions(legacy='1.21'):
self._collect_error(
f"{self.msginfo}: The source and target shapes do not match or "
f"are ambiguous for the connection '{abs_out}' to '{abs_in}'. "
f"The source shape is {out_shape} "
f"but the target shape is {in_shape}.", ident=(abs_out, abs_in))
continue
elif src_indices is not None:
try:
shp = (out_shape if all_meta_out['distributed'] else
all_meta_out['global_shape'])
src_indices.set_src_shape(shp, dist_shape=out_shape)
src_indices = src_indices.shaped_instance()
except Exception:
type_exc, exc, tb = sys.exc_info()
s, src, tgt = get_connection_owner(self, abs_in)
abs_out = self._conn_global_abs_in2out[tgt]
self._collect_error(
f"{s.msginfo}: When connecting '{src}' to '{tgt}': {exc}",
exc_type=type_exc, tback=tb, ident=(abs_out, abs_in))
continue
if src_indices.indexed_src_size == 0:
continue
if src_indices.indexed_src_size != shape_to_len(in_shape):
# initial dimensions of indices shape must be same shape as target
for idx_d, inp_d in zip(src_indices.indexed_src_shape, in_shape):
if idx_d != inp_d:
self._collect_error(
f"{self.msginfo}: The source indices {meta_in['src_indices']} "
f"do not specify a valid shape for the connection '{abs_out}' "
f"to '{abs_in}'. The target shape is {in_shape} but indices "
f"are shape {src_indices.indexed_src_shape}.",
ident=(abs_out, abs_in))
break
else:
self._collect_error(
f"{self.msginfo}: src_indices shape {src_indices.indexed_src_shape}"
f" does not match {abs_in} shape {in_shape}.",
ident=(abs_out, abs_in))
continue
# any remaining dimension of indices must match shape of source
if not src_indices._flat_src and (len(src_indices.indexed_src_shape) >
len(out_shape)):
self._collect_error(
f"{self.msginfo}: The source indices {meta_in['src_indices']} do not "
f"specify a valid shape for the connection '{abs_out}' to '{abs_in}'. "
f"The source has {len(out_shape)} dimensions but the indices expect at "
f"least {len(src_indices.indexed_src_shape)}.",
ident=(abs_out, abs_in))
def _transfer(self, vec_name, mode, sub=None):
"""
Perform a vector transfer.
Parameters
----------
vec_name : str
Name of the vector RHS on which to perform a transfer.
mode : str
Either 'fwd' or 'rev'
sub : None or str
If None, perform a full transfer.
If str, perform a partial transfer to named subsystem for linear Gauss--Seidel.
"""
xfer = self._transfers[mode]
if sub in xfer:
xfer = xfer[sub]
else:
if mode == 'fwd' and self._conn_discrete_in2out and vec_name == 'nonlinear':
self._discrete_transfer(sub)
return
vec_inputs = self._vectors['input'][vec_name]
if mode == 'fwd':
if xfer is not None:
if self._has_input_scaling:
vec_inputs.scale_to_norm()
xfer._transfer(vec_inputs, self._vectors['output'][vec_name], mode)
vec_inputs.scale_to_phys()
else:
xfer._transfer(vec_inputs, self._vectors['output'][vec_name], mode)
if self._conn_discrete_in2out and vec_name == 'nonlinear':
self._discrete_transfer(sub)
else: # rev
if xfer is not None:
if self._has_input_scaling:
vec_inputs.scale_to_norm(mode='rev')
xfer._transfer(vec_inputs, self._vectors['output'][vec_name], mode)
if self._problem_meta['parallel_deriv_color'] is None:
key = (sub, '@nocolor')
if key in self._transfers['rev']:
xfer = self._transfers['rev'][key]
xfer._transfer(vec_inputs, self._vectors['output'][vec_name], mode)
if self._has_input_scaling:
vec_inputs.scale_to_phys(mode='rev')
def _discrete_transfer(self, sub):
"""
Transfer discrete variables between components. This only occurs in fwd mode.
Parameters
----------
sub : None or str
If None, perform a full transfer.
If not, perform a partial transfer for linear Gauss--Seidel.
"""
comm = self.comm
key = None if sub is None else self._subsystems_allprocs[sub].system.name
if comm.size == 1:
for src_sys_name, src, tgt_sys_name, tgt in self._discrete_transfers[key]:
tgt_sys = self._subsystems_allprocs[tgt_sys_name].system
src_sys = self._subsystems_allprocs[src_sys_name].system
# note that we are not copying the discrete value here, so if the
# discrete value is some mutable object, for example not an int or str,
# the downstream system will have a reference to the same object
# as the source, allowing the downstream system to modify the value as
# seen by the source system.
tgt_sys._discrete_inputs[tgt] = src_sys._discrete_outputs[src]
else: # MPI
allprocs_recv = self._allprocs_discrete_recv[key]
discrete_out = self._var_discrete['output']
if key in self._discrete_transfers:
xfers, remote_send = self._discrete_transfers[key]
if allprocs_recv:
sendvars = [(n, discrete_out[n]['val']) for n in remote_send]
allprocs_send = comm.gather(sendvars, root=0)
if comm.rank == 0:
allprocs_dict = {}
for i in range(comm.size):
allprocs_dict.update(allprocs_send[i])
recvs = [{} for i in range(comm.size)]
for rname, ranks in allprocs_recv.items():
val = allprocs_dict[rname]
for i in ranks:
recvs[i][rname] = val
data = comm.scatter(recvs, root=0)
else:
data = comm.scatter(None, root=0)
else:
data = None
for src_sys_name, src, tgt_sys_name, tgt in xfers:
tgt_sys, _ = self._subsystems_allprocs[tgt_sys_name]
if tgt_sys._is_local:
if tgt in tgt_sys._discrete_inputs:
abs_src = '.'.join((src_sys_name, src))
if data is not None and abs_src in data:
src_val = data[abs_src]
else:
src_sys, _ = self._subsystems_allprocs[src_sys_name]
src_val = src_sys._discrete_outputs[src]
tgt_sys._discrete_inputs[tgt] = src_val
def _setup_transfers(self):
"""
Compute all transfers that are owned by this system.
"""
for subsys in self._subgroups_myproc:
subsys._setup_transfers()
self._vector_class.TRANSFER._setup_transfers(self)
if self._conn_discrete_in2out:
self._vector_class.TRANSFER._setup_discrete_transfers(self)
[docs] def add_subsystem(self, name, subsys, promotes=None,
promotes_inputs=None, promotes_outputs=None,
min_procs=1, max_procs=None, proc_weight=1.0, proc_group=None):
"""
Add a subsystem.
Parameters
----------
name : str
Name of the subsystem being added.
subsys : <System>
An instantiated, but not-yet-set up system object.
promotes : iter of (str or tuple), optional
A list of variable names specifying which subsystem variables
to 'promote' up to this group. If an entry is a tuple of the
form (old_name, new_name), this will rename the variable in
the parent group.
promotes_inputs : iter of (str or tuple), optional
A list of input variable names specifying which subsystem input
variables to 'promote' up to this group. If an entry is a tuple of
the form (old_name, new_name), this will rename the variable in
the parent group.
promotes_outputs : iter of (str or tuple), optional
A list of output variable names specifying which subsystem output
variables to 'promote' up to this group. If an entry is a tuple of
the form (old_name, new_name), this will rename the variable in
the parent group.
min_procs : int
Minimum number of MPI processes usable by the subsystem. Defaults to 1.
max_procs : int or None
Maximum number of MPI processes usable by the subsystem. A value
of None (the default) indicates there is no maximum limit.
proc_weight : float
Weight given to the subsystem when allocating available MPI processes
to all subsystems. Default is 1.0.
proc_group : str or None
Name of a processor group such that any system with that processor group name
within the same parent group will be allocated on the same mpi process(es).
If this is not None, then any other systems sharing the same proc_group must
have identical values of min_procs, max_procs, and proc_weight or an exception
will be raised.
Returns
-------
<System>
The subsystem that was passed in. This is returned to
enable users to instantiate and add a subsystem at the
same time, and get the reference back.
"""
if self._setup_procs_finished:
raise RuntimeError(f"{self.msginfo}: Cannot call add_subsystem in "
"the configure method.")
if inspect.isclass(subsys):
raise TypeError(f"{self.msginfo}: Subsystem '{name}' should be an instance, but a "
f"{subsys.__name__} class object was found.")
if name in self._subsystems_allprocs or name in self._static_subsystems_allprocs:
raise RuntimeError(f"{self.msginfo}: Subsystem name '{name}' is already used.")
if hasattr(self, name) and not isinstance(getattr(self, name), System):
# replacing a subsystem is ok (e.g. resetup) but no other attribute
raise RuntimeError(f"{self.msginfo}: Can't add subsystem '{name}' because an attribute "
f"with that name already exits.")
if not isinstance(subsys, System):
raise TypeError(f"{self.msginfo}: Subsystem '{name}' should be a System instance, but "
f"an instance of type {type(subsys).__name__} was found.")
if subsys is self:
raise RuntimeError(f"{self.msginfo}: System '{name}' can't be added to itself.")
if proc_group is not None and not isinstance(proc_group, str):
raise TypeError(f"{self.msginfo}: proc_group must be a str or None, but is of type "
f"'{type(proc_group).__name__}'.")
match = namecheck_rgx.match(name)
if match is None or match.group() != name:
raise NameError(f"{self.msginfo}: '{name}' is not a valid sub-system name.")
subsys.name = subsys.pathname = name
if isinstance(promotes, str) or \
isinstance(promotes_inputs, str) or \
isinstance(promotes_outputs, str):
raise RuntimeError(f"{self.msginfo}: promotes must be an iterator of strings and/or "
"tuples.")
prominfo = None
# Note, the declared order in any of these promotes arguments shouldn't matter. However,
# the order does matter when using system.promotes during configure. There, you are
# permitted to promote '*' then promote_to an alias afterwards, but not in the reverse.
# To make this work, we sort the promotes lists for this subsystem to put the wild card
# entries at the beginning.
if promotes:
subsys._var_promotes['any'] = [(p, prominfo) for p in
sorted(promotes, key=lambda x: '*' not in x)]
if promotes_inputs:
subsys._var_promotes['input'] = [(p, prominfo) for p in
sorted(promotes_inputs, key=lambda x: '*' not in x)]
if promotes_outputs:
subsys._var_promotes['output'] = [(p, prominfo) for p in
sorted(promotes_outputs, key=lambda x: '*' not in x)]
if self._static_mode:
subsystems_allprocs = self._static_subsystems_allprocs
else:
subsystems_allprocs = self._subsystems_allprocs
subsystems_allprocs[subsys.name] = _SysInfo(subsys, len(subsystems_allprocs))
if not isinstance(min_procs, int) or min_procs < 1:
raise TypeError(f"{self.msginfo}: min_procs must be an int > 0 but ({min_procs}) was "
"given.")
if max_procs is not None and (not isinstance(max_procs, int) or max_procs < min_procs):
raise TypeError(f"{self.msginfo}: max_procs must be None or an int >= min_procs but "
f"({max_procs}) was given.")
if isinstance(proc_weight, Number) and proc_weight < 0:
raise TypeError(f"{self.msginfo}: proc_weight must be a float > 0. but ({proc_weight}) "
"was given.")
self._proc_info[name] = (min_procs, max_procs, proc_weight, proc_group)
setattr(self, name, subsys)
return subsys
[docs] def connect(self, src_name, tgt_name, src_indices=None, flat_src_indices=None):
"""
Connect source src_name to target tgt_name in this namespace.
Parameters
----------
src_name : str
Name of the source variable to connect.
tgt_name : str or [str, ... ] or (str, ...)
Name of the target variable(s) to connect.
src_indices : int or list of ints or tuple of ints or int ndarray or Iterable or None
The global indices of the source variable to transfer data from.
The shapes of the target and src_indices must match, and form of the
entries within is determined by the value of 'flat_src_indices'.
flat_src_indices : bool
If True, each entry of src_indices is assumed to be an index into the
flattened source. Otherwise it must be a tuple or list of size equal
to the number of dimensions of the source.
"""
# if src_indices argument is given, it should be valid
if isinstance(src_indices, str):
if isinstance(tgt_name, str):
tgt_name = [tgt_name]
tgt_name.append(src_indices)
self._collect_error(f"{self.msginfo}: src_indices must be a slice, int, or index array."
f" Did you mean connect('{src_name}', '{tgt_name}')?")
return
# if multiple targets are given, recursively connect to each
if not isinstance(tgt_name, str) and isinstance(tgt_name, Iterable):
for name in tgt_name:
self.connect(src_name, name, src_indices, flat_src_indices=flat_src_indices)
return
if src_indices is not None:
try:
src_indices = indexer(src_indices, flat_src=flat_src_indices)
except Exception:
type_exc, exc, tb = sys.exc_info()
self._collect_error(f"{self.msginfo}: When connecting from '{src_name}' to "
f"'{tgt_name}': {exc}", exc_type=type_exc, tback=tb)
return
# target should not already be connected
for manual_connections in [self._manual_connections, self._static_manual_connections]:
if tgt_name in manual_connections:
srcname = manual_connections[tgt_name][0]
self._collect_error(f"{self.msginfo}: Input '{tgt_name}' is already connected to "
f"'{srcname}'.")
return
# source and target should not be in the same system
if src_name.rsplit('.', 1)[0] == tgt_name.rsplit('.', 1)[0]:
self._collect_error(f"{self.msginfo}: Output and input are in the same System for "
f"connection from '{src_name}' to '{tgt_name}'.")
return
if self._static_mode:
manual_connections = self._static_manual_connections
else:
manual_connections = self._manual_connections
manual_connections[tgt_name] = (src_name, src_indices, flat_src_indices)
[docs] def set_order(self, new_order):
"""
Specify a new execution order for subsystems in this group.
Parameters
----------
new_order : list of str
List of system names in desired new execution order.
"""
if self._problem_meta is not None and not self._problem_meta['allow_post_setup_reorder'] \
and self._problem_meta['setup_status'] == _SetupStatus.POST_CONFIGURE:
raise RuntimeError(f"{self.msginfo}: Cannot call set_order in the configure method.")
# Make sure the new_order is valid. It must contain all subsystems
# in this model.
newset = set(new_order)
if self._static_mode:
olddict = self._static_subsystems_allprocs
else:
olddict = self._subsystems_allprocs
oldset = set(olddict)
if oldset != newset:
msg = []
missing = oldset - newset
if missing:
msg.append("%s: %s expected in subsystem order and not found." %
(self.msginfo, sorted(missing)))
extra = newset - oldset
if extra:
msg.append("%s: subsystem(s) %s found in subsystem order but don't exist." %
(self.msginfo, sorted(extra)))
raise ValueError('\n'.join(msg))
# Don't allow duplicates either.
if len(newset) < len(new_order):
dupes = [key for key, val in Counter(new_order).items() if val > 1]
raise ValueError("%s: Duplicate name(s) found in subsystem order list: %s" %
(self.msginfo, sorted(dupes)))
subsystems = {} # need a fresh one to keep the right order
if self._static_mode:
self._static_subsystems_allprocs = subsystems
else:
self._subsystems_allprocs = subsystems
for i, name in enumerate(new_order):
sinfo = olddict[name]
subsystems[name] = sinfo
sinfo.index = i
if not self._static_mode:
self._subsystems_myproc = [s for s, _ in self._subsystems_allprocs.values()]
self._order_set = True
if self._problem_meta is not None and not self._problem_meta['allow_post_setup_reorder']:
# order has been changed so we need a new full setup
self._problem_meta['setup_status'] = _SetupStatus.PRE_SETUP
def _get_subsystem(self, name):
"""
Return the system called 'name' in the current namespace.
Parameters
----------
name : str
name of the desired system in the current namespace.
Returns
-------
System or None
System if found else None.
"""
system = self
for subname in name.split('.'):
try:
system = system._subsystems_allprocs[subname].system
except KeyError:
try:
system = system._static_subsystems_allprocs[subname].system
except KeyError:
if name == '':
return self
return None
return system
[docs] def run_linearize(self, sub_do_ln=True, driver=None):
"""
Compute jacobian / factorization.
This calls _linearize, but with the model assumed to be in an unscaled state.
Parameters
----------
sub_do_ln : bool
Flag indicating if the children should call linearize on their linear solvers.
driver : Driver or None
If this system is the top level system and approx derivatives have not been
initialized, the driver for this model must be supplied in order to properly
initialize the approximations.
"""
if driver is not None and self.pathname == '' and self._owns_approx_jac:
self._tot_jac = _TotalJacInfo(driver._problem(), None, None, 'flat_dict', approx=True)
try:
super().run_linearize(sub_do_ln=sub_do_ln)
finally:
self._tot_jac = None
def _apply_nonlinear(self):
"""
Compute residuals. The model is assumed to be in a scaled state.
"""
self._transfer('nonlinear', 'fwd')
# Apply recursion
for subsys in self._relevance.filter(self._subsystems_myproc):
subsys._apply_nonlinear()
self.iter_count_apply += 1
def _solve_nonlinear(self):
"""
Compute outputs. The model is assumed to be in a scaled state.
"""
name = self.pathname if self.pathname else 'root'
with Recording(name + '._solve_nonlinear', self.iter_count, self):
with self._relevance.active(self._nonlinear_solver.use_relevance()):
self._nonlinear_solver._solve_with_cache_check()
# Iteration counter is incremented in the Recording context manager at exit.
def _guess_nonlinear(self):
"""
Provide initial guess for states.
"""
# let any lower level systems do their guessing first
if self._has_guess:
for sname, sinfo in self._subsystems_allprocs.items():
sub = sinfo.system
# TODO: could gather 'has_guess' information during setup and be able to
# skip transfer for subs that don't have guesses...
self._transfer('nonlinear', 'fwd', sname)
if sub._is_local and sub._has_guess:
sub._guess_nonlinear()
# call our own guess_nonlinear method, after the recursion is done to
# all the lower level systems and the data transfers have happened
complex_step = self._inputs._under_complex_step
if complex_step:
self._inputs.set_complex_step_mode(False)
self._residuals.set_complex_step_mode(False)
self._outputs.set_complex_step_mode(False)
try:
if self._discrete_inputs or self._discrete_outputs:
self.guess_nonlinear(self._inputs, self._outputs, self._residuals,
self._discrete_inputs, self._discrete_outputs)
else:
self.guess_nonlinear(self._inputs, self._outputs, self._residuals)
finally:
if complex_step:
self._inputs.set_complex_step_mode(True)
self._residuals.set_complex_step_mode(True)
self._outputs.set_complex_step_mode(True)
[docs] def guess_nonlinear(self, inputs, outputs, residuals,
discrete_inputs=None, discrete_outputs=None):
"""
Provide initial guess for states.
Override this method to set the initial guess for states.
Parameters
----------
inputs : Vector
Unscaled, dimensional input variables read via inputs[key].
outputs : Vector
Unscaled, dimensional output variables read via outputs[key].
residuals : Vector
Unscaled, dimensional residuals written to via residuals[key].
discrete_inputs : dict or None
If not None, dict containing discrete input values.
discrete_outputs : dict or None
If not None, dict containing discrete output values.
"""
pass
def _iter_call_apply_linear(self):
"""
Return whether to call _apply_linear on this Group from within linear block GS/Jac.
Linear block solvers call _apply_linear then _solve_linear (fwd) or _solve_linear then
_apply_linear (rev) during an iteration. This will tell those solvers whether they
should call _apply_linear on this group when they're calling _apply_linear on their
subsystems. Note that _apply_linear will still be called from within a subsystem's
_solve_linear.
Returns
-------
bool
True if _apply_linear should be called from within a parent _apply_linear.
"""
return (self._owns_approx_jac and self._jacobian is not None) or \
self._assembled_jac is not None or not self._linear_solver.does_recursive_applies()
def _apply_linear(self, jac, mode, scope_out=None, scope_in=None):
"""
Compute jac-vec product. The model is assumed to be in a scaled state.
Parameters
----------
jac : Jacobian or None
If None, use local jacobian, else use assembled jacobian jac.
mode : str
'fwd' or 'rev'.
scope_out : set or None
Set of absolute output names in the scope of this mat-vec product.
If None, all are in the scope.
scope_in : set or None
Set of absolute input names in the scope of this mat-vec product.
If None, all are in the scope.
"""
if self._owns_approx_jac:
jac = self._jacobian
elif jac is None and self._assembled_jac is not None:
jac = self._assembled_jac
if jac is not None:
with self._matvec_context(scope_out, scope_in, mode) as vecs:
d_inputs, d_outputs, d_residuals = vecs
jac._apply(self, d_inputs, d_outputs, d_residuals, mode)
# _fd_rev_xfer_correction_dist is used to correct for the fact that we don't
# do reverse transfers internal to an FD group. Reverse transfers
# are constructed such that derivative values are correct when transferred into
# system doutput variables, taking into account distributed inputs.
# Since the transfers are not correcting for those issues, we need to do it here.
# If we have a distributed constraint/obj within the FD group and that con/obj is,
# active, we perform essentially an allreduce on the d_inputs vars that connect to
# outside systems so they'll include the contribution from all procs.
if self._fd_rev_xfer_correction_dist and mode == 'rev':
seed_vars = self._problem_meta['seed_vars']
if seed_vars is not None:
seed_vars = [n for n in seed_vars if n in self._fd_rev_xfer_correction_dist]
slices = self._dinputs.get_slice_dict()
inarr = self._dinputs.asarray()
data = {}
for seed_var in seed_vars:
for inp in self._fd_rev_xfer_correction_dist[seed_var]:
if inp not in data:
if inp in slices: # inp is a local input
arr = inarr[slices[inp]]
if np.any(arr):
data[inp] = arr
else:
data[inp] = None # don't send an array of zeros
else:
data[inp] = None # prevent possible MPI hangs
if data:
myrank = self.comm.rank
for rank, d in enumerate(self.comm.allgather(data)):
if rank != myrank:
for n, val in d.items():
if val is not None and n in slices:
inarr[slices[n]] += val
# Apply recursion
else:
if mode == 'fwd':
self._transfer('linear', mode)
for s in self._relevance.filter(self._subsystems_myproc, relevant=False):
# zero out dvecs of irrelevant subsystems
s._dresiduals.set_val(0.0)
for s in self._relevance.filter(self._subsystems_myproc, relevant=True):
s._apply_linear(jac, mode, scope_out, scope_in)
if mode == 'rev':
self._transfer('linear', mode)
for s in self._relevance.filter(self._subsystems_myproc, relevant=False):
# zero out dvecs of irrelevant subsystems
s._doutputs.set_val(0.0)
def _solve_linear(self, mode, scope_out=_UNDEFINED, scope_in=_UNDEFINED):
"""
Apply inverse jac product. The model is assumed to be in a scaled state.
Parameters
----------
mode : str
'fwd' or 'rev'.
scope_out : set, None, or _UNDEFINED
Outputs relevant to possible lower level calls to _apply_linear on Components.
scope_in : set, None, or _UNDEFINED
Inputs relevant to possible lower level calls to _apply_linear on Components.
"""
if self._owns_approx_jac:
# No subsolves if we are approximating our jacobian. Instead, we behave like an
# ExplicitComponent and pass on the values in the derivatives vectors.
d_outputs = self._doutputs
d_residuals = self._dresiduals
if mode == 'fwd':
if self._has_resid_scaling:
with self._unscaled_context(outputs=[d_outputs], residuals=[d_residuals]):
d_outputs.set_vec(d_residuals)
else:
d_outputs.set_vec(d_residuals)
# ExplicitComponent jacobian defined with -1 on diagonal.
d_outputs *= -1.0
else: # rev
if self._has_resid_scaling:
with self._unscaled_context(outputs=[d_outputs], residuals=[d_residuals]):
d_residuals.set_vec(d_outputs)
else:
d_residuals.set_vec(d_outputs)
# ExplicitComponent jacobian defined with -1 on diagonal.
d_residuals *= -1.0
else:
self._linear_solver._set_matvec_scope(scope_out, scope_in)
with self._relevance.active(self._linear_solver.use_relevance()):
self._linear_solver.solve(mode, None)
def _linearize(self, jac, sub_do_ln=True):
"""
Compute jacobian / factorization. The model is assumed to be in a scaled state.
Parameters
----------
jac : Jacobian or None
If None, use local jacobian, else use assembled jacobian jac.
sub_do_ln : bool
Flag indicating if the children should call linearize on their linear solvers.
"""
if (self._tot_jac is not None and self._owns_approx_jac and
not isinstance(self._jacobian, coloring_mod._ColSparsityJac)):
self._jacobian = self._tot_jac
elif self._jacobian is None:
self._jacobian = DictionaryJacobian(self)
self._check_first_linearize()
# Group finite difference
if self._owns_approx_jac:
jac = self._jacobian
if self.pathname == "":
for approximation in self._approx_schemes.values():
approximation.compute_approximations(self, jac=jac)
else:
# When an approximation exists in a submodel (instead of in root), the model is
# in a scaled state.
with self._unscaled_context(outputs=[self._outputs]):
for approximation in self._approx_schemes.values():
approximation.compute_approximations(self, jac=jac)
else:
if self._assembled_jac is not None:
jac = self._assembled_jac
relevance = self._relevance
with relevance.active(self._linear_solver.use_relevance()):
subs = list(relevance.filter(self._subsystems_myproc))
# Only linearize subsystems if we aren't approximating the derivs at this level.
for subsys in subs:
do_ln = sub_do_ln and (subsys._linear_solver is not None and
subsys._linear_solver._linearize_children())
subsys._linearize(jac, sub_do_ln=do_ln)
# Update jacobian
if self._assembled_jac is not None:
self._assembled_jac._update(self)
if sub_do_ln:
for subsys in subs:
if subsys._linear_solver is not None:
subsys._linear_solver._linearize()
def _check_first_linearize(self):
if self._first_call_to_linearize:
self._first_call_to_linearize = False # only do this once
coloring = self._get_coloring() if coloring_mod._use_partial_sparsity else None
if coloring is not None:
self._setup_approx_coloring()
# TODO: for top level FD, call below is unnecessary, but we need this
# for some tests that just call run_linearize directly without calling
# compute_totals.
elif self._approx_schemes:
self._setup_approx_derivs()
[docs] def approx_totals(self, method='fd', step=None, form=None, step_calc=None):
"""
Approximate derivatives for a Group using the specified approximation method.
Parameters
----------
method : str
The type of approximation that should be used. Valid options include:
'fd': Finite Difference, 'cs': Complex Step.
step : float
Step size for approximation. Defaults to None, in which case, the approximation
method provides its default value.
form : str
Form for finite difference, can be 'forward', 'backward', or 'central'. Defaults to
None, in which case, the approximation method provides its default value.
step_calc : str
Step type for computing the size of the finite difference step. It can be 'abs' for
absolute, 'rel_avg' for a size relative to the absolute value of the vector input, or
'rel_element' for a size relative to each value in the vector input. In addition, it
can be 'rel_legacy' for a size relative to the norm of the vector. For backwards
compatibilty, it can be 'rel', which is now equivalent to 'rel_avg'. Defaults to None,
in which case the approximation method provides its default value.
"""
self._has_approx = True
self._approx_schemes = {}
approx_scheme = self._get_approx_scheme(method)
default_opts = approx_scheme.DEFAULT_OPTIONS
kwargs = {}
for name, attr in (('step', step), ('form', form), ('step_calc', step_calc)):
if attr is not None:
if name in default_opts:
kwargs[name] = attr
else:
raise RuntimeError("%s: '%s' is not a valid option for '%s'" % (self.msginfo,
name, method))
self._owns_approx_jac = True
self._owns_approx_jac_meta = kwargs
def _setup_partials(self):
"""
Call setup_partials in components.
"""
self._subjacs_info = info = {}
for subsys in self._sorted_sys_iter():
subsys._setup_partials()
info.update(subsys._subjacs_info)
if self._has_distrib_vars and self._owns_approx_jac:
# We currently cannot approximate across a group with a distributed component if the
# inputs are distributed via src_indices.
for iname, meta in self._var_allprocs_abs2meta['input'].items():
if meta['has_src_indices'] and \
meta['distributed'] and \
iname not in self._conn_abs_in2out:
msg = "{}: Approx_totals is not supported on a group with a distributed "
msg += "component whose input '{}' is distributed using src_indices. "
raise RuntimeError(msg.format(self.msginfo, iname))
def _setup_residuals(self):
"""
Call setup_residuals in components.
"""
for subsys in self._sorted_sys_iter():
subsys._setup_residuals()
def _declared_partials_iter(self):
"""
Iterate over all declared partials.
Yields
------
key : tuple (of, wrt)
Subjacobian key.
"""
for subsys in self._subsystems_myproc:
yield from subsys._declared_partials_iter()
def _get_missing_partials(self, missing):
"""
Provide (of, wrt) tuples for which derivatives have not been declared in the system.
Parameters
----------
missing : dict
Dictionary containing list of missing derivatives keyed by system pathname.
"""
if self._has_approx:
return
for subsys in self._subsystems_myproc:
subsys._get_missing_partials(missing)
def _approx_subjac_keys_iter(self):
yield from self._subjac_keys_iter()
def _subjac_keys_iter(self):
# yields absolute keys (no aliases)
totals = self.pathname == ''
wrt = set()
ivc = set(self.get_indep_vars(local=False))
pro2abs = self._var_allprocs_prom2abs_list
if totals:
# When computing totals, weed out inputs connected to anything inside our system unless
# the source is an indepvarcomp.
if self._owns_approx_wrt:
wrt = {m['source'] for m in self._owns_approx_wrt.values()}
diff = wrt.difference(ivc)
if diff:
bad = [n for n, m in self._owns_approx_wrt.items() if m['source'] in diff]
raise RuntimeError("When computing total derivatives for the model, the "
"following wrt variables are not independent variables or "
"do not have an independant variable as a source: "
f"{sorted(bad)}")
wrt = wrt.intersection(ivc)
else:
wrt = ivc
else:
for abs_inps in pro2abs['input'].values():
if abs_inps[0] not in self._conn_abs_in2out:
# If connection is inside of this Group, perturbation of all implicitly
# connected inputs will be handled properly via internal transfers.
# Otherwise, we need to add all implicitly connected inputs separately.
wrt.update(abs_inps)
# get rid of any old stuff in here
self._owns_approx_of = self._owns_approx_wrt = None
if self._owns_approx_of: # can only be total at this point
of = set(m['source'] for m in self._owns_approx_of.values())
else:
of = set(self._var_allprocs_abs2meta['output'])
# Skip indepvarcomp res wrt other srcs
of -= ivc
if totals:
yield from product(of, wrt.union(of))
else:
for key in product(of, wrt.union(of)):
# Create approximations for the ones we need.
_of, _wrt = key
# Skip res wrt outputs
if _wrt in of and _wrt not in ivc:
# Support for specifying a desvar as an obj/con.
if _wrt not in wrt or _of == _wrt:
continue
yield key
def _jac_of_iter(self):
"""
Iterate over (name, start, end, idxs, dist_sizes) for each 'of' (row) var in the jacobian.
idxs will usually be the var slice into the full variable in the result array,
except in cases where _owns_approx__idx has a value for that variable, in which case it'll
be indices into the variable.
Yields
------
str
Absolute name of 'of' variable source.
int
Starting index.
int
Ending index.
slice or ndarray
A full slice or indices for the 'of' variable.
ndarray or None
Distributed sizes if var is distributed else None
"""
if self._owns_approx_of:
total = self.pathname == ''
abs2meta = self._var_allprocs_abs2meta['output']
abs2idx = self._var_allprocs_abs2idx
sizes = self._var_sizes['output']
szname = 'global_size' if total else 'size'
# we're computing totals/semi-totals (vars may not be local)
start = end = 0
for name, ofmeta in self._owns_approx_of.items():
if total:
src = ofmeta['source']
else:
src = name
if not total and src not in self._var_abs2meta['output']:
continue
meta = abs2meta[src]
if meta['distributed']:
dist_sizes = sizes[:, abs2idx[src]]
else:
dist_sizes = None
indices = ofmeta['indices']
if indices is not None: # of in approx_of_idx:
end += indices.indexed_src_size
yield src, start, end, indices.shaped_array().ravel(), dist_sizes
else:
end += abs2meta[src][szname]
yield src, start, end, _full_slice, dist_sizes
start = end
else:
yield from super()._jac_of_iter()
def _jac_wrt_iter(self, wrt_matches=None):
"""
Iterate over (name, offset, end, vec, idxs, dist_sizes) for each column var in the jacobian.
Parameters
----------
wrt_matches : set or None
Only include row vars that are contained in this set. This will determine what
the actual offsets are, i.e. the offsets will be into a reduced jacobian
containing only the matching columns.
Yields
------
str
Name of 'wrt' variable.
int
Starting index.
int
Ending index.
Vector
Either the _outputs or _inputs vector.
slice or ndarray
A full slice or indices for the 'wrt' variable.
ndarray or None
Distributed sizes if var is distributed else None
"""
total = self.pathname == ''
if self._owns_approx_wrt:
sizes = self._var_sizes
toidx = self._var_allprocs_abs2idx
abs2meta = self._var_allprocs_abs2meta
local_ins = self._var_abs2meta['input']
local_outs = self._var_abs2meta['output']
szname = 'global_size' if total else 'size'
seen = set()
start = end = 0
if self.pathname: # doing semitotals, so include output columns
for of, _start, _end, _, dist_sizes in self._jac_of_iter():
if wrt_matches is None or of in wrt_matches:
seen.add(of)
end += (_end - _start)
vec = self._outputs if of in local_outs else None
yield of, start, end, vec, _full_slice, dist_sizes
start = end
for wrt, wrtmeta in self._owns_approx_wrt.items():
if total:
wrt = wrtmeta['source']
if wrtmeta['remote']:
vec = None
else:
vec = self._outputs
else:
if wrt in local_ins:
vec = self._inputs
elif wrt in local_outs:
vec = self._outputs
else:
vec = None # remote wrt
if (wrt_matches is None or wrt in wrt_matches) and wrt not in seen:
io = 'input' if wrt in abs2meta['input'] else 'output'
meta = abs2meta[io][wrt]
if total and wrtmeta['indices'] is not None:
sub_wrt_idx = wrtmeta['indices'].as_array()
size = sub_wrt_idx.size
sub_wrt_idx = sub_wrt_idx
else:
sub_wrt_idx = _full_slice
size = abs2meta[io][wrt][szname]
if vec is None:
sub_wrt_idx = ValueRepeater(None, size)
end += size
dist_sizes = sizes[io][:, toidx[wrt]] if meta['distributed'] else None
yield wrt, start, end, vec, sub_wrt_idx, dist_sizes
start = end
else:
yield from super()._jac_wrt_iter(wrt_matches)
def _promoted_wrt_iter(self):
if not (self._owns_approx_of or self.pathname):
return
abs2prom = self._var_allprocs_abs2prom
seen = set()
for _, wrt in self._get_approx_subjac_keys():
if wrt not in seen:
seen.add(wrt)
if wrt in abs2prom['output']:
yield abs2prom['output'][wrt]
else:
yield abs2prom['input'][wrt]
def _setup_approx_derivs(self):
"""
Add approximations for all approx derivs.
"""
if self._jacobian is None:
self._jacobian = DictionaryJacobian(system=self)
abs2meta = self._var_allprocs_abs2meta
total = self.pathname == ''
nprocs = self.comm.size
if self._coloring_info.coloring is not None and (self._owns_approx_of is None or
self._owns_approx_wrt is None):
method = self._coloring_info.method
else:
method = list(self._approx_schemes)[0]
wrt_matches = self._get_static_wrt_matches()
approx = self._get_approx_scheme(method)
# reset the approx if necessary
approx._wrt_meta = {}
approx._reset()
sizes_out = self._var_sizes['output']
sizes_in = self._var_sizes['input']
abs2idx = self._var_allprocs_abs2idx
self._cross_keys = set()
approx_keys = self._get_approx_subjac_keys()
for key in approx_keys:
left, right = key
if not total and nprocs > 1 and self._has_fd_group:
sout = sizes_out[:, abs2idx[left]]
sin = sizes_in[:, abs2idx[right]]
if np.count_nonzero(sout[sin == 0]) > 0 and np.count_nonzero(sin[sout == 0]) > 0:
# we have of and wrt that exist on different procs. Now see if they're relevant
# to each other
for _, _, rel in self._relevance.iter_seed_pair_relevance(inputs=True,
outputs=True):
if left in rel and right in rel:
self._cross_keys.add(key)
break
if key in self._subjacs_info:
meta = self._subjacs_info[key]
else:
meta = SUBJAC_META_DEFAULTS.copy()
if left == right:
size = abs2meta['output'][left]['size']
meta['rows'] = meta['cols'] = np.arange(size)
# All group approximations are treated as explicit components, so we
# have a -1 on the diagonal.
meta['val'] = np.full(size, -1.0)
self._subjacs_info[key] = meta
meta['method'] = method
meta.update(self._owns_approx_jac_meta)
if wrt_matches is None or right in wrt_matches:
self._update_approx_coloring_meta(meta)
if meta['val'] is None:
if not total and right in abs2meta['input']:
sz = abs2meta['input'][right]['size']
else:
sz = abs2meta['output'][right]['size']
shape = (abs2meta['output'][left]['size'], sz)
meta['shape'] = shape
if meta['rows'] is not None: # subjac is sparse
meta['val'] = np.zeros(len(meta['rows']))
else:
meta['val'] = np.zeros(shape)
approx.add_approximation(key, self, meta)
if not total:
# we're taking semi-total derivs for this group. Update _owns_approx_of
# and _owns_approx_wrt so we can use the same approx code for totals and
# semi-totals. Also, the order must match order of vars in the output and
# input vectors.
abs_outs = self._var_allprocs_abs2meta['output']
abs_ins = self._var_allprocs_abs2meta['input']
abs2prom_out = self._var_allprocs_abs2prom['output']
abs2prom_in = self._var_allprocs_abs2prom['input']
self._owns_approx_of = {}
for n, m in abs_outs.items():
self._owns_approx_of[n] = dct = m.copy()
dct['name'] = abs2prom_out[n]
dct['source'] = n
dct['indices'] = None
wrtset = set([k[1] for k in approx_keys])
self._owns_approx_wrt = {}
for n, m in abs_ins.items():
if n in wrtset:
self._owns_approx_wrt[n] = dct = m.copy()
dct['name'] = abs2prom_in[n]
dct['source'] = n
dct['indices'] = None
self._owns_approx_jac = True
def _setup_approx_coloring(self):
"""
Ensure that if coloring is declared, approximations will be set up.
"""
if self._coloring_info.coloring is not None:
self.approx_totals(self._coloring_info.method,
self._coloring_info.get('step'),
self._coloring_info.get('form'))
self._setup_approx_derivs()
def _setup_check(self):
"""
Do any error checking on user's setup, before any other recursion happens.
"""
if (self._coloring_info.static or self._coloring_info.dynamic) and self.pathname != '':
msg = f"{self.msginfo}: semi-total coloring is currently not supported."
raise RuntimeError(msg)
def _update_approx_coloring_meta(self, meta):
"""
Update metadata for a subjac based on coloring metadata.
Parameters
----------
meta : dict
Metadata for a subjac.
"""
info = self._coloring_info
meta['coloring'] = True
for name in ('method', 'step', 'form'):
if name in info:
meta[name] = info[name]
[docs] def compute_sys_graph(self, comps_only=False, add_edge_info=True):
"""
Compute a dependency graph for subsystems in this group.
Variable connection information is stored in each edge of
the system graph if comps_only is True and add_edge_info is True.
Parameters
----------
comps_only : bool (False)
If True, return a graph of all components within this group
or any of its descendants. No sub-groups will be included. Otherwise,
a graph containing only direct children (both Components and Groups)
of this group will be returned.
add_edge_info : bool (True)
If True and comps_only is also True, store variable connection information in each
edge of the system graph.
Returns
-------
DiGraph
A directed graph containing names of subsystems and their connections.
"""
graph = nx.DiGraph()
if comps_only:
# add all components as nodes in the graph so they'll be there even if unconnected.
comps = set(v.rpartition('.')[0] for v in chain(self._var_allprocs_abs2prom['output'],
self._var_allprocs_abs2prom['input']))
graph.add_nodes_from(comps)
if add_edge_info:
edge_data = defaultdict(lambda: defaultdict(list))
for in_abs, src_abs in self._conn_global_abs_in2out.items():
src_sys = src_abs.rpartition('.')[0]
tgt_sys = in_abs.rpartition('.')[0]
# store var connection data in each system to system edge.
edge_data[(src_sys, tgt_sys)][src_abs].append(in_abs)
for (src_sys, tgt_sys), data in edge_data.items():
graph.add_edge(src_sys, tgt_sys, conns=data)
else:
for in_abs, src_abs in self._conn_global_abs_in2out.items():
graph.add_edge(src_abs.rpartition('.')[0], in_abs.rpartition('.')[0])
else:
if self._sys_graph_cache is None:
# add all systems as nodes in the graph so they'll be there even if unconnected.
graph.add_nodes_from(self._subsystems_allprocs)
glen = self.pathname.count('.') + 1 if self.pathname else 0
for in_abs, src_abs in self._conn_global_abs_in2out.items():
src_sys = src_abs.split('.', glen + 1)[glen]
tgt_sys = in_abs.split('.', glen + 1)[glen]
if src_sys != tgt_sys:
graph.add_edge(src_sys, tgt_sys)
self._sys_graph_cache = graph
else:
graph = self._sys_graph_cache
return graph
def _get_auto_ivc_out_val(self, tgts, vars_to_gather):
# all tgts are continuous variables
# only called from top level group
info = None
src_idx_found = []
max_size = -1
found_dup = False
abs2meta_in = self._var_abs2meta['input']
abs_in2prom_info = self._problem_meta['abs_in2prom_info']
start_val = val = None
val_shape = None
chosen_tgt = None
# first, find the auto_ivc output shape
loc_tgts = [t for t in tgts if t in abs2meta_in]
full_tgts = [t for t in loc_tgts if t not in abs_in2prom_info]
if full_tgts: # full variable connections without any src_indices
val_shape = abs2meta_in[full_tgts[0]]['shape']
for tgt in full_tgts:
if tgt not in vars_to_gather:
found_dup = True
chosen_tgt = tgt
break
else:
plist_tgts = [tgt for tgt in loc_tgts if tgt in abs_in2prom_info]
if plist_tgts:
plists = [abs_in2prom_info[tgt] for tgt in plist_tgts]
plens = [len(plist) for plist in plists]
nlevels = max(plens)
# find highest specification of src_shape in bfs order to shape the auto_ivc output
for i in range(nlevels):
for tgt, plist, plen in zip(plist_tgts, plists, plens):
if i < plen:
pinfo = plist[i]
if pinfo is not None and pinfo.src_shape is not None:
val_shape = pinfo.src_shape
break
if val_shape is not None:
chosen_tgt = tgt
break
if val_shape is not None:
start_val = val = np.ones(val_shape)
info = None
for tgt in tgts:
if tgt in abs2meta_in: # tgt is local
meta = abs2meta_in[tgt]
size = meta['size']
value = meta['val']
src_indices = None
if tgt in abs_in2prom_info:
# traverse down the promotes list, (abs_in2prom_info[tgt]), to get the
# final src_indices down at the component level so we can set the value of
# that component input into the appropriate place(s) in the auto_ivc output.
# If a tgt has no src_indices anywhere, it will not be found in
# abs_in2prom_info.
newshape = val_shape
for pinfo in abs_in2prom_info[tgt]:
if pinfo is None:
continue
inds, _, shape = pinfo
if inds is not None:
if shape is None:
shape = newshape
if inds._src_shape is None:
try:
inds.set_src_shape(shape)
except IndexError:
exc_class, exc, tb = sys.exc_info()
self._collect_error(f"When promoting '{pinfo.prom}' from "
f"system '{pinfo.promoted_from}' with "
f"src_indices {inds} and src_shape "
f"{shape}: {exc}",
exc_type=exc_class, tback=tb,
ident=(pinfo.prom, pinfo.promoted_from))
if src_indices is None:
src_indices = inds
else:
sinds = convert_src_inds(src_indices, newshape, inds, shape)
# final src_indices are wrt original full sized source and are flat,
# so use val_shape and flat_src=True
src_indices = indexer(sinds, src_shape=val_shape, flat_src=True)
newshape = src_indices.indexed_src_shape
if src_indices is None:
src_indices = meta['src_indices']
if src_indices is not None:
if val is None:
if val_shape is None and not found_dup:
src_idx_found.append(tgt)
val = value
else:
try:
if src_indices._flat_src:
val.ravel()[src_indices.flat()] = value.flat
else:
# Squeeze out singleton dimensions so that we can assign
# values of different shapes when the storage order is unambiguous.
np.squeeze(val[src_indices()])[...] = np.squeeze(value)
except Exception as err:
src = self._conn_global_abs_in2out[tgt]
msg = f"{self.msginfo}: The source indices " + \
f"{src_indices} do not specify a " + \
f"valid shape for the connection '{src}' to " + \
f"'{tgt}'. (target shape=" + \
f"{meta['shape']}, indices_shape=" + \
f"{src_indices.indexed_src_shape}): {err}"
self._collect_error(msg, ident=(src, tgt))
continue
else:
if val is None:
val = value
elif np.ndim(value) == 0:
if val.size > 1:
src = self._conn_global_abs_in2out[tgt]
self._collect_error(f"Shape of input '{tgt}', (), doesn't match shape "
f"{val.shape}.", ident=(src, tgt))
continue
elif np.squeeze(val).shape != np.squeeze(value).shape:
src = self._conn_global_abs_in2out[tgt]
self._collect_error(f"Shape of input '{tgt}', {value.shape}, doesn't match "
f"shape {val.shape}.", ident=(src, tgt))
continue
if val is not value:
if val.shape:
val = np.reshape(value, val.shape)
else:
val = value
if tgt not in vars_to_gather:
found_dup = True
if tgt == chosen_tgt or (chosen_tgt is None and size > max_size):
max_size = size
info = (tgt, val, False)
keep_val = val
val = start_val
if tgt in vars_to_gather: # tgt var is remote somewhere (but not distributed)
owner = vars_to_gather[tgt]
if owner == self.comm.rank: # this rank 'owns' the var
val = keep_val
self.comm.bcast(val, root=owner)
else:
val = self.comm.bcast(None, root=owner)
info = (tgt, val, False)
if src_idx_found: # auto_ivc connected to local vars with src_indices
self._collect_error("Attaching src_indices to inputs requires that the shape of the "
"source variable is known, but the source shape for inputs "
f"{src_idx_found} is unknown. You can specify the src shape for "
"these inputs by setting 'val' or 'src_shape' in a call to "
"set_input_defaults, or by adding an IndepVarComp as the source.",
ident=(self.pathname, tuple(src_idx_found)))
return None
return info
def _setup_auto_ivcs(self):
# only happens at top level
from openmdao.core.indepvarcomp import _AutoIndepVarComp
if self.comm.size > 1 and self._mpi_proc_allocator.parallel:
raise RuntimeError("The top level system must not be a ParallelGroup.")
# create the IndepVarComp that will contain all auto-ivc outputs
self._auto_ivc = auto_ivc = _AutoIndepVarComp()
auto_ivc.name = '_auto_ivc'
auto_ivc.pathname = auto_ivc.name
prom2auto = {}
count = 0
auto2tgt = {}
abs2prom = self._var_allprocs_abs2prom['input']
all_abs2meta = self._var_allprocs_abs2meta['input']
conns = self._conn_global_abs_in2out
auto_conns = {}
for tgt in all_abs2meta:
if tgt in conns or tgt in self._bad_conn_vars:
continue
all_meta = all_abs2meta[tgt]
if all_meta['distributed']:
# OpenMDAO currently can't create an automatic IndepVarComp for inputs on
# distributed components.
if tgt not in self._bad_conn_vars:
prom_name = abs2prom[tgt]
promoted_as = f', promoted as "{prom_name}",' if prom_name != tgt else ''
raise RuntimeError(f'Distributed component input "{tgt}"'
f'{promoted_as} is not connected.')
prom = abs2prom[tgt]
if prom in prom2auto:
# multiple connected inputs w/o a src. Connect them to the same IVC
src = prom2auto[prom][0]
auto_conns[tgt] = src
else:
src = f"_auto_ivc.v{count}"
count += 1
prom2auto[prom] = (src, tgt)
auto_conns[tgt] = src
if src in auto2tgt:
auto2tgt[src].append(tgt)
else:
auto2tgt[src] = [tgt]
conns.update(auto_conns)
auto_ivc.auto2tgt = auto2tgt
vars2gather = self._vars_to_gather
for src, tgts in auto2tgt.items():
prom = self._var_allprocs_abs2prom['input'][tgts[0]]
ret = self._get_auto_ivc_out_val(tgts, vars2gather)
if ret is None: # setup error occurred. Try to continue
continue
tgt, val, remote = ret
prom = abs2prom[tgt]
if prom not in self._group_inputs:
self._group_inputs[prom] = [{'use_tgt': tgt, 'auto': True, 'path': self.pathname,
'prom': prom}]
else:
self._group_inputs[prom][0]['use_tgt'] = tgt
gmeta = self._group_inputs[prom][0]
if 'units' in gmeta:
units = gmeta['units']
else:
units = all_abs2meta[tgt]['units']
if not remote and 'val' in gmeta:
val = gmeta['val']
if self.comm.size > 1:
tgt_local_procs = set()
# do a preliminary check to avoid the allgather if we can
for t in tgts:
if t in vars2gather:
tgt_local_procs.add(vars2gather[t])
else: # t is duplicated in all procs
break
else:
if len(tgt_local_procs) < self.comm.size: # don't have a local var in each proc
tgt_local_procs = set()
for t in self.comm.allgather(tgt):
if t in vars2gather:
tgt_local_procs.add(vars2gather[t])
if len(tgt_local_procs) > 1:
# the 'local' val can only exist on 1 proc (distrib auto_ivcs not
# allowed), so must consolidate onto one proc
rank = sorted(tgt_local_procs)[0]
if rank != self.comm.rank:
val = np.zeros(0)
remote = True
relsrc = src.rsplit('.', 1)[-1]
auto_ivc.add_output(relsrc, val=np.atleast_1d(val), units=units)
if remote:
auto_ivc._add_remote(relsrc)
# have to sort to keep vars in sync because we may be doing bcasts
for abs_in in sorted(self._var_allprocs_discrete['input']):
if abs_in not in conns: # unconnected, so connect the input to an _auto_ivc output
prom = abs2prom[abs_in]
val = _UNDEFINED
if prom in prom2auto:
# multiple connected inputs w/o a src. Connect them to the same IVC
# check if they have different metadata, and if they do, there must be
# a group input defined that sets the default, else it's an error
conns[abs_in] = prom2auto[prom][0]
else:
ivc_name = f"_auto_ivc.v{count}"
loc_out_name = ivc_name.rsplit('.', 1)[-1]
count += 1
prom2auto[prom] = (ivc_name, abs_in)
conns[abs_in] = ivc_name
if abs_in in self._var_abs2prom['input']: # var is local
val = self._var_discrete['input'][abs_in]['val']
else:
val = None
if abs_in in vars2gather:
if vars2gather[abs_in] == self.comm.rank:
self.comm.bcast(val, root=vars2gather[abs_in])
else:
val = self.comm.bcast(None, root=vars2gather[abs_in])
auto_ivc.add_discrete_output(loc_out_name, val=val)
src = conns[abs_in]
if src in auto2tgt:
auto2tgt[src].append(abs_in)
else:
auto2tgt[src] = [abs_in]
if not prom2auto:
return auto_ivc
auto_ivc._setup_procs(auto_ivc.pathname, self.comm, self._problem_meta)
auto_ivc._configure()
auto_ivc._configure_check()
auto_ivc._setup_var_data()
# now update our own data structures based on the new auto_ivc component variables
old = self._subsystems_allprocs
self._subsystems_allprocs = allsubs = {}
allsubs['_auto_ivc'] = _SysInfo(auto_ivc, 0)
for i, (name, s) in enumerate(old.items()):
allsubs[name] = s
s.index = i + 1
self._subsystems_myproc = [auto_ivc] + self._subsystems_myproc
io = 'output' # auto_ivc has only output vars
old = self._var_allprocs_prom2abs_list[io]
p2abs = {}
for name in auto_ivc._var_allprocs_abs2meta[io]:
p2abs[name] = [name]
p2abs.update(old)
self._var_allprocs_prom2abs_list[io] = p2abs
# set up auto_ivc abs2prom such that promoted name of the auto_ivc output is the same
# as the promoted name of the input that it is connected to
abs2prom = self._var_abs2prom[io]
abs2prom_in = self._var_allprocs_abs2prom['input']
for n in auto_ivc._var_abs2prom[io]:
abs2prom[n] = abs2prom_in[auto2tgt[n][0]]
all_abs2prom = self._var_allprocs_abs2prom[io]
for n in auto_ivc._var_allprocs_abs2prom[io]:
all_abs2prom[n] = abs2prom_in[auto2tgt[n][0]]
self._var_discrete[io].update({'_auto_ivc.' + k: v for k, v in
auto_ivc._var_discrete[io].items()})
self._var_allprocs_discrete[io].update(auto_ivc._var_allprocs_discrete[io])
old = self._var_abs2meta[io]
self._var_abs2meta[io] = {}
self._var_abs2meta[io].update(auto_ivc._var_abs2meta[io])
self._var_abs2meta[io].update(old)
old = self._var_allprocs_abs2meta[io]
self._var_allprocs_abs2meta[io] = {}
self._var_allprocs_abs2meta[io].update(auto_ivc._var_allprocs_abs2meta[io])
self._var_allprocs_abs2meta[io].update(old)
# add all auto_ivc vars to our _loc_ivcs
self._ivcs.update(auto_ivc._var_abs2meta[io])
self._approx_subjac_keys = None # this will force re-initialization
self._setup_procs_finished = True
return auto_ivc
@collect_errors
def _resolve_ambiguous_input_meta(self):
"""
Resolve ambiguous input units and values for auto_ivcs with multiple targets.
This should only be called on the top level Group.
"""
all_abs2meta_in = self._var_allprocs_abs2meta['input']
all_abs2meta_out = self._var_allprocs_abs2meta['output']
abs2prom = self._var_allprocs_abs2prom['input']
abs2meta_in = self._var_abs2meta['input']
all_discrete_outs = self._var_allprocs_discrete['output']
all_discrete_ins = self._var_allprocs_discrete['input']
for src, tgts in self._auto_ivc.auto2tgt.items():
if len(tgts) < 2:
continue
if src not in all_discrete_outs:
smeta = all_abs2meta_out[src]
sunits = smeta['units'] if 'units' in smeta else None
sval = self.get_val(src, kind='output', get_remote=True, from_src=False)
errs = set()
prom = abs2prom[tgts[0]]
if prom in self._group_inputs:
gmeta = self._group_inputs[prom][0]
else:
gmeta = {'path': self.pathname, 'prom': prom, 'auto': True}
for tgt in tgts:
tval = self.get_val(tgt, kind='input', get_remote=True, from_src=False)
if tgt in all_discrete_ins:
if 'val' not in gmeta and sval != tval:
errs.add('val')
else:
tmeta = all_abs2meta_in[tgt]
tunits = tmeta['units'] if 'units' in tmeta else None
if 'units' not in gmeta and sunits != tunits:
# Detect if either Source or Targe units are None.
if sunits is None or tunits is None:
errs.add('units')
elif _find_unit(sunits) != _find_unit(tunits):
errs.add('units')
if 'val' not in gmeta:
if tval.shape == sval.shape:
if _has_val_mismatch(tunits, tval, sunits, sval):
errs.add('val')
else:
if all_abs2meta_in[tgt]['has_src_indices'] and tgt in abs2meta_in:
if abs2meta_in[tgt]['flat_src_indices']:
srcpart = sval.ravel()[abs2meta_in[tgt]['src_indices'].flat()]
else:
srcpart = sval[abs2meta_in[tgt]['src_indices']()]
if _has_val_mismatch(tunits, tval, sunits, srcpart):
errs.add('val')
if errs:
self._show_ambiguity_msg(prom, errs, tgts)
elif src not in all_discrete_outs:
gmeta['units'] = sunits
def _show_ambiguity_msg(self, prom, metavars, tgts, metadata=None):
errs = sorted(metavars)
if metadata is None:
meta = errs
else:
meta = sorted(metadata)
inputs = sorted(tgts)
gpath = common_subpath(tgts)
if gpath == self.pathname:
g = self
else:
g = self._get_subsystem(gpath)
gprom = None
# get promoted name relative to g
if MPI and self.comm.size > 1:
if g is not None and not g._is_local:
g = None
if self.comm.allreduce(int(g is not None)) < self.comm.size:
# some procs have remote g
if g is not None:
gprom = g._var_allprocs_abs2prom['input'][inputs[0]]
proms = self.comm.allgather(gprom)
for p in proms:
if p is not None:
gprom = p
break
if gprom is None:
gprom = g._var_allprocs_abs2prom['input'][inputs[0]]
gname = f"Group named '{gpath}'" if gpath else 'model'
args = ', '.join([f'{n}=?' for n in errs])
self._collect_error(f"{self.msginfo}: The following inputs, {inputs}, promoted "
f"to '{prom}', are connected but their metadata entries {meta}"
f" differ. Call <group>.set_input_defaults('{gprom}', {args}), "
f"where <group> is the {gname} to remove the ambiguity.")
def _ordered_comp_name_iter(self):
"""
Yield contained component pathnames in order of execution.
For components within ParallelGroups, true execution order is unknown so components
will be ordered by rank within a ParallelGroup.
"""
for s in self._subsystems_myproc:
if isinstance(s, Group):
yield from s._ordered_comp_name_iter()
else:
yield s.pathname
def _sorted_sys_iter(self):
"""
Yield subsystems in sorted order if Problem option allow_post_setup_reorder is True.
Otherwise, yield subsystems in the order they were added to their parent group.
Yields
------
System
A subsystem.
"""
if self._problem_meta['allow_post_setup_reorder']:
for s in sorted(self._subsystems_myproc, key=lambda s: s.name):
yield s
else:
yield from self._subsystems_myproc
def _sorted_sys_iter_all_procs(self):
"""
Yield subsystem names in sorted order if Problem option allow_post_setup_reorder is True.
Otherwise, yield subsystem names in the order they were added to their parent group.
Yields
------
System
A subsystem.
"""
if self._problem_meta['allow_post_setup_reorder']:
for s in sorted(self._subsystems_allprocs):
yield s
else:
yield from self._subsystems_allprocs
def _all_subsystem_iter(self):
"""
Iterate over all subsystems, local and nonlocal.
Yields
------
System
A subsystem.
"""
for s, _ in self._subsystems_allprocs.values():
yield s
def _get_relevance_modifiers(self, grad_groups, always_opt_comps):
"""
Collect information from the model that will modify the relevance graph of the model.
Parameters
----------
grad_groups : set
Set of groups having nonlinear solvers that use gradients.
always_opt_comps : set
Set of components that are to be included in every iteration of the optimization,
even if they aren't relevant in terms of data flow.
"""
if self.nonlinear_solver is not None and self.nonlinear_solver.supports['gradients']:
grad_groups.add(self.pathname)
elif self.linear_solver is not None and isinstance(self.linear_solver, DirectSolver):
grad_groups.add(self.pathname)
for s in self._subsystems_myproc:
if isinstance(s, Group):
s._get_relevance_modifiers(grad_groups, always_opt_comps)
elif s.options['always_opt']:
always_opt_comps.add(s.pathname)
@property
def model_options(self):
"""
Get the model options from self._problem_meta.
The user may change the contents of model_options to impact values sent
to subsystems of this Group.
Returns
-------
dict
The model options metadata provided by the associated Problem object.
"""
return self._problem_meta['model_options']
def _gather_full_data(self):
"""
Return True if this system should contribute full data to a collective MPI call.
This prevents sending a lot of unnecessary data across the network when
the data is duplicated across multiple processes.
Returns
-------
bool
True if this system should contribute its full data. Otherwise it
should contribute only an 'empty' version of the data. What
'empty' means depends on the structure of the data being gathered.
"""
if self._mpi_proc_allocator.parallel:
if self._subsystems_myproc and self._subsystems_myproc[0].comm.rank == 0:
return self._subsystems_myproc[0]._full_comm is None or \
self._subsystems_myproc[0]._full_comm.rank == 0
return False
def _get_prom_name(self, abs_name):
"""
Get promoted name for specified variable.
"""
abs2prom = self._var_allprocs_abs2prom
if abs_name in abs2prom['input']:
return abs2prom['input'][abs_name]
elif abs_name in abs2prom['output']:
return abs2prom['output'][abs_name]
else:
return abs_name
def _prom_names_list(self, lst):
"""
Convert a list of variable names to promoted names.
"""
return [self._get_prom_name(n) for n in lst]
def _prom_names_dict(self, dct):
"""
Convert a dictionary keyed on variable names to be keyed on promoted names.
"""
return {self._get_prom_name(k): v for k, v in dct.items()}
def _prom_names_jac(self, jac):
"""
Convert a nested dict jacobian keyed on variable names to be keyed on promoted names.
"""
new_jac = {}
for of in jac:
new_jac[self._get_prom_name(of)] = of_dict = {}
for wrt in jac[of]:
of_dict[self._get_prom_name(wrt)] = jac[of][wrt]
return new_jac
[docs] def get_design_vars(self, recurse=True, get_sizes=True, use_prom_ivc=True):
"""
Get the DesignVariable settings from this system.
Retrieve all design variable settings from the system and, if recurse
is True, all of its subsystems.
Parameters
----------
recurse : bool
If True, recurse through the subsystems and return the path of
all design vars relative to the this Group.
get_sizes : bool, optional
If True, compute the size of each design variable.
use_prom_ivc : bool
Use promoted names for inputs, else convert to absolute source names.
Returns
-------
dict
The design variables defined in the current system and, if
recurse=True, its subsystems.
"""
out = super().get_design_vars(recurse=recurse, get_sizes=get_sizes,
use_prom_ivc=use_prom_ivc)
if recurse:
abs2prom_in = self._var_allprocs_abs2prom['input']
abs2prom_out = self._var_allprocs_abs2prom['output']
if (self.comm.size > 1 and self._mpi_proc_allocator.parallel):
# For parallel groups, we need to make sure that the design variable dictionary is
# assembled in the same order under mpi as for serial runs.
out_by_sys = {}
for subsys in self._sorted_sys_iter():
sub_out = {}
name = subsys.name
dvs = subsys.get_design_vars(recurse=recurse, get_sizes=get_sizes,
use_prom_ivc=use_prom_ivc)
if use_prom_ivc:
# have to promote subsystem prom name to this level
sub_pro2abs_in = subsys._var_allprocs_prom2abs_list['input']
sub_pro2abs_out = subsys._var_allprocs_prom2abs_list['output']
for dv, meta in dvs.items():
if dv in sub_pro2abs_in:
abs_dv = sub_pro2abs_in[dv][0]
sub_out[abs2prom_in[abs_dv]] = meta
elif dv in sub_pro2abs_out:
abs_dv = sub_pro2abs_out[dv][0]
sub_out[abs2prom_out[abs_dv]] = meta
else:
sub_out[dv] = meta
else:
sub_out.update(dvs)
out_by_sys[name] = sub_out
out_by_sys_by_rank = self.comm.allgather(out_by_sys)
all_outs_by_sys = {}
for outs in out_by_sys_by_rank:
for name, meta in outs.items():
all_outs_by_sys[name] = meta
for subsys_name in self._sorted_sys_iter_all_procs():
for name, meta in all_outs_by_sys[subsys_name].items():
if name not in out:
out[name] = meta
else:
for subsys in self._sorted_sys_iter():
dvs = subsys.get_design_vars(recurse=recurse, get_sizes=get_sizes,
use_prom_ivc=use_prom_ivc)
if use_prom_ivc:
# have to promote subsystem prom name to this level
sub_pro2abs_in = subsys._var_allprocs_prom2abs_list['input']
sub_pro2abs_out = subsys._var_allprocs_prom2abs_list['output']
for dv, meta in dvs.items():
if dv in sub_pro2abs_in:
abs_dv = sub_pro2abs_in[dv][0]
out[abs2prom_in[abs_dv]] = meta
elif dv in sub_pro2abs_out:
abs_dv = sub_pro2abs_out[dv][0]
out[abs2prom_out[abs_dv]] = meta
else:
out[dv] = meta
else:
out.update(dvs)
model = self._problem_meta['model_ref']()
if self is model:
abs2meta_out = model._var_allprocs_abs2meta['output']
for outmeta in out.values():
src = outmeta['source']
if src in abs2meta_out and "openmdao:allow_desvar" not in abs2meta_out[src]['tags']:
prom_src, prom_tgt = outmeta['orig']
if prom_src is None:
self._collect_error(f"Design variable '{prom_tgt}' is connected to '{src}',"
f" but '{src}' is not an IndepVarComp or ImplicitComp "
"output.")
else:
self._collect_error(f"Design variable '{prom_src}' is not an IndepVarComp "
"or ImplicitComp output.")
return out
[docs] def get_responses(self, recurse=True, get_sizes=True, use_prom_ivc=False):
"""
Get the response variable settings from this system.
Retrieve all response variable settings from the system as a dict,
keyed by either absolute variable name, promoted name, or alias name,
depending on the value of use_prom_ivc and whether the original key was
a promoted output, promoted input, or an alias.
Parameters
----------
recurse : bool, optional
If True, recurse through the subsystems and return the path of
all responses relative to the this system.
get_sizes : bool, optional
If True, compute the size of each response.
use_prom_ivc : bool
Translate ivc names to their promoted input names.
Returns
-------
dict
The responses defined in the current system and, if
recurse=True, its subsystems.
"""
out = super().get_responses(recurse=recurse, get_sizes=get_sizes, use_prom_ivc=use_prom_ivc)
if recurse:
abs2prom_out = self._var_allprocs_abs2prom['output']
if self.comm.size > 1 and self._mpi_proc_allocator.parallel:
# For parallel groups, we need to make sure that the response dictionary is
# assembled in the same order under mpi as for serial runs.
out_by_sys = {}
for subsys in self._sorted_sys_iter():
name = subsys.name
sub_out = {}
resps = subsys.get_responses(recurse=recurse, get_sizes=get_sizes,
use_prom_ivc=use_prom_ivc)
if use_prom_ivc:
# have to promote subsystem prom name to this level
sub_pro2abs_out = subsys._var_allprocs_prom2abs_list['output']
for res, meta in resps.items():
if res in sub_pro2abs_out:
abs_resp = sub_pro2abs_out[res][0]
sub_out[abs2prom_out[abs_resp]] = meta
else:
sub_out[res] = meta
else:
for rkey, rmeta in resps.items():
if rkey in out:
tdict = {'con': 'constraint', 'obj': 'objective'}
rpath = rmeta['parent']
rname = '.'.join((rpath, rmeta['name'])) if rpath else rkey
rtype = tdict[rmeta['type']]
ometa = sub_out[rkey]
opath = ometa['parent']
oname = '.'.join((opath, ometa['name'])) if opath else ometa['name']
otype = tdict[ometa['type']]
raise NameError(f"The same response alias, '{rkey}' was declared"
f" for {rtype} '{rname}' and {otype} '{oname}'.")
sub_out[rkey] = rmeta
out_by_sys[name] = sub_out
out_by_sys_by_rank = self.comm.allgather(out_by_sys)
all_outs_by_sys = {}
for outs in out_by_sys_by_rank:
for name, meta in outs.items():
all_outs_by_sys[name] = meta
for subsys_name in self._sorted_sys_iter_all_procs():
for name, meta in all_outs_by_sys[subsys_name].items():
out[name] = meta
else:
for subsys in self._sorted_sys_iter():
resps = subsys.get_responses(recurse=recurse, get_sizes=get_sizes,
use_prom_ivc=use_prom_ivc)
if use_prom_ivc:
# have to promote subsystem prom name to this level
sub_pro2abs_out = subsys._var_allprocs_prom2abs_list['output']
for res, meta in resps.items():
if res in sub_pro2abs_out:
out[abs2prom_out[sub_pro2abs_out[res][0]]] = meta
else:
out[res] = meta
else:
for rkey, rmeta in resps.items():
if rkey in out:
tdict = {'con': 'constraint', 'obj': 'objective'}
rpath = rmeta['parent']
rname = '.'.join((rpath, rmeta['name'])) if rpath else rkey
rtype = tdict[rmeta['type']]
ometa = out[rkey]
opath = ometa['parent']
oname = '.'.join((opath, ometa['name'])) if opath else ometa['name']
otype = tdict[ometa['type']]
raise NameError(f"The same response alias, '{rkey}' was declared"
f" for {rtype} '{rname}' and {otype} '{oname}'.")
out[rkey] = rmeta
return out
def _get_totals_metadata(self, driver=None, of=None, wrt=None):
if isinstance(of, str):
of = [of]
if isinstance(wrt, str):
wrt = [wrt]
if not driver:
if of is None or wrt is None:
raise RuntimeError("driver must be specified if of and wrt variables are not "
"provided.")
if driver is False: # force to not use any existing desvar or response metadata
return self._active_responses(of, responses=False), \
self._active_desvars(wrt, designvars=False), True
return self._active_responses(of), self._active_desvars(wrt), True
has_custom_derivs = False
list_wrt = list(wrt) if wrt is not None else []
list_of = list(of) if of is not None else []
lincons = [d for d, meta in driver._cons.items() if meta['linear']]
if lincons and list_of == lincons: # this is for a linear jacobian
driver_wrt = list(driver._get_lin_dvs())
else:
driver_wrt = list(driver._get_nl_dvs())
if wrt is None:
wrt = driver_wrt
if not wrt:
raise RuntimeError("No design variables were passed to compute_totals and "
"the driver is not providing any.")
else:
if list_wrt != driver_wrt:
wrt_src_names = [driver._designvars[n]['source'] for n in driver_wrt]
if list_wrt != wrt_src_names:
has_custom_derivs = True
driver_ordered_nl_resp_names = driver._get_ordered_nl_responses()
if of is None:
of = driver_ordered_nl_resp_names
if not of:
raise RuntimeError("No response variables were passed to compute_totals and "
"the driver is not providing any.")
else:
of_src_names = [m['source'] for n, m in driver._responses.items()
if n in driver_ordered_nl_resp_names]
of = list(of)
if of != driver_ordered_nl_resp_names and of != of_src_names:
has_custom_derivs = True
return self._active_responses(of, driver._responses), \
self._active_desvars(wrt, driver._designvars), has_custom_derivs
def _active_desvars(self, user_dv_names, designvars=None):
"""
Return a design variable dictionary.
Whatever names match the names of design variables in this system will use the metadata
from the design variable. For other variables that have not been registered as design
variables, metadata will be constructed based on variable metadata.
Parameters
----------
user_dv_names : iter of str
Iterator of user facing design variable names.
designvars : dict, None, or False
Dictionary of design variables. If None, get_design_vars will be called. If False,
no design vars will be used.
Returns
-------
dict
Dictionary of design variables.
"""
# do this to keep ordering the same as in the user list
active_dvs = {n: None for n in user_dv_names}
if designvars is None:
designvars = self.get_design_vars(recurse=True, get_sizes=True, use_prom_ivc=True)
if designvars: # use any matching metadata from existing design vars
for name, meta in designvars.items():
if name in active_dvs:
active_dvs[name] = meta.copy()
elif meta['name'] in active_dvs:
active_dvs[meta['name']] = meta.copy()
elif meta['source'] in active_dvs:
active_dvs[meta['source']] = meta.copy()
prom2abs_in = self._var_allprocs_prom2abs_list['input']
for name, meta in active_dvs.items():
if meta is None:
meta = {
'parallel_deriv_color': None,
'indices': None,
'name': name,
'cache_linear_solution': False,
}
self._update_dv_meta(meta, get_size=True)
if name in prom2abs_in:
meta['ivc_print_name'] = name
else:
meta['ivc_print_name'] = None
active_dvs[name] = meta
meta['remote'] = meta['source'] not in self._var_abs2meta['output']
return active_dvs
def _active_responses(self, user_response_names, responses=None):
"""
Return a response dictionary containing the given variables.
Whatever names match the names of responses in this system, use the metadata
from the response. For other variables that have not been registered as responses,
construct metadata based on variable metadata.
Parameters
----------
user_response_names : iter of str
Iterator of user facing response names. Aliases are allowed.
responses : dict, None, or False.
Dictionary of responses. If None, get_responses will be called. If False,
no responses will be used.
Returns
-------
dict
Dictionary of responses.
"""
# do this to keep ordering the same as in the user list
active_resps = {n: None for n in user_response_names}
if responses is None:
responses = self.get_responses(recurse=True, get_sizes=True, use_prom_ivc=True)
if responses:
for name, meta in responses.items():
if name in active_resps:
active_resps[name] = meta.copy()
for name, meta in active_resps.items():
if meta is None:
# no response exists for this name, so create metadata with default values and
# update size, etc. based on the variable metadata.
meta = {
'parallel_deriv_color': None,
'indices': None,
'alias': None,
'name': name,
'cache_linear_solution': False,
'linear': False,
}
self._update_response_meta(meta, get_size=True)
active_resps[name] = meta
meta['remote'] = meta['source'] not in self._var_abs2meta['output']
return active_resps
def _get_graph_node_meta(self):
"""
Return metadata to add to this system's graph node.
Returns
-------
dict
Metadata for this system's graph node.
"""
meta = super()._get_graph_node_meta()
# TODO: maybe set 'implicit' based on whether there are any implicit comps anywhere
# inside of the group or its children.
meta['base'] = 'Group'
return meta
[docs]def iter_solver_info(system):
"""
Return solver information for this System.
Parameters
----------
system : System
Return solver information for this System.
Returns
-------
str
System pathname.
str
Class name.
list of sets of str
Strongly connected components in this Group's subsystem graph. If not a Group, this will
be an empty list.
str
Linear solver class name.
str
Nonlinear solver class name.
int
Linear solver max iterations.
int
Nonlinear solver max iterations.
int
Number of subsystems that are not part of any strongly connected component. If this
number is greater than 0 and strongly connected components exist in this group, it
indicates that this group contains subcycles and that it may be more efficient to
separate those subcyles into their own groups and apply iterative solvers to them.
bool
True if this is a Group, False if it is an ImplicitComponent.
bool
True if the linear solver found for this System can solve a cycle or implicit component.
bool
True if the nonlinear solver found for this System can solve a cycle or implicit component.
"""
sccs = []
missing = 0
lnmaxiter = nlmaxiter = 1
nl_can_solve = lin_can_solve = False
nlslvname = lnslvname = None
if isinstance(system, Group):
isgrp = True
for s in get_sccs_topo(system.compute_sys_graph()):
if len(s) > 1:
sccs.append(s)
else:
missing += 1
elif isinstance(system, ImplicitComponent):
isgrp = False
else:
return (system.pathname, system.__class__.__name__, sccs, None, None, 0, 0, 0, False,
False, False)
if system.nonlinear_solver:
if isgrp:
nl_can_solve = system.nonlinear_solver.can_solve_cycle()
else:
nl_can_solve = system.nonlinear_solver.supports['implicit_components']
nlslvname = system.nonlinear_solver.__class__.__name__
if 'maxiter' in system.nonlinear_solver.options:
nlmaxiter = system.nonlinear_solver.options['maxiter']
if system.linear_solver:
if isgrp:
lin_can_solve = system.linear_solver.can_solve_cycle()
else:
lin_can_solve = system.linear_solver.supports['implicit_components']
lnslvname = system.linear_solver.__class__.__name__
if lnslvname and 'maxiter' in system.linear_solver.options:
lnmaxiter = system.linear_solver.options['maxiter']
if not isgrp:
if lnslvname is None and system._has_solve_linear:
lnslvname = 'solve_linear'
lin_can_solve = True
if nlslvname is None and system._has_solve_nl:
nlslvname = 'solve_nonlinear'
nl_can_solve = True
return (system.pathname, system.__class__.__name__, sccs, lnslvname, nlslvname, lnmaxiter,
nlmaxiter, missing, isgrp, nl_can_solve, lin_can_solve)