Source code for openmdao.utils.concurrent

"""
Utilities for submitting function evaluations under MPI.
"""
import os
import traceback
from itertools import chain, islice

from openmdao.utils.mpi import debug

trace = os.environ.get('OPENMDAO_TRACE')
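
# Tracing is enabled by setting the environment variable before launching
# the script, for example (the launch command and script name here are
# illustrative only):
#
#   OPENMDAO_TRACE=1 mpirun -n 4 python run_cases.py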


def concurrent_eval_lb(func, cases, comm, broadcast=False):
    """
    Evaluate function on multiple processors with load balancing.

    Runs a load-balanced version of the given function, with the master
    rank (0) sending a new case to each worker rank as soon as it has
    finished its last case.

    Parameters
    ----------
    func : function
        The function to execute in workers.
    cases : collection of function args
        Entries are assumed to be of the form (args, kwargs) where kwargs
        are allowed to be None and args should be a list or tuple.
    comm : MPI communicator or None
        The MPI communicator that is shared between the master and workers.
        If None, the function will be executed serially.
    broadcast : bool(False)
        If True, the results will be broadcast out to the worker procs so
        that the return value of concurrent_eval_lb will be the full result
        list in every process.

    Returns
    -------
    list of (retval, err) tuples or None
        On rank 0, or on every rank when broadcast is True, the full list of
        (retval, err) tuples, where err is None on success and a formatted
        traceback string on failure. On worker ranks when broadcast is
        False, None.
    """
    if comm is not None:
        if comm.rank == 0:  # master rank
            if trace:
                debug('Running Master Rank')
            results = _concurrent_eval_lb_master(cases, comm)
            if trace:
                debug('Master Rank Complete')
        else:
            if trace:
                debug('Running Worker Rank %d' % comm.rank)
            results = _concurrent_eval_lb_worker(func, comm)
            if trace:
                debug('Running Worker Rank %d Complete' % comm.rank)

        if broadcast:
            results = comm.bcast(results, root=0)
    else:
        # serial execution
        results = []
        for args, kwargs in cases:
            try:
                if kwargs:
                    retval = func(*args, **kwargs)
                else:
                    retval = func(*args)
            except Exception:
                err = traceback.format_exc()
                retval = None
            else:
                err = None
            results.append((retval, err))

    return results
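
# A minimal usage sketch, runnable without MPI since comm=None falls back to
# the serial branch above. The function 'ratio' and the case values are
# hypothetical examples, not part of the OpenMDAO API.
def _example_concurrent_eval_lb_serial():
    def ratio(a, b):
        return a / b

    cases = [((6, 3), None), ((1, 0), None)]
    results = concurrent_eval_lb(ratio, cases, comm=None)
    # results[0] == (2.0, None); results[1] == (None, <traceback string>)
    # because the ZeroDivisionError is caught and reported, not raised.
    return results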
def _concurrent_eval_lb_master(cases, comm):
    """
    Coordinate worker processes.

    This runs only on rank 0. It sends cases to all of the workers and
    collects their results.

    Parameters
    ----------
    cases : collection of function args
        Entries are assumed to be of the form (args, kwargs) where kwargs
        are allowed to be None and args should be a list or tuple.
    comm : MPI communicator
        The MPI communicator that is shared between the master and workers.

    Returns
    -------
    list of (retval, err) tuples
        The results collected from the workers.
    """
    received = 0
    sent = 0

    results = []

    case_iter = iter(cases)

    # seed the workers
    for i in range(1, comm.size):
        try:
            case = next(case_iter)
        except StopIteration:
            break
        if trace:
            debug('Master sending case', i)
        comm.send(case, i, tag=1)
        if trace:
            debug('Master sent case', i)
        sent += 1

    # send the rest of the cases
    if sent > 0:
        while True:
            # wait for any worker to finish
            worker, retval, err = comm.recv(tag=2)
            received += 1

            # store results
            results.append((retval, err))

            # don't stop until we hear back from every worker process
            # we sent a case to
            if received == sent:
                break

            try:
                case = next(case_iter)
            except StopIteration:
                pass
            else:
                # send a new case to the last worker that finished
                comm.send(case, worker, tag=1)
                sent += 1

    # tell all workers to stop
    for rank in range(1, comm.size):
        comm.send((None, None), rank, tag=1)

    return results


def _concurrent_eval_lb_worker(func, comm):
    """
    Evaluate cases sent by the master until told to stop.

    This runs on every rank except 0. Each result is sent back to the
    master as a (rank, retval, err) tuple.
    """
    while True:
        # wait on a case from the master
        if trace:
            debug('Worker Waiting on Case')
        args, kwargs = comm.recv(source=0, tag=1)
        if trace:
            debug('Worker Case Received')
        if args is None:
            # we're done
            break

        try:
            if kwargs:
                retval = func(*args, **kwargs)
            else:
                retval = func(*args)
        except Exception:
            err = traceback.format_exc()
            retval = None
        else:
            err = None

        # tell the master we're done with that case
        comm.send((comm.rank, retval, err), 0, tag=2)
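
# A sketch of driving the master/worker pair above under mpi4py, assuming
# mpi4py is installed and the script is launched under MPI with something
# like "mpirun -n 4 python script.py". The function 'square' is a
# hypothetical example. Rank 0 runs _concurrent_eval_lb_master; the
# remaining ranks loop in _concurrent_eval_lb_worker until they receive a
# (None, None) payload on tag=1.
def _example_concurrent_eval_lb_mpi():
    from mpi4py import MPI

    def square(x):
        return x * x

    cases = [((i,), None) for i in range(20)]
    # with broadcast=True, every rank returns the full list of
    # (retval, err) tuples rather than just rank 0
    return concurrent_eval_lb(square, cases, MPI.COMM_WORLD, broadcast=True)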
def concurrent_eval(func, cases, comm, allgather=False, model_mpi=None):
    """
    Run the given function concurrently on all procs in the communicator.

    NOTE: This function should NOT be used if the concurrent function makes
    any internal collective MPI calls.

    Parameters
    ----------
    func : function
        The function to execute in workers.
    cases : iter of function args
        Entries are assumed to be of the form (args, kwargs) where kwargs
        are allowed to be None and args should be a list or tuple.
    comm : MPI communicator or None
        The MPI communicator that is shared between the master and workers.
        If None, the function will be executed serially.
    allgather : bool(False)
        If True, the results will be allgathered to all procs in the comm.
        Otherwise, results will be gathered to rank 0 only.
    model_mpi : None or tuple
        If the function in func runs in parallel, then this will be a tuple
        containing the total number of cases to evaluate concurrently, and
        the color of the cases to evaluate on this rank.

    Returns
    -------
    list of (retval, err) tuples or None
        When comm is None, on rank 0, or on every rank when allgather is
        True, the full list of (retval, err) tuples. On non-root ranks when
        allgather is False, None.
    """
    results = []

    if comm is None:
        it = cases
    else:
        rank = comm.rank
        if model_mpi is not None:
            size, color = model_mpi
            it = islice(cases, color, None, size)
        else:
            it = islice(cases, rank, None, comm.size)

    for args, kwargs in it:
        try:
            if kwargs:
                retval = func(*args, **kwargs)
            else:
                retval = func(*args)
        except Exception:
            err = traceback.format_exc()
            retval = None
        else:
            err = None

        results.append((retval, err))

    if comm is not None:
        if allgather:
            allresults = comm.allgather(results)

            # Limit results to 1 set from each color.
            if model_mpi is not None:
                allresults = allresults[:size]

            results = list(chain(*allresults))
        else:
            allresults = comm.gather(results, root=0)

            if comm.rank == 0:
                # Limit results to 1 set from each color.
                if model_mpi is not None:
                    allresults = allresults[:size]

                results = list(chain(*allresults))
            else:
                results = None

    return results
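
# A minimal usage sketch, runnable without MPI: with comm=None the islice
# striding is skipped and every case is evaluated serially. Under MPI, rank
# r of an N-proc comm would instead evaluate islice(cases, r, None, N).
# The function 'power' and the case values are hypothetical examples.
def _example_concurrent_eval():
    def power(base, exp=2):
        return base ** exp

    cases = [((3,), None), ((2,), {'exp': 5})]
    results = concurrent_eval(power, cases, comm=None)
    # results == [(9, None), (32, None)]
    return results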