Featured image of post MPI 与并行计算(三):集合通信

MPI 与并行计算(三):集合通信

集合通信是进程集合里的所有进程进行数据交换的同时操作,实现了一对多,多对一,全体对全体的数据交换。本文深入讨论了一对多通信如广播和散播,多对一通信如收集,以及如何聚合所有进程的数据。然后,本文介绍了如何进行同步以确保所有进程在相同的执行点进行数据交换。接着,本文介绍了规约,一种特殊的集合通信操作,它将所有进程的数据聚合并进行运算,得出一个单一结果。总的来说,本文为读者提供了关于 MPI 集合通信的深入解析,包括其定义、功能和几种主要的通信模式。

# MPI 与并行计算(三):集合通信

# 1. 定义

  • 集合通信(Collective Communication):是一个进程组中的所有进程都参加的全局通信操作。
  • 特点:
    • 通信空间中的所有进程都参与通信操作
    • 每一个进程都需要调用该操作函数

数据移动类型

  • Broadcast
bcast_p2p
  • Scatter
scatter
  • Gather
gather
  • AllGather
  • Alltoall
20230720003014

# 2. 集合通信实现的功能

  • 集合通信一般实现三个功能:通信、聚合和同步

    类型函数名含义
    通信MPI_Bcast一对多广播同样的消息
    通信MPI_Gather多对一收集各个进程的消息
    通信MPI_GathervMPI_Gather 的一般化
    通信MPI_Allgather全局收集
    通信MPI_AllgathervMPI_Allgather 的一般化
    通信MPI_Scatter一对多散播不同的消息
    通信MPI_ScattervMPI_Scatter 的一般化
    通信MPI_Alltoall多对多全局交换消息
    通信MPI_AlltoallvMPI_Alltoall 的一般化
    聚合MPI_Reduce多对一规约
    聚合MPI_AllreduceMPI_Reduce 的一般化
    聚合MPI_Scan多对多扫描
    聚合MPI_Reduce_scatterMPI_Reduce 的一般化
    同步MPI_Barrier路障同步
  • 通信:集合通信,按照通信方向的不同,又可以分为三种:一对多通信,多对一通信和多对多通信。

  • 一对多通信:一个进程向其它所有的进程发送消息,这个负责发送消息的进程叫做 Root 进程。

  • 多对一通信:一个进程负责从其它所有的进程接收消息,这个接收的进程也叫做 Root 进程。

  • 多对多通信:每一个进程都向其它所有的进程发送或者接收消息。

# 3. 一对多通信:广播

  • 广播是一对多通信的典型例子,其调用格式为:MPI_Bcast(void *buffer, int count, MPI_Datatype datatype, int root, MPI_Comm comm)

注释

示例 1:广播

// bcast.c
#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[])
{
    int rank, size;
    int data;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (rank == 0)
    {
        data = 123;
    }
    MPI_Bcast(&data, 1, MPI_INT, 0, MPI_COMM_WORLD);
    printf("Process %d got data %d\n", rank, data);
    MPI_Finalize();
    return 0;
}
  • 结果:
root@ubuntu:~# mpicc bcast.c -o bcast
root@ubuntu:~# mpirun -n 2 ./bcast
Process 0 got data 123
Process 1 got data 123
  • 广播的特点
    • 标号为 Root 的进程发送相同的消息给通信域 Comm 中的所有进程。
    • 消息的内容如同点对点通信一样由三元组<Address, Count, Datatype>标识。
    • 对 Root 进程来说,这个三元组既定义了发送缓冲也定义了接收缓冲。对其它进程来说,这个三元组只定义了接收缓冲

# 4. 多对一通信:收集

  • 收集是多对一通信的典型例子,其调用格式为:MPI_Gather(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
20230720004519

注释

示例 2:收集

// gather.c
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char *argv[])
{
    int rank, size;
    // 分布变量
    int data[2];
    int *buf;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    data[0] = rank * 2 + 1;
    data[1] = rank * rank * 3 + 2;

    if (rank == 0)
    {
        // 开辟接收缓存区
        buf = malloc(2 * size * sizeof(int));
    }

    MPI_Gather(data, 2, MPI_INT, buf, 2, MPI_INT, 0, MPI_COMM_WORLD);

    if (rank == 0)
    {
        for (int i = 0; i < 2 * size; i++)
        {
            printf("%d ", buf[i]);
        }
        printf("\n");
        free(buf);
    }
    MPI_Finalize();
    return 0;
}
  • 结果:
root@ubuntu:~# mpicc gather.c -o gather
root@ubuntu:~# mpirun -n 2 ./gather
1 2 3 5
  • 收集的特点
    • 在收集操作中,Root 进程从进程域 Comm 的所有进程(包括它自已)接收消息。
    • 这 n 个消息按照进程的标识 rank 排序进行拼接,然后存放在 Root 进程的接收缓冲中。
    • 接收缓冲由三元组<RecvAddress, RecvCount, RecvDatatype>标识,发送缓冲由三元组<SendAddress, SendCount, SendDatatype>标识,所有非 Root 进程忽略接收缓冲。

# 5. 一对多通信:散播

  • 散播是一个一对多操作,其调用格式为:MPI_Scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
20230720005711

注释

示例 3:散播

// scatter.c
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char *argv[])
{
    int rank, size;
    int *buf;
    int data[2];
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (rank == 0)
    {
        buf = malloc(2 * size * sizeof(int));
        for (int i = 0; i < 2 * size; i++)
        {
            buf[i] = i;
        }
    }

    MPI_Scatter(buf, 2, MPI_INT, data, 2, MPI_INT, 0, MPI_COMM_WORLD);
    printf("rank = %d, data = %d %d\n", rank, data[0], data[1]);

    if (rank == 0)
    {
        free(buf);
    }
    MPI_Finalize();
    return 0;
}
  • 结果:
root@ubuntu:~# mpicc scatter.c -o scatter
root@ubuntu:~# mpirun -n 2 ./scatter
rank = 0, data = 0 1
rank = 1, data = 2 3
  • 散播的特点
    • Scatter 执行与 Gather 相反的操作。
    • Root 进程给所有进程(包括它自已)发送一个不同的消息,这 n (n 为进程域 comm 包括的进程个数)个消息在 Root 进程的发送缓冲区中按进程标识的顺序有序地存放。
    • 每个接收缓冲由三元组<RecvAddress, RecvCount, RecvDatatype>标识,所有的非 Root 进程忽略发送缓冲。对 Root 进程,发送缓冲由三元组<SendAddress,SendCount, SendDatatype>标识。

# 6. 聚合

  • 集合通信的聚合功能使得 MPI 进行通信的同时完成一定的计算。
  • MPI 聚合的功能分三步实现:
    • 首先是通信的功能,即消息根据要求发送到目标进程,目标进程也已经收到了各自需要的消息
    • 然后是对消息的处理,即执行计算功能
    • 最后把处理结果放入指定的接收缓冲区
  • MPI 提供了两种类型的聚合操作: 归约(Reduce)和扫描(Scan)。

# 7. 同步

  • 同步功能用来协调各个进程之间的进度和步伐 。目前 MPI 的实现中支持一个同步操作,即路障同步(Barrier)
  • 路障同步的调用格式为:MPI_Barrier(MPI_Comm comm)
    • 在路障同步操作MPI_Barrier(Comm)中,通信域 Comm 中的所有进程相互同步。
    • 在该操作调用返回后,可以保证组内所有的进程都已经执行完了调用之前的所有操作,可以开始该调用后的操作。

# 8. 规约

MPI_REDUCE 将组内每个进程输入缓冲区中的数据按给定的操作 op 进行运算,并将其结果返回到序列号为 root 的进程的输出缓冲区中,输入缓冲区由参数 sendbuf、count 和 datatype 定义,输出缓冲区由参数 recvbuf count 和 datatype 定义,要求两者的元素数目和类型都必须相同,因为所有组成员都用同样的参数 count、datatype、op、root 和 comm 来调用此例程 故而所有进程都提供长度相同、元素类型相同的输入和输出缓冲区,每个进程可能提供一个元素或一系列元素 组合操作依次针对每个元素进行。

操作 op 始终被认为是可结合的 并且所有 MPI 定义的操作被认为是可交换的,用户自定义的操作被认为是可结合的,但可以不是可交换的。MPI 中已经定义好的一些操作,它们是为函数MPI_Reduce 和一些其他的相关函数,如MPI_AllreduceMPI_Reduce_scatterMPI_Scan 而定义的 这些操作用来设定相应的 op。

MPI 预定的归约操作如下:

操作含义操作含义
MPI_MAX最大值MPI_MIN最小值
MPI_SUM求和MPI_PROD求积
MPI_LAND逻辑与MPI_BAND按位与
MPI_LOR逻辑或MPI_BOR按位或
MPI_LXOR逻辑异或MPI_BXOR按位异或
MPI_MAXLOC最大值和位置MPI_MINLOC最小值和位置

注释

示例 4:计算 pi 值

// reduce.c
#include <math.h>
#include <mpi.h>
#include <stdio.h>

double f(double);
double f(double x)
{
    return 4.0 / (1.0 + x * x);
}

int main(int argc, char **argv)
{
    int done      = 0, n, myid, numprocs, i;
    double PI25DT = 3.141592653589793238462643;
    double mypi, pi, h, sum, x;
    double startwtime = 0.0, endwtime;
    int namelen;
    char process_name[MPI_MAX_PROCESSOR_NAME];
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Get_processor_name(process_name, &namelen);

    fprintf(stdout, "Process %d of %d is on %s\n", myid, numprocs, process_name);
    n = 0;
    if (myid == 0)
    {
        fprintf(stdout, "Enter the number of intervals: (0 quits) ");
        fflush(stdout);
        scanf("%d", &n);
        startwtime = MPI_Wtime();
    }
    /* 将 n 广播给所有进程 */
    MPI_Bcast(&n, 1, MPI_INT, 0, MPI_COMM_WORLD);
    /* 矩形宽度 */
    h = 1.0 / (double)n;
    /* 矩形面积初值 */
    sum = 0.0;
    /* 每个进程计算自己的部分 */
    for (i = myid + 1; i <= n; i += numprocs)
    {
        x = h * ((double)i - 0.5);
        sum += f(x);
    }
    /* 各个进程并行计算得到的和 */
    mypi = h * sum;
    MPI_Reduce(&mypi, &pi, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
    /* 将部分和累加得到最终结果 */
    if (myid == 0)
    {
        printf("pi is approximately %.16f, Error is %.16f\n", pi, fabs(pi - PI25DT));
        endwtime = MPI_Wtime();
        printf("wall clock time = %f\n", endwtime - startwtime);
        fflush(stdout);
    }
    MPI_Finalize();
    return 0;
}
本博客已稳定运行
总访客数: Loading
总访问量: Loading
发表了 73 篇文章 · 总计 323.73k

使用 Hugo 构建
主题 StackJimmy 设计
基于 v3.27.0 分支版本修改