8.4. Collective Communication#
The Point-to-Point Communication section covered point-to-point communication, in which data is transferred between a single sender and a single receiver. This section focuses on a different type of communication: collective communication, where data is transmitted among multiple processes within a group at the same time. Collective communication only supports blocking mode.
The commonly used collective communications include:

Synchronization: for example, Comm.Barrier.
Data Movement: such as Comm.Bcast, Comm.Scatter, Comm.Gather, etc.
Collective Computation: including Comm.Reduce, Intracomm.Scan, etc.
Functions whose names start with an uppercase letter operate on buffers, such as Comm.Bcast, Comm.Scatter, Comm.Gather, Comm.Allgather, and Comm.Alltoall. Functions whose names start with a lowercase letter can send and receive generic Python objects, such as Comm.bcast, Comm.scatter, Comm.gather, Comm.allgather, and Comm.alltoall.
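As a minimal sketch of the lowercase, object-based interface (the dictionary payload below is purely illustrative), any picklable Python object can be broadcast with Comm.bcast:

from mpi4py import MPI

comm = MPI.COMM_WORLD

# Only the root builds the object; the other ranks start with None
params = {"lr": 0.01, "epochs": 10} if comm.rank == 0 else None
# bcast pickles the object on the root and returns a copy of it on every rank
params = comm.bcast(params, root=0)
print(f"Rank {comm.rank}, params: {params}")

The lowercase routines pickle the object behind the scenes, which is convenient but slower for large arrays; the buffer-based uppercase variants used in the listings below avoid that overhead.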
Synchronization#
MPI computations are distributed across multiple processes, each of which may run at a different speed. Comm.Barrier forces synchronization by, as the name suggests, setting up a barrier for all processes within the communicator. Faster processes that reach Comm.Barrier cannot proceed with the subsequent code until all other processes have also reached this point. It acts as a synchronization point, ensuring that all processes complete their computations before moving forward.
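The following is a minimal sketch of a barrier in action; the time.sleep call merely simulates ranks finishing their work at different times:

import time
from mpi4py import MPI

comm = MPI.COMM_WORLD

# Simulate uneven workloads: higher ranks take longer to finish
time.sleep(comm.rank)
print(f"Rank {comm.rank} reached the barrier")

# No rank passes this point until every rank in the communicator has arrived
comm.Barrier()
print(f"Rank {comm.rank} passed the barrier")

When launched with mpiexec, none of the "passed the barrier" lines can appear before the slowest rank has reached the barrier.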
Data Movement#
Broadcast#
Comm.Bcast globally broadcasts data from one sender to all processes within the group. Broadcast operations are useful when the same data needs to be sent to every process, for example broadcasting the value of a global variable to all processes, as illustrated in Fig. 8.6.
Example 1: Broadcast#
The example in Listing 8.7 demonstrates how to broadcast a NumPy array to all processes.
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD

comm.Barrier()

N = 5
if comm.rank == 0:
    # Rank 0 initializes the data in variable A
    A = np.arange(N, dtype=np.float64)
else:
    # On all other ranks, A is an uninitialized receive buffer
    A = np.empty(N, dtype=np.float64)

# Broadcast A from rank 0 (the default root) to all processes
comm.Bcast([A, MPI.DOUBLE])

# Print to verify
print("Rank:%2d, data:%s" % (comm.rank, A))
!mpiexec -np 4 python broadcast.py
Rank: 0, data:[0. 1. 2. 3. 4.]
Rank: 2, data:[0. 1. 2. 3. 4.]
Rank: 1, data:[0. 1. 2. 3. 4.]
Rank: 3, data:[0. 1. 2. 3. 4.]
Scatter and Gather#
Comm.Scatter and Comm.Gather are a pair of corresponding operations:

Comm.Scatter scatters data from one process to all processes within the group: the root process divides its data into multiple chunks, and each chunk is sent to the corresponding process, which receives and stores it. Scatter operations are suitable for partitioning a larger dataset into multiple smaller chunks.

Comm.Gather does the opposite: it gathers small data chunks from all processes in the group onto one process.
Example 2: Scatter#
The example in Listing 8.8 demonstrates how to use Scatter to distribute data to all processes.
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendbuf = None
if rank == 0:
    # Build a (size x 8) matrix on the root; row i is filled with the value i
    sendbuf = np.empty([size, 8], dtype='i')
    sendbuf.T[:, :] = range(size)
    print(f"Rank: {rank}, to be scattered: \n{sendbuf}")

# Each rank receives one row of eight integers
recvbuf = np.empty(8, dtype='i')
comm.Scatter(sendbuf, recvbuf, root=0)
print(f"Rank: {rank}, after scatter: {recvbuf}")
assert np.allclose(recvbuf, rank)
!mpiexec -np 4 python scatter.py
Rank: 0, to be scattered:
[[0 0 0 0 0 0 0 0]
[1 1 1 1 1 1 1 1]
[2 2 2 2 2 2 2 2]
[3 3 3 3 3 3 3 3]]
Rank: 0, after scatter: [0 0 0 0 0 0 0 0]
Rank: 1, after scatter: [1 1 1 1 1 1 1 1]
Rank: 2, after scatter: [2 2 2 2 2 2 2 2]
Rank: 3, after scatter: [3 3 3 3 3 3 3 3]
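For the reverse direction, a minimal Comm.Gather sketch (assuming the same layout as the Scatter example: one chunk of eight integers per rank, gathered onto rank 0) could look like this:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

# Each rank contributes one chunk filled with its own rank number
sendbuf = np.full(8, rank, dtype='i')

recvbuf = None
if rank == 0:
    # Only the root needs room for all of the chunks
    recvbuf = np.empty([size, 8], dtype='i')

comm.Gather(sendbuf, recvbuf, root=0)
if rank == 0:
    print(f"Rank: {rank}, after gather:\n{recvbuf}")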
Allgather and Alltoall#
Two more complex operations are Comm.Allgather and Comm.Alltoall.

Comm.Allgather is an advanced version of Comm.Gather, as shown in Fig. 8.8. It sends the small data chunks scattered across the processes to every process, so that each process ends up holding an identical set of data.

Comm.Alltoall is a combination of Comm.Scatter and Comm.Gather, as illustrated in Fig. 8.9. It first performs Comm.Scatter and then follows with Comm.Gather. If the data is viewed as a matrix, Comm.Alltoall can be considered a global transpose operation.
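A minimal sketch of both operations, assuming the script is launched with several processes (e.g. mpiexec -np 4); the buffer sizes and values are illustrative only:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

# Allgather: every rank contributes one chunk and receives all chunks
sendbuf = np.full(4, rank, dtype='i')
allbuf = np.empty([size, 4], dtype='i')
comm.Allgather(sendbuf, allbuf)
print(f"Rank {rank}, after Allgather:\n{allbuf}")

# Alltoall: rank i sends its j-th element to rank j, i.e. a global transpose
senddata = np.arange(size, dtype='i') + rank * size
recvdata = np.empty(size, dtype='i')
comm.Alltoall(senddata, recvdata)
print(f"Rank {rank}, after Alltoall: {recvdata}")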
Collective Computation#
Collective computation refers to performing a computation on the data while aggregating it from the different processes, for example with Comm.Reduce and Intracomm.Scan. As shown in Fig. 8.10 and Fig. 8.11, when data is gathered onto a specific process, an aggregation function f is applied. Common aggregation functions include summation (MPI.SUM), maximum (MPI.MAX), and others.
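A minimal Comm.Reduce sketch, assuming each rank holds a small integer array and rank 0 should receive the element-wise sum:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# Each rank contributes its own partial result
sendbuf = np.arange(4, dtype='i') * (rank + 1)

# Only the root needs a receive buffer for the aggregated result
recvbuf = np.empty(4, dtype='i') if rank == 0 else None
comm.Reduce(sendbuf, recvbuf, op=MPI.SUM, root=0)

if rank == 0:
    print(f"Rank {rank}, element-wise sum: {recvbuf}")

Intracomm.Scan works similarly, but each rank receives the aggregation over ranks 0 through itself, i.e. a prefix reduction.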