"""
Classes that handle array indexing.
"""
import sys
import numpy as np
from numpy.lib.stride_tricks import as_strided
from openmdao.core.constants import INT_DTYPE
from openmdao.utils.general_utils import shape2tuple
from openmdao.utils.array_utils import shape_to_len
from openmdao.utils.om_warnings import issue_warning
def get_virtual_array(shape):
    """
    Build a zero-stride placeholder array that reports the requested shape.

    A single int8 element backs the whole array, so the memory used does not grow with shape.

    Parameters
    ----------
    shape : tuple
        Shape of the desired array.

    Returns
    -------
    ndarray
        View of the given shape in which every element aliases the same int8 value.
    """
    # This one element is the only storage that is ever allocated.
    backing = np.zeros(1, dtype=np.int8)

    # A stride of zero along every axis makes each index resolve to that single element,
    # so the view costs O(1) memory no matter how large the requested shape is.
    zero_strides = tuple(0 for _ in shape)

    return as_strided(backing, shape=shape, strides=zero_strides)
def array2slice(arr):
    """
    Try to convert an index array to an equivalent slice.

    Conversion is only attempted for a 1D array with an integer dtype.

    Parameters
    ----------
    arr : ndarray
        The index array to be represented as a slice.

    Returns
    -------
    slice or None
        If slice conversion is possible, return the slice, else return None.
    """
    if arr.ndim != 1 or arr.dtype.kind not in ('i', 'u'):
        return None

    if arr.size == 0:
        return slice(0, 0)

    first = int(arr[0])

    if arr.size == 1:
        return slice(first, first + 1) if first >= 0 else None

    if first < 0 or arr[1] < 0:
        return None

    # Take differences in a signed dtype.  np.diff keeps an unsigned dtype, so a descending
    # unsigned array would wrap around to a huge positive 'step' and yield an empty slice.
    diffs = np.diff(arr.astype(np.int64, copy=False))
    step = int(diffs[0])
    if step == 0 or not np.all(diffs == step):
        return None

    last = int(arr[-1])
    if step > 0:
        return slice(first, last + 1, step)

    if last > 0:
        return slice(first, last - 1, step)

    # A descending run that reaches index 0 (or goes negative) would need a stop of -1 or
    # lower, which a slice interprets relative to the end of the array, so give up.
    return None
def combine_ranges(ranges):
    """
    Merge adjacent (start, end) ranges into the fewest possible contiguous ranges.

    The ranges are assumed to be non-overlapping and in ascending order.  Two neighboring
    ranges are merged only when the first one ends exactly where the next one starts.

    Parameters
    ----------
    ranges : list
        List of (start, end) tuples.

    Returns
    -------
    list of tuples
        List of combined ranges.
    """
    merged = []
    for lo, hi in ranges:
        if merged and merged[-1][1] == lo:
            # this range starts where the previous one stopped, so extend the previous one
            merged[-1] = (merged[-1][0], hi)
        else:
            merged.append((lo, hi))
    return merged
def ranges2indexer(ranges, src_shape=None):
    """
    Convert a list of ranges to an indexer into a flat source.

    Adjacent ranges are merged first.  A single resulting range becomes a slice, no ranges at
    all become the empty slice, and anything else becomes an index array.

    Parameters
    ----------
    ranges : list
        List of (start, end) tuples, non-overlapping and in ascending order.
    src_shape : tuple or None
        The shape of the source array.  If None, the smallest flat source that contains every
        index in ranges is assumed.

    Returns
    -------
    Indexer
        Indexer object.
    """
    ranges = combine_ranges(ranges)

    if not ranges:
        idx = slice(0, 0)
    elif len(ranges) == 1:
        start, end = ranges[0]
        idx = slice(start, end)
    else:
        idx = np.concatenate([range(start, end) for start, end in ranges])

    if src_shape is None:
        # The indices are absolute, so the source must extend to the end of the last range
        # (not merely span the distance between the first start and the last end).  With no
        # ranges there is nothing to index, so an empty source is sufficient.
        src_shape = (ranges[-1][1],) if ranges else (0,)

    return indexer(idx, src_shape=src_shape, flat_src=True)
class Indexer(object):
    """
    Abstract indexing class.

    Parameters
    ----------
    flat_src : bool or None
        True if we're treating the source as flat.  If None, it is inferred from the number of
        dimensions of the first source shape that is set.

    Attributes
    ----------
    _src_shape : tuple or None
        Shape of the 'source'.  Used to determine actual index or slice values when indices are
        negative or slice contains negative start or stop values or ':' or '...'.
    _shaped_inst : Indexer or None
        Cached shaped_instance if we've computed it before.
    _flat_src : bool or None
        If True, index is into a flat source array.
    _dist_shape : tuple or None
        Distributed shape of the source.
    """

    # names of the attributes that copy() transfers to the new instance
    _copy_attrs = {'_src_shape', '_dist_shape', '_flat_src'}

    def __init__(self, flat_src=None):
        """
        Initialize attributes.
        """
        self._src_shape = None
        self._dist_shape = None
        self._shaped_inst = None
        self._flat_src = flat_src

    def __call__(self):
        """
        Return the indices in their most efficient form.

        For example, if the original indices were an index array that is convertable to a slice,
        then a slice would be returned.

        This could be either an int, a slice, an index array, or a multidimensional 'fancy' index.
        """
        raise NotImplementedError("No implementation of '__call__' found.")

    def __str__(self):
        """
        Return string representation.

        Returns
        -------
        str
            String representation.
        """
        return f"[{repr(self)}]"

    def is_full_slice(self):
        """
        Return True if this indexer is known to select the entire source.

        Returns
        -------
        bool
            Always False in the base class.
        """
        return False

    def indexed_val(self, arr):
        """
        Return the value of the indices in the array.

        Parameters
        ----------
        arr : ndarray
            The array to index into.

        Returns
        -------
        ndarray
            The result of indexing into the array.
        """
        if self._flat_src:
            return arr.ravel()[self.flat()]
        else:
            return arr[self()]

    def indexed_val_set(self, arr, val):
        """
        Set the value of the indices in the array.

        Parameters
        ----------
        arr : ndarray
            The array to index into.
        val : ndarray
            The value to set.
        """
        if self._flat_src:
            # NOTE: the assignment goes through arr.ravel(), so it only reaches arr itself
            # when ravel() can return a view of arr rather than a copy.
            arr.ravel()[self.flat()] = val
        else:
            arr[self()] = val

    def copy(self, *args):
        """
        Copy this Indexer.

        Parameters
        ----------
        *args : position args
            Args that are specific to initialization of a derived Indexer.

        Returns
        -------
        Indexer
            A copy of this Indexer.
        """
        inst = self.__class__(*args)
        for attr in self._copy_attrs:
            setattr(inst, attr, getattr(self, attr))
        return inst

    def _set_attrs(self, parent):
        """
        Copy certain attributes from the parent to self.

        Parameters
        ----------
        parent : Indexer
            Parent of this indexer.

        Returns
        -------
        Indexer
            This indexer.
        """
        self._src_shape = parent._src_shape
        self._flat_src = parent._flat_src
        self._dist_shape = parent._dist_shape
        return self

    @property
    def indexed_src_shape(self):
        """
        Return the shape of the result if the indices were applied to a source array.

        Returns
        -------
        tuple
            The shape of the result.
        """
        s = self.shaped_instance()
        if s is None:
            raise RuntimeError(f"Can't get indexed_src_shape of {self} because source shape "
                               "is unknown.")
        if self._flat_src:
            shape = (shape_to_len(self._src_shape),)
        else:
            shape = self._src_shape

        # index into a zero-stride stand-in so only the resulting shape has to be computed
        return get_virtual_array(shape)[s()].shape

    @property
    def indexed_src_size(self):
        """
        Return the size of the result if the index were applied to the source.

        Returns
        -------
        int
            Size of flattened indices.
        """
        return shape_to_len(self.indexed_src_shape)

    def flat(self, copy=False):
        """
        Return index array or slice into a flat array.

        Parameters
        ----------
        copy : bool
            If True, make sure the array returned is a copy.
        """
        raise NotImplementedError("No implementation of 'flat' found.")

    def shaped_instance(self):
        """
        Return a 'shaped' version of this Indexer type.

        This should be overridden for all non-shaped derived classes.

        Returns
        -------
        Indexer
            The 'shaped' Indexer type.  'shaped' Indexers know the extent of the array that
            they are indexing into, or they don't care what the extent is because they don't
            contain negative indices, negative start or stop, ':', or '...'.
        """
        return self

    def shaped_array(self, copy=False, flat=True):
        """
        Return an index array version of the indices that index into a flattened array.

        Parameters
        ----------
        copy : bool
            If True, make sure the array returned is a copy.
        flat : bool
            If True, return a flat array.

        Returns
        -------
        ndarray
            Version of these indices that index into a flattened array.
        """
        s = self.shaped_instance()
        if s is None:
            raise ValueError(f"Can't get shaped array of {self} because it has no source shape.")
        return s.as_array(copy=copy, flat=flat)

    def set_src_shape(self, shape, dist_shape=None):
        """
        Set the shape of the 'source' array.

        If the shape differs from the current one, the indices are bounds checked against it.
        If that check fails, the source and distributed shapes are reset to None and the
        exception is re-raised.

        Parameters
        ----------
        shape : tuple or int
            The shape of the 'source' array.
        dist_shape : tuple or None
            If not None, the full distributed shape of the source.

        Returns
        -------
        Indexer
            Self is returned to allow chaining.
        """
        sshape, self._dist_shape, = self._get_shapes(shape, dist_shape)
        if shape is not None:
            if self._flat_src is None:
                self._flat_src = len(sshape) <= 1

            if sshape != self._src_shape:
                self._src_shape = sshape
                # Drop the cached shaped instance BEFORE checking bounds.  A _check_bounds
                # implementation that consults shaped_instance() must not be handed one that
                # was built for the previous source shape.
                self._shaped_inst = None
                try:
                    self._check_bounds()
                except Exception:
                    self._src_shape = None
                    self._dist_shape = None
                    # anything cached while checking belongs to the rejected shape
                    self._shaped_inst = None
                    raise

        return self

    def to_json(self):
        """
        Return a JSON serializable version of self.
        """
        raise NotImplementedError("No implementation of 'to_json' found.")

    def _get_shapes(self, shape, dist_shape):
        """
        Normalize the given shapes to tuples, flattening them if the source is flat.

        Parameters
        ----------
        shape : tuple, int or None
            The shape of the 'source' array.
        dist_shape : tuple, int or None
            The full distributed shape of the source.  Defaults to shape if None.

        Returns
        -------
        tuple or None
            The normalized source shape.
        tuple or None
            The normalized distributed shape.
        """
        if shape is None:
            return None, None

        shape = shape2tuple(shape)
        if self._flat_src:
            shape = (shape_to_len(shape),)

        if dist_shape is None:
            return shape, shape

        dist_shape = shape2tuple(dist_shape)
        if self._flat_src:
            dist_shape = (shape_to_len(dist_shape),)

        return shape, dist_shape
class ShapedIntIndexer(Indexer):
    """
    Indexer that wraps a single integer index.

    Parameters
    ----------
    idx : int
        The index.
    flat_src : bool
        If True, source is treated as flat.

    Attributes
    ----------
    _idx : int
        The integer index.
    """

    def __init__(self, idx, flat_src=None):
        """
        Initialize attributes.
        """
        super().__init__(flat_src)
        self._idx = idx

    def __call__(self):
        """
        Return the wrapped index.

        Returns
        -------
        int
            This index.
        """
        return self._idx

    def __repr__(self):
        """
        Return string representation.

        Returns
        -------
        str
            The index formatted as a string.
        """
        return format(self._idx, '')

    def apply_offset(self, offset, flat=True):
        """
        Return the index shifted by an offset.

        Parameters
        ----------
        offset : int
            The offset to apply.
        flat : bool
            Ignored by this indexer type.

        Returns
        -------
        int
            The offset index.
        """
        shifted = self._idx + offset
        return shifted

    def copy(self):
        """
        Copy this Indexer.

        Returns
        -------
        Indexer
            A copy of this Indexer.
        """
        return super().copy(self._idx)

    @property
    def min_src_dim(self):
        """
        Return the number of source dimensions.

        Returns
        -------
        int
            The number of dimensions expected in the source array.
        """
        return 1

    def as_array(self, copy=False, flat=True):
        """
        Return a one-element index array holding the index.

        Parameters
        ----------
        copy : bool
            Ignored; a new array is built on every call.
        flat : bool
            Ignored; the result is always one dimensional.

        Returns
        -------
        ndarray
            The index array.
        """
        return np.array([self._idx])

    def flat(self, copy=False):
        """
        Return a one-element index array into a flat array.

        Parameters
        ----------
        copy : bool
            Ignored; a new array is built on every call.

        Returns
        -------
        ndarray
            The index into a flat array.
        """
        return np.array([self._idx])

    def _check_bounds(self):
        """
        Check that the index is within the bounds of the distributed source shape.
        """
        if self._src_shape is None or not self._dist_shape:
            return

        extent = self._dist_shape[0]
        in_bounds = -extent <= self._idx < extent
        if not in_bounds:
            raise IndexError(f"index {self._idx} is out of bounds of the source shape "
                             f"{self._dist_shape}.")

    def to_json(self):
        """
        Return a JSON serializable version of self.

        Returns
        -------
        int
            Int version of self.
        """
        return self._idx
class IntIndexer(ShapedIntIndexer):
    """
    Integer indexer that may hold a negative index until a source shape is known.

    Parameters
    ----------
    idx : int
        The index.
    flat_src : bool or None
        If True, treat source as flat.
    """

    def shaped_instance(self):
        """
        Return a 'shaped' version of this Indexer type.

        A negative index is resolved by adding the size of the first source dimension.  The
        result is cached.

        Returns
        -------
        ShapedIntIndexer or None
            Will return a ShapedIntIndexer if possible, else None.
        """
        if self._shaped_inst is None and self._src_shape is not None:
            resolved = self._idx
            if resolved < 0:
                resolved = resolved + self._src_shape[0]
            self._shaped_inst = ShapedIntIndexer(resolved)
            self._shaped_inst._set_attrs(self)

        # still None here if no source shape has been set yet
        return self._shaped_inst
class ShapedSliceIndexer(Indexer):
    """
    Abstract slice class that is 'shaped'.

    Parameters
    ----------
    slc : slice
        The slice.
    flat_src : bool
        If True, source is treated as flat.

    Attributes
    ----------
    _slice : slice
        The wrapped slice object.  Its step is never None (a missing step is stored as 1).
    """

    def __init__(self, slc, flat_src=None):
        """
        Initialize attributes.
        """
        super().__init__(flat_src)
        if slc.step is None:
            slc = slice(slc.start, slc.stop, 1)
        self._slice = slc

    def __call__(self):
        """
        Return this slice.

        Returns
        -------
        slice
            This slice.
        """
        return self._slice

    def __repr__(self):
        """
        Return string representation.

        A start or stop of None is left blank and a step of 1 is omitted.

        Returns
        -------
        str
            String representation.
        """
        slc = self._slice
        start = '' if slc.start is None else slc.start
        stop = '' if slc.stop is None else slc.stop
        if not slc.step or slc.step == 1:
            return f"{start}:{stop}"
        return f"{start}:{stop}:{slc.step}"

    def is_full_slice(self):
        """
        Return True if this slice is known to select the entire first source dimension.

        Returns
        -------
        bool
            True for ':' regardless of source shape, or for a slice that resolves to
            (0, size, 1) for the current source shape.  False if that can't be determined.
        """
        if self._slice.stop is None and self._slice.start is None and \
           self._slice.step in (None, 1):
            return True
        if self._src_shape is None:
            return False
        inds = self._slice.indices(self._src_shape[0])
        return inds[0] == 0 and inds[1] == self._src_shape[0] and inds[2] == 1

    def apply_offset(self, offset, flat=True):
        """
        Apply an offset to this index.

        Both start and stop must be integers; a slice with a start or stop of None can't be
        offset.

        Parameters
        ----------
        offset : int
            The offset to apply.
        flat : bool
            Ignored by this indexer type.

        Returns
        -------
        slice
            The offset slice.
        """
        return slice(self._slice.start + offset, self._slice.stop + offset, self._slice.step)

    def copy(self):
        """
        Copy this Indexer.

        Returns
        -------
        Indexer
            A copy of this Indexer.
        """
        return super().copy(self._slice)

    def as_array(self, copy=False, flat=True):
        """
        Return an index array into a flat array.

        Parameters
        ----------
        copy : bool
            Ignored; a new array is built on every call.
        flat : bool
            If True, return a flat array.

        Returns
        -------
        ndarray
            The index array.  For a multidimensional source with flat=False, a tuple of
            per-dimension index arrays is returned instead.
        """
        slc = self._slice
        src_shape = self._src_shape

        if src_shape is None or len(src_shape) == 1:
            # Case 1: Requested flat or nonflat indices but src_shape is None or flat
            #         return a flattened arange
            if slc.stop is None:
                if src_shape is None:
                    # without a source size there is no way to know where the slice ends
                    raise ValueError(f"Can't convert {self} to an index array because it has "
                                     "an open ended stop and no source shape.")
                if slc.step < 0:  # special case - neg step down to -1
                    return np.arange(src_shape[0], dtype=int)[slc]

            # use maxsize here since a shaped slice always has positive int start and stop
            return np.arange(*slc.indices(sys.maxsize), dtype=int)

        src_size = shape_to_len(src_shape)
        arr = np.arange(src_size, dtype=INT_DTYPE).reshape(src_shape)[slc].ravel()
        if flat:
            # Case 2: Requested flattened indices of multidimensional array
            #         Return indices into a flattened src.
            return arr

        # Case 3: Requested non-flat indices of multidimensional array
        #         This is never called within OpenMDAO
        return np.unravel_index(arr, shape=src_shape)

    def flat(self, copy=False):
        """
        Return a slice into a flat array.

        Parameters
        ----------
        copy : bool
            If True, make sure the array returned is a copy.

        Returns
        -------
        slice
            The slice into a flat array.
        """
        # slices are immutable, so ignore copy arg
        return self._slice

    @property
    def min_src_dim(self):
        """
        Return the number of source dimensions.

        Returns
        -------
        int
            The number of dimensions expected in the source array.
        """
        return 1

    def _check_bounds(self):
        """
        Check that indices are within the bounds of the source shape.
        """
        # a slice with start or stop outside of the source range is allowed in numpy arrays
        # and just results in an empty array, but in OpenMDAO that behavior would probably be
        # unintended, so for now make it an error.
        if self._src_shape is None:
            return

        start = self._slice.start
        stop = self._slice.stop
        sz = shape_to_len(self._dist_shape)

        bad_start = start is not None and (start >= sz or start < -sz)
        bad_stop = stop is not None and (stop > sz or stop < -sz)

        # an empty slice (start == stop) is always accepted
        if start != stop and (bad_start or bad_stop):
            raise IndexError(f"{self._slice} is out of bounds of the source shape "
                             f"{self._dist_shape}.")

    def to_json(self):
        """
        Return a JSON serializable version of self.

        Returns
        -------
        list of int or int
            List or int version of self.
        """
        return self.as_array().tolist()
class SliceIndexer(ShapedSliceIndexer):
    """
    Slice indexer that may contain negative or open ended bounds until a source shape is known.

    Parameters
    ----------
    slc : slice
        The slice.
    flat_src : bool or None
        If True, treat source as flat.
    """

    def shaped_instance(self):
        """
        Return a 'shaped' version of this Indexer type.

        The result is cached.

        Returns
        -------
        ShapedSliceIndexer or None
            Will return a ShapedSliceIndexer if possible, else None.
        """
        if self._shaped_inst is not None:
            return self._shaped_inst

        if self._src_shape is None:
            return None

        slc = self._slice

        # A negative step with no stop runs down through index 0.  That can't be written with
        # an explicit non-negative stop, so such a slice is passed along unchanged.
        runs_down_to_zero = slc.stop is None and slc.step < 0

        relative_bounds = ((slc.start is not None and slc.start < 0) or
                           slc.stop is None or slc.stop < 0)

        if relative_bounds and not runs_down_to_zero:
            # resolve negative / missing bounds against the first source dimension
            slc = slice(*slc.indices(self._src_shape[0]))

        self._shaped_inst = ShapedSliceIndexer(slc)
        return self._shaped_inst._set_attrs(self)

    def as_array(self, copy=False, flat=True):
        """
        Return an index array into a flat array.

        The conversion is delegated to the shaped instance of this indexer.

        Parameters
        ----------
        copy : bool
            If True, make sure the array returned is a copy.
        flat : bool
            If True, return a flat array.

        Returns
        -------
        ndarray
            The index array.
        """
        return self.shaped_array(copy=copy, flat=flat)
class ShapedArrayIndexer(Indexer):
    """
    Abstract index array class that knows its source shape.

    Parameters
    ----------
    arr : ndarray
        The index array.
    flat_src : bool
        If True, source is treated as flat.

    Attributes
    ----------
    _arr : ndarray
        The wrapped index array object.
    """

    def __init__(self, arr, flat_src=None):
        """
        Initialize attributes.
        """
        super().__init__(flat_src)

        ndarr = np.asarray(arr)

        # check type
        if ndarr.dtype.kind not in ('i', 'u'):
            raise TypeError(f"Can't create an index array using indices of "
                            f"non-integral type '{ndarr.dtype.type.__name__}'.")

        self._arr = ndarr

    def __call__(self):
        """
        Return this index array.

        Returns
        -------
        ndarray
            This index array.
        """
        return self._arr

    def __repr__(self):
        """
        Return string representation.

        Returns
        -------
        str
            String representation.
        """
        return f"{self._arr.tolist()}".replace('\n', '')

    def apply_offset(self, offset, flat=True):
        """
        Apply an offset to this index.

        Parameters
        ----------
        offset : int
            The offset to apply.
        flat : bool
            If True, return a flat index.

        Returns
        -------
        ndarray
            The offset index array.
        """
        return self.as_array(flat=flat) + offset

    def copy(self):
        """
        Copy this Indexer.

        Returns
        -------
        Indexer
            A copy of this Indexer.
        """
        return super().copy(self._arr)

    @property
    def min_src_dim(self):
        """
        Return the number of source dimensions.

        Returns
        -------
        int
            The number of dimensions expected in the source array.
        """
        return 1

    def as_array(self, copy=False, flat=True):
        """
        Return an index array into a flat array.

        Parameters
        ----------
        copy : bool
            If True, make sure the array returned is a copy.
        flat : bool
            If True, return a flat array.

        Returns
        -------
        ndarray
            The index array.
        """
        arr = self._arr.ravel() if flat else self._arr
        return arr.copy() if copy else arr

    def flat(self, copy=False):
        """
        Return an index array into a flat array.

        Parameters
        ----------
        copy : bool
            If True, make sure the array returned is a copy.

        Returns
        -------
        ndarray
            The index into a flat array.
        """
        if copy:
            return self._arr.ravel().copy()
        return self._arr.ravel()

    def _check_bounds(self):
        """
        Check that indices are within the bounds of the source shape.

        The largest index is checked first and the smallest index is only examined if the
        largest one is in bounds.
        """
        if self._src_shape is None or self._arr.size == 0:
            return

        src_size = shape_to_len(self._dist_shape)

        # Only compare the max directly.  Negating it (as in '-amax < -src_size') adds nothing
        # and wraps around for unsigned index dtypes.
        amax = np.max(self._arr)
        if amax >= src_size:
            ob = amax
        else:
            amin = np.min(self._arr)
            # the negation here is safe because it only happens when amin is negative,
            # which rules out an unsigned dtype
            ob = amin if (amin < 0 and -amin > src_size) else None

        if ob is not None:
            raise IndexError(f"index {ob} is out of bounds for source dimension of size "
                             f"{src_size}.")

    def to_json(self):
        """
        Return a JSON serializable version of self.

        Returns
        -------
        list of int or int
            List or int version of self.
        """
        return self().tolist()
class ArrayIndexer(ShapedArrayIndexer):
    """
    Index array indexer that may contain negative indices until a source shape is known.

    Parameters
    ----------
    arr : ndarray
        The index array.
    flat_src : bool or None
        If True, treat source as flat.
    """

    def shaped_instance(self):
        """
        Return a 'shaped' version of this Indexer type.

        Negative entries are resolved by adding the size of the first source dimension.  The
        result is cached.

        Returns
        -------
        ShapedArrayIndexer or None
            Will return a ShapedArrayIndexer if possible, else None.
        """
        if self._shaped_inst is not None:
            return self._shaped_inst

        if self._src_shape is None:
            return None

        resolved = self._arr
        neg_mask = resolved < 0
        if np.any(neg_mask):
            # work on a copy so the original (possibly negative) indices are left untouched
            resolved = resolved.copy()
            resolved[neg_mask] += self._src_shape[0]

        self._shaped_inst = ShapedArrayIndexer(resolved)
        return self._shaped_inst._set_attrs(self)

    @property
    def indexed_src_shape(self):
        """
        Return the shape of the result of indexing into the source.

        Returns
        -------
        tuple
            The shape of the index array if the source is flat or one dimensional, else the
            shape obtained by applying the index to the source shape.
        """
        src_shape = self._src_shape
        if src_shape is None:
            raise RuntimeError("Can't get index_src_shape because src_shape is None.")

        one_dimensional = self._flat_src or len(src_shape) == 1
        return self._arr.shape if one_dimensional else super().indexed_src_shape
class ShapedMultiIndexer(Indexer):
    """
    Abstract multi indexer class that is 'shaped'.

    Parameters
    ----------
    tup : tuple
        Tuple of indices/slices.
    flat_src : bool
        If True, treat source array as flat.

    Attributes
    ----------
    _tup : tuple
        The wrapped tuple of indices/slices.
    _idx_list : list
        List of Indexers, one per entry of _tup.
    _remove_extra_brackets : bool
        If True, remove extra brackets from the string representation.
    """

    def __init__(self, tup, flat_src=False):
        """
        Initialize attributes.
        """
        if flat_src and len(tup) > 1:
            raise RuntimeError(f"Can't index into a flat array with an indexer expecting {len(tup)}"
                               " dimensions.")
        super().__init__(flat_src)
        self._remove_extra_brackets = False
        self._tup = tup
        self._set_idx_list()

    def _set_idx_list(self):
        """
        Build the list of sub-indexers, one for each entry of the wrapped tuple.
        """
        self._idx_list = []
        for i in self._tup:
            if isinstance(i, (np.ndarray, list)):  # need special handling here for ndim > 1 arrays
                self._idx_list.append(ArrayIndexer(i, flat_src=self._flat_src))
                if self._idx_list[-1]._arr.ndim > 1:
                    self._remove_extra_brackets = True
            else:
                self._idx_list.append(indexer(i, flat_src=self._flat_src))

    def __call__(self):
        """
        Return this multidimensional index.

        Returns
        -------
        tuple
            This multidimensional index, one entry per sub-indexer.
        """
        return tuple(i() for i in self._idx_list)

    def __str__(self):
        """
        Return string representation.

        Returns
        -------
        str
            String representation.
        """
        if self._remove_extra_brackets:
            return repr(self)
        return f"[{repr(self)}]"

    def __repr__(self):
        """
        Return string representation.

        Returns
        -------
        str
            String representation.
        """
        return f"{', '.join(repr(i) for i in self._idx_list)}"

    def apply_offset(self, offset, flat=True):
        """
        Apply an offset to this index.

        Parameters
        ----------
        offset : int
            The offset to apply.
        flat : bool
            If True, return a flat index.

        Returns
        -------
        ndarray
            The offset array.
        """
        if flat:
            return self.flat() + offset
        return self.as_array(flat=False) + offset

    def copy(self):
        """
        Copy this Indexer.

        Returns
        -------
        Indexer
            A copy of this Indexer.
        """
        inst = super().copy(self._tup)

        # The copy rebuilds its sub-indexers from the raw tuple, so they start out with no
        # source shape even when ours have one.  Carry the shape information over, otherwise
        # the copy could never produce a shaped instance.
        for new, old in zip(inst._idx_list, self._idx_list):
            new._set_attrs(old)

        return inst

    @property
    def min_src_dim(self):
        """
        Return the number of source dimensions.

        Returns
        -------
        int
            The number of dimensions expected in the source array.
        """
        return len(self._idx_list)

    def as_array(self, copy=False, flat=True):
        """
        Return an index array into a flat array.

        Parameters
        ----------
        copy : bool
            Ignored; the result is always taken from a newly built array.
        flat : bool
            If True, return a flat array.

        Returns
        -------
        ndarray
            The index array into a flat array.
        """
        if self._src_shape is None:
            raise ValueError("Can't determine extent of array because source shape is not known.")

        # number every entry of the source, then see which numbers the index selects
        size = shape_to_len(self._src_shape)
        idxs = np.arange(size, dtype=INT_DTYPE).reshape(self._src_shape)

        if flat:
            return idxs[self()].ravel()
        else:
            return idxs[self()]

    def flat(self, copy=False):
        """
        Return an index array into a flat array.

        Parameters
        ----------
        copy : bool
            If True, make sure the array returned is a copy.

        Returns
        -------
        ndarray
            An index array into a flat array.
        """
        return self.shaped_array(copy=copy, flat=True)

    def set_src_shape(self, shape, dist_shape=None):
        """
        Set the shape of the 'source' array.

        The shape is also passed down to the sub-indexers.  For a flat source each sub-indexer
        gets the whole shape, otherwise each one gets the size of its own dimension.

        Parameters
        ----------
        shape : tuple or int
            The shape of the 'source' array.
        dist_shape : tuple or None
            If not None, the full distributed shape of the source.

        Returns
        -------
        Indexer
            Self is returned to allow chaining.
        """
        self._check_src_shape(shape2tuple(shape))
        super().set_src_shape(shape, dist_shape)
        if shape is None:
            return self

        if self._flat_src:
            for i in self._idx_list:
                i.set_src_shape(self._src_shape, self._dist_shape)
        else:
            for i, s, ds in zip(self._idx_list, self._src_shape, self._dist_shape):
                i.set_src_shape(s, ds)

        return self

    def _check_src_shape(self, shape):
        """
        Raise an exception if the shape has fewer dimensions than this indexer expects.

        Parameters
        ----------
        shape : tuple or None
            The proposed source shape.
        """
        if shape is not None and not self._flat_src and len(shape) < len(self._idx_list):
            raise ValueError(f"Can't set source shape to {shape} because indexer {self} expects "
                             f"{len(self._idx_list)} dimensions.")

    def _check_bounds(self):
        """
        Check that indices are within the bounds of the source shape.
        """
        if self._src_shape is not None:
            for i in self._idx_list:
                i._check_bounds()

    def to_json(self):
        """
        Return a JSON serializable version of self.

        Returns
        -------
        list of int or int
            List or int version of self.
        """
        return self.as_array().tolist()
class MultiIndexer(ShapedMultiIndexer):
    """
    Abstract multi indexer class that may or may not be 'shaped'.

    Parameters
    ----------
    tup : tuple
        Tuple of indices/slices.
    flat_src : bool
        If True, treat source array as flat.
    """

    def shaped_instance(self):
        """
        Return a 'shaped' version of this Indexer type.

        The shaped instance is built from the shaped instances of the sub-indexers and is
        cached once it has been fully configured.

        Returns
        -------
        ShapedMultiIndexer or None
            Will return a ShapedMultiIndexer if possible, else None.
        """
        if self._shaped_inst is not None:
            return self._shaped_inst

        if self._src_shape is None:
            return None

        try:
            shaped = ShapedMultiIndexer(tuple(idxer.shaped_instance()()
                                              for idxer in self._idx_list),
                                        flat_src=self._flat_src)
        except Exception:
            # Best effort: if any sub-indexer can't be shaped, report that no shaped
            # instance is available rather than failing.
            self._shaped_inst = None
        else:
            # Pass the distributed shape along too, so the shaped instance is bounds checked
            # against the same extents that this indexer was checked against.
            shaped.set_src_shape(self._src_shape, self._dist_shape)
            # only cache once set_src_shape has succeeded, so that a failure can't leave a
            # half configured instance behind
            self._shaped_inst = shaped._set_attrs(self)

        return self._shaped_inst
class EllipsisIndexer(Indexer):
    """
    Indexer for a tuple of indices that contains an ellipsis.

    The ellipsis can't be expanded until a source shape is known, so most operations are
    delegated to the shaped instance of this indexer.

    Parameters
    ----------
    tup : tuple
        Tuple of indices/slices.
    flat_src : bool
        If True, treat source array as flat.

    Attributes
    ----------
    _tup : tuple
        The wrapped tuple of indices/slices (it contains an ellipsis).
    """

    def __init__(self, tup, flat_src=None):
        """
        Initialize attributes.
        """
        super().__init__(flat_src)

        # convert any internal lists/tuples to arrays
        self._tup = tuple(np.atleast_1d(v) if isinstance(v, (list, tuple)) else v for v in tup)

    def __call__(self):
        """
        Return the 'default' form of the indices.

        Returns
        -------
        tuple
            Tuple of indices and/or slices.
        """
        return self._tup

    def __repr__(self):
        """
        Return string representation.

        Returns
        -------
        str
            String representation.
        """
        return ', '.join('...' if i is ... else repr(i) for i in self._tup)

    def apply_offset(self, offset, flat=True):
        """
        Apply an offset to this index.

        Parameters
        ----------
        offset : int
            The offset to apply.
        flat : bool
            If True, return a flat index.

        Returns
        -------
        ndarray
            The offset array.
        """
        return self.as_array(flat=flat) + offset

    def copy(self):
        """
        Copy this Indexer.

        Returns
        -------
        EllipsisIndexer
            A copy of this Indexer.
        """
        return super().copy(self._tup)

    @property
    def min_src_dim(self):
        """
        Return the number of source dimensions.

        Returns
        -------
        int
            The number of dimensions expected in the source array.
        """
        # the ellipsis itself doesn't require a dimension
        mn = len(self._tup) - 1
        return mn if mn > 1 else 1

    def shaped_instance(self):
        """
        Return a 'shaped' version of this Indexer type.

        The ellipsis is replaced by as many full slices as are needed to match the number of
        source dimensions.  The result is cached.

        Returns
        -------
        A shaped Indexer or None
            Will return some kind of shaped Indexer if possible, else None.
        """
        if self._shaped_inst is not None:
            return self._shaped_inst

        if self._src_shape is None:
            return None

        full = _convert_ellipsis_idx(self._src_shape, self._tup)

        # a single entry is passed on its own rather than as a 1-tuple
        idxer = indexer(full[0] if len(full) == 1 else full)

        # Pass the distributed shape along too, so the expanded indexer is bounds checked
        # against the same extents as this indexer.
        idxer.set_src_shape(self._src_shape, self._dist_shape)

        self._shaped_inst = idxer.shaped_instance()
        return self._shaped_inst._set_attrs(self)

    def as_array(self, copy=False, flat=True):
        """
        Return an index array into a flat array.

        Parameters
        ----------
        copy : bool
            If True, make sure the array returned is a copy.
        flat : bool
            If True, return a flat array.

        Returns
        -------
        ndarray
            The index array.
        """
        return self.shaped_array(copy=copy, flat=flat)

    def flat(self, copy=False):
        """
        Return an index array into a flat array.

        Parameters
        ----------
        copy : bool
            If True, make sure the array returned is a copy.

        Returns
        -------
        ndarray
            An index array into a flat array.
        """
        return self.as_array(copy=copy)

    def _check_bounds(self):
        """
        Check that indices are within the bounds of the source shape.
        """
        s = self.shaped_instance()
        if s is not None:
            s._check_bounds()

    def to_json(self):
        """
        Return a JSON serializable version of self.

        Returns
        -------
        list of int or int
            A list or int version of self.
        """
        return self.as_array().tolist()
class IndexMaker(object):
    """
    A Factory for Indexer objects.

    An instance can either be called or indexed with the desired indices.
    """

    def __call__(self, idx, src_shape=None, flat_src=False, try_slice=False):
        """
        Return an Indexer instance based on the passed indices/slices.

        An object that is already an Indexer is returned unchanged.

        Parameters
        ----------
        idx : int, ndarray, slice, or tuple
            Some sort of index/indices/slice.
        src_shape : tuple or None
            Source shape if known.
        flat_src : bool
            If True, indices are into a flat source.
        try_slice : bool
            If True, try to convert 1D index array to a slice.

        Returns
        -------
        Indexer
            The Indexer instance we created based on the args.
        """
        if idx is ...:
            idxer = EllipsisIndexer((idx,), flat_src=flat_src)
        elif isinstance(idx, int):
            idxer = IntIndexer(idx, flat_src=flat_src)
        elif isinstance(idx, slice):
            idxer = SliceIndexer(idx, flat_src=flat_src)
        elif isinstance(idx, tuple):
            idxer = self._from_tuple(idx, flat_src)
        elif isinstance(idx, Indexer):
            return idx
        else:
            idxer = self._from_array_like(idx, flat_src, try_slice)

        if src_shape is None:
            return idxer

        if flat_src:
            src_shape = (shape_to_len(src_shape),)

        idxer.set_src_shape(src_shape)
        return idxer

    def _from_tuple(self, idx, flat_src):
        """
        Create an Indexer from a tuple of indices.

        Parameters
        ----------
        idx : tuple
            Tuple of indices/slices, possibly containing an ellipsis.
        flat_src : bool
            If True, indices are into a flat source.

        Returns
        -------
        Indexer
            An EllipsisIndexer if the tuple contains an ellipsis, else a MultiIndexer.
        """
        if any(entry is ... for entry in idx):
            idxer = EllipsisIndexer(idx, flat_src=flat_src)
            max_flat_len = 2  # ... doesn't count toward limit of dimensions
        else:
            idxer = MultiIndexer(idx, flat_src=flat_src)
            max_flat_len = 1

        if flat_src and len(idx) > max_flat_len:
            raise RuntimeError("Can't use a multdimensional index into a flat source.")

        return idxer

    def _from_array_like(self, idx, flat_src, try_slice):
        """
        Create an Indexer from anything that can be converted to an array.

        Parameters
        ----------
        idx : ndarray or sequence
            The indices.
        flat_src : bool
            If True, indices are into a flat source.
        try_slice : bool
            If True, try to convert 1D index array to a slice.

        Returns
        -------
        Indexer
            A SliceIndexer or ArrayIndexer for one dimensional indices, else a MultiIndexer.
        """
        arr = np.atleast_1d(idx)

        if arr.ndim != 1:
            issue_warning("Using a non-tuple sequence for multidimensional indexing is "
                          "deprecated; use `arr[tuple(seq)]` instead of `arr[seq]`. In the "
                          "future this will be interpreted as an array index, "
                          "`arr[np.array(seq)]`, which will result either in an error or a "
                          "different result.")
            return MultiIndexer(tuple(idx), flat_src=flat_src)

        slc = array2slice(arr) if try_slice else None
        if slc is None:
            return ArrayIndexer(arr, flat_src=flat_src)

        return SliceIndexer(slc, flat_src=flat_src)

    def __getitem__(self, idx):
        """
        Return an Indexer based on idx.

        Parameters
        ----------
        idx : int, ndarray, slice or tuple
            The passed indices/slices.

        Returns
        -------
        Indexer
            The Indexer instance we created based on the args.
        """
        return self(idx)


# module level factory instance used to create all Indexers
indexer = IndexMaker()
def _convert_ellipsis_idx(shape, idx):
lst = [None] * len(shape)
# number of full slice dimensions
nfull = len(shape) - len(idx) + 1
i = 0
for ind in idx:
if ind is ...:
for j in range(nfull):
lst[i] = slice(None)
i += 1
else:
lst[i] = ind
i += 1
return tuple(lst)
def idx_list_to_index_array(idx_list):
    """
    Collapse a chain of indexers, applied one after another, into a single index array.

    Parameters
    ----------
    idx_list : list of indexers
        List of indexers.  The first one must know its source shape if there is more than one.

    Returns
    -------
    ndarray or None
        The equivalent index array, or None if the list is empty.
    """
    count = len(idx_list)

    if count == 0:
        return None

    if count == 1:
        return idx_list[0].as_array()

    # number every entry of the original source, then push those numbers through each
    # indexer in turn to see which ones survive
    src_shape = idx_list[0]._src_shape
    result = np.arange(shape_to_len(src_shape)).reshape(src_shape)
    for idxer in idx_list:
        result = idxer.indexed_val(result)

    return result
def apply_idx_list(arr, idx_list):
    """
    Apply a sequential list of indexers to an array.

    Parameters
    ----------
    arr : ndarray
        The array to index into.
    idx_list : list of indexers
        Indexers to apply, in order.  Each one indexes into the result of the previous one.

    Returns
    -------
    ndarray
        The result of applying every indexer, or arr itself if the list is empty.
    """
    result = arr
    for idxer in idx_list:
        result = idxer.indexed_val(result)
    return result
def idx_list_to_shape(idx_list, src_shape):
    """
    Convert a sequential list of indexers to an equivalent shape.

    The source shape of each indexer in the list is set as a side effect.

    Parameters
    ----------
    idx_list : list of indexers
        Indexers to apply, in order.  Each one indexes into the result of the previous one.
    src_shape : tuple
        Shape of the array that the first indexer is applied to.

    Returns
    -------
    tuple
        Shape of the result of applying every indexer, or src_shape if the list is empty.
    """
    shape = src_shape
    for idxer in idx_list:
        # the shape produced by one indexer is the source shape of the next one
        idxer.set_src_shape(shape)
        shape = idxer.indexed_src_shape
    return shape
# Since this is already user facing we'll leave it as is, and just use the output of
# __getitem__ to initialize our Indexer object that will be used internally.
class Slicer(object):
    """
    Helper that turns subscript syntax into the equivalent index object.

    Subscripting an instance, e.g. ``Slicer()[1:5]``, simply hands back whatever was placed
    between the brackets.
    """

    def __getitem__(self, val):
        """
        Hand back the subscript unchanged.

        Parameters
        ----------
        val : int or slice object or tuples of slice objects
            Indices or slice to return.

        Returns
        -------
        indices : int or slice object or tuples of slice objects
            The same object that was passed in.
        """
        return val
# instance of the Slicer class to be used by users
slicer = Slicer()
# slice that selects everything along a dimension, i.e. ':'
_full_slice = slice(None)
# indexers built from the full slice, one for a flat source and one for a non-flat source
_flat_full_indexer = indexer(_full_slice, flat_src=True)
_full_indexer = indexer(_full_slice, flat_src=False)