8.3. Point-to-Point Communication#
The simplest communication pattern is Point-to-Point communication, which can be further divided into Blocking and Non-Blocking. When implementing Point-to-Point communication, there are two main considerations:
How to identify different processes? For example, the process with rank 0 may want to send a message to the process with rank 1.
What kind of data to send or receive? For example, 1024 integers.
Send and Receive#
Comm.send() and Comm.recv() are used for blocking send and receive, respectively. The key parameters of Comm.send(obj, dest, tag=0) are obj and dest. obj is the data we want to send; it can be a Python built-in data type such as list or dict, a NumPy ndarray, or even CuPy data on a GPU. In MPI Hello World, we introduced the communicator and the rank, and you can use the rank number to locate a process: dest is the rank of the destination process. tag provides programmers with more control options; for example, the receiver can choose to only receive messages with a specific tag.
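As a minimal sketch of how tags can be used (this example is illustrative and not part of the original text; it assumes two processes, and the message contents are arbitrary), rank 0 sends two differently tagged objects and rank 1 receives with MPI.ANY_TAG, inspecting the tag via a Status object:
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    # Two messages to the same destination, distinguished by tag
    comm.send({"kind": "config"}, dest=1, tag=1)
    comm.send([1, 2, 3], dest=1, tag=2)
elif rank == 1:
    for _ in range(2):
        status = MPI.Status()
        # Accept any tag, then ask the Status which tag actually arrived
        data = comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
        print(f"Received message with tag {status.Get_tag()}: {data}")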
Example 1: Send Python Object#
Here, we show how to send a Python object, which is serialized by pickle.
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {'a': 7, 'b': 3.14}
    comm.send(data, dest=1)
    print(f"Sent: {data}, from rank: {rank}.")
elif rank == 1:
    data = comm.recv(source=0)
    print(f"Received: {data}, to rank: {rank}.")
Save the code in a file named send-py-object.py and launch it in the command line:
!mpiexec -np 2 python send-py-object.py
Sent: {'a': 7, 'b': 3.14}, from rank: 0.
Received: {'a': 7, 'b': 3.14}, to rank: 1.
Example 2: Send NumPy ndarray#
Send a NumPy ndarray:
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# Tell MPI that the data type is int
# dtype='i' is short for INT
if rank == 0:
    data = np.arange(10, dtype='i')
    comm.Send([data, MPI.INT], dest=1)
    print(f"Sent: {data}, from rank: {rank}.")
elif rank == 1:
    data = np.empty(10, dtype='i')
    comm.Recv([data, MPI.INT], source=0)
    print(f"Received: {data}, to rank: {rank}.")

# Let MPI detect the data type automatically
if rank == 0:
    data = np.arange(10, dtype=np.float64)
    comm.Send(data, dest=1)
    print(f"Sent: {data}, from rank: {rank}.")
elif rank == 1:
    data = np.empty(10, dtype=np.float64)
    comm.Recv(data, source=0)
    print(f"Received: {data}, to rank: {rank}.")
Save as send-np.py.
!mpiexec -np 2 python send-np.py
Sent: [0 1 2 3 4 5 6 7 8 9], from rank: 0.
Received: [0 1 2 3 4 5 6 7 8 9], to rank: 1.
Received: [0. 1. 2. 3. 4. 5. 6. 7. 8. 9.], to rank: 1.
Sent: [0. 1. 2. 3. 4. 5. 6. 7. 8. 9.], from rank: 0.
Note
The initial letters of the Send and Recv methods are capitalized because these capitalized methods are buffer-based. For buffer-based functions, the data type is normally specified explicitly, for example by passing a 2-tuple [data, MPI.DOUBLE] or a 3-tuple [data, count, MPI.DOUBLE]. In Listing 8.3, comm.Send(data, dest=1) does not explicitly inform MPI about the data type and size because mpi4py automatically detects the dtype and shape of NumPy and CuPy ndarrays.
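As a hedged sketch of the buffer-specification forms mentioned above (illustrative only; it assumes two processes, and the array length of 10 is arbitrary), the 3-tuple form can also be used to send just the first part of a buffer:
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = np.linspace(0.0, 1.0, 10)
    # 3-tuple form: buffer, element count, MPI datatype
    # Only the first 5 elements of the 10-element array are sent
    comm.Send([data, 5, MPI.DOUBLE], dest=1)
elif rank == 1:
    buf = np.empty(5, dtype=np.float64)
    comm.Recv([buf, 5, MPI.DOUBLE], source=0)
    print(f"rank 1 received: {buf}")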
Example 3: Master-Worker#
In this example, we implement a Master-Worker computation with a total of size processes. The first size - 1 processes act as Workers, generating random data. The last process (rank size - 1) serves as the Master, receiving the data and printing its size.
The data exchange process between Master and Worker processes is demonstrated in Listing 8.4.
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank < size - 1:
    # Worker process
    np.random.seed(rank)
    # Generate random data
    data_count = np.random.randint(100)
    data = np.random.randint(100, size=data_count)
    comm.send(data, dest=size - 1)
    print(f"Worker: worker ID: {rank}; count: {len(data)}")
else:
    # Master process
    for i in range(size - 1):
        status = MPI.Status()
        data = comm.recv(source=MPI.ANY_SOURCE, status=status)
        print(f"Master: worker ID: {status.Get_source()}; count: {len(data)}")

comm.Barrier()
In this example, processes with rank less than size - 1 are Workers: they generate random data and send it to the last process (with rank size - 1). The last process receives the data and prints the size of each received array.
!mpiexec -np 8 python master-worker.py
Worker: worker ID: 0; count: 44
Worker: worker ID: 2; count: 40
Worker: worker ID: 4; count: 46
Worker: worker ID: 3; count: 24
Master: worker ID: 2; count: 40
Master: worker ID: 3; count: 24
Master: worker ID: 4; count: 46
Master: worker ID: 0; count: 44
Worker: worker ID: 5; count: 99
Master: worker ID: 5; count: 99
Worker: worker ID: 1; count: 37
Master: worker ID: 1; count: 37
Worker: worker ID: 6; count: 10
Master: worker ID: 6; count: 10
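When the messages are raw buffers rather than pickled Python objects, the Master does not know the incoming size in advance. One common way to handle this, shown in the hedged sketch below (not part of the original example; the int32 arrays and their lengths are arbitrary), is to Probe the incoming message, query the element count from the Status, and allocate a matching receive buffer:
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank < size - 1:
    # Worker: send a random-length int32 array to the last rank
    np.random.seed(rank)
    data = np.random.randint(100, size=np.random.randint(1, 100)).astype(np.int32)
    comm.Send([data, MPI.INT], dest=size - 1)
else:
    # Master: probe each incoming message to learn its size first
    for _ in range(size - 1):
        status = MPI.Status()
        comm.Probe(source=MPI.ANY_SOURCE, status=status)
        count = status.Get_count(MPI.INT)
        buf = np.empty(count, dtype=np.int32)
        comm.Recv([buf, MPI.INT], source=status.Get_source())
        print(f"Master: worker ID: {status.Get_source()}; count: {count}")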
Example 4: Rectangle Simulation for Calculating \(\pi\)#
For a circle with radius R, we can use a differential method: divide a quarter of the circle into N narrow rectangles. As the number of rectangles N approaches infinity, the total area of the rectangles approaches 1/4 of the circle's area, as shown in Fig. 8.3.
Assuming there are size processes involved in the calculation, we first determine the number of rectangles each process needs to handle (N/size). Each process calculates the sum of the areas of its rectangles and sends the result to the Master process. The first process (rank 0) acts as the Master: it receives the partial sums from each Worker, adds up all rectangle areas, and thereby approximates the value of \(\pi\).
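In formula form, with R = 1 and rectangle width \(\Delta x = 1/N\), the quantity computed below is the Riemann sum
\[
\pi \approx 4 \sum_{i=0}^{N-1} \sqrt{1 - x_i^2}\, \Delta x, \qquad \Delta x = \frac{1}{N},
\]
where \(x_i = (i+1)\,\Delta x\) is the x-coordinate of the right edge of the \(i\)-th rectangle, matching the code's choice of the upper-right corner.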
Listing 8.5 shows the process.
import math
import time
from mpi4py import MPI

communicator = MPI.COMM_WORLD
rank = communicator.Get_rank()
process_nums = communicator.Get_size()

"""
Configuration:
R=1
N=64*1024*1024
"""

t0 = time.time()

rect_num = 64 * 1024 * 1024
rect_width = 1 / rect_num
step_size = rect_num // process_nums

def cal_rect_area(process_no, step_size, rect_width):
    total_area = 0.0
    rect_start = (process_no * step_size + 1) * rect_width
    for i in range(step_size):
        x = rect_start + i * rect_width
        # (x, y) is the upper-right point of the i-th rectangle
        # x^2 + y^2 = 1 => y = sqrt(1 - x^2)
        rect_length = math.pow(1 - x * x, 0.5)
        total_area += rect_width * rect_length
    return total_area

# Calculate on each process
total_area = cal_rect_area(rank, step_size, rect_width)

if rank == 0:
    # Master
    for i in range(1, process_nums):
        total_area += communicator.recv(source=i)
    p_i = total_area * 4
    t1 = time.time()
    print("Simulated PI: {:.10f}, Relative Error:{:.10f}".format(p_i, abs(1 - p_i / math.pi)))
    print("Time:{:.3f}s".format(t1 - t0))
else:
    # Worker
    communicator.send(total_area, dest=0)
In this case, we use the configuration R=1, N=64*1024*1024, and save the code as rectangle_pi.py.
!mpiexec -np 8 python rectangle_pi.py
Simulated PI: 3.1415926238, Relative Error:0.0000000095
Time:7.361s
Blocking vs. Non-blocking#
Blocking#
Let’s first analyze blocking communication. The Send and Recv methods are buffer-based:
Send: It will not return until the buffer is empty, meaning all the data in the buffer has been sent. The buffer can then be reused in subsequent Sends.
Recv: It will not return until the buffer is full, meaning the expected data has arrived.
As shown in Two-Sided vs. One-Sided, blocking communication returns only when the data transmission is completed; otherwise, it keeps waiting.
Code using blocking communication is easier to design, but a common issue is deadlock. For example, in the code below, rank 1 causes a deadlock: the order of its Send and Recv calls should be swapped to avoid it:
if rank == 0:
    comm.Send(..to rank 1..)
    comm.Recv(..from rank 1..)
elif rank == 1:                  # <- deadlock
    comm.Send(..to rank 0..)     # <- should swap Send and Recv
    comm.Recv(..from rank 0..)
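A minimal runnable sketch of the deadlock-free version (assuming two processes and small NumPy buffers; the variable names and array contents are illustrative) simply reverses the call order on rank 1, so each Send is matched by a posted Recv:
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

sendbuf = np.full(4, rank, dtype='i')   # data this rank sends
recvbuf = np.empty(4, dtype='i')        # buffer for the other rank's data

if rank == 0:
    comm.Send([sendbuf, MPI.INT], dest=1)
    comm.Recv([recvbuf, MPI.INT], source=1)
elif rank == 1:
    # Recv first, then Send: matches rank 0's order and avoids deadlock
    comm.Recv([recvbuf, MPI.INT], source=0)
    comm.Send([sendbuf, MPI.INT], dest=0)

if rank in (0, 1):
    print(f"rank {rank} received {recvbuf}")
MPI also provides a combined send-receive call (Comm.Sendrecv in mpi4py) that performs the paired send and receive in one operation and avoids this ordering pitfall altogether.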
Non-blocking#
In contrast, non-blocking communication does not wait for the completion of data transmission. Non-blocking communication can enhance performance by overlapping communication and computation: the communication is handled on the network side while computational tasks proceed on the CPU side. The isend and irecv methods are used for non-blocking communication:
isend: Initiates a non-blocking send operation and immediately returns control to the user, allowing the execution of subsequent code.
irecv: Initiates a non-blocking receive operation and immediately returns control to the user, allowing the execution of subsequent code.
After a non-blocking communication call, a Request handle is returned immediately. The programmer can then perform further operations on the Request, such as waiting for the data transfer associated with it to complete. Non-blocking methods are prefixed with an uppercase ‘I’ or a lowercase ‘i’: the capitalized versions (e.g., Isend/Irecv) are buffer-based, while the lowercase versions (isend/irecv) are not.
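As a hedged sketch of the capitalized, buffer-based variants (illustrative only; it assumes two processes, and the 10-element int array is arbitrary), Isend and Irecv operate on NumPy buffers and are completed with Request.Wait():
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = np.arange(10, dtype='i')
    req = comm.Isend([data, MPI.INT], dest=1)
    req.Wait()    # the send buffer must not be reused before this returns
    print(f"rank 0 sent {data}")
elif rank == 1:
    data = np.empty(10, dtype='i')
    req = comm.Irecv([data, MPI.INT], source=0)
    req.Wait()    # the received data is valid only after Wait() returns
    print(f"rank 1 received {data}")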
The parameters of isend are similar to those of send, with the key distinction that isend returns a Request. The Request class provides a wait() method, and explicitly calling wait() waits until the data transfer is complete. Code written in a blocking manner with send can therefore be modified to use non-blocking communication by combining isend with Request.wait().
Non-blocking communication is illustrated in non-blocking.py.
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {'a': 7, 'b': 3.14}
    req = comm.isend(data, dest=1, tag=11)
    print(f"Sending: {data}, from rank: {rank}.")
    req.wait()
    print(f"Sent: {data}, from rank: {rank}.")
elif rank == 1:
    req = comm.irecv(source=0, tag=11)
    print(f"Receiving: to rank: {rank}.")
    data = req.wait()
    print(f"Received: {data}, to rank: {rank}.")
!mpiexec -np 8 python non-blocking.py
Receiving: to rank: 1.
Sending: {'a': 7, 'b': 3.14}, from rank: 0.
Sent: {'a': 7, 'b': 3.14}, from rank: 0.
Received: {'a': 7, 'b': 3.14}, to rank: 1.
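To make the overlap of communication and computation concrete, the hedged sketch below (not from the original text; the payload size and the "local work" are placeholders) starts a non-blocking send, performs unrelated computation while the message is in flight, and only then waits for completion:
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    payload = np.arange(1_000_000, dtype=np.float64)
    req = comm.Isend([payload, MPI.DOUBLE], dest=1)

    # Local work that does not touch `payload` can proceed
    # while the message is being transferred.
    local = np.random.rand(1000)
    local_sum = local.sum()

    req.Wait()    # safe to reuse `payload` only after this returns
    print(f"rank 0: local_sum={local_sum:.3f}, send completed")
elif rank == 1:
    buf = np.empty(1_000_000, dtype=np.float64)
    req = comm.Irecv([buf, MPI.DOUBLE], source=0)
    req.Wait()
    print(f"rank 1: received {buf.size} doubles")
With a blocking Send, the local work on rank 0 could not start until the send had returned.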
Fig. 8.5 demonstrates the data flow changes of non-blocking communication.