8.4. Collective Communication#

In the Point-to-Point Communication section, we discussed point-to-point communication, which transfers data between a single sender and a single receiver. This section focuses on a different type of communication: collective communication, in which data is exchanged among multiple processes within a group at once. The collective operations discussed in this section are blocking.

The commonly used collective communication operations fall into three categories:

  • Synchronization, such as Comm.Barrier.

  • Data movement, such as Comm.Bcast, Comm.Scatter, Comm.Gather, Comm.Allgather, and Comm.Alltoall.

  • Collective computation, such as Comm.Reduce and Intracomm.Scan.

As with point-to-point communication, methods whose names begin with an uppercase letter operate on buffer-like objects, such as Comm.Bcast, Comm.Scatter, Comm.Gather, Comm.Allgather, and Comm.Alltoall, while their lowercase counterparts send and receive arbitrary Python objects, such as Comm.bcast, Comm.scatter, Comm.gather, Comm.allgather, and Comm.alltoall.
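As a quick illustration of the lowercase, object-based variants, the sketch below broadcasts a Python dictionary with Comm.bcast; the file name bcast_obj.py and the dictionary contents are made up for this example.

# bcast_obj.py: sketch of the object-based (lowercase) broadcast
from mpi4py import MPI

comm = MPI.COMM_WORLD

# Only rank 0 holds the object; the other ranks start with None
obj = {"n_iters": 10, "lr": 0.01} if comm.rank == 0 else None

# comm.bcast pickles the object on the root and returns it on every rank
obj = comm.bcast(obj, root=0)
print(f"Rank: {comm.rank}, received: {obj}")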

Synchronization#

MPI computations are distributed across multiple processes, which may run at different speeds. As the name suggests, Comm.Barrier forces synchronization by erecting a barrier for all processes within the communicator. A process that reaches Comm.Barrier early cannot execute the subsequent code until every other process has also reached this point. It acts as a synchronization point, ensuring that all processes finish the preceding work before any of them moves forward.
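A minimal sketch of Comm.Barrier is shown below; the file name barrier.py and the time.sleep call that simulates uneven workloads are assumptions for illustration.

# barrier.py: sketch of forcing synchronization with Comm.Barrier
import time

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# Simulate ranks finishing their work at different speeds
time.sleep(rank)
print(f"Rank {rank} reached the barrier")

# No rank continues past this line until every rank has arrived
comm.Barrier()
print(f"Rank {rank} passed the barrier")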

Data Movement#

Broadcast#

Comm.Bcast broadcasts data from one process to all processes within the group. Broadcast is useful whenever every process needs the same data, for example the value of a global variable, as illustrated in Fig. 8.6.

Fig. 8.6 Broadcast#

Example 1: Broadcast#

The example in Listing 8.7 demonstrates how to broadcast a NumPy array to all processes.

Listing 8.7 broadcast.py#
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD

comm.Barrier()

N = 5
if comm.rank == 0:
    A = np.arange(N, dtype=np.float64)    # rank 0 initializes data into variable A
else:
    A = np.empty(N, dtype=np.float64)     # other ranks allocate an empty buffer to receive into

# Broadcast A from rank 0 to every rank in the communicator
comm.Bcast([A, MPI.DOUBLE], root=0)

# Print to verify
print("Rank:%2d, data:%s" % (comm.rank, A))
!mpiexec -np 4 python broadcast.py
Rank: 0, data:[0. 1. 2. 3. 4.]
Rank: 2, data:[0. 1. 2. 3. 4.]
Rank: 1, data:[0. 1. 2. 3. 4.]
Rank: 3, data:[0. 1. 2. 3. 4.]

Scatter and Gather#

Comm.Scatter and Comm.Gather are a pair of corresponding operations:

  • Comm.Scatter scatters data from one process to all processes within the group. The root process divides its data into multiple chunks and sends one chunk to each process; every process receives and stores its own chunk. Scatter is suitable for splitting a large dataset into smaller pieces and distributing them across processes.

  • Comm.Gather, on the contrary, gathers small data chunks from all processes in the group onto one process (a sketch follows the scatter example below).

Fig. 8.7 Scatter and Gather#

Example 2: Scatter#

The example in Listing 8.8 demonstrates how to use Scatter to distribute data to all processes.

Listing 8.8 scatter.py#
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendbuf = None
if rank == 0:
    sendbuf = np.empty([size, 8], dtype='i')
    # fill row i with the value i; row i is the chunk destined for rank i
    sendbuf.T[:,:] = range(size)
    print(f"Rank: {rank}, to be scattered: \n{sendbuf}")
recvbuf = np.empty(8, dtype='i')
comm.Scatter(sendbuf, recvbuf, root=0)
print(f"Rank: {rank}, after scatter: {recvbuf}")
assert np.allclose(recvbuf, rank)
!mpiexec -np 4 python scatter.py
Rank: 0, to be scattered: 
[[0 0 0 0 0 0 0 0]
 [1 1 1 1 1 1 1 1]
 [2 2 2 2 2 2 2 2]
 [3 3 3 3 3 3 3 3]]
Rank: 0, after scatter: [0 0 0 0 0 0 0 0]
Rank: 1, after scatter: [1 1 1 1 1 1 1 1]
Rank: 2, after scatter: [2 2 2 2 2 2 2 2]
Rank: 3, after scatter: [3 3 3 3 3 3 3 3]
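Listing 8.8 only demonstrates the scatter direction. A minimal sketch of the reverse operation, Comm.Gather, might look like the following; the file name gather.py and the chunk contents are assumptions for illustration.

# gather.py: sketch of collecting chunks onto the root with Comm.Gather
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

# Every rank contributes a chunk filled with its own rank number
sendbuf = np.full(8, rank, dtype='i')

# Only the root needs a receive buffer large enough for all chunks
recvbuf = np.empty([size, 8], dtype='i') if rank == 0 else None
comm.Gather(sendbuf, recvbuf, root=0)

if rank == 0:
    print(f"Rank: {rank}, after gather:\n{recvbuf}")

On the root, row i of recvbuf holds the chunk contributed by rank i.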

Allgather and Alltoall#

Two more complex operations are Comm.Allgather and Comm.Alltoall.

Comm.Allgather is an advanced version of Comm.Gather, as shown in Fig. 8.8. It gathers the small data chunks scattered across the processes and delivers the complete result to every process, so that all processes end up holding identical data.

Fig. 8.8 Allgather#
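A minimal sketch of Comm.Allgather is shown below; the file name allgather.py and the chunk size are assumptions for illustration.

# allgather.py: sketch of Comm.Allgather
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

# Each rank contributes one chunk filled with its own rank number
sendbuf = np.full(4, rank, dtype='i')

# Unlike Gather, every rank allocates a buffer for the full result
recvbuf = np.empty([size, 4], dtype='i')
comm.Allgather(sendbuf, recvbuf)

# Every rank now holds identical data: row i is the chunk from rank i
print(f"Rank: {rank}, after allgather:\n{recvbuf}")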

Comm.Alltoall combines Comm.Scatter and Comm.Gather, as illustrated in Fig. 8.9: every process scatters its send buffer to all processes and gathers one chunk from each of them. If the data is viewed as a matrix whose rows are the per-process send buffers, Comm.Alltoall amounts to a global transpose of that matrix.

Fig. 8.9 Alltoall#
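A minimal sketch that makes the transpose interpretation concrete is shown below; the file name alltoall.py and the element values are assumptions for illustration. Each rank sends one element to every rank, so the result on rank j is column j of the matrix whose rows are the send buffers.

# alltoall.py: sketch of Comm.Alltoall as a global transpose
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

# Element j of sendbuf is the chunk this rank sends to rank j
sendbuf = np.arange(rank * size, (rank + 1) * size, dtype='i')
recvbuf = np.empty(size, dtype='i')

comm.Alltoall(sendbuf, recvbuf)

# Element i of recvbuf is the chunk that rank i sent to this rank,
# i.e. column `rank` of the matrix whose rows are the send buffers
print(f"Rank: {rank}, send: {sendbuf}, recv: {recvbuf}")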

Collective Computation#

Collective computation applies a computation to the data while aggregating it from the different processes, for example Comm.Reduce and Intracomm.Scan. As shown in Fig. 8.10 and Fig. 8.11, an aggregation function f is applied as the data is combined. Common aggregation operators include summation (MPI.SUM), maximum (MPI.MAX), and so on.

Fig. 8.10 Reduce#

Fig. 8.11 Scan#
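A minimal sketch of both operations is shown below; the file name reduce_scan.py and the array contents are assumptions for illustration. Comm.Reduce delivers the aggregated result only to the root process, while Intracomm.Scan gives each rank the prefix result over ranks 0 through itself.

# reduce_scan.py: sketch of Comm.Reduce and Intracomm.Scan with MPI.SUM
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# Each rank contributes an array filled with its own rank number
sendbuf = np.full(4, rank, dtype='i')

# Reduce: element-wise sum over all ranks, delivered only to rank 0
recvbuf = np.empty(4, dtype='i') if rank == 0 else None
comm.Reduce(sendbuf, recvbuf, op=MPI.SUM, root=0)
if rank == 0:
    print(f"Rank: {rank}, after reduce: {recvbuf}")

# Scan: inclusive prefix sum, rank r receives the sum over ranks 0..r
scanbuf = np.empty(4, dtype='i')
comm.Scan(sendbuf, scanbuf, op=MPI.SUM)
print(f"Rank: {rank}, after scan: {scanbuf}")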