8.3. Point-to-Point Communication#

The simplest communication pattern is Point-to-Point communication, which can be further divided into Blocking and Non-Blocking variants. When implementing Point-to-Point communication, there are two main considerations:

  • How to identify different processes? For example, the process of rank 0 may need to send a message to the process of rank 1.

  • What kind of data to send or receive? For example, 1024 integers.

Send and Receive#

Comm.send() and Comm.recv() are used for blocking send and receive, respectively.

The key parameters of Comm.send(obj, dest, tag=0) are obj and dest. obj is the data to send; it can be a built-in Python type such as a list or dict, a NumPy ndarray, or even CuPy data residing on a GPU. In MPI Hello World we introduced the communicator and the rank, and a process is identified by its rank number: dest is the rank of the destination process. tag gives the programmer additional control over message matching; for example, the receiver can choose to accept only messages carrying a specific tag.
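For instance, the following is a minimal sketch (not one of the numbered listings; the tag values 99 and 1 are arbitrary) of how a receiver can use tag to pick out a specific message:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    comm.send("urgent", dest=1, tag=99)
    comm.send("routine", dest=1, tag=1)
elif rank == 1:
    # Only a message carrying tag 99 matches this recv
    urgent = comm.recv(source=0, tag=99)
    routine = comm.recv(source=0, tag=1)
    print(f"Got: {urgent}, then: {routine}")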

Example 1: Send Python Object#

Here, we show how to send a Python object, which is serialized by pickle.

Listing 8.2 send-py-object.py#
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {'a': 7, 'b': 3.14}
    comm.send(data, dest=1)
    print(f"Sended: {data}, from rank: {rank}.")
elif rank == 1:
    data = comm.recv(source=0)
    print(f"Received: {data}, to rank: {rank}.")

Save the code in a file named send-py-object.py and launch it from the command line:

!mpiexec -np 2 python send-py-object.py
Sent: {'a': 7, 'b': 3.14}, from rank: 0.
Received: {'a': 7, 'b': 3.14}, to rank: 1.

Example 2: Send NumPy ndarray#

Send a NumPy ndarray:

Listing 8.3 send-np.py#
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# Tell MPI that the data type is int
# dtype='i' is the NumPy shorthand for a C int
if rank == 0:
    data = np.arange(10, dtype='i')
    comm.Send([data, MPI.INT], dest=1)
    print(f"Sended: {data}, from rank: {rank}.")
elif rank == 1:
    data = np.empty(10, dtype='i')
    comm.Recv([data, MPI.INT], source=0)
    print(f"Received: {data}, to rank: {rank}.")

# mpi4py detects the data type automatically
if rank == 0:
    data = np.arange(10, dtype=np.float64)
    comm.Send(data, dest=1)
    print(f"Sended: {data}, from rank: {rank}.")
elif rank == 1:
    data = np.empty(10, dtype=np.float64)
    comm.Recv(data, source=0)
    print(f"Received: {data}, to rank: {rank}.")

Save as send-np.py.

!mpiexec -np 2 python send-np.py
Sent: [0 1 2 3 4 5 6 7 8 9], from rank: 0.
Received: [0 1 2 3 4 5 6 7 8 9], to rank: 1.
Received: [0. 1. 2. 3. 4. 5. 6. 7. 8. 9.], to rank: 1.
Sent: [0. 1. 2. 3. 4. 5. 6. 7. 8. 9.], from rank: 0.

Note

The initial letters of Send and Recv are capitalized because these capitalized methods are buffer-based. For buffer-based functions, it is important to specify the data type explicitly, for example by passing a two-element spec [data, MPI.DOUBLE] or a three-element spec [data, count, MPI.DOUBLE]. In Listing 8.3, comm.Send(data, dest=1) does not explicitly tell MPI the data type and size because mpi4py detects them automatically for NumPy and CuPy ndarrays.
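As an illustration of the three-element form mentioned above, the following sketch (assuming two processes and an 8-element float64 array) passes the buffer, count, and MPI datatype explicitly:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = np.linspace(0.0, 1.0, 8)            # 8 float64 values
    comm.Send([data, 8, MPI.DOUBLE], dest=1)   # buffer, count, MPI datatype
elif rank == 1:
    data = np.empty(8, dtype=np.float64)
    comm.Recv([data, 8, MPI.DOUBLE], source=0)
    print(data)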

Example 3: Master-Worker#

In this example, we implement a Master-Worker computation with a total of size processes. The first size-1 processes act as Workers, generating random data. The last process (rank size-1) serves as the Master, receiving data and printing its size.

The data exchange process between Master and Worker processes is demonstrated in Listing 8.4.

Listing 8.4 master-worker.py#
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank < size - 1:
    # Worker process
    np.random.seed(rank)
    # Generate random data
    data_count = np.random.randint(100)
    data = np.random.randint(100, size=data_count)
    comm.send(data, dest=size - 1)
    print(f"Worker: worker ID: {rank}; count: {len(data)}")
else:
    # Master process
    for i in range(size - 1):
        status = MPI.Status()
        data = comm.recv(source=MPI.ANY_SOURCE, status=status)
        print(f"Master: worker ID: {status.Get_source()}; count: {len(data)}")

comm.Barrier()

The Master receives with source=MPI.ANY_SOURCE, accepting messages from whichever Worker sends first, and the MPI.Status object records which rank each message actually came from. Since all processes print concurrently, the order of the output lines differs between runs.

!mpiexec -np 8 python master-worker.py
Worker: worker ID: 0; count: 44
Worker: worker ID: 2; count: 40
Worker: worker ID: 4; count: 46
Worker: worker ID: 3; count: 24
Master: worker ID: 2; count: 40
Master: worker ID: 3; count: 24
Master: worker ID: 4; count: 46
Master: worker ID: 0; count: 44
Worker: worker ID: 5; count: 99
Master: worker ID: 5; count: 99
Worker: worker ID: 1; count: 37
Master: worker ID: 1; count: 37
Worker: worker ID: 6; count: 10
Master: worker ID: 6; count: 10

Example 4: Rectangle Simulation for Calculating \(\pi\)#

For a circle of radius R, we can use a differential approach: divide a quarter of the circle into N small rectangles. As the number of rectangles N approaches infinity, their total area approaches 1/4 of the circle's area, as shown in Fig. 8.3.

../_images/rectangle-pi.svg

Fig. 8.3 Simulating 1/4 of a circle using N small rectangles.#

Assuming size processes take part in the calculation, each process handles N/size rectangles (we assume N is divisible by size), computes the sum of their areas, and sends the result to the Master process. The first process (rank 0) acts as the Master: it receives the partial sums from the Workers, adds them to its own, and thereby approximates the value of \(\pi\).
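Written out, with R = 1 and the height of each rectangle taken at its right edge (as the code below does), the approximation computed across all processes is

\[
\pi \approx 4 \sum_{i=1}^{N} \frac{1}{N} \sqrt{1 - \left(\frac{i}{N}\right)^2},
\]

where each of the size processes sums N/size consecutive terms and the Master adds the partial sums together.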

Listing 8.5 shows the process.

Listing 8.5 rectangle-pi.py#
import math
import time

from mpi4py import MPI

communicator = MPI.COMM_WORLD
rank = communicator.Get_rank()
process_nums = communicator.Get_size()
"""
Configuration:
R=1
N=64*1024*1024
"""
t0 = time.time()
rect_num = 64 * 1024 * 1024
rect_width = 1 / rect_num
step_size = rect_num // process_nums

def cal_rect_area(process_no, step_size, rect_width):
    total_area = 0.0
    rect_start = (process_no * step_size + 1) * rect_width

    for i in range(step_size):
        x = rect_start + i * rect_width
        # (x,y) is the upper right point of the i-th rectangle
        # x^2+y^2=1 => y=sqrt(1-x^2)
        rect_length = math.pow(1 - x * x, 0.5)
        total_area += rect_width * rect_length
    return total_area

# Calculating on each process
total_area = cal_rect_area(rank, step_size, rect_width)

if rank == 0:
    # Master
    for i in range(1, process_nums):
        total_area += communicator.recv(source=i)
    p_i = total_area * 4
    t1 = time.time()
    print("Simulated PI: {:.10f}, Relative Error:{:.10f}".format(p_i, abs(1 - p_i / math.pi)))
    print("Time:{:.3f}s".format(t1 - t0))
else:
    # Worker
    communicator.send(total_area, dest=0)

In this case, we use the configuration R=1 and N=64*1024*1024. Save the code as rectangle-pi.py and run it:

!mpiexec -np 8 python rectangle-pi.py
Simulated PI: 3.1415926238, Relative Error:0.0000000095
Time:7.361s

Blocking vs. Non-blocking#

Blocking#

Let’s first analyze blocking communication. The buffer-based Send and Recv methods behave as follows:

  • Send: It will not return until the send buffer is empty, meaning all the data in the buffer has been handed off; the buffer can then be reused in subsequent Sends.

  • Recv: It will not return until the receive buffer is full, i.e., the incoming data has arrived.

As discussed in Two-Sided vs. One-Sided, a blocking call returns only when the data transfer it is responsible for has completed; until then, it keeps waiting.

../_images/blocking.svg

Fig. 8.4 Blocking communications#

Code using blocking communication is easier to design, but a common pitfall is deadlock. For example, in the code below, rank 1 causes a deadlock: both ranks call Send first, so neither can reach its Recv. Swapping the order of Send and Recv on rank 1 avoids this; a runnable version of the fix follows the sketch:

if rank == 0:
    comm.Send(..to rank 1..)
    comm.Recv(..from rank 1..)
elif rank == 1:                # <- deadlock
    comm.Send(..to rank 0..)   # <- should swap Send and Recv
    comm.Recv(..from rank 0..)
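A runnable version of the fix, as a minimal sketch assuming exactly two processes and a small integer array (the buffer contents are arbitrary):

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

sendbuf = np.full(4, rank, dtype='i')   # data this rank sends
recvbuf = np.empty(4, dtype='i')        # buffer for the incoming data

if rank == 0:
    comm.Send([sendbuf, MPI.INT], dest=1)
    comm.Recv([recvbuf, MPI.INT], source=1)
elif rank == 1:
    comm.Recv([recvbuf, MPI.INT], source=0)   # Recv first, pairing with rank 0's Send
    comm.Send([sendbuf, MPI.INT], dest=0)

print(f"Rank {rank} received {recvbuf}")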

Non-blocking#

In contrast, non-blocking communication does not wait for the data transfer to complete. It can enhance performance by overlapping communication and computation: the communication is handled by the network while computation proceeds on the CPU. The isend and irecv methods are used for non-blocking communication:

  • isend: Initiates a non-blocking send operation and immediately returns control to the user, allowing the execution of subsequent code.

  • irecv: Initiates a non-blocking receive operation and immediately returns control to the user, allowing the execution of subsequent code.

A non-blocking communication call returns a Request handle immediately. The programmer can later operate on the Request, for example by waiting for the associated data transfer to complete. Non-blocking methods are prefixed with an uppercase ‘I’ or a lowercase ‘i’: the ‘I’ variants are buffer-based, the ‘i’ variants are not. The parameters of isend are similar to those of send, the key difference being that isend returns a Request. The Request class provides a wait() method; calling wait() explicitly blocks until the data transfer is complete. Code written in a blocking manner with send can therefore be converted to non-blocking communication by using isend together with Request.wait().

Non-blocking communication is illustrated in Listing 8.6 (non-blocking.py).

Listing 8.6 non-blocking.py#
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {'a': 7, 'b': 3.14}
    req = comm.isend(data, dest=1, tag=11)
    print(f"Sending: {data}, from rank: {rank}.")
    req.wait()
    print(f"Sended: {data}, from rank: {rank}.")
elif rank == 1:
    req = comm.irecv(source=0, tag=11)
    print(f"Receiving: to rank: {rank}.")
    data = req.wait()
    print(f"Received: {data}, to rank: {rank}.")
!mpiexec -np 8 python non-blocking.py
Receiving: to rank: 1.
Sending: {'a': 7, 'b': 3.14}, from rank: 0.
Sent: {'a': 7, 'b': 3.14}, from rank: 0.
Received: {'a': 7, 'b': 3.14}, to rank: 1.
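In Listing 8.6 the wait() call follows almost immediately, so there is little actual overlap. To overlap communication with computation as described above, independent work can be placed between initiating the transfer and waiting on it. A minimal sketch, where summing a hypothetical local_array stands in for real computation:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

local_array = np.arange(1_000_000, dtype=np.float64)  # stand-in for local work data

if rank == 0:
    req = comm.isend({'step': 1}, dest=1, tag=11)
    local_sum = local_array.sum()   # computation proceeds while the send is in flight
    req.wait()                      # make sure the send has completed
elif rank == 1:
    req = comm.irecv(source=0, tag=11)
    local_sum = local_array.sum()   # overlap on the receiving side as well
    msg = req.wait()                # block only when the message is actually needed
    print(f"Received: {msg}, local sum: {local_sum}")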

Fig. 8.5 illustrates the data flow of non-blocking communication.

../_images/non-blocking.svg

Fig. 8.5 Non-blocking communications#