from collections.abc import Iterable, Callable
import inspect
import warnings
import numpy as np
from scipy import interpolate
import openmdao
import openmdao.api as om
from openmdao.utils.mpi import MPI
from openmdao.utils.om_warnings import issue_warning
from openmdao.core.system import System
from openmdao.recorders.case import Case
import dymos as dm
from .options import ControlOptionsDictionary, ParameterOptionsDictionary, \
StateOptionsDictionary, TimeOptionsDictionary, ConstraintOptionsDictionary, \
PolynomialControlOptionsDictionary, GridRefinementOptionsDictionary, SimulateOptionsDictionary, \
TimeseriesOutputOptionsDictionary, PhaseTimeseriesOptionsDictionary
from ..transcriptions.transcription_base import TranscriptionBase
from ..transcriptions.grid_data import GaussLobattoGrid, RadauGrid, UniformGrid
from ..transcriptions import ExplicitShooting, GaussLobatto, Radau
from ..utils.indexing import get_constraint_flat_idxs
from ..utils.introspection import configure_time_introspection, _configure_constraint_introspection, \
configure_controls_introspection, configure_parameters_introspection, \
configure_timeseries_output_introspection, classify_var, configure_timeseries_expr_introspection
from ..utils.misc import _unspecified
from ..utils.lgl import lgl
# True when the installed OpenMDAO version string ends in 'dev'.
om_dev_version = openmdao.__version__.endswith('dev')
# Dotted numeric components of the OpenMDAO version; anything after the first '-' is dropped.
om_version = tuple(map(int, openmdao.__version__.partition('-')[0].split('.')))
class Phase(om.Group):
"""
The Phase object in Dymos.
The Phase object is an OpenMDAO Group which contains the options for the variables in the
optimal control problem (states, times, controls, parameters), the transcription, and
the ODE class.
The role of the Phase is to unite the problem formulation with the transcription and the ODE
in order to transcribe a single portion of a trajectory into a nonlinear programming problem
to be solved by the optimizer.
On setup, the Phase runs through its setup stack which will add the appropriate OpenMDAO
systems as prescribed by its associated Transcription.
Parameters
----------
from_phase : <Phase> or None
A phase instance from which the initialized phase should copy its data.
**kwargs : dict
Dictionary of optional phase arguments.
"""
def __init__(self, from_phase=None, **kwargs):
    """
    Build the phase, either empty or seeded with the settings of another phase.

    Parameters
    ----------
    from_phase : <Phase> or None
        A phase instance from which the initialized phase should copy its data.
    **kwargs : dict
        Dictionary of optional phase arguments.
    """
    init_kwargs = dict(kwargs)

    # These are the options which will be set at setup time.
    # The dictionary-like option objects are always created fresh; when copying from
    # another phase they are filled in below via update().
    self.time_options = TimeOptionsDictionary()
    self.refine_options = GridRefinementOptionsDictionary()
    self.simulate_options = SimulateOptionsDictionary()
    self.timeseries_options = PhaseTimeseriesOptionsDictionary()
    self.timeseries_ec_vars = {}

    if from_phase is not None:
        # NOTE(review): every .copy() below is shallow, so the per-variable option objects
        # are shared with from_phase - confirm that this aliasing is intended.
        self.time_options.update(from_phase.time_options)
        self.state_options = from_phase.state_options.copy()
        self.control_options = from_phase.control_options.copy()
        self.polynomial_control_options = from_phase.polynomial_control_options.copy()
        self.parameter_options = from_phase.parameter_options.copy()
        self.refine_options.update(from_phase.refine_options)
        self.simulate_options.update(from_phase.simulate_options)

        self._initial_boundary_constraints = from_phase._initial_boundary_constraints.copy()
        self._final_boundary_constraints = from_phase._final_boundary_constraints.copy()
        self._path_constraints = from_phase._path_constraints.copy()
        self._timeseries = from_phase._timeseries.copy()
        self._objectives = from_phase._objectives.copy()

        # The ODE settings of the source phase take precedence over any given in kwargs.
        init_kwargs['ode_class'] = from_phase.options['ode_class']
        init_kwargs['ode_init_kwargs'] = from_phase.options['ode_init_kwargs']
    else:
        # Dictionaries of variable options that are set by the user via the API.
        self.state_options = {}
        self.control_options = {}
        self.polynomial_control_options = {}
        self.parameter_options = {}

        self._initial_boundary_constraints = []
        self._final_boundary_constraints = []
        self._path_constraints = []
        default_timeseries = {'transcription': None, 'subset': 'all', 'outputs': {}}
        self._timeseries = {'timeseries': default_timeseries}
        self._objectives = {}

    super(Phase, self).__init__(**init_kwargs)
def initialize(self):
    """
    Declare the options accepted when the phase is instantiated.

    Three options are declared: 'ode_class', 'ode_init_kwargs' and 'transcription'.
    """
    declare = self.options.declare

    # The ODE class is excluded from recording (recordable=False).
    declare('ode_class', default=None, recordable=False,
            desc='System defining the ODE')
    declare('ode_init_kwargs', default={}, types=dict,
            desc='Keyword arguments provided when initializing the ODE System')
    declare('transcription', types=TranscriptionBase,
            desc='Transcription technique of the optimal control problem.')
def add_state(self, name, units=_unspecified, shape=_unspecified,
              rate_source=_unspecified, targets=_unspecified,
              val=_unspecified, fix_initial=_unspecified, fix_final=_unspecified,
              lower=_unspecified, upper=_unspecified, scaler=_unspecified, adder=_unspecified,
              ref0=_unspecified, ref=_unspecified, defect_scaler=_unspecified,
              defect_ref=_unspecified, continuity_scaler=_unspecified, continuity_ref=_unspecified,
              solve_segments=_unspecified, connected_initial=_unspecified,
              source=_unspecified, input_initial=_unspecified, initial_targets=_unspecified,
              opt=_unspecified, initial_bounds=_unspecified, final_bounds=_unspecified):
    """
    Add a state variable to be integrated by the phase.

    If no state of the given name is registered yet, a new options dictionary is created
    for it. All options are then forwarded to set_state_options, so calling add_state for
    an existing state updates that state's options.

    Parameters
    ----------
    name : str
        Name of the state variable in the RHS.
    units : str or None
        Units in which the state variable is defined. Internally components may use different
        units for the state variable, but the IndepVarComp which provides its value will provide
        it in these units, and collocation defects will use these units. If units is not
        specified here then the unit will be determined from the rate_source.
    shape : tuple of int
        The shape of the state variable. For instance, a 3D cartesian position vector would have
        a shape of (3,). This only needs to be specified if the rate_source target points to
        a control or state whose shape isn't known in time.
    rate_source : str
        The path to the ODE output which provides the rate of this state variable.
    targets : str or Sequence of str
        The path to the targets of the state variable in the ODE system. If given
        this will override the value given by the @declare_state decorator on the ODE.
        In the future, if left _unspecified (the default), the phase variable will try to
        connect to an ODE input of the same name. Set targets to None to prevent this.
    val : ndarray
        The default value of the state at the state discretization nodes of the phase.
    fix_initial : bool(False)
        If True, omit the first value of the state from the design variables (prevent the
        optimizer from changing it).
    fix_final : bool(False)
        If True, omit the final value of the state from the design variables (prevent the
        optimizer from changing it).
    lower : float or ndarray or None (None)
        The lower bound of the state at the nodes of the phase.
    upper : float or ndarray or None (None)
        The upper bound of the state at the nodes of the phase.
    scaler : float or ndarray or None (None)
        The scaler of the state value at the nodes of the phase.
    adder : float or ndarray or None (None)
        The adder of the state value at the nodes of the phase.
    ref0 : float or ndarray or None (None)
        The zero-reference value of the state at the nodes of the phase.
    ref : float or ndarray or None (None)
        The unit-reference value of the state at the nodes of the phase.
    defect_scaler : float or ndarray
        The scaler of the state defect at the collocation nodes of the phase.
    defect_ref : float or ndarray
        The unit-reference value of the state defect at the collocation nodes of the phase. If
        provided, this value overrides defect_scaler.
    continuity_scaler : float
        Constraint scaler used to enforce the continuity mismatch defect between segments when
        transcription is not compressed.
    continuity_ref : float
        Reference unit value of the continuity mismatch defect between segments when
        transcription is not compressed. Used in place of scaler.
    solve_segments : bool(False)
        If True, a solver will be used to converge the collocation defects within a segment.
        Note that the state continuity defects between segments will still be
        handled by the optimizer.
    connected_initial : bool
        If True, then the initial value for this state comes from an externally connected
        source. Deprecated - use input_initial.
    source : str
        The path to the ODE output which provides the solution for this state variable when
        using an Analytic transcription.
    input_initial : bool
        If True, then the initial value for this state is provided as an input.
    initial_targets : str or Sequence of str
        The path to the ODE inputs to which the initial value of this state should be connected.
    opt : bool
        If True, state values are fixed at the input values and the optimizer resolves defect
        constraints by varying the other design variables in the phase.
        (NOTE(review): this wording reads like the opt=False behavior - confirm.)
    initial_bounds : tuple
        The bounds (lower, upper) of the state variable at the initial point in the phase.
    final_bounds : tuple
        The bounds (lower, upper) of the state variable at the final point in the phase.
    """
    # Register the state only once; repeated calls fall through to the option update below.
    if name not in self.state_options:
        self.state_options[name] = StateOptionsDictionary()
        self.state_options[name]['name'] = name

    self.set_state_options(name=name, units=units, shape=shape, rate_source=rate_source,
                           targets=targets, val=val, fix_initial=fix_initial,
                           fix_final=fix_final, lower=lower, upper=upper, scaler=scaler,
                           adder=adder, ref0=ref0, ref=ref, defect_scaler=defect_scaler,
                           defect_ref=defect_ref, continuity_scaler=continuity_scaler,
                           continuity_ref=continuity_ref, solve_segments=solve_segments,
                           connected_initial=connected_initial, source=source,
                           input_initial=input_initial, initial_targets=initial_targets,
                           opt=opt, initial_bounds=initial_bounds, final_bounds=final_bounds)
def set_state_options(self, name, units=_unspecified, shape=_unspecified,
                      rate_source=_unspecified, targets=_unspecified,
                      val=_unspecified, fix_initial=_unspecified, fix_final=_unspecified,
                      lower=_unspecified, upper=_unspecified, scaler=_unspecified, adder=_unspecified,
                      ref0=_unspecified, ref=_unspecified, defect_scaler=_unspecified,
                      defect_ref=_unspecified, continuity_scaler=_unspecified, continuity_ref=_unspecified,
                      solve_segments=_unspecified, connected_initial=_unspecified,
                      source=_unspecified, input_initial=_unspecified, initial_targets=_unspecified,
                      opt=_unspecified, initial_bounds=_unspecified, final_bounds=_unspecified):
    """
    Set options that apply to the EOM state variable of the given name.

    Only arguments that are not left as _unspecified are written to the state's options.
    If the state is not yet registered, an options dictionary is created for it first.

    Parameters
    ----------
    name : str
        Name of the state variable in the RHS.
    units : str or None
        Units in which the state variable is defined. Internally components may use different
        units for the state variable, but the IndepVarComp which provides its value will provide
        it in these units, and collocation defects will use these units. If units is not
        specified here then the unit will be determined from the rate_source.
    shape : tuple of int
        The shape of the state variable. For instance, a 3D cartesian position vector would have
        a shape of (3,). This only needs to be specified if the rate_source target points to
        a control or state whose shape isn't known in time.
    rate_source : str
        The path to the ODE output which provides the rate of this state variable.
    targets : str or Sequence of str
        The path to the targets of the state variable in the ODE system. If given
        this will override the value given by the @declare_state decorator on the ODE.
        In the future, if left _unspecified (the default), the phase variable will try to
        connect to an ODE input of the same name. Set targets to None to prevent this.
        A single string is stored as a one-element tuple.
    val : ndarray
        The default value of the state at the state discretization nodes of the phase.
    fix_initial : bool(False)
        If True, omit the first value of the state from the design variables (prevent the
        optimizer from changing it).
    fix_final : bool(False)
        If True, omit the final value of the state from the design variables (prevent the
        optimizer from changing it).
    lower : float or ndarray or None (None)
        The lower bound of the state at the nodes of the phase.
    upper : float or ndarray or None (None)
        The upper bound of the state at the nodes of the phase.
    scaler : float or ndarray or None (None)
        The scaler of the state value at the nodes of the phase.
    adder : float or ndarray or None (None)
        The adder of the state value at the nodes of the phase.
    ref0 : float or ndarray or None (None)
        The zero-reference value of the state at the nodes of the phase.
    ref : float or ndarray or None (None)
        The unit-reference value of the state at the nodes of the phase.
    defect_scaler : float or ndarray
        The scaler of the state defect at the collocation nodes of the phase.
    defect_ref : float or ndarray
        The unit-reference value of the state defect at the collocation nodes of the phase. If
        provided, this value overrides defect_scaler.
    continuity_scaler : float
        Constraint scaler used to enforce the continuity mismatch defect between segments when
        transcription is not compressed.
    continuity_ref : float
        Reference unit value of the continuity mismatch defect between segments when
        transcription is not compressed. Used in place of scaler.
    solve_segments : bool(False)
        If True, a solver will be used to converge the collocation defects within a segment.
        Note that the state continuity defects between segments will still be
        handled by the optimizer.
    connected_initial : bool
        If True, then the initial value for this state comes from an externally connected
        source. Deprecated - use input_initial. When given, a deprecation warning is issued
        and the value is also written to the input_initial option.
    source : str
        The path to the ODE output which provides the solution for this state variable when
        using an Analytic transcription.
    input_initial : bool
        If True, then the initial value for this state is provided as an input. When both
        connected_initial and input_initial are given, input_initial wins.
    initial_targets : str or Sequence of str
        The path to the ODE inputs to which the initial value of this state should be connected.
    opt : bool
        If True, state values are fixed at the input values and the optimizer resolves defect
        constraints by varying the other design variables in the phase.
        (NOTE(review): this wording reads like the opt=False behavior - confirm.)
    initial_bounds : tuple
        The bounds (lower, upper) of the state variable at the initial point in the phase.
    final_bounds : tuple
        The bounds (lower, upper) of the state variable at the final point in the phase.
    """
    if name not in self.state_options:
        # This state option will be picked up automatically from tags.
        self.state_options[name] = StateOptionsDictionary()
        self.state_options[name]['name'] = name

    state_opts = self.state_options[name]

    def _store(pairs):
        # Write only the options the caller actually supplied, in the given order.
        for key, value in pairs:
            if value is not _unspecified:
                state_opts[key] = value

    # A lone target string is normalized to a one-element tuple.
    if targets is not _unspecified and isinstance(targets, str):
        targets = (targets,)

    _store((('units', units),
            ('shape', shape),
            ('rate_source', rate_source),
            ('targets', targets),
            ('val', val),
            ('fix_initial', fix_initial),
            ('fix_final', fix_final),
            ('lower', lower),
            ('upper', upper),
            ('scaler', scaler),
            ('adder', adder),
            ('ref0', ref0),
            ('ref', ref),
            ('defect_scaler', defect_scaler),
            ('defect_ref', defect_ref),
            ('continuity_scaler', continuity_scaler),
            ('continuity_ref', continuity_ref),
            ('solve_segments', solve_segments)))

    if connected_initial is not _unspecified:
        # Deprecated spelling: record it, warn, and mirror it onto input_initial.
        state_opts['connected_initial'] = connected_initial
        om.issue_warning(f'{self.pathname}: State option `connected_initial` is deprecated. Use input_initial',
                         om.OMDeprecationWarning)
        state_opts['input_initial'] = connected_initial

    # NOTE(review): initial_targets is accepted and documented but is not written to the
    # state's options anywhere in this method - confirm whether StateOptionsDictionary
    # declares an 'initial_targets' entry that should be set here.

    # input_initial is applied after connected_initial so an explicit value overrides it.
    _store((('source', source),
            ('input_initial', input_initial),
            ('opt', opt),
            ('initial_bounds', initial_bounds),
            ('final_bounds', final_bounds)))
def check_parameter(self, name):
    """
    Verify that the given name may be used for a new variable in the phase.

    The name is rejected if it is one of the reserved names of the independent variable
    of integration, or if it is already registered as a state, control, parameter, or
    polynomial control (checked in that order).

    Parameters
    ----------
    name : str
        The name of the controllable parameter.

    Raises
    ------
    ValueError
        Raised if the name is reserved or has previously been assigned to a state,
        control, parameter, or polynomial control.
    """
    reserved_names = ('time', 'time_phase', 't_initial', 't_duration')
    if name in reserved_names:
        raise ValueError(f'The name {name} is reserved for the independent variable of integration'
                         ' in Dymos and may not be used as a state, control, or parameter name')

    # Order matters: the first registry containing the name determines the message.
    registries = (
        (self.state_options, 'state'),
        (self.control_options, 'control'),
        (self.parameter_options, 'parameter'),
        (self.polynomial_control_options, 'polynomial control'),
    )
    for registry, kind in registries:
        if name in registry:
            raise ValueError(f'{name} has already been added as a {kind}.')
def add_control(self, name, units=_unspecified, desc=_unspecified, opt=_unspecified,
                fix_initial=_unspecified, fix_final=_unspecified, targets=_unspecified,
                rate_targets=_unspecified, rate2_targets=_unspecified, val=_unspecified,
                shape=_unspecified, lower=_unspecified, upper=_unspecified, scaler=_unspecified,
                adder=_unspecified, ref0=_unspecified, ref=_unspecified, continuity=_unspecified,
                continuity_scaler=_unspecified, continuity_ref=_unspecified, rate_continuity=_unspecified,
                rate_continuity_scaler=_unspecified, rate_continuity_ref=_unspecified,
                rate2_continuity=_unspecified, rate2_continuity_scaler=_unspecified,
                rate2_continuity_ref=_unspecified):
    """
    Add a dynamic control variable to be tied to a parameter in the ODE.

    A control that is not registered yet is first validated with check_parameter and given
    a new options dictionary. All options are then forwarded to set_control_options, so
    calling add_control for an existing control updates that control's options.

    Parameters
    ----------
    name : str
        The name assigned to the control variable. If the ODE has been decorated with
        parameters, this should be the name of a control in the system.
    units : str or None
        The units with which the control parameter in this phase will be defined. It must be
        compatible with the units of the targets to which the control is connected.
    desc : str
        A description of the control variable.
    opt : bool
        If True, the control value will be a design variable for the optimization problem.
        If False, allow the control to be connected externally.
    fix_initial : bool
        If True, the initial value of this control is fixed and not a design variable.
        This option is invalid if opt=False.
    fix_final : bool
        If True, the final value of this control is fixed and not a design variable.
        This option is invalid if opt=False.
    targets : Sequence of str or None
        Targets in the ODE to which this control is connected.
        In the future, if left _unspecified (the default), the phase control will try to
        connect to an ODE input of the same name. Set targets to None to prevent this.
    rate_targets : Sequence of str or None
        The targets in the ODE to which the control rate is connected.
    rate2_targets : Sequence of str or None
        The parameter in the ODE to which the control 2nd derivative is connected.
    val : float
        The default value of the control variable at the control input nodes.
    shape : Sequence of int
        The shape of the control variable at each point in time. Only needed for controls
        that don't have a target in the ode.
    lower : Sequence or Number or None
        The lower bound of the control variable at the nodes.
        This option is invalid if opt=False.
    upper : Sequence or Number or None
        The upper bound of the control variable at the nodes.
        This option is invalid if opt=False.
    scaler : float or None
        The scaler of the control variable at the nodes.
        This option is invalid if opt=False.
    adder : float or None
        The adder of the control variable at the nodes.
        This option is invalid if opt=False.
    ref0 : float or None
        The zero-reference value of the control variable at the nodes.
        This option is invalid if opt=False.
    ref : float or None
        The unit-reference value of the control variable at the nodes.
        This option is invalid if opt=False.
    continuity : bool
        Enforce continuity of control values at segment boundaries.
        This option is invalid if opt=False.
    continuity_scaler : float
        Scaler of the continuity constraint. This option is invalid if opt=False. This
        option is only relevant in the Radau pseudospectral transcription where the continuity
        constraint is nonlinear. For Gauss-Lobatto the continuity constraint is linear.
    continuity_ref : float
        Reference unit value to be used in place of continuity scaler.
    rate_continuity : bool
        Enforce continuity of control first derivatives (in dimensionless time) at
        segment boundaries.
        This option is invalid if opt=False.
    rate_continuity_scaler : float
        Scaler of the rate continuity constraint at segment boundaries.
        This option is invalid if opt=False.
    rate_continuity_ref : float or None
        Reference unit value of the rate continuity constraint at segment boundaries, for use in
        place of rate_continuity_scaler.
    rate2_continuity : bool or None
        Enforce continuity of control second derivatives at segment boundaries.
        This option is invalid if opt=False.
    rate2_continuity_scaler : float or None
        Scaler of the dimensionless rate2 continuity constraint at segment boundaries.
        This option is invalid if opt=False.
    rate2_continuity_ref : float or None
        Reference unit value of the rate2 continuity constraint at segment boundaries, for use in
        place of rate2_continuity_scaler.

    Raises
    ------
    ValueError
        Raised by check_parameter when a new control uses a reserved name or a name already
        assigned to a state, parameter, or polynomial control.

    Notes
    -----
    rate and rate2 continuity are not enforced for input controls.
    """
    # Only a brand-new control is validated and registered; an existing one is just updated.
    if name not in self.control_options:
        self.check_parameter(name)
        self.control_options[name] = ControlOptionsDictionary()
        self.control_options[name]['name'] = name

    self.set_control_options(name, units=units, desc=desc, opt=opt, fix_initial=fix_initial,
                             fix_final=fix_final, targets=targets, rate_targets=rate_targets,
                             rate2_targets=rate2_targets, val=val, shape=shape, lower=lower,
                             upper=upper, scaler=scaler, adder=adder, ref0=ref0, ref=ref,
                             continuity=continuity, continuity_scaler=continuity_scaler,
                             continuity_ref=continuity_ref, rate_continuity=rate_continuity,
                             rate_continuity_scaler=rate_continuity_scaler,
                             rate_continuity_ref=rate_continuity_ref,
                             rate2_continuity=rate2_continuity,
                             rate2_continuity_scaler=rate2_continuity_scaler,
                             rate2_continuity_ref=rate2_continuity_ref)
def set_control_options(self, name, units=_unspecified, desc=_unspecified, opt=_unspecified,
                        fix_initial=_unspecified, fix_final=_unspecified, targets=_unspecified,
                        rate_targets=_unspecified, rate2_targets=_unspecified, val=_unspecified,
                        shape=_unspecified, lower=_unspecified, upper=_unspecified, scaler=_unspecified,
                        adder=_unspecified, ref0=_unspecified, ref=_unspecified, continuity=_unspecified,
                        continuity_scaler=_unspecified, continuity_ref=_unspecified,
                        rate_continuity=_unspecified, rate_continuity_scaler=_unspecified,
                        rate_continuity_ref=_unspecified, rate2_continuity=_unspecified,
                        rate2_continuity_scaler=_unspecified, rate2_continuity_ref=_unspecified):
    """
    Set options on an existing dynamic control variable in the phase.

    Only arguments that are not left as _unspecified are written to the control's options.
    The control must already be present in control_options; unlike set_state_options, this
    method does not create a missing entry.

    Parameters
    ----------
    name : str
        The name assigned to the control variable. If the ODE has been decorated with
        parameters, this should be the name of a control in the system.
    units : str or None
        The units with which the control parameter in this phase will be defined. It must be
        compatible with the units of the targets to which the control is connected.
    desc : str
        A description of the control variable.
    opt : bool
        If True, the control value will be a design variable for the optimization problem.
        If False, allow the control to be connected externally.
    fix_initial : bool
        If True, the initial value of this control is fixed and not a design variable.
        This option is invalid if opt=False.
    fix_final : bool
        If True, the final value of this control is fixed and not a design variable.
        This option is invalid if opt=False.
    targets : Sequence of str or None
        Targets in the ODE to which this control is connected.
        In the future, if left _unspecified (the default), the phase control will try to
        connect to an ODE input of the same name. Set targets to None to prevent this.
        A single string is stored as a one-element tuple.
    rate_targets : Sequence of str or None
        The targets in the ODE to which the control rate is connected.
        A single string is stored as a one-element tuple.
    rate2_targets : Sequence of str or None
        The parameter in the ODE to which the control 2nd derivative is connected.
        A single string is stored as a one-element tuple.
    val : float
        The default value of the control variable at the control input nodes.
    shape : Sequence of int
        The shape of the control variable at each point in time. Only needed for controls
        that don't have a target in the ode.
    lower : Sequence or Number or None
        The lower bound of the control variable at the nodes.
        This option is invalid if opt=False.
    upper : Sequence or Number or None
        The upper bound of the control variable at the nodes.
        This option is invalid if opt=False.
    scaler : float or None
        The scaler of the control variable at the nodes.
        This option is invalid if opt=False.
    adder : float or None
        The adder of the control variable at the nodes.
        This option is invalid if opt=False.
    ref0 : float or None
        The zero-reference value of the control variable at the nodes.
        This option is invalid if opt=False.
    ref : float or None
        The unit-reference value of the control variable at the nodes.
        This option is invalid if opt=False.
    continuity : bool
        Enforce continuity of control values at segment boundaries.
        This option is invalid if opt=False.
    continuity_scaler : float
        Scaler of the continuity constraint. This option is invalid if opt=False. This
        option is only relevant in the Radau pseudospectral transcription where the continuity
        constraint is nonlinear. For Gauss-Lobatto the continuity constraint is linear.
    continuity_ref : float
        Reference unit value to be used in place of continuity scaler.
    rate_continuity : bool
        Enforce continuity of control first derivatives (in dimensionless time) at
        segment boundaries.
        This option is invalid if opt=False.
    rate_continuity_scaler : float
        Scaler of the rate continuity constraint at segment boundaries.
        This option is invalid if opt=False.
    rate_continuity_ref : float or None
        Reference unit value of the rate continuity constraint at segment boundaries, for use in
        place of rate_continuity_scaler.
    rate2_continuity : bool or None
        Enforce continuity of control second derivatives at segment boundaries.
        This option is invalid if opt=False.
    rate2_continuity_scaler : float or None
        Scaler of the dimensionless rate2 continuity constraint at segment boundaries.
        This option is invalid if opt=False.
    rate2_continuity_ref : float or None
        Reference unit value of the rate2 continuity constraint at segment boundaries, for use in
        place of rate2_continuity_scaler.

    Notes
    -----
    rate and rate2 continuity are not enforced for input controls.
    """
    # Options whose value is wrapped in a tuple when given as a single string.
    tuple_valued = ('targets', 'rate_targets', 'rate2_targets')

    # (option key, supplied value) in the order in which the options are applied.
    requested = (
        ('units', units),
        ('opt', opt),
        ('desc', desc),
        ('targets', targets),
        ('rate_targets', rate_targets),
        ('rate2_targets', rate2_targets),
        ('val', val),
        ('shape', shape),
        ('fix_initial', fix_initial),
        ('fix_final', fix_final),
        ('lower', lower),
        ('upper', upper),
        ('scaler', scaler),
        ('adder', adder),
        ('ref0', ref0),
        ('ref', ref),
        ('continuity', continuity),
        ('continuity_scaler', continuity_scaler),
        ('continuity_ref', continuity_ref),
        ('rate_continuity', rate_continuity),
        ('rate_continuity_scaler', rate_continuity_scaler),
        ('rate_continuity_ref', rate_continuity_ref),
        ('rate2_continuity', rate2_continuity),
        ('rate2_continuity_scaler', rate2_continuity_scaler),
        ('rate2_continuity_ref', rate2_continuity_ref),
    )

    for key, value in requested:
        if value is _unspecified:
            continue
        if key in tuple_valued and isinstance(value, str):
            value = (value,)
        # Looked up per assignment so that an unknown control name only fails when there
        # is actually something to set.
        self.control_options[name][key] = value
def add_polynomial_control(self, name, order, desc=_unspecified, val=_unspecified, units=_unspecified,
                           opt=_unspecified, fix_initial=_unspecified, fix_final=_unspecified,
                           lower=_unspecified, upper=_unspecified,
                           scaler=_unspecified, adder=_unspecified, ref0=_unspecified,
                           ref=_unspecified, targets=_unspecified, rate_targets=_unspecified,
                           rate2_targets=_unspecified, shape=_unspecified):
    """
    Add a polynomial control variable to be tied to a parameter in the ODE.

    Polynomial controls are defined by values at the Legendre-Gauss-Lobatto nodes of a
    single polynomial, defined on [-1, 1] in phase tau space.

    For a polynomial control of a given order, the number of nodes used to define the
    polynomial is (order + 1).

    A polynomial control that is not registered yet is first validated with check_parameter
    and given a new options dictionary. All options are then forwarded to
    set_polynomial_control_options, so calling this method for an existing polynomial
    control updates its options, in the same way as add_control does.

    Parameters
    ----------
    name : str
        Name of the controllable parameter in the ODE.
    order : int
        The order of the interpolating polynomial used to represent the control value in
        phase tau space.
    desc : str
        A description of the polynomial control.
    val : float or ndarray
        Default value of the control at all nodes. If val is scalar and the control
        is dynamic it will be broadcast.
    units : str or None or 0
        Units in which the control variable is defined. If 0, use the units declared
        for the parameter in the ODE.
    opt : bool
        If True (default) the value(s) of this control will be design variables in
        the optimization problem, in the path 'phase_name.indep_controls.controls:control_name'.
        If False, the values of this control will exist as input controls:{name}.
    fix_initial : bool
        If True, the given initial value of the polynomial control is not a design variable and
        will not be changed during the optimization.
    fix_final : bool
        If True, the given final value of the polynomial control is not a design variable and
        will not be changed during the optimization.
    lower : float or ndarray
        The lower bound of the control at the nodes of the phase.
    upper : float or ndarray
        The upper bound of the control at the nodes of the phase.
    scaler : float or ndarray
        The scaler of the control value at the nodes of the phase.
    adder : float or ndarray
        The adder of the control value at the nodes of the phase.
    ref0 : float or ndarray
        The zero-reference value of the control at the nodes of the phase.
    ref : float or ndarray
        The unit-reference value of the control at the nodes of the phase.
    targets : Sequence of str or None
        Targets in the ODE to which this polynomial control is connected.
    rate_targets : None or str
        The name of the parameter in the ODE to which the first time-derivative
        of the control value is connected.
    rate2_targets : None or str
        The name of the parameter in the ODE to which the second time-derivative
        of the control value is connected.
    shape : Sequence of int
        The shape of the control variable at each point in time.

    Raises
    ------
    ValueError
        Raised by check_parameter when a new polynomial control uses a reserved name or a
        name already assigned to a state, control, or parameter.
    """
    # Validate only brand-new names. Checking unconditionally would reject every name that
    # is already a polynomial control, which made this guard always true and prevented
    # updating an existing polynomial control through this method.
    if name not in self.polynomial_control_options:
        self.check_parameter(name)
        self.polynomial_control_options[name] = PolynomialControlOptionsDictionary()
        self.polynomial_control_options[name]['name'] = name
        self.polynomial_control_options[name]['order'] = order

    # Forward by keyword so the call does not depend on the parameter order of
    # set_polynomial_control_options.
    self.set_polynomial_control_options(name, order=order, desc=desc, val=val, units=units,
                                        opt=opt, fix_initial=fix_initial, fix_final=fix_final,
                                        lower=lower, upper=upper, scaler=scaler, adder=adder,
                                        ref0=ref0, ref=ref, targets=targets,
                                        rate_targets=rate_targets, rate2_targets=rate2_targets,
                                        shape=shape)
def set_polynomial_control_options(self, name, order=_unspecified, desc=_unspecified, val=_unspecified,
                                   units=_unspecified, opt=_unspecified, fix_initial=_unspecified,
                                   fix_final=_unspecified, lower=_unspecified, upper=_unspecified,
                                   scaler=_unspecified, adder=_unspecified, ref0=_unspecified,
                                   ref=_unspecified, targets=_unspecified, rate_targets=_unspecified,
                                   rate2_targets=_unspecified, shape=_unspecified):
    """
    Update the options of a polynomial control that already exists in the phase.

    Every argument left as _unspecified is skipped, so the matching option keeps its
    current value. A bare string given for targets, rate_targets or rate2_targets is
    stored as a one-element tuple; any other value is stored unchanged.

    Parameters
    ----------
    name : str
        Name of the controllable parameter in the ODE.
    order : int
        The order of the interpolating polynomial used to represent the control value in
        phase tau space.
    desc : str
        A description of the polynomial control.
    val : float or ndarray
        Default value of the control at all nodes. If val is scalar and the control
        is dynamic it will be broadcast.
    units : str or None or 0
        Units in which the control variable is defined. If 0, use the units declared
        for the parameter in the ODE.
    opt : bool
        If True the value(s) of this control will be design variables in the optimization
        problem. If False, the values of this control will exist as input controls:{name}.
    fix_initial : bool
        If True, the given initial value of the polynomial control is not a design variable
        and will not be changed during the optimization.
    fix_final : bool
        If True, the given final value of the polynomial control is not a design variable
        and will not be changed during the optimization.
    lower : float or ndarray
        The lower bound of the control at the nodes of the phase.
    upper : float or ndarray
        The upper bound of the control at the nodes of the phase.
    scaler : float or ndarray
        The scaler of the control value at the nodes of the phase.
    adder : float or ndarray
        The adder of the control value at the nodes of the phase.
    ref0 : float or ndarray
        The zero-reference value of the control at the nodes of the phase.
    ref : float or ndarray
        The unit-reference value of the control at the nodes of the phase.
    targets : Sequence of str or None
        Targets in the ODE to which this polynomial control is connected.
    rate_targets : None or str
        The name of the parameter in the ODE to which the first time-derivative
        of the control value is connected.
    rate2_targets : None or str
        The name of the parameter in the ODE to which the second time-derivative
        of the control value is connected.
    shape : Sequence of int
        The shape of the control variable at each point in time.
    """
    # (option key, requested value) pairs, listed in the order in which they are written.
    requested = (
        ('order', order),
        ('units', units),
        ('opt', opt),
        ('desc', desc),
        ('targets', targets),
        ('rate_targets', rate_targets),
        ('rate2_targets', rate2_targets),
        ('val', val),
        ('shape', shape),
        ('fix_initial', fix_initial),
        ('fix_final', fix_final),
        ('lower', lower),
        ('upper', upper),
        ('scaler', scaler),
        ('adder', adder),
        ('ref0', ref0),
        ('ref', ref),
    )

    # Options naming ODE inputs: a lone string is normalized to a 1-tuple.
    target_keys = ('targets', 'rate_targets', 'rate2_targets')

    for key, value in requested:
        if value is _unspecified:
            continue
        if key in target_keys and isinstance(value, str):
            value = (value,)
        # The options entry is looked up only when something is actually written.
        self.polynomial_control_options[name][key] = value
def add_parameter(self, name, val=_unspecified, units=_unspecified, opt=False,
                  desc=_unspecified, lower=_unspecified, upper=_unspecified, scaler=_unspecified,
                  adder=_unspecified, ref0=_unspecified, ref=_unspecified, targets=_unspecified,
                  shape=_unspecified, dynamic=_unspecified, static_target=_unspecified,
                  include_timeseries=_unspecified, static_targets=_unspecified):
    """
    Add a parameter (static control variable) to the phase.

    The name is first passed to check_parameter. If the phase has no options entry for
    the parameter yet, one is created; all remaining arguments are then handed to
    set_parameter_options.

    Parameters
    ----------
    name : str
        Name of the parameter.
    val : float or ndarray
        Default value of the parameter at all nodes.
    units : str or None or 0
        Units in which the parameter is defined. If 0, use the units declared
        for the parameter in the ODE.
    opt : bool
        If True, the value(s) of this parameter will be design variables in the
        optimization problem. If False (default), the parameter is not a design variable.
    desc : str
        A description of the parameter.
    lower : float or ndarray
        The lower bound of the parameter value.
    upper : float or ndarray
        The upper bound of the parameter value.
    scaler : float or ndarray
        The scaler of the parameter value for the optimizer.
    adder : float or ndarray
        The adder of the parameter value for the optimizer.
    ref0 : float or ndarray
        The zero-reference value of the parameter for the optimizer.
    ref : float or ndarray
        The unit-reference value of the parameter for the optimizer.
    targets : Sequence of str or None
        Targets in the ODE to which this parameter is connected.
    shape : Sequence of int
        The shape of the parameter.
    dynamic : bool
        True if the targets in the ODE may be dynamic (if the inputs are sized to the number
        of nodes) else False.
    static_target : bool or _unspecified
        True if the targets in the ODE are not shaped with num_nodes as the first dimension
        (meaning they cannot have a unique value at each node). Otherwise False.
    include_timeseries : bool
        True if the static parameters should be included in output timeseries, else False.
    static_targets : bool or Sequence or _unspecified
        True if ALL targets in the ODE are not shaped with num_nodes as the first dimension
        (meaning they cannot have a unique value at each node). If False, ALL targets are
        expected to be shaped with the first dimension as the number of nodes. If given
        as a Sequence, it provides those targets not shaped with num_nodes. If left
        _unspecified, static targets will be determined automatically.
    """
    self.check_parameter(name)

    # Create the options entry on first use; an existing entry is kept and only updated below.
    if name not in self.parameter_options:
        new_options = ParameterOptionsDictionary()
        self.parameter_options[name] = new_options
        new_options['name'] = name

    forwarded = dict(val=val, units=units, opt=opt, desc=desc,
                     lower=lower, upper=upper, scaler=scaler, adder=adder,
                     ref0=ref0, ref=ref, targets=targets, shape=shape, dynamic=dynamic,
                     static_target=static_target, static_targets=static_targets,
                     include_timeseries=include_timeseries)
    self.set_parameter_options(name, **forwarded)
def set_parameter_options(self, name, val=_unspecified, units=_unspecified, opt=False,
                          desc=_unspecified, lower=_unspecified, upper=_unspecified,
                          scaler=_unspecified, adder=_unspecified, ref0=_unspecified,
                          ref=_unspecified, targets=_unspecified, shape=_unspecified,
                          dynamic=_unspecified, static_target=_unspecified,
                          include_timeseries=_unspecified, static_targets=_unspecified):
    """
    Set options for an existing parameter (static control variable) in the phase.

    Arguments left as _unspecified are skipped. The exception is opt, which is always
    written: calling this method without opt stores False for that option.

    The combination of the deprecated dynamic option with static_target or
    static_targets is rejected before anything is modified, so a failed call leaves
    the stored options of the parameter unchanged.

    Parameters
    ----------
    name : str
        Name of the parameter.
    val : float or ndarray
        Default value of the parameter at all nodes.
    units : str or None or 0
        Units in which the parameter is defined. If 0, use the units declared
        for the parameter in the ODE.
    opt : bool
        If True the value(s) of this parameter will be design variables in the
        optimization problem. If False (default), the parameter is not a design variable.
    desc : str
        A description of the parameter.
    lower : float or ndarray
        The lower bound of the parameter value.
    upper : float or ndarray
        The upper bound of the parameter value.
    scaler : float or ndarray
        The scaler of the parameter value for the optimizer.
    adder : float or ndarray
        The adder of the parameter value for the optimizer.
    ref0 : float or ndarray
        The zero-reference value of the parameter for the optimizer.
    ref : float or ndarray
        The unit-reference value of the parameter for the optimizer.
    targets : Sequence of str or None
        Targets in the ODE to which this parameter is connected. A single string is
        stored as a one-element tuple.
    shape : Sequence of int
        The shape of the parameter. A scalar is stored as a one-element tuple and a
        list is stored as a tuple.
    dynamic : bool
        True if the targets in the ODE may be dynamic (if the inputs are sized to the number
        of nodes) else False. This option is deprecated; it is stored as
        static_targets = not dynamic.
    static_target : bool or _unspecified
        True if the targets in the ODE are not shaped with num_nodes as the first dimension
        (meaning they cannot have a unique value at each node). Otherwise False.
        Stored under the static_targets option.
    include_timeseries : bool
        True if the static parameters should be included in output timeseries, else False.
    static_targets : bool or Sequence or _unspecified
        True if ALL targets in the ODE are not shaped with num_nodes as the first dimension
        (meaning they cannot have a unique value at each node). If False, ALL targets are
        expected to be shaped with the first dimension as the number of nodes. If given
        as a Sequence, it provides those targets not shaped with num_nodes. If left
        _unspecified, static targets will be determined automatically. When given
        together with static_target, this value is the one that is kept.

    Raises
    ------
    KeyError
        If no parameter called name has been added to the phase.
    ValueError
        If dynamic is given together with static_target or static_targets.
    """
    options = self.parameter_options[name]

    # Validate before mutating: a contradictory request must not leave the
    # parameter partially updated.
    if dynamic is not _unspecified and static_target is not _unspecified:
        raise ValueError("Both the deprecated 'dynamic' option and option 'static_target' were\n"
                         f"specified for parameter '{name}'. Going forward, please use only\n"
                         "option static_targets.  Options 'dynamic' and 'static_target'\n"
                         "will be removed in Dymos 2.0.0.")

    if dynamic is not _unspecified and static_targets is not _unspecified:
        raise ValueError("Both the deprecated 'dynamic' option and option 'static_targets' were "
                         f"specified for parameter '{name}'. Going forward, please use only "
                         "option static_targets.  Option 'dynamic' will be removed in "
                         "Dymos 2.0.0.")

    if units is not _unspecified:
        options['units'] = units

    # opt has a concrete default (False) rather than _unspecified, so it is always written.
    options['opt'] = opt

    if desc is not _unspecified:
        options['desc'] = desc

    if targets is not _unspecified:
        options['targets'] = (targets,) if isinstance(targets, str) else targets

    if val is not _unspecified:
        options['val'] = val

    if shape is not _unspecified:
        if np.isscalar(shape):
            options['shape'] = (shape,)
        elif isinstance(shape, list):
            options['shape'] = tuple(shape)
        else:
            options['shape'] = shape

    # All three spellings feed the single 'static_targets' option; later writes win,
    # giving the precedence static_targets > static_target > dynamic.
    if dynamic is not _unspecified:
        options['static_targets'] = not dynamic

    if static_target is not _unspecified:
        options['static_targets'] = static_target

    if static_targets is not _unspecified:
        options['static_targets'] = static_targets

    if lower is not _unspecified:
        options['lower'] = lower

    if upper is not _unspecified:
        options['upper'] = upper

    if scaler is not _unspecified:
        options['scaler'] = scaler

    if adder is not _unspecified:
        options['adder'] = adder

    if ref0 is not _unspecified:
        options['ref0'] = ref0

    if ref is not _unspecified:
        options['ref'] = ref

    if include_timeseries is not _unspecified:
        options['include_timeseries'] = include_timeseries
def add_boundary_constraint(self, name, loc, constraint_name=None, units=None,
                            shape=None, indices=None, lower=None, upper=None, equals=None,
                            scaler=None, adder=None, ref=None, ref0=None, linear=False, flat_indices=False):
    r"""
    Add a boundary constraint to a variable in the phase.

    The constraint is recorded in the list of initial or final boundary constraints of
    the phase. If the constrained variable classifies as an ODE output and no timeseries
    output with the constraint name exists yet, the variable is also added to the
    default timeseries under that name.

    Parameters
    ----------
    name : str
        Name of the variable to constrain. May also provide an expression to be evaluated and constrained.
        If a single variable and the name is not a state, control, or 'time',
        then this is assumed to be the path of the variable to be constrained in the ODE.
        If an expression, it must be provided in the form of an equation with a left- and right-hand side.
    loc : str
        The location of the boundary constraint ('initial' or 'final').
    constraint_name : str or None
        The name of the boundary constraint. For an expression this is always the left-hand side
        of the equation. Otherwise, if None, it is the last dot-separated element of name.
    units : str or None
        The units in which the boundary constraint is to be applied. If None, use the
        units associated with the constrained output. If provided, must be compatible with
        the variables units.
    shape : tuple, list, ndarray, or None
        The shape of the variable being boundary-constrained. This can be inferred
        automatically for time, states, controls, and parameters, but is required
        if the constrained variable is an output of the ODE system.
    indices : tuple, list, ndarray, slice, or None
        The indices of the output variable to be boundary constrained at either the initial or final time in the
        phase. When the variable is multi-dimensional, this should be a list of lists, one for each dimension,
        containing the indices to be constrained. Note the behavior of indices changes depending on the value
        of the flat_indices option.
    lower : float or ndarray, optional
        Lower boundary for the variable.
    upper : float or ndarray, optional
        Upper boundary for the variable.
    equals : float or ndarray, optional
        Equality constraint value for the variable.
    scaler : float or ndarray, optional
        Value to multiply the model value to get the scaled value. Scaler
        is second in precedence.
    adder : float or ndarray, optional
        Value to add to the model value to get the scaled value. Adder
        is first in precedence.
    ref : float or ndarray, optional
        Value of response variable that scales to 1.0 in the driver.
    ref0 : float or ndarray, optional
        Value of response variable that scales to 0.0 in the driver.
    linear : bool
        Set to True if constraint is linear. Setting this to True when the constraint is not a linear function
        of the design variables will result in a failure of the optimization.
    flat_indices : bool
        If True, treat indices as flattened C-ordered indices of elements to constrain. Otherwise,
        indices should be a tuple or list giving the elements to constrain at each point in time.

    Raises
    ------
    ValueError
        If loc is not 'initial' or 'final', if name contains an operator but no '=',
        or if indices is None and an un-indexed constraint at the same boundary already
        uses name or constraint_name as its variable name.
    """
    if loc not in ('initial', 'final'):
        raise ValueError(f'Invalid boundary constraint location "{loc}". Must be '
                         '"initial" or "final".')

    # Anything containing '=' is treated as an equation; operators without '=' are rejected.
    is_expr = '=' in name
    if not is_expr and any(opr in name for opr in ('(', '+', '-', '/', '*', '&', '%', '@')):
        raise ValueError(f'The expression provided `{name}` has invalid format. '
                         'Expression may be a single variable or an equation '
                         'of the form `constraint_name = func(vars)`')

    if is_expr:
        constraint_name = name.split('=')[0].strip()
    elif constraint_name is None:
        constraint_name = name.rpartition('.')[-1]

    if loc == 'initial':
        bc_list = self._initial_boundary_constraints
    else:
        bc_list = self._final_boundary_constraints

    # Duplicates are only detected between constraints that both have indices of None.
    if indices is None:
        unindexed_names = [existing['name'] for existing in bc_list if existing['indices'] is None]
    else:
        unindexed_names = []

    if name in unindexed_names:
        raise ValueError(f'Cannot add new {loc} boundary constraint for variable `{name}` and indices {indices}. '
                         f'One already exists.')

    if constraint_name in unindexed_names:
        raise ValueError(f'Cannot add new {loc} boundary constraint named `{constraint_name}`'
                         f' and indices {indices}. The name `{constraint_name}` is already in use'
                         f' as a {loc} boundary constraint')

    bc = ConstraintOptionsDictionary()
    bc_list.append(bc)

    settings = (
        ('name', name),
        ('constraint_name', constraint_name),
        ('lower', lower),
        ('upper', upper),
        ('equals', equals),
        ('scaler', scaler),
        ('adder', adder),
        ('ref0', ref0),
        ('ref', ref),
        ('indices', indices),
        ('shape', shape),
        ('linear', linear),
        ('units', units),
        ('flat_indices', flat_indices),
        ('is_expr', is_expr),
    )
    for key, value in settings:
        bc[key] = value

    # Automatically add the requested variable to the timeseries outputs if it's an ODE output.
    if self.classify_var(name) == 'ode' and constraint_name not in self._timeseries['timeseries']['outputs']:
        self.add_timeseries_output(name, output_name=constraint_name, units=units, shape=shape)
def add_path_constraint(self, name, constraint_name=None, units=None, shape=None, indices=None,
                        lower=None, upper=None, equals=None, scaler=None, adder=None, ref=None,
                        ref0=None, linear=False, flat_indices=False):
    r"""
    Add a path constraint to a variable in the phase.

    The constraint is appended to the path constraints of the phase. If the constrained
    variable classifies as an ODE output and no timeseries output with the constraint
    name exists yet, the variable is also added to the default timeseries under that name.

    Parameters
    ----------
    name : str
        Name of the variable to constrain. May also provide an expression to be evaluated and constrained.
        If a single variable and the name is not a state, control, or 'time',
        then this is assumed to be the path of the variable to be constrained in the ODE.
        If an expression, it must be provided in the form of an equation with a left- and right-hand side.
    constraint_name : str or None
        The name of the path constraint. For an expression this is always the left-hand side
        of the equation. Otherwise, if None, it is the last dot-separated element of name.
    units : str or None
        The units in which the path constraint is to be applied. If None, use the
        units associated with the constrained output. If provided, must be compatible with
        the variables units.
    shape : tuple, list, ndarray, or None
        The shape of the variable being path-constrained. This can be inferred
        automatically for time, states, controls, and parameters, but is required
        if the constrained variable is an output of the ODE system.
    indices : tuple, list, ndarray, or None
        The indices of the output variable to be constrained at each point in time in the phase.
        When the variable is multi-dimensional, this should be a list of lists, one for each dimension,
        containing the indices to be constrained. Note the behavior of indices changes depending on the value
        of the flat_indices option.
    lower : float or ndarray, optional
        Lower boundary for the variable.
    upper : float or ndarray, optional
        Upper boundary for the variable.
    equals : float or ndarray, optional
        Equality constraint value for the variable.
    scaler : float or ndarray, optional
        Value to multiply the model value to get the scaled value. Scaler
        is second in precedence.
    adder : float or ndarray, optional
        Value to add to the model value to get the scaled value. Adder
        is first in precedence.
    ref : float or ndarray, optional
        Value of response variable that scales to 1.0 in the driver.
    ref0 : float or ndarray, optional
        Value of response variable that scales to 0.0 in the driver.
    linear : bool
        Set to True if constraint is linear. If set to True and the constrained output is not a linear function
        of the design variables, the optimization will fail.
    flat_indices : bool
        If True, treat indices as flattened C-ordered indices of elements to constrain at each given point in time.
        Otherwise, indices should be a tuple or list giving the elements to constrain at each point in time.

    Raises
    ------
    ValueError
        If name contains an operator but no '=', or if an existing path constraint with
        the same indices and flat_indices already uses name or constraint_name as its
        variable name.
    """
    def _same_indices(existing):
        # `ndarray == ndarray` is element-wise and has no single truth value, so an
        # array on either side has to be compared as a whole.
        if isinstance(existing, np.ndarray) or isinstance(indices, np.ndarray):
            return bool(np.array_equal(existing, indices))
        return existing == indices

    expr_operators = ['(', '+', '-', '/', '*', '&', '%', '@']
    if '=' in name:
        is_expr = True
    elif any(opr in name for opr in expr_operators):
        raise ValueError(f'The expression provided `{name}` has invalid format. '
                         'Expression may be a single variable or an equation '
                         'of the form `constraint_name = func(vars)`')
    else:
        is_expr = False

    if is_expr:
        constraint_name = name.split('=')[0].strip()
    elif constraint_name is None:
        constraint_name = name.rpartition('.')[-1]

    existing_pc = [pc for pc in self._path_constraints
                   if pc['name'] == name and _same_indices(pc['indices'])
                   and pc['flat_indices'] == flat_indices]

    if existing_pc:
        raise ValueError(f'Cannot add new path constraint for variable `{name}` and indices {indices}. '
                         f'One already exists.')

    existing_pc_name = [pc for pc in self._path_constraints
                        if pc['name'] == constraint_name and _same_indices(pc['indices'])
                        and pc['flat_indices'] == flat_indices]

    if existing_pc_name:
        raise ValueError(f'Cannot add new path constraint named `{constraint_name}` and indices {indices}.'
                         f' The name `{constraint_name}` is already in use as a path constraint')

    pc = ConstraintOptionsDictionary()
    self._path_constraints.append(pc)

    pc['name'] = name
    pc['constraint_name'] = constraint_name
    pc['lower'] = lower
    pc['upper'] = upper
    pc['equals'] = equals
    pc['scaler'] = scaler
    pc['adder'] = adder
    pc['ref0'] = ref0
    pc['ref'] = ref
    pc['indices'] = indices
    pc['shape'] = shape
    pc['linear'] = linear
    pc['units'] = units
    pc['flat_indices'] = flat_indices
    pc['is_expr'] = is_expr

    # Automatically add the requested variable to the timeseries outputs if it's an ODE output.
    var_type = self.classify_var(name)
    if var_type == 'ode':
        if constraint_name not in self._timeseries['timeseries']['outputs']:
            self.add_timeseries_output(name, output_name=constraint_name, units=units, shape=shape)
def add_timeseries_output(self, name, output_name=None, units=_unspecified, shape=_unspecified,
                          timeseries='timeseries', **kwargs):
    r"""
    Add a variable to the timeseries outputs of the phase.

    If name is given as an expression, this expression will be passed to an OpenMDAO ExecComp and the result
    computed and stored in the timeseries output under the variable name to the left of the equal sign.

    A name containing '=' is treated as an expression. When a list or tuple of names is
    given, each name is added in turn; units and output_name may then be given per variable.

    Parameters
    ----------
    name : str, or list of str
        The name(s) of the variable to be used as a timeseries output, or a mathematical expression to be used
        as a timeseries output. If a name, it must be one of the integration variable, the phase-relative value
        of the integration variable (e.g. 'time_phase'), one of the states, controls, control rates, or
        parameters in the phase, the path to an output variable in the ODE, or a glob pattern matching some
        outputs in the ODE.
    output_name : str or None or list or dict
        The name of the variable as listed in the phase timeseries outputs. By
        default this is the last element in `name` when split by dots. The user may
        override the constraint name if splitting the path causes name collisions.
        If a list of names is provided, output_name can be a matching list, or a dictionary
        keyed by name; names missing from the dictionary get the default output name.
    units : str or None or _unspecified
        The units to express the timeseries output. If None, use the
        units associated with the target. If provided, must be compatible with
        the target units.
        If a list of names is provided, units can be a matching list or dictionary.
    shape : tuple or _unspecified
        The shape of the timeseries output variable. This must be provided (if not scalar)
        since Dymos doesn't necessarily know the shape of ODE outputs until setup time.
    timeseries : str or None
        The name of the timeseries to which the output is being added.
    **kwargs
        Additional arguments passed to the exec comp. These are only forwarded when
        name is a single string, not when it is a list of names.
    """
    if isinstance(name, (list, tuple)):
        for i, name_i in enumerate(name):
            expr = '=' in name_i

            # units may be keyed by variable name, matched by position, or shared by all names.
            if isinstance(units, dict):
                unit = units.get(name_i, None)
            elif isinstance(units, (list, tuple)):
                unit = units[i]
            else:
                unit = units

            # output_name follows the same per-variable conventions as units.
            if isinstance(output_name, dict):
                output_name_i = output_name.get(name_i, None)
            elif isinstance(output_name, (list, tuple)):
                output_name_i = output_name[i]
            else:
                output_name_i = output_name

            oname = self._add_timeseries_output(name_i, output_name=output_name_i,
                                                units=unit,
                                                shape=shape,
                                                timeseries=timeseries,
                                                rate=False,
                                                expr=expr)

            # Handle specific units for wildcard names: the complete units argument is
            # kept on the newly added output.
            if oname is not None and '*' in name_i:
                self._timeseries[timeseries]['outputs'][oname]['wildcard_units'] = units

    else:
        self._add_timeseries_output(name, output_name=output_name,
                                    units=units,
                                    shape=shape,
                                    timeseries=timeseries,
                                    rate=False,
                                    expr='=' in name,
                                    expr_kwargs=kwargs)
def add_timeseries_rate_output(self, name, output_name=None, units=_unspecified, shape=_unspecified,
                               timeseries='timeseries'):
    r"""
    Add the rate of a variable to the timeseries outputs of the phase.

    When a list or tuple of names is given, each name is added in turn; units and
    output_name may then be given per variable.

    Parameters
    ----------
    name : str, or list of str
        The name(s) of the variable to be used as a timeseries output. Must be one of
        the integration variable, 't_phase', one of the states, controls, control rates, or parameters,
        in the phase, the path to an output variable in the ODE, or a glob pattern
        matching some outputs in the ODE.
    output_name : str or None or list or dict
        The name of the variable as listed in the phase timeseries outputs. By
        default this is the last element in `name` when split by dots. The user may
        override the constraint name if splitting the path causes name collisions.
        If a list of names is provided, output_name can be a matching list, or a dictionary
        keyed by name; names missing from the dictionary get the default output name.
    units : str or None or _unspecified
        The units to express the timeseries output. If None, use the
        units associated with the target. If provided, must be compatible with
        the target units.
        If a list of names is provided, units can be a matching list or dictionary.
    shape : tuple or _unspecified
        The shape of the timeseries output variable. This must be provided (if not scalar)
        since Dymos doesn't necessarily know the shape of ODE outputs until setup time.
    timeseries : str or None
        The name of the timeseries to which the output is being added.
    """
    if isinstance(name, (list, tuple)):
        for i, name_i in enumerate(name):
            # Only the multi-name form flags names containing '=' as expressions.
            expr = '=' in name_i

            # units may be keyed by variable name, matched by position, or shared by all names.
            if isinstance(units, dict):
                unit = units.get(name_i, None)
            elif isinstance(units, (list, tuple)):
                unit = units[i]
            else:
                unit = units

            # output_name follows the same per-variable conventions as units.
            if isinstance(output_name, dict):
                output_name_i = output_name.get(name_i, None)
            elif isinstance(output_name, (list, tuple)):
                output_name_i = output_name[i]
            else:
                output_name_i = output_name

            oname = self._add_timeseries_output(name_i, output_name=output_name_i,
                                                units=unit,
                                                shape=shape,
                                                timeseries=timeseries,
                                                rate=True,
                                                expr=expr)

            # Handle specific units for wildcard names: the complete units argument is
            # kept on the newly added output.
            if oname is not None and '*' in name_i:
                self._timeseries[timeseries]['outputs'][oname]['wildcard_units'] = units

    else:
        self._add_timeseries_output(name, output_name=output_name,
                                    units=units,
                                    shape=shape,
                                    timeseries=timeseries,
                                    rate=True)
def _add_timeseries_output(self, name, output_name=None, units=_unspecified, shape=_unspecified,
                           timeseries='timeseries', rate=False, expr=False, expr_kwargs=None):
    r"""
    Add a single variable or rate to the timeseries outputs of the phase.

    This is called by add_timeseries_output or add_timeseries_rate_output for each variable or rate
    that is added. An output whose final name is already present in the timeseries is left
    untouched.

    Parameters
    ----------
    name : str
        The name of the variable to be used as a timeseries output. Must be one of
        the integration variable, 't_phase', one of the states, controls, control rates, or parameters,
        in the phase, or the path to an output variable in the ODE.
    output_name : str or None
        The name of the variable as listed in the phase timeseries outputs. If None, it is
        the left-hand side of the equation when expr is True, the name itself when it
        contains a '*', and otherwise the last element in `name` when split by dots. If rate
        is True, '_rate' is appended to this name.
    units : str or None
        The units to express the timeseries output. If None, use the
        units associated with the target. If provided, must be compatible with
        the target units.
    shape : tuple
        The shape of the timeseries output variable. This must be provided (if not scalar)
        since Dymos doesn't necessarily know the shape of ODE outputs until setup time.
    timeseries : str or None
        The name of the timeseries to which the output is being added.
    rate : bool
        If True, add the rate of change of the named variable to the timeseries outputs of the
        phase. Defaults to False.
    expr : bool
        If True, name is treated as an equation and recorded as an expression output.
        Defaults to False.
    expr_kwargs : dict or None
        Stored with the output under 'expr_kwargs'; add_timeseries_output passes the extra
        keyword arguments it received here.

    Returns
    -------
    str or None
        Name of output that was added to the timeseries or None if nothing was added.

    Raises
    ------
    ValueError
        If the requested timeseries does not exist in the phase.
    """
    if timeseries not in self._timeseries:
        raise ValueError(f'Timeseries {timeseries} does not exist in phase {self.pathname}')

    # Derive a default output name only when the caller gave none.
    if output_name is None:
        if expr:
            output_name = name.partition('=')[0].strip()
        elif '*' in name:
            output_name = name
        else:
            output_name = name.rsplit('.', 1)[-1]

    if rate:
        output_name = output_name + '_rate'

    # Never overwrite an output that is already registered under this name.
    if output_name in self._timeseries[timeseries]['outputs']:
        return None

    ts_output = TimeseriesOutputOptionsDictionary()
    entries = (
        ('name', name),
        ('output_name', output_name),
        ('wildcard_units', {}),
        ('units', units),
        ('shape', shape),
        ('is_rate', rate),
        ('is_expr', expr),
        ('expr_kwargs', expr_kwargs),
    )
    for key, value in entries:
        ts_output[key] = value

    self._timeseries[timeseries]['outputs'][output_name] = ts_output
    return output_name
def add_timeseries(self, name, transcription, subset='all'):
    r"""
    Register a new, initially empty timeseries to which outputs can be added.

    An existing timeseries registered under the same name is replaced.

    Parameters
    ----------
    name : str
        A name for the timeseries output path.
    transcription : object
        The transcription which provides the grid upon which the outputs of the timeseries
        are provided. It is stored as given.
    subset : str
        The name of the node subset in the given transcription at which outputs
        are to be provided.
    """
    new_timeseries = dict(transcription=transcription, subset=subset, outputs={})
    self._timeseries[name] = new_timeseries
def add_objective(self, name, loc='final', index=None, shape=(1,), units=None, ref=None, ref0=None,
                  adder=None, scaler=None, parallel_deriv_color=None):
    """
    Register an objective for the phase.

    The objective is stored in ``self._objectives``, keyed by the variable name or, for an
    equation, by the stripped text left of the first equals sign. When the objective is an
    equation that is not yet an output of the 'timeseries' timeseries, it is also added there
    via ``add_timeseries_output``.

    Parameters
    ----------
    name : str
        Name of the objective variable. This should be one of the integration variable, a state
        or control variable, the path to an output from the top level of the RHS, or an
        expression to be evaluated. If an expression, it must be provided in the form of an
        equation with a left- and right-hand side.
    loc : str
        Where in the phase the objective is to be evaluated. Valid
        options are 'initial' and 'final'. The default is 'final'.
    index : int, optional
        If variable is an array at each point in time, this indicates which index is to be
        used as the objective, assuming C-ordered flattening.
    shape : tuple, optional
        The shape of the objective variable, at a point in time.
    units : str, optional
        The units of the objective function. If None, use the units associated with the target.
        If provided, must be compatible with the target units.
    ref : float or ndarray, optional
        Value of response variable that scales to 1.0 in the driver.
    ref0 : float or ndarray, optional
        Value of response variable that scales to 0.0 in the driver.
    adder : float or ndarray, optional
        Value to add to the model value to get the scaled value. Adder
        is first in precedence.
    scaler : float or ndarray, optional
        Value to multiply the model value to get the scaled value. Scaler
        is second in precedence.
    parallel_deriv_color : str
        If specified, this design var will be grouped for parallel derivative
        calculations with other variables sharing the same parallel_deriv_color.

    Raises
    ------
    ValueError
        If name contains an operator character but no equals sign.
    """
    # An equals sign marks an equation. Operator characters without one mean the expression
    # is malformed, and nothing is stored in that case.
    is_expr = '=' in name
    if not is_expr and any(symbol in name for symbol in ('(', '+', '-', '/', '*', '&', '%', '@')):
        raise ValueError(f'The expression provided `{name}` has invalid format. '
                         'Expression may be a single variable or an equation '
                         'of the form `constraint_name = func(vars)`')

    # An equation is known by its left-hand side; a plain variable by its own name.
    obj_name = name.partition('=')[0].strip() if is_expr else name

    self._objectives[obj_name] = {
        'name': name,
        'loc': loc,
        'index': index,
        'shape': shape,
        'units': units,
        'ref': ref,
        'ref0': ref0,
        'adder': adder,
        'scaler': scaler,
        'parallel_deriv_color': parallel_deriv_color,
        'is_expr': is_expr,
    }

    if not is_expr:
        return

    # The expression must be evaluated somewhere: make it a timeseries output if it isn't one.
    if obj_name not in self._timeseries['timeseries']['outputs']:
        self.add_timeseries_output(name, output_name=obj_name, units=units, shape=shape)
def set_time_options(self, units=_unspecified, fix_initial=_unspecified,
                     fix_duration=_unspecified, input_initial=_unspecified,
                     input_duration=_unspecified, initial_val=_unspecified,
                     initial_bounds=_unspecified, initial_scaler=_unspecified,
                     initial_adder=_unspecified, initial_ref0=_unspecified,
                     initial_ref=_unspecified, duration_val=_unspecified,
                     duration_bounds=_unspecified, duration_scaler=_unspecified,
                     duration_adder=_unspecified, duration_ref0=_unspecified,
                     duration_ref=_unspecified, targets=_unspecified,
                     time_phase_targets=_unspecified, t_initial_targets=_unspecified,
                     t_duration_targets=_unspecified, name=_unspecified):
    """
    Sets options for time in the phase.

    Only those options which are specified in the arguments will be updated.

    Parameters
    ----------
    units : str
        The default units for time variables in the phase. Default is 's'.
    fix_initial : bool
        If True, the initial time of the phase is not treated as a design variable for the
        optimization problem.
    fix_duration : bool
        If True, the duration of the phase is not treated as a design variable for the
        optimization problem.
    input_initial : bool
        If True, the user is expected to link phase.t_initial to an external output source.
        Providing input_initial=True makes all initial time optimization settings irrelevant.
    input_duration : bool
        If True, the user is expected to link phase.t_duration to an external output source.
        Providing input_duration=True makes all time duration optimization settings irrelevant.
    initial_val : float
        Default value of the time at the start of the phase.
    initial_bounds : iterable of (float, float)
        The bounds (lower, upper) for time at the start of the phase.
    initial_scaler : float
        Scaler for the initial value of time.
    initial_adder : float
        Adder for the initial value of time.
    initial_ref0 : float
        Zero-reference for the initial value of time.
    initial_ref : float
        Unit-reference for the initial value of time.
    duration_val : float
        Default value for the time duration of the phase.
    duration_bounds : iterable of (float, float)
        The bounds (lower, upper) for the time duration of the phase.
    duration_scaler : float
        Scaler for phase time duration.
    duration_adder : float
        Adder for phase time duration.
    duration_ref0 : float
        Zero-reference for phase time duration.
    duration_ref : float
        Unit-reference for phase time duration.
    targets : str or iterable of str
        Targets in the ODE for the value of current time.
    time_phase_targets : str or iterable of str
        Targets in the ODE for the value of current phase elapsed time.
    t_initial_targets : str or iterable of str
        Targets in the ODE for the value of phase initial time.
    t_duration_targets : str or iterable of str
        Targets in the ODE for the value of phase time duration.
    name : str
        Name of the integration variable for this phase. Default is 'time'.
    """
    # Options stored exactly as given. The tuple order is the order in which they are
    # written to self.time_options.
    plain_options = (
        ('units', units),
        ('fix_initial', fix_initial),
        ('fix_duration', fix_duration),
        ('input_initial', input_initial),
        ('input_duration', input_duration),
        ('initial_val', initial_val),
        ('initial_bounds', initial_bounds),
        ('initial_scaler', initial_scaler),
        ('initial_adder', initial_adder),
        ('initial_ref0', initial_ref0),
        ('initial_ref', initial_ref),
        ('duration_val', duration_val),
        ('duration_bounds', duration_bounds),
        ('duration_scaler', duration_scaler),
        ('duration_adder', duration_adder),
        ('duration_ref0', duration_ref0),
        ('duration_ref', duration_ref),
    )
    for key, value in plain_options:
        if value is not _unspecified:
            self.time_options[key] = value

    # Target options accept a single string as shorthand for a one-element tuple.
    target_options = (
        ('targets', targets),
        ('time_phase_targets', time_phase_targets),
        ('t_initial_targets', t_initial_targets),
        ('t_duration_targets', t_duration_targets),
    )
    for key, value in target_options:
        if value is not _unspecified:
            self.time_options[key] = (value,) if isinstance(value, str) else value

    if name is not _unspecified:
        self.time_options['name'] = name
def set_duration_balance(self, name, val=0.0, index=None, units=None, mult_val=None, normalize=False):
    """
    Adds a condition for the duration of the phase. This is satisfied using a nonlinear solver.

    The settings are stored in ``self.time_options['t_duration_balance_options']``. When the
    variable is classified as 'ode' and is not already an output of the 'timeseries'
    timeseries, it is added there under the balance name.

    Parameters
    ----------
    name : str
        Name of the variable. This should be a state or control variable, the path to an output
        from the top level of the RHS, or an expression to be evaluated. If an expression,
        it must be provided in the form of an equation with a left- and right-hand side.
    val : float
        The value that the residual must equal at the end point of the phase.
    index : int, optional
        If variable is an array at each point in time, this indicates which index is to be
        used for the balance, assuming C-ordered flattening.
    units : str, optional
        The units of the balanced variable. If None, use the units associated with the target.
        If provided, must be compatible with the target units.
    mult_val : float, optional
        Default value for the LHS multiplier.
    normalize : bool, optional
        Specifies whether the resulting residual should be normalized by a quadratic
        function of the RHS.

    Raises
    ------
    ValueError
        If the phase duration is fixed or is an input, or if name contains an operator
        character but no equals sign.
    NotImplementedError
        If the phase uses the ExplicitShooting transcription.
    """
    time_opts = self.time_options

    # The duration can only be solved for implicitly when nothing else dictates its value.
    if time_opts['fix_duration']:
        raise ValueError('Cannot implicitly solve for phase duration when fix_duration is True')
    if time_opts['input_duration']:
        raise ValueError('Cannot implicitly solve for phase duration when input_duration is True')

    if isinstance(self.options['transcription'], ExplicitShooting):
        raise NotImplementedError('Transcription ExplicitShooting does not implement method setup_duration_balance')

    # An equals sign marks an equation. Operator characters without one mean the expression
    # is malformed.
    is_expr = '=' in name
    if not is_expr and any(symbol in name for symbol in ('(', '+', '-', '/', '*', '&', '%', '@')):
        raise ValueError(f'The expression provided `{name}` has invalid format. '
                         'Expression may be a single variable or an equation '
                         'of the form `constraint_name = func(vars)`')

    # An equation is known by its left-hand side; a plain variable by its own name.
    balance_name = name.partition('=')[0].strip() if is_expr else name

    time_opts['t_duration_balance_options'] = {
        'name': name,
        'val': val,
        'index': index,
        'units': units,
        'mult_val': mult_val,
        'normalize': normalize,
        'is_expr': is_expr,
        'balance_name': balance_name,
    }

    # Variables classified as 'ode' are exposed through the timeseries unless already present.
    if self.classify_var(name) == 'ode' and \
            balance_name not in self._timeseries['timeseries']['outputs']:
        self.add_timeseries_output(name, output_name=balance_name, units=units)
def classify_var(self, var):
    """
    Classifies a variable of the given name or path.

    This method forwards to the ``classify_var`` introspection utility, passing it the
    time, state, parameter, control, polynomial control and timeseries options of this phase.
    If the variable is not found to be one of those, it is assumed to be the path to a
    variable relative to the top of the ODE system for the phase.

    Parameters
    ----------
    var : str
        The name of the variable to be classified.

    Returns
    -------
    str
        The classification of the given variable, which is one of
        't', 't_phase', 'state', 'control', 'control_rate',
        'control_rate2', 'polynomial_control',
        'polynomial_control_rate', 'polynomial_control_rate2', 'parameter',
        or 'ode'.
    """
    # Everything the utility needs to decide what kind of variable `var` is.
    phase_options = {
        'time_options': self.time_options,
        'state_options': self.state_options,
        'parameter_options': self.parameter_options,
        'control_options': self.control_options,
        'polynomial_control_options': self.polynomial_control_options,
        'timeseries_options': self._timeseries,
    }
    return classify_var(var, **phase_options)
def _check_ode(self):
    """
    Check that the provided ODE class meets minimum requirements.

    The ODE is taken from option 'ode_class', falling back to option 'rhs_class' when the
    former is falsy.

    * The ode_class must be provided as a class or a callable.
    * When given as a callable, ode_class must return an instance derived from openmdao.core.System.
    * When given as a class, ode_class must derive from openmdao.core.System

    Raises
    ------
    ValueError
        ValueError is raised if the ODE does not meet one of the requirements above.
    """
    ode_class = self.options['ode_class'] or self.options['rhs_class']

    if inspect.isclass(ode_class):
        # A class is accepted on its ancestry alone; it is not instantiated here.
        if not issubclass(ode_class, System):
            raise ValueError('If given as a class, ode_class must be derived from openmdao.core.System.')
        return

    if not isinstance(ode_class, Callable):
        raise ValueError('ode_class must be given as a callable object that returns an '
                         'object derived from openmdao.core.System, or as a class derived '
                         'from openmdao.core.System.')

    # A factory can only be judged by what it produces, so build a single-node instance.
    test_instance = ode_class(num_nodes=1, **self.options['ode_init_kwargs'])
    if not isinstance(test_instance, System):
        raise ValueError(f'When provided as a callable, ode_class must return an instance '
                         f'of openmdao.core.System. Got {type(test_instance)}')
def setup(self):
"""
Build the model hierarchy for a Dymos phase.
"""
# Finalize the variables if it hasn't happened already.
# If this phase exists within a Trajectory, the trajectory will finalize them during setup.
transcription = self.options['transcription']
transcription.setup_time(self)
if self.control_options:
transcription.setup_controls(self)
if self.polynomial_control_options:
transcription.setup_polynomial_controls(self)
if self.parameter_options:
transcription.setup_parameters(self)
transcription.setup_states(self)
self._check_ode()
transcription.setup_ode(self)
transcription.setup_timeseries_outputs(self)
transcription.setup_duration_balance(self)
transcription.setup_defects(self)
transcription.setup_solvers(self)
def configure(self):
    """
    Finalize connections after sizes are known.

    Runs the introspection utilities against the ODE obtained from the transcription, then
    lets the transcription configure each part of the phase. The order of the calls below
    is significant and must be preserved.

    Raises
    ------
    ValueError
        If parameter introspection raises ValueError; re-raised with the phase pathname.
    RuntimeError
        If timeseries output introspection raises RuntimeError; re-raised with the phase
        pathname.
    """
    tx = self.options['transcription']
    ode = tx._get_ode(self)

    configure_time_introspection(self.time_options, ode)

    # Both kinds of control share one introspection routine. The time units are read here,
    # after time introspection has run, rather than being cached up front.
    for ctrl_options in (self.control_options, self.polynomial_control_options):
        if ctrl_options:
            configure_controls_introspection(ctrl_options, ode,
                                             time_units=self.time_options['units'])

    if self.parameter_options:
        try:
            configure_parameters_introspection(self.parameter_options, ode)
        except ValueError as err:
            # Add the phase pathname to the message so the failing phase can be identified.
            raise ValueError(f'Invalid parameter in phase `{self.pathname}`.\n{str(err)}') from err

    tx.configure_states_discovery(self)
    tx.configure_states_introspection(self)
    tx.configure_time(self)
    tx.configure_controls(self)
    tx.configure_polynomial_controls(self)
    tx.configure_parameters(self)
    tx.configure_states(self)
    tx.configure_ode(self)
    tx.configure_defects(self)

    _configure_constraint_introspection(self)
    configure_timeseries_expr_introspection(self)

    tx.configure_boundary_constraints(self)
    tx.configure_path_constraints(self)
    tx.configure_objective(self)

    try:
        configure_timeseries_output_introspection(self)
    except RuntimeError as intro_err:
        raise RuntimeError(f'Error during configure_timeseries_output_introspection in phase {self.pathname}.')\
            from intro_err

    tx.configure_timeseries_outputs(self)
    tx.configure_duration_balance(self)
    tx.configure_solvers(self)
def check_time_options(self):
    """
    Check that time options are valid and issue warnings if invalid options are provided.

    When the initial time (or duration) is fixed or is an input, any bounds, scaler, adder,
    ref or ref0 given for it has no effect and is reported. Setting both the ``fix_*`` and
    ``input_*`` flag for the same quantity is reported as well.

    Warns
    -----
    RuntimeWarning
        RuntimeWarning is issued in the case of one or more invalid time options.
    """
    phase_name = self.pathname
    time_opts = self.time_options

    if time_opts['fix_initial'] or time_opts['input_initial']:
        invalid_options = []
        init_bounds = time_opts['initial_bounds']
        if init_bounds is not None and init_bounds != (None, None):
            invalid_options.append('initial_bounds')
        invalid_options.extend(opt for opt in ('initial_scaler', 'initial_adder',
                                               'initial_ref', 'initial_ref0')
                               if time_opts[opt] is not None)
        if invalid_options:
            str_invalid_opts = ', '.join(invalid_options)
            # Explicit category: without it the warning would be a UserWarning, contrary to
            # the documented contract and to the other warnings issued by this method.
            warnings.warn(f'Phase time options have no effect because fix_initial=True '
                          f'or input_initial=True for phase \'{phase_name}\': {str_invalid_opts}',
                          RuntimeWarning)

    if time_opts['input_initial'] and time_opts['fix_initial']:
        warnings.warn(f'Phase \'{self.name}\' initial time is an externally-connected input, '
                      'therefore fix_initial has no effect.', RuntimeWarning)

    if time_opts['fix_duration'] or time_opts['input_duration']:
        invalid_options = []
        duration_bounds = time_opts['duration_bounds']
        if duration_bounds is not None and duration_bounds != (None, None):
            invalid_options.append('duration_bounds')
        invalid_options.extend(opt for opt in ('duration_scaler', 'duration_adder',
                                               'duration_ref', 'duration_ref0')
                               if time_opts[opt] is not None)
        if invalid_options:
            str_invalid_opts = ', '.join(invalid_options)
            warnings.warn(f'Phase time options have no effect because fix_duration=True '
                          f'or input_duration=True for phase \'{phase_name}\': {str_invalid_opts}',
                          RuntimeWarning)

    if time_opts['input_duration'] and time_opts['fix_duration']:
        warnings.warn(f'Phase \'{self.name}\' time duration is an externally-connected input, '
                      'therefore fix_duration has no effect.', RuntimeWarning)
def _check_control_options(self):
    """
    Check that control options are valid and issue warnings if invalid options are provided.

    For every control with ``opt`` falsy, any of lower, upper, scaler, adder, ref or ref0 that
    is not None is reported, and the control's value and rate continuity flags are turned off.

    Warns
    -----
    RuntimeWarning
        RuntimeWarning is issued in the case of one or more invalid control options.
    """
    optimizer_only = ('lower', 'upper', 'scaler', 'adder', 'ref', 'ref0')

    for name, options in self.control_options.items():
        if options['opt']:
            continue

        invalid_options = [opt for opt in optimizer_only if options[opt] is not None]
        if invalid_options:
            warnings.warn(f"Invalid options for non-optimal control '{name}' in phase "
                          f"'{self.name}': {', '.join(invalid_options)}",
                          RuntimeWarning)

        # Do not enforce value or rate continuity for non-optimal controls.
        for flag in ('continuity', 'rate_continuity', 'rate2_continuity'):
            self.control_options[name][flag] = False
def _check_polynomial_control_options(self):
    """
    Check that polynomial control options are valid and issue warnings if invalid options are provided.

    For every polynomial control with ``opt`` falsy, any of lower, upper, scaler, adder, ref
    or ref0 that is not None is reported.

    Warns
    -----
    RuntimeWarning
        RuntimeWarning is issued in the case of one or more invalid polynomial control options.
    """
    optimizer_only = ('lower', 'upper', 'scaler', 'adder', 'ref', 'ref0')

    # Inspect the polynomial controls themselves; the ordinary controls are covered by
    # _check_control_options.
    for name, options in self.polynomial_control_options.items():
        if options['opt']:
            continue

        invalid_options = [opt for opt in optimizer_only if options[opt] is not None]
        if invalid_options:
            warnings.warn(f"Invalid options for non-optimal polynomial control '{name}' in "
                          f"phase '{self.name}': {', '.join(invalid_options)}",
                          RuntimeWarning)
def _check_parameter_options(self):
    """
    Check that parameter options are valid and issue warnings if invalid
    options are provided.

    For every parameter with ``opt`` falsy, any of lower, upper, scaler, adder, ref or ref0
    that is not None is reported.

    Warns
    -----
    RuntimeWarning
        RuntimeWarning is issued in the case of one or more invalid parameter options.
    """
    optimizer_only = ('lower', 'upper', 'scaler', 'adder', 'ref', 'ref0')

    for name, options in self.parameter_options.items():
        if options['opt']:
            continue

        invalid_options = [opt for opt in optimizer_only if options[opt] is not None]
        if invalid_options:
            warnings.warn(f"Invalid options for non-optimal parameter '{name}' in "
                          f"phase '{self.name}': {', '.join(invalid_options)}",
                          RuntimeWarning)
def interpolate(self, xs=None, ys=None, nodes='all', kind='linear', axis=0):
    """
    Return an array of values interpolated to the given node subset of the phase.

    Deprecated: a deprecation warning is issued on every call, pointing to ``phase.interp``.

    Parameters
    ----------
    xs : ndarray or Sequence or None
        Array of integration variable values. If None, ys must have length 2 and its two
        entries are taken as the values at the ends of the phase.
    ys : ndarray or Sequence or None
        Array of control/state/parameter values.
    nodes : str or None
        The name of the node subset.
    kind : str
        Specifies the kind of interpolation, as per the scipy.interpolate package.
        One of ('linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic'
        where 'zero', 'slinear', 'quadratic' and 'cubic' refer to a spline
        interpolation of zeroth, first, second or third order) or as an
        integer specifying the order of the spline interpolator to use.
        Default is 'linear'.
    axis : int
        Specifies the axis along which interpolation should be performed. Default is
        the first axis (0).

    Returns
    -------
    np.array
        The values of y interpolated at nodes of the specified type.

    Raises
    ------
    ValueError
        If ys is not iterable, nodes is not a known subset, or xs is inconsistent with ys/kind.
    RuntimeError
        If the transcription has no grid data yet.
    """
    om.issue_warning('phase.interpolate has been deprecated and will be removed from Dymos '
                     '2.0.0. Use phase.interp instead, which uses a different order for the '
                     'arguments but is more terse and can interpolate polynomial control '
                     'values.', category=om.OMDeprecationWarning)

    if not isinstance(ys, Iterable):
        raise ValueError('ys must be provided as an Iterable of length at least 2.')

    valid_subsets = ('col', 'all', 'state_disc', 'state_input', 'control_disc',
                     'control_input', 'segment_ends')
    if nodes not in valid_subsets:
        raise ValueError("nodes must be one of 'col', 'all', 'state_disc', "
                         "'state_input', 'control_disc', 'control_input', or 'segment_ends'")

    if xs is None:
        # Without abscissae, only a straight line between two end values is meaningful.
        if len(ys) != 2:
            raise ValueError('xs may only be unspecified when len(ys)=2')
        if kind != 'linear':
            raise ValueError('kind must be linear when xs is unspecified.')
        xs = [-1, 1]
    elif len(xs) != np.prod(np.asarray(xs).shape):
        raise ValueError('xs must be viewable as a 1D array')

    grid_data = self.options['transcription'].grid_data
    if grid_data is None:
        raise RuntimeError('interpolate cannot be called until the associated '
                           'problem has been setup')

    node_taus = grid_data.node_ptau[grid_data.subset_node_indices[nodes]]

    # Affine map taking the first x to tau = -1 and the last x to tau = +1.
    x_flat = np.asarray(xs).ravel()
    slope = 2.0 / (x_flat[-1] - x_flat[0])
    offset = 1.0 - (slope * x_flat[-1])

    interpolant = interpolate.interp1d(slope * x_flat + offset, ys, axis=axis, kind=kind,
                                       bounds_error=False, fill_value='extrapolate')

    # A single-row result is returned as a column.
    res = np.atleast_2d(interpolant(node_taus))
    return res.T if res.shape[0] == 1 else res
def interp(self, name=None, ys=None, xs=None, nodes=None, kind='linear', axis=0):
    """
    Interpolate values onto the given subset of nodes in the phase.

    If specified, name will be used to determine the kind of variable being interpolated.

    Parameters
    ----------
    name : str or None
        If nodes is None, then use the name argument to determine which kind of variable is
        being interpolated. If it is a state, assume nodes is 'state_input'. If it is related
        to a control, assume nodes is 'control_input'. If it is a polynomial control, assume
        the nodes are the input nodes for that polynomial control. Any other type of variable
        will result in an error.
    ys : ndarray or Sequence or None
        Array of control/state/parameter values.
    xs : ndarray or Sequence or None
        Array of integration variable values. If None, ys must have length 2 and its two
        entries are taken as the values at the ends of the phase.
    nodes : str or None
        The name of the node subset or None (default).
    kind : str
        Specifies the kind of interpolation, as per the scipy.interpolate package.
        One of ('linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic'
        where 'zero', 'slinear', 'quadratic' and 'cubic' refer to a spline
        interpolation of zeroth, first, second or third order) or as an
        integer specifying the order of the spline interpolator to use.
        Default is 'linear'.
    axis : int
        Specifies the axis along which interpolation should be performed. Default is
        the first axis (0).

    Returns
    -------
    np.array
        The values of y interpolated at nodes of the specified type.

    Raises
    ------
    ValueError
        If ys is not iterable, nodes is not a known subset, xs is inconsistent with ys/kind,
        or the node subset cannot be determined from name.
    RuntimeError
        If a node subset of the grid is needed but the transcription has no grid data yet.
    """
    if not isinstance(ys, Iterable):
        raise ValueError('ys must be provided as an Iterable of length at least 2.')
    if nodes not in ('col', 'all', 'state_disc', 'state_input', 'control_disc',
                     'control_input', 'segment_ends', None):
        raise ValueError("nodes must be one of 'col', 'all', 'state_disc', "
                         "'state_input', 'control_disc', 'control_input', 'segment_ends', or "
                         "None.")
    if xs is None:
        # Without abscissae, only a straight line between two end values is meaningful.
        if len(ys) != 2:
            raise ValueError('xs may only be unspecified when len(ys)=2')
        if kind != 'linear':
            raise ValueError('kind must be linear when xs is unspecified.')
        xs = [-1, 1]
    elif len(xs) != np.prod(np.asarray(xs).shape):
        raise ValueError('xs must be viewable as a 1D array')

    gd = self.options['transcription'].grid_data

    def _subset_ptau(subset):
        # Phase-tau locations of a node subset. Fail with a clear message, rather than an
        # AttributeError on None, when the grid is not available yet.
        if gd is None:
            raise RuntimeError('interp cannot be called until the associated '
                               'problem has been setup')
        return gd.node_ptau[gd.subset_node_indices[subset]]

    if nodes is None:
        if name is None:
            raise ValueError('nodes for interpolation were not specified but the name of the '
                             'variable to be interpolated was not provided.\nPlease specify '
                             'the name of the interpolated variable or a node subset.')
        elif name in self.state_options:
            # For states in explicit shooting phases, interp should just return the initial
            # value.
            if isinstance(self.options['transcription'], dm.ExplicitShooting):
                node_locations = np.array([-1.0])
            else:
                node_locations = _subset_ptau('state_input')
        elif name in self.control_options:
            node_locations = _subset_ptau('control_input')
        elif name in self.polynomial_control_options:
            # Polynomial controls use LGL nodes for the control's order, not the phase grid.
            node_locations, _ = lgl(self.polynomial_control_options[name]['order'] + 1)
        else:
            raise ValueError('Could not find a state, control, or polynomial control named '
                             f'{name} to be interpolated.\nPlease explicitly specified the '
                             f'node subset onto which this value should be interpolated.')
    else:
        node_locations = _subset_ptau(nodes)

    # Affine transform xs into tau space [-1, 1]
    _xs = np.asarray(xs).ravel()
    m = 2.0 / (_xs[-1] - _xs[0])
    b = 1.0 - (m * _xs[-1])
    taus = m * _xs + b

    interpfunc = interpolate.interp1d(taus, ys, axis=axis, kind=kind,
                                      bounds_error=False, fill_value='extrapolate')

    # A single-row result is returned as a column.
    res = np.atleast_2d(interpfunc(node_locations))
    if res.shape[0] == 1:
        res = res.T
    return res
def get_simulation_phase(self, times_per_seg=None, method=_unspecified, atol=_unspecified,
                         rtol=_unspecified, first_step=_unspecified, max_step=_unspecified,
                         reports=False):
    """
    Return a SimulationPhase instance that is essentially a copy of this Phase.

    This instance is initialized based on data from this Phase instance and
    the given simulation times.

    If left unspecified, options `method`, `atol`, `rtol`, `first_step`, and `max_step` will
    be taken from Phase.simulate_options.

    Parameters
    ----------
    times_per_seg : int or None
        Number of equally distributed output times per segment in the phase simulation. If
        None, output to all nodes provided by this phases GridData.
    method : str
        The scipy.integrate.solve_ivp integration method.
    atol : float
        Absolute convergence tolerance for the scipy.integrate.solve_ivp method.
    rtol : float
        Relative convergence tolerance for the scipy.integrate.solve_ivp method.
    first_step : float
        Initial step size for the integration.
    max_step : float or _unspecified
        Maximum step size for the integration.
    reports : bool or None or str or Sequence
        The reports setting for the subproblem run under each simulation segment.

    Returns
    -------
    SimulationPhase
        An instance of SimulationPhase initialized based on data from this Phase and the given
        times. This instance has not yet been setup.
    """
    # Imported inside the function rather than at module level.
    from .simulation_phase import SimulationPhase

    return SimulationPhase(from_phase=self,
                           times_per_seg=times_per_seg,
                           method=method,
                           atol=atol,
                           rtol=rtol,
                           first_step=first_step,
                           max_step=max_step,
                           reports=reports)
def initialize_values_from_phase(self, prob, from_phase, phase_path='', skip_params=None):
    """
    Initializes values in the Phase using the phase from which it was created.

    Initial time and duration, initial state values, control values, polynomial control
    values and parameter values are read from ``from_phase`` and written into ``prob``.

    Parameters
    ----------
    prob : Problem
        The problem instance to set values taken from the from_phase instance.
    from_phase : Phase
        The Phase instance from which the values in this phase are being initialized.
    phase_path : str
        The pathname of the system in prob that contains the phases.
    skip_params : None or set
        Parameter names that will be skipped because they have already been initialized at the
        trajectory level (Deprecated).
    """
    phs = from_phase

    if skip_params is not None:
        om.issue_warning(f'{self.pathname}: Option `skip_params` to Phase.initialize_values_from_phase` is '
                         f'deprecated and will be removed dymos 2.0.0', category=om.OMDeprecationWarning)

    # list_outputs / list_inputs yield (name, metadata) pairs, which dict() consumes directly.
    op_dict = dict(phs.list_outputs(units=True, list_autoivcs=True, out_stream=None))
    ip_dict = dict(phs.list_inputs(units=True, out_stream=None))

    # Prefix for this phase's variables in prob: the first component of this phase's
    # pathname, followed by the phase name unless that first component already is the name.
    root_name = self.pathname.partition('.')[0]
    if root_name == self.name:
        self_path = self.name + '.'
    else:
        self_path = root_name + '.' + self.name + '.'

    if MPI:
        # NOTE(review): only the outputs are broadcast from rank 0; ip_dict is used as
        # collected locally. Confirm this asymmetry is intended.
        op_dict = MPI.COMM_WORLD.bcast(op_dict, root=0)

    # Set the integration times from the first and last timeseries values
    time_name = phs.time_options['name']
    time_vals = op_dict[f'timeseries.timeseries_comp.{time_name}']['val']
    prob.set_val(f'{self_path}t_initial', time_vals[0, ...])
    prob.set_val(f'{self_path}t_duration', time_vals[-1, ...] - time_vals[0, ...])

    # Assign initial state values
    for name in phs.state_options:
        op = op_dict[f'timeseries.timeseries_comp.states:{name}']
        prob[f'{self_path}initial_states:{name}'][...] = op['val'][0, ...]

    # Assign control values
    for name in phs.control_options:
        ip = ip_dict[f'control_group.control_interp_comp.controls:{name}']
        prob[f'{self_path}controls:{name}'][...] = ip['val']

    # Assign polynomial control values
    for name in phs.polynomial_control_options:
        ip = ip_dict[f'polynomial_control_group.interp_comp.'
                     f'polynomial_controls:{name}']
        prob[f'{self_path}polynomial_controls:{name}'][...] = ip['val']

    # Assign parameter values, read from the source phase in the parameter's own units
    for name, options in phs.parameter_options.items():
        val = phs.get_val(f'parameters:{name}', units=options['units'])

        # Parameters are addressed through phase_path when one is given, not self_path.
        if phase_path:
            prob_path = f'{phase_path}.{self.name}.parameters:{name}'
        else:
            prob_path = f'{self.name}.parameters:{name}'
        prob.set_val(prob_path, val)
def simulate(self, times_per_seg=10, method=_unspecified, atol=_unspecified, rtol=_unspecified,
             first_step=_unspecified, max_step=_unspecified, record_file=None):
    """
    Simulate the Phase using scipy.integrate.solve_ivp.

    A new OpenMDAO Problem is created containing a simulation phase obtained from
    ``get_simulation_phase``. It is set up, given its values from this phase, run, recorded
    as case 'final', and cleaned up before being returned.

    Parameters
    ----------
    times_per_seg : int or None
        Number of equally spaced times per segment at which output is requested. If None,
        output will be provided at all Nodes.
    method : str
        The scipy.integrate.solve_ivp integration method.
    atol : float
        Absolute convergence tolerance for scipy.integrate.solve_ivp.
    rtol : float
        Relative convergence tolerance for scipy.integrate.solve_ivp.
    first_step : float
        Initial step size for the integration.
    max_step : float
        Maximum step size for the integration.
    record_file : str or None
        If a string, the file to which the result of the simulation will be saved.
        If None, no record of the simulation will be saved.

    Returns
    -------
    problem
        An OpenMDAO Problem in which the simulation is implemented. This Problem interface
        can be interrogated to obtain timeseries outputs in the same manner as other Phases
        to obtain results at the requested times.
    """
    sim_prob = om.Problem(model=om.Group())

    sim_phase = self.get_simulation_phase(times_per_seg, method=method, atol=atol, rtol=rtol,
                                          first_step=first_step, max_step=max_step)

    # The simulation phase is added under the same name as this phase.
    sim_prob.model.add_subsystem(self.name, sim_phase)

    if record_file is not None:
        rec = om.SqliteRecorder(record_file)
        sim_prob.add_recorder(rec)

    sim_prob.setup(check=True)
    sim_prob.final_setup()

    # Values are copied from this phase only after final_setup has been called.
    sim_phase.set_vals_from_phase(from_phase=self)

    print(f'\nSimulating phase {self.pathname}')
    sim_prob.run_model()
    print(f'Done simulating phase {self.pathname}')

    sim_prob.record('final')

    sim_prob.cleanup()

    return sim_prob
def set_refine_options(self, refine=_unspecified, tol=_unspecified, min_order=_unspecified,
                       max_order=_unspecified, smoothness_factor=_unspecified):
    """
    Set the specified option(s) for grid refinement in the phase.

    Only those options which are specified in the arguments will be updated.

    Parameters
    ----------
    refine : bool
        If True, this Phase will undergo refinement during the grid refinement procedure.
    tol : float
        The error tolerance used by all grid-refinement algorithms.
    min_order : int
        The minimum allowable transcription order for segments in the phase.
        Used in hp and ph refinement methods.
    max_order : int
        The maximum allowable transcription order for segments in the phase.
        Used in hp and ph refinement methods.
    smoothness_factor : float
        The maximum allowable ratio of state second derivatives. If exceeded the segment must be split.
        Used in hp refinement method.
    """
    # (key in self.refine_options, requested value). Note that `tol` is stored under
    # 'tolerance'.
    requested = (
        ('refine', refine),
        ('tolerance', tol),
        ('min_order', min_order),
        ('max_order', max_order),
        ('smoothness_factor', smoothness_factor),
    )
    for key, value in requested:
        if value is not _unspecified:
            self.refine_options[key] = value
def set_simulate_options(self, method=_unspecified, atol=_unspecified, rtol=_unspecified,
                         first_step=_unspecified, max_step=_unspecified):
    """
    Set the specified option(s) used when simulating the phase.

    Each argument that is not ``_unspecified`` is stored in ``self.simulate_options``
    under the key of the same name; the remaining options are left untouched.

    Parameters
    ----------
    method : str
        The scipy.integrate.solve_ivp integration method.
    atol : float
        Absolute convergence tolerance for the scipy.integrate.solve_ivp method.
    rtol : float
        Relative convergence tolerance for the scipy.integrate.solve_ivp method.
    first_step : float
        Initial step size for the integration.
    max_step : float
        Maximum step size for the integration.
    """
    # (option key, requested value) pairs, in the order they are applied.
    requested = (('method', method),
                 ('atol', atol),
                 ('rtol', rtol),
                 ('first_step', first_step),
                 ('max_step', max_step))

    for option_key, requested_val in requested:
        if requested_val is not _unspecified:
            self.simulate_options[option_key] = requested_val
def is_time_fixed(self, loc):
    """
    Test whether the initial or final time in the phase is guaranteed to be fixed.

    There are situations in which this can return False even if the final time is fixed
    in the problem. If the initial time or duration are inputs, the phase knows nothing
    about their behavior upstream.

    A quantity is considered fixed when its ``fix_*`` time option is truthy, or when
    its bounds have both a lower and an upper value and the two are equal. Bounds with
    a missing side (``None``) never pin the value.

    Parameters
    ----------
    loc : str
        The location of time to be tested: either 'initial' or 'final'.

    Returns
    -------
    bool
        For 'initial', True if the initial time is fixed. For 'final', True only if
        both the initial time and the duration are fixed.

    Raises
    ------
    ValueError
        If loc is neither 'initial' nor 'final'.
    """
    def _pinned_by_bounds(bounds):
        # Both ends must be present before they can be compared; subtracting with
        # None on either side would raise a TypeError.
        if bounds is None:
            return False
        lower, upper = list(bounds)[:2]
        if lower is None or upper is None:
            return False
        return bool(upper - lower == 0)

    fix_initial = self.time_options['fix_initial']
    initial_bounds = self.time_options['initial_bounds']

    if loc == 'initial':
        res = fix_initial or _pinned_by_bounds(initial_bounds)
    elif loc == 'final':
        fix_duration = self.time_options['fix_duration']
        duration_bounds = self.time_options['duration_bounds']
        # The final time is initial + duration, so both must be fixed.
        initial_fixed = fix_initial or _pinned_by_bounds(initial_bounds)
        duration_fixed = fix_duration or _pinned_by_bounds(duration_bounds)
        res = initial_fixed and duration_fixed
    else:
        raise ValueError(f'Unknown value for argument "loc": must be either "initial" or '
                         f'"final" but got {loc}')

    return res
def is_state_fixed(self, name, loc):
    """
    Report the 'fix_initial' or 'fix_final' option of the named state.

    Parameters
    ----------
    name : str
        The name of the state to be tested.
    loc : str
        The end of the phase to be tested: either 'initial' or 'final'.

    Returns
    -------
    bool
        The value of the state's 'fix_initial' option when loc is 'initial',
        or of its 'fix_final' option when loc is 'final'.

    Raises
    ------
    ValueError
        If loc is neither 'initial' nor 'final'.
    """
    if loc == 'final':
        return self.state_options[name]['fix_final']

    if loc != 'initial':
        raise ValueError(f'Unknown value for argument "loc": must be either "initial" or '
                         f'"final" but got {loc}')

    return self.state_options[name]['fix_initial']
def is_control_fixed(self, name, loc):
    """
    Test if the control of the given name is guaranteed to be fixed at the initial or final time.

    Parameters
    ----------
    name : str
        The name of the control to be tested.
    loc : str
        The location of time to be tested: either 'initial' or 'final'.

    Returns
    -------
    bool
        True if the control's 'fix_initial' (for loc 'initial') or 'fix_final'
        (for loc 'final') option is set. Otherwise the negation of the control's
        'opt' option, which is also the result for any other value of loc.
    """
    opts = self.control_options[name]

    fixed_at_loc = (loc == 'initial' and opts['fix_initial']) or \
                   (loc == 'final' and opts['fix_final'])
    if fixed_at_loc:
        return True

    # Not explicitly fixed at this end: the answer is the negation of 'opt'.
    return not opts['opt']
def is_control_rate_fixed(self, name, loc):
    """
    Test if the control rate of the given name is guaranteed to be fixed at the initial or final time.

    The rate name is mapped back to its control by stripping a trailing '_rate' or
    '_rate2', and the result is whatever ``is_control_fixed`` reports for that control.

    Parameters
    ----------
    name : str
        The name of the control rate to be tested, i.e. a control name followed
        by '_rate' or '_rate2'.
    loc : str
        The location of time to be tested: either 'initial' or 'final'.

    Returns
    -------
    bool
        True if the control associated with the given rate is guaranteed to be
        fixed at the given location.

    Raises
    ------
    ValueError
        If name is not the '_rate' or '_rate2' of a control in this phase.
    """
    controls = self.control_options
    control_name = None

    if controls is not None:
        for suffix in ('_rate', '_rate2'):
            if name.endswith(suffix) and name[:-len(suffix)] in controls:
                control_name = name[:-len(suffix)]
                break

    if control_name is None:
        # Fail with a clear message instead of an unbound local variable.
        raise ValueError(f'"{name}" is not the rate or second rate of any control '
                         f'in phase {self.pathname}')

    return self.is_control_fixed(control_name, loc)
def is_polynomial_control_fixed(self, name, loc):
    """
    Report the 'fix_initial' or 'fix_final' option of the named polynomial control.

    Parameters
    ----------
    name : str
        The name of the polynomial control to be tested.
    loc : str
        The end of the phase to be tested: either 'initial' or 'final'.

    Returns
    -------
    bool
        The value of the polynomial control's 'fix_initial' option when loc is
        'initial', or of its 'fix_final' option when loc is 'final'.

    Raises
    ------
    ValueError
        If loc is neither 'initial' nor 'final'.
    """
    if loc == 'final':
        return self.polynomial_control_options[name]['fix_final']

    if loc != 'initial':
        raise ValueError(f'Unknown value for argument "loc": must be either "initial" or '
                         f'"final" but got {loc}')

    return self.polynomial_control_options[name]['fix_initial']
def is_polynomial_control_rate_fixed(self, name, loc):
    """
    Test if the polynomial control rate of the given name is guaranteed to be fixed at the initial or final time.

    The rate name is mapped back to its polynomial control by stripping a trailing
    '_rate' or '_rate2', and the result is whatever ``is_polynomial_control_fixed``
    reports for that polynomial control.

    Parameters
    ----------
    name : str
        The name of the polynomial control rate to be tested, i.e. a polynomial
        control name followed by '_rate' or '_rate2'.
    loc : str
        The location of time to be tested: either 'initial' or 'final'.

    Returns
    -------
    bool
        True if the polynomial control associated with the given rate is
        guaranteed to be fixed at the given location.

    Raises
    ------
    ValueError
        If name is not the '_rate' or '_rate2' of a polynomial control in this phase.
    """
    # Both suffixes are resolved against the same dictionary.
    poly_controls = self.polynomial_control_options
    control_name = None

    if poly_controls is not None:
        for suffix in ('_rate', '_rate2'):
            if name.endswith(suffix) and name[:-len(suffix)] in poly_controls:
                control_name = name[:-len(suffix)]
                break

    if control_name is None:
        # Fail with a clear message instead of an unbound local variable.
        raise ValueError(f'"{name}" is not the rate or second rate of any polynomial '
                         f'control in phase {self.pathname}')

    return self.is_polynomial_control_fixed(control_name, loc)
def _indices_in_constraints(self, name, loc):
    """
    Collect the C-order flattened indices used by constraints on the given name at the given loc.

    Parameters
    ----------
    name : str
        The pathname of the constrained quantity.
    loc : str
        The type of constraint to search: 'initial', 'final', or 'path'.

    Returns
    -------
    all_flat_idxs : set
        The union of the flattened indices of every constraint of the requested
        type whose 'name' equals name.

    Raises
    ------
    ValueError
        If an index of name is used by more than one constraint of the requested type.
    """
    constraints_by_loc = {'initial': self._initial_boundary_constraints,
                          'final': self._final_boundary_constraints,
                          'path': self._path_constraints}
    kind_label = {'initial': 'initial boundary', 'final': 'final boundary', 'path': 'path'}

    seen_idxs = set()
    matching = (c for c in constraints_by_loc[loc] if c['name'] == name)

    for constraint in matching:
        idxs = get_constraint_flat_idxs(constraint)
        repeated = seen_idxs.intersection(idxs)

        # An index may be claimed by at most one constraint of a given type.
        if repeated:
            raise ValueError(f'Duplicate constraint in phase {self.pathname}. '
                             f'The following indices of `{name}` are used in '
                             f'multiple {kind_label[loc]} constraints:\n{repeated}')

        seen_idxs.update(idxs)

    return seen_idxs
def _is_fixed(self, var_name, var_type, loc):
    """
    Determine whether a variable is fixed or not.

    Dispatches on var_type to the matching ``is_*_fixed`` method of the phase.

    Parameters
    ----------
    var_name : str
        Identifier of the variable as known to the phase.
    var_type : str
        The type of variable.
    loc : str
        Either 'initial' or 'final' for non-parameters.

    Returns
    -------
    bool
        True if the variable is fixed, otherwise False. Variable types that are
        not handled here are reported as not fixed.
    """
    control_types = {'input_control', 'indep_control'}
    polynomial_control_types = {'input_polynomial_control', 'indep_polynomial_control'}
    control_rate_types = {'control_rate', 'control_rate2'}

    if var_type == 't':
        return self.is_time_fixed(loc)

    if var_type == 'state':
        return self.is_state_fixed(var_name, loc)

    if var_type in control_types:
        return self.is_control_fixed(var_name, loc)

    if var_type in polynomial_control_types:
        return self.is_polynomial_control_fixed(var_name, loc)

    if var_type in control_rate_types:
        return self.is_control_rate_fixed(var_name, loc)

    if var_type == 'parameter':
        # A parameter is fixed when its 'opt' option is falsy.
        return not self.parameter_options[var_name]['opt']

    # No way to know so we allow these to go through
    return False
def load_case(self, case):
    """
    Pull all input and output variables from a case into the Phase.

    The initial time and duration are taken from the first and last unique values of
    the previous timeseries time. States, controls and polynomial controls are
    interpolated from the previous timeseries via ``self.interp`` and clipped to the
    'lower'/'upper' options when either is given. Parameters are taken from the first
    entry of the previous ``parameter_vals`` data. Variables that cannot be found in
    the case produce a warning and are skipped.

    Parameters
    ----------
    case : Case or dict
        A Case from a CaseReader, or a dictionary with key 'inputs' mapped to the
        output of problem.model.list_inputs and key 'outputs' mapped to the output
        of prob.model.list_outputs. Both list_inputs and list_outputs should be called
        with `units=True`, `prom_names=True` and `return_format='dict'`.
    """
    # allow old style arguments using a Case or OpenMDAO problem instead of dictionary
    assert isinstance(case, (Case, dict)), \
        f'case must be a Case or a dict but got {type(case).__name__}'

    if isinstance(case, Case):
        previous_solution = {
            'inputs': case.list_inputs(out_stream=None, return_format='dict',
                                       units=True, prom_name=True),
            'outputs': case.list_outputs(out_stream=None, return_format='dict',
                                         units=True, prom_name=True)
        }
    else:
        previous_solution = case

    # Absolute -> promoted name map of everything in the case, and its inverse.
    prev_vars_abs2prom = {}
    prev_vars_abs2prom.update({k: v['prom_name'] for k, v in previous_solution['inputs'].items()})
    prev_vars_abs2prom.update({k: v['prom_name'] for k, v in previous_solution['outputs'].items()})
    prev_vars_prom2abs = {v: k for k, v in prev_vars_abs2prom.items()}

    # Value/units/absolute name of every variable in the case, keyed by promoted name.
    prev_vars = {}
    prev_vars.update({v['prom_name']: {'val': v['val'], 'units': v['units'], 'abs_name': k}
                      for k, v in previous_solution['inputs'].items()})
    prev_vars.update({v['prom_name']: {'val': v['val'], 'units': v['units'], 'abs_name': k}
                      for k, v in previous_solution['outputs'].items()})

    phase_name = self.name

    # Get the initial time and duration from the previous result and set them into the new phase.
    integration_name = self.time_options['name']

    try:
        prev_time_path = prev_vars_abs2prom[f'{self.pathname}.timeseries.timeseries_comp.{integration_name}']
    except KeyError:
        issue_warning(f'load_case for phase {self.name} failed - phase not found in case data.')
        return

    # Promoted paths of the previous timeseries and of the previous phase itself.
    prev_timeseries_prom_path, _, _ = prev_time_path.rpartition(f'.{integration_name}')
    prev_phase_prom_path, _, _ = prev_timeseries_prom_path.rpartition('.timeseries')

    prev_time_val = prev_vars[prev_time_path]['val']
    # Keep one sample per distinct time; unique_idxs selects the matching rows of
    # the other timeseries variables.
    prev_time_val, unique_idxs = np.unique(prev_time_val, return_index=True)
    prev_time_units = prev_vars[prev_time_path]['units']

    t_initial = prev_time_val[0]
    t_duration = prev_time_val[-1] - prev_time_val[0]

    self.set_val('t_initial', t_initial, units=prev_time_units)
    self.set_val('t_duration', t_duration, units=prev_time_units)

    # Interpolate the timeseries state outputs from the previous solution onto the new grid.
    if not isinstance(self, dm.AnalyticPhase):
        for state_name, options in self.state_options.items():
            if f'{prev_timeseries_prom_path}.states:{state_name}' in prev_vars_prom2abs:
                prev_state_path = f'{prev_timeseries_prom_path}.states:{state_name}'
            elif f'{prev_timeseries_prom_path}.{state_name}' in prev_vars_prom2abs:
                prev_state_path = f'{prev_timeseries_prom_path}.{state_name}'
            else:
                issue_warning(f'Unable to find state {state_name} in timeseries data from case being loaded.',
                              om.OpenMDAOWarning)
                continue

            prev_state_val = prev_vars[prev_state_path]['val']
            prev_state_units = prev_vars[prev_state_path]['units']

            interp_vals = self.interp(name=state_name,
                                      xs=prev_time_val,
                                      ys=prev_state_val[unique_idxs],
                                      kind='slinear')
            if options['lower'] is not None or options['upper'] is not None:
                interp_vals = interp_vals.clip(options['lower'], options['upper'])
            self.set_val(f'states:{state_name}',
                         interp_vals,
                         units=prev_state_units)

            # Best effort: a KeyError here is deliberately ignored
            # (presumably not every phase has an 'initial_states' variable - TODO confirm).
            try:
                self.set_val(f'initial_states:{state_name}', prev_state_val[0, ...], units=prev_state_units)
            except KeyError:
                pass

            if options['fix_final']:
                warning_message = f"{phase_name}.states:{state_name} specifies 'fix_final=True'. " \
                                  f"If the given restart file has a" \
                                  f" different final value this will overwrite the user-specified value"
                issue_warning(warning_message)

    # Interpolate the timeseries control outputs from the previous solution onto the new grid.
    for control_name, options in self.control_options.items():
        if f'{prev_timeseries_prom_path}.controls:{control_name}' in prev_vars_prom2abs:
            prev_control_path = f'{prev_timeseries_prom_path}.controls:{control_name}'
        elif f'{prev_timeseries_prom_path}.{control_name}' in prev_vars_prom2abs:
            prev_control_path = f'{prev_timeseries_prom_path}.{control_name}'
        else:
            issue_warning(f'Unable to find control {control_name} in timeseries data from case being loaded.',
                          om.OpenMDAOWarning)
            continue

        prev_control_val = prev_vars[prev_control_path]['val']
        prev_control_units = prev_vars[prev_control_path]['units']

        interp_vals = self.interp(name=control_name,
                                  xs=prev_time_val,
                                  ys=prev_control_val[unique_idxs],
                                  kind='slinear')
        if options['lower'] is not None or options['upper'] is not None:
            interp_vals = interp_vals.clip(options['lower'], options['upper'])
        self.set_val(f'controls:{control_name}', interp_vals, units=prev_control_units)

        if options['fix_final']:
            warning_message = f"{phase_name}.controls:{control_name} specifies 'fix_final=True'. " \
                              f"If the given restart file has a" \
                              f" different final value this will overwrite the user-specified value"
            issue_warning(warning_message)

    # Set the output polynomial control outputs from the previous solution as the value
    for pc_name, options in self.polynomial_control_options.items():
        if f'{prev_timeseries_prom_path}.polynomial_controls:{pc_name}' in prev_vars_prom2abs:
            prev_pc_path = f'{prev_timeseries_prom_path}.polynomial_controls:{pc_name}'
        elif f'{prev_timeseries_prom_path}.{pc_name}' in prev_vars_prom2abs:
            prev_pc_path = f'{prev_timeseries_prom_path}.{pc_name}'
        else:
            issue_warning(f'Unable to find polynomial control {pc_name} in timeseries data from case being '
                          f'loaded.', om.OpenMDAOWarning)
            continue

        prev_pc_val = prev_vars[prev_pc_path]['val']
        prev_pc_units = prev_vars[prev_pc_path]['units']

        interp_vals = self.interp(name=pc_name,
                                  xs=prev_time_val,
                                  ys=prev_pc_val[unique_idxs],
                                  kind='slinear')
        if options['lower'] is not None or options['upper'] is not None:
            interp_vals = interp_vals.clip(options['lower'], options['upper'])
        self.set_val(f'polynomial_controls:{pc_name}',
                     interp_vals,
                     units=prev_pc_units)

        if options['fix_final']:
            warning_message = f"{phase_name}.polynomial_controls:{pc_name} specifies 'fix_final=True'. " \
                              f"If the given restart file has a" \
                              f" different final value this will overwrite the user-specified value"
            issue_warning(warning_message)

    # Set the timeseries parameter outputs from the previous solution as the parameter value
    for param_name in self.parameter_options:
        prev_param_path = f'{prev_phase_prom_path}.parameter_vals:{param_name}'
        if prev_param_path in prev_vars:
            prev_param_val = prev_vars[prev_param_path]['val']
            prev_param_units = prev_vars[prev_param_path]['units']
            # Only the first entry along the leading axis is used as the parameter value.
            self.set_val(f'parameters:{param_name}', prev_param_val[0, ...], units=prev_param_units)
        else:
            issue_warning(f'Unable to find "{prev_phase_prom_path}.parameter_vals:{param_name}" '
                          f'in data from case being loaded.')