{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"active-ipynb",
"remove-input",
"remove-output"
]
},
"outputs": [],
"source": [
"%matplotlib inline\n",
"from ipyparallel import Client, error\n",
"cluster=Client(profile=\"mpi\")\n",
"view=cluster[:]\n",
"view.block=True\n",
"\n",
"try:\n",
" from openmdao.utils.notebook_utils import notebook_mode\n",
"except ImportError:\n",
" !python -m pip install openmdao[notebooks]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Connections involving distributed variables\n",
"\n",
"The purpose of this section is to describe the behavior you should expect when connecting two variables together when at least one of them is a distributed variable.\n",
"\n",
"## Distributed output to non-distributed input\n",
"Because distributed variables may have different sizes and/or values on different ranks, and non-distributed variables have the same value and size on all ranks, the only way that a distributed to non-distributed connection is allowed is if `src_indices` are specified for the non-distributed input and that the specified `src_indices` are identical on all ranks where the input variable exists. Otherwise the non-distributed inputs could have different values on different ranks, which is illegal. Note that in previous versions of OpenMDAO, the behavior when connecting a distributed output to a non-distributed input when `src_indices` were not specified was that the non-distributed input on each rank will be the same size as the full distributed output. This is no longer the case. In fact, this case is no longer allowed. However, you can still achieve the same behavior by specifying `src_indices` of `om.slicer[:]` on the input, which explicitly specifies that the input on each rank is the same size as the full distributed output. For example:\n",
"\n",
"```python\n",
"group.connect('mydistcomp.out', 'mynondistributed.in', src_indices=om.slicer[:])\n",
"```\n",
"\n",
"## Distributed output to distributed input\n",
"When connecting two distributed variables, you can specify `src_indices` on the input however you choose, or OpenMDAO will automatically assign `src_indices` to the input based on the size of the input in each rank. For example, if the component that owns the input is running on 3 ranks with input sizes of [2, 3, 4], then `src_indices` of [0, 1], [2, 3, 4], and [5, 6, 7, 8] will be specified on the 3 ranks. If, however, the global size of the output does not equal the global size of the input, an exception will be raised saying that OpenMDAO is not able to determine the `src_indices`.\n",
"\n",
"## Non-distributed output to distributed input\n",
"These types of connections are deprecated and will become errors in a future release, but for now, you are allowed to connect a non-distributed output to a distributed input, provided that the global sizes of input and output are equal and the inputs are the same size on every rank.\n",
"\n",
"## Example\n",
"The following example shows the `MixedDistrib2` component, which contains a non-distributed input, a distributed input, and a distributed output. We show how to connect the non-distributed input and the distributed input to a distributed output."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%px\n",
"\n",
"import numpy as np\n",
"\n",
"import openmdao.api as om\n",
"from openmdao.utils.array_utils import evenly_distrib_idxs\n",
"from openmdao.utils.mpi import MPI\n",
"\n",
"\n",
"class MixedDistrib2(om.ExplicitComponent):\n",
"\n",
" def initialize(self):\n",
" self.options.declare('vec_size', types=int, default=1,\n",
" desc=\"Total size of vector.\")\n",
"\n",
" def setup(self):\n",
" comm = self.comm\n",
" rank = comm.rank\n",
"\n",
" size = self.options['vec_size']\n",
" sizes, _ = evenly_distrib_idxs(comm.size, size)\n",
" mysize = sizes[rank]\n",
"\n",
" # Distributed Input\n",
" self.add_input('in_dist', np.ones(mysize, float), distributed=True)\n",
"\n",
" # Non-Distributed Input\n",
" self.add_input('in_nd', np.ones(size, float))\n",
"\n",
" # Distributed Output\n",
" self.add_output('out_dist', copy_shape='in_dist', distributed=True)\n",
"\n",
" def compute(self, inputs, outputs):\n",
" x = inputs['in_dist']\n",
" y = inputs['in_nd']\n",
"\n",
" # \"Computationally Intensive\" operation that we wish to parallelize.\n",
" f_x = x**2 - 2.0*x + 4.0\n",
"\n",
" # This operation is repeated on all procs.\n",
" f_y = y ** 0.5\n",
"\n",
" outputs['out_dist'] = f_x + np.sum(f_y)\n",
"\n",
"size = 7\n",
"\n",
"if MPI:\n",
" comm = MPI.COMM_WORLD\n",
" rank = comm.rank\n",
" sizes, offsets = evenly_distrib_idxs(comm.size, size)\n",
"else:\n",
" # When running without MPI, the entire variable is on one proc.\n",
" rank = 0\n",
" sizes = {rank : size}\n",
" offsets = {rank : 0}\n",
"\n",
"prob = om.Problem()\n",
"model = prob.model\n",
"\n",
"# Create a distributed source for the distributed input.\n",
"ivc = om.IndepVarComp()\n",
"ivc.add_output('x_dist', np.zeros(sizes[rank]), distributed=True)\n",
"ivc.add_output('x_nd', np.zeros(size))\n",
"\n",
"model.add_subsystem(\"indep\", ivc)\n",
"model.add_subsystem(\"D1\", MixedDistrib2(vec_size=size))\n",
"model.add_subsystem(\"D2\", MixedDistrib2(vec_size=size))\n",
"\n",
"model.connect('indep.x_dist', 'D1.in_dist')\n",
"model.connect('indep.x_nd', 'D1.in_nd')\n",
"\n",
"# Distributed output to distributed input.\n",
"model.connect('D1.out_dist', 'D2.in_dist')\n",
"\n",
"# Distributed output to non-distributed input.\n",
"# The Slicer is used to select all elements in D1.out_dist.\n",
"model.connect('D1.out_dist', 'D2.in_nd', src_indices=om.slicer[:])\n",
"\n",
"\n",
"prob.setup(force_alloc_complex=True)\n",
"\n",
"# Set initial values of distributed variable.\n",
"x_dist_init = 3.0 + np.arange(size)[offsets[rank]:offsets[rank] + sizes[rank]]\n",
"prob.set_val('indep.x_dist', x_dist_init)\n",
"\n",
"# Set initial values of non-distributed variable.\n",
"x_nd_init = 1.0 + 2.0*np.arange(size)\n",
"prob.set_val('indep.x_nd', x_nd_init)\n",
"\n",
"prob.run_model()\n",
"\n",
"# Values on each rank.\n",
"for var in ['D1.out_dist', 'D2.in_dist']:\n",
" print(var, prob.get_val(var))\n",
" \n",
"# Note: to get the value of a non-distributed input that is connected to a distributed output, you \n",
"# need to set get_remote to True.\n",
"print('D2.in_nd', prob.get_val('D2.in_nd', get_remote=True))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"remove-input",
"remove-output"
]
},
"outputs": [],
"source": [
"%%px\n",
"from openmdao.utils.assert_utils import assert_near_equal\n",
"\n",
"assert_near_equal(prob.get_val('D2.in_nd', get_remote=True), np.array([24.53604616, 29.53604616, 36.53604616, 45.53604616, 56.53604616, 69.53604616, 84.53604616]), 1e-6)"
]
}
],
"metadata": {
"celltoolbar": "Tags",
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
},
"orphan": true
},
"nbformat": 4,
"nbformat_minor": 4
}