Source code for openmdao.surrogate_models.kriging

"""Surrogate model based on Kriging."""
from six.moves import zip, range

import numpy as np
import scipy.linalg as linalg
from scipy.optimize import minimize

from openmdao.surrogate_models.surrogate_model import SurrogateModel
from openmdao.utils.general_utils import warn_deprecation

MACHINE_EPSILON = np.finfo(np.double).eps


[docs]class KrigingSurrogate(SurrogateModel): """ Surrogate Modeling method based on the simple Kriging interpolation. Predictions are returned as a tuple of mean and RMSE. Based on Gaussian Processes for Machine Learning (GPML) by Rasmussen and Williams. (see also: scikit-learn). Attributes ---------- alpha : ndarray Reduced likelihood parameter: alpha L : ndarray Reduced likelihood parameter: L n_dims : int Number of independents in the surrogate n_samples : int Number of training points. sigma2 : ndarray Reduced likelihood parameter: sigma squared thetas : ndarray Kriging hyperparameters. X : ndarray Training input values, normalized. X_mean : ndarray Mean of training input values, normalized. X_std : ndarray Standard deviation of training input values, normalized. Y : ndarray Training model response values, normalized. Y_mean : ndarray Mean of training model response values, normalized. Y_std : ndarray Standard deviation of training model response values, normalized. """
[docs] def __init__(self, **kwargs): """ Initialize all attributes. Parameters ---------- **kwargs : dict options dictionary. """ super(KrigingSurrogate, self).__init__(**kwargs) self.n_dims = 0 # number of independent self.n_samples = 0 # number of training points self.thetas = np.zeros(0) self.alpha = np.zeros(0) self.L = np.zeros(0) self.sigma2 = np.zeros(0) # Normalized Training Values self.X = np.zeros(0) self.Y = np.zeros(0) self.X_mean = np.zeros(0) self.X_std = np.zeros(0) self.Y_mean = np.zeros(0) self.Y_std = np.zeros(0)
def _declare_options(self): """ Declare options before kwargs are processed in the init method. """ self.options.declare('eval_rmse', types=bool, default=False, desc="Flag indicating whether the Root Mean Squared Error (RMSE) " "should be computed. Set to False by default.") # nugget smoothing parameter from [Sasena, 2002] self.options.declare('nugget', default=10. * MACHINE_EPSILON, desc="Nugget smoothing parameter for smoothing noisy data. Represents " "the variance of the input values. If nugget is an ndarray, it " "must be of the same length as the number of training points. " "Default: 10. * Machine Epsilon")
[docs] def train(self, x, y): """ Train the surrogate model with the given set of inputs and outputs. Parameters ---------- x : array-like Training input locations y : array-like Model responses at given inputs. """ super(KrigingSurrogate, self).train(x, y) x, y = np.atleast_2d(x, y) self.n_samples, self.n_dims = x.shape if self.n_samples <= 1: raise ValueError('KrigingSurrogate require at least 2 training points.') # Normalize the data X_mean = np.mean(x, axis=0) X_std = np.std(x, axis=0) Y_mean = np.mean(y, axis=0) Y_std = np.std(y, axis=0) X_std[X_std == 0.] = 1. Y_std[Y_std == 0.] = 1. X = (x - X_mean) / X_std Y = (y - Y_mean) / Y_std self.X = X self.Y = Y self.X_mean, self.X_std = X_mean, X_std self.Y_mean, self.Y_std = Y_mean, Y_std def _calcll(thetas): """Calculate loglike (callback function).""" loglike = self._calculate_reduced_likelihood_params(np.exp(thetas))[0] return -loglike bounds = [(np.log(1e-5), np.log(1e5)) for _ in range(self.n_dims)] optResult = minimize(_calcll, 1e-1 * np.ones(self.n_dims), method='slsqp', options={'eps': 1e-3}, bounds=bounds) if not optResult.success: raise ValueError( 'Kriging Hyper-parameter optimization failed: {0}'.format(optResult.message)) self.thetas = np.exp(optResult.x) _, params = self._calculate_reduced_likelihood_params() self.alpha = params['alpha'] self.U = params['U'] self.S_inv = params['S_inv'] self.Vh = params['Vh'] self.sigma2 = params['sigma2']
def _calculate_reduced_likelihood_params(self, thetas=None): """ Calculate quantity with same maximum location as the log-likelihood for a given theta. Parameters ---------- thetas : ndarray, optional Given input correlation coefficients. If none given, uses self.thetas from training. Returns ------- ndarray Calculated reduced_likelihood dict Dictionary containing the parameters. """ if thetas is None: thetas = self.thetas X, Y = self.X, self.Y params = {} # Correlation Matrix distances = np.zeros((self.n_samples, self.n_dims, self.n_samples)) for i in range(self.n_samples): distances[i, :, i + 1:] = np.abs(X[i, ...] - X[i + 1:, ...]).T distances[i + 1:, :, i] = distances[i, :, i + 1:].T R = np.exp(-thetas.dot(np.square(distances))) R[np.diag_indices_from(R)] = 1. + self.options['nugget'] [U, S, Vh] = linalg.svd(R) # Penrose-Moore Pseudo-Inverse: # Given A = USV^* and Ax=b, the least-squares solution is # x = V S^-1 U^* b. # Tikhonov regularization is used to make the solution significantly # more robust. h = 1e-8 * S[0] inv_factors = S / (S ** 2. + h ** 2.) alpha = Vh.T.dot(np.einsum('j,kj,kl->jl', inv_factors, U, Y)) logdet = -np.sum(np.log(inv_factors)) sigma2 = np.dot(Y.T, alpha).sum(axis=0) / self.n_samples reduced_likelihood = -(np.log(np.sum(sigma2)) + logdet / self.n_samples) params['alpha'] = alpha params['sigma2'] = sigma2 * np.square(self.Y_std) params['S_inv'] = inv_factors params['U'] = U params['Vh'] = Vh return reduced_likelihood, params
[docs] def predict(self, x): """ Calculate predicted value of the response based on the current trained model. Parameters ---------- x : array-like Point at which the surrogate is evaluated. Returns ------- ndarray Kriging prediction. ndarray, optional (if eval_rmse is True) Root mean square of the prediction error. """ super(KrigingSurrogate, self).predict(x) thetas = self.thetas if isinstance(x, list): x = np.array(x) x = np.atleast_2d(x) n_eval = x.shape[0] # Normalize input x_n = (x - self.X_mean) / self.X_std r = np.zeros((n_eval, self.n_samples), dtype=x.dtype) for r_i, x_i in zip(r, x_n): r_i[:] = np.exp(-thetas.dot(np.square((x_i - self.X).T))) # Scaled Predictor y_t = np.dot(r, self.alpha) # Predictor y = self.Y_mean + self.Y_std * y_t if self.options['eval_rmse']: mse = (1. - np.dot(np.dot(r, self.Vh.T), np.einsum('j,kj,lk->jl', self.S_inv, self.U, r))) * self.sigma2 # Forcing negative RMSE to zero if negative due to machine precision mse[mse < 0.] = 0. return y, np.sqrt(mse) return y
[docs] def linearize(self, x): """ Calculate the jacobian of the Kriging surface at the requested point. Parameters ---------- x : array-like Point at which the surrogate Jacobian is evaluated. Returns ------- ndarray Jacobian of surrogate output wrt inputs. """ thetas = self.thetas # Normalize Input x_n = (x - self.X_mean) / self.X_std r = np.exp(-thetas.dot(np.square((x_n - self.X).T))) # Z = einsum('i,ij->ij', X, Y) is equivalent to, but much faster and # memory efficient than, diag(X).dot(Y) for vector X and 2D array Y. # i.e., Z[i,j] = X[i]*Y[i,j] gradr = r * -2 * np.einsum('i,ij->ij', thetas, (x_n - self.X).T) jac = np.einsum('i,j,ij->ij', self.Y_std, 1. / self.X_std, gradr.dot(self.alpha).T) return jac
[docs]class FloatKrigingSurrogate(KrigingSurrogate): """ Deprecated. """
[docs] def __init__(self, **kwargs): """ Capture Initialize to throw warning. Parameters ---------- **kwargs : dict Deprecated arguments. """ warn_deprecation("'FloatKrigingSurrogate' has been deprecated. Use " "'KrigingSurrogate' instead.") super(FloatKrigingSurrogate, self).__init__(**kwargs)