Featured image of post MPI 与并行计算(二):点到点通信

MPI 与并行计算(二):点到点通信

本文详细介绍了 MPI 中的点到点通信模式,主要包括标准通信模式、缓冲通信模式、就绪通信模式、同步通信模式,然后介绍了阻塞与非阻塞通信的概念。并且举了多个例子说明如何编写不同通信模式的 MPI 程序。

# MPI 与并行计算(二):点到点通信

# 1. MPI 的通信模式

  • 通信模式:指的是缓冲管理以及发送方和接收方之间的同步方式。
  • MPI 支持四种通信模式:标准通信模式、缓冲通信模式、就绪通信模式和同步通信模式

# 2. 标准通信模式:MPI_Send 和 MPI_Recv

  • 由 MPI 决定是否缓冲消息
  • 没有足够的系统缓冲区时或出于性能的考虑, MPI 可能进行直接拷贝: 仅当相应的接收开始后,发送语句才能返回
  • MPI 缓冲消息:发送语句地相应的接收语句完成前返回
  • 发送的结束 == 消息已从发送方发出,而不是滞留在发送方的系统缓冲区中
  • 非本地的:发送操作的成功与否依赖于接收操作
  • 理论上要求有接收进程的 recv 调用配合,发送函数是MPI_Send()

注释

示例 1:标准通信模式

#include <mpi.h>
#include <iostream>
#include <thread>
#define BUF_SIZE 10

int main(int argc, char *argv[])
{
    int myid, numprocs;
    int other;
    int sb[BUF_SIZE];
    int rb[BUF_SIZE];
    // 初始化 MPI 环境
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Status status;

    for (int i = 0; i < BUF_SIZE; i++)
    {
        sb[i] = myid + i;
    }

    if (myid == 0)
    {
        other = 1;
    }
    else if (myid == 1)
    {
        other = 0;
    }

    if (myid == 0)
    {
        std::cout << "process " << myid << " tring send..." << std::endl;
        MPI_Send(sb, BUF_SIZE, MPI_INT, other, 1, MPI_COMM_WORLD);
        std::cout << "process " << myid << " tring receiving..." << std::endl;
        MPI_Recv(rb, BUF_SIZE, MPI_INT, other, 1, MPI_COMM_WORLD, &status);
    }
    else if (myid == 1)
    {
        // sleep 10s, 与缓冲通信模式相比,这里的发送和接收操作是阻塞的
        std::this_thread::sleep_for(std::chrono::seconds(10));
        std::cout << "process " << myid << " tring receiving..." << std::endl;
        MPI_Recv(rb, BUF_SIZE, MPI_INT, other, 1, MPI_COMM_WORLD, &status);
        std::cout << "process " << myid << " tring send..." << std::endl;
        MPI_Send(sb, BUF_SIZE, MPI_INT, other, 1, MPI_COMM_WORLD);
    }

    std::cout << "Hello World! Process " << myid << " of " << numprocs << std::endl;
    std::cout << "Send buffer: " << std::endl;
    for (int i = 0; i < BUF_SIZE; i++)
    {
        std::cout << sb[i] << " ";
    }
    std::cout << std::endl;
    std::cout << "Receive buffer: " << std::endl;
    for (int i = 0; i < BUF_SIZE; i++)
    {
        std::cout << rb[i] << " ";
    }
    std::cout << std::endl;
    MPI_Finalize();
    return 0;
}
  • 运行结果:
root@ubuntu:~# mpicxx -o mpi mpi.cpp
root@ubuntu:~# mpirun -n 2 ./mpi
process 0 tring send...
process 0 tring receiving...
process 1 tring receiving...
process 1 tring send...
Hello World! Process 1 of 2
Send buffer:
1 2 3 4 5 6 7 8 9 10
Receive buffer:
0 1 2 3 4 5 6 7 8 9
Hello World! Process 0 of 2
Send buffer:
0 1 2 3 4 5 6 7 8 9
Receive buffer:
1 2 3 4 5 6 7 8 9 10
  • 这就是点对点通信的基本用法,通过发送和接收操作,进程之间可以进行数据交换和协调工作。

# 3. 缓冲通信模式

  • 前提: 用户显示地指定用于缓冲消息的系统缓冲区MPI_Buffer_attach(*buffer, *size)
  • 发送是本地的: 完成不依赖于与其匹配的接收操作。 发送的结束仅表明消息进入系统的缓冲区中,发送缓冲区可以重用,而对接收方的情况并不知道。
  • 缓冲模式在相匹配的接收未开始的情况下,总是将送出的消息放在缓冲区内,这样发送者可以很快地继续计算,然后由系统处理放在缓冲区中的消息。
  • 占用内存,一次内存拷贝。
  • 函数调用形式为:MPI_Bsend()。B 表示缓冲,缓冲通信模式主要用于解开阻塞通信的发送与接收之间的耦合。
  • 作用总结:通常情况下,MPI 发送和接收操作是阻塞的,即发送操作会等待接收方准备好接收,接收操作会等待发送方发送数据。但是,MPI 提供了一种称为缓冲区(buffering)的机制,可以使发送操作立即返回,而不需要等待接收方准备好

注释

示例 2:缓冲通信模式

#include <mpi.h>
#include <iostream>
#include <thread>

int main(int argc, char **argv)
{
    int myid, numprocs;
    // 初始化
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);

    int s1, s2;
    // 取缓冲区的上界,以字节为单位
    MPI_Pack_size(7, MPI_CHAR, MPI_COMM_WORLD, &s1);
    MPI_Pack_size(2, MPI_DOUBLE, MPI_COMM_WORLD, &s2);
    int buffer_size = 2 * MPI_BSEND_OVERHEAD + s1 + s2;
    char *buffer    = new char[buffer_size];
    // 装配一个用于通信的缓冲区
    MPI_Buffer_attach(buffer, buffer_size);

    char msg1[7]   = "Hello";
    double msg2[2] = {1.0, 2.0};

    char rmsg1[7];
    double rmsg2[2];

    if (myid == 0)
    {
        MPI_Bsend(msg1, 7, MPI_CHAR, 1, 1, MPI_COMM_WORLD);
        MPI_Bsend(msg2, 2, MPI_DOUBLE, 1, 2, MPI_COMM_WORLD);
        std::cout << "Send msg1: " << msg1 << std::endl;
        std::cout << "Send msg2: " << msg2[0] << " " << msg2[1] << std::endl;
    }
    else if (myid == 1)
    {
        // sleep 10s
        std::this_thread::sleep_for(std::chrono::seconds(10));
        MPI_Recv(rmsg1, 7, MPI_CHAR, 0, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        MPI_Recv(rmsg2, 2, MPI_DOUBLE, 0, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        std::cout << "Receive msg1: " << rmsg1 << std::endl;
        std::cout << "Receive msg2: " << rmsg2[0] << " " << rmsg2[1] << std::endl;
    }

    MPI_Buffer_detach(&buffer, &buffer_size);
    free(buffer);

    MPI_Finalize();
    return 0;
}
  • 运行结果:
root@ubuntu:~# mpicxx -o mpi mpi.cpp
root@ubuntu:~# mpirun -n 2 ./mpi
Send msg1: Hello
Send msg2: 1 2
Receive msg1: Hello
Receive msg2: 1 2

# 4. 就绪通信模式

  • 发送请求仅当有匹配的接收后才能发出,否则出错。在就绪模式下,系统默认与其相匹配的接收已经调用。 接收必须先于发送。
  • 它不可以不依赖于接收方的匹配的接收请求而任意发出
  • 其函数调用形式为:MPI_RSend()。R 表示就绪,仅当对方的接收操作启动并准备就绪时,才可以发送数据。

注释

示例 3:就绪通信模式

#include <mpi.h>
#include <iostream>

int main(int argc, char **argv)
{
    int myid, numprocs;
    // 初始化
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Status status;

    int buffer[10];

    if (myid == 0)
    {

        for (int i = 0; i < 10; i++)
        {
            buffer[i] = -1;
        }

        MPI_Recv(buffer, 10, MPI_INT, 1, 1, MPI_COMM_WORLD, &status);

        for (int i = 0; i < 10; i++)
        {
            if (buffer[i] != i)
            {
                std::cout << "error" << std::endl;
                break;
            }
        }
    }
    else if (myid == 1)
    {
        for (int i = 0; i < 10; i++)
        {
            buffer[i] = i;
        }
        MPI_Rsend(buffer, 10, MPI_INT, 0, 1, MPI_COMM_WORLD);
    }
    MPI_Finalize();
    return 0;
}

# 5. 同步通信模式

  • 本质特征:收方接收该消息的缓冲区已准备好, 不需要附加的系统缓冲区
  • 任意发出:发送请求可以不依赖于收方的匹配的接收请求而任意发出
  • 成功结束: 仅当收方已发出接收该消息的请求后才成功返回,否则将阻塞。意味着:
    • 发送方缓冲区可以重用
    • 收方已发出接收请求
  • 是非本地的
  • 函数调用形式为:MPI_Ssend()。S 表示同步。

注释

示例 4:同步通信模式

#include <mpi.h>
#include <iostream>

int main(int argc, char **argv)
{
    int myid, numprocs;
    // 初始化
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Status status;

    int buffer[10];

    if (myid == 0)
    {

        for (int i = 0; i < 10; i++)
        {
            buffer[i] = -1;
        }

        MPI_Recv(buffer, 10, MPI_INT, 1, 1, MPI_COMM_WORLD, &status);
        std::cout << "process " << myid << " receiving..." << std::endl;

        for (int i = 0; i < 10; i++)
        {
            if (buffer[i] != i)
            {
                std::cout << "error" << std::endl;
                break;
            }
        }
    }
    else if (myid == 1)
    {
        for (int i = 0; i < 10; i++)
        {
            buffer[i] = i;
        }
        std::cout << "process " << myid << " sending..." << std::endl;
        MPI_Ssend(buffer, 10, MPI_INT, 0, 1, MPI_COMM_WORLD);
    }
    MPI_Finalize();
    return 0;
}

# 6. 阻塞通信与非阻塞通信

阻塞通信调用时,整个程序只能执行通信相关的内容,而无法执行计算相关的内容;非阻塞调用的初衷是尽量让通信和计算重叠进行,提高程序整体执行效率。

非阻塞通信调用返回意味着通信开始启动;而非阻塞通信完成则需要调用其他的接口来查询。

  • 非阻塞通信的调用接口
  • 非阻塞通信的完成查询接口

非阻塞通信的发送接受过程都需要同时具备以上两个要素:调用与完成。(1)”调用“按照通信方式的不同(标准、缓存、同步、就绪),有各种函数接口;(2)”完成“是重点,因为程序员需要知道非阻塞调用是否执行完成了,来做下一步的操作。

MPI 为“完成”定义了一个内部变量 MPI_Request request,每个 request 与一个在非阻塞调用发生时与该调用发生关联(这里的调用包括发送和接收)。“完成”不区分通信方式的不同,统一用 MPI_Wait 系列函数来完成。

  1. MPI_Wait(MPI_Request *request),均等着 request 执行完毕了,再往下进行
  2. 对于非重复非阻塞通信,MPI_Wait 系列函数调用的返回,还意味着 request 对象被释放了,程序员不用再显式释放 request 变量。
  3. 对于重复非阻塞通信,MPI_Wait 系列函数调用的返回,意味着将于 request 对象关联的非阻塞通信处于不激活状态,并不释放 request
  • MPI_Wait 会迫使进程进入“阻塞模式”。发送过程将简单地等待请求完成。如果进程在 MPI_Isend 之后立即等待,则 Send 与调用 MPI_Send 相同。等待 MPI_WAIT 和 MPI_WAITANY 有两种方式
int MPI_Wait(MPI_Request *request, MPI_Status *status);
int MPI_Waitany(int count, MPI_Request array_of_requests[], int *index, MPI_Status *status);
  • 前者 MPI_WAIT 只是等待给定请求的完成。请求一完成,就会返回一个状态为 MPI_STATUS 的实例。后者MPI_Waitany 等待一系列请求中的第一个完成的请求继续。一旦请求完成,INDEX 的值被设置为存储 ARRAY_OF_REQUESTS 中已完成请求的索引。该调用还存储已完成请求的状态。

  • 对于阻塞通信,如果不想跟踪此信息,可以将指向 MPI_STATUS 实例的指针替换为 MPI_STATUS_IGNORE。

  • 正如我们之前看到的,等待会阻止进程,直到请求(或某个请求)被满足。测试则是检查请求是否可以完成。如果可以,请求将自动完成并传输数据。关于等待,有两种测试等待:MPI_Test 和 MPI_Testany。它们的调用方式如下

int MPI_Test(MPI_Request *request, int *flag, MPI_Status *status);
int MPI_Testany(int count, MPI_Request array_of_requests[], int *index, int *flag, MPI_Status *status);
  • 让我们从 MPI_Test 开始。至于 MPI_Wait,参数 request 和 status 并不神秘。请记住,测试是非阻塞的,因此在任何情况下,调用后进程都会继续执行。变量 flag 的作用是告诉你请求是否在测试过程中完成。如果 flag != 0 表示请求已完成
  • MPI_Testany 现在应该是完全显而易见的。如果有任何请求可完成,它会将 FLAG 设置为非零值。如果是这样的话,状态和索引也被赋予一个值

# 7. 非阻塞的发送和接收

  • int MPI_Isend(void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request)
    • buf: 发送缓冲区的起始地址
    • count: 发送数据的个数
    • datatype: 发送数据的类型
    • dest: 目标进程的 rank
    • tag: 消息标签
    • comm: 通信子
    • request: 非阻塞通信完成对象
  • MPI_Ibsend/MPI_Issend/MPI_Irsend: 缓冲/同步/就绪通信的非阻塞发送
  • int MPI_Irecv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Request *request)

注释

示例 5:非阻塞通信

void play_non_blocking_scenario()
{
    MPI_Request request;
    MPI_Status status;
    int request_finished = 0;

    // Initialising buffer :
    for (int i = 0; i < buffer_count; ++i)
    {
        buffer[i] = (rank == 0 ? i * 2 : 0);
    }

    MPI_Barrier(MPI_COMM_WORLD);
    // Starting the chronometer
    double time = -MPI_Wtime();

    if (rank == 0)
    {
        sleep(3);
        // 1- Initialise the non-blocking send to process 1
        // [...]
        MPI_Isend(buffer, buffer_count, MPI_INT, 1, 0, MPI_COMM_WORLD, &request);

        double time_left = 6000.0;
        while (time_left > 0.0)
        {
            usleep(1000);  // We work for 1ms

            // 2- Test if the request is finished (only if not already finished)
            // [...]
            if (!request_finished)
            {
                MPI_Test(&request, &request_finished, &status);
            }

            // 1ms left to work
            time_left -= 1000.0;
        }

        // 3- If the request is not yet complete, wait here.
        // [...]
        if (!request_finished)
        {
            MPI_Wait(&request, &status);
        }

        // Modifying the buffer for second step
        for (int i = 0; i < buffer_count; ++i)
        {
            buffer[i] = -i;
        }

        // 4- Prepare another request for process 1 with a different tag
        // [...]
        MPI_Isend(buffer, buffer_count, MPI_INT, 1, 1, MPI_COMM_WORLD, &request);

        time_left = 3000.0;
        while (time_left > 0.0)
        {
            usleep(1000);  // We work for 1ms

            // 5- Test if the request is finished (only if not already finished)
            // [...]
            if (!request_finished)
            {
                MPI_Wait(&request, &status);
            }

            // 1ms left to work
            time_left -= 1000.0;
        }
        // 6- Wait for it to finish
        // [...]
        if (!request_finished)
        {
            MPI_Wait(&request, &status);
        }
    }
    else
    {
        // Work for 5 seconds
        sleep(5);

        // 7- Initialise the non-blocking receive from process 0
        // [...]
        MPI_Irecv(buffer, buffer_count, MPI_INT, 0, 0, MPI_COMM_WORLD, &request);

        // 8- Wait here for the request to be completed
        // [...]
        MPI_Wait(&request, &status);

        print_buffer();

        // Work for 3 seconds
        sleep(3);

        // 9- Initialise another non-blocking receive
        // [...]
        MPI_Irecv(buffer, buffer_count, MPI_INT, 0, 1, MPI_COMM_WORLD, &request);

        // 10- Wait for it to be completed
        // [...]
        MPI_Wait(&request, &status);

        print_buffer();
    }

    // Stopping the chronometer
    time += MPI_Wtime();

    // This line gives us the maximum time elapsed on each process.
    double final_time;
    MPI_Reduce(&time, &final_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);

    if (rank == 0)
    {
        std::cout << "Total time for non-blocking scenario : " << final_time << "s" << std::endl;
    }
}

# 8. 探测 Probe

  • 探测实际上非常有用,它有很多用途,例如获取即将接收的元素数量、接收进程的 ID 和标记,或者是否真的接收到了任何信息。
  • 用于探测的函数有两个:MPI_Probe 和 MPI_IProbe。第一个是阻塞调用,而第二个则不是。现在,MPI_Probe 只会给出与收到的下一条消息相关的 MPI_Status 值,该消息对应于某个标签和 ID。如果想探测任何类型或来自任何来源的消息接收情况,可以使用 MPI_ANY_SOURCE 和 MPI_ANY_TAG。然后,可以将生成的 MPI_Status 对象与其他函数结合使用,以获取更多信息。

注释

示例 6:探测

void probing_process(int &int_sum, float &float_sum) {
  MPI_Status status;

  // 1- Probe the incoming message
  MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
  // 2- Get the tag and the source
  int tag = status.MPI_TAG;
  int source = status.MPI_SOURCE;

  // Printing the message
  std::cout << "Received a message from process " << source << " with tag " << tag << std::endl;

  // 3- Add to int_sum or float_sum depending on the tag of the message
  if (tag == 0) {
    int other;
    MPI_Recv(&other, 1, MPI_INT, source, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    int_sum += other;
  } else if (tag == 1) {
    float other;
    MPI_Recv(&other, 1, MPI_FLOAT, source, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    float_sum += other;
  }
}

# 9. 总结

理解各种模式通信过程的行为,关键是弄清楚各个模式对缓冲使用的方式。简而言之,各个模式使用缓冲的特点可总结为:标准的 Send 实际利用了 MPI 环境提供的默认缓冲区;Bsend 实际相当于将 MPI 环境提供的 buffer 放在用户空间管理;Rsend 实际相当于不要缓冲区,但发送端不能提前等待;Ssend 实际也相当于不要缓冲区,但允许等待;异步方式下各个模式工作原理也是类似的,只不过可将其理解为 MPI 环境会另起一个线程在后台做实际的消息传输,通过 Wait、Test 等机制与 MPI 进程的主线程进行通信和同步。

通信模式阻塞型非阻塞型缓冲方式发送方等待接收方等待是否本地是否阻塞
标准通信MPI_SendMPI_IsendMPI 环境提供的默认缓冲区
缓冲通信MPI_BsendMPI_Ibsend用户空间管理
就绪通信MPI_RsendMPI_Irsend
同步通信MPI_SsendMPI_Issend
本博客已稳定运行
总访客数: Loading
总访问量: Loading
发表了 73 篇文章 · 总计 323.73k

使用 Hugo 构建
主题 StackJimmy 设计
基于 v3.27.0 分支版本修改