# Source code for openmdao.components.meta_model_unstructured_comp

"""MetaModel provides basic meta modeling capability."""
from copy import deepcopy
from itertools import chain

import numpy as np

from openmdao.core.explicitcomponent import ExplicitComponent
from openmdao.surrogate_models.surrogate_model import SurrogateModel
from openmdao.utils.class_util import overrides_method
from openmdao.utils.name_maps import rel_key2abs_key
from openmdao.utils.om_warnings import issue_warning, DerivativesWarning
from openmdao.utils.array_utils import shape_to_len


class MetaModelUnStructuredComp(ExplicitComponent):
    r"""
    Class that creates a reduced order model for outputs from inputs.

    Each output may have its own surrogate model. Training inputs and outputs are
    automatically created with 'train\_' prepended to the corresponding input/output name.

    For a Float variable, the training data is an array of length m, where m is the
    number of training points.

    Parameters
    ----------
    **kwargs : dict of keyword arguments
        Keyword arguments that will be mapped into the Component options.

    Attributes
    ----------
    train : bool
        If True, training will occur on the next execution.
    _input_size : int
        Keeps track of the cumulative size of all inputs.
    _surrogate_input_names : [str, ..]
        List of inputs that are not the training vars.
    _surrogate_output_names : [str, ..]
        List of outputs that are not the training vars.
    _static_input_size : int
        Keeps track of the cumulative size of all inputs added outside of setup.
    _static_surrogate_input_names : [str, ..]
        List of inputs that are not the training vars and are added outside of setup.
    _static_surrogate_output_names : [str, ..]
        List of outputs that are not the training vars and are added outside of setup.
    _training_input : dict
        Training data for inputs.
    _training_output : dict
        Training data for outputs.
    """
[docs] def __init__(self, **kwargs): """ Initialize all attributes. """ super().__init__(**kwargs) # keep list of inputs and outputs that are not the training vars self._surrogate_input_names = [] self._surrogate_output_names = [] # training will occur on first execution self.train = True self._training_input = np.empty(0) self._training_output = {} self._input_size = 0 self._static_surrogate_input_names = [] self._static_surrogate_output_names = [] self._static_input_size = 0 self._no_check_partials = True
def _setup_procs(self, pathname, comm, prob_meta): """ Execute first phase of the setup process. Distribute processors, assign pathnames, and call setup on the component. Parameters ---------- pathname : str Global name of the system, including the path. comm : MPI.Comm or <FakeComm> MPI communicator object. prob_meta : dict Problem level options. """ self._surrogate_input_names = [] self._surrogate_output_names = [] self._surrogate_input_names.extend(self._static_surrogate_input_names) self._surrogate_output_names.extend(self._static_surrogate_output_names) self._input_size = self._static_input_size super()._setup_procs(pathname, comm, prob_meta)
[docs] def initialize(self): """ Declare options. """ self.options.declare('default_surrogate', types=(SurrogateModel, type(None)), default=None, desc="Surrogate that will be used for all outputs that don't have a " "specific surrogate assigned to them.") self.options.declare('vec_size', types=int, default=1, lower=1, desc='Number of points that will be simultaneously predicted by ' 'the surrogate.')
[docs] def add_input(self, name, val=1.0, training_data=None, **kwargs): """ Add an input to this component and a corresponding training input. Parameters ---------- name : str Name of the input. val : float or ndarray Initial value for the input. training_data : float or ndarray Training data for this variable. Optional, can be set by the problem later. **kwargs : dict Additional agruments for add_input. Returns ------- dict Metadata for added variable. """ metadata = super().add_input(name, val, **kwargs) vec_size = self.options['vec_size'] if vec_size > 1: if metadata['shape'][0] != vec_size: raise RuntimeError(f"{self.msginfo}: First dimension of input '{name}' " f"must be {vec_size}") input_size = metadata['val'][0].size else: input_size = metadata['val'].size if self._static_mode: surrogate_input_names = self._static_surrogate_input_names self._static_input_size += input_size else: surrogate_input_names = self._surrogate_input_names self._input_size += input_size surrogate_input_names.append((name, input_size)) train_name = f'train_{name}' self.options.declare(train_name, default=None, desc='Training data for %s' % name) if training_data is not None: self.options[train_name] = training_data return metadata
[docs] def add_output(self, name, val=1.0, training_data=None, surrogate=None, **kwargs): """ Add an output to this component and a corresponding training output. Parameters ---------- name : str Name of the variable output. val : float or ndarray Initial value for the output. While the value is overwritten during execution, it is useful for inferring size. training_data : float or ndarray Training data for this variable. Optional, can be set by the problem later. surrogate : <SurrogateModel>, optional Surrogate model to use for this output; if None, use default surrogate. **kwargs : dict Additional arguments for add_output. Returns ------- dict Metadata for added variable. """ metadata = super().add_output(name, val, **kwargs) vec_size = self.options['vec_size'] if vec_size > 1: if metadata['shape'][0] != vec_size: raise RuntimeError(f"{self.msginfo}: First dimension of output '{name}' " f"must be {vec_size}") output_shape = tuple(metadata['shape'][1:]) else: output_shape = metadata['shape'] if self._static_mode: surrogate_output_names = self._static_surrogate_output_names else: surrogate_output_names = self._surrogate_output_names surrogate_output_names.append((name, output_shape)) self._training_output[name] = np.zeros(0) # Note: the default_surrogate flag is stored in metadata so that we can reconfigure # with a new default by rerunning setup. if surrogate: metadata['surrogate'] = surrogate metadata['default_surrogate'] = False metadata['surrogate_name'] = type(surrogate).__name__ else: metadata['default_surrogate'] = True train_name = f'train_{name}' self.options.declare(train_name, default=None, desc='Training data for %s' % name) if training_data is not None: self.options[train_name] = training_data return metadata
    def _setup_var_data(self):
        """
        Count total variables.

        Also instantiates surrogates for the output variables that use the default surrogate.
        """
        default_surrogate = self.options['default_surrogate']
        for name, shape in self._surrogate_output_names:
            metadata = self._metadata(name)
            if default_surrogate is not None and metadata.get('default_surrogate'):
                # Create an instance of the default surrogate for outputs that did not have a
                # surrogate specified. deepcopy so each output trains independently.
                surrogate = deepcopy(default_surrogate)
                metadata['surrogate'] = surrogate

        # training will occur on first execution after setup
        self.train = True

        super()._setup_var_data()

    def _setup_partials(self):
        """
        Process all partials and approximations that the user declared.

        Metamodel needs to declare its partials after inputs and outputs are known.
        """
        super()._setup_partials()

        vec_size = self.options['vec_size']
        if vec_size > 1:
            vec_arange = np.arange(vec_size)

            # Sparse specification of partials for vectorized models.
            # Each point's jacobian is a dense (n_of x n_wrt) block; the blocks for
            # the vec_size points are laid out along the diagonal.
            for wrt, n_wrt in self._surrogate_input_names:
                for of, shape_of in self._surrogate_output_names:
                    n_of = shape_to_len(shape_of)
                    # Dense block pattern for a single point ...
                    rows = np.repeat(np.arange(n_of), n_wrt)
                    cols = np.tile(np.arange(n_wrt), n_of)
                    # ... replicated vec_size times with per-point offsets.
                    repeat = np.repeat(vec_arange, len(rows))
                    rows = np.tile(rows, vec_size) + repeat * n_of
                    cols = np.tile(cols, vec_size) + repeat * n_wrt

                    pattern_meta = {
                        'rows': rows,
                        'cols': cols,
                        'dependent': True,
                    }

                    self._resolve_partials_patterns(of=of, wrt=wrt,
                                                    pattern_meta=pattern_meta)
        else:
            pattern_meta = {
                'val': None,
                'dependent': True,
            }

            # Dense specification of partials for non-vectorized models.
            self._resolve_partials_patterns(
                of=tuple([n[0] for n in self._surrogate_output_names]),
                wrt=tuple([n[0] for n in self._surrogate_input_names]),
                pattern_meta=pattern_meta)

        # Support for user declaring fd partials in a child class and assigning new defaults.
        # We want a warning for all partials that were not explicitly declared.
        declared_partials = set([
            key for key, dct in self._subjacs_info.items()
            if 'method' in dct and dct['method']])

        # Gather undeclared fd partials on surrogates that don't support analytic derivatives.
        # While we do this, declare the missing ones.
        non_declared_partials = []
        for of, _ in self._surrogate_output_names:
            surrogate = self._metadata(of).get('surrogate')
            if surrogate and not overrides_method('linearize', surrogate, SurrogateModel):
                wrt_list = [name[0] for name in self._surrogate_input_names]
                self._approx_partials(of=of, wrt=wrt_list, method='fd')

                for wrt in wrt_list:
                    abs_key = rel_key2abs_key(self, (of, wrt))
                    if abs_key not in declared_partials:
                        non_declared_partials.append(abs_key)

        if non_declared_partials:
            # Make sure an fd approximation scheme exists before warning the user.
            self._get_approx_scheme('fd')

            msg = "Because the MetaModelUnStructuredComp '{}' uses a surrogate " \
                  "which does not define a linearize method,\nOpenMDAO will use " \
                  "finite differences to compute derivatives. Some of the derivatives " \
                  "will be computed\nusing default finite difference " \
                  "options because they were not explicitly declared.\n".format(self.name)
            msg += "The derivatives computed using the defaults are:\n"
            for abs_key in non_declared_partials:
                msg += " {}, {}\n".format(*abs_key)
            issue_warning(msg, category=DerivativesWarning)
[docs] def check_config(self, logger): """ Perform optional error checks. Parameters ---------- logger : object The object that manages logging output. """ # All outputs must have surrogates assigned either explicitly or through the default # surrogate. if self.options['default_surrogate'] is None: no_sur = [] for name, shape in self._surrogate_output_names: surrogate = self._metadata(name).get('surrogate') if surrogate is None: no_sur.append(name) if len(no_sur) > 0: msg = ("No default surrogate model is defined and the following" " outputs do not have a surrogate model:\n%s\n" "Either specify a default_surrogate, or specify a " "surrogate model for all outputs." % no_sur) logger.error(msg)
    def compute(self, inputs, outputs):
        """
        Predict outputs.

        If the training flag is set, train the metamodel first.

        Parameters
        ----------
        inputs : Vector
            Unscaled, dimensional input variables read via inputs[key].
        outputs : Vector
            Unscaled, dimensional output variables read via outputs[key].
        """
        vec_size = self.options['vec_size']

        # train first
        if self.train:
            self._train()

        # predict for current inputs
        if vec_size > 1:
            flat_inputs = self._vec_to_array_vectorized(inputs)
        else:
            flat_inputs = self._vec_to_array(inputs)

        for name, shape in self._surrogate_output_names:
            surrogate = self._metadata(name).get('surrogate')

            if vec_size == 1:
                # Non vectorized.
                predicted = surrogate.predict(flat_inputs)
                if isinstance(predicted, tuple):  # rmse option
                    # Some surrogates return (prediction, rmse); stash the rmse in metadata.
                    self._metadata(name)['rmse'] = predicted[1]
                    predicted = predicted[0]
                outputs[name] = np.reshape(predicted, shape)

            elif overrides_method('vectorized_predict', surrogate, SurrogateModel):
                # Vectorized; surrogate provides vectorized computation.
                # TODO: This code is untested because no surrogates provide this option.
                predicted = surrogate.vectorized_predict(flat_inputs.flat)
                if isinstance(predicted, tuple):  # rmse option
                    self._metadata(name)['rmse'] = predicted[1]
                    predicted = predicted[0]
                outputs[name] = np.reshape(predicted, shape)

            else:
                # Vectorized; must call surrogate multiple times.
                if isinstance(shape, tuple):
                    output_shape = (vec_size, ) + shape
                else:
                    output_shape = (vec_size, )
                predicted = np.zeros(output_shape, dtype=flat_inputs.dtype)
                # rmse list is rebuilt every compute; one entry per point that reports it.
                rmse = self._metadata(name)['rmse'] = []
                for i in range(vec_size):
                    pred_i = surrogate.predict(flat_inputs[i])
                    if isinstance(pred_i, tuple):  # rmse option
                        rmse.append(pred_i[1])
                        pred_i = pred_i[0]
                    predicted[i] = np.reshape(pred_i, shape)
                outputs[name] = np.reshape(predicted, output_shape)
def _vec_to_array(self, vec): """ Convert from a dictionary of inputs to a flat ndarray. Parameters ---------- vec : <Vector> pointer to the input vector. Returns ------- ndarray flattened array of input data """ arr = np.zeros(self._input_size, dtype=vec.asarray().dtype) idx = 0 for name, sz in self._surrogate_input_names: val = vec[name] arr[idx:idx + sz] = val.flat idx += sz return arr def _vec_to_array_vectorized(self, vec): """ Convert from a dictionary of inputs to a 2d ndarray with vec_size rows. Parameters ---------- vec : <Vector> pointer to the input vector. Returns ------- ndarray 2d array, self._vectorize rows of flattened input data. """ vec_size = self.options['vec_size'] arr = np.zeros((vec_size, self._input_size), dtype=vec.asarray().dtype) vecvals = [(vec[n], sz) for n, sz in self._surrogate_input_names] for row in range(vec_size): idx = 0 for val, sz in vecvals: arr[row][idx:idx + sz] = val[row].flat idx += sz return arr
[docs] def declare_partials(self, of, wrt, dependent=True, rows=None, cols=None, val=None, method='exact', step=None, form=None, step_calc=None): """ Declare information about this component's subjacobians. Parameters ---------- of : str or list of str The name of the residual(s) that derivatives are being computed for. May also contain a glob pattern. wrt : str or list of str The name of the variables that derivatives are taken with respect to. This can contain the name of any input or output variable. May also contain a glob pattern. dependent : bool(True) If False, specifies no dependence between the output(s) and the input(s). This is only necessary in the case of a sparse global jacobian, because if 'dependent=False' is not specified and declare_partials is not called for a given pair, then a dense matrix of zeros will be allocated in the sparse global jacobian for that pair. In the case of a dense global jacobian it doesn't matter because the space for a dense subjac will always be allocated for every pair. rows : ndarray of int or None Row indices for each nonzero entry. For sparse subjacobians only. cols : ndarray of int or None Column indices for each nonzero entry. For sparse subjacobians only. val : float or ndarray of float or scipy.sparse Value of subjacobian. If rows and cols are not None, this will contain the values found at each (row, col) location in the subjac. method : str The type of approximation that should be used. Valid options include: 'fd': Finite Difference, 'cs': Complex Step, 'exact': use the component defined analytic derivatives. Default is 'exact'. step : float Step size for approximation. Defaults to None, in which case the approximation method provides its default value. form : str Form for finite difference, can be 'forward', 'backward', or 'central'. Defaults to None, in which case the approximation method provides its default value. step_calc : str Step type for computing the size of the finite difference step. 
It can be 'abs' for absolute, 'rel_avg' for a size relative to the absolute value of the vector input, or 'rel_element' for a size relative to each value in the vector input. In addition, it can be 'rel_legacy' for a size relative to the norm of the vector. For backwards compatibilty, it can be 'rel', which is now equivalent to 'rel_avg'. Defaults to None, in which case the approximation method provides its default value. """ if method == 'cs': raise ValueError('Complex step has not been tested for MetaModelUnStructuredComp') super().declare_partials(of, wrt, dependent, rows, cols, val, method, step, form, step_calc)
    def compute_partials(self, inputs, partials):
        """
        Compute sub-jacobian parts. The model is assumed to be in an unscaled state.

        Parameters
        ----------
        inputs : Vector
            Unscaled, dimensional input variables read via inputs[key].
        partials : Jacobian
            Sub-jac components written to partials[output_name, input_name].
        """
        vec_size = self.options['vec_size']

        if vec_size > 1:
            flat_inputs = self._vec_to_array_vectorized(inputs)
        else:
            flat_inputs = self._vec_to_array(inputs)

        for out_name, out_shape in self._surrogate_output_names:
            surrogate = self._metadata(out_name).get('surrogate')
            if vec_size > 1:
                out_size = shape_to_len(out_shape)
                # One linearize call per stacked point; each point's dense block is
                # written into its slice of the sparse (rows/cols) subjac values.
                for j in range(vec_size):
                    flat_input = flat_inputs[j]
                    if overrides_method('linearize', surrogate, SurrogateModel):
                        derivs = surrogate.linearize(flat_input)
                        idx = 0
                        for in_name, sz in self._surrogate_input_names:
                            # j-th point's block for this (of, wrt) pair lives at
                            # flat offset j * out_size * sz.
                            j1 = j * out_size * sz
                            j2 = j1 + out_size * sz
                            partials[out_name, in_name][j1:j2] = derivs[:, idx:idx + sz].flat
                            idx += sz

            else:
                if overrides_method('linearize', surrogate, SurrogateModel):
                    # Surrogate returns the full jacobian w.r.t. the concatenated
                    # inputs; slice out each input's columns.
                    sjac = surrogate.linearize(flat_inputs)

                    idx = 0
                    for in_name, sz in self._surrogate_input_names:
                        partials[(out_name, in_name)] = sjac[:, idx:idx + sz]
                        idx += sz
    def _train(self):
        """
        Train the metamodel, if necessary, using the provided training data.
        """
        missing_training_data = []
        num_sample = None
        # Validate that every input and output has training data, and that all sets
        # have the same number of points.
        for name, _ in chain(self._surrogate_input_names, self._surrogate_output_names):
            train_name = f'train_{name}'
            val = self.options[train_name]
            if val is None:
                missing_training_data.append(train_name)
                continue

            if num_sample is None:
                num_sample = len(val)
            elif len(val) != num_sample:
                raise RuntimeError(f"{self.msginfo}: Each variable must have the same number "
                                   f"of training points. Expected {num_sample} but found "
                                   f"{len(val)} points for '{name}'.")

        if len(missing_training_data) > 0:
            raise RuntimeError(f"{self.msginfo}: The following training data sets must be "
                               f"provided as options: {missing_training_data}")

        inputs = np.zeros((num_sample, self._input_size))
        self._training_input = inputs

        # Assemble input data.
        idx = 0
        for name, sz in self._surrogate_input_names:
            val = self.options[f'train_{name}']
            if isinstance(val[0], float):
                # Scalar training variable: one column.
                inputs[:, idx] = val
                idx += 1
            else:
                # Array training variable: flatten each sample into sz columns.
                for row_idx, v in enumerate(val):
                    v = np.asarray(v)
                    inputs[row_idx, idx:idx + sz] = v.flat
                idx += sz

        # Assemble output data and train each output.
        for name, shape in self._surrogate_output_names:
            output_size = shape_to_len(shape)
            outputs = np.zeros((num_sample, output_size))
            self._training_output[name] = outputs

            val = self.options[f'train_{name}']

            if isinstance(val[0], float):
                outputs[:, 0] = val
            else:
                for row_idx, v in enumerate(val):
                    v = np.asarray(v)
                    outputs[row_idx, :] = v.flat

            surrogate = self._metadata(name).get('surrogate')
            if surrogate is None:
                raise RuntimeError(f"{self.msginfo}: No surrogate specified for output '{name}'")
            else:
                surrogate.train(self._training_input, self._training_output[name])

        # Training is complete until new data is supplied.
        self.train = False

    def _metadata(self, name):
        # Look up the metadata dict for a variable by its relative (promoted) name.
        return self._var_rel2meta[name]