"""Code for generating N2 diagram."""
import inspect
import os
import pathlib
import sys
from operator import itemgetter
import networkx as nx
import numpy as np
from openmdao.drivers.analysis_driver import AnalysisDriver
import openmdao.utils.hooks as hooks
from openmdao.core.explicitcomponent import ExplicitComponent
from openmdao.core.indepvarcomp import IndepVarComp
from openmdao.core.parallel_group import ParallelGroup
from openmdao.core.group import Group
from openmdao.core.problem import Problem, _SetupStatus
from openmdao.core.implicitcomponent import ImplicitComponent
from openmdao.core.constants import _UNDEFINED
from openmdao.components.exec_comp import ExecComp
from openmdao.components.meta_model_structured_comp import MetaModelStructuredComp
from openmdao.components.meta_model_unstructured_comp import MetaModelUnStructuredComp
from openmdao.drivers.doe_driver import DOEDriver
from openmdao.recorders.case_reader import CaseReader
from openmdao.solvers.nonlinear.newton import NewtonSolver
from openmdao.utils.array_utils import convert_ndarray_to_support_nans_in_json
from openmdao.utils.class_util import overrides_method
from openmdao.utils.general_utils import default_noraise, is_undefined
from openmdao.utils.mpi import MPI
from openmdao.utils.notebook_utils import notebook, display, HTML, IFrame, colab
from openmdao.utils.om_warnings import issue_warning
from openmdao.utils.reports_system import register_report_hook
from openmdao.utils.file_utils import _load_and_exec, _to_filename
from openmdao.visualization.htmlpp import HtmlPreprocessor
from openmdao import __version__ as openmdao_version
_MAX_ARRAY_SIZE_FOR_REPR_VAL = 1000 # If var has more elements than this do not pass to N2
_MAX_OPTION_SIZE = int(1e4) # If option value is bigger than this do not pass to N2
_default_n2_filename = 'n2.html'
def _get_array_info(system, vec, name, prom, var_dict, from_src=True):
ndarray_to_convert = vec._abs_get_val(name, flat=False) if vec else \
system.get_val(prom, from_src=from_src)
var_dict['val'] = convert_ndarray_to_support_nans_in_json(ndarray_to_convert)
# Find the minimum indices and value
min_indices = np.unravel_index(np.nanargmin(ndarray_to_convert, axis=None),
ndarray_to_convert.shape)
var_dict['val_min_indices'] = min_indices
var_dict['val_min'] = ndarray_to_convert[min_indices]
# Find the maximum indices and value
max_indices = np.unravel_index(np.nanargmax(ndarray_to_convert, axis=None),
ndarray_to_convert.shape)
var_dict['val_max_indices'] = max_indices
var_dict['val_max'] = ndarray_to_convert[max_indices]
def _get_var_dict(system, typ, name, is_parallel, is_implicit, values):
if name in system._var_abs2meta[typ]:
meta = system._var_abs2meta[typ][name]
prom = system._resolver.abs2prom(name, typ)
val = np.asarray(meta['val'])
is_dist = MPI is not None and meta['distributed']
var_dict = {
'name': prom,
'type': typ,
'dtype': type(val).__name__,
'is_discrete': False,
'distributed': is_dist,
'shape': str(meta['shape']),
'desc': meta['desc']
}
if typ == 'output':
var_dict['implicit'] = is_implicit
vec = system._outputs
else: # input
if MPI:
# for inputs if we're under MPI, we only retrieve the value currently stored
# in the input vector and not from the connected source because that source
# could be remote.
vec = system._inputs
else:
vec = None
# if 'vec' is not None at this point, we can retrieve the value using vec._abs_get_val,
# which is a faster call than system.get_val.
if meta['units'] is None:
var_dict['units'] = 'None'
else:
var_dict['units'] = meta['units']
try:
if values and val.size < _MAX_ARRAY_SIZE_FOR_REPR_VAL:
if not MPI:
# Get the current value
_get_array_info(system, vec, name, prom, var_dict, from_src=True)
elif is_parallel or is_dist:
# we can't access non-local values, so just get the initial value
var_dict['val'] = val
var_dict['initial_value'] = True
else:
# get the current value but don't try to get it from the source,
# which could be remote under MPI
_get_array_info(system, vec, name, prom, var_dict, from_src=False)
except Exception as err:
issue_warning(str(err))
else: # discrete
meta = system._var_discrete[typ][name]
val = meta['val']
var_dict = {
'name': name,
'type': typ,
'dtype': type(val).__name__,
'is_discrete': True,
}
if values:
if MPI is None or isinstance(val, (int, str, list, dict, complex, np.ndarray)):
var_dict['val'] = default_noraise(system.get_val(name))
if 'surrogate_name' in meta:
var_dict['surrogate_name'] = meta['surrogate_name']
return var_dict
def _serialize_single_option(option):
"""
Return a json-safe equivalent of the option.
The default_noraise function performs the datatype serialization, while this function takes
care of attributes specific to options dicts.
Parameters
----------
option : object
Option to be serialized.
Returns
-------
object
JSON-safe serialized object.
"""
if not option['recordable']:
return 'Not Recordable'
val = option['val']
if is_undefined(val):
return str(val)
if sys.getsizeof(val) > _MAX_OPTION_SIZE:
return 'Too Large to Display'
return default_noraise(val)
def _get_tree_dict(system, values=True, is_parallel=False):
"""
Get a dictionary representation of the system hierarchy.
Parameters
----------
system : System
The System at the root of the hierarchy
values : bool
If True, include variable values. If False, all values will be None.
is_parallel : bool
If True, values can be remote and are not available.
"""
tree_dict = {
'name': system.name if system.name else 'root',
'type': 'subsystem' if system.name else 'root',
'class': ':'.join((type(system).__module__, type(system).__qualname__)),
'expressions': None,
'nonlinear_solver': "",
'nonlinear_solver_options': None,
'linear_solver': "",
'linear_solver_options': None,
}
is_implicit = False
if isinstance(system, Group):
if MPI and isinstance(system, ParallelGroup):
is_parallel = True
tree_dict['component_type'] = None
tree_dict['subsystem_type'] = 'group'
tree_dict['is_parallel'] = is_parallel
children = [_get_tree_dict(s, values, is_parallel)
for s in system._subsystems_myproc]
if system.comm.size > 1:
if system._subsystems_myproc:
sub_comm = system._subsystems_myproc[0].comm
if sub_comm.rank != 0:
children = []
children_lists = system.comm.allgather(children)
children = []
for children_list in children_lists:
children.extend(children_list)
if system.linear_solver:
tree_dict['linear_solver'] = system.linear_solver.SOLVER
tree_dict['linear_solver_options'] = {
k: _serialize_single_option(opt)
for k, opt in system.linear_solver.options._dict.items()
}
if system.nonlinear_solver:
tree_dict['nonlinear_solver'] = system.nonlinear_solver.SOLVER
tree_dict['nonlinear_solver_options'] = {
k: _serialize_single_option(opt)
for k, opt in system.nonlinear_solver.options._dict.items()
}
if system.nonlinear_solver.SOLVER == NewtonSolver.SOLVER:
tree_dict['solve_subsystems'] = system._nonlinear_solver.options['solve_subsystems']
else:
tree_dict['subsystem_type'] = 'component'
tree_dict['is_parallel'] = is_parallel
if isinstance(system, ImplicitComponent):
is_implicit = True
tree_dict['component_type'] = 'implicit'
if overrides_method('solve_linear', system, ImplicitComponent):
tree_dict['linear_solver'] = "solve_linear"
elif system.linear_solver:
tree_dict['linear_solver'] = system.linear_solver.SOLVER
tree_dict['linear_solver_options'] = {
k: _serialize_single_option(opt)
for k, opt in system.linear_solver.options._dict.items()
}
if overrides_method('solve_nonlinear', system, ImplicitComponent):
tree_dict['nonlinear_solver'] = "solve_nonlinear"
elif system.nonlinear_solver:
tree_dict['nonlinear_solver'] = system.nonlinear_solver.SOLVER
tree_dict['nonlinear_solver_options'] = {
k: _serialize_single_option(opt)
for k, opt in system.nonlinear_solver.options._dict.items()
}
elif isinstance(system, ExecComp):
tree_dict['component_type'] = 'exec'
tree_dict['expressions'] = system._exprs
elif isinstance(system, (MetaModelStructuredComp, MetaModelUnStructuredComp)):
tree_dict['component_type'] = 'metamodel'
elif isinstance(system, IndepVarComp):
tree_dict['component_type'] = 'indep'
elif isinstance(system, ExplicitComponent):
tree_dict['component_type'] = 'explicit'
else:
tree_dict['component_type'] = None
children = []
for typ in ['input', 'output']:
for abs_name in system._var_abs2meta[typ]:
children.append(_get_var_dict(system, typ, abs_name,
is_parallel, is_implicit, values))
for prom_name in system._var_discrete[typ]:
children.append(_get_var_dict(system, typ, prom_name,
is_parallel, is_implicit, values))
tree_dict['children'] = children
options = {}
slv = {'linear_solver', 'nonlinear_solver'}
for k, opt in system.options._dict.items():
if k in slv:
# need to handle solver option separately because it can be a class, instance or None
try:
val = opt['val']
except KeyError:
val = opt['value']
try:
options[k] = val.SOLVER
except AttributeError:
options[k] = val
else:
options[k] = _serialize_single_option(opt)
tree_dict['options'] = options
return tree_dict
def _get_declare_partials(system):
"""
Get a list of the declared partials.
Parameters
----------
system : <System>
A System in the model.
Returns
-------
list
A list containing all the declared partials (strings in the form "of > wrt" )
beginning from the given system on down.
"""
return [f"{of} > {wrt}" for of, wrt in system._declared_partials_iter() if of != wrt]
def _get_viewer_data(data_source, values=_UNDEFINED, case_id=None):
"""
Get the data needed by the N2 viewer as a dictionary.
Parameters
----------
data_source : <Problem> or <Group> or str or pathlib.Path
A Problem or Group or case recorder filename containing the model or model data.
If the case recorder file from a parallel run has separate metadata, the
filenames can be specified with a comma, e.g.: case.sql_0,case.sql_meta
values : bool or _UNDEFINED
If True, include variable values. If False, all values will be None.
If unspecified, this behaves as if set to True unless the data source is a Problem or
model for which setup is not complete, in which case it behaves as if set to False.
case_id : int or str or None
Case name or index of case in SQL file.
Returns
-------
dict
A dictionary containing information about the model for use by the viewer.
"""
if isinstance(data_source, Problem):
# make sure at least setup_part2 has been run
data_source.set_setup_status(_SetupStatus.POST_SETUP2)
root_group = data_source.model
driver = data_source.driver
driver_name = driver.__class__.__name__
if isinstance(driver, DOEDriver):
driver_type = 'doe'
elif isinstance(driver, AnalysisDriver):
driver_type = 'analysis'
else:
driver_type = 'optimization'
driver_options = {key: _serialize_single_option(driver.options._dict[key])
for key in driver.options}
if driver_type == 'optimization' and hasattr(driver, 'opt_settings'):
driver_opt_settings = driver.opt_settings
else:
driver_opt_settings = None
driver_supports = driver.supports._dict
# set default behavior for values flag
if is_undefined(values):
values = (data_source._metadata is not None and
data_source._metadata['setup_status'] >= _SetupStatus.POST_FINAL_SETUP)
elif isinstance(data_source, Group):
if not data_source.pathname: # root group
root_group = data_source
driver_name = None
driver_type = None
driver_options = None
driver_opt_settings = None
driver_supports = None
else:
# this function only makes sense when it is at the root
msg = f"Viewer data is not available for sub-Group '{data_source.pathname}'."
raise TypeError(msg)
if data_source._problem_meta is not None:
if data_source._problem_meta['setup_status'] >= _SetupStatus.POST_SETUP:
if data_source._problem_meta['setup_status'] < _SetupStatus.POST_SETUP2:
# run setup_part2 on the model
data_source._problem_meta['model_ref']()._setup_part2()
# set default behavio r for values flag
if is_undefined(values):
values = (data_source._problem_meta is not None and
data_source._problem_meta['setup_status'] >= _SetupStatus.POST_FINAL_SETUP)
elif isinstance(data_source, str) or isinstance(data_source, pathlib.Path):
if isinstance(data_source, str) and ',' in data_source:
filenames = data_source.split(',')
cr = CaseReader(filenames[0], metadata_filename=filenames[1])
else:
cr = CaseReader(data_source)
data_dict = cr.problem_metadata
# set default behavior for values flag
if is_undefined(values):
values = True
def set_values(children, stack, case):
"""
Set variable values in model tree from the specified Case.
If case is None, set all values to None.
"""
for child in children:
# if 'val' in child
if child['type'] == 'subsystem':
stack.append(child['name'])
set_values(child['children'], stack, case)
stack.pop()
elif child['type'] == 'input':
if case is None:
child.pop('val')
for key in ['val_min', 'val_max', 'val_min_indices', 'val_max_indices']:
del child[key]
elif case.inputs is None:
child['val'] = 'N/A'
else:
path = child['name'] if not stack else '.'.join(stack + [child['name']])
child['val'] = case.inputs[path]
elif child['type'] == 'output':
if case is None:
child.pop('val')
for key in ['val_min', 'val_max', 'val_min_indices', 'val_max_indices']:
del child[key]
elif case.outputs is None:
child['val'] = 'N/A'
else:
path = child['name'] if not stack else '.'.join(stack + [child['name']])
try:
child['val'] = case.outputs[path]
except KeyError:
child['val'] = 'N/A'
if values is False:
set_values(data_dict['tree']['children'], [], None)
elif case_id is not None:
case = cr.get_case(case_id)
print(f"Using source: {case.source}\nCase: {case.name}")
set_values(data_dict['tree']['children'], [], case)
# Delete the variables key since it's not used in N2
if 'variables' in data_dict:
del data_dict['variables']
# Older recordings might not have this.
if 'md5_hash' not in data_dict:
data_dict['md5_hash'] = None
return data_dict
else:
raise TypeError(f"Viewer data is not available for '{data_source}'."
"The source must be a Problem, model or the filename of a recording.")
data_dict = {}
data_dict['tree'] = _get_tree_dict(root_group, values=values)
data_dict['md5_hash'] = root_group._generate_md5_hash()
connections_list = []
G = root_group.compute_sys_graph(comps_only=True)
scc = nx.strongly_connected_components(G)
strongdict = {}
sys_idx_names = []
for i, strong_comp in enumerate(scc):
for c in strong_comp:
strongdict[c] = i # associate each comp with a strongly connected component
if len(strong_comp) > 1:
# these IDs are only used when back edges are present
for name in strong_comp:
sys_idx_names.append(name)
sys_idx = {} # map of pathnames to index of pathname in list (systems in cycles only)
comp_orders = {name: i for i, name in enumerate(root_group._ordered_comp_name_iter())}
for name in sorted(sys_idx_names):
sys_idx[name] = len(sys_idx)
# 1 is added to the indices of all edges in the matrix so that we can use 0 entries to
# indicate that there is no connection.
matrix = np.zeros((len(comp_orders), len(comp_orders)), dtype=np.int32)
edge_ids = []
for i, edge in enumerate(G.edges()):
src, tgt = edge
if strongdict[src] == strongdict[tgt]:
matrix[comp_orders[src], comp_orders[tgt]] = i + 1 # bump edge index by 1
edge_ids.append((sys_idx[src], sys_idx[tgt]))
else:
edge_ids.append(None)
for edge_i, (src, tgt) in enumerate(G.edges()):
if strongdict[src] == strongdict[tgt]:
start = comp_orders[src]
end = comp_orders[tgt]
# get a view here so we can remove this edge from submat temporarily to eliminate
# an 'if' check inside the nested list comprehension for edges_list
rem = matrix[start:start + 1, end:end + 1]
rem[0, 0] = 0
if end < start:
start, end = end, start
submat = matrix[start:end + 1, start:end + 1]
nz = submat[submat > 0]
rem[0, 0] = edge_i + 1 # put removed edge back
if nz.size > 1:
nz -= 1 # convert back to correct edge index
edges_list = [edge_ids[i] for i in nz]
edges_list = sorted(edges_list, key=itemgetter(0, 1))
for vsrc, vtgtlist in G.get_edge_data(src, tgt)['conns'].items():
for vtgt in vtgtlist:
connections_list.append({'src': vsrc, 'tgt': vtgt,
'cycle_arrows': edges_list})
continue
for vsrc, vtgtlist in G.get_edge_data(src, tgt)['conns'].items():
for vtgt in vtgtlist:
connections_list.append({'src': vsrc, 'tgt': vtgt})
connections_list = sorted(connections_list, key=itemgetter('src', 'tgt'))
data_dict['sys_pathnames_list'] = list(sys_idx)
data_dict['connections_list'] = connections_list
data_dict['abs2prom'] = {
'input': {k: v for k, v in root_group._resolver.abs2prom_iter('input', local=True)},
'output': {k: v for k, v in root_group._resolver.abs2prom_iter('output', local=True)},
}
data_dict['driver'] = {
'name': driver_name,
'type': driver_type,
'options': driver_options,
'opt_settings': driver_opt_settings,
'supports': driver_supports,
}
data_dict['design_vars'] = root_group.get_design_vars(use_prom_ivc=False)
data_dict['responses'] = root_group.get_responses(use_prom_ivc=False)
data_dict['declare_partials_list'] = _get_declare_partials(root_group)
return data_dict
[docs]def n2(data_source, outfile=_default_n2_filename, path=None, values=_UNDEFINED, case_id=None,
show_browser=True, embeddable=False, title=None, display_in_notebook=True):
"""
Generate an HTML file containing a tree viewer.
Optionally opens a web browser to view the file.
Parameters
----------
data_source : <Problem> or str
The Problem or case recorder database containing the model or model data.
outfile : str, optional
The name of the final output file.
path : str, optional
If specified, the n2 viewer will begin in a state that is zoomed in on the selected path.
This path should be the absolute path of a system in the model.
values : bool or _UNDEFINED
If True, include variable values. If False, all values will be None.
If unspecified, this behaves as if set to True unless the data source is a Problem or
model for which setup is not complete, in which case it behaves as if set to False.
case_id : int, str, or None
Case name or index of case in SQL file if data_source is a database.
show_browser : bool, optional
If True, pop up the system default web browser to view the generated html file.
Defaults to True.
embeddable : bool, optional
If True, gives a single HTML file that doesn't have the <html>, <DOCTYPE>, <body>
and <head> tags. If False, gives a single, standalone HTML file for viewing.
title : str, optional
The title for the diagram. Used in the HTML title.
display_in_notebook : bool, optional
If True, display the N2 diagram in the notebook, if this is called from a notebook.
Defaults to True.
"""
# grab the model viewer data
try:
model_data = _get_viewer_data(data_source, values=values, case_id=case_id)
err_msg = ''
except TypeError as err:
model_data = {}
err_msg = str(err)
issue_warning(err_msg)
# If MPI is active only display one copy of the viewer.
# If the data_source is a Problem, only run on the root proc of its comm.
# Otherwise, only run on the global root proc.
if MPI:
try:
comm = data_source.comm
except AttributeError:
comm = MPI.COMM_WORLD
if comm.rank != 0:
return
options = {}
model_data['options'] = options
import openmdao
openmdao_dir = os.path.dirname(inspect.getfile(openmdao))
vis_dir = os.path.join(openmdao_dir, "visualization/n2_viewer")
if title:
title = f"OpenMDAO Model Hierarchy and N2 diagram: {title}"
else:
title = "OpenMDAO Model Hierarchy and N2 diagram"
html_vars = {
'title': title,
'embeddable': "embedded-diagram" if embeddable else "non-embedded-diagram",
'openmdao_version': openmdao_version,
'model_data': model_data,
'initial_path': path
}
if err_msg:
with open(outfile, 'w') as f:
f.write(err_msg)
else:
HtmlPreprocessor(os.path.join(vis_dir, "index.html"),
outfile, allow_overwrite=True, var_dict=html_vars,
json_dumps_default=default_noraise, verbose=False).run()
if notebook:
if display_in_notebook:
# display in Jupyter Notebook
outfile = os.path.relpath(outfile)
if not colab:
display(IFrame(src=outfile, width="100%", height=700))
else:
display(HTML(outfile))
elif show_browser:
# open it up in the browser
from openmdao.utils.webview import webview
webview(outfile)
# N2 report definition
def _run_n2_report(prob, report_filename=_default_n2_filename):
n2_filepath = prob.get_reports_dir() / report_filename
try:
n2(prob, show_browser=False, outfile=n2_filepath, display_in_notebook=False)
except RuntimeError as err:
# We ignore this error
if str(err) != "Can't compute total derivatives unless " \
"both 'of' or 'wrt' variables have been specified.":
raise err
def _run_n2_report_w_errors(prob, report_filename=_default_n2_filename):
if prob._any_rank_has_saved_errors():
n2_filepath = prob.get_reports_dir() / report_filename
# only run the n2 here if we've had setup errors. Normally we'd wait until
# after final_setup in order to have correct values for all of the I/O variables.
try:
n2(prob, show_browser=False, outfile=n2_filepath, display_in_notebook=False)
except Exception as err:
# We ignore this error
if str(err) != "Can't compute total derivatives unless " \
"both 'of' or 'wrt' variables have been specified.":
prob.model._collect_error(str(err))
# errors will result in exit at the end of the _check_collected_errors method
def _n2_report_register():
register_report_hook('n2', 'final_setup', 'Problem', post=_run_n2_report,
description='N2 diagram', report_filename=_default_n2_filename)
register_report_hook('n2', '_check_collected_errors', 'Problem', pre=_run_n2_report_w_errors,
description='N2 diagram')
def _n2_setup_parser(parser):
"""
Set up the openmdao subparser for the 'openmdao n2' command.
Parameters
----------
parser : argparse subparser
The parser we're adding options to.
"""
parser.add_argument('file', nargs=1,
help='Python script or recording containing the model. '
'If metadata from a parallel run was recorded in a separate file, '
'specify both database filenames delimited with a comma.')
parser.add_argument('-o', default=_default_n2_filename, action='store', dest='outfile',
help='html output file.')
parser.add_argument('--no_values', action='store_true', dest='no_values',
help="don't display variable values.")
parser.add_argument('--no_browser', action='store_true', dest='no_browser',
help="don't display in a browser.")
parser.add_argument('--embed', action='store_true', dest='embeddable',
help="create embeddable version.")
parser.add_argument('--title', default=None, action='store', dest='title',
help='diagram title.')
parser.add_argument('--path', default=None, action='store', dest='path',
help='initial system path to zoom into.')
parser.add_argument('--problem', default=None, action='store', dest='problem_name',
help='name of sub-problem, if target is a sub-problem')
def _n2_cmd(options, user_args):
"""
Process command line args and call n2 on the specified file.
Parameters
----------
options : argparse Namespace
Command line options.
user_args : list of str
Command line options after '--' (if any). Passed to user script.
"""
filename = _to_filename(options.file[0])
probname = options.problem_name
if filename.endswith('.py'):
# disable the reports system, we only want the N2 report and then we exit
os.environ['OPENMDAO_REPORTS'] = '0'
def _view_model_w_errors(prob):
# if problem name is not specified, use top-level problem (no delimiter in pathname)
prob_id = prob._get_inst_id()
if probname is None or probname == prob_id:
errs = prob._metadata['saved_errors']
if errs:
# only run the n2 here if we've had setup errors. Normally we'd wait until
# after final_setup in order to have correct values for all of the variables.
n2(prob, outfile=options.outfile, show_browser=not options.no_browser,
values=not options.no_values, title=options.title, path=options.path,
embeddable=options.embeddable)
# errors will result in exit at the end of the _check_collected_errors method
# no errors, generate n2 after final_setup
def _view_model_no_errors(prob):
prob_id = prob._get_inst_id()
if (probname is None and '/' not in prob_id) or (probname == prob_id):
n2(prob, outfile=options.outfile, show_browser=not options.no_browser,
values=not options.no_values, title=options.title, path=options.path,
embeddable=options.embeddable)
hooks._register_hook('_check_collected_errors', 'Problem', pre=_view_model_w_errors,
inst_id=probname)
hooks._register_hook('final_setup', class_name='Problem', post=_view_model_no_errors,
inst_id=probname, exit=True)
_load_and_exec(options.file[0], user_args)
else:
# assume the file is a recording, run standalone
n2(filename, outfile=options.outfile, title=options.title, path=options.path,
values=False if options.no_values else _UNDEFINED,
show_browser=not options.no_browser,
embeddable=options.embeddable)