"""
Utils for dealing with arrays.
"""
import sys
from itertools import product
import hashlib

import numpy as np
from scipy.sparse import coo_matrix, csr_matrix

from openmdao.core.constants import INT_DTYPE
from openmdao.utils.omnumba import numba
if sys.version_info >= (3, 8):
    from math import prod

    def shape_to_len(shape):
        """
        Compute length given a shape tuple.

        Parameters
        ----------
        shape : tuple of int or None
            Numpy shape tuple.

        Returns
        -------
        int
            Length of array.
        """
        if shape is None:
            return None
        return prod(shape)
else:
    def shape_to_len(shape):
        """
        Compute length given a shape tuple.

        For realistic-dimension arrays, looping over the shape tuple is much faster
        than np.prod.

        Parameters
        ----------
        shape : tuple of int or None
            Numpy shape tuple.

        Returns
        -------
        int
            Length of multidimensional array.
        """
        if shape is None:
            return None

        length = 1
        for dim in shape:
            length *= dim

        return length
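
# Example (usage sketch): the length is just the product of the dimensions.
#     shape_to_len((3, 4, 5))  # -> 60
#     shape_to_len(None)       # -> None
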
def evenly_distrib_idxs(num_divisions, arr_size):
    """
    Return evenly distributed entries for the given array size.

    Given a number of divisions and the size of an array, chop the array up
    into pieces according to number of divisions, keeping the distribution
    of entries as even as possible.

    Parameters
    ----------
    num_divisions : int
        Number of parts to divide the array into.
    arr_size : int
        Number of entries in the array.

    Returns
    -------
    tuple
        A tuple of (sizes, offsets), where sizes and offsets contain values for all
        divisions.
    """
    base, leftover = divmod(arr_size, num_divisions)
    sizes = np.full(num_divisions, base, dtype=INT_DTYPE)

    # spread the remainder, one entry each, across the first 'leftover' divisions
    # instead of giving the whole remainder to one division
    sizes[:leftover] += 1

    offsets = np.zeros(num_divisions, dtype=INT_DTYPE)
    offsets[1:] = np.cumsum(sizes)[:-1]

    return sizes, offsets
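
# Example (usage sketch): splitting 10 entries across 3 divisions gives the
# first division the single leftover entry.
#     sizes, offsets = evenly_distrib_idxs(3, 10)
#     # sizes   -> array([4, 3, 3])
#     # offsets -> array([0, 4, 7])
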
def scatter_dist_to_local(dist_val, comm, sizes):
    """
    Scatter a full distributed value to local values in each MPI process.

    Parameters
    ----------
    dist_val : ndarray
        The full distributed value.
    comm : MPI communicator
        The MPI communicator.
    sizes : ndarray
        The array of sizes for each process.

    Returns
    -------
    ndarray
        The local value on this process.
    """
    from openmdao.utils.mpi import MPI
    offsets = np.zeros(sizes.shape, dtype=INT_DTYPE)
    offsets[1:] = np.cumsum(sizes)[:-1]
    local = np.zeros(sizes[comm.rank])
    comm.Scatterv([dist_val, sizes, offsets, MPI.DOUBLE], local, root=0)
    return local
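
# Example (hypothetical MPI usage sketch, run under something like
# 'mpirun -n 2 python script.py'; the send buffer matters only on rank 0):
#     from openmdao.utils.mpi import MPI
#     comm = MPI.COMM_WORLD
#     sizes, _ = evenly_distrib_idxs(comm.size, 10)
#     full = np.arange(10, dtype=float)
#     local = scatter_dist_to_local(full, comm, sizes)
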
def get_evenly_distributed_size(comm, full_size):
    """
    Return the size of the current rank's part of an array of the given size.

    Given a communicator and the size of an array, chop the array up
    into pieces according to the size of the communicator, keeping the
    distribution of entries as even as possible.

    Parameters
    ----------
    comm : MPI communicator
        The communicator we're distributing the array across.
    full_size : int
        Number of entries in the array.

    Returns
    -------
    int
        The size of this rank's part of the full distributed array.
    """
    base, leftover = divmod(full_size, comm.size)
    sizes = np.full(comm.size, base, dtype=INT_DTYPE)

    # spread the remainder, one entry each, across the first 'leftover' procs
    # instead of giving the whole remainder to one proc
    sizes[:leftover] += 1

    return sizes[comm.rank]
def take_nth(rank, size, seq):
    """
    Iterate returning every nth value.

    Return an iterator over the sequence that returns every
    nth element of seq based on the given rank within a group of
    the given size. For example, if size = 2, a rank of 0 returns
    even indexed elements and a rank of 1 returns odd indexed elements.

    Parameters
    ----------
    rank : int
        MPI rank of this process.
    size : int
        Size of the process group.
    seq : iter
        Iterator containing the values being returned.

    Yields
    ------
    object
        Every size'th entry of seq, starting at the entry with index rank.
    """
    assert rank < size
    it = iter(seq)
    while True:
        for proc in range(size):
            if rank == proc:
                try:
                    yield next(it)
                except StopIteration:
                    return
            else:
                try:
                    next(it)
                except StopIteration:
                    return
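
# Example (usage sketch): rank 1 in a group of size 3 sees every third entry,
# starting at index 1.
#     list(take_nth(1, 3, range(10)))  # -> [1, 4, 7]
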
def array_viz(arr, prob=None, of=None, wrt=None, stream=sys.stdout):
    """
    Display the structure of a boolean array in a compact form.

    If prob, of, and wrt are supplied, print the name of the response alongside
    each row and print the names of the design vars, aligned with each column, at
    the bottom.

    Parameters
    ----------
    arr : ndarray
        Array being visualized.
    prob : Problem or None
        Problem object.
    of : list of str or None
        Names of response variables used in derivative calculation.
    wrt : list of str or None
        Names of design variables used in derivative calculation.
    stream : file-like
        Stream where output will be written.
    """
    if len(arr.shape) != 2:
        raise RuntimeError("array_viz only works for 2d arrays.")

    if prob is not None:
        if of is None:
            of = prob.driver._get_ordered_nl_responses()
        if wrt is None:
            wrt = list(prob.driver._designvars)

    if prob is None or of is None or wrt is None:
        for r in range(arr.shape[0]):
            for c in range(arr.shape[1]):
                if arr[r, c]:
                    stream.write('x')
                else:
                    stream.write('.')
            stream.write(' %d\n' % r)
    else:
        row = 0
        for res in of:
            for r in range(row, row + prob.driver._responses[res]['size']):
                col = 0
                for dv in wrt:
                    for c in range(col, col + prob.driver._designvars[dv]['size']):
                        if arr[r, c]:
                            stream.write('x')
                        else:
                            stream.write('.')
                    col = c + 1
                stream.write(' %d %s\n' % (r, res))
            row = r + 1

        start = 0
        for name in wrt:
            tab = ' ' * start
            stream.write('%s|%s\n' % (tab, name))
            start += prob.driver._designvars[name]['size']
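
# Example (usage sketch): visualize the nonzero structure of a small boolean
# array without a Problem, so rows are labeled only by index.
#     array_viz(np.array([[True, False], [False, True]]))
#     # x. 0
#     # .x 1
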
def array_connection_compatible(shape1, shape2):
    """
    Return True if the two array shapes are compatible.

    Array shapes are compatible if the underlying data has the same size and is
    stored in the same contiguous order for the two shapes.

    Parameters
    ----------
    shape1 : tuple of int
        Shape of the first array.
    shape2 : tuple of int
        Shape of the second array.

    Returns
    -------
    bool
        True if the two shapes are compatible for connection, else False.
    """
    ashape1 = np.asarray(shape1, dtype=INT_DTYPE)
    ashape2 = np.asarray(shape2, dtype=INT_DTYPE)

    size1 = shape_to_len(ashape1)
    size2 = shape_to_len(ashape2)

    # Shapes are not connection-compatible if size is different
    if size1 != size2:
        return False

    # find the fundamental shape of each array by stripping leading and trailing
    # dimensions of size 1
    nz1 = np.where(ashape1 > 1)[0]
    nz2 = np.where(ashape2 > 1)[0]

    if len(nz1) > 0:
        fundamental_shape1 = ashape1[np.min(nz1): np.max(nz1) + 1]
    else:
        fundamental_shape1 = np.ones((1,))

    if len(nz2) > 0:
        fundamental_shape2 = ashape2[np.min(nz2): np.max(nz2) + 1]
    else:
        fundamental_shape2 = np.ones((1,))

    if len(fundamental_shape1) != len(fundamental_shape2):
        return False

    return np.all(fundamental_shape1 == fundamental_shape2)
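
# Example (usage sketch): leading and trailing singleton dimensions don't
# affect compatibility, but interior ones do.
#     array_connection_compatible((1, 3, 4), (3, 4, 1))  # -> True
#     array_connection_compatible((3, 1, 4), (3, 4))     # -> False
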
def tile_sparse_jac(data, rows, cols, nrow, ncol, num_nodes):
    """
    Assemble arrays necessary to define a COO sparse jacobian for a vectorized component.

    These arrays can also be passed to csc_matrix or csr_matrix to create CSC and CSR sparse
    matrices.

    Parameters
    ----------
    data : ndarray
        Array of values.
    rows : index array
        Array of row indices.
    cols : index array
        Array of column indices.
    nrow : int
        Number of rows in sub jacobian.
    ncol : int
        Number of columns in sub jacobian.
    num_nodes : int
        Number of vectorized copies to tile.

    Returns
    -------
    ndarray, ndarray, ndarray
        Arrays to define a COO sparse jacobian of size num_nodes*nrow by num_nodes*ncol.
    """
    nnz = len(rows)

    if np.ndim(data) == 0:
        data = data * np.ones(nnz)

    if np.ndim(nrow) > 0:
        nrow = shape_to_len(nrow)

    if np.ndim(ncol) > 0:
        ncol = shape_to_len(ncol)

    repeat_arr = np.repeat(np.arange(num_nodes), nnz)

    data = np.tile(data, num_nodes)
    rows = np.tile(rows, num_nodes) + repeat_arr * nrow
    cols = np.tile(cols, num_nodes) + repeat_arr * ncol

    return data, rows, cols
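
# Example (usage sketch): tile the sparsity of a 2x2 identity sub-jacobian for
# 2 nodes, producing a 4x4 block-diagonal pattern.
#     data, rows, cols = tile_sparse_jac(1.0, np.array([0, 1]), np.array([0, 1]),
#                                        2, 2, 2)
#     # data -> array([1., 1., 1., 1.])
#     # rows -> array([0, 1, 2, 3]), cols -> array([0, 1, 2, 3])
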
def _global2local_offsets(global_offsets):
    """
    Given existing global offsets, return a copy with offsets localized to each process.

    Parameters
    ----------
    global_offsets : dict
        Arrays of global offsets keyed by vec_name and deriv direction.

    Returns
    -------
    dict
        Arrays of local offsets keyed by vec_name and deriv direction.
    """
    offsets = {}
    for type_ in global_offsets:
        goff = global_offsets[type_]
        offsets[type_] = goff.copy()
        if goff[0].size > 0:
            # adjust offsets to be local in each process
            offsets[type_] -= goff[:, 0].reshape((goff.shape[0], 1))

    return offsets
def convert_neg(arr, size):
    """
    Convert negative indices based on full array size.

    The conversion is done in place, so the given index array is modified.

    Parameters
    ----------
    arr : ndarray
        The index array.
    size : int
        The full size of the array.

    Returns
    -------
    ndarray
        The array with negative indices converted to positive.
    """
    arr[arr < 0] += size
    return arr
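
# Example (usage sketch): negative indices wrap relative to the full size.
#     convert_neg(np.array([0, -1, 2, -3]), 5)  # -> array([0, 4, 2, 2])
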
def _flatten_src_indices(src_indices, shape_in, shape_out, size_out):
    """
    Convert src_indices into a flat, non-negative form.

    Parameters
    ----------
    src_indices : ndarray
        Array of src_indices. Can be flat or multi-dimensional.
    shape_in : tuple
        Shape of the input variable.
    shape_out : tuple
        Shape of the output variable.
    size_out : int
        Size of the output variable.

    Returns
    -------
    ndarray
        The flattened src_indices.
    """
    if len(shape_out) == 1 or shape_in == src_indices.shape:
        return convert_neg(src_indices.ravel(), size_out)

    entries = [list(range(x)) for x in shape_in]
    cols = np.vstack([src_indices[i] for i in product(*entries)])
    dimidxs = [convert_neg(cols[:, i], shape_out[i]) for i in range(cols.shape[1])]
    return np.ravel_multi_index(dimidxs, shape_out)
def sizes2offsets(size_array):
    """
    For a given array of sizes, return an array of offsets.

    Offsets will be computed using a flattened version of size_array and then
    reshaped to match the shape of size_array.

    Parameters
    ----------
    size_array : ndarray
        Array of sizes.

    Returns
    -------
    ndarray
        Array of offsets.
    """
    offsets = np.zeros(size_array.size, dtype=size_array.dtype)
    offsets[1:] = np.cumsum(size_array.flat)[:-1]
    return offsets.reshape(size_array.shape)
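
# Example (usage sketch): offsets are a running sum over the flattened sizes,
# reshaped to match the input.
#     sizes2offsets(np.array([[2, 3], [4, 1]]))  # -> array([[0, 2], [5, 9]])
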
def abs_complex(x):
    """
    Compute the absolute value of a complex-stepped vector.

    Rather than taking a Euclidean norm, simply negate the values that are less
    than zero. The negation is done in place.

    Parameters
    ----------
    x : ndarray
        Input array.

    Returns
    -------
    ndarray
        Complex-step absolute value of the array.
    """
    idx_neg = np.where(x < 0)
    x[idx_neg] = -x[idx_neg]
    return x
def dv_abs_complex(x, x_deriv):
    """
    Compute the complex-step absolute value function and its derivative.

    Parameters
    ----------
    x : ndarray
        Input array, used for determining which elements to negate.
    x_deriv : ndarray
        Incoming partial derivative array, may have one additional dimension.

    Returns
    -------
    ndarray
        Absolute value applied to x.
    ndarray
        Absolute value applied to x_deriv.
    """
    idx_neg = np.where(x < 0)

    # Special case when x is (1, ) and x_deriv is (1, n).
    if len(x_deriv.shape) == 1:
        if idx_neg[0].size != 0:
            return -x, -x_deriv

    x[idx_neg] = -x[idx_neg]
    x_deriv[idx_neg] = -x_deriv[idx_neg]

    return x, x_deriv
def rand_sparsity(shape, density_ratio, dtype=bool):
    """
    Return a random boolean COO matrix of the given shape with approximately the given density.

    Row and column indices are generated using random integers so some duplication
    is possible, resulting in a matrix with somewhat lower density than specified.

    Parameters
    ----------
    shape : tuple
        Desired shape of the matrix.
    density_ratio : float
        Approximate ratio of nonzero to zero entries in the desired matrix.
    dtype : type
        Specifies type of the values in the returned matrix.

    Returns
    -------
    coo_matrix
        A COO matrix with approximately the nonzero density desired.
    """
    assert len(shape) == 2, f"shape must be a size 2 tuple but {shape} was given"

    nrows, ncols = shape
    nnz = int(nrows * ncols * density_ratio)

    data = np.ones(nnz, dtype=dtype)
    rows = np.random.randint(0, nrows, nnz)
    cols = np.random.randint(0, ncols, nnz)

    coo = coo_matrix((data, (rows, cols)), shape=shape)

    # combine duplicate row/col entries
    coo.sum_duplicates()

    return coo
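
# Example (usage sketch): a 10x10 boolean COO matrix with roughly 20% of its
# entries nonzero (possibly fewer due to duplicate random indices).
#     mat = rand_sparsity((10, 10), 0.2)
#     mat.nnz  # <= 20
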
def sparse_subinds(orig, inds):
    """
    Compute new rows or cols resulting from applying inds on top of an existing sparsity pattern.

    This only comes into play when we have an approx total jacobian where some dv/resp have
    indices.

    Parameters
    ----------
    orig : ndarray
        Either row or col indices (part of a subjac sparsity pattern).
    inds : ndarray or list
        Sub-indices introduced when adding a desvar or response.

    Returns
    -------
    ndarray
        New compressed rows or cols.
    ndarray
        Mask array that can be used to update subjac value and corresponding index array to orig.
    """
    mask = np.zeros(orig.size, dtype=bool)
    for i in inds:
        mask |= orig == i
    newsp = orig[mask]

    # replace the index with the 'compressed' index after we've masked out entries
    for r, i in enumerate(np.sort(inds)):
        newsp[newsp == i] = r

    return newsp, mask
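
# Example (usage sketch): keep only the entries of orig that match the
# sub-indices [1, 3], renumbering them compactly as 0 and 1.
#     newsp, mask = sparse_subinds(np.array([0, 1, 1, 2, 3]), [1, 3])
#     # newsp -> array([0, 0, 1])
#     # mask  -> array([False, True, True, False, True])
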
def identity_column_iter(column):
    """
    Yield the given column with a 1 in each position.

    This is useful if you don't want to allocate memory for the full sized identity matrix.
    Note that this reuses the column array and assumes that the column array has not
    been modified outside of this function.

    Parameters
    ----------
    column : ndarray
        The array that will contain a column of the 'virtual' identity matrix.

    Yields
    ------
    ndarray
        A column of the identity matrix.
    """
    column[:] = 0
    for i in range(column.size):
        column[i - 1] = 0
        column[i] = 1
        yield column
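
# Example (usage sketch): iterate over identity columns without allocating the
# full identity matrix. Copy each yielded column if you need to keep it, since
# the same buffer is reused on every iteration.
#     col = np.zeros(3)
#     [c.copy() for c in identity_column_iter(col)]
#     # -> [array([1., 0., 0.]), array([0., 1., 0.]), array([0., 0., 1.])]
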
def array_hash(arr, alg=hashlib.sha1):
    """
    Return a hash of the given numpy array.

    arr must be C-contiguous.

    Parameters
    ----------
    arr : ndarray
        The array to be hashed.
    alg : hashing algorithm
        Algorithm defaults to hashlib.sha1.

    Returns
    -------
    str
        The computed hash.
    """
    return alg(arr.view(np.uint8)).hexdigest()
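
# Example (usage sketch): equal C-contiguous arrays hash identically.
#     a = np.arange(4, dtype=float)
#     array_hash(a) == array_hash(a.copy())  # -> True
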
_randgen = np.random.default_rng()
def get_random_arr(shape, comm=None, generator=None):
    """
    Request a random array, ensuring that its value will be consistent across MPI processes.

    Parameters
    ----------
    shape : int or tuple of int
        Shape of the random array.
    comm : MPI communicator or None
        All members of this communicator will receive the random array.
    generator : random number generator or None
        If not None, use this as the random number generator if on rank 0.

    Returns
    -------
    ndarray
        The random array.
    """
    gen = generator if generator is not None else _randgen

    if comm is None or comm.size == 1:
        return gen.random(shape)

    if comm.rank == 0:
        arr = gen.random(shape)
    else:
        arr = np.empty(shape)

    comm.Bcast(arr, root=0)
    return arr
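
# Example (usage sketch): without a communicator this is just a draw from the
# module-level generator; pass a seeded generator for reproducibility.
#     arr = get_random_arr(5, generator=np.random.default_rng(0))
#     arr.shape  # -> (5,)
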
class ValueRepeater(object):
    """
    An iterable over a single value that repeats a given number of times.

    Parameters
    ----------
    val : object
        The value to be repeated.
    size : int
        The number of times to repeat the value.

    Attributes
    ----------
    val : object
        The value to be repeated.
    size : int
        The number of times to repeat the value.

    Yields
    ------
    object
        The value.
    """

    def __init__(self, val, size):
        """
        Initialize all attributes.
        """
        self.val = val
        self.size = size

    def __iter__(self):
        """
        Return an iterator over the value.

        Yields
        ------
        object
            The value.
        """
        for i in range(self.size):
            yield self.val

    def __len__(self):
        """
        Return the number of repetitions of the value.

        Returns
        -------
        int
            The number of repetitions of the value.
        """
        return self.size

    def __contains__(self, item):
        """
        Return True if the given item is equal to the value.

        Parameters
        ----------
        item : object
            The item to be checked for containment.

        Returns
        -------
        bool
            True if item == self.val.
        """
        return item == self.val

    def __getitem__(self, idx):
        """
        Return the value.

        Parameters
        ----------
        idx : int
            The index of the value to be returned.

        Returns
        -------
        object
            The value.
        """
        i = idx
        if idx < 0:
            idx += self.size
        if idx < 0 or idx >= self.size:
            raise IndexError(f"index {i} is out of bounds for size {self.size}")
        return self.val
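
# Example (usage sketch): behaves like a sequence containing one repeated value.
#     vr = ValueRepeater(7, 3)
#     list(vr)  # -> [7, 7, 7]
#     len(vr)   # -> 3
#     7 in vr   # -> True
#     vr[-1]    # -> 7
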
def convert_nans_in_nested_list(val_as_list):
    """
    Given a list, possibly nested, replace any numpy.nan values with the string "nan".

    This is done since JSON does not handle nan. This code is used to pass variable values
    to the N2 diagram. Infinite values are likewise replaced with the string "infinity".

    The modifications to the list values are done in-place to avoid excessive copying of lists.

    Parameters
    ----------
    val_as_list : list
        List, possibly nested, whose nan elements need to be converted.
    """
    for i, val in enumerate(val_as_list):
        if isinstance(val, list):
            convert_nans_in_nested_list(val)
        else:
            if np.isnan(val):
                val_as_list[i] = "nan"
            elif np.isinf(val):
                val_as_list[i] = "infinity"
            else:
                val_as_list[i] = val
def convert_ndarray_to_support_nans_in_json(val):
    """
    Given numpy array of arbitrary dimensions, return the equivalent nested list with nan replaced.

    numpy.nan values are replaced with the string "nan".

    Parameters
    ----------
    val : ndarray
        Numpy array to be converted.

    Returns
    -------
    list
        The equivalent list (possibly nested) with any nan values replaced with the string "nan".
    """
    val = np.asarray(val)

    # do a quick check for any nans or infs and if not we can avoid the slow check
    nans = np.where(np.isnan(val))
    infs = np.where(np.isinf(val))
    if nans[0].size == 0 and infs[0].size == 0:
        return val.tolist()

    val_as_list = val.tolist()
    convert_nans_in_nested_list(val_as_list)
    return val_as_list
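
# Example (usage sketch): nan and inf become JSON-safe strings.
#     convert_ndarray_to_support_nans_in_json(np.array([1.0, np.nan, np.inf]))
#     # -> [1.0, 'nan', 'infinity']
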
if numba is None:
    allclose = np.allclose

    def allzero(a):
        """
        Return True if all elements of a are zero.

        Parameters
        ----------
        a : ndarray
            Array to be checked for zeros.

        Returns
        -------
        bool
            True if all elements of a are zero.
        """
        return not np.any(a)
else:
    @numba.jit(nopython=True, nogil=True)
    def allclose(a, b, rtol=3e-16, atol=3e-16):
        """
        Return True if all elements of a and b are close within the given tolerances.

        Returns as soon as the first non-close element is found. a and b must have the
        same size.

        Parameters
        ----------
        a : ndarray
            First array to be compared.
        b : ndarray
            Second array to be compared.
        rtol : float
            Relative tolerance for comparison.
        atol : float
            Absolute tolerance for comparison.

        Returns
        -------
        bool
            True if all elements of a and b are close within the given absolute and
            relative tolerance.
        """
        for i in range(len(a)):
            aval = a[i]
            bval = b[i]

            absdiff = aval - bval
            if absdiff < 0.:
                absdiff = -absdiff

            if aval < 0.:
                aval = -aval
            if bval < 0.:
                bval = -bval

            # compare against rtol scaled by the larger magnitude, or atol,
            # whichever is bigger
            if aval < bval:
                vmax = rtol * bval
            else:
                vmax = rtol * aval

            if atol > vmax:
                vmax = atol

            if absdiff > vmax:
                return False

        return True

    @numba.jit(nopython=True, nogil=True)
    def allzero(a):
        """
        Return True if all elements of a are zero.

        Unlike np.any, this returns as soon as a nonzero element is found and so can be
        faster for arrays having nonzero values. It's comparable in speed (slightly faster)
        to 'not np.any' for arrays that are all zeros.

        Parameters
        ----------
        a : ndarray
            Array to be checked for zeros.

        Returns
        -------
        bool
            True if all elements of a are zero.
        """
        for i in range(len(a)):
            if a[i] != 0.:
                return False
        return True
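
# Example (usage sketch): the numba and pure-numpy implementations agree for
# 1-D float arrays.
#     allzero(np.zeros(5))                      # -> True
#     allclose(np.ones(3), np.ones(3) + 1e-17)  # -> True
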
def submat_sparsity_iter(row_var_size_iter, col_var_size_iter, nzrows, nzcols, shape):
    """
    Yield the sparsity of each submatrix, based on variable names and sizes.

    Parameters
    ----------
    row_var_size_iter : iterator of (name, size)
        Iterator of row variable names and sizes.
    col_var_size_iter : iterator of (name, size)
        Iterator of column variable names and sizes.
    nzrows : ndarray
        Row indices of nonzero entries in the full matrix.
    nzcols : ndarray
        Column indices of nonzero entries in the full matrix.
    shape : tuple
        Shape of the full matrix.

    Yields
    ------
    tuple
        (row_varname, col_varname, nonzero rows, nonzero cols, shape)
    """
    row_start = row_end = 0
    data = np.ones(nzrows.size, dtype=np.int8)
    csr = csr_matrix((data, (nzrows, nzcols)), shape=shape)
    col_iter = list(col_var_size_iter)  # need to iterate over multiple times

    for of, of_size in row_var_size_iter:
        row_end += of_size
        rowslice = csr[row_start:row_end, :]
        row_start = row_end

        csc = rowslice.tocsc()
        col_start = col_end = 0
        for wrt, wrt_size in col_iter:
            col_end += wrt_size
            submat = csc[:, col_start:col_end].tocoo()
            col_start = col_end
            if submat.row.size > 0:  # only yield if nonzero
                yield (of, wrt, submat.row, submat.col, submat.shape)
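
# Example (usage sketch): a 3x3 matrix with nonzeros at (0, 0) and (2, 2),
# partitioned into row variables ('a', 2), ('b', 1) and column variables
# ('x', 1), ('y', 2). Empty submatrices are skipped.
#     list(submat_sparsity_iter(iter([('a', 2), ('b', 1)]),
#                               iter([('x', 1), ('y', 2)]),
#                               np.array([0, 2]), np.array([0, 2]), (3, 3)))
#     # -> [('a', 'x', array([0]), array([0]), (2, 1)),
#     #     ('b', 'y', array([0]), array([1]), (1, 2))]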