Source code for openmdao.vectors.petsc_transfer

"""Define the PETSc Transfer class."""
from openmdao.utils.mpi import check_mpi_env

use_mpi = check_mpi_env()

if use_mpi is False:
    PETScTransfer = None
        import petsc4py
        from petsc4py import PETSc
    except ImportError:
        PETSc = None
        if use_mpi is True:
            raise ImportError("Importing petsc4py failed and OPENMDAO_USE_MPI is true.")

    import numpy as np
    from petsc4py import PETSc
    from collections import defaultdict

    from openmdao.vectors.default_transfer import DefaultTransfer, _merge
    from openmdao.core.constants import INT_DTYPE

[docs] class PETScTransfer(DefaultTransfer): """ PETSc Transfer implementation for running in parallel. Parameters ---------- in_vec : <Vector> Pointer to the input vector. out_vec : <Vector> Pointer to the output vector. in_inds : int ndarray Input indices for the transfer. out_inds : int ndarray Output indices for the transfer. comm : MPI.Comm or <FakeComm> Communicator of the system that owns this transfer. Attributes ---------- _scatter : method Method that performs a PETSc scatter. """
[docs] def __init__(self, in_vec, out_vec, in_inds, out_inds, comm): """ Initialize all attributes. """ super().__init__(in_vec, out_vec, in_inds, out_inds, comm) in_indexset = PETSc.IS().createGeneral(self._in_inds, comm=self._comm) out_indexset = PETSc.IS().createGeneral(self._out_inds, comm=self._comm) self._scatter = PETSc.Scatter().create(out_vec._petsc, out_indexset, in_vec._petsc, in_indexset).scatter
@staticmethod def _setup_transfers(group): """ Compute all transfers that are owned by our parent group. Parameters ---------- group : <Group> Parent group. """ rev = group._mode != 'fwd' for subsys in group._subgroups_myproc: subsys._setup_transfers() abs2meta_in = group._var_abs2meta['input'] abs2meta_out = group._var_abs2meta['output'] allprocs_abs2meta_out = group._var_allprocs_abs2meta['output'] myproc = group.comm.rank transfers = group._transfers = {} vectors = group._vectors offsets = group._get_var_offsets() mypathlen = len(group.pathname) + 1 if group.pathname else 0 # Initialize empty lists for the transfer indices xfer_in = [] xfer_out = [] fwd_xfer_in = defaultdict(list) fwd_xfer_out = defaultdict(list) if rev: rev_xfer_in = defaultdict(list) rev_xfer_out = defaultdict(list) allprocs_abs2idx = group._var_allprocs_abs2idx sizes_in = group._var_sizes['input'] sizes_out = group._var_sizes['output'] offsets_in = offsets['input'] offsets_out = offsets['output'] # Loop through all connections owned by this system for abs_in, abs_out in group._conn_abs_in2out.items(): # Only continue if the input exists on this processor if abs_in in abs2meta_in: # Get meta meta_in = abs2meta_in[abs_in] meta_out = allprocs_abs2meta_out[abs_out] idx_in = allprocs_abs2idx[abs_in] idx_out = allprocs_abs2idx[abs_out] # Read in and process src_indices src_indices = meta_in['src_indices'] if src_indices is None: owner = group._owning_rank[abs_out] # if the input is larger than the output on a single proc, we have # to just loop over the procs in the same way we do when src_indices # is defined. if meta_in['size'] > sizes_out[owner, idx_out]: src_indices = np.arange(meta_in['size'], dtype=INT_DTYPE) else: src_indices = src_indices.shaped_array() # 1. Compute the output indices # NOTE: src_indices are relative to a single, possibly distributed variable, # while the output_inds that we compute are relative to the full distributed # array that contains all local variables from each rank stacked in rank order. if src_indices is None: if meta_out['distributed']: # input in this case is non-distributed (else src_indices would be # defined by now). dist output to non-distributed input conns w/o # src_indices are not allowed. raise RuntimeError(f"{group.msginfo}: Can't connect distributed output " f"'{abs_out}' to non-distributed input '{abs_in}' " "without declaring src_indices.", ident=(abs_out, abs_in)) else: rank = myproc if abs_out in abs2meta_out else owner offset = offsets_out[rank, idx_out] output_inds = np.arange(offset, offset + meta_in['size'], dtype=INT_DTYPE) else: output_inds = np.zeros(src_indices.size, INT_DTYPE) start = end = 0 for iproc in range(group.comm.size): end += sizes_out[iproc, idx_out] if start == end: continue # The part of src on iproc on_iproc = np.logical_and(start <= src_indices, src_indices < end) if np.any(on_iproc): # This converts from iproc-then-ivar to ivar-then-iproc ordering # Subtract off part of previous procs # Then add all variables on previous procs # Then all previous variables on this proc # - np.sum(out_sizes[:iproc, idx_out]) # + np.sum(out_sizes[:iproc, :]) # + np.sum(out_sizes[iproc, :idx_out]) # + inds offset = offsets_out[iproc, idx_out] - start output_inds[on_iproc] = src_indices[on_iproc] + offset start = end # 2. Compute the input indices input_inds = np.arange(offsets_in[myproc, idx_in], offsets_in[myproc, idx_in] + sizes_in[myproc, idx_in], dtype=INT_DTYPE) # Now the indices are ready - input_inds, output_inds sub_in = abs_in[mypathlen:].partition('.')[0] fwd_xfer_in[sub_in].append(input_inds) fwd_xfer_out[sub_in].append(output_inds) if rev: sub_out = abs_out[mypathlen:].partition('.')[0] rev_xfer_in[sub_out].append(input_inds) rev_xfer_out[sub_out].append(output_inds) else: # not a local input but still need entries in the transfer dicts to # avoid hangs sub_in = abs_in[mypathlen:].partition('.')[0] fwd_xfer_in[sub_in] # defaultdict will create an empty list there fwd_xfer_out[sub_in] if rev: sub_out = abs_out[mypathlen:].partition('.')[0] rev_xfer_in[sub_out] rev_xfer_out[sub_out] for sname, inds in fwd_xfer_in.items(): fwd_xfer_in[sname] = _merge(inds) fwd_xfer_out[sname] = _merge(fwd_xfer_out[sname]) if rev: for sname, inds in rev_xfer_out.items(): rev_xfer_in[sname] = _merge(rev_xfer_in[sname]) rev_xfer_out[sname] = _merge(inds) if fwd_xfer_in: xfer_in = np.concatenate(list(fwd_xfer_in.values())) xfer_out = np.concatenate(list(fwd_xfer_out.values())) else: xfer_in = xfer_out = np.zeros(0, dtype=INT_DTYPE) out_vec = vectors['output']['nonlinear'] xfer_all = PETScTransfer(vectors['input']['nonlinear'], out_vec, xfer_in, xfer_out, group.comm) transfers['fwd'] = xfwd = {} xfwd[None] = xfer_all if rev: transfers['rev'] = xrev = {} xrev[None] = xfer_all for sname, inds in fwd_xfer_in.items(): transfers['fwd'][sname] = PETScTransfer( vectors['input']['nonlinear'], vectors['output']['nonlinear'], inds, fwd_xfer_out[sname], group.comm) if rev: for sname, inds in rev_xfer_out.items(): transfers['rev'][sname] = PETScTransfer( vectors['input']['nonlinear'], vectors['output']['nonlinear'], rev_xfer_in[sname], inds, group.comm) def _transfer(self, in_vec, out_vec, mode='fwd'): """ Perform transfer. Parameters ---------- in_vec : <Vector> pointer to the input vector. out_vec : <Vector> pointer to the output vector. mode : str 'fwd' or 'rev'. """ flag = False if mode == 'rev': flag = True in_vec, out_vec = out_vec, in_vec in_petsc = in_vec._petsc out_petsc = out_vec._petsc # For Complex Step, need to disassemble real and imag parts, transfer them separately, # then reassemble them. if in_vec._under_complex_step and out_vec._alloc_complex: # Real in_petsc.array = in_vec._data.real out_petsc.array = out_vec._data.real self._scatter(out_petsc, in_petsc, addv=flag, mode=flag) # Imaginary in_petsc_imag = in_vec._imag_petsc out_petsc_imag = out_vec._imag_petsc in_petsc_imag.array = in_vec._data.imag out_petsc_imag.array = out_vec._data.imag self._scatter(out_petsc_imag, in_petsc_imag, addv=flag, mode=flag) in_vec._data[:] = in_petsc.array + in_petsc_imag.array * 1j else: # Anything that has been allocated complex requires an additional step because # the petsc vector does not directly reference the _data. if in_vec._alloc_complex: in_petsc.array = in_vec._get_data() if out_vec._alloc_complex: out_petsc.array = out_vec._get_data() self._scatter(out_petsc, in_petsc, addv=flag, mode=flag) if in_vec._alloc_complex: data = in_vec._get_data() data[:] = in_petsc.array