""" Differentiates a driver's workflow using the Finite Difference with Analytical
Derivatives (FDAD) method.
A variety of difference types are available for both first and second order."""
import logging
try:
    from collections import OrderedDict  # Python 2.7+
except ImportError:
    from ordereddict import OrderedDict  # backport package on older Pythons
from itertools import product
from openmdao.main.numpy_fallback import array
from openmdao.lib.datatypes.api import Enum, Float
from openmdao.main.api import Container
from openmdao.main.interfaces import implements, IDifferentiator
from openmdao.main.container import find_name
def diff_1st_central(fp, fm, eps):
"""Evaluates a first order central difference."""
return (fp - fm)/(2.0*eps)
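# A quick worked example: for f(x) = x**2 at x = 1.0 with eps = 1.0e-3,
#     diff_1st_central(1.001**2, 0.999**2, 1.0e-3)
# returns 2.0, the exact f'(1), since the O(eps**2) error term vanishes for
# a quadratic.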
def diff_1st_fwrdbwrd(fp, fm, eps):
"""Evaluates a first order forward or backward difference."""
return (fp - fm)/eps
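# The same quotient serves both one-sided forms; the caller picks the points.
# Forward passes (f(x+eps), f(x)); backward passes (f(x), f(x-eps)). For
# f(x) = x**2 at x = 1.0 with eps = 1.0e-3,
#     diff_1st_fwrdbwrd(1.001**2, 1.0, 1.0e-3)
# returns 2.001, showing the O(eps) truncation error of a one-sided form.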
def diff_2nd_xx(fp, f0, fm, eps):
"""Evaluates an on-diagonal 2nd derivative term."""
return (fp - 2.0*f0 + fm)/eps**2
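# For f(x) = x**2 at x = 1.0 with eps = 1.0e-3,
#     diff_2nd_xx(1.001**2, 1.0, 0.999**2, 1.0e-3)
# returns ~2.0, the exact f''(1) up to floating-point roundoff.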
def diff_2nd_xy(fpp, fpm, fmp, fmm, eps1, eps2):
"""Evaluates an off-diagonal 2nd derivative term."""
return (fpp - fpm - fmp + fmm)/(4.0*eps1*eps2)
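# For f(x, y) = x*y at (1.0, 1.0) with eps1 = eps2 = 1.0e-3,
#     diff_2nd_xy(1.001*1.001, 1.001*0.999, 0.999*1.001, 0.999*0.999,
#                 1.0e-3, 1.0e-3)
# returns ~1.0, the exact d2f/dxdy.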
class FiniteDifference(Container):
""" Differentiates a driver's workflow using the Finite Difference with
Analytical Derivatives (FDAD) method. A variety of difference types are
available for both first and second order."""
implements(IDifferentiator)
# pylint: disable-msg=E1101
form = Enum("central", ["central", "forward", "backward"], iotype='in', \
desc="Finite difference form (central, forward, backward).")
default_stepsize = Float(1.0e-6, iotype='in', desc='Default finite ' + \
'difference step size.')
def __init__(self):
super(FiniteDifference, self).__init__()
        # This gets set in the callback.
        self._parent = None
self.param_names = []
self.objective_names = []
self.eqconst_names = []
self.ineqconst_names = []
self.gradient_case = OrderedDict()
self.gradient = {}
self.hessian_ondiag_case = OrderedDict()
self.hessian_offdiag_case = OrderedDict()
self.hessian = {}
    def setup(self):
"""Sets some dimensions."""
self.param_names = self._parent.get_parameters().keys()
self.objective_names = self._parent.get_objectives().keys()
try:
self.ineqconst_names = self._parent.get_ineq_constraints().keys()
except AttributeError:
self.ineqconst_names = []
try:
self.eqconst_names = self._parent.get_eq_constraints().keys()
except AttributeError:
self.eqconst_names = []
    def get_derivative(self, output_name, wrt):
"""Returns the derivative of output_name with respect to wrt.
output_name: string
Name of the output in the local OpenMDAO hierarchy.
wrt: string
Name of the input in the local OpenMDAO hierarchy. The
derivative is with respect to this variable.
"""
return self.gradient[wrt][output_name]
    def get_2nd_derivative(self, output_name, wrt):
"""Returns the 2nd derivative of output_name with respect to both vars
in the tuple wrt.
output_name: string
Name of the output in the local OpenMDAO hierarchy.
wrt: tuple containing two strings
Names of the inputs in the local OpenMDAO hierarchy. The
derivative is with respect to these 2 variables.
"""
return self.hessian[wrt[0]][wrt[1]][output_name]
    def get_gradient(self, output_name=None):
"""Returns the gradient of the given output with respect to all
parameters.
output_name: string
Name of the output in the local OpenMDAO hierarchy.
"""
return array([self.gradient[wrt][output_name] for wrt in self.param_names])
    def get_Hessian(self, output_name=None):
"""Returns the Hessian matrix of the given output with respect to
all parameters.
output_name: string
Name of the output in the local OpenMDAO hierarchy.
"""
return array([self.hessian[in1][in2][output_name] for (in1,in2) in product(self.param_names, self.param_names)])
    def calc_gradient(self):
"""Calculates the gradient vectors for all outputs in this Driver's
workflow."""
# Each component runs its calc_derivatives method.
# We used to do this in the driver instead, but we've moved it in
# here to make the interface more uniform.
self._parent.calc_derivatives(first=True)
self.setup()
# Create our 2D dictionary the first time we execute.
if not self.gradient:
for name in self.param_names:
self.gradient[name] = {}
# Pull initial state and stepsizes from driver's parameters
base_param = OrderedDict()
stepsize = {}
for key, item in self._parent.get_parameters().iteritems():
base_param[key] = item.evaluate()
if item.fd_step:
stepsize[key] = item.fd_step
else:
stepsize[key] = self.default_stepsize
# For Forward or Backward diff, we want to save the baseline
# objective and constraints. These are also needed for the
# on-diagonal Hessian terms, so we will save them in the class
# later.
base_data = self._run_point(base_param)
# Set up problem based on Finite Difference type
if self.form == 'central':
deltas = [1, -1]
func = diff_1st_central
elif self.form == 'forward':
deltas = [1, 0]
func = diff_1st_fwrdbwrd
else:
deltas = [0, -1]
func = diff_1st_fwrdbwrd
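        # A zero delta reuses the baseline evaluation below, so forward and
        # backward differences cost one extra model run per parameter while
        # central differences cost two.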
self.gradient_case = OrderedDict()
# Assemble input data
for param in self.param_names:
pcase = []
            for delta in deltas:
case = base_param.copy()
case[param] += delta*stepsize[param]
pcase.append({ 'param': case })
self.gradient_case[param] = pcase
# Run all "cases".
# TODO - Integrate OpenMDAO's concurrent processing capability once it
        # is formalized. This operation is inherently parallelizable.
for key, case in self.gradient_case.iteritems():
for ipcase, pcase in enumerate(case):
if deltas[ipcase]:
pcase['data'] = self._run_point(pcase['param'])
else:
pcase['data'] = base_data
# Calculate gradients
for key, case in self.gradient_case.iteritems():
eps = stepsize[key]
for name in list(self.objective_names + \
self.eqconst_names + \
self.ineqconst_names):
self.gradient[key][name] = \
func(case[0]['data'][name],
case[1]['data'][name], eps)
# Save these for Hessian calculation
self.base_param = base_param
self.base_data = base_data
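    # Illustrative call pattern once a driver has set `_parent` (the
    # 'comp.*' names below are hypothetical; real keys come from the
    # driver's parameter and objective definitions):
    #
    #     fd.calc_gradient()
    #     dfdx = fd.get_derivative('comp.f_xy', wrt='comp.x')
    #     grad = fd.get_gradient('comp.f_xy')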
    def calc_hessian(self, reuse_first=False):
"""Returns the Hessian matrix for all outputs in the Driver's
workflow.
        reuse_first: bool
            Switch to reuse some data from the gradient calculation so that
            we don't have to re-run some points we already ran (namely the
            baseline, +eps, and -eps cases). Use this when the driver needs
            gradient and Hessian information at the same point and calls
            calc_gradient before calc_hessian.
"""
# Each component runs its calc_derivatives method.
# We used to do this in the driver instead, but we've moved it in
# here to make the interface more uniform.
self._parent.calc_derivatives(second=True)
self.setup()
# Create our 3D dictionary the first time we execute.
if not self.hessian:
for name1 in self.param_names:
self.hessian[name1] = {}
for name2 in self.param_names:
self.hessian[name1][name2] = {}
self.hessian_ondiag_case = OrderedDict()
self.hessian_offdiag_case = OrderedDict()
# Pull stepsizes from driver's parameters
base_param = OrderedDict()
stepsize = {}
for key, item in self._parent.get_parameters().iteritems():
if item.fd_step:
stepsize[key] = item.fd_step
else:
stepsize[key] = self.default_stepsize
# Diagonal terms in Hessian always need base point
# Usually, we will have saved this when we calculated
# the gradient.
if reuse_first:
base_param = self.base_param
base_data = self.base_data
else:
# Pull initial state from driver's parameters
for key, item in self._parent.get_parameters().iteritems():
base_param[key] = item.evaluate()
base_data = self._run_point(base_param)
# Assemble input data
# Cases : ondiag [fp, fm]
deltas = [1, -1]
for param in self.param_names:
pcase = []
            for delta in deltas:
case = base_param.copy()
case[param] += delta*stepsize[param]
pcase.append({ 'param': case })
self.hessian_ondiag_case[param] = pcase
# Assemble input data
# Cases : offdiag [fpp, fpm, fmp, fmm]
deltas = [[1, 1],
[1, -1],
[-1, 1],
[-1, -1]]
for i, param1 in enumerate(self.param_names):
offdiag = {}
for param2 in self.param_names[i+1:]:
pcase = []
for delta in deltas:
case = base_param.copy()
case[param1] += delta[0]*stepsize[param1]
case[param2] += delta[1]*stepsize[param2]
pcase.append({ 'param': case })
offdiag[param2] = pcase
self.hessian_offdiag_case[param1] = offdiag
# Run all "cases".
# TODO - Integrate OpenMDAO's concurrent processing capability once it
        # is formalized. This operation is inherently parallelizable.
# We don't need to re-run on-diag cases if the gradients were
# calculated with Central Difference.
        if reuse_first and self.form == 'central':
for key, case in self.hessian_ondiag_case.iteritems():
gradient_case = self.gradient_case[key]
for ipcase, pcase in enumerate(case):
gradient_ipcase = gradient_case[ipcase]
pcase['data'] = gradient_ipcase['data']
else:
for case in self.hessian_ondiag_case.values():
for pcase in case:
data = self._run_point(pcase['param'])
pcase['data'] = data
# Off-diag cases must always be run.
for cases in self.hessian_offdiag_case.values():
for case in cases.values():
for pcase in case:
pcase['data'] = self._run_point(pcase['param'])
# Calculate Hessians - On Diagonal
for key, case in self.hessian_ondiag_case.iteritems():
eps = stepsize[key]
for name in list(self.objective_names + \
self.eqconst_names + \
self.ineqconst_names):
self.hessian[key][key][name] = \
diff_2nd_xx(case[0]['data'][name],
base_data[name],
case[1]['data'][name], eps)
# Calculate Hessians - Off Diagonal
for key1, cases in self.hessian_offdiag_case.iteritems():
eps1 = stepsize[key1]
for key2, case in cases.iteritems():
eps2 = stepsize[key2]
for name in list(self.objective_names + \
self.eqconst_names + \
self.ineqconst_names):
self.hessian[key1][key2][name] = \
diff_2nd_xy(case[0]['data'][name],
case[1]['data'][name],
case[2]['data'][name],
case[3]['data'][name],
eps1, eps2)
# Symmetry
# (Should ponder whether we should even store it.)
self.hessian[key2][key1][name] = \
self.hessian[key1][key2][name]
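    # Illustrative call pattern (hypothetical 'comp.*' names again). Passing
    # reuse_first=True after calc_gradient skips re-running the baseline and
    # the +/-eps points:
    #
    #     fd.calc_gradient()
    #     fd.calc_hessian(reuse_first=True)
    #     d2f = fd.get_2nd_derivative('comp.f_xy', wrt=('comp.x', 'comp.y'))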
def _run_point(self, data_param):
"""Runs the model at a single point and captures the results. Note that
some differences require the baseline point."""
dvals = [float(val) for val in data_param.values()]
self._parent.set_parameters(dvals)
# Run the model
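        # Call run_iteration on the parent driver's base class so that only
        # the workflow executes, bypassing any iteration logic the concrete
        # driver class layers on top of it.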
super(type(self._parent), self._parent).run_iteration()
data = {}
# Get Objectives
for key, item in self._parent.get_objectives().iteritems():
data[key] = item.evaluate(self._parent.parent)
# Get Inequality Constraints
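        # Each constraint evaluates to a (lhs, rhs, comparator, ...) tuple;
        # fold it into a single residual that is negative when the
        # constraint is satisfied.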
if self.ineqconst_names:
for key, item in self._parent.get_ineq_constraints().iteritems():
val = item.evaluate(self._parent.parent)
if '>' in val[2]:
data[key] = val[1]-val[0]
else:
data[key] = val[0]-val[1]
# Get Equality Constraints
if self.eqconst_names:
for key, item in self._parent.get_eq_constraints().iteritems():
val = item.evaluate(self._parent.parent)
if '>' in val[2]:
data[key] = val[1]-val[0]
else:
data[key] = val[0]-val[1]
return data
    def reset_state(self):
"""Finite Difference does not leave the model in a clean state. If you
require one, then run this method."""
dvals = [float(val) for val in self.base_param.values()]
self._parent.set_parameters(dvals)
super(type(self._parent), self._parent).run_iteration()
    def raise_exception(self, msg, exception_class=Exception):
"""Raise an exception."""
name = find_name(self._parent, self)
        self._parent.raise_exception("%s: %s" % (name, msg), exception_class)