Source code for openmdao.lib.components.metamodel

""" Metamodel provides basic Meta Modeling capability."""

from copy import deepcopy

# pylint: disable-msg=E0611,F0401
from enthought.traits.trait_base import not_none
from enthought.traits.has_traits import _clone_trait

from openmdao.main.api import Component, Case
from openmdao.lib.datatypes.api import Slot, List, Str, Float, Int, Event, Dict, Bool
from openmdao.main.interfaces import IComponent, ISurrogate, ICaseRecorder, \
     ICaseIterator, IUncertainVariable
from openmdao.main.mp_support import has_interface
from openmdao.util.log import logger
from openmdao.main.datatypes.uncertaindist import UncertainDistVar
from openmdao.util.typegroups import int_types, real_types

_missing = object()
__surrogate_prefix__ = 'sur_'

[docs]class MetaModel(Component): # pylint: disable-msg=E1101 model = Slot(IComponent, allow_none=True, desc='Slot for the Component or Assembly being ' 'encapsulated.') includes = List(Str, iotype='in', desc='A list of names of variables to be included ' 'in the public interface.') excludes = List(Str, iotype='in', desc='A list of names of variables to be excluded ' 'from the public interface.') warm_start_data = Slot(ICaseIterator, iotype="in", desc="CaseIterator containing cases to use as " "initial training data. When this is set, all " "previous training data is cleared and replaced " "with data from this CaseIterator.") default_surrogate = Slot(ISurrogate, allow_none=True, desc="This surrogate will be used for all " "outputs that don't have a specific surrogate assigned " "to them in their sur_<name> slot.") report_errors = Bool(True, iotype="in", desc="If True, metamodel will report errors reported from the component. " "If False, metamodel will swallow the errors but log that they happened and " "exclude the case from the training set.") recorder = Slot(ICaseRecorder, desc='Records training cases') # when fired, the next execution will train the metamodel train_next = Event(desc='Train metamodel on next execution') #when fired, the next execution will reset all training data reset_training_data = Event(desc='Reset training data on next execution') def __init__(self): super(MetaModel, self).__init__() self._surrogate_input_names = None self._surrogate_output_names = None self._surrogate_overrides = set() # keeps track of which sur_<name> slots are full self._training_data = {} self._training_input_history = [] self._const_inputs = {} # dict of constant training inputs indices and their values self._train = False self._new_train_data = False self._failed_training_msgs = [] self._default_surrogate_copies = {} # need to maintain separate copy of default surrogate for each sur_* that doesn't # have a surrogate defined # the following line will work for classes that inherit from MetaModel # as long as they declare their traits in the class body and not in # the __init__ function. If they need to create traits dynamically # during initialization they'll have to provide the value of # _mm_class_traitnames self._mm_class_traitnames = set(self.traits(iotype=not_none).keys()) def _train_next_fired(self): self._train = True self._new_train_data = True def _reset_training_data_fired(self): self._training_input_history = [] self._const_inputs = {} self._failed_training_msgs = [] # remove output history from training_data for name in self._training_data: self._training_data[name] = [] def _warm_start_data_changed(self, oldval, newval): self.reset_training_data = True # build list of inputs for case in newval: if self.recorder: self.recorder.record(case) inputs = [] for inp_name in self.surrogate_input_names(): var_name = '.'.join([self.name, inp_name]) try: inp_val = case[var_name] except KeyError: pass #self.raise_exception('The variable "%s" was not ' #'found as an input in one of the cases provided ' #'for warm_start_data.' % var_name, ValueError) else: if inp_val is not None: inputs.append(inp_val) #print "inputs", inputs self._training_input_history.append(inputs) for output_name in self.surrogate_output_names(): #grab value from case data var_name = '.'.join([self.name, output_name]) try: val = case.get_output(var_name) except KeyError: self.raise_exception('The output "%s" was not found ' 'in one of the cases provided for ' 'warm_start_data' % var_name, ValueError) else: # save to training output history self._training_data[output_name].append(val) self._new_train_data = True
[docs] def execute(self): """If the training flag is set, train the metamodel. Otherwise, predict outputs. """ if self.model is None: self.raise_exception("MetaModel object must have a model!", RuntimeError) if self._train: try: inputs = self.update_model_inputs() #print '%s training with inputs: %s' % (self.get_pathname(), inputs) self.model.run(force=True) except Exception as err: if self.report_errors: raise err else: self._failed_training_msgs.append(str(err)) else: # if no exceptions are generated, save the data self._training_input_history.append(inputs) self.update_outputs_from_model() case_outputs = [] for name, output_history in self._training_data.items(): case_outputs.append(('.'.join([self.name, name]), output_history[-1])) # save the case, making sure to add out name to the local input name since # this Case is scoped to our parent Assembly case_inputs = [('.'.join([self.name, name]), val) for name, val in zip(self.surrogate_input_names(), inputs)] if self.recorder: self.recorder.record(Case(inputs=case_inputs, outputs=case_outputs)) self._train = False else: if self.default_surrogate is None and not self._surrogate_overrides: # NO surrogates defined. just run model and get outputs inputs = self.update_model_inputs() self.model.run() self.update_outputs_from_model() return #print '%s predicting' % self.get_pathname() if self._new_train_data: if len(self._training_input_history) < 2: self.raise_exception("ERROR: need at least 2 training points!", RuntimeError) # figure out if we have any constant training inputs tcases = self._training_input_history in_hist = tcases[0][:] # start off assuming every input is constant idxlist = range(len(in_hist)) self._const_inputs = dict(zip(idxlist, in_hist)) for i in idxlist: val = in_hist[i] for case in range(1, len(tcases)): if val != tcases[case][i]: del self._const_inputs[i] break if len(self._const_inputs) == len(in_hist): self.raise_exception("ERROR: all training inputs are constant.") elif len(self._const_inputs) > 0: # some inputs are constant, so we have to remove them from the training set training_input_history = [] for inputs in self._training_input_history: training_input_history.append([val for i, val in enumerate(inputs) if i not in self._const_inputs]) else: training_input_history = self._training_input_history for name, output_history in self._training_data.items(): surrogate = self._get_surrogate(name) if surrogate is not None: surrogate.train(training_input_history, output_history) self._new_train_data = False inputs = [] for i, name in enumerate(self.surrogate_input_names()): val = getattr(self, name) cval = self._const_inputs.get(i, _missing) if cval is _missing: inputs.append(val) elif val != cval: self.raise_exception("ERROR: training input '%s' was a constant value of (%s) but the value has changed to (%s)." % (name, cval, val), ValueError) for name in self._training_data: surrogate = self._get_surrogate(name) # copy output to boundary if surrogate is None: setattr(self, name, getattr(self.model, name)) # no surrogate. use outputs from model else: setattr(self, name, surrogate.predict(inputs))
def _post_run(self): self._train = False super(MetaModel, self)._post_run()
[docs] def invalidate_deps(self, compname=None, varnames=None, force=False): if compname: # we were called from our model, which expects to be in an Assembly return super(MetaModel, self).invalidate_deps(varnames=varnames)
[docs] def child_invalidated(self, childname, outs=None, force=False): pass
[docs] def exec_counts(self, compnames): # we force the run on our model, so it doesn't matter what we tell it the exec counts are return [0 for n in compnames]
def _model_changed(self, oldmodel, newmodel): """called whenever the model variable is set or when includes/excludes change.""" # TODO: check for pre-connected traits on the new model # TODO: disconnect traits corresponding to old model (or leave them if the new model has the same ones?) # TODO: check for nested MMs? Is this a problem? # TODO: check for name collisions between MetaModel class traits and traits from model if newmodel is not None and not has_interface(newmodel, IComponent): self.raise_exception('model of type %s does not implement the IComponent interface' % type(newmodel).__name__, TypeError) self.reset_training_data = True self._update_surrogate_list() if self.default_surrogate is None: no_sur = [] for name in self.surrogate_output_names(): if getattr(self, __surrogate_prefix__+name, None) is None: no_sur.append(name) if len(no_sur) > 0 and len(no_sur) != len(self._surrogate_output_names): self.raise_exception("No default surrogate model is defined and the following outputs do not have a surrogate model: %s. Either specify default_surrogate, or specify a surrogate model for all outputs." % no_sur, RuntimeError) if newmodel: newmodel.parent = self newmodel.name = 'model' def _add_var_for_surrogate(self, surrogate, varname): """Different surrogates have different types of output values, so create the appropriate type of output Variable based on the return value of get_uncertain_value on the surrogate. """ val = surrogate.get_uncertain_value(getattr(self.model, varname)) if has_interface(val, IUncertainVariable): ttype = UncertainDistVar elif isinstance(val, real_types): ttype = Float elif isinstance(val, int_types): ttype = Int else: self.raise_exception("value type of '%s' is not a supported surrogate return value" % val.__class__.__name__) self.add(varname, ttype(default_value=val, iotype='out', desc=self.model.trait(varname).desc, units=self.model.trait(varname).units)) setattr(self, varname, val) def _surrogate_updated(self, obj, name, old, new): """Called when a surrogate Slot (sur_*) is updated.""" if new is None: if self.default_surrogate: self._default_surrogate_copies[name] = deepcopy(self.default_surrogate) if name in self._surrogate_overrides: self._surrogate_overrides.remove(name) else: self._surrogate_overrides.add(name) varname = name[len(__surrogate_prefix__):] self._add_var_for_surrogate(getattr(self, name), varname) if old is None and name in self._default_surrogate_copies: del self._default_surrogate_copies[name]
[docs] def update_inputs(self, compname, varnames): if compname != 'model': self.raise_exception("cannot update inputs for child named '%s'" % compname) self.model.set_valid(varnames, True)
[docs] def update_model_inputs(self): """Copy the values of the MetaModel's inputs into the inputs of the model. Returns the values of the inputs. """ input_values = [] for name in self.surrogate_input_names(): inp = getattr(self, name) input_values.append(inp) setattr(self.model, name, inp) return input_values
def _get_surrogate(self, name): """Return the designated surrogate for the given output.""" surrogate = getattr(self, __surrogate_prefix__+name, None) if surrogate is None and self.default_surrogate is not None: surrogate = self._default_surrogate_copies.get(__surrogate_prefix__+name) return surrogate
[docs] def update_outputs_from_model(self): """Copy output values from the model into the MetaModel's outputs, and if training, save the output associated with surrogate. """ for name in self.surrogate_output_names(): out = getattr(self.model, name) surrogate = self._get_surrogate(name) if surrogate is None: setattr(self, name, out) else: setattr(self, name, surrogate.get_uncertain_value(out)) if self._train: self._training_data[name].append(out) # save to training output history
def _add_input(self, name): """Adds the specified input variable.""" self.add_trait(name, _clone_trait(self.model.trait(name))) setattr(self, name, getattr(self.model, name)) def _add_output(self, name): """Adds the specified output variable and its associated surrogate Slot.""" sur_name = __surrogate_prefix__+name self.add_trait(sur_name, Slot(ISurrogate, allow_none=True)) self.on_trait_change(self._surrogate_updated, sur_name) if self.default_surrogate is not None: surrogate = deepcopy(self.default_surrogate) self._default_surrogate_copies[sur_name] = surrogate self._add_var_for_surrogate(surrogate, name) else: self.add_trait(name, _clone_trait(self.model.trait(name))) self._training_data[name] = [] def _remove_input(self, name): """Removes the specified input variable.""" if self.parent: self.parent.disconnect('.'.join([self.name, name])) self.remove_trait(name) def _remove_output(self, name): """Removes the specified output variable and its associated surrogate.""" if self.parent: self.parent.disconnect('.'.join([self.name, name])) self.remove_trait(name) self.remove_trait(__surrogate_prefix__+name) if name in self._training_data: del self._training_data[name]
[docs] def surrogate_input_names(self): """Return the list of names of public inputs that correspond to model inputs. """ if self._surrogate_input_names is None: if self.model: self._surrogate_input_names = [n for n in self.model._alltraits(iotype='in').keys() if self._eligible(n) and n not in self._mm_class_traitnames] else: return [] return self._surrogate_input_names
[docs] def surrogate_output_names(self): """Return the list of names of public outputs that correspond to model outputs. """ if self._surrogate_output_names is None: if self.model: self._surrogate_output_names = [n for n in self.model._alltraits(iotype='out').keys() if self._eligible(n) and n not in self._mm_class_traitnames] else: return [] return self._surrogate_output_names
def _update_surrogate_list(self): old_in = set() if self._surrogate_input_names is not None: old_in.update(self._surrogate_input_names) old_out = set() if self._surrogate_output_names is not None: old_out.update(self._surrogate_output_names) self._surrogate_input_names = None self._surrogate_output_names = None new_in = set(self.surrogate_input_names()) new_out = set(self.surrogate_output_names()) added_outs = new_out - old_out added_ins = new_in - old_in removed_outs = old_out - new_out removed_ins = old_in - new_in if added_outs or added_ins or removed_ins: self.reset_training_data = True for name in removed_ins: self._remove_input(name) for name in added_ins: self._add_input(name) for name in removed_outs: self._remove_output(name) for name in added_outs: self._add_output(name) def _includes_changed(self, old, new): if self.excludes and new is not None: self.__dict__['includes'] = old self.raise_exception("includes and excludes are mutually exclusive", RuntimeError) self._update_surrogate_list() def _excludes_changed(self, old, new): if self.includes and new is not None: self.__dict__['excludes'] = old self.raise_exception("includes and excludes are mutually exclusive", RuntimeError) self._update_surrogate_list() def _default_surrogate_changed(self, old_obj, new_obj): if old_obj: old_obj.on_trait_change(self._def_surrogate_trait_modified, remove=True) if new_obj: new_obj.on_trait_change(self._def_surrogate_trait_modified) #due to the way "add" works, container will always remove the old # before it adds the new one. So you actually get this method called # twice on a replace. You only do this update when the new one gets set for name in self.surrogate_output_names(): surname = __surrogate_prefix__+name if surname not in self._surrogate_overrides: surrogate = deepcopy(self.default_surrogate) self._default_surrogate_copies[surname] = surrogate self._add_var_for_surrogate(surrogate, name) def _def_surrogate_trait_modified(self, surrogate, name, old, new): # a trait inside of the default_surrogate was changed, so we need to # replace all of the default copies for name in self._default_surrogate_copies: self._default_surrogate_copies[name] = deepcopy(self.default_surrogate) def _eligible(self, name): """Return True if the named trait is not excluded from the public interface based on the includes and excludes lists. """ if name in self._mm_class_traitnames: return False if self.includes and name not in self.includes: return False elif self.excludes and name in self.excludes: return False return True
OpenMDAO Home