"""Components for use in `CycleGroup`. For details, see `CycleGroup`."""
import numpy as np
import scipy.sparse as sparse
import unittest
from openmdao.core.explicitcomponent import ExplicitComponent
PSI = 1.
_vec_terms = {}
def _compute_vector_terms(system_size):
# Try/Except pattern is much faster than if key in ... if the key is present (which it will be
# outside of the first invocation).
try:
return _vec_terms[system_size]
except KeyError:
u = np.zeros(system_size)
u[[0, -1]] = np.sqrt(2)/2
v = np.zeros(system_size)
v[1:-1] = 1 / np.sqrt(system_size - 2)
cross_terms = np.outer(v, u) - np.outer(u, v)
same_terms = np.outer(u, u) + np.outer(v, v)
_vec_terms[system_size] = u, v, cross_terms, same_terms
return u, v, cross_terms, same_terms
def _compute_A(system_size, theta):
u, v, cross_terms, same_terms = _compute_vector_terms(system_size)
return (np.eye(system_size)
+ np.sin(theta) * cross_terms
+ (np.cos(theta) - 1) * same_terms)
def _compute_dA(system_size, theta):
u, v, cross_terms, same_terms = _compute_vector_terms(system_size)
return np.cos(theta) * cross_terms - np.sin(theta) * same_terms
[docs]def array_idx(i, var_size):
return slice(i * var_size, (i + 1) * var_size)
[docs]class ExplicitCycleComp(ExplicitComponent):
def _inputs_to_vector(self, inputs):
var_shape = self.options['var_shape']
num_var = self.options['num_var']
size = np.prod(var_shape)
x = np.zeros(num_var * size)
for i in range(num_var):
x_i = inputs[self._cycle_names['x'].format(i)].flat
x[size * i:size * (i + 1)] = x_i
return x
def _vector_to_outputs(self, vec, outputs):
var_shape = self.options['var_shape']
num_var = self.options['num_var']
size = np.prod(var_shape)
for i in range(num_var):
y_i = vec[size * i:size * (i + 1)].reshape(var_shape)
outputs[self._cycle_names['y'].format(i)] = y_i
def __str__(self):
return 'Explicit Cycle Component'
[docs] def initialize(self):
self.options.declare('jacobian_type', default='matvec',
values=['matvec', 'dense', 'sparse-csc'],
desc='method of assembling derivatives')
self.options.declare('partial_type', default='array',
values=['array', 'sparse', 'aij'],
desc='type of partial derivatives')
self.options.declare('num_var', types=int, default=1,
desc='Number of variables per component')
self.options.declare('var_shape', types=tuple, default=(3,),
desc='Shape of each variable')
self.options.declare('index', types=int,
desc='Index of the component. Used for testing implicit connections')
self.options.declare('connection_type', default='explicit',
values=['explicit', 'implicit'],
desc='How to connect variables.')
self.options.declare('partial_method', default='exact',
values=('exact', 'fd', 'cs'),
desc='How derivatives should be solved (exact, fd, or cs)')
self.options.declare('num_comp', types=int, default=2,
desc='Total number of components')
self.angle_param = 'theta'
self._cycle_names = {}
def _init_parameterized(self):
self.num_var = self.options['num_var']
self.var_shape = self.options['var_shape']
self.size = self.num_var * np.prod(self.var_shape)
if self.options['jacobian_type'] == 'matvec':
self.compute_jacvec_product = self.jacvec_product
self.matrix_free = True
if self.options['connection_type'] == 'implicit':
idx = self.options['index']
self._cycle_names['x'] = 'x_{}_{{}}'.format(idx)
self._cycle_names['y'] = 'x_{}_{{}}'.format(idx + 1)
self._cycle_names['theta'] = 'theta_{}'.format(idx)
self._cycle_names['theta_out'] = 'theta_{}'.format(idx + 1)
num_var = self.options['num_var']
self._cycle_promotes_in = [self._cycle_names['x'].format(i) for i in range(num_var)]
self._cycle_promotes_out = [self._cycle_names['y'].format(i) for i in range(num_var)]
self._cycle_promotes_in.append(self._cycle_names['theta'])
self._cycle_promotes_out.append(self._cycle_names['theta_out'])
else:
self._cycle_names['x'] = 'x_{}'
self._cycle_names['y'] = 'y_{}'
self._cycle_names['theta'] = 'theta'
self._cycle_names['theta_out'] = 'theta_out'
self._cycle_promotes_in = self._cycle_promotes_out = []
[docs] def setup(self):
for i in range(self.num_var):
self.add_input(self._cycle_names['x'].format(i), shape=self.var_shape)
self.add_output(self._cycle_names['y'].format(i), shape=self.var_shape)
self.add_input(self._cycle_names['theta'], val=1.)
self.add_output(self._cycle_names['theta_out'], shape=(1,))
[docs] def setup_partials(self):
# Setup partials
pd_type = self.options['partial_type']
if self.options['partial_method'] != 'exact':
if self.options['jacobian_type'] == 'matvec':
raise unittest.SkipTest('not testing FD and matvec')
if pd_type != 'array':
raise unittest.SkipTest('only dense FD supported')
self.declare_partials('*', '*', method=self.options['partial_method'])
elif self.options['jacobian_type'] != 'matvec' and pd_type != 'array':
num_var = self.num_var
var_shape = self.var_shape
var_size = np.prod(var_shape)
A = np.ones((self.size, self.size))
dA_x = np.ones((self.size, 1))
dtheta = np.array([[1.]])
angle_param = self._cycle_names[self.angle_param]
# if our subjacs are not dense, we must assign values here that
# match their type (data values don't matter, only structure).
# Otherwise, we assume they are dense and we'll get an error later
# when we assign a subjac with a type that doesn't match.
for out_idx in range(num_var):
out_var = self._cycle_names['y'].format(out_idx)
for in_idx in range(num_var):
in_var = self._cycle_names['x'].format(in_idx)
Aij = A[array_idx(out_idx, var_size), array_idx(in_idx, var_size)]
self.declare_partials(out_var, in_var,
**self._array2kwargs(Aij, pd_type, 'exact'))
self.declare_partials(out_var, angle_param,
**self._array2kwargs(dA_x[array_idx(out_idx, var_size)],
pd_type, 'exact'))
self.declare_partials(self._cycle_names['theta_out'], self._cycle_names['theta'],
**self._array2kwargs(dtheta, pd_type, 'exact'))
else:
# Declare everything
self.declare_partials(of='*', wrt='*')
[docs] def compute(self, inputs, outputs):
theta = inputs[self._cycle_names['theta']]
A = _compute_A(self.size, theta)
x = self._inputs_to_vector(inputs)
y = A.dot(x)
self._vector_to_outputs(y, outputs)
outputs[self._cycle_names['theta_out']] = theta
[docs] def jacvec_product(self, inputs, d_inputs, d_outputs, mode):
angle_param = self._cycle_names[self.angle_param]
x = self._inputs_to_vector(inputs)
angle = inputs[angle_param]
A = _compute_A(self.size, angle)
dA = _compute_dA(self.size, angle)
var_shape = self.options['var_shape']
var_size = np.prod(var_shape)
num_var = self.options['num_var']
x_name = self._cycle_names['x']
y_name = self._cycle_names['y']
theta_name = self._cycle_names['theta']
theta_out_name = self._cycle_names['theta_out']
if mode == 'fwd':
for j in range(num_var):
x_j = x_name.format(j)
if x_j in d_inputs:
dx = d_inputs[x_j].flat[:]
for i in range(num_var):
y_i = y_name.format(i)
if y_i in d_outputs:
Aij = A[array_idx(i, var_size), array_idx(j, var_size)]
d_outputs[y_i] += Aij.dot(dx).reshape(var_shape)
if theta_name in d_inputs and theta_out_name in d_outputs:
dtheta = d_inputs[theta_name]
d_outputs[theta_out_name] += dtheta
if angle_param in d_inputs:
dangle = d_inputs[angle_param]
dy_dangle = (dA.dot(x)) * dangle
for i in range(num_var):
y_i = y_name.format(i)
if y_i in d_outputs:
d_outputs[y_i] += dy_dangle[array_idx(i, var_size)].reshape(var_shape)
elif mode == 'rev':
for i in range(num_var):
y_i = y_name.format(i)
if y_i in d_outputs:
dy_i = d_outputs[y_i].flat[:]
for j in range(num_var):
x_j = x_name.format(j)
if x_j in d_inputs:
Aij = A[array_idx(i, var_size), array_idx(j, var_size)]
d_inputs[x_j] += Aij.T.dot(dy_i).reshape(var_shape)
if angle_param in d_inputs:
dAij = dA[array_idx(i, var_size), array_idx(j, var_size)]
x_j_vec = inputs[x_j].flat[:]
d_inputs[angle_param] += x_j_vec.T.dot(dAij.T.dot(dy_i))
if theta_out_name in d_outputs and theta_name in d_inputs:
dtheta_out = d_outputs[theta_out_name]
d_inputs[theta_name] += dtheta_out
[docs] def make_jacobian_entry(self, A, pd_type):
if pd_type == 'aij':
return self.make_sub_jacobian(A, pd_type)[0]
return self.make_sub_jacobian(A, pd_type)
[docs] def make_sub_jacobian(self, A, pd_type):
if pd_type == 'array':
return A
if pd_type == 'sparse':
return sparse.csr_matrix(A)
if pd_type == 'aij':
data = []
rows = []
cols = []
A = np.atleast_2d(A)
for i in range(A.shape[0]):
for j in range(A.shape[1]):
if np.abs(A[i, j]) > 1e-15:
data.append(A[i, j])
rows.append(i)
cols.append(j)
return [np.array(data), np.array(rows), np.array(cols)]
raise ValueError('Unknown partial_type: {}'.format(pd_type))
def _array2kwargs(self, arr, pd_type, method):
jac = self.make_sub_jacobian(arr, pd_type)
if pd_type == 'aij':
return {'val': jac[0], 'rows': jac[1], 'cols': jac[2], 'method': method}
else:
return {'val': jac, 'method': method}
[docs] def compute_partials(self, inputs, partials):
if self.options['jacobian_type'] != 'matvec' and self.options['partial_method'] == 'exact':
angle_param = self._cycle_names[self.angle_param]
angle = inputs[angle_param]
num_var = self.num_var
var_shape = self.var_shape
var_size = np.prod(var_shape)
x = self._inputs_to_vector(inputs)
size = self.size
A = _compute_A(size, angle)
dA = _compute_dA(size, angle)
dA_x = np.atleast_2d(dA.dot(x)).T
pd_type = self.options['partial_type']
dtheta = np.array([[1.]])
y_name = self._cycle_names['y']
x_name = self._cycle_names['x']
for out_idx in range(num_var):
out_var = y_name.format(out_idx)
for in_idx in range(num_var):
in_var = x_name.format(in_idx)
Aij = A[array_idx(out_idx, var_size), array_idx(in_idx, var_size)]
J_y_x = self.make_jacobian_entry(Aij, pd_type)
J_y_angle = self.make_jacobian_entry(dA_x[array_idx(out_idx, var_size)],
pd_type)
partials[out_var, in_var] = J_y_x
partials[out_var, angle_param] = J_y_angle
theta_out = self._cycle_names['theta_out']
theta = self._cycle_names['theta']
partials[theta_out, theta] = self.make_jacobian_entry(dtheta, pd_type)
[docs]class ExplicitFirstComp(ExplicitCycleComp):
def __str__(self):
return 'Explicit Cycle Component - First'
[docs] def setup(self):
self.add_input('psi', val=1.)
self.angle_param = 'psi'
self._cycle_names['psi'] = 'psi'
super().setup()
[docs] def compute(self, inputs, outputs):
theta = inputs[self._cycle_names['theta']]
psi = inputs[self._cycle_names['psi']]
A = _compute_A(self.size, psi)
y = A.dot(np.ones(self.size))
self._vector_to_outputs(y, outputs)
outputs[self._cycle_names['theta_out']] = theta
[docs]class ExplicitLastComp(ExplicitFirstComp):
def __str__(self):
return 'Explicit Cycle Component - Last'
[docs] def setup(self):
super().setup()
self.add_output('x_norm2', shape=(1,))
self._n = 1
[docs] def setup_partials(self):
# Setup partials
super().setup_partials()
pd_type = self.options['partial_type']
method = self.options['partial_method']
if self.options['jacobian_type'] != 'matvec' and pd_type != 'array':
x = np.ones(self.var_shape)
for i in range(self.options['num_var']):
in_var = self._cycle_names['x'].format(i)
self.declare_partials('x_norm2', in_var,
**self._array2kwargs(x.flatten(), pd_type, method))
self.declare_partials(self._cycle_names['theta_out'], self._cycle_names['psi'],
**self._array2kwargs(np.array([1.]), pd_type, method))
[docs] def compute(self, inputs, outputs):
theta = inputs[self._cycle_names['theta']]
psi = inputs[self._cycle_names['psi']]
k = self.options['num_comp']
x = self._inputs_to_vector(inputs)
outputs['x_norm2'] = 0.5*np.dot(x,x)
# theta_out has 1/2 the error as theta does to the correct angle.
outputs[self._cycle_names['theta_out']] = theta / 2 + (self._n * 2 * np.pi - psi) / (2 * k - 2)
[docs] def compute_partials(self, inputs, partials):
if self.options['jacobian_type'] != 'matvec' and self.options['partial_method'] == 'exact':
pd_type = self.options['partial_type']
for i in range(self.options['num_var']):
in_var = self._cycle_names['x'].format(i)
partials['x_norm2', in_var] = self.make_jacobian_entry(inputs[in_var].flat[:],
pd_type)
k = self.options['num_comp']
theta_out = self._cycle_names['theta_out']
theta = self._cycle_names['theta']
partials[theta_out, theta] = self.make_jacobian_entry(np.array([.5]), pd_type)
partials[theta_out, self._cycle_names['psi']] = \
self.make_jacobian_entry(np.array([-1/(2*k-2)]), pd_type)
[docs] def jacvec_product(self, inputs, d_inputs, d_outputs, mode):
if self.options['jacobian_type'] == 'matvec':
k = self.options['num_comp']
num_var = self.options['num_var']
theta_out = self._cycle_names['theta_out']
theta = self._cycle_names['theta']
psi = self._cycle_names['psi']
if mode == 'fwd':
if theta_out in d_outputs:
if theta in d_inputs:
d_outputs[theta_out] += 0.5 * d_inputs[theta]
if psi in d_inputs:
d_outputs[theta_out] += -d_inputs[psi] / (2 * k - 2)
for i in range(num_var):
in_var = self._cycle_names['x'].format(i)
if in_var in d_inputs and 'x_norm2' in d_outputs:
d_outputs['x_norm2'] += np.dot(inputs[in_var].flat, d_inputs[in_var].flat)
elif mode == 'rev':
if 'x_norm2' in d_outputs:
dxnorm = d_outputs['x_norm2']
for i in range(num_var):
x_i_name = self._cycle_names['x'].format(i)
if x_i_name in d_inputs:
d_inputs[x_i_name] += inputs[x_i_name] * dxnorm
if theta_out in d_outputs:
dtheta_out = d_outputs[theta_out]
if theta in d_inputs:
d_inputs[theta] += .5*dtheta_out
if psi in d_inputs:
d_inputs[psi] += -dtheta_out/(2*k-2)