Source code for openmdao.drivers.differential_evolution_driver

"""
Driver for a differential evolution genetic algorithm.

TODO: add better references than: https://en.wikipedia.org/wiki/Differential_evolution

Most of this driver (except execute_ga) is based on SimpleGA, so the following may still apply:

The following reference is only for the penalty function:
Smith, A. E., Coit, D. W. (1995) Penalty functions. In: Handbook of Evolutionary Computation, 97(1).

The following reference is only for weighted sum multi-objective optimization:
Sobieszczanski-Sobieski, J., Morris, A. J., van Tooren, M. J. L. (2015)
Multidisciplinary Design Optimization Supported by Knowledge Based Engineering.
John Wiley & Sons, Ltd.
"""
import os
import copy

import numpy as np

try:
    from pyDOE3 import lhs
except ModuleNotFoundError:
    lhs = None

from openmdao.core.constants import INF_BOUND
from openmdao.core.driver import Driver, RecordingDebugging
from openmdao.utils.concurrent import concurrent_eval
from openmdao.utils.mpi import MPI
from openmdao.core.analysis_error import AnalysisError


[docs]class DifferentialEvolutionDriver(Driver): """ Driver for a differential evolution genetic algorithm. This algorithm requires that inputs are floating point numbers. Parameters ---------- **kwargs : dict of keyword arguments Keyword arguments that will be mapped into the Driver options. Attributes ---------- _problem_comm : MPI.Comm or None The MPI communicator for the Problem. _concurrent_pop_size : int Number of points to run concurrently when model is a parallel one. _concurrent_color : int Color of current rank when running a parallel model. _desvar_idx : dict Keeps track of the indices for each desvar, since DifferentialEvolution sees an array of design variables. _ga : <DifferentialEvolution> Main genetic algorithm lies here. _nfit : int Number of successful function evaluations. _randomstate : int Seed-number which controls the random draws. """
[docs] def __init__(self, **kwargs): """ Initialize the DifferentialEvolutionDriver driver. """ if lhs is None: raise RuntimeError(f"{self.__class__.__name__} requires the 'pyDOE3' package, " "which can be installed with one of the following commands:\n" " pip install openmdao[doe]\n" " pip install pyDOE3") super().__init__(**kwargs) # What we support self.supports['optimization'] = True self.supports['inequality_constraints'] = True self.supports['equality_constraints'] = True self.supports['multiple_objectives'] = True # What we don't support yet self.supports['integer_design_vars'] = False self.supports['two_sided_constraints'] = False self.supports['linear_constraints'] = False self.supports['simultaneous_derivatives'] = False self.supports['active_set'] = False self.supports['distributed_design_vars'] = False self.supports._read_only = True self._desvar_idx = {} self._ga = None self._nfit = 0 # random state can be set for predictability during testing if 'DifferentialEvolutionDriver_seed' in os.environ: self._randomstate = int(os.environ['DifferentialEvolutionDriver_seed']) else: self._randomstate = None # Support for Parallel models. self._concurrent_pop_size = 0 self._concurrent_color = 0
def _declare_options(self): """ Declare options before kwargs are processed in the init method. """ self.options.declare('max_gen', default=100, desc='Number of generations before termination.') self.options.declare('pop_size', default=0, desc='Number of points in the GA. Set to 0 and it will be computed ' 'as 20 times the total number of inputs.') self.options.declare('run_parallel', types=bool, default=False, desc='Set to True to execute the points in a generation in parallel.') self.options.declare('procs_per_model', default=1, lower=1, desc='Number of processors to give each model under MPI.') self.options.declare('penalty_parameter', default=10., lower=0., desc='Penalty function parameter.') self.options.declare('penalty_exponent', default=1., desc='Penalty function exponent.') self.options.declare('Pc', default=0.9, lower=0., upper=1., desc='Crossover probability.') self.options.declare('F', default=0.9, lower=0., upper=1., allow_none=True, desc='Differential rate.') self.options.declare('multi_obj_weights', default={}, types=(dict), desc='Weights of objectives for multi-objective optimization.' 'Weights are specified as a dictionary with the absolute names' 'of the objectives. The same weights for all objectives are assumed, ' 'if not given.') self.options.declare('multi_obj_exponent', default=1., lower=0., desc='Multi-objective weighting exponent.') def _setup_driver(self, problem): """ Prepare the driver for execution. This is the final thing to run during setup. Parameters ---------- problem : <Problem> Pointer to the containing problem. """ super()._setup_driver(problem) # check design vars and constraints for invalid bounds for name, meta in self._designvars.items(): lower, upper = meta['lower'], meta['upper'] for param in (lower, upper): if param is None or np.all(np.abs(param) >= INF_BOUND): msg = (f"Invalid bounds for design variable '{name}'. When using " f"{self.__class__.__name__}, values for both 'lower' and 'upper' " f"must be specified between +/-INF_BOUND ({INF_BOUND}), " f"but they are: lower={lower}, upper={upper}.") raise ValueError(msg) for name, meta in self._cons.items(): equals, lower, upper = meta['equals'], meta['lower'], meta['upper'] if ((equals is None or np.all(np.abs(equals) >= INF_BOUND)) and (lower is None or np.all(np.abs(lower) >= INF_BOUND)) and (upper is None or np.all(np.abs(upper) >= INF_BOUND))): msg = (f"Invalid bounds for constraint '{name}'. " f"When using {self.__class__.__name__}, the value for 'equals', " f"'lower' or 'upper' must be specified between +/-INF_BOUND " f"({INF_BOUND}), but they are: " f"equals={equals}, lower={lower}, upper={upper}.") raise ValueError(msg) model_mpi = None comm = problem.comm if self._concurrent_pop_size > 0: model_mpi = (self._concurrent_pop_size, self._concurrent_color) elif not self.options['run_parallel']: comm = None self._ga = DifferentialEvolution(self.objective_callback, comm=comm, model_mpi=model_mpi) def _setup_comm(self, comm): """ Perform any driver-specific setup of communicators for the model. Here, we generate the model communicators. Parameters ---------- comm : MPI.Comm or <FakeComm> or None The communicator for the Problem. Returns ------- MPI.Comm or <FakeComm> or None The communicator for the Problem model. """ self._problem_comm = comm procs_per_model = self.options['procs_per_model'] if MPI and self.options['run_parallel']: full_size = comm.size size = full_size // procs_per_model if full_size != size * procs_per_model: raise RuntimeError("The total number of processors is not evenly divisible by the " "specified number of processors per model.\n Provide a " "number of processors that is a multiple of %d, or " "specify a number of processors per model that divides " "into %d." % (procs_per_model, full_size)) color = comm.rank % size model_comm = comm.Split(color) # Everything we need to figure out which case to run. self._concurrent_pop_size = size self._concurrent_color = color return model_comm self._concurrent_pop_size = 0 self._concurrent_color = 0 return comm def _setup_recording(self): """ Set up case recording. """ if MPI: run_parallel = self.options['run_parallel'] procs_per_model = self.options['procs_per_model'] for recorder in self._rec_mgr: if run_parallel: # write cases only on procs up to the number of parallel models # (i.e. on the root procs for the cases) if procs_per_model == 1: recorder.record_on_process = True else: size = self._problem_comm.size // procs_per_model if self._problem_comm.rank < size: recorder.record_on_process = True elif self._problem_comm.rank == 0: # if not running cases in parallel, then just record on proc 0 recorder.record_on_process = True super()._setup_recording() def _get_name(self): """ Get name of current Driver. Returns ------- str Name of current Driver. """ return "DifferentialEvolution"
[docs] def get_driver_objective_calls(self): """ Return number of objective evaluations made during a driver run. Returns ------- int Number of objective evaluations made during a driver run. """ return self._nfit
[docs] def get_driver_derivative_calls(self): """ Return number of derivative evaluations made during a driver run. Returns ------- int Number of derivative evaluations made during a driver run. """ return 0
[docs] def run(self): """ Execute the genetic algorithm. Returns ------- bool Failure flag; True if failed to converge, False is successful. """ model = self._problem().model ga = self._ga pop_size = self.options['pop_size'] max_gen = self.options['max_gen'] F = self.options['F'] Pc = self.options['Pc'] self._check_for_missing_objective() self._check_for_invalid_desvar_values() # Size design variables. desvars = self._designvars desvar_vals = self.get_design_var_values() count = 0 for name, meta in desvars.items(): size = meta['size'] self._desvar_idx[name] = (count, count + size) count += size lower_bound = np.empty((count, )) upper_bound = np.empty((count, )) x0 = np.empty(count) # Figure out bounds vectors and initial design vars for name, meta in desvars.items(): i, j = self._desvar_idx[name] lower_bound[i:j] = meta['lower'] upper_bound[i:j] = meta['upper'] x0[i:j] = desvar_vals[name] # Automatic population size. if pop_size == 0: pop_size = 20 * count desvar_new, obj, self._nfit = ga.execute_ga(x0, lower_bound, upper_bound, pop_size, max_gen, self._randomstate, F, Pc) # Pull optimal parameters back into framework and re-run, so that # framework is left in the right final state for name in desvars: i, j = self._desvar_idx[name] val = desvar_new[i:j] self.set_design_var(name, val) with RecordingDebugging(self._get_name(), self.iter_count, self) as rec: model.run_solve_nonlinear() rec.abs = 0.0 rec.rel = 0.0 self.iter_count += 1 return False
[docs] def objective_callback(self, x, icase): r""" Evaluate problem objective at the requested point. In case of multi-objective optimization, a simple weighted sum method is used: .. math:: f = (\sum_{k=1}^{N_f} w_k \cdot f_k)^a where :math:`N_f` is the number of objectives and :math:`a>0` is an exponential weight. Choosing :math:`a=1` is equivalent to the conventional weighted sum method. The weights given in the options are normalized, so: .. math:: \sum_{k=1}^{N_f} w_k = 1 If one of the objectives :math:`f_k` is not a scalar, its elements will have the same weights, and it will be normed with length of the vector. Takes into account constraints with a penalty function. All constraints are converted to the form of :math:`g_i(x) \leq 0` for inequality constraints and :math:`h_i(x) = 0` for equality constraints. The constraint vector for inequality constraints is the following: .. math:: g = [g_1, g_2 \dots g_N], g_i \in R^{N_{g_i}} h = [h_1, h_2 \dots h_N], h_i \in R^{N_{h_i}} The number of all constraints: .. math:: N_g = \sum_{i=1}^N N_{g_i}, N_h = \sum_{i=1}^N N_{h_i} The fitness function is constructed with the penalty parameter :math:`p` and the exponent :math:`\kappa`: .. math:: \Phi(x) = f(x) + p \cdot \sum_{k=1}^{N^g}(\delta_k \cdot g_k)^{\kappa} + p \cdot \sum_{k=1}^{N^h}|h_k|^{\kappa} where :math:`\delta_k = 0` if :math:`g_k` is satisfied, 1 otherwise .. note:: The values of :math:`\kappa` and :math:`p` can be defined as driver options. Parameters ---------- x : ndarray Value of design variables. icase : int Case number, used for identification when run in parallel. Returns ------- float Objective value. bool Success flag, True if successful. int Case number, used for identification when run in parallel. """ model = self._problem().model success = 1 objs = self.get_objective_values() nr_objectives = len(objs) # Single objective, if there is only one objective, which has only one element if nr_objectives > 1: is_single_objective = False else: for obj in objs.items(): is_single_objective = len(obj) == 1 break obj_exponent = self.options['multi_obj_exponent'] if self.options['multi_obj_weights']: # not empty obj_weights = self.options['multi_obj_weights'] else: # Same weight for all objectives, if not specified obj_weights = {name: 1. for name in objs.keys()} sum_weights = sum(obj_weights.values()) for name in self._designvars: i, j = self._desvar_idx[name] self.set_design_var(name, x[i:j]) # a very large number, but smaller than the result of nan_to_num in Numpy almost_inf = INF_BOUND # Execute the model with RecordingDebugging(self._get_name(), self.iter_count, self) as rec: self.iter_count += 1 try: model.run_solve_nonlinear() # Tell the optimizer that this is a bad point. except AnalysisError: model._clear_iprint() success = 0 obj_values = self.get_objective_values() if is_single_objective: # Single objective optimization for i in obj_values.values(): obj = i # First and only key in the dict else: # Multi-objective optimization with weighted sums weighted_objectives = np.array([]) for name, val in obj_values.items(): # element-wise multiplication with scalar # takes the average, if an objective is a vector try: weighted_obj = val * obj_weights[name] / val.size except KeyError: msg = ('Name "{}" in "multi_obj_weights" option ' 'is not an absolute name of an objective.') raise KeyError(msg.format(name)) weighted_objectives = np.hstack((weighted_objectives, weighted_obj)) obj = sum(weighted_objectives / sum_weights)**obj_exponent # Parameters of the penalty method penalty = self.options['penalty_parameter'] exponent = self.options['penalty_exponent'] if penalty == 0: fun = obj else: constraint_violations = np.array([]) for name, val in self.get_constraint_values().items(): con = self._cons[name] # The not used fields will either None or a very large number if (con['lower'] is not None) and np.any(con['lower'] > -almost_inf): diff = val - con['lower'] violation = np.array([0. if d >= 0 else abs(d) for d in diff]) elif (con['upper'] is not None) and np.any(con['upper'] < almost_inf): diff = val - con['upper'] violation = np.array([0. if d <= 0 else abs(d) for d in diff]) elif (con['equals'] is not None) and np.any(np.abs(con['equals']) < almost_inf): diff = val - con['equals'] violation = np.absolute(diff) constraint_violations = np.hstack((constraint_violations, violation)) fun = obj + penalty * sum(np.power(constraint_violations, exponent)) # Record after getting obj to assure they have # been gathered in MPI. rec.abs = 0.0 rec.rel = 0.0 return fun, success, icase
[docs]class DifferentialEvolution(object): """ Differential Evolution Genetic Algorithm. TODO : add better references than: https://en.wikipedia.org/wiki/Differential_evolution Parameters ---------- objfun : function Objective callback function. comm : MPI communicator or None The MPI communicator that will be used objective evaluation for each generation. model_mpi : None or tuple If the model in objfun is also parallel, then this will contain a tuple with the the total number of population points to evaluate concurrently, and the color of the point to evaluate on this rank. Attributes ---------- comm : MPI communicator or None The MPI communicator that will be used objective evaluation for each generation. lchrom : int Chromosome length. model_mpi : None or tuple If the model in objfun is also parallel, then this will contain a tuple with the the total number of population points to evaluate concurrently, and the color of the point to evaluate on this rank. npop : int Population size. objfun : function Objective function callback. """
[docs] def __init__(self, objfun, comm=None, model_mpi=None): """ Initialize genetic algorithm object. """ if lhs is None: raise RuntimeError(f"{self.__class__.__name__} requires the 'pyDOE3' package, " "which can be installed with one of the following commands:\n" " pip install openmdao[doe]\n" " pip install pyDOE3") self.objfun = objfun self.comm = comm self.lchrom = 0 self.npop = 0 self.model_mpi = model_mpi
[docs] def execute_ga(self, x0, vlb, vub, pop_size, max_gen, random_state, F=0.5, Pc=0.5): """ Perform the genetic algorithm. Parameters ---------- x0 : ndarray Initial design values. vlb : ndarray Lower bounds array. vub : ndarray Upper bounds array. pop_size : int Number of points in the population. max_gen : int Number of generations to run the GA. random_state : int Seed-number which controls the random draws. F : float Differential rate. Pc : float Crossover rate. Returns ------- ndarray Best design point. float Objective value at best design point. int Number of successful function evaluations. """ comm = self.comm xopt = copy.deepcopy(vlb) fopt = np.inf self.lchrom = len(x0) if np.mod(pop_size, 2) == 1: pop_size += 1 self.npop = int(pop_size) if self.comm is not None and self.comm.size > 1: if random_state is None: # if no random_state is given, generate one on rank 0 and broadcast it to all # ranks. Because we add the rank to the starting random state, no ranks will # have the same seed. rng = np.random.default_rng() random_state = rng.integers(2**31) # Must be less than 2^32-1 if self.comm.rank == 0: self.comm.bcast(random_state, root=0) else: random_state = self.comm.bcast(None, root=0) # add rank to ensure different seed in each MPI process seed = random_state + self.comm.rank else: seed = random_state rng = np.random.default_rng(seed) # create LHS initial population (scaled to bounds) + user initial condition population = lhs(self.lchrom, self.npop - 1, criterion='center', random_state=seed) population = population * (vub - vlb) + vlb # scale to bounds population = np.vstack((population, x0)) fitness = np.ones(self.npop) * np.inf # initialize fitness to infinitely bad # Main Loop nfit = 0 for generation in range(max_gen + 1): # Evaluate fitness of points in this generation if comm is not None: # Parallel # Since GA is random, ranks generate different new populations, so just take one # and use it on all. population = comm.bcast(population, root=0) cases = [((item, ii), None) for ii, item in enumerate(population)] # Pad the cases with some dummy cases to make the cases divisible amongst the procs. # TODO: Add a load balancing option to this driver. extra = len(cases) % comm.size if extra > 0: for j in range(comm.size - extra): cases.append(cases[-1]) results = concurrent_eval(self.objfun, cases, comm, allgather=True, model_mpi=self.model_mpi) fitness[:] = np.inf for result in results: returns, traceback = result if returns: val, success, ii = returns if success: fitness[ii] = val nfit += 1 else: # Print the traceback if it fails print('A case failed:') print(traceback) else: # Serial for ii in range(self.npop): fitness[ii], success, _ = self.objfun(population[ii], 0) nfit += 1 # Find best performing point in this generation. min_fit = np.min(fitness) min_index = np.argmin(fitness) min_gen = population[min_index] # Update overall best. if min_fit < fopt: fopt = min_fit xopt = min_gen if generation == max_gen: # finished break # Selection: new generation members replace parents, if better (implied elitism) if generation == 0: parentPop = copy.deepcopy(population) parentFitness = copy.deepcopy(fitness) else: for ii in range(self.npop): if fitness[ii] < parentFitness[ii]: # if child is better, else parent unchanged parentPop[ii] = population[ii] parentFitness[ii] = fitness[ii] # Evolve new generation. population = np.zeros(np.shape(parentPop)) fitness = np.ones(self.npop) * np.inf for ii in range(self.npop): # randomly select 3 different population members other than the current choice a, b, c = ii, ii, ii while a == ii: a = rng.integers(0, self.npop) while b == ii or b == a: b = rng.integers(0, self.npop) while c == ii or c == a or c == b: c = rng.integers(0, self.npop) # randomly select chromosome index for forced crossover r = rng.integers(0, self.lchrom) # crossover and mutation population[ii] = parentPop[ii] # start the same as parent # clip mutant so that it cannot be outside the bounds mutant = np.clip(parentPop[a] + F * (parentPop[b] - parentPop[c]), vlb, vub) # sometimes replace parent's feature with mutant's rr = rng.random(self.lchrom) idx = np.where(rr < Pc) population[ii][idx] = mutant[idx] population[ii][r] = mutant[r] # always replace at least one with mutant's return xopt, fopt, nfit