(mpi-collective)=
# Collective Communication

In the {ref}`mpi-point2point` section, we discussed point-to-point communication, in which data is transferred between a single sender and a single receiver. This section focuses on a different kind of communication: collective communication, in which all processes in a communicator (group) take part in the data exchange. The collective operations covered in this section are blocking. MPI-3 also defines nonblocking variants, such as `Comm.Ibcast`, which are not discussed here.

Commonly used collective operations include:

* Synchronization: For example, [`Comm.Barrier`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Barrier).
* Data Movement: Such as [`Comm.Bcast`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Bcast), [`Comm.Scatter`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Scatter), [`Comm.Gather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Gather), etc.
* Collective Computation: Including [`Comm.Reduce`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Reduce), [`Intracomm.Scan`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Intracomm.html#mpi4py.MPI.Intracomm.Scan), etc.

Methods whose names start with an uppercase letter operate on buffer-like objects such as NumPy arrays. Examples are `Comm.Bcast`, `Comm.Scatter`, `Comm.Gather`, [`Comm.Allgather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Allgather), and [`Comm.Alltoall`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Alltoall). Methods whose names start with a lowercase letter send and receive generic Python objects, which mpi4py serializes with `pickle`. Examples are [`Comm.bcast`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.bcast), [`Comm.scatter`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.scatter), [`Comm.gather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.gather), [`Comm.allgather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.allgather), and [`Comm.alltoall`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.alltoall). Because the buffer-based methods avoid serialization, they are generally the faster choice for large numerical arrays.

## Synchronization

MPI computations are distributed across multiple processes, which may progress at different speeds. As its name suggests, `Comm.Barrier` enforces synchronization by placing a barrier across all processes in the communicator: a process that reaches `Comm.Barrier` blocks until every other process in the communicator has also called it. The barrier therefore acts as a synchronization point, guaranteeing that all processes have finished the code that precedes it before any process moves on.

## Data Movement

### Broadcast

`Comm.Bcast` sends the same data from one process, called the root, to all processes in the communicator. Broadcasting is useful whenever every process needs the same data, for example the value of a global parameter, as illustrated in {numref}`mpi-broadcast`.

```{figure} ../img/ch-mpi/broadcast.svg
---
width: 600px
name: mpi-broadcast
---
Broadcast
```

### Example 1: Broadcast

The example in {numref}`mpi-broadcast-py` demonstrates how to broadcast a NumPy array to all processes.

```{code-block} python
:caption: broadcast.py
:name: mpi-broadcast-py

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD

comm.Barrier()

N = 5
if comm.rank == 0:
    A = np.arange(N, dtype=np.float64)    # rank 0 (the root) initializes the data in A
else:
    A = np.empty(N, dtype=np.float64)     # other ranks allocate an empty buffer of the same size

# Broadcast A from rank 0 (the default root) to all ranks
comm.Bcast([A, MPI.DOUBLE], root=0)

# Print to verify
print("Rank:%2d, data:%s" % (comm.rank, A))
```

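Running the script with four processes gives the output below (the order of the lines may vary between runs):

```bash
mpiexec -np 4 python broadcast.py
```

```
Rank: 0, data:[0. 1. 2. 3. 4.]
Rank: 2, data:[0. 1. 2. 3. 4.]
Rank: 1, data:[0. 1. 2. 3. 4.]
Rank: 3, data:[0. 1. 2. 3. 4.]
```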


### Scatter and Gather

`Comm.Scatter` and `Comm.Gather` are complementary operations:

* `Comm.Scatter` distributes data from one root process to all processes in the communicator. The root divides its send buffer into equal-sized chunks and sends the *i*-th chunk to the process of rank *i*, including itself. Each process stores the chunk it receives. Scatter is suitable for partitioning a large dataset into smaller chunks that are processed in parallel.

* `Comm.Gather` does the reverse: it collects one chunk from each process in the communicator and assembles the chunks, in rank order, on the root process.

```{figure} ../img/ch-mpi/scatter-gather.svg
---
width: 600px
name: mpi-scatter-gather
---
Scatter and Gather
```

### Example 2: Scatter

The example in {numref}`mpi-scatter` demonstrates how to use Scatter to distribute data to all processes.

```{code-block} python
:caption: scatter.py
:name: mpi-scatter

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendbuf = None
if rank == 0:
    sendbuf = np.empty([size, 8], dtype='i')
    sendbuf.T[:,:] = range(size)
    print(f"Rank: {rank}, to be scattered: \n{sendbuf}")
recvbuf = np.empty(8, dtype='i')
comm.Scatter(sendbuf, recvbuf, root=0)
print(f"Rank: {rank}, after scatter: {recvbuf}")
assert np.allclose(recvbuf, rank)
```

Running the script with four processes gives the output below (the order of the lines may vary between runs):

```bash
mpiexec -np 4 python scatter.py
```

```
Rank: 0, to be scattered: 
[[0 0 0 0 0 0 0 0]
 [1 1 1 1 1 1 1 1]
 [2 2 2 2 2 2 2 2]
 [3 3 3 3 3 3 3 3]]
Rank: 0, after scatter: [0 0 0 0 0 0 0 0]
Rank: 1, after scatter: [1 1 1 1 1 1 1 1]
Rank: 2, after scatter: [2 2 2 2 2 2 2 2]
Rank: 3, after scatter: [3 3 3 3 3 3 3 3]
```


### Allgather and Alltoall

Two more complex operations are `Comm.Allgather` and `Comm.Alltoall`.

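`Comm.Allgather` extends `Comm.Gather`, as shown in {numref}`mpi-allgather`. Each process contributes one chunk, but instead of collecting the chunks on a single root, the complete rank-ordered result is delivered to every process, so all processes end up with an identical copy of the data. The effect is the same as a `Comm.Gather` followed by a `Comm.Bcast` of the gathered result.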

```{figure} ../img/ch-mpi/allgather.svg
---
width: 600px
name: mpi-allgather
---
Allgather
```


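`Comm.Alltoall` combines the ideas of `Comm.Scatter` and `Comm.Gather`, as illustrated in {numref}`mpi-alltoall`. Every process splits its send buffer into as many chunks as there are processes and sends the *j*-th chunk to rank *j*; in turn, each process stores the chunk received from rank *i* as the *i*-th chunk of its receive buffer. In effect, every process performs a scatter and a gather at the same time. If the data is viewed as a matrix whose rows are distributed across processes, `Comm.Alltoall` performs a global transpose.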

```{figure} ../img/ch-mpi/alltoall.svg
---
width: 600px
name: mpi-alltoall
---
Alltoall
```

## Collective Computation

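Collective computation applies an operation to data while it is being aggregated from different processes; examples are `Comm.Reduce` and [`Intracomm.Scan`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Intracomm.html#mpi4py.MPI.Intracomm.Scan). As shown in {numref}`mpi-reduce`, `Comm.Reduce` combines the data from all processes with an aggregation function `f` and stores the result on the root process. `Intracomm.Scan`, shown in {numref}`mpi-scan`, computes a prefix reduction instead: the process of rank *i* receives the result of applying `f` to the data of ranks 0 through *i*. Commonly used predefined operations include summation [`MPI.SUM`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.SUM.html), as well as `MPI.PROD`, `MPI.MAX`, and `MPI.MIN`.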

```{figure} ../img/ch-mpi/reduce.svg
---
width: 500px
name: mpi-reduce
---
Reduce
```

```{figure} ../img/ch-mpi/scan.svg
---
width: 500px
name: mpi-scan
---
Scan
```