"""Define the COOmatrix class."""
import numpy as np
from numpy import ndarray
from scipy.sparse import coo_matrix
from openmdao.core.constants import INT_DTYPE
from openmdao.matrices.matrix import Matrix, _compute_index_map
[docs]class COOMatrix(Matrix):
"""
Sparse matrix in Coordinate list format.
Parameters
----------
comm : MPI.Comm or <FakeComm>
Communicator of the top-level system that owns the <Jacobian>.
is_internal : bool
If True, this is the int_mtx of an AssembledJacobian.
Attributes
----------
_coo : coo_matrix
COO matrix. Used as a basis for conversion to CSC, CSR, Dense in inherited classes.
"""
[docs] def __init__(self, comm, is_internal):
"""
Initialize all attributes.
"""
super().__init__(comm, is_internal)
self._coo = None
def _build_coo(self, system):
"""
Allocate the data, rows, and cols for the COO matrix.
Parameters
----------
system : <System>
Parent system of this matrix.
Returns
-------
(ndarray, ndarray, ndarray)
data, rows, cols that can be used to construct a COO matrix.
"""
submats = self._submats
metadata = self._metadata
key_ranges = self._key_ranges = {}
start = end = 0
for key, (info, loc, src_indices, shape, factor) in submats.items():
val = info['val']
rows = info['rows']
dense = (rows is None and (val is None or isinstance(val, ndarray)))
if dense:
if src_indices is None:
end += val.size
else:
end += shape[0] * src_indices.indexed_src_size
elif rows is None: # sparse matrix
end += val.data.size
else: # list sparse format
end += len(rows)
key_ranges[key] = (start, end, dense, rows)
start = end
if system is not None and system.under_complex_step:
data = np.zeros(end, dtype=complex)
else:
data = np.zeros(end)
rows = np.empty(end, dtype=INT_DTYPE)
cols = np.empty(end, dtype=INT_DTYPE)
for key, (start, end, dense, jrows) in key_ranges.items():
info, loc, src_indices, shape, factor = submats[key]
irow, icol = loc
val = info['val']
idxs = None
col_offset = row_offset = 0
if dense:
jac_type = ndarray
if src_indices is None:
colrange = range(col_offset, col_offset + shape[1])
else:
colrange = src_indices.shaped_array()
ncols = len(colrange)
subrows = rows[start:end]
subcols = cols[start:end]
for i in range(shape[0]):
subrows[i * ncols: (i + 1) * ncols] = i + row_offset
subcols[i * ncols: (i + 1) * ncols] = colrange
subrows += irow
subcols += icol
else: # sparse
if jrows is None:
jac_type = type(val)
jac = val.tocoo()
jrows = jac.row
jcols = jac.col
else:
jac_type = list
jcols = info['cols']
if src_indices is None:
rows[start:end] = jrows + (irow + row_offset)
cols[start:end] = jcols + (icol + col_offset)
else:
irows, icols, idxs = _compute_index_map(jrows, jcols,
irow, icol,
src_indices)
rows[start:end] = irows
cols[start:end] = icols
metadata[key] = (start, end, idxs, jac_type, factor)
return data, rows, cols
def _build(self, num_rows, num_cols, system=None):
"""
Allocate the matrix.
Parameters
----------
num_rows : int
number of rows in the matrix.
num_cols : int
number of cols in the matrix.
system : <System>
owning system.
"""
data, rows, cols = self._build_coo(system)
metadata = self._metadata
for key, (start, end, idxs, jac_type, factor) in metadata.items():
if idxs is None:
metadata[key] = (slice(start, end), jac_type, factor)
else:
# store reverse indices to avoid copying subjac data during
# update_submat.
metadata[key] = (np.argsort(idxs) + start, jac_type, factor)
self._matrix = self._coo = coo_matrix((data, (rows, cols)), shape=(num_rows, num_cols))
def _update_submat(self, key, jac):
"""
Update the values of a sub-jacobian.
Parameters
----------
key : (str, str)
the global output and input variable names.
jac : ndarray or scipy.sparse or tuple
the sub-jacobian, the same format with which it was declared.
"""
idxs, jac_type, factor = self._metadata[key]
if not isinstance(jac, jac_type) and (jac_type is list and not isinstance(jac, ndarray)):
raise TypeError("Jacobian entry for %s is of different type (%s) than "
"the type (%s) used at init time." % (key,
type(jac).__name__,
jac_type.__name__))
if isinstance(jac, ndarray):
self._matrix.data[idxs] = jac.flat
else: # sparse
self._matrix.data[idxs] = jac.data
if factor is not None:
self._matrix.data[idxs] *= factor
def _prod(self, in_vec, mode, mask=None):
"""
Perform a matrix vector product.
Parameters
----------
in_vec : ndarray[:]
incoming vector to multiply.
mode : str
'fwd' or 'rev'.
mask : ndarray of type bool, or None
Array used to zero out part of the matrix data.
Returns
-------
ndarray[:]
vector resulting from the product.
"""
# when we have a derivative based solver at a level below the
# group that owns the AssembledJacobian, we need to use only
# the part of the matrix that is relevant to the lower level
# system.
mat = self._matrix
# NOTE: mask applies only to ext_mtx.
if mode == 'fwd':
if mask is None:
return mat.dot(in_vec)
else:
save = mat.data[mask]
mat.data[mask] = 0.0
val = mat.dot(in_vec)
mat.data[mask] = save
return val
else: # rev
if mask is None:
return mat.T.dot(in_vec)
else:
save = mat.data[mask]
mat.data[mask] = 0.0
val = mat.T.dot(in_vec)
mat.data[mask] = save
return val
def _create_mask_cache(self, d_inputs):
"""
Create masking array for this matrix.
Note : this only applies when this Matrix is an 'ext_mtx' inside of a
Jacobian object.
Parameters
----------
d_inputs : Vector
The inputs linear vector.
Returns
-------
ndarray or None
The mask array or None.
"""
if d_inputs._in_matvec_context():
input_names = d_inputs._names
mask = None
for key, val in self._key_ranges.items():
if key[1] in input_names:
if mask is None:
mask = np.ones(self._matrix.data.size, dtype=bool)
start, stop, _, _ = val
mask[start:stop] = False
if mask is not None and np.any(mask):
# convert the mask indices (if necessary) base on sparse matrix type
# (CSC, CSR, etc.)
return self._convert_mask(mask)
[docs] def set_complex_step_mode(self, active):
"""
Turn on or off complex stepping mode.
When turned on, the value in each subjac is cast as complex, and when turned
off, they are returned to real values.
Parameters
----------
active : bool
Complex mode flag; set to True prior to commencing complex step.
"""
if active:
if 'complex' not in self._coo.dtype.__str__():
self._coo.data = self._coo.data.astype(complex)
self._coo.dtype = complex
else:
self._coo.data = self._coo.data.real
self._coo.dtype = float
def _convert_mask(self, mask):
"""
Convert the mask to the format of this sparse matrix (CSC, etc.) from COO.
Parameters
----------
mask : ndarray
The mask of indices to zero out.
Returns
-------
ndarray
The converted mask array.
"""
return mask