Source code for openmdao.proc_allocators.default_allocator

"""Define the DefaultAllocator class."""
import warnings

import numpy as np

from openmdao.proc_allocators.proc_allocator import ProcAllocator, ProcAllocationError
from openmdao.utils.mpi import MPI


[docs] class DefaultAllocator(ProcAllocator): """ Default processor allocator. Parameters ---------- parallel : bool If True, split subsystem comm. """ def _divide_procs(self, proc_info, comm): """ Perform the parallel processor allocation. Parameters ---------- proc_info : list of (min_procs, max_procs, weight) Information used to determine MPI process allocation to subsystems. comm : MPI.Comm or <FakeComm> communicator of the owning System. Returns ------- isubs : [int, ...] indices of the owned local subsystems. sub_comm : MPI.Comm or <FakeComm> communicator to pass to the subsystems. sub_proc_range : (int, int) The range of processors that the subcomm owns, among those of comm. """ iproc = comm.rank nproc = comm.size min_procs, max_procs, proc_weights, rind_map = self._split_proc_info(proc_info, comm) min_sum = np.sum(min_procs) nsubs = len(min_procs) # Define the normalized weights for all subsystems proc_weights /= np.sum(proc_weights) if min_sum > nproc: isubs_list = [[] for ind in range(nproc)] proc_load = np.zeros(nproc) sub_sort_idxs = np.flipud(np.argsort(proc_weights)) vals = proc_weights # Assign the slowest subsystem to the most free processor for isub in sub_sort_idxs: min_loads = np.argsort(proc_load) for i in range(min_procs[isub]): iproc1 = min_loads[i] isubs_list[iproc1].append(isub) proc_load[iproc1] += vals[isub] # Result sub_comm = comm.Split(iproc) if rind_map is None: return sorted(isubs_list[iproc]), sub_comm else: # must map indices back based on reduced isubs_list isubs = [] for ris in isubs_list[iproc]: isubs.extend(rind_map[ris]) return sorted(isubs), sub_comm num_procs = min_procs.copy() if min_sum < nproc: # weighted sums to nproc weighted = proc_weights * nproc # the number of procs expected beyond the min requested weighted_less_min = weighted.astype(int) - min_procs weighted_less_min[weighted_less_min < 0] = 0 if np.sum(weighted_less_min) + min_sum <= nproc: # start with min procs then add what's left over using weights num_procs += weighted_less_min excess_idxs = (max_procs - num_procs) < 0 # limit all procs to their stated max num_procs[excess_idxs] = max_procs[excess_idxs] expected_total = np.sum(num_procs) extras = nproc - expected_total if extras > 0: # we have some extra procs lying around. # give remaining procs such that after each addition we are closest to # desired weights newsum = expected_total eye = np.eye(weighted.size) weighted[:] = proc_weights for i in range(extras): mask = max_procs <= num_procs weighted[mask] = 0.0 weighted *= (1. / np.sum(weighted)) newsum += 1 mat = eye + num_procs mat *= (1. / newsum) mat -= weighted # prevent rows associated with the maxed out subsystems from having the # smallest norm. mat[mask] = 1e99 # zero out columns for maxed out subsystems mat[:, mask] = 0.0 norm = np.linalg.norm(mat, axis=1) # add a proc to a subsystem based on matching closest to desired weights for # the remaining 'active' subsystems. num_procs[np.argmin(norm)] += 1 # Compute the coloring color = np.zeros(nproc, int) start, end = 0, 0 for isub in range(nsubs): end += num_procs[isub] color[start:end] = isub start += num_procs[isub] isub = color[iproc] # Result if rind_map is None: isubs = [isub] else: # must map reduced isub to include all subs in proc_group isubs = rind_map[isub] sub_comm = comm.Split(isub) start = list(color).index(isub) # find lowest matching color return isubs, sub_comm