Source code for openmdao.vectors.petsc_transfer

"""Define the PETSc Transfer class."""
import numpy as np
from openmdao.utils.mpi import check_mpi_env
from openmdao.core.constants import INT_DTYPE

use_mpi = check_mpi_env()
_empty_idx_array = np.array([], dtype=INT_DTYPE)


if use_mpi is False:
    PETScTransfer = None
else:
    try:
        import petsc4py  # noqa: F401
        from petsc4py import PETSc
    except ImportError:
        PETSc = None
        if use_mpi is True:
            raise ImportError("Importing petsc4py failed and OPENMDAO_USE_MPI is true.")

    from petsc4py import PETSc
    from collections import defaultdict

    from openmdao.vectors.default_transfer import DefaultTransfer, _setup_index_views

[docs] class PETScTransfer(DefaultTransfer): """ PETSc Transfer implementation for running in parallel. Parameters ---------- in_vec : <Vector> Pointer to the input vector. out_vec : <Vector> Pointer to the output vector. in_inds : int ndarray Input indices for the transfer. out_inds : int ndarray Output indices for the transfer. comm : MPI.Comm or <FakeComm> Communicator of the system that owns this transfer. Attributes ---------- _scatter : method Method that performs a PETSc scatter. """
[docs] def __init__(self, in_vec, out_vec, in_inds, out_inds, comm): """ Initialize all attributes. """ super().__init__(in_vec, out_vec, in_inds, out_inds) in_indexset = PETSc.IS().createGeneral(self._in_inds, comm=comm) out_indexset = PETSc.IS().createGeneral(self._out_inds, comm=comm) self._scatter = PETSc.Scatter().create(out_vec._petsc, out_indexset, in_vec._petsc, in_indexset).scatter
@staticmethod def _setup_transfers(group): """ Compute all transfers that are owned by our parent group. Parameters ---------- group : <Group> Parent group. """ rev = group._orig_mode != 'fwd' group._transfers = { 'fwd': PETScTransfer._setup_transfers_fwd(group) } if rev: group._transfers['rev'] = PETScTransfer._setup_transfers_rev(group) @staticmethod def _setup_transfers_fwd(group): transfers = {} if not group._conn_abs_in2out: return transfers abs2meta_in = group._var_abs2meta['input'] myrank = group.comm.rank offsets_in = group._get_var_offsets()['input'][myrank, :] mypathlen = len(group.pathname) + 1 if group.pathname else 0 xfer_in = defaultdict(list) xfer_out = defaultdict(list) allprocs_abs2idx = group._var_allprocs_abs2idx sizes_in = group._var_sizes['input'][myrank, :] total_len = 0 # Loop through all connections owned by this system for abs_in, abs_out in group._conn_abs_in2out.items(): sub_in = abs_in[mypathlen:].partition('.')[0] # Only continue if the input exists on this processor if abs_in in abs2meta_in: output_inds, _ = _get_output_inds(group, abs_out, abs_in) idx_in = allprocs_abs2idx[abs_in] input_inds = range(offsets_in[idx_in], offsets_in[idx_in] + sizes_in[idx_in]) total_len += len(input_inds) xfer_in[sub_in].append(input_inds) xfer_out[sub_in].append(output_inds) else: # not a local input but still need entries in the transfer dicts to # avoid hangs xfer_in[sub_in] # defaultdict will create an empty list there xfer_out[sub_in] if xfer_in: full_xfer_in, full_xfer_out = _setup_index_views(total_len, xfer_in, xfer_out) # full transfer (transfer to all subsystems at once) transfers[None] = PETScTransfer(group._vectors['input']['nonlinear'], group._vectors['output']['nonlinear'], full_xfer_in, full_xfer_out, group._comm) # transfers to individual subsystems for sname, inds in xfer_in.items(): transfers[sname] = PETScTransfer(group._vectors['input']['nonlinear'], group._vectors['output']['nonlinear'], inds, xfer_out[sname], group._comm) return transfers @staticmethod def _setup_transfers_rev(group): abs2meta_in = group._var_abs2meta['input'] abs2meta_out = group._var_abs2meta['output'] allprocs_abs2prom = group._var_allprocs_abs2prom # for an FD group, we use the relevance graph to determine which inputs on the # boundary of the group are upstream of responses within the group so # that we can perform any necessary corrections to the derivative inputs. if group._owns_approx_jac: if group.comm.size > 1 and group.pathname != '' and group._has_distrib_vars: all_abs2meta_out = group._var_allprocs_abs2meta['output'] all_abs2meta_in = group._var_allprocs_abs2meta['input'] # connections internal to this group and all direct/indirect subsystems conns = group._conn_global_abs_in2out inp_boundary_set = set(all_abs2meta_in).difference(conns) if inp_boundary_set: for dv, resp, rel in group._relevance.iter_seed_pair_relevance(inputs=True): if resp in all_abs2meta_out and dv not in allprocs_abs2prom: # response is continuous and inside this group and # dv is outside this group if all_abs2meta_out[resp]['distributed']: # a distributed response for inp in inp_boundary_set.intersection(rel): if inp in abs2meta_in: if resp not in group._fd_rev_xfer_correction_dist: group._fd_rev_xfer_correction_dist[resp] = set() group._fd_rev_xfer_correction_dist[resp].add(inp) # FD groups don't need reverse transfers return {} myrank = group.comm.rank allprocs_abs2idx = group._var_allprocs_abs2idx transfers = group._transfers vectors = group._vectors offsets = group._get_var_offsets() mypathlen = len(group.pathname) + 1 if group.pathname else 0 has_par_coloring = group._problem_meta['has_par_deriv_color'] has_nocolor_xfers = 0 xfer_in = defaultdict(list) xfer_out = defaultdict(list) # xfers that are only active when parallel coloring is not xfer_in_nocolor = defaultdict(list) xfer_out_nocolor = defaultdict(list) sizes_in = group._var_sizes['input'] offsets_in = offsets['input'] offsets_out = offsets['output'] total_size = total_size_nocolor = 0 # Loop through all connections owned by this system for abs_in, abs_out in group._conn_abs_in2out.items(): sub_out = abs_out[mypathlen:].partition('.')[0] # Only continue if the input exists on this processor if abs_in in abs2meta_in: meta_in = abs2meta_in[abs_in] idx_in = allprocs_abs2idx[abs_in] idx_out = allprocs_abs2idx[abs_out] output_inds, src_indices = _get_output_inds(group, abs_out, abs_in) # 2. Compute the input indices input_inds = range(offsets_in[myrank, idx_in], offsets_in[myrank, idx_in] + sizes_in[myrank, idx_in]) # Now the indices are ready - input_inds, output_inds inp_is_dup, inp_missing, distrib_in = group.get_var_dup_info(abs_in, 'input') out_is_dup, _, distrib_out = group.get_var_dup_info(abs_out, 'output') iowninput = myrank == group._owning_rank[abs_in] if inp_is_dup and (abs_out not in abs2meta_out or (distrib_out and not iowninput)): xfer_in[sub_out] xfer_out[sub_out] elif out_is_dup and inp_missing > 0 and (iowninput or distrib_in): # if this rank owns the input or the input is distributed, # and the output is duplicated, then we send the owning/distrib input # to each duplicated output that doesn't have a corresponding connected # input on the same rank. oidxlist = [] iidxlist = [] oidxlist_nc = [] iidxlist_nc = [] size = size_nc = 0 for rnk, osize, isize in zip(range(group.comm.size), group.get_var_sizes(abs_out, 'output'), group.get_var_sizes(abs_in, 'input')): if rnk == myrank: # transfer to output on same rank oidxlist.append(output_inds) iidxlist.append(input_inds) size += len(input_inds) elif osize > 0 and isize == 0: # dup output exists on this rank but there is no corresponding # input, so we send the owning/distrib input to the dup output offset = offsets_out[rnk, idx_out] if src_indices is None: oarr = range(offset, offset + meta_in['size']) elif src_indices.size > 0: oarr = np.asarray(src_indices + offset, dtype=INT_DTYPE) else: continue if has_par_coloring: # these transfers will only happen if parallel coloring is # not active for the current seed response oidxlist_nc.append(oarr) iidxlist_nc.append(input_inds) size_nc += len(input_inds) has_nocolor_xfers = 1 else: oidxlist.append(oarr) iidxlist.append(input_inds) size += len(input_inds) if len(iidxlist) > 1: input_inds = _merge(iidxlist, size) output_inds = _merge(oidxlist, size) else: input_inds = iidxlist[0] output_inds = oidxlist[0] total_size += len(input_inds) xfer_in[sub_out].append(input_inds) xfer_out[sub_out].append(output_inds) if has_par_coloring and iidxlist_nc: # keep transfers separate that shouldn't happen when parallel # deriv coloring is active if len(iidxlist_nc) > 1: input_inds = _merge(iidxlist_nc, size_nc) output_inds = _merge(oidxlist_nc, size_nc) else: input_inds = iidxlist_nc[0] output_inds = oidxlist_nc[0] total_size_nocolor += len(input_inds) xfer_in_nocolor[sub_out].append(input_inds) xfer_out_nocolor[sub_out].append(output_inds) else: if (inp_is_dup and out_is_dup and src_indices is not None and src_indices.size > 0): offset = offsets_out[myrank, idx_out] output_inds = np.asarray(src_indices + offset, dtype=INT_DTYPE) total_size += len(input_inds) xfer_in[sub_out].append(input_inds) xfer_out[sub_out].append(output_inds) else: # remote input but still need entries in the transfer dicts to avoid hangs xfer_in[sub_out] xfer_out[sub_out] if has_par_coloring: xfer_in_nocolor[sub_out] xfer_out_nocolor[sub_out] full_xfer_in, full_xfer_out = _setup_index_views(total_size, xfer_in, xfer_out) transfers = { None: PETScTransfer(vectors['input']['nonlinear'], vectors['output']['nonlinear'], full_xfer_in, full_xfer_out, group._comm) } for sname, inds in xfer_out.items(): transfers[sname] = PETScTransfer(vectors['input']['nonlinear'], vectors['output']['nonlinear'], xfer_in[sname], inds, group._comm) if has_par_coloring: has_nocolor_xfers = group._comm.allreduce(has_nocolor_xfers) if has_nocolor_xfers: full_xfer_in, full_xfer_out = _setup_index_views(total_size_nocolor, xfer_in_nocolor, xfer_out_nocolor) transfers[(None, '@nocolor')] = PETScTransfer(vectors['input']['nonlinear'], vectors['output']['nonlinear'], full_xfer_in, full_xfer_out, group._comm) for sname, inds in xfer_out_nocolor.items(): transfers[(sname, '@nocolor')] = \ PETScTransfer(vectors['input']['nonlinear'], vectors['output']['nonlinear'], xfer_in_nocolor[sname], inds, group._comm) return transfers def _transfer(self, in_vec, out_vec, mode='fwd'): """ Perform transfer. Parameters ---------- in_vec : <Vector> pointer to the input vector. out_vec : <Vector> pointer to the output vector. mode : str 'fwd' or 'rev'. """ flag = False if mode == 'rev': flag = True in_vec, out_vec = out_vec, in_vec in_petsc = in_vec._petsc out_petsc = out_vec._petsc # For Complex Step, need to disassemble real and imag parts, transfer them separately, # then reassemble them. if in_vec._under_complex_step and out_vec._alloc_complex: # Real in_petsc.array = in_vec._data.real out_petsc.array = out_vec._data.real self._scatter(out_petsc, in_petsc, addv=flag, mode=flag) # Imaginary in_petsc_imag = in_vec._imag_petsc out_petsc_imag = out_vec._imag_petsc in_petsc_imag.array = in_vec._data.imag out_petsc_imag.array = out_vec._data.imag self._scatter(out_petsc_imag, in_petsc_imag, addv=flag, mode=flag) in_vec._data[:] = in_petsc.array + in_petsc_imag.array * 1j else: # Anything that has been allocated complex requires an additional step because # the petsc vector does not directly reference the _data. if in_vec._alloc_complex: in_petsc.array = in_vec._get_data() if out_vec._alloc_complex: out_petsc.array = out_vec._get_data() self._scatter(out_petsc, in_petsc, addv=flag, mode=flag) if in_vec._alloc_complex: data = in_vec._get_data() data[:] = in_petsc.array
def _merge(inds_list, tot_size): """ Convert a list of indices and/or ranges into an array. Parameters ---------- inds_list : list of ranges or ndarrays List of indices. tot_size : int Total size of the indices in the list. Returns ------- ndarray Array of indices. """ if inds_list: arr = np.empty(tot_size, dtype=INT_DTYPE) start = end = 0 for inds in inds_list: end += len(inds) arr[start:end] = inds start = end return arr return _empty_idx_array def _get_output_inds(group, abs_out, abs_in): owner = group._owning_rank[abs_out] meta_in = group._var_abs2meta['input'][abs_in] out_dist = group._var_allprocs_abs2meta['output'][abs_out]['distributed'] in_dist = meta_in['distributed'] src_indices = meta_in['src_indices'] rank = group.comm.rank if abs_out in group._var_abs2meta['output'] else owner out_idx = group._var_allprocs_abs2idx[abs_out] offsets = group._get_var_offsets()['output'][:, out_idx] sizes = group._var_sizes['output'][:, out_idx] if src_indices is None: orig_src_inds = src_indices else: src_indices = src_indices.shaped_array() orig_src_inds = src_indices if not out_dist and not in_dist: # convert from local to distributed src_indices off = np.sum(sizes[:rank]) if off > 0.: # adjust for local offsets # don't do += to avoid modifying stored value src_indices = src_indices + off # NOTE: src_indices are relative to a single, possibly distributed variable, # while the output_inds that we compute are relative to the full distributed # array that contains all local variables from each rank stacked in rank order. if src_indices is None: if out_dist: # input in this case is non-distributed (else src_indices would be # defined by now). dist output to non-distributed input conns w/o # src_indices are not allowed. raise RuntimeError(f"{group.msginfo}: Can't connect distributed output " f"'{abs_out}' to non-distributed input '{abs_in}' " "without declaring src_indices.", ident=(abs_out, abs_in)) else: offset = offsets[rank] output_inds = range(offset, offset + sizes[rank]) else: output_inds = np.empty(src_indices.size, INT_DTYPE) start = end = 0 for iproc in range(group.comm.size): end += sizes[iproc] if start == end: continue # The part of src on iproc on_iproc = np.logical_and(start <= src_indices, src_indices < end) if np.any(on_iproc): # This converts from global to variable specific ordering output_inds[on_iproc] = src_indices[on_iproc] + (offsets[iproc] - start) start = end return output_inds, orig_src_inds