"""
AnalysisDriver definition.
"""
from collections import deque
from collections.abc import Iterable
import itertools
import traceback
from openmdao.core.driver import Driver, RecordingDebugging
from openmdao.core.analysis_error import AnalysisError
from openmdao.drivers.analysis_generator import AnalysisGenerator, SequenceGenerator
from openmdao.utils.mpi import MPI
from openmdao.utils.om_warnings import issue_warning, DriverWarning
class AnalysisDriver(Driver):
    """
    A driver for repeatedly running the model with a list of sampled data.

    Samples may be provided as a Sequence of dictionaries, where each entry
    in the sequence is a dictionary keyed by the variable names to be set
    for that specific execution.

    For instance, the following sequence of samples provides 3 executions,
    testing (x=0, y=4), (x=1, y=5), and (x=2, y=6). Units may be optionally
    provided.

    Alternatively, samples can be provided as an instance of AnalysisGenerator,
    which will provide each sample in a lazily-evaluated way.

    Parameters
    ----------
    samples : list, tuple, or AnalysisGenerator
        If given, provides a list or tuple of samples (variable names and values to be tested), or
        an AnalysisGenerator which provides samples.
    **kwargs : dict of keyword arguments
        Keyword arguments that will be mapped into the Driver options.

    Attributes
    ----------
    _name : str
        The name used to identify this driver in recorded samples.
    _problem_comm : MPI.Comm or None
        The MPI communicator for the Problem.
    _color : int or None
        In MPI, the cached color is used to determine which samples to run on this proc.
    _num_colors : int
        The number of total MPI colors for the run.
    _prev_sample_vars : set
        The set of variables seen in the previous iteration of the driver on this rank.
    _generator : AnalysisGenerator
        The internal AnalysisGenerator providing samples.
    """
[docs] def __init__(self, samples=None, **kwargs):
"""
Construct an AnalysisDriver.
"""
if isinstance(samples, (list, tuple)):
self._generator = SequenceGenerator(samples)
elif isinstance(samples, AnalysisGenerator):
self._generator = samples
elif samples is not None:
raise ValueError('If given, samples must be a list, tuple, '
f'or derived from AnalysisDriver but got {type(samples)}')
super().__init__(**kwargs)
# What we support
self.supports['integer_design_vars'] = True
# What we don't support
self.supports['distributed_design_vars'] = False
self.supports['optimization'] = False
self.supports._read_only = True
self._name = 'AnalysisDriver'
self._problem_comm = None
self._color = None
self._num_colors = 1
self._prev_sample_vars = set()
self._total_jac_format = 'dict'
def _declare_options(self):
"""
Declare options before kwargs are processed in the init method.
"""
self.options.declare('run_parallel', types=bool, default=False,
desc='Set to True to execute samples in parallel.')
self.options.declare('batch_size', types=int, default=1000,
desc='Number of samples to distribute among the processors '
'at a time when run_parallel is True. This should be limited when '
'the memory required to store the batch size of samples grows too '
'large.')
self.options.declare('procs_per_model', types=int, default=1, lower=1,
desc='Number of processors to give each model under MPI.')
[docs] def add_response(self, name, indices=None, units=None,
linear=False, parallel_deriv_color=None,
cache_linear_solution=False, flat_indices=None, alias=None):
r"""
Add a response variable to the model associated with this AnalysisDriver.
For AnalysisDriver, a response is an "output of interest" that we want to monitor
as a result of changes made in the various samples.
The AnalysisDriver.add_response interface does not support any optimization-centric
arguments associated with constraints or objectives, such as scaling.
Internally, the driver does add this as an 'objective' to the model for the purposes
of tracking derivatives.
Parameters
----------
name : str
Promoted name of the response variable in the system.
indices : sequence of int, optional
If variable is an array, these indicate which entries are of
interest for this particular response.
units : str, optional
Units to convert to before applying scaling.
linear : bool
Set to True if constraint is linear. Default is False.
parallel_deriv_color : str
If specified, this design var will be grouped for parallel derivative
calculations with other variables sharing the same parallel_deriv_color.
cache_linear_solution : bool
If True, store the linear solution vectors for this variable so they can
be used to start the next linear solution with an initial guess equal to the
solution from the previous linear solve.
flat_indices : bool
If True, interpret specified indices as being indices into a flat source array.
alias : str or None
Alias for this response. Necessary when adding multiple responses on different
indices of the same variable.
"""
model = self._problem().model
model.add_response(name=name, type_='obj', indices=indices,
linear=linear, units=units,
parallel_deriv_color=parallel_deriv_color,
cache_linear_solution=cache_linear_solution,
flat_indices=flat_indices, alias=alias)
[docs] def add_responses(self, responses):
"""
Add multiple responses to be recorded by the AnalysisDriver.
Parameters
----------
responses : Sequence or dict or str
A sequence of response names to be recorded. If more
metadata needs to be specified, reponses can be provided
as a dictionary whose keys are the variables to be recorded,
and whose associated values are dictionaries of metadata to
be passed on as keyword arguments to add_response.
"""
if isinstance(responses, str):
self.add_response(responses)
elif isinstance(responses, dict):
for var, meta in responses.items():
self.add_response(var, **meta)
elif isinstance(responses, Iterable):
for res in responses:
if isinstance(res, str):
self.add_response(res)
def _setup_comm(self, comm):
"""
Perform any driver-specific setup of communicators for the model.
Parameters
----------
comm : MPI.Comm or <FakeComm> or None
The communicator for the Problem.
Returns
-------
MPI.Comm or <FakeComm> or None
The communicator for the Problem model.
"""
self._prev_sample_vars.clear()
self._problem_comm = comm
if not MPI:
return comm
else:
procs_per_model = self.options['procs_per_model']
full_size = comm.size
self._num_colors = size = full_size // procs_per_model
if full_size != size * procs_per_model:
raise RuntimeError("The total number of processors is not evenly divisible by the "
"specified number of processors per model.\n Provide a "
f"number of processors that is a multiple of {procs_per_model}, "
"or specify a number of processors per model that divides "
f"into {full_size}.")
color = self._color = comm.rank % size
new_comm = comm.Split(color)
return new_comm
    def _get_name(self):
        """
        Get the name used to identify this driver in recorded cases.

        Returns
        -------
        str
            The driver name (self._name, 'AnalysisDriver' by default).
        """
        return self._name
[docs] def run(self):
"""
Generate samples and run the model for each set of generated input values.
Rank 0 will both manage the distribution of samples to the other procs and
serve as a worker running the samples.
All other procs just run samples.
Returns
-------
bool
Failure flag; True if failed to converge, False is successful.
"""
comm = self._problem_comm
model = self._problem().model
self.result.reset()
self.iter_count = 0
# Variables allowed samples are the inputs or implicit outputs in the model.
# Outputs from sources other than IndepVarComps would just have their value
# overridden when evaluating the model.
# Implicit outputs can override the value given in the case, but it might be a
# useful mechanism for providing an initial guess for a nonlinear solver.
model_inputs = {meta['prom_name'] for _, meta in
model.list_inputs(is_indep_var=True, out_stream=None)}
model_implicit_outputs = {meta['prom_name'] for _, meta
in model.list_outputs(explicit=False, out_stream=None)}
self._allowable_vars = model_inputs | model_implicit_outputs
n_procs = 1 if comm is None else comm.size
if self.options['run_parallel'] and MPI and n_procs > 1:
batch_size = self.options['batch_size']
color_cycler = itertools.cycle(range(self._num_colors))
samples_complete = False
sample_num = 0
job_queues = None
colors = comm.gather(self._color, root=0)
if comm.rank == 0:
color_to_rank_map = {num: [i for i, x in enumerate(colors)
if x == num] for num in set(colors)}
while not samples_complete:
if comm.rank == 0:
job_queues = [deque() for _ in range(n_procs)]
# Rank 0 pushes batch_size jobs to the ranks in job_queues
batch_i = 0
while True:
try:
sample = next(self._generator)
except StopIteration:
samples_complete = True
break
color_idx = next(color_cycler)
for rank_idx in color_to_rank_map[color_idx]:
job_queues[rank_idx].appendleft((sample_num, sample))
if batch_i >= batch_size:
break
batch_i += 1
sample_num += 1
# Broadcast the samples_complete signal from root to all ranks
samples_complete = comm.bcast(samples_complete, root=0)
# Scatter the job list to each rank
q = comm.scatter(job_queues, root=0)
# Now each proc does the jobs in its queue
while q:
sample_num, sample = q.pop()
self._run_sample(sample, sample_num)
# Wait for all processors to run their jobs.
# Then repeat until samples are exhausted.
comm.barrier()
else:
# Not under MPI
for sample_num, sample in enumerate(self._generator):
self._run_sample(sample, sample_num)
return False
def _run_sample(self, sample, sample_num):
"""
Run case, save exception info and mark the metadata if the case fails.
Parameters
----------
sample : dict
A dictionary keyed by variable name with each value being
a dictionary with a 'val' key, and optionally keys for
'units' and 'indices'.
sample_num : int
The iteration of the AnalysisDriver to which this case corresponds.
"""
comm = self._problem_comm
rank = 0 if self._problem_comm is None else comm.rank
self.iter_count = sample_num
metadata = {}
sample_vars = set()
for var, meta in sample.items():
sample_vars.add(var)
val = meta['val']
units = meta.get('units', None)
idxs = meta.get('indices', None)
# If we've given the model more procs than necessary,
# then it will not have inputs/implicit outputs on some ranks.
# Check that self._allowable_vars is not empty before we warn.
if self._allowable_vars and var not in self._allowable_vars:
issue_warning(msg=f'Variable `{var}` is neither an independent variable\n'
f'nor an implicit output in the model on rank {rank}.\n'
'Setting its value in the case data will have no\n'
'impact on the outputs of the model after execution.',
category=DriverWarning)
self._problem().model.set_val(var, val, units, idxs)
if self._prev_sample_vars and sample_vars != self._prev_sample_vars:
new_vars = self._prev_sample_vars - sample_vars
missing_vars = sample_vars - self._prev_sample_vars
info = f'Missing variables: {missing_vars}\n' if missing_vars else ''
info += f'New variables: {new_vars}\n' if new_vars else ''
issue_warning(msg=f'The variables in sample {sample_num} differ from\n'
f'the previous sample\'s variables.\n{info}',
category=DriverWarning)
self._prev_sample_vars = sample_vars
with RecordingDebugging(self._get_name(), self.iter_count, self):
try:
self._run_solve_nonlinear()
metadata['success'] = 1
metadata['msg'] = ''
except AnalysisError:
metadata['success'] = 0
metadata['msg'] = traceback.format_exc()
except Exception:
metadata['success'] = 0
metadata['msg'] = traceback.format_exc()
print(metadata['msg'])
# save reference to metadata for use in record_iteration
self._metadata = metadata
if self.recording_options['record_derivatives']:
self._compute_totals(of=list(self._responses.keys()),
wrt=list(self._get_sampled_vars()),
return_format=self._total_jac_format,
driver_scaling=False)
def _get_sampled_vars(self):
"""
Return all of the variables (promoted name) to be sampled by this driver.
"""
if hasattr(self._generator, '_get_sampled_vars'):
return set(self._generator._get_sampled_vars())
elif isinstance(self._generator, (list, tuple)):
try:
return set(self._generator[0].keys())
except IndexError:
pass
raise AttributeError('The samples for AnalysisDriver must be a list, tuple, '
'or an AnalysisGenerator that provides a _get_sampled_vars() method')
    def _setup_recording(self):
        """
        Set up case recording.

        Adds the driver's sampled variables (inputs and implicit outputs) to the
        recorder includes, and under MPI selects which processes record.
        """
        # We don't necessarily know a-priori what variables are in our case generators.
        # Tee the samples and add the variables defined within to be recorded.
        comm = self._problem_comm
        model = self._problem().model
        # NOTE(review): rec_includes aliases the same list appended to below, so the
        # duplicate check in the first branch also sees names added earlier in this
        # loop; the elif branch has no duplicate check — confirm that is intended.
        rec_includes = self.recording_options['includes']
        # Promoted names of all implicit outputs in the model.
        implicit_outputs = {meta['prom_name'] for _, meta in
                            model.list_outputs(explicit=False, implicit=True, out_stream=None)}
        # Promoted-to-absolute name map for inputs.
        prom2abs_in = model._var_allprocs_prom2abs_list['input']
        # Responses are recorded by default, add the inputs to be recorded.
        for prom_name in self._get_sampled_vars():
            if prom_name in implicit_outputs and prom_name not in rec_includes:
                self.recording_options['includes'].append(prom_name)
            elif prom_name in prom2abs_in.keys():
                self.recording_options['includes'].append(prom_name)
        if MPI:
            run_parallel = self.options['run_parallel']
            procs_per_model = self.options['procs_per_model']
            for recorder in self._rec_mgr:
                if run_parallel:
                    # write samples only on procs up to the number of parallel models
                    # (i.e. on the root procs for the samples)
                    if procs_per_model == 1:
                        recorder.record_on_process = True
                    else:
                        # One recording proc per model: ranks [0, size) record.
                        size = self._problem_comm.size // procs_per_model
                        if self._problem_comm.rank < size:
                            recorder.record_on_process = True
                elif comm is None or comm.rank == 0:
                    # if not running samples in parallel, then just record on proc 0
                    recorder.record_on_process = True
        super()._setup_recording()
def _get_recorder_metadata(self, case_name):
"""
Return metadata from the latest iteration for use in the recorder.
Parameters
----------
case_name : str
Name of current case.
Returns
-------
dict
Metadata dictionary for the recorder.
"""
self._metadata['name'] = case_name
return self._metadata