import copy
import numpy
VERTEX = 'Vertex'
CELL_CENTER = 'CellCenter'
_GRID_LOCATIONS = (VERTEX, CELL_CENTER)
[docs]class FlowSolution(object):
"""
Contains solution variables for a :class:`Zone`.
All variables have the same shape and grid location.
"""
def __init__(self):
self._grid_location = VERTEX
self._ghosts = (0, 0, 0, 0, 0, 0)
self._arrays = []
self._vectors = []
def _get_grid_location(self):
return self._grid_location
def _set_grid_location(self, loc):
if loc not in _GRID_LOCATIONS:
raise ValueError('%r is not a valid grid location' % loc)
self._grid_location = loc
grid_location = property(_get_grid_location, _set_grid_location,
doc='Position at which data is located,'
' must be one of %s' % (_GRID_LOCATIONS,))
def _get_ghosts(self):
return self._ghosts
def _set_ghosts(self, ghosts):
if len(ghosts) < 2*len(self.shape):
raise ValueError('ghosts must be a %d-element array'
% (2*len(self.shape)))
for i in ghosts:
if i < 0:
raise ValueError('All ghost values must be >= 0')
self._ghosts = ghosts
for vector in self._vectors:
vector.ghosts = ghosts
ghosts = property(_get_ghosts, _set_ghosts,
doc='Number of ghost cells for each index direction.')
@property
[docs] def arrays(self):
""" List of scalar data arrays. """
return self._arrays
@property
[docs] def vectors(self):
""" List of vector data. """
return self._vectors
@property
[docs] def shape(self):
""" Data index limits, not including 'ghost/rind' planes. """
ijk = self.real_shape
if len(ijk) < 1:
return ()
ghosts = self._ghosts
imax = ijk[0] - (ghosts[0] + ghosts[1])
if len(ijk) < 2:
return (imax,)
jmax = ijk[1] - (ghosts[2] + ghosts[3])
if len(ijk) < 3:
return (imax, jmax)
kmax = ijk[2] - (ghosts[4] + ghosts[5])
return (imax, jmax, kmax)
@property
[docs] def real_shape(self):
""" Data index limits, including any 'ghost/rind' planes. """
if self._vectors:
return self._vectors[0].real_shape
elif self._arrays:
return self._arrays[0].shape
else:
return ()
[docs] def add_array(self, name, array):
"""
Add a :class:`numpy.ndarray` of scalar data and bind to `name`.
Returns the added array.
name: string
Name for the added array.
array: :class:`numpy.ndarray`
Scalar data.
"""
if hasattr(self, name):
raise ValueError('name %r is already bound' % name)
if self._arrays:
ijk = self._arrays[0].shape
elif self._vectors:
ijk = self._vectors[0].shape
else:
ijk = ()
if ijk and array.shape != ijk:
raise ValueError('array shape %s != existing shape %s'
% (array.shape, ijk))
setattr(self, name, array)
self._arrays.append(array)
return array
[docs] def add_vector(self, name, vector):
"""
Add a :class:`Vector` and bind to `name`.
Returns the added vector.
name: string
Name for the added array.
vector: :class:`Vector`
Vector data.
"""
if hasattr(self, name):
raise ValueError('name %r is already bound' % name)
shape = self.real_shape
if shape and vector.real_shape != shape:
raise ValueError('vector real shape %s != existing real shape %s'
% (vector.real_shape, shape))
setattr(self, name, vector)
self._vectors.append(vector)
vector.ghosts = self.ghosts
return vector
[docs] def copy(self):
""" Returns a deep copy of self. """
return copy.deepcopy(self)
def _copy_scalars(self, other):
""" Copy scalars from `other` to self. """
for name, val in other.__dict__.items():
if not hasattr(self, name):
setattr(self, name, val)
[docs] def is_equivalent(self, other, logger, tolerance=0.):
"""
Test if self and `other` are equivalent.
other: :class:`FlowSolution`
The flowfield to check against.
logger: :class:`Logger` or None
Used to log debug messages that will indicate what, if anything, is
not equivalent.
tolerance: float
The maximum relative difference in array values to be considered
equivalent.
"""
if not isinstance(other, FlowSolution):
logger.debug('other is not a FlowSolution object.')
return False
if other.grid_location != self.grid_location:
logger.debug('grid locations are not equal: %s vs. %s.',
other.grid_location, self.grid_location)
return False
if other.ghosts != self.ghosts:
logger.debug('flow ghost cell counts are not equal: %s vs. %s.',
other.ghosts, self.ghosts)
return False
for arr in self._arrays:
name = self.name_of_obj(arr)
try:
other_arr = getattr(other, name)
except AttributeError:
logger.debug('other is missing array %r', name)
return False
if tolerance > 0.:
if not numpy.allclose(other_arr, arr, tolerance, tolerance):
logger.debug("%s values are not 'close'.", name)
return False
else:
if (other_arr != arr).any():
logger.debug('%s values are not equal.', name)
return False
for vector in self._vectors:
name = self.name_of_obj(vector)
try:
other_vector = getattr(other, name)
except AttributeError:
logger.debug('other is missing vector %r', name)
return False
if not vector.is_equivalent(other_vector, name, logger, tolerance):
return False
# TODO: check scalars
return True
def _extract_3d(self, imin, imax, jmin, jmax, kmin, kmax, new_ghosts):
""" 3D (index space) extraction. """
imn, imx, jmn, jmx, kmn, kmx = imin, imax, jmin, jmax, kmin, kmax
ghosts = self._ghosts
# Support end-relative indexing and adjust for existing ghostplanes.
flow_imax, flow_jmax, flow_kmax = self.shape
if imin < 0:
imin += flow_imax
imin += ghosts[0]
if imax < 0:
imax += flow_imax
imax += ghosts[0]
if jmin < 0:
jmin += flow_jmax
jmin += ghosts[2]
if jmax < 0:
jmax += flow_jmax
jmax += ghosts[2]
if kmin < 0:
kmin += flow_kmax
kmin += ghosts[4]
if kmax < 0:
kmax += flow_kmax
kmax += ghosts[4]
# Adjust for new ghost/rind planes.
imin -= new_ghosts[0]
imax += new_ghosts[1]
jmin -= new_ghosts[2]
jmax += new_ghosts[3]
kmin -= new_ghosts[4]
kmax += new_ghosts[5]
# Check limits.
if imin < 0 or imax > flow_imax+ghosts[1] or \
jmin < 0 or jmax > flow_jmax+ghosts[3] or \
kmin < 0 or kmax > flow_kmax+ghosts[5]:
region = (imin, imax, jmin, jmax, kmin, kmax)
original = (0, flow_imax+ghosts[1], 0, flow_jmax+ghosts[3],
0, flow_kmax+ghosts[5])
raise ValueError('Extraction region %s exceeds original %s'
% (region, original))
# Extract.
flow = FlowSolution()
for arr in self._arrays:
flow.add_array(self.name_of_obj(arr),
arr[imin:imax+1, jmin:jmax+1, kmin:kmax+1])
for vector in self._vectors:
flow.add_vector(self.name_of_obj(vector),
vector.extract(imn, imx, jmn, jmx, kmn, kmx,
ghosts=new_ghosts))
flow.grid_location = self.grid_location
flow.ghosts = new_ghosts
flow._copy_scalars(self)
return flow
def _extract_2d(self, imin, imax, jmin, jmax, new_ghosts):
""" 2D (index space) extraction. """
imn, imx, jmn, jmx, = imin, imax, jmin, jmax
ghosts = self._ghosts
# Support end-relative indexing and adjust for existing ghost planes.
flow_imax, flow_jmax = self.shape
if imin < 0:
imin += flow_imax
imin += ghosts[0]
if imax < 0:
imax += flow_imax
imax += ghosts[0]
if jmin < 0:
jmin += flow_jmax
jmin += ghosts[2]
if jmax < 0:
jmax += flow_jmax
jmax += ghosts[2]
# Adjust for new ghost/rind planes.
imin -= new_ghosts[0]
imax += new_ghosts[1]
jmin -= new_ghosts[2]
jmax += new_ghosts[3]
# Check limits.
if imin < 0 or imax > flow_imax+ghosts[1] or \
jmin < 0 or jmax > flow_jmax+ghosts[3]:
region = (imin, imax, jmin, jmax)
original = (0, flow_imax+ghosts[1], 0, flow_jmax+ghosts[3])
raise ValueError('Extraction region %s exceeds original %s'
% (region, original))
# Extract.
flow = FlowSolution()
for arr in self._arrays:
flow.add_array(self.name_of_obj(arr),
arr[imin:imax+1, jmin:jmax+1])
for vector in self._vectors:
flow.add_vector(self.name_of_obj(vector),
vector.extract(imn, imx, jmn, jmx,
ghosts=new_ghosts))
flow.grid_location = self.grid_location
flow.ghosts = new_ghosts
flow._copy_scalars(self)
return flow
def _extract_1d(self, imin, imax, new_ghosts):
""" 1D (index space) extraction. """
imn, imx = imin, imax
ghosts = self._ghosts
# Support end-relative indexing and adjust for existing ghost planes.
flow_imax, = self.shape
if imin < 0:
imin += flow_imax
imin += ghosts[0]
if imax < 0:
imax += flow_imax
imax += ghosts[0]
# Adjust for new ghost/rind planes.
imin -= new_ghosts[0]
imax += new_ghosts[1]
# Check limits.
if imin < 0 or imax > flow_imax+ghosts[1]:
region = (imin, imax)
original = (0, flow_imax+ghosts[1])
raise ValueError('Extraction region %s exceeds original %s'
% (region, original))
# Extract.
flow = FlowSolution()
for arr in self._arrays:
flow.add_array(self.name_of_obj(arr), arr[imin:imax+1])
for vector in self._vectors:
flow.add_vector(self.name_of_obj(vector),
vector.extract(imn, imx, ghosts=new_ghosts))
flow.grid_location = self.grid_location
flow.ghosts = new_ghosts
flow._copy_scalars(self)
return flow
[docs] def extend(self, axis, delta, npoints):
"""
Construct a new :class:`FlowSolution` by replication.
The existing ghosts/rind planes specification is retained.
axis: 'i', 'j', or 'k'
Index axis to extend.
delta: float.
Direction. A negative value adds points before the current
zero-index of `axis`.
npoints: int > 0
Number of points to add in `axis` dimension.
"""
if not delta:
raise ValueError('delta must be non-zero')
if npoints < 1:
raise ValueError('npoints must be >= 1')
i = len(self.shape)
if i == 3:
if axis not in ('i', 'j', 'k'):
raise ValueError('axis must be i, j, or k')
return self._extend_3d(axis, delta, npoints)
elif i == 2:
if axis not in ('i', 'j'):
raise ValueError('axis must be i or j')
return self._extend_2d(axis, delta, npoints)
elif i == 1:
if axis != 'i':
raise ValueError('axis must be i')
return self._extend_1d(delta, npoints)
else:
raise RuntimeError('FlowSolution is empty!')
def _extend_3d(self, axis, delta, npoints):
""" 3D (index space) extension. """
imax, jmax, kmax = self.real_shape
if axis == 'i':
new_shape = (imax + npoints, jmax, kmax)
indx = imax if delta > 0 else npoints
elif axis == 'j':
new_shape = (imax, jmax + npoints, kmax)
indx = jmax if delta > 0 else npoints
else:
new_shape = (imax, jmax, kmax + npoints)
indx = kmax if delta > 0 else npoints
flow = FlowSolution()
for arr in self._arrays:
new_arr = numpy.zeros(new_shape)
if axis == 'i':
if delta > 0:
new_arr[0:indx, :, :] = arr
for i in range(npoints):
new_arr[indx+i, :, :] = arr[-1, :, :]
else:
new_arr[indx:, :, :] = arr
for i in range(npoints):
new_arr[i, :, :] = arr[0, :, :]
elif axis == 'j':
if delta > 0:
new_arr[:, 0:indx, :] = arr
for j in range(npoints):
new_arr[:, indx+j, :] = arr[:, -1, :]
else:
new_arr[:, indx:, :] = arr
for j in range(npoints):
new_arr[:, j, :] = arr[:, 0, :]
else:
if delta > 0:
new_arr[:, :, 0:indx] = arr
for k in range(npoints):
new_arr[:, :, indx+k] = arr[:, :, -1]
else:
new_arr[:, :, indx:] = arr
for k in range(npoints):
new_arr[:, :, k] = arr[:, :, 0]
flow.add_array(self.name_of_obj(arr), new_arr)
for vector in self._vectors:
flow.add_vector(self.name_of_obj(vector),
vector.extend(axis, delta, npoints))
flow.grid_location = self.grid_location
flow.ghosts = copy.copy(self._ghosts)
flow._copy_scalars(self)
return flow
def _extend_2d(self, axis, delta, npoints):
""" 2D (index space) extension. """
imax, jmax = self.real_shape
if axis == 'i':
new_shape = (imax + npoints, jmax)
indx = imax if delta > 0 else npoints
else:
new_shape = (imax, jmax + npoints)
indx = jmax if delta > 0 else npoints
flow = FlowSolution()
for arr in self._arrays:
new_arr = numpy.zeros(new_shape)
if axis == 'i':
if delta > 0:
new_arr[0:indx, :] = arr
for i in range(npoints):
new_arr[indx+i, :] = arr[-1, :]
else:
new_arr[indx:, :] = arr
for i in range(npoints):
new_arr[i, :] = arr[0, :]
else:
if delta > 0:
new_arr[:, 0:indx] = arr
for j in range(npoints):
new_arr[:, indx+j] = arr[:, -1]
else:
new_arr[:, indx:] = arr
for j in range(npoints):
new_arr[:, j] = arr[:, 0]
flow.add_array(self.name_of_obj(arr), new_arr)
for vector in self._vectors:
flow.add_vector(self.name_of_obj(vector),
vector.extend(axis, delta, npoints))
flow.grid_location = self.grid_location
flow.ghosts = copy.copy(self._ghosts)
flow._copy_scalars(self)
return flow
def _extend_1d(self, delta, npoints):
""" 1D (index space) extension. """
imax, = self.real_shape
new_shape = (imax + npoints,)
indx = imax if delta > 0 else npoints
flow = FlowSolution()
for arr in self._arrays:
new_arr = numpy.zeros(new_shape)
if delta > 0:
new_arr[0:indx] = arr
for i in range(npoints):
new_arr[indx+i] = arr[-1]
else:
new_arr[indx:] = arr
for i in range(npoints):
new_arr[i] = arr[0]
flow.add_array(self.name_of_obj(arr), new_arr)
for vector in self._vectors:
flow.add_vector(self.name_of_obj(vector),
vector.extend('i', delta, npoints))
flow.grid_location = self.grid_location
flow.ghosts = copy.copy(self._ghosts)
flow._copy_scalars(self)
return flow
[docs] def name_of_obj(self, obj):
""" Return name of object or None if not found. """
for name, value in self.__dict__.items():
if value is obj:
return name
return None
[docs] def flip_z(self):
""" Convert to other-handed coordinate system. """
for vector in self._vectors:
vector.flip_z()
[docs] def make_cartesian(self, grid, axis='z'):
"""
Convert to Cartesian coordinate system.
grid: :class:`GridCoordinates`
Must be in cylindrical form.
axis: string
Specifies which is the cylinder axis ('z' or 'x').
"""
for vector in self._vectors:
vector.make_cartesian(grid, axis)
[docs] def make_cylindrical(self, grid, axis='z'):
"""
Convert to cylindrical coordinate system.
grid: :class:`GridCoordinates`
Must be in cylindrical form.
axis: string
Specifies which is the cylinder axis ('z' or 'x').
"""
for vector in self._vectors:
vector.make_cylindrical(grid, axis)
[docs] def rotate_about_x(self, deg):
"""
Rotate about the X axis.
deg: float (degrees)
Amount of rotation.
"""
for vector in self._vectors:
vector.rotate_about_x(deg)
[docs] def rotate_about_y(self, deg):
"""
Rotate about the Y axis.
deg: float (degrees)
Amount of rotation.
"""
for vector in self._vectors:
vector.rotate_about_y(deg)
[docs] def rotate_about_z(self, deg):
"""
Rotate about the Z.
deg: float (degrees)
Amount of rotation.
"""
for vector in self._vectors:
vector.rotate_about_z(deg)
[docs] def demote(self):
""" Demote from N-dimensional to N-1 dimensional index space. """
shape = self.real_shape
ghosts = self._ghosts
if len(shape) > 2:
imax, jmax, kmax = shape
imx = imax - (ghosts[0] + ghosts[1])
jmx = jmax - (ghosts[2] + ghosts[3])
kmx = kmax - (ghosts[4] + ghosts[5])
if imx == 1:
for i, arr in enumerate(self._arrays):
name = self.name_of_obj(arr)
new_arr = numpy.zeros((jmax, kmax))
new_arr[:, :] = arr[ghosts[0], :, :]
setattr(self, name, new_arr)
self._arrays[i] = new_arr
self._ghosts = (ghosts[2], ghosts[3], ghosts[4], ghosts[5], 0, 0)
elif jmx == 1:
for i, arr in enumerate(self._arrays):
name = self.name_of_obj(arr)
new_arr = numpy.zeros((imax, kmax))
new_arr[:, :] = arr[:, ghosts[1], :]
setattr(self, name, new_arr)
self._arrays[i] = new_arr
self._ghosts = (ghosts[0], ghosts[1], ghosts[4], ghosts[5], 0, 0)
elif kmx == 1:
for i, arr in enumerate(self._arrays):
name = self.name_of_obj(arr)
new_arr = numpy.zeros((imax, jmax))
new_arr[:, :] = arr[:, :, ghosts[2]]
setattr(self, name, new_arr)
self._arrays[i] = new_arr
self._ghosts = (ghosts[0], ghosts[1], ghosts[2], ghosts[3], 0, 0)
else:
raise RuntimeError('No i, j, or k plane to collapse')
elif len(shape) > 1:
imax, jmax = shape
imx = imax - (ghosts[0] + ghosts[1])
jmx = jmax - (ghosts[2] + ghosts[3])
if imx == 1:
for i, arr in enumerate(self._arrays):
name = self.name_of_obj(arr)
new_arr = numpy.zeros((jmax,))
new_arr[:] = arr[ghosts[0], :]
setattr(self, name, new_arr)
self._arrays[i] = new_arr
self._ghosts = (ghosts[2], ghosts[3], 0, 0, 0, 0)
elif jmx == 1:
for i, arr in enumerate(self._arrays):
name = self.name_of_obj(arr)
new_arr = numpy.zeros((imax,))
new_arr[:] = arr[:, ghosts[1]]
setattr(self, name, new_arr)
self._arrays[i] = new_arr
self._ghosts = (ghosts[0], ghosts[1], 0, 0, 0, 0)
else:
raise RuntimeError('No i or j plane to collapse')
elif len(shape) > 0:
raise RuntimeError('FlowSolution is 1D')
else:
raise RuntimeError('FlowSolution is empty!')
for vector in self._vectors:
vector.demote()