Note

This feature requires MPI and may not run on Colab.

Distributed Variables

When you need to perform a computation using large input arrays, you may want to split that computation across multiple processes, where each process operates on some subset of the input values. This may be done purely for performance reasons, or it may be necessary because the entire input will not fit in the memory of a single machine. In either case, this can be accomplished in OpenMDAO by declaring those inputs and outputs as distributed. By definition, a distributed variable is an input or output where each process contains only a part of the whole variable. Distributed variables are declared by setting the optional “distributed” argument to True when adding the variable to a component. A component that has at least one distributed variable can also be called a distributed component.

Any variable that is not distributed is called a non-distributed variable. When the model is run under MPI, every process contains a copy of the entire non-distributed variable.

We’ve already seen that by using src_indices, we can connect an input to only a subset of an output variable. By giving different values for src_indices in each MPI process, we can distribute computations on a distributed output across the processes. All of the scenarios that involve connecting distributed and non-distributed variables are detailed in Connections involving distributed variables.
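
For instance, here is a minimal sketch (not part of the examples below; the component name and the fixed 2-element-per-rank split are illustrative assumptions) of a component whose distributed input pulls a different slice of its source on each rank via src_indices:

import numpy as np

import openmdao.api as om


class TakesMySlice(om.ExplicitComponent):
    """Hypothetical component: each rank operates on its own slice of the source."""

    def setup(self):
        rank = self.comm.rank

        # Assumed split: rank r owns source indices [2*r, 2*r + 1].
        local_idxs = 2 * rank + np.arange(2)

        self.add_input('x', np.zeros(2), src_indices=local_idxs, distributed=True)
        self.add_output('y', np.zeros(2), distributed=True)

    def compute(self, inputs, outputs):
        # Each rank only sees and processes its own 2-element piece.
        outputs['y'] = 2.0 * inputs['x']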

Example: Simple Component with Distributed Input and Output

The following example shows how to create a simple component, SimpleDistrib, that takes a distributed variable as an input and computes a distributed output. The calculation is divided across the available processes, but the details of that division are not contained in the component. In fact, the input is sized based on its connected source using the “shape_by_conn” argument.

%%px 

import numpy as np

import openmdao.api as om


class SimpleDistrib(om.ExplicitComponent):

    def setup(self):

        # Distributed Input
        self.add_input('in_dist', shape_by_conn=True, distributed=True)

        # Distributed Output
        self.add_output('out_dist', copy_shape='in_dist', distributed=True)

    def compute(self, inputs, outputs):
        x = inputs['in_dist']

        # "Computationally Intensive" operation that we wish to parallelize.
        f_x = x**2 - 2.0*x + 4.0

        outputs['out_dist'] = f_x

In the next part of the example, we take the SimpleDistrib component, place it into a model, and run it. Suppose the vector of data we want to process has 7 elements and we have 4 processors available for the computation. If we distribute the elements as evenly as we can, 3 processors handle 2 elements each, and the 4th picks up the last one. OpenMDAO includes the evenly_distrib_idxs utility function, which computes the sizes and offsets for all ranks. The sizes determine how much of the array to allocate on any specific rank, and the offsets give the location where each local portion of the array starts; in this example, the offsets are used to set the initial value properly. Here, the initial value for the full distributed input “in_dist” is a vector of 7 values running from 3.0 to 9.0, and each processor holds a 1 or 2 element piece of it.
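
For reference, here is what evenly_distrib_idxs returns for this 7-element, 4-process case; the commented values correspond to the split described above.

from openmdao.utils.array_utils import evenly_distrib_idxs

sizes, offsets = evenly_distrib_idxs(4, 7)

print(sizes)    # [2 2 2 1]  -> number of local elements on each rank
print(offsets)  # [0 2 4 6]  -> where each rank's piece starts in the full vector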

%%px

from openmdao.utils.array_utils import evenly_distrib_idxs
from openmdao.utils.mpi import MPI

size = 7

if MPI:
    comm = MPI.COMM_WORLD
    rank = comm.rank
    sizes, offsets = evenly_distrib_idxs(comm.size, size)
else:
    # When running without MPI, the entire variable is on one proc.
    rank = 0
    sizes = {rank : size}
    offsets = {rank : 0}

prob = om.Problem()
model = prob.model

# Create a distributed source for the distributed input.
ivc = om.IndepVarComp()
ivc.add_output('x_dist', np.zeros(sizes[rank]), distributed=True)

model.add_subsystem("indep", ivc)
model.add_subsystem("D1", SimpleDistrib())

model.connect('indep.x_dist', 'D1.in_dist')

prob.setup()

# Set initial values of distributed variable.
x_dist_init = 3.0 + np.arange(size)[offsets[rank]:offsets[rank] + sizes[rank]]
prob.set_val('indep.x_dist', x_dist_init)

prob.run_model()

# Values on each rank.
for var in ['indep.x_dist', 'D1.out_dist']:
    print(var, prob.get_val(var))
    
# Full gathered values.
for var in ['indep.x_dist', 'D1.out_dist']:
    print(var, prob.get_val(var, get_remote=True))
print('')
[stdout:0] 
indep.x_dist [3. 4.]
D1.out_dist [ 7. 12.]
indep.x_dist [3. 4. 5. 6. 7. 8. 9.]
D1.out_dist [ 7. 12. 19. 28. 39. 52. 67.]

[stdout:1] 
indep.x_dist [5. 6.]
D1.out_dist [19. 28.]
indep.x_dist [3. 4. 5. 6. 7. 8. 9.]
D1.out_dist [ 7. 12. 19. 28. 39. 52. 67.]

[stdout:2] 
indep.x_dist [7. 8.]
D1.out_dist [39. 52.]
indep.x_dist [3. 4. 5. 6. 7. 8. 9.]
D1.out_dist [ 7. 12. 19. 28. 39. 52. 67.]

[stdout:3] 
indep.x_dist [9.]
D1.out_dist [67.]
indep.x_dist [3. 4. 5. 6. 7. 8. 9.]
D1.out_dist [ 7. 12. 19. 28. 39. 52. 67.]

Note that we created a connection source ‘x_dist’ that passes its value to ‘D1.in_dist’. OpenMDAO requires a source for non-constant inputs, and usually creates one automatically as an output of a component referred to as an ‘Auto-IVC’. However, the automatic creation is not supported for distributed variables. We must manually create an IndepVarComp and connect it to our input.

When using distributed variables, OpenMDAO can’t always size the component inputs based on the shape of the connected source. In the following example, the component determines its own split using evenly_distrib_idxs. This requires that the component know the full vector size, which is passed in via the ‘vec_size’ option.

%%px

import numpy as np

import openmdao.api as om
from openmdao.utils.array_utils import evenly_distrib_idxs
from openmdao.utils.mpi import MPI

class SimpleDistrib(om.ExplicitComponent):

    def initialize(self):
        self.options.declare('vec_size', types=int, default=1,
                             desc="Total size of vector.")

    def setup(self):
        comm = self.comm
        rank = comm.rank

        size = self.options['vec_size']
        sizes, _ = evenly_distrib_idxs(comm.size, size)
        mysize = sizes[rank]

        # Distributed Input
        self.add_input('in_dist', np.ones(mysize, float), distributed=True)

        # Distributed Output
        self.add_output('out_dist', np.ones(mysize, float), distributed=True)

    def compute(self, inputs, outputs):
        x = inputs['in_dist']

        # "Computationally Intensive" operation that we wish to parallelize.
        f_x = x**2 - 2.0*x + 4.0

        outputs['out_dist'] = f_x


size = 7

if MPI:
    comm = MPI.COMM_WORLD
    rank = comm.rank
    sizes, offsets = evenly_distrib_idxs(comm.size, size)
else:
    # When running without MPI, the entire variable is on one proc.
    rank = 0
    sizes = {rank : size}
    offsets = {rank : 0}

prob = om.Problem()
model = prob.model

# Create a distributed source for the distributed input.
ivc = om.IndepVarComp()
ivc.add_output('x_dist', np.zeros(sizes[rank]), distributed=True)

model.add_subsystem("indep", ivc)
model.add_subsystem("D1", SimpleDistrib(vec_size=size))

model.connect('indep.x_dist', 'D1.in_dist')

prob.setup()

# Set initial values of distributed variable.
x_dist_init = 3.0 + np.arange(size)[offsets[rank]:offsets[rank] + sizes[rank]]
prob.set_val('indep.x_dist', x_dist_init)

prob.run_model()

# Values on each rank.
for var in ['indep.x_dist', 'D1.out_dist']:
    print(var, prob.get_val(var))

# Full gathered values.
for var in ['indep.x_dist', 'D1.out_dist']:
    print(var, prob.get_val(var, get_remote=True))
print('')
[stdout:0] 
indep.x_dist [3. 4.]
D1.out_dist [ 7. 12.]
indep.x_dist [3. 4. 5. 6. 7. 8. 9.]
D1.out_dist [ 7. 12. 19. 28. 39. 52. 67.]

[stdout:1] 
indep.x_dist [5. 6.]
D1.out_dist [19. 28.]
indep.x_dist [3. 4. 5. 6. 7. 8. 9.]
D1.out_dist [ 7. 12. 19. 28. 39. 52. 67.]

[stdout:2] 
indep.x_dist [7. 8.]
D1.out_dist [39. 52.]
indep.x_dist [3. 4. 5. 6. 7. 8. 9.]
D1.out_dist [ 7. 12. 19. 28. 39. 52. 67.]

[stdout:3] 
indep.x_dist [9.]
D1.out_dist [67.]
indep.x_dist [3. 4. 5. 6. 7. 8. 9.]
D1.out_dist [ 7. 12. 19. 28. 39. 52. 67.]

Example: Distributed I/O and a Non-Distributed Input

OpenMDAO supports both non-distributed and distributed I/O on the same component, so in this example, we expand the problem to include a non-distributed input. Here, the non-distributed input also has a length of 7, but its values are the same on every processor. This non-distributed input enters the computation through the sum of the square roots of its elements, which is added to the distributed output.

%%px 

import numpy as np

import openmdao.api as om
from openmdao.utils.array_utils import evenly_distrib_idxs
from openmdao.utils.mpi import MPI


class MixedDistrib1(om.ExplicitComponent):

    def setup(self):

        # Distributed Input
        self.add_input('in_dist', shape_by_conn=True, distributed=True)

        # Non-Distributed Input
        self.add_input('in_nd', shape_by_conn=True)

        # Distributed Output
        self.add_output('out_dist', copy_shape='in_dist', distributed=True)

    def compute(self, inputs, outputs):
        x = inputs['in_dist']
        y = inputs['in_nd']

        # "Computationally Intensive" operation that we wish to parallelize.
        f_x = x**2 - 2.0*x + 4.0

        # This operation is repeated on all procs.
        f_y = y ** 0.5
        
        outputs['out_dist'] = f_x + np.sum(f_y)
        
size = 7

if MPI:
    comm = MPI.COMM_WORLD
    rank = comm.rank
    sizes, offsets = evenly_distrib_idxs(comm.size, size)
else:
    # When running without MPI, the entire variable is on one proc.
    rank = 0
    sizes = {rank : size}
    offsets = {rank : 0}

prob = om.Problem()
model = prob.model

# Create a distributed source for the distributed input.
ivc = om.IndepVarComp()
ivc.add_output('x_dist', np.zeros(sizes[rank]), distributed=True)
ivc.add_output('x_nd', np.zeros(size))

model.add_subsystem("indep", ivc)
model.add_subsystem("D1", MixedDistrib1())

model.connect('indep.x_dist', 'D1.in_dist')
model.connect('indep.x_nd', 'D1.in_nd')

prob.setup()

# Set initial values of distributed variable.
x_dist_init = 3.0 + np.arange(size)[offsets[rank]:offsets[rank] + sizes[rank]]
prob.set_val('indep.x_dist', x_dist_init)

# Set initial values of non-distributed variable.
x_nd_init = 1.0 + 2.0*np.arange(size)
prob.set_val('indep.x_nd', x_nd_init)

prob.run_model()

# Values on each rank.
for var in ['indep.x_dist', 'indep.x_nd', 'D1.out_dist']:
    print(var, prob.get_val(var))
    
# Full gathered values.
for var in ['indep.x_dist', 'indep.x_nd', 'D1.out_dist']:
    print(var, prob.get_val(var, get_remote=True))
print('')
[stdout:0] 
indep.x_dist [3. 4.]
indep.x_nd [ 1.  3.  5.  7.  9. 11. 13.]
D1.out_dist [24.53604616 29.53604616]
indep.x_dist [3. 4. 5. 6. 7. 8. 9.]
indep.x_nd [ 1.  3.  5.  7.  9. 11. 13.]
D1.out_dist [24.53604616 29.53604616 36.53604616 45.53604616 56.53604616 69.53604616
 84.53604616]

[stdout:1] 
indep.x_dist [5. 6.]
indep.x_nd [ 1.  3.  5.  7.  9. 11. 13.]
D1.out_dist [36.53604616 45.53604616]
indep.x_dist [3. 4. 5. 6. 7. 8. 9.]
indep.x_nd [ 1.  3.  5.  7.  9. 11. 13.]
D1.out_dist [24.53604616 29.53604616 36.53604616 45.53604616 56.53604616 69.53604616
 84.53604616]

[stdout:2] 
indep.x_dist [7. 8.]
indep.x_nd [ 1.  3.  5.  7.  9. 11. 13.]
D1.out_dist [56.53604616 69.53604616]
indep.x_dist [3. 4. 5. 6. 7. 8. 9.]
indep.x_nd [ 1.  3.  5.  7.  9. 11. 13.]
D1.out_dist [24.53604616 29.53604616 36.53604616 45.53604616 56.53604616 69.53604616
 84.53604616]

[stdout:3] 
indep.x_dist [9.]
indep.x_nd [ 1.  3.  5.  7.  9. 11. 13.]
D1.out_dist [84.53604616]
indep.x_dist [3. 4. 5. 6. 7. 8. 9.]
indep.x_nd [ 1.  3.  5.  7.  9. 11. 13.]
D1.out_dist [24.53604616 29.53604616 36.53604616 45.53604616 56.53604616 69.53604616
 84.53604616]

Example: Distributed I/O and a Non-Distributed Output

You can also create a component with a non-distributed output along with distributed inputs and outputs. This situation tends to be trickier and usually requires you to perform some MPI operations in your component’s compute method. If the non-distributed output is only a function of the non-distributed inputs, then you can handle that variable just like you do on any other component. However, this example extends the previous component to include a non-distributed output that is a function of both the non-distributed and distributed inputs. In this case, it is a function of the sum of the square roots of the elements in the full distributed vector. Since the data is not all on any local processor, we use an MPI operation, in this case Allreduce, to perform the summation across the distributed vector and gather the answer back to each processor. The MPI operation and your implementation will vary, but consider this to be a general example.
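
Here is the compute portion of that component, MixedDistrib2. The complete component, including its matrix-free derivatives, is listed in the derivatives example later in this section.

import numpy as np

import openmdao.api as om
from openmdao.utils.mpi import MPI


class MixedDistrib2(om.ExplicitComponent):

    def setup(self):

        # Distributed Input
        self.add_input('in_dist', shape_by_conn=True, distributed=True)

        # Non-Distributed Input
        self.add_input('in_nd', shape_by_conn=True)

        # Distributed Output
        self.add_output('out_dist', copy_shape='in_dist', distributed=True)

        # Non-Distributed Output
        self.add_output('out_nd', copy_shape='in_nd')

    def compute(self, inputs, outputs):
        x = inputs['in_dist']
        y = inputs['in_nd']

        # "Computationally Intensive" operation that we wish to parallelize.
        f_x = x**2 - 2.0*x + 4.0

        # These operations are repeated on all procs.
        f_y = y ** 0.5
        g_y = y**2 + 3.0*y - 5.0

        # Compute square root of our portion of the distributed input.
        g_x = x ** 0.5

        # Distributed output
        outputs['out_dist'] = f_x + np.sum(f_y)

        # Non-Distributed output
        if MPI and self.comm.size > 1:
            # Sum our local portion, then Allreduce to get the total over all procs.
            local_sum = np.array(np.sum(g_x))
            total_sum = local_sum.copy()
            self.comm.Allreduce(local_sum, total_sum, op=MPI.SUM)
            outputs['out_nd'] = g_y + total_sum
        else:
            # Recommended to make sure your code can run without MPI too, for testing.
            outputs['out_nd'] = g_y + np.sum(g_x)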

Note

In these examples, we introduce a component called an IndepVarComp. If you used OpenMDAO prior to version 3.2, then you are familiar with this component. It is used to define an independent variable.

You usually do not have to define these because OpenMDAO defines and uses them automatically for all unconnected inputs in your model. This automatically-created IndepVarComp is called an Auto-IVC.

However, when we define a distributed input, we often use the “src_indices” attribute to determine how that input is allocated among the processors that the component sees. For some sets of these indices, it isn’t possible to easily determine the full size of the corresponding independent variable, and the IndepVarComp cannot be created automatically. So, for unconnected inputs on a distributed component, you must manually create one, as we did in these examples.

Derivatives with Distributed Variables

In the following examples, we show how to add analytic derivatives to the distributed examples given above. In most cases it is straightforward, but when you have a non-distributed output and a distributed input, the matrix-free format is required.

Derivatives: Distributed I/O and a Non-Distributed Input

In this example, we have a distributed input, a distributed output, and a non-distributed input. The derivative of ‘out_dist’ with respect to ‘in_dist’ is diagonal, so we use a sparse declaration, and each processor passes declare_partials the row and column indices for its local block. The derivatives are verified against complex step using check_totals, since our component is complex-safe.

%%px 

import numpy as np

import openmdao.api as om
from openmdao.utils.array_utils import evenly_distrib_idxs
from openmdao.utils.mpi import MPI


class MixedDistrib1(om.ExplicitComponent):

    def setup(self):

        # Distributed Input
        self.add_input('in_dist', shape_by_conn=True, distributed=True)

        # Non-Distributed Input
        self.add_input('in_nd', shape_by_conn=True)

        # Distributed Output
        self.add_output('out_dist', copy_shape='in_dist', distributed=True)

    def setup_partials(self):
        meta = self.get_io_metadata(metadata_keys=['shape'])
        local_size = meta['in_dist']['shape'][0]

        row_col_d = np.arange(local_size)

        self.declare_partials('out_dist', 'in_dist', rows=row_col_d, cols=row_col_d)
        self.declare_partials('out_dist', 'in_nd')

    def compute(self, inputs, outputs):
        x = inputs['in_dist']
        y = inputs['in_nd']

        # "Computationally Intensive" operation that we wish to parallelize.
        f_x = x**2 - 2.0*x + 4.0

        # This operation is repeated on all procs.
        f_y = y ** 0.5

        outputs['out_dist'] = f_x + np.sum(f_y)

    def compute_partials(self, inputs, partials):
        x = inputs['in_dist']
        y = inputs['in_nd']
        size = len(y)
        local_size = len(x)

        partials['out_dist', 'in_dist'] = 2.0 * x - 2.0

        df_dy = 0.5 / y ** 0.5
        partials['out_dist', 'in_nd'] = np.tile(df_dy, local_size).reshape((local_size, size))


size = 7

if MPI:
    comm = MPI.COMM_WORLD
    rank = comm.rank
    sizes, offsets = evenly_distrib_idxs(comm.size, size)
else:
    # When running without MPI, the entire variable is on one proc.
    rank = 0
    sizes = {rank : size}
    offsets = {rank : 0}

prob = om.Problem()
model = prob.model

# Create a distributed source for the distributed input.
ivc = om.IndepVarComp()
ivc.add_output('x_dist', np.zeros(sizes[rank]), distributed=True)
ivc.add_output('x_nd', np.zeros(size))

model.add_subsystem("indep", ivc)
model.add_subsystem("D1", MixedDistrib1())

model.connect('indep.x_dist', 'D1.in_dist')
model.connect('indep.x_nd', 'D1.in_nd')

model.add_design_var('indep.x_nd')
model.add_design_var('indep.x_dist')
model.add_objective('D1.out_dist')

prob.setup(force_alloc_complex=True)

# Set initial values of distributed variable.
x_dist_init = 3.0 + np.arange(size)[offsets[rank]:offsets[rank] + sizes[rank]]
prob.set_val('indep.x_dist', x_dist_init)

# Set initial values of non-distributed variable.
x_nd_init = 1.0 + 2.0*np.arange(size)
prob.set_val('indep.x_nd', x_nd_init)

prob.run_model()

if rank > 0:
    prob.check_totals(method='cs', out_stream=None)
else:
    prob.check_totals(method='cs')
[stdout:0] 
-------------------------
Group: Group 'Full Model'
-------------------------
  Full Model: 'D1.out_dist' wrt 'indep.x_dist'
    Analytic Magnitude: 2.849561e+01
          Fd Magnitude: 2.849561e+01 (cs:None)
    Absolute Error (Jan - Jfd) : 0.000000e+00

    Relative Error (Jan - Jfd) / Jfd : 0.000000e+00
    MPI Rank 0

    Raw Analytic Derivative (Jfor)
[[ 4.  0.  0.  0.  0.  0.  0.]
 [ 0.  6.  0.  0.  0.  0.  0.]
 [ 0.  0.  8.  0.  0.  0.  0.]
 [ 0.  0.  0. 10.  0.  0.  0.]
 [ 0.  0.  0.  0. 12.  0.  0.]
 [ 0.  0.  0.  0.  0. 14.  0.]
 [ 0.  0.  0.  0.  0.  0. 16.]]

    Raw FD Derivative (Jfd)
[[ 4.  0.  0.  0.  0.  0.  0.]
 [ 0.  6.  0.  0.  0.  0.  0.]
 [ 0.  0.  8.  0.  0.  0.  0.]
 [ 0.  0.  0. 10.  0.  0.  0.]
 [ 0.  0.  0.  0. 12.  0.  0.]
 [ 0.  0.  0.  0.  0. 14.  0.]
 [ 0.  0.  0.  0.  0.  0. 16.]]

 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  Full Model: 'D1.out_dist' wrt 'indep.x_nd'
    Analytic Magnitude: 1.849725e+00
          Fd Magnitude: 1.849725e+00 (cs:None)
    Absolute Error (Jan - Jfd) : 1.642042e-16

    Relative Error (Jan - Jfd) / Jfd : 8.877220e-17
    MPI Rank 0

    Raw Analytic Derivative (Jfor)
[[0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]]

    Raw FD Derivative (Jfd)
[[0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]]

 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

Derivatives: Distributed I/O and a Non-Distributed Output

If you have a component with distributed inputs and a non-distributed output, then the standard compute_partials API will not work for specifying the derivatives. You will need to use the matrix-free API with compute_jacvec_product, which is described in the feature document for ExplicitComponent.

Computing the matrix-vector product for the derivative of the non-distributed output with respect to a distributed input requires you to use MPI operations to gather the required parts of the Jacobian to all processors. When computing the matrix-vector product in forward mode, the contribution from each processor must be added together so that the result is the same on every rank; this is done with the Allreduce operation. When computing the matrix-vector product in reverse mode, an Allreduce is also needed to gather the contents of the non-distributed d_outputs vector. This is not intuitive, particularly because non-distributed variables contain the same data on all processors; however, the reverse-mode derivative vectors are an exception to this rule. Further explanation can be found in the theory manual section Using OpenMDAO with MPI.
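
As a compact sketch of this pattern (illustrative only, not an OpenMDAO API: the helper name and the sub_jac argument, standing for the local block of the ‘out_nd’ with respect to ‘in_dist’ Jacobian on this process, are assumptions), the forward- and reverse-mode handling looks roughly like this; the full component below shows the same pattern in context.

import numpy as np

from openmdao.utils.mpi import MPI


def nd_out_wrt_dist_in_jacvec(comm, sub_jac, d_in_dist, d_out_nd, mode):
    # Hypothetical helper, assuming an MPI run: returns the increment for
    # d_outputs['out_nd'] in 'fwd' mode, or for d_inputs['in_dist'] in 'rev' mode.
    if mode == 'fwd':
        # Each process multiplies by its local block, then the per-process
        # contributions are summed so every rank ends up with the same result.
        local = sub_jac.dot(d_in_dist)
        total = np.zeros_like(local)
        comm.Allreduce(local, total, op=MPI.SUM)
        return total
    else:  # 'rev'
        # Sum the non-distributed d_outputs vector across processes first,
        # then apply the transposed local block.
        full = np.zeros_like(d_out_nd)
        comm.Allreduce(d_out_nd, full, op=MPI.SUM)
        return sub_jac.T.dot(full)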

The following example shows how to implement derivatives on the earlier MixedDistrib2 component.

%%px


import numpy as np

import openmdao.api as om
from openmdao.utils.array_utils import evenly_distrib_idxs
from openmdao.utils.mpi import MPI


class MixedDistrib2(om.ExplicitComponent):

    def setup(self):

        # Distributed Input
        self.add_input('in_dist', shape_by_conn=True, distributed=True)

        # Non-Distributed Input
        self.add_input('in_nd', shape_by_conn=True)

        # Distributed Output
        self.add_output('out_dist', copy_shape='in_dist', distributed=True)

        # Non-Distributed Output
        self.add_output('out_nd', copy_shape='in_nd')

    def compute(self, inputs, outputs):
        x = inputs['in_dist']
        y = inputs['in_nd']

        # "Computationally Intensive" operation that we wish to parallelize.
        f_x = x**2 - 2.0*x + 4.0

        # These operations are repeated on all procs.
        f_y = y ** 0.5
        g_y = y**2 + 3.0*y - 5.0

        # Compute square root of our portion of the distributed input.
        g_x = x ** 0.5

        # Distributed output
        outputs['out_dist'] = f_x + np.sum(f_y)

        # Non-Distributed output
        if MPI and self.comm.size > 1:

            # We need to gather the summed values to compute the total sum over all procs.
            local_sum = np.array(np.sum(g_x))
            total_sum = local_sum.copy()
            self.comm.Allreduce(local_sum, total_sum, op=MPI.SUM)
            outputs['out_nd'] = g_y + total_sum
        else:
            # Recommended to make sure your code can run without MPI too, for testing.
            outputs['out_nd'] = g_y + np.sum(g_x)

    def compute_jacvec_product(self, inputs, d_inputs, d_outputs, mode):
        x = inputs['in_dist']
        y = inputs['in_nd']

        df_dx = 2.0 * x - 2.0
        df_dy = 0.5 / y ** 0.5
        dg_dx = 0.5 / x ** 0.5
        dg_dy = 2.0 * y + 3.0

        nx = len(x)
        ny = len(y)

        if mode == 'fwd':
            if 'out_dist' in d_outputs:
                if 'in_dist' in d_inputs:
                    d_outputs['out_dist'] += df_dx * d_inputs['in_dist']
                    
                if 'in_nd' in d_inputs:
                    d_outputs['out_dist'] += np.tile(df_dy, nx).reshape((nx, ny)).dot(d_inputs['in_nd'])
                    
            if 'out_nd' in d_outputs:
                if 'in_dist' in d_inputs:
                    deriv = np.tile(dg_dx, ny).reshape((ny, nx)).dot(d_inputs['in_dist'])
                    if MPI and self.comm.size > 1:
                        # In Fwd, allreduce the result of the dot product with the subjac.
                        # Allocate buffer of same size and dtype for storing the result.
                        deriv_sum = np.zeros_like(deriv)
                        self.comm.Allreduce(deriv, deriv_sum, op=MPI.SUM)
                        d_outputs['out_nd'] += deriv_sum
                    else:
                        # Recommended to make sure your code can run without MPI too, for testing.
                        d_outputs['out_nd'] += deriv
                        
                if 'in_nd' in d_inputs:
                    d_outputs['out_nd'] += dg_dy * d_inputs['in_nd']

        else:
            if 'out_dist' in d_outputs:
                if 'in_dist' in d_inputs:
                    d_inputs['in_dist'] += df_dx * d_outputs['out_dist']
                    
                if 'in_nd' in d_inputs:
                    d_inputs['in_nd'] += np.tile(df_dy, nx).reshape((nx, ny)).T.dot(d_outputs['out_dist'])

            if 'out_nd' in d_outputs:
                if 'in_dist' in d_inputs:
                    if MPI and self.comm.size > 1:
                        # In Rev, allreduce the serial derivative vector before the dot product.
                        # Allocate buffer of same size and dtype for storing the result.
                        full = np.zeros_like(d_outputs['out_nd'])
                        self.comm.Allreduce(d_outputs['out_nd'], full, op=MPI.SUM)
                    else:
                        # Recommended to make sure your code can run without MPI too, for testing.
                        full = d_outputs['out_nd']

                    d_inputs['in_dist'] += np.tile(dg_dx, ny).reshape((ny, nx)).T.dot(full)

                if 'in_nd' in d_inputs:
                    d_inputs['in_nd'] += dg_dy * d_outputs['out_nd']


size = 7

if MPI:
    comm = MPI.COMM_WORLD
    rank = comm.rank
    sizes, offsets = evenly_distrib_idxs(comm.size, size)
else:
    # When running without MPI, the entire variable is on one proc.
    rank = 0
    sizes = {rank : size}
    offsets = {rank : 0}

prob = om.Problem()
model = prob.model

# Create a distributed source for the distributed input.
ivc = om.IndepVarComp()
ivc.add_output('x_dist', np.zeros(sizes[rank]), distributed=True)
ivc.add_output('x_nd', np.zeros(size))

model.add_subsystem("indep", ivc)
model.add_subsystem("D1", MixedDistrib2())

model.connect('indep.x_dist', 'D1.in_dist')
model.connect('indep.x_nd', 'D1.in_nd')

model.add_design_var('indep.x_nd')
model.add_design_var('indep.x_dist')
model.add_constraint('D1.out_dist', lower=0.0)
model.add_constraint('D1.out_nd', lower=0.0)

prob.setup(force_alloc_complex=True)

# Set initial values of distributed variable.
x_dist_init = 3.0 + np.arange(size)[offsets[rank]:offsets[rank] + sizes[rank]]
prob.set_val('indep.x_dist', x_dist_init)

# Set initial values of non-distributed variable.
x_nd_init = 1.0 + 2.0*np.arange(size)
prob.set_val('indep.x_nd', x_nd_init)

prob.run_model()

if rank > 0:
    prob.check_totals(method='cs', out_stream=None)
else:
    prob.check_totals(method='cs')
[stdout:0] 
-------------------------
Group: Group 'Full Model'
-------------------------
  Full Model: 'D1.out_dist' wrt 'indep.x_dist'
    Analytic Magnitude: 2.849561e+01
          Fd Magnitude: 2.849561e+01 (cs:None)
    Absolute Error (Jan - Jfd) : 0.000000e+00

    Relative Error (Jan - Jfd) / Jfd : 0.000000e+00
    MPI Rank 0

    Raw Analytic Derivative (Jfor)
[[ 4. -0. -0. -0. -0. -0. -0.]
 [-0.  6. -0. -0. -0. -0. -0.]
 [-0. -0.  8. -0. -0. -0. -0.]
 [-0. -0. -0. 10. -0. -0. -0.]
 [-0. -0. -0. -0. 12. -0. -0.]
 [-0. -0. -0. -0. -0. 14. -0.]
 [-0. -0. -0. -0. -0. -0. 16.]]

    Raw FD Derivative (Jfd)
[[ 4.  0.  0.  0.  0.  0.  0.]
 [ 0.  6.  0.  0.  0.  0.  0.]
 [ 0.  0.  8.  0.  0.  0.  0.]
 [ 0.  0.  0. 10.  0.  0.  0.]
 [ 0.  0.  0.  0. 12.  0.  0.]
 [ 0.  0.  0.  0.  0. 14.  0.]
 [ 0.  0.  0.  0.  0.  0. 16.]]

 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  Full Model: 'D1.out_dist' wrt 'indep.x_nd'
    Analytic Magnitude: 1.849725e+00
          Fd Magnitude: 1.849725e+00 (cs:None)
    Absolute Error (Jan - Jfd) : 1.642042e-16

    Relative Error (Jan - Jfd) / Jfd : 8.877220e-17
    MPI Rank 0

    Raw Analytic Derivative (Jfor)
[[0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]]

    Raw FD Derivative (Jfd)
[[0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]
 [0.5        0.28867513 0.2236068  0.18898224 0.16666667 0.15075567
  0.13867505]]

 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  Full Model: 'D1.out_nd' wrt 'indep.x_dist'
    Analytic Magnitude: 1.525023e+00
          Fd Magnitude: 1.525023e+00 (cs:None)
    Absolute Error (Jan - Jfd) : 1.798767e-16

    Relative Error (Jan - Jfd) / Jfd : 1.179502e-16
    MPI Rank 0

    Raw Analytic Derivative (Jfor)
[[0.28867513 0.25       0.2236068  0.20412415 0.18898224 0.1767767
  0.16666667]
 [0.28867513 0.25       0.2236068  0.20412415 0.18898224 0.1767767
  0.16666667]
 [0.28867513 0.25       0.2236068  0.20412415 0.18898224 0.1767767
  0.16666667]
 [0.28867513 0.25       0.2236068  0.20412415 0.18898224 0.1767767
  0.16666667]
 [0.28867513 0.25       0.2236068  0.20412415 0.18898224 0.1767767
  0.16666667]
 [0.28867513 0.25       0.2236068  0.20412415 0.18898224 0.1767767
  0.16666667]
 [0.28867513 0.25       0.2236068  0.20412415 0.18898224 0.1767767
  0.16666667]]

    Raw FD Derivative (Jfd)
[[0.28867513 0.25       0.2236068  0.20412415 0.18898224 0.1767767
  0.16666667]
 [0.28867513 0.25       0.2236068  0.20412415 0.18898224 0.1767767
  0.16666667]
 [0.28867513 0.25       0.2236068  0.20412415 0.18898224 0.1767767
  0.16666667]
 [0.28867513 0.25       0.2236068  0.20412415 0.18898224 0.1767767
  0.16666667]
 [0.28867513 0.25       0.2236068  0.20412415 0.18898224 0.1767767
  0.16666667]
 [0.28867513 0.25       0.2236068  0.20412415 0.18898224 0.1767767
  0.16666667]
 [0.28867513 0.25       0.2236068  0.20412415 0.18898224 0.1767767
  0.16666667]]

 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  Full Model: 'D1.out_nd' wrt 'indep.x_nd'
    Analytic Magnitude: 4.970915e+01
          Fd Magnitude: 4.970915e+01 (cs:None)
    Absolute Error (Jan - Jfd) : 3.552714e-15

    Relative Error (Jan - Jfd) / Jfd : 7.147001e-17
    MPI Rank 0

    Raw Analytic Derivative (Jfor)
[[ 5. -0. -0. -0. -0. -0. -0.]
 [-0.  9. -0. -0. -0. -0. -0.]
 [-0. -0. 13. -0. -0. -0. -0.]
 [-0. -0. -0. 17. -0. -0. -0.]
 [-0. -0. -0. -0. 21. -0. -0.]
 [-0. -0. -0. -0. -0. 25. -0.]
 [-0. -0. -0. -0. -0. -0. 29.]]

    Raw FD Derivative (Jfd)
[[ 5.  0.  0.  0.  0.  0.  0.]
 [ 0.  9.  0.  0.  0.  0.  0.]
 [ 0.  0. 13.  0.  0.  0.  0.]
 [ 0.  0.  0. 17.  0.  0.  0.]
 [ 0.  0.  0.  0. 21.  0.  0.]
 [ 0.  0.  0.  0.  0. 25.  0.]
 [ 0.  0.  0.  0.  0.  0. 29.]]

 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -