# MPI 与并行计算(二):点到点通信
# 1. MPI 的通信模式
- 通信模式:指的是缓冲管理以及发送方和接收方之间的同步方式。
- MPI 支持四种通信模式:标准通信模式、缓冲通信模式、就绪通信模式和同步通信模式
# 2. 标准通信模式:MPI_Send 和 MPI_Recv
- 由 MPI 决定是否缓冲消息
- 没有足够的系统缓冲区时或出于性能的考虑, MPI 可能进行直接拷贝: 仅当相应的接收开始后,发送语句才能返回
- MPI 缓冲消息:发送语句地相应的接收语句完成前返回
- 发送的结束 == 消息已从发送方发出,而不是滞留在发送方的系统缓冲区中
- 非本地的:发送操作的成功与否依赖于接收操作
- 理论上要求有接收进程的 recv 调用配合,发送函数是
MPI_Send()
。
注释
示例 1:标准通信模式
#include <mpi.h>
#include <iostream>
#include <thread>
#define BUF_SIZE 10
int main(int argc, char *argv[])
{
int myid, numprocs;
int other;
int sb[BUF_SIZE];
int rb[BUF_SIZE];
// 初始化 MPI 环境
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_Status status;
for (int i = 0; i < BUF_SIZE; i++)
{
sb[i] = myid + i;
}
if (myid == 0)
{
other = 1;
}
else if (myid == 1)
{
other = 0;
}
if (myid == 0)
{
std::cout << "process " << myid << " tring send..." << std::endl;
MPI_Send(sb, BUF_SIZE, MPI_INT, other, 1, MPI_COMM_WORLD);
std::cout << "process " << myid << " tring receiving..." << std::endl;
MPI_Recv(rb, BUF_SIZE, MPI_INT, other, 1, MPI_COMM_WORLD, &status);
}
else if (myid == 1)
{
// sleep 10s, 与缓冲通信模式相比,这里的发送和接收操作是阻塞的
std::this_thread::sleep_for(std::chrono::seconds(10));
std::cout << "process " << myid << " tring receiving..." << std::endl;
MPI_Recv(rb, BUF_SIZE, MPI_INT, other, 1, MPI_COMM_WORLD, &status);
std::cout << "process " << myid << " tring send..." << std::endl;
MPI_Send(sb, BUF_SIZE, MPI_INT, other, 1, MPI_COMM_WORLD);
}
std::cout << "Hello World! Process " << myid << " of " << numprocs << std::endl;
std::cout << "Send buffer: " << std::endl;
for (int i = 0; i < BUF_SIZE; i++)
{
std::cout << sb[i] << " ";
}
std::cout << std::endl;
std::cout << "Receive buffer: " << std::endl;
for (int i = 0; i < BUF_SIZE; i++)
{
std::cout << rb[i] << " ";
}
std::cout << std::endl;
MPI_Finalize();
return 0;
}
- 运行结果:
root@ubuntu:~# mpicxx -o mpi mpi.cpp
root@ubuntu:~# mpirun -n 2 ./mpi
process 0 tring send...
process 0 tring receiving...
process 1 tring receiving...
process 1 tring send...
Hello World! Process 1 of 2
Send buffer:
1 2 3 4 5 6 7 8 9 10
Receive buffer:
0 1 2 3 4 5 6 7 8 9
Hello World! Process 0 of 2
Send buffer:
0 1 2 3 4 5 6 7 8 9
Receive buffer:
1 2 3 4 5 6 7 8 9 10
- 这就是点对点通信的基本用法,通过发送和接收操作,进程之间可以进行数据交换和协调工作。
# 3. 缓冲通信模式
- 前提: 用户显示地指定用于缓冲消息的系统缓冲区
MPI_Buffer_attach(*buffer, *size)
。 - 发送是本地的: 完成不依赖于与其匹配的接收操作。 发送的结束仅表明消息进入系统的缓冲区中,发送缓冲区可以重用,而对接收方的情况并不知道。
- 缓冲模式在相匹配的接收未开始的情况下,总是将送出的消息放在缓冲区内,这样发送者可以很快地继续计算,然后由系统处理放在缓冲区中的消息。
- 占用内存,一次内存拷贝。
- 函数调用形式为:
MPI_Bsend()
。B 表示缓冲,缓冲通信模式主要用于解开阻塞通信的发送与接收之间的耦合。 - 作用总结:通常情况下,MPI 发送和接收操作是阻塞的,即发送操作会等待接收方准备好接收,接收操作会等待发送方发送数据。但是,MPI 提供了一种称为缓冲区(buffering)的机制,可以使发送操作立即返回,而不需要等待接收方准备好。
注释
示例 2:缓冲通信模式
#include <mpi.h>
#include <iostream>
#include <thread>
int main(int argc, char **argv)
{
int myid, numprocs;
// 初始化
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
int s1, s2;
// 取缓冲区的上界,以字节为单位
MPI_Pack_size(7, MPI_CHAR, MPI_COMM_WORLD, &s1);
MPI_Pack_size(2, MPI_DOUBLE, MPI_COMM_WORLD, &s2);
int buffer_size = 2 * MPI_BSEND_OVERHEAD + s1 + s2;
char *buffer = new char[buffer_size];
// 装配一个用于通信的缓冲区
MPI_Buffer_attach(buffer, buffer_size);
char msg1[7] = "Hello";
double msg2[2] = {1.0, 2.0};
char rmsg1[7];
double rmsg2[2];
if (myid == 0)
{
MPI_Bsend(msg1, 7, MPI_CHAR, 1, 1, MPI_COMM_WORLD);
MPI_Bsend(msg2, 2, MPI_DOUBLE, 1, 2, MPI_COMM_WORLD);
std::cout << "Send msg1: " << msg1 << std::endl;
std::cout << "Send msg2: " << msg2[0] << " " << msg2[1] << std::endl;
}
else if (myid == 1)
{
// sleep 10s
std::this_thread::sleep_for(std::chrono::seconds(10));
MPI_Recv(rmsg1, 7, MPI_CHAR, 0, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Recv(rmsg2, 2, MPI_DOUBLE, 0, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
std::cout << "Receive msg1: " << rmsg1 << std::endl;
std::cout << "Receive msg2: " << rmsg2[0] << " " << rmsg2[1] << std::endl;
}
MPI_Buffer_detach(&buffer, &buffer_size);
free(buffer);
MPI_Finalize();
return 0;
}
- 运行结果:
root@ubuntu:~# mpicxx -o mpi mpi.cpp
root@ubuntu:~# mpirun -n 2 ./mpi
Send msg1: Hello
Send msg2: 1 2
Receive msg1: Hello
Receive msg2: 1 2
# 4. 就绪通信模式
- 发送请求仅当有匹配的接收后才能发出,否则出错。在就绪模式下,系统默认与其相匹配的接收已经调用。 接收必须先于发送。
- 它不可以不依赖于接收方的匹配的接收请求而任意发出
- 其函数调用形式为:
MPI_RSend()
。R 表示就绪,仅当对方的接收操作启动并准备就绪时,才可以发送数据。
注释
示例 3:就绪通信模式
#include <mpi.h>
#include <iostream>
int main(int argc, char **argv)
{
int myid, numprocs;
// 初始化
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_Status status;
int buffer[10];
if (myid == 0)
{
for (int i = 0; i < 10; i++)
{
buffer[i] = -1;
}
MPI_Recv(buffer, 10, MPI_INT, 1, 1, MPI_COMM_WORLD, &status);
for (int i = 0; i < 10; i++)
{
if (buffer[i] != i)
{
std::cout << "error" << std::endl;
break;
}
}
}
else if (myid == 1)
{
for (int i = 0; i < 10; i++)
{
buffer[i] = i;
}
MPI_Rsend(buffer, 10, MPI_INT, 0, 1, MPI_COMM_WORLD);
}
MPI_Finalize();
return 0;
}
# 5. 同步通信模式
- 本质特征:收方接收该消息的缓冲区已准备好, 不需要附加的系统缓冲区
- 任意发出:发送请求可以不依赖于收方的匹配的接收请求而任意发出
- 成功结束: 仅当收方已发出接收该消息的请求后才成功返回,否则将阻塞。意味着:
- 发送方缓冲区可以重用
- 收方已发出接收请求
- 是非本地的
- 函数调用形式为:
MPI_Ssend()
。S 表示同步。
注释
示例 4:同步通信模式
#include <mpi.h>
#include <iostream>
int main(int argc, char **argv)
{
int myid, numprocs;
// 初始化
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_Status status;
int buffer[10];
if (myid == 0)
{
for (int i = 0; i < 10; i++)
{
buffer[i] = -1;
}
MPI_Recv(buffer, 10, MPI_INT, 1, 1, MPI_COMM_WORLD, &status);
std::cout << "process " << myid << " receiving..." << std::endl;
for (int i = 0; i < 10; i++)
{
if (buffer[i] != i)
{
std::cout << "error" << std::endl;
break;
}
}
}
else if (myid == 1)
{
for (int i = 0; i < 10; i++)
{
buffer[i] = i;
}
std::cout << "process " << myid << " sending..." << std::endl;
MPI_Ssend(buffer, 10, MPI_INT, 0, 1, MPI_COMM_WORLD);
}
MPI_Finalize();
return 0;
}
# 6. 阻塞通信与非阻塞通信
阻塞通信调用时,整个程序只能执行通信相关的内容,而无法执行计算相关的内容;非阻塞调用的初衷是尽量让通信和计算重叠进行,提高程序整体执行效率。
非阻塞通信调用返回意味着通信开始启动;而非阻塞通信完成则需要调用其他的接口来查询。
- 非阻塞通信的调用接口
- 非阻塞通信的完成查询接口
非阻塞通信的发送和接受过程都需要同时具备以上两个要素:调用与完成。(1)”调用“按照通信方式的不同(标准、缓存、同步、就绪),有各种函数接口;(2)”完成“是重点,因为程序员需要知道非阻塞调用是否执行完成了,来做下一步的操作。
MPI 为“完成”定义了一个内部变量 MPI_Request request,每个 request 与一个在非阻塞调用发生时与该调用发生关联(这里的调用包括发送和接收)。“完成”不区分通信方式的不同,统一用 MPI_Wait 系列函数来完成。
MPI_Wait(MPI_Request *request)
,均等着 request 执行完毕了,再往下进行- 对于非重复非阻塞通信,
MPI_Wait
系列函数调用的返回,还意味着 request 对象被释放了,程序员不用再显式释放 request 变量。 - 对于重复非阻塞通信,
MPI_Wait
系列函数调用的返回,意味着将于 request 对象关联的非阻塞通信处于不激活状态,并不释放 request
- MPI_Wait 会迫使进程进入“阻塞模式”。发送过程将简单地等待请求完成。如果进程在 MPI_Isend 之后立即等待,则 Send 与调用 MPI_Send 相同。等待 MPI_WAIT 和 MPI_WAITANY 有两种方式
int MPI_Wait(MPI_Request *request, MPI_Status *status);
int MPI_Waitany(int count, MPI_Request array_of_requests[], int *index, MPI_Status *status);
前者 MPI_WAIT 只是等待给定请求的完成。请求一完成,就会返回一个状态为 MPI_STATUS 的实例。后者
MPI_Waitany
等待一系列请求中的第一个完成的请求继续。一旦请求完成,INDEX 的值被设置为存储 ARRAY_OF_REQUESTS 中已完成请求的索引。该调用还存储已完成请求的状态。对于阻塞通信,如果不想跟踪此信息,可以将指向 MPI_STATUS 实例的指针替换为 MPI_STATUS_IGNORE。
正如我们之前看到的,等待会阻止进程,直到请求(或某个请求)被满足。测试则是检查请求是否可以完成。如果可以,请求将自动完成并传输数据。关于等待,有两种测试等待:MPI_Test 和 MPI_Testany。它们的调用方式如下
int MPI_Test(MPI_Request *request, int *flag, MPI_Status *status);
int MPI_Testany(int count, MPI_Request array_of_requests[], int *index, int *flag, MPI_Status *status);
- 让我们从 MPI_Test 开始。至于 MPI_Wait,参数 request 和 status 并不神秘。请记住,测试是非阻塞的,因此在任何情况下,调用后进程都会继续执行。变量 flag 的作用是告诉你请求是否在测试过程中完成。如果 flag != 0 表示请求已完成
- MPI_Testany 现在应该是完全显而易见的。如果有任何请求可完成,它会将 FLAG 设置为非零值。如果是这样的话,状态和索引也被赋予一个值
# 7. 非阻塞的发送和接收
int MPI_Isend(void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request)
- buf: 发送缓冲区的起始地址
- count: 发送数据的个数
- datatype: 发送数据的类型
- dest: 目标进程的 rank
- tag: 消息标签
- comm: 通信子
- request: 非阻塞通信完成对象
MPI_Ibsend/MPI_Issend/MPI_Irsend
: 缓冲/同步/就绪通信的非阻塞发送int MPI_Irecv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Request *request)
注释
示例 5:非阻塞通信
void play_non_blocking_scenario()
{
MPI_Request request;
MPI_Status status;
int request_finished = 0;
// Initialising buffer :
for (int i = 0; i < buffer_count; ++i)
{
buffer[i] = (rank == 0 ? i * 2 : 0);
}
MPI_Barrier(MPI_COMM_WORLD);
// Starting the chronometer
double time = -MPI_Wtime();
if (rank == 0)
{
sleep(3);
// 1- Initialise the non-blocking send to process 1
// [...]
MPI_Isend(buffer, buffer_count, MPI_INT, 1, 0, MPI_COMM_WORLD, &request);
double time_left = 6000.0;
while (time_left > 0.0)
{
usleep(1000); // We work for 1ms
// 2- Test if the request is finished (only if not already finished)
// [...]
if (!request_finished)
{
MPI_Test(&request, &request_finished, &status);
}
// 1ms left to work
time_left -= 1000.0;
}
// 3- If the request is not yet complete, wait here.
// [...]
if (!request_finished)
{
MPI_Wait(&request, &status);
}
// Modifying the buffer for second step
for (int i = 0; i < buffer_count; ++i)
{
buffer[i] = -i;
}
// 4- Prepare another request for process 1 with a different tag
// [...]
MPI_Isend(buffer, buffer_count, MPI_INT, 1, 1, MPI_COMM_WORLD, &request);
time_left = 3000.0;
while (time_left > 0.0)
{
usleep(1000); // We work for 1ms
// 5- Test if the request is finished (only if not already finished)
// [...]
if (!request_finished)
{
MPI_Wait(&request, &status);
}
// 1ms left to work
time_left -= 1000.0;
}
// 6- Wait for it to finish
// [...]
if (!request_finished)
{
MPI_Wait(&request, &status);
}
}
else
{
// Work for 5 seconds
sleep(5);
// 7- Initialise the non-blocking receive from process 0
// [...]
MPI_Irecv(buffer, buffer_count, MPI_INT, 0, 0, MPI_COMM_WORLD, &request);
// 8- Wait here for the request to be completed
// [...]
MPI_Wait(&request, &status);
print_buffer();
// Work for 3 seconds
sleep(3);
// 9- Initialise another non-blocking receive
// [...]
MPI_Irecv(buffer, buffer_count, MPI_INT, 0, 1, MPI_COMM_WORLD, &request);
// 10- Wait for it to be completed
// [...]
MPI_Wait(&request, &status);
print_buffer();
}
// Stopping the chronometer
time += MPI_Wtime();
// This line gives us the maximum time elapsed on each process.
double final_time;
MPI_Reduce(&time, &final_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
if (rank == 0)
{
std::cout << "Total time for non-blocking scenario : " << final_time << "s" << std::endl;
}
}
# 8. 探测 Probe
- 探测实际上非常有用,它有很多用途,例如获取即将接收的元素数量、接收进程的 ID 和标记,或者是否真的接收到了任何信息。
- 用于探测的函数有两个:MPI_Probe 和 MPI_IProbe。第一个是阻塞调用,而第二个则不是。现在,MPI_Probe 只会给出与收到的下一条消息相关的 MPI_Status 值,该消息对应于某个标签和 ID。如果想探测任何类型或来自任何来源的消息接收情况,可以使用 MPI_ANY_SOURCE 和 MPI_ANY_TAG。然后,可以将生成的 MPI_Status 对象与其他函数结合使用,以获取更多信息。
注释
示例 6:探测
void probing_process(int &int_sum, float &float_sum) {
MPI_Status status;
// 1- Probe the incoming message
MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
// 2- Get the tag and the source
int tag = status.MPI_TAG;
int source = status.MPI_SOURCE;
// Printing the message
std::cout << "Received a message from process " << source << " with tag " << tag << std::endl;
// 3- Add to int_sum or float_sum depending on the tag of the message
if (tag == 0) {
int other;
MPI_Recv(&other, 1, MPI_INT, source, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
int_sum += other;
} else if (tag == 1) {
float other;
MPI_Recv(&other, 1, MPI_FLOAT, source, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
float_sum += other;
}
}
# 9. 总结
理解各种模式通信过程的行为,关键是弄清楚各个模式对缓冲使用的方式。简而言之,各个模式使用缓冲的特点可总结为:标准的 Send 实际利用了 MPI 环境提供的默认缓冲区;Bsend 实际相当于将 MPI 环境提供的 buffer 放在用户空间管理;Rsend 实际相当于不要缓冲区,但发送端不能提前等待;Ssend 实际也相当于不要缓冲区,但允许等待;异步方式下各个模式工作原理也是类似的,只不过可将其理解为 MPI 环境会另起一个线程在后台做实际的消息传输,通过 Wait、Test 等机制与 MPI 进程的主线程进行通信和同步。
通信模式 | 阻塞型 | 非阻塞型 | 缓冲方式 | 发送方等待 | 接收方等待 | 是否本地 | 是否阻塞 |
---|---|---|---|---|---|---|---|
标准通信 | MPI_Send | MPI_Isend | MPI 环境提供的默认缓冲区 | 是 | 是 | 是 | 是 |
缓冲通信 | MPI_Bsend | MPI_Ibsend | 用户空间管理 | 否 | 是 | 是 | 是 |
就绪通信 | MPI_Rsend | MPI_Irsend | 无 | 否 | 是 | 否 | 是 |
同步通信 | MPI_Ssend | MPI_Issend | 无 | 否 | 是 | 否 | 是 |