"""Definition of the Matrix Vector Product Component."""
import numpy as np
import scipy.linalg as spla
from openmdao.core.explicitcomponent import ExplicitComponent
[docs]class MatrixVectorProductComp(ExplicitComponent):
"""
Computes a vectorized matrix-vector product.
math::
b = np.dot(A, x)
where A is of shape (vec_size, n, m)
x is of shape (vec_size, m)
b is of shape (vec_size, m)
if vec_size > 1 and
where A is of shape (n, m)
x is of shape (m,)
b is of shape (m,)
otherwise.
The size of vectors x and b is determined by the number of rows in m at each point.
Parameters
----------
**kwargs : dict of keyword arguments
Keyword arguments that will be mapped into the Component options.
Attributes
----------
_products : list
Cache the data provided during `add_product`
so everything can be saved until setup is called.
"""
[docs] def __init__(self, **kwargs):
"""
Initialize the Matrix Vector Product component.
"""
super().__init__(**kwargs)
self._products = []
opt = self.options
self.add_product(b_name=opt['b_name'], A_name=opt['A_name'], x_name=opt['x_name'],
b_units=opt['b_units'], A_units=opt['A_units'], x_units=opt['x_units'],
vec_size=opt['vec_size'], A_shape=opt['A_shape'])
self._no_check_partials = True
[docs] def initialize(self):
"""
Declare options.
"""
self.options.declare('vec_size', types=int, default=1,
desc='The number of points at which the matrix vector product '
'is to be computed')
self.options.declare('A_name', types=str, default='A',
desc='The variable name for the matrix.')
self.options.declare('A_shape', types=tuple, default=(3, 3),
desc='The shape of the input matrix at a single point.')
self.options.declare('A_units', types=str, default=None, allow_none=True,
desc='The units of the input matrix.')
self.options.declare('x_name', types=str, default='x',
desc='The name of the input vector.')
self.options.declare('x_units', types=str, default=None, allow_none=True,
desc='The units of the input vector.')
self.options.declare('b_name', types=str, default='b',
desc='The variable name of the output vector.')
self.options.declare('b_units', types=str, default=None, allow_none=True,
desc='The units of the output vector.')
[docs] def add_product(self, b_name, A_name='A', x_name='x', A_units=None, x_units=None, b_units=None,
vec_size=1, A_shape=(3, 3)):
"""
Add a new output product to the matrix vector product component.
Parameters
----------
b_name : str
The name of the vector product output.
A_name : str
The name of the matrix input.
x_name : str
The name of the vector input.
A_units : str or None
The units of the input matrix.
x_units : str or None
The units of the input vector.
b_units : str or None
The units of the output matrix.
vec_size : int
The number of points at which the matrix vector product
should be computed simultaneously.
A_shape : tuple of (int, int)
The shape of the matrix at each point.
The first element also specifies the size of the output at each point.
The second element specifies the size of the input vector at each point.
For example, if vec_size=10 and shape is (5, 3), then
the input matrix will have a shape of (10, 5, 3),
the input vector will have a shape of (10, 3), and
the output vector will have shape of (10, 5).
"""
self._products.append({
'A_name': A_name,
'x_name': x_name,
'b_name': b_name,
'A_units': A_units,
'x_units': x_units,
'b_units': b_units,
'A_shape': A_shape,
'vec_size': vec_size
})
# add inputs and outputs for all products
if self._static_mode:
var_rel2meta = self._static_var_rel2meta
var_rel_names = self._static_var_rel_names
else:
var_rel2meta = self._var_rel2meta
var_rel_names = self._var_rel_names
n_rows, n_cols = A_shape
A_shape = (vec_size, n_rows, n_cols)
b_shape = (vec_size, n_rows) if vec_size > 1 else (n_rows, )
x_shape = (vec_size, n_cols)
if b_name not in var_rel2meta:
self.add_output(name=b_name, shape=b_shape, units=b_units)
elif b_name in var_rel_names['input']:
raise NameError(f"{self.msginfo}: '{b_name}' specified as an output, "
"but it has already been defined as an input.")
else:
raise NameError(f"{self.msginfo}: Multiple definition of output '{b_name}'.")
if A_name not in var_rel2meta:
self.add_input(name=A_name, shape=A_shape, units=A_units)
elif A_name in var_rel_names['output']:
raise NameError(f"{self.msginfo}: '{A_name}' specified as an input, "
"but it has already been defined as an output.")
else:
meta = var_rel2meta[A_name]
if vec_size != meta['shape'][0]:
raise ValueError(f"{self.msginfo}: Conflicting vec_size={x_shape[0]} "
f"specified for matrix '{A_name}', which has already "
f"been defined with vec_size={meta['shape'][0]}.")
elif (n_rows, n_cols) != meta['shape'][1:]:
raise ValueError(f"{self.msginfo}: Conflicting shape {A_shape[1:]} specified "
f"for matrix '{A_name}', which has already been defined "
f"with shape {meta['shape'][1:]}.")
elif A_units != meta['units']:
raise ValueError(f"{self.msginfo}: Conflicting units '{A_units}' specified "
f"for matrix '{A_name}', which has already been defined "
f"with units '{meta['units']}'.")
if x_name not in var_rel2meta:
self.add_input(name=x_name, shape=x_shape, units=x_units)
elif x_name in var_rel_names['output']:
raise NameError(f"{self.msginfo}: '{x_name}' specified as an input, "
"but it has already been defined as an output.")
else:
meta = var_rel2meta[x_name]
if vec_size != meta['shape'][0]:
raise ValueError(f"{self.msginfo}: Conflicting vec_size={x_shape[0]} "
f"specified for vector '{x_name}', which has already "
f"been defined with vec_size={meta['shape'][0]}.")
elif n_cols != meta['shape'][1]:
raise ValueError(f"{self.msginfo}: Matrix shape {A_shape[1:]} is incompatible "
f"with vector '{x_name}', which has already been defined "
f"with {meta['shape'][1]} column(s).")
elif x_units != meta['units']:
raise ValueError(f"{self.msginfo}: Conflicting units '{x_units}' specified "
f"for vector '{x_name}', which has already been defined "
f"with units '{meta['units']}'.")
# Make a dummy version of A so we can figure out the nonzero indices
A = np.ones(A_shape)
x = np.ones(x_shape)
bd_A = spla.block_diag(*A)
x_repeat = np.repeat(x, A.shape[1], axis=0)
bd_x_repeat = spla.block_diag(*x_repeat)
db_dx_rows, db_dx_cols = np.nonzero(bd_A)
db_dA_rows, db_dA_cols = np.nonzero(bd_x_repeat)
self.declare_partials(of=b_name, wrt=A_name,
rows=db_dA_rows, cols=db_dA_cols)
self.declare_partials(of=b_name, wrt=x_name,
rows=db_dx_rows, cols=db_dx_cols)
[docs] def compute(self, inputs, outputs):
"""
Compute the matrix vector product of inputs `A` and `x` using np.einsum.
Parameters
----------
inputs : Vector
Unscaled, dimensional input variables read via inputs[key].
outputs : Vector
Unscaled, dimensional output variables read via outputs[key].
"""
for product in self._products:
A = inputs[product['A_name']]
x = inputs[product['x_name']]
# ... here allows b to be shaped either (n, i) or (i,)
outputs[product['b_name']][...] = np.einsum('nij,nj->ni', A, x)
[docs] def compute_partials(self, inputs, partials):
"""
Compute the sparse partials for the matrix vector product w.r.t. the inputs.
Parameters
----------
inputs : Vector
Unscaled, dimensional input variables read via inputs[key].
partials : Jacobian
Sub-jac components written to partials[output_name, input_name].
"""
for product in self._products:
A_name = product['A_name']
x_name = product['x_name']
b_name = product['b_name']
A = inputs[A_name]
x = inputs[x_name]
# Use the following for sparse partials
partials[b_name, A_name] = np.repeat(x, A.shape[1], axis=0).ravel()
partials[b_name, x_name] = A.ravel()