Source code for openmdao.proc_allocators.proc_allocator

"""Define the base ProcAllocator class."""
import numpy as np
from openmdao.core.constants import INT_DTYPE


class ProcAllocationError(Exception):
    """
    Exception containing subsystem index information for use at higher levels.

    Parameters
    ----------
    msg : str
        The message string.
    sub_inds : list of int
        Indices of subsystems in _subsystems_allprocs in parent.

    Attributes
    ----------
    msg : str
        The message string.
    sub_inds : list of int
        Indices of subsystems in _subsystems_allprocs in parent.
    """

    def __init__(self, msg, sub_inds=None):
        """
        Initialize all attributes.
        """
        super().__init__(msg)
        self.msg = msg
        self.sub_inds = sub_inds
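
# --- Illustrative sketch (not part of the OpenMDAO source): one way a caller
# might catch ProcAllocationError and use its sub_inds attribute to report which
# subsystems failed allocation. The subsystem names below are hypothetical.
def _demo_proc_allocation_error():
    sub_names = ['comp0', 'comp1', 'comp2']  # hypothetical subsystem names
    try:
        # simulate an allocator rejecting subsystems 1 and 2
        raise ProcAllocationError("can't meet min_procs required", sub_inds=[1, 2])
    except ProcAllocationError as err:
        # sub_inds holds indices into the parent's _subsystems_allprocs
        failed = [sub_names[i] for i in err.sub_inds]
        print("%s: %s" % (err.msg, failed))  # -> can't meet min_procs required: ['comp1', 'comp2']
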
class ProcAllocator(object):
    """
    Algorithm for allocating processors to a given system's subsystems.

    Parameters
    ----------
    parallel : bool
        If True, split subsystem comm.

    Attributes
    ----------
    parallel : bool
        True means the comm is split across subsystems;
        False means the comm is passed to all subsystems.
    """

    def __init__(self, parallel=False):
        """
        Initialize all attributes.
        """
        self.parallel = parallel
    def __call__(self, proc_info, nsubs, comm):
        """
        Perform the allocation if parallel.

        Parameters
        ----------
        proc_info : list of (min_procs, max_procs, weight, proc_group)
            Information used to determine MPI process allocation to subsystems.
        nsubs : int
            Number of subsystems.
        comm : MPI.Comm or <FakeComm>
            Communicator of the owning system.

        Returns
        -------
        isubs : [int, ...]
            Indices of the owned local subsystems.
        sub_comm : MPI.Comm or <FakeComm>
            Communicator to pass to the subsystems.
        """
        if self.parallel and comm.size > 1:
            # This is a parallel group
            return self._divide_procs(proc_info, comm)
        else:
            nproc = comm.size
            min_procs, max_procs, _, _ = self._split_proc_info(proc_info, comm)

            if np.any(max_procs < nproc):
                raise ProcAllocationError("too many MPI procs allocated (%d)" % nproc,
                                          np.array(list(range(nsubs)))[max_procs < nproc])
            if np.any(min_procs > nproc):
                raise ProcAllocationError("can't meet min_procs required",
                                          np.array(list(range(nsubs)))[min_procs > nproc])

            # This is a serial group - all procs get all subsystems
            return list(range(nsubs)), comm

    def _split_proc_info(self, proc_info, comm):
        """
        Split proc_info into min_procs, max_procs, and weights.

        Parameters
        ----------
        proc_info : list of (min_procs, max_procs, weight, proc_group)
            Information used to determine MPI process allocation to subsystems.
        comm : MPI.Comm or <FakeComm>
            Communicator of the owning system.

        Returns
        -------
        ndarray of int
            Min procs required for each subsystem.
        ndarray of int
            Max procs required for each subsystem.
        ndarray of float
            Weights for each subsystem.
        list of list of int or None
            Mapping of each reduced (per proc_group) index to its corresponding
            proc_info indices, or None if no proc groups were used.
        """
        nproc = comm.size

        min_procs = np.array([minp for minp, _, _, _ in proc_info], dtype=INT_DTYPE)
        # if max_procs entry is None or > nproc, it just becomes nproc
        max_procs = np.array([nproc if maxp is None or maxp > nproc else maxp
                              for _, maxp, _, _ in proc_info], dtype=INT_DTYPE)
        weights = np.array([weight for _, _, weight, _ in proc_info])
        min_sum = np.sum(min_procs)

        if self.parallel and nproc > 1:
            if np.sum(max_procs) < nproc:
                raise ProcAllocationError("too many MPI procs allocated. Comm is size %d but "
                                          "can only use %d."
                                          % (nproc, np.sum(max_procs)))
            if min_sum > nproc and np.any(min_procs > 1):
                raise ProcAllocationError("can't meet min_procs required because the sum of the "
                                          "min procs required exceeds the procs allocated and the "
                                          "min procs required is > 1",
                                          np.array(list(range(len(proc_info))))[min_procs > 1])

        # collect the indices belonging to each named proc_group
        gdict = {}
        for i, (_, _, _, g) in enumerate(proc_info):
            if g is None:
                continue
            if g in gdict:
                gdict[g].append(i)
            else:
                gdict[g] = [i]

        # all members of a proc_group must request identical allocations
        for grp, idxs in gdict.items():
            min_match, max_match, wmatch, _ = proc_info[idxs[0]]
            for i in range(1, len(idxs)):
                mn, mx, w, _ = proc_info[idxs[i]]
                if mn != min_match or mx != max_match or w != wmatch:
                    raise ProcAllocationError(f"proc_group '{grp}' members do not all have matching"
                                              " min_procs, max_procs, and/or proc_weights.")

        if gdict:
            # get reduced min_procs, max_procs, and weights based on groups,
            # and compute the index map
            mask = np.ones(len(proc_info), dtype=bool)
            gimap = {}  # maps first index of a group to all group indices
            for idxs in gdict.values():
                # keep the first index and mask out the rest
                gimap[idxs[0]] = idxs
                for i in range(1, len(idxs)):
                    mask[idxs[i]] = False

            min_procs = min_procs[mask]
            max_procs = max_procs[mask]
            weights = weights[mask]

            reduced_inds = np.arange(len(proc_info), dtype=int)[mask]
            rimap = []
            for ridx in reduced_inds:
                if ridx in gimap:
                    rimap.append(gimap[ridx])
                else:
                    rimap.append([ridx])
        else:
            rimap = None

        return min_procs, max_procs, weights, rimap

    def _divide_procs(self, proc_info, comm):
        """
        Perform the parallel processor allocation.

        Parameters
        ----------
        proc_info : list of (min_procs, max_procs, weight, proc_group)
            Information used to determine MPI process allocation to subsystems.
        comm : MPI.Comm or <FakeComm>
            Communicator of the owning system.

        Returns
        -------
        isubs : [int, ...]
            Indices of the owned local subsystems.
        sub_comm : MPI.Comm or <FakeComm>
            Communicator to pass to the subsystems.
        """
        # implemented by concrete allocators; the base class is abstract
        pass
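
# --- Illustrative sketch (not part of the OpenMDAO source): exercising the serial
# (parallel=False) branch of __call__. _CommStub is a hypothetical stand-in for the
# <FakeComm> mentioned in the docstrings; it only needs a .size attribute here.
class _CommStub(object):
    def __init__(self, size):
        self.size = size


def _demo_serial_allocation():
    alloc = ProcAllocator(parallel=False)
    # three subsystems, each needing at least 1 proc (max_procs=None means
    # "up to the comm size"), weight 1.0, no proc_group
    proc_info = [(1, None, 1.0, None)] * 3

    isubs, sub_comm = alloc(proc_info, nsubs=3, comm=_CommStub(size=1))
    print(isubs)  # -> [0, 1, 2]: in a serial group every proc owns every subsystem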
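
# --- Illustrative sketch (not part of the OpenMDAO source, and it calls the
# private _split_proc_info method directly): how proc_group entries collapse.
# Subsystems 0 and 1 share hypothetical group 'g0', so they reduce to a single
# allocation slot; rimap maps each reduced slot back to the original indices.
# Reuses the _CommStub helper from the sketch above.
def _demo_proc_group_reduction():
    alloc = ProcAllocator(parallel=True)
    proc_info = [
        (1, None, 1.0, 'g0'),   # subsystem 0, grouped
        (1, None, 1.0, 'g0'),   # subsystem 1, grouped with 0 (matching min/max/weight)
        (1, None, 1.0, None),   # subsystem 2, ungrouped
    ]
    min_procs, max_procs, weights, rimap = alloc._split_proc_info(proc_info,
                                                                  _CommStub(size=4))
    print(list(min_procs))  # -> [1, 1]: one entry per reduced slot
    print(rimap)            # -> [[0, 1], [2]]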