Source code for openmdao.lib.differentiators.analytic

""" Differentiates a driver's workflow using an analytic method.
"""

# pylint: disable-msg=E0611,F0401
try:
    from numpy import zeros, dot, linalg
except ImportError as err:
    import logging
    logging.warn("In %s: %r" % (__file__, err))
    
from openmdao.lib.datatypes.api import Enum, Bool
from openmdao.lib.differentiators.chain_rule import ChainRule
from openmdao.main.api import Driver, Assembly
from openmdao.main.driver import Run_Once
from openmdao.main.interfaces import implements, IDifferentiator, ISolver
from openmdao.main.mp_support import has_interface
from openmdao.units import convert_units
from openmdao.util.decorators import stub_if_missing_deps


def _d_conn(expr_txt, target, ascope):
    """ Evaluates the derivative of a source-target variable connection.
    This includes the derivative of the expression as well as the
    derivative of the unit conversion factor."""
    
    expr = ascope._exprmapper.get_expr(expr_txt)
    source = expr.refs().pop()
        
    # Need derivative of the expression
    expr_deriv = expr.evaluate_gradient(scope=ascope,
                                        wrt=source)
    
    # We also need the derivative of the unit
    # conversion factor if there is one
    metadata = expr.get_metadata('units')
    source_unit = [x[1] for x in metadata if x[0] == source]
    if source_unit and source_unit[0]:
        dest_expr = ascope._exprmapper.get_expr(target)
        metadata = dest_expr.get_metadata('units')
        target_unit = [x[1] for x in metadata if x[0] == target]

        expr_deriv[source] = expr_deriv[source] * \
            convert_units(1.0, source_unit[0], target_unit[0])
        
    return source, expr_deriv


@stub_if_missing_deps('numpy')
[docs]class Analytic(ChainRule): """ Differentiates a driver's workflow using one of the analytic methods.""" implements(IDifferentiator) # pylint: disable-msg=E1101 mode = Enum('direct', ['direct', 'adjoint'], iotype = 'in', desc='Choose forward or adjoint mode') approach = Enum('functional', ['functional', 'residual', 'hybrid'], iotype = 'in', desc = 'approach for assembling the ' + \ 'problem.\n' + \ 'functional - convert all comps to functional form' + \ 'residual - convert all comps to residual form' + \ 'hybrid - no conversion, each comps uses what it has') sparse = Bool(False, iotype = 'in', desc='Set to True for sparse ' + \ 'storage of matrices.') def __init__(self): super(Analytic, self).__init__() # Left hand and right hand sides of our linear system. self.LHS = zeros((0, 0), 'd') self.RHS = zeros((0, 0), 'd') self.EQS = zeros((0, 0), 'd') self.EQS_zero = zeros((0, 0), 'd') self.n_var = 0 # Overrides definition in parent # Store as matrix instead of dict self.gradient = zeros((0, 0), 'd') # Bookkeeping index/name self.var_list = []
[docs] def get_derivative(self, output_name, wrt): """Returns the derivative of output_name with respect to wrt. output_name: string Name of the output in the local OpenMDAO hierarchy. wrt: string Name of the input in the local OpenMDAO hierarchy. The derivative is with respect to this variable. """ i_param = self.param_names.index(wrt) i_func = self.function_names.index(output_name) return self.gradient[i_func][i_param]
[docs] def get_gradient(self, output_name=None): """Returns the gradient of the given output with respect to all parameters. output_name: string Name of the output in the local OpenMDAO hierarchy. """ i_func = self.function_names.index(output_name) return self.gradient[i_func][:]
[docs] def setup(self): """ Determine problem dimension and allocate arrays. (unless sparse) """ # Parent class method assembles all of our driver connections super(Analytic, self).setup() # Direct mode: count outputs # Adjoint mode: count outputs as input edges #index = 0 if self.mode == 'adjoint' else 1 index = 1 self.var_list = [] # Count recursively to get n_var and var_list self._edge_counter(self._parent, self._parent, index) n_var = len(self.var_list) n_param = len(self.param_names) n_eq = len(self.function_names) self.LHS = zeros((n_var, n_var), 'd') #if self.mode == 'adjoint': # self.EQS = zeros((n_var, n_param), 'd') # self.RHS = zeros((n_var, n_eq), 'd') #else: self.EQS = zeros((n_eq, n_var), 'd') self.RHS = zeros((n_var, n_param), 'd') self.EQS_zero = zeros((n_eq, n_param), 'd') self.gradient = zeros((n_eq, n_param), 'd') # For direct mode, the EQS only needs to be calculated once # For adjoint mode, the RHS may need recalculation because of # connection derivatives. # Objectives first i_eq = 0 wrt = self.var_list + self.param_names + \ [item for sublist in self.grouped_param_names for item in sublist] for expr in self._parent.get_objectives().values(): obj_grad = expr.evaluate_gradient(scope=self._parent.parent, wrt=wrt) for input_name, val in obj_grad.iteritems(): if input_name in self.param_names: i_param = self.param_names.index(input_name) self.EQS_zero[i_eq][i_param] = val elif input_name in self.grouped_param_names: grouped = self.grouped_param_names[input_name] i_param = self.param_names.index(grouped) self.EQS_zero[i_eq][i_param] = val elif input_name in self.var_list: i_var = self.var_list.index(input_name) self.EQS[i_eq][i_var] = val i_eq += 1 # Constraints next for constraint in self._parent.get_constraints().values(): lhs, rhs, comparator, _ = \ constraint.evaluate_gradient(scope=self._parent.parent, wrt=wrt) sign = -1.0 if '>' in comparator else 1.0 for input_name, val in lhs.iteritems(): val = sign*val if input_name in self.param_names: i_param = self.param_names.index(input_name) self.EQS_zero[i_eq][i_param] += val elif input_name in self.grouped_param_names: grouped = self.grouped_param_names[input_name] i_param = self.param_names.index(grouped) self.EQS_zero[i_eq][i_param] += val elif input_name in self.var_list: i_var = self.var_list.index(input_name) self.EQS[i_eq][i_var] += val for input_name, val in rhs.iteritems(): val = -sign*val if input_name in self.param_names: i_param = self.param_names.index(input_name) self.EQS_zero[i_eq][i_param] += val elif input_name in self.grouped_param_names: grouped = self.grouped_param_names[input_name] i_param = self.param_names.index(grouped) self.EQS_zero[i_eq][i_param] += val elif input_name in self.var_list: i_var = self.var_list.index(input_name) self.EQS[i_eq][i_var] += val i_eq += 1
def _edge_counter(self, scope, dscope, index, head=''): """Helper function to figure out which edges in the edge dicts are our unknowns. Called recursively for assy or driver scopes.""" # Traverse the workflow self._find_edges(scope, dscope) scope_name = dscope.get_pathname() if head: head = '%s.' % head index_bar = 0 if index == 1 else 1 # Number of unknowns = number of edges in our edge_dict for name, edges in self.edge_dicts[scope_name].iteritems(): # For assemblies, we need to traverse their workflows too node = scope.parent.get(name) if isinstance(node, Assembly): # Assembly inputs are also counted as unknowns. This makes # recursion easier so that you don't have to query out of # scope. for input_name in edges[index_bar]: input_full = "%s%s.%s" % (head, name, input_name) self.var_list.append(input_full) node_scope_name = node.get_pathname() self._edge_counter(node.driver, node.driver, index, node_scope_name) elif isinstance(node, Driver): if not has_interface(node, ISolver): msg = "Only nested solvers are supported" raise NotImplementedError(msg) node_scope_name = node.get_pathname() self._edge_counter(scope, node, index, head) # Save the names for all our unknowns for item in edges[index]: full_name = "%s%s.%s" % (head, name, item) # Parameter edges in adjoint mode are not added to the # unknowns if full_name not in self.param_names and \ full_name not in self.grouped_param_names: self.var_list.append(full_name)
[docs] def calc_gradient(self): """Calculates the gradient vectors for all outputs in this Driver's workflow.""" # This stuff only needs to run once if self._parent not in self.edge_dicts: self.setup() # Assemble matrices for problem every run ascope = self._parent.parent dscope = self._parent self._assemble_direct(ascope, dscope) self._solve()
def _assemble_direct(self, ascope, dscope, eq_num=0, head='', solver_conns=None): """Assembles the system matrices for the direct problem. This is meant to be called recursively, with the assembly scope, driver scope, and current equation number as inputs.""" scope_name = dscope.get_pathname() if head: head = '%s.' % head if solver_conns is None: solver_conns = [] i_eq = eq_num for node_name in self.dworkflow[scope_name]: # If it's a list, then it's a set of components to finite # difference together. if not isinstance(node_name, list): node = dscope.parent.get(node_name) fdblock = False else: fdblock = True # Finite difference block if fdblock: fd = self.fdhelpers[scope_name][str(node_name)] input_dict = {} for item in fd.list_wrt(): input_dict[item] = dscope.parent.get(item) output_dict = {} for item in fd.list_outs(): output_dict[item] = dscope.parent.get(item) local_derivs = fd.run(input_dict, output_dict) # So far, we only handle nested solvers. elif isinstance(node, Driver): if not has_interface(node, ISolver): msg = "Only nested solvers are supported" raise NotImplementedError(msg) # Assemble a dictionary of the couplings that this solver # iterates. params = node.get_parameters().keys() solver_dict = {} for constr in node.get_eq_constraints().values(): item1 = constr.lhs.get_referenced_varpaths() item2 = constr.rhs.get_referenced_varpaths() comps = list(item1.union(item2)) if comps[0] in params: indep = comps[0] dep = comps[1] elif comps[1] in params: indep = comps[1] dep = comps[0] else: msg = "No independent in solver equation." raise NotImplementedError(msg) solver_dict[indep] = dep # Recurse i_eq = self._assemble_direct(ascope, node, i_eq, head, solver_dict) continue # Recurse into assemblies. elif isinstance(node, Assembly): if not isinstance(node.driver, Run_Once): raise NotImplementedError('Nested drivers') edge_dict = self.edge_dicts[scope_name][node_name] # Assembly inputs are unknowns, so they get equations for input_name in edge_dict[0]: self.LHS[i_eq][i_eq] = 1.0 input_full = "%s.%s" % (node_name, input_name) # Assy input conected to parameter goes in RHS if input_full in self.param_names: i_param = self.param_names.index(input_full) self.RHS[i_eq][i_param] = 1.0 elif input_full in self.grouped_param_names: grouped = self.grouped_param_names[input_full] i_param = self.param_names.index(grouped) self.RHS[i_eq][i_param] = 1.0 # Assy Input connected to other outputs goes in LHS else: sources = ascope._depgraph.connections_to(input_full) expr_txt = sources[0][0] target = sources[0][1] # Variables on an assembly boundary if expr_txt[0:4] == '@bin': expr_txt = expr_txt.replace('@bin.', '') # Need derivative of the expression source, expr_deriv = _d_conn(expr_txt, target, ascope) # Chain together deriv from var connection and comp i_var = self.var_list.index("%s%s" % (head, source)) self.LHS[i_eq][i_var] = -expr_deriv[source] i_eq += 1 # Recurse assy_scope_name = node.get_pathname() i_eq = self._assemble_direct(node, node.driver, i_eq, assy_scope_name) # Assembly outputs are unknowns, so they get equations sub_scope = ascope.get(node_name) for output_name in edge_dict[1]: self.LHS[i_eq][i_eq] = 1.0 sources = sub_scope._depgraph.connections_to(output_name) for connect in sources: if '@bout' in connect[1]: expr_txt = connect[0] target = connect[1] break # Variables on an assembly boundary if target[0:5] == '@bout': target = target.replace('@bout.', '') # Need derivative of the expression source, expr_deriv = _d_conn(expr_txt, target, sub_scope) # Chain together deriv from var connection and comp i_var = self.var_list.index("%s%s.%s" % (head, node_name, source)) self.LHS[i_eq][i_var] = -expr_deriv[source] i_eq += 1 continue # This component can determine its derivatives. else: node.calc_derivatives(first=True) local_derivs = node.derivatives.first_derivatives node_name = [node_name] # The following executes for components with derivatives and # blocks that are finite-differenced for item in node_name: edge_dict = self.edge_dicts[scope_name][item] # Calculate derivatives of incoming connections # I moved this out of the double loop so that it isn't # repeated needlessly x(num_outputs) conn_data = {} for input_name in edge_dict[0]: input_full = "%s.%s" % (item, input_name) if input_full not in self.param_names and \ input_full not in self.grouped_param_names and \ input_full not in solver_conns: sources = ascope._depgraph.connections_to(input_full) expr_txt = sources[0][0] target = sources[0][1] # Variables on an assembly boundary if expr_txt[0:4] == '@bin': expr_txt = expr_txt.replace('@bin.', '') # Need derivative of the expression source, expr_deriv = _d_conn(expr_txt, target, ascope) # Chain together deriv from var connection and comp i_var = self.var_list.index("%s%s" % (head, source)) conn_data[input_full] = (i_var, expr_deriv[source]) # Each output gives us an equation for output_name in edge_dict[1]: self.LHS[i_eq][i_eq] = 1.0 if fdblock: local_out = "%s.%s" % (item, output_name) else: local_out = output_name # Each input provides a term for LHS or RHS for input_name in edge_dict[0]: input_full = "%s.%s" % (item, input_name) if fdblock: local_in = input_full else: local_in = input_name # Direct connection to parameter goes in RHS if input_full in self.param_names: i_param = self.param_names.index(input_full) self.RHS[i_eq][i_param] = \ local_derivs[local_out][local_in] elif input_full in self.grouped_param_names: grouped = self.grouped_param_names[input_full] i_param = self.param_names.index(grouped) self.RHS[i_eq][i_param] = \ local_derivs[local_out][local_in] # Input is a dependent in a solver loop elif input_full in solver_conns: source = solver_conns[input_full] i_dep = self.var_list.index(source) self.LHS[i_eq][i_dep] = \ -local_derivs[local_out][local_in] # Input connected to other outputs goes in LHS else: # Already calculated connection deriatives i_var = conn_data[input_full][0] expr_deriv = conn_data[input_full][1] self.LHS[i_eq][i_var] = \ -local_derivs[local_out][local_in] * \ expr_deriv i_eq += 1 return i_eq def _solve(self): """Solve the linear system. Direct mode: solves for dy/d(param) Adjoint mode: solves for d(obj,constr)/dx """ if self.mode == 'adjoint': total_derivs = linalg.solve(self.LHS.T, self.EQS.T) self.gradient = self.EQS_zero + dot(total_derivs.T, self.RHS) else: total_derivs = linalg.solve(self.LHS, self.RHS) self.gradient = self.EQS_zero + dot(self.EQS, total_derivs)
OpenMDAO Home