Featured image of post Ring buffer 详解

Ring buffer 详解

都 3202 年了,还有人没用过 ring buffer 吗?那可不行。本文将全面讲解 ring buffer 的基本原理、具体操作和具体实现(RT-Thread 与 Linux),以后尤其在通信双方循环发送数据的开发场景就可以用得上这一高效的数据结构了。

# Ring buffer 详解

# 一、概述

ring buffer 称作 环形缓冲区 ,也称作环形队列(circular queue),是一种用于表示一个固定尺寸、头尾相连的缓冲区的数据结构,适合缓存数据流。如下为 环形缓冲区(ring buffer) 的概念示意图。

在任务间的通信、串口数据收发、log 缓存、网卡处理网络数据包、音频/视频流处理中均有 环形缓冲区(ring buffer) 的应用。在 RT-Threadringbuffer.cringbuffer.h 文件中,Linux 内核文件 kfifo.hkfifo.c 中也有 环形缓冲区(ring buffer) 的代码实现。

20231202213555-2023-12-02

环形缓冲区的一些使用特点如下:

  • 当一个数据元素被读取出后,其余数据元素不需要移动其存储位置;
  • 适合于事先明确了缓冲区的最大容量的情形。缓冲区的容量(长度)一般固定,可以用一个静态数组来充当缓冲区,无需重复申请内存;
  • 如果缓冲区的大小需要经常调整,就不适合用环形缓冲区,因为在扩展缓冲区大小时,需要搬移其中的数据,这种场合使用链表更加合适;
  • 因为缓冲区成头尾相连的环形,写操作可能会覆盖未及时读取的数据,有的场景允许这种情况发生,有的场景又严格限制这种情况发生。选择何种策略和具体应用场景相关。

# 二、原理

由于计算机内存是线性地址空间,因此 环形缓冲区(ring buffer) 需要特别的算法设计才可以从逻辑上实现。

# 1. 一个简单例子

先不要想 环形缓冲区(ring buffer) 的具体实现细节,来看一个简单的例子。如下是一个空间大小为 7 的环形缓冲区,其中底部的单线箭头表示头尾相连形成一个环形地址空间:

375px-Circular_buffer_-_empty.svg-2023-12-02

假设 1 被写入圆形缓冲区的中心(在环形缓冲区中,最初的写入位置在哪里是无关紧要的):

375px-Circular_buffer_-_XX1XXXX.svg-2023-12-02

写入两个 元素,分别是 23 ,这两个元素被追加到 1 之后:

375px-Circular_buffer_-_XX123XX.svg-2023-12-02

如果 读出两个 元素,那么环形缓冲区中最老的两个元素将被读出(先进先出原则)。在本例中 12 被读出,缓冲区中剩下 3

375px-Circular_buffer_-_XXXX3XX.svg-2023-12-02

紧接着,向缓冲区中 写入六个 元素 4、5、6、7、8、9 ,这时缓冲区会被装满:

375px-Circular_buffer_-_6789345.svg-2023-12-02

如果缓冲区是满的,又要写入新的数据,这时有两种策略:一种是覆盖掉最老的数据,也就是将老数据丢掉;另一种是返回错误码或者抛出异常。来看策略一,例如,这时写入两个元素 AB ,就会覆盖掉 34

375px-Circular_buffer_-_6789AB5.svg-2023-12-02

再看,这时如果读出两个元素,就不是 34 而是 5656 这时最老),34 已经被 AB 覆盖掉。

375px-Circular_buffer_-_X789ABX.svg-2023-12-02

通过这个简单的例子,可以总结出要实现 环形缓冲区(ring buffer) 需要注意到几个问题点:

  • 在缓冲区满的时候写数据,有两种策略可以使用:第一覆盖掉老数据;第二抛出异常;
  • 读数据时,一定要读出缓冲区中最老的数据(缓冲区中数据满足 FIFO 特性);
  • 怎样来判断缓冲区是满的;
  • 如何实现一个线性地址空间的循环读写。

# 2. 具体操作

一般的,对一个环形缓冲区进行读写操作,最少需要 4 个信息:

  • 在内存中的实际 开始位置 (例如:一片内存的头指针,数组的第一个元素指针);
  • 在内存中的实际 结束位置 (也可以是缓冲区实际空间大小,结合开始位置,可以算出结束位置);
  • 在缓冲区中进行写操作时的 写索引 值;
  • 在缓冲区中进行读操作时的 读索引 值。

缓冲区开始位置缓冲区结束位置(或空间大小) 实际上定义了环形缓冲区的实际逻辑空间和大小。 读索引写索引 标记了缓冲区进行读操作和写操作时的具体位置。

具体来说,读写逻辑如下:

  1. 当环形缓冲区为空时,读索引和写索引指向相同的位置(因为是环形缓冲区,可以出现在任何位置);
  2. 当向缓冲区写入一个元素时,元素被写入 写索引 当前所指向位置,然后写索引加 1,指向下一个位置;
  3. 当从缓冲区读出一个元素时,在 读索引 当前所指向位置的元素被读出,然后读索引加 1,指向下一个位置;
  4. 当缓冲区满时,写索引和读索引指向相同的位置(和缓冲区为空时一样)。

# 2.1 在缓冲区满的时候写数据,有两种策略可以使用

缓冲区变满在 环形缓冲区(ring buffer) 中会实际发生,一般会有两种处理策略,第一覆盖掉老数据;第二抛出“异常”。这两种策略该如何选择要结合具体的应用场景。如音/视频流中,丢掉一些数据不要紧,可以选择第一种策略;在任务间通信的时候,要严格保证数据正确传输,这个时候就要选择第二种策略。

# 2.2 读数据时,一定要读出缓冲区中最老的数据

环形缓冲区(ring buffer) 也是 FIFO 类型的数据结构,需要满足先进先出的原则。写就相当于进,读就相当于出。所以读数据时,一定要保证读最老的数据。一般的情况下不会有问题,但有一种场景需要小心: 当缓冲区是满的时候,继续写入元素(覆盖),除了写索引要变,读索引也要跟着变,保证读索引一定是指向缓冲区中最老的元素

# 2.3 怎样来判断缓冲区是满的

判断缓冲区是满还是空,在环形缓冲区(ring buffer)中是一个重点问题,在维基百科 中,讲解了五种判断方法,感兴趣可以看一下。在平衡各方优缺点后,本节重点讲解 镜像指示位 方法,在 linuxRT-Thread 实现的环形缓冲区中,也都是用的该策略(或者说是该策略的扩展)。

镜像指示位:缓冲区的长度如果是 n ,逻辑地址空间则为 0 至 n-1 ;那么,规定 n 至 2n-1 为镜像逻辑地址空间。本策略规定读写指针的地址空间为 0 至 2n-1 ,其中低半部分对应于常规的逻辑地址空间,高半部分对应于镜像逻辑地址空间。当指针值大于等于 2n 时,使其折返(wrapped)到 ptr-2n。使用一位表示写指针或读指针是否进入了虚拟的镜像存储区:置位表示进入,不置位表示没进入还在基本存储区。

  • 在读写指针的值相同情况下,如果二者的指示位相同,说明缓冲区为空;如果二者的指示位不同,说明缓冲区为满。

  • 这种方法优点是测试缓冲区满/空很简单;不需要做取余数操作;读写线程可以分别设计专用算法策略,能实现精致的并发控制。

  • 缺点是读写指针各需要额外的一位作为指示位。

  • 如果缓冲区长度是 2 的幂,则本方法可以省略镜像指示位。如果读写指针的值相等,则缓冲区为空;如果读写指针相差 n ,则缓冲区为满,这可以用条件表达式(写指针 == (读指针 异或 缓冲区长度))来判断。—— 维基百科

v2-22cba7a9b8ff0746184d752a63621981_1440w-2023-12-02

上面是维基百科中对 镜像指示位 的整体描述,但是单凭上面这一个描述,去理解 镜像指示位 方法还是有一定困难,下面来进行一些讨论。

上面描述中提到了 读/写指针 的概念,注意这个读/写指针和上文提到的 读索引写索引 不是一回事。读写指针的范围是 $[0, 2n-1]$ ,而 读索引写索引 的范围是 $[0, n - 1]$ ,其必须和缓冲区的实际逻辑空间一致。但是 读/写指针读索引写索引 有一个转换关系:

$$ 读索引 = 读指针 \bmod 缓冲区长度 \\ 写索引 = 写指针 \bmod 缓冲区长度 $$

但是如果缓冲区长度是 2 的幂,那么求余运算可以等价的转换为如下的 按位与 运算:

$$ 读索引 = 读指针 \& (缓冲区长度 - 1) \\ 写索引 = 写指针 \& (缓冲区长度 - 1) $$

按位与的运算效率要比求余运算高的多,在 linux 内核中将缓冲区长度扩展为 2 的幂长度随处可见,都是为了用按位与操作代替求余操作。为了判断缓冲区是否为空或者满,镜像指示位策略引入了两个布尔变量(指示位),来分别标记读指针或写指针是否进入了镜像区间 $[n, 2n-1]$ ,在读写指针的值相同情况下,如果二者的指示位相同,说明缓冲区 为空 ;如果二者的指示位不同,说明缓冲区 为满 。但如果缓冲区的长度是 2 的幂,则可以省略镜像指示位。如果读写指针的值相等,则缓冲区为空;如果读写指针相差 n ,则缓冲区为满。

# 2.4 如何实现一个线性地址空间的循环读写

理解了 2.3 节的描述,再来理解用一个线性地址空间来实现循环读写就比较容易。如一个环形缓冲区的长度为 ,则其读写索引的区间为 $[0, 6]$ 。当写索引的值为 6 ,再向缓冲区中写入一个元素时,写索引应该要回到缓冲区的起始索引位置 0 ,读索引在碰到这种情况也是类似处理。总结为一句话就是,当写索引或读索引已经到了环形缓冲区的结束位置时,进行下一步操作时,其应该要回到环形缓冲区的开始位置。

# 三、实现

对于环形缓冲区的代码实现,本文会分析 RT-Threadringbuffer.cringbuffer.h 文件,Linux 内核中的 kfifo.hkfifo.c 文件。

# 1. RT-Thread 中实现的 ring buffer

下面分析 RT-Threadring buffer 实现,主要会讨论 环形缓冲区结构体缓冲区初始化操作写操作读操作判断缓冲区是否为空或满

# 1.1 环形缓冲区结构体

RT-Thread 中定义了结构体 rt_ringbuffer ,其中 buffer_ptrbuffer_sizeread_indexwrite_index 和之前介绍的 4 个信息是完全对应的。为了判断缓冲区是空还是满,还定义了两个布尔型变量 read_mirrorwrite_mirror ,其是通过位域的定义方式来实现。

struct rt_ringbuffer
{
    rt_uint8_t *buffer_ptr;
    rt_uint16_t read_mirror : 1;
    rt_uint16_t read_index : 15;
    rt_uint16_t write_mirror : 1;
    rt_uint16_t write_index : 15;
    rt_int16_t buffer_size;
};

# 1.2 缓冲区初始化操作

初始化操作 rt_ringbuffer_init 很容易理解,就是将申请好的内存地址赋值给环形缓冲区,缓冲区实际逻辑大小也传入进去。read_indexwrite_indexread_mirrorwrite_mirror 全部初始化为零。

void rt_ringbuffer_init(struct rt_ringbuffer *rb,
                        rt_uint8_t           *pool,
                        rt_int16_t            size)
{
    RT_ASSERT(rb != RT_NULL);
    RT_ASSERT(size > 0);

    /* initialize read and write index */
    rb->read_mirror = rb->read_index = 0;
    rb->write_mirror = rb->write_index = 0;

    /* set buffer pool and size */
    rb->buffer_ptr = pool;
    rb->buffer_size = RT_ALIGN_DOWN(size, RT_ALIGN_SIZE);
}

# 1.3 写操作和读操作

写操作有两个接口 rt_ringbuffer_putrt_ringbuffer_put_force ,当缓冲区满的时候,前一个不会写入,后一个会强制写入(覆盖);读操作有一个接口 rt_ringbuffer_get

这里先说明一下, RT-Threadring buffer 实现虽然借鉴了上一章讲的 镜像指示位 策略,但其并没有使用读写指针,而是直接用的 写索引读索引 ,也就是说结构体中的 read_indexwrite_index 就是写索引和读索引,无需进行转换,直接可以用来操作缓冲区。这一点和 linux 的实现方式不同,在下面的 linux 章节中会看到。但 read_mirrorwrite_mirror 是和 镜像指示位策略 中讲的一样,用来标记是否进入了镜像区间。

先来看 rt_ringbuffer_put 的实现,该函数的返回值是实际写入大小,就是如果传入的 length 大于缓冲区的剩余空间,则 length 只有部分会被写入缓冲区。

  • if (rb->buffer_size - rb->write_index > length) 为真时,就是说从 写索引 到缓冲区结束位置这一段空间能容纳全部所写入数据。写索引无需回环。对应的代码就是 rt_memcpy(&rb->buffer_ptr[rb->write_index], ptr, length);
  • if (rb->buffer_size - rb->write_index > length) 为假时,就是说从 写索引 到缓冲区结束位置这一段空间无法全部容纳所写入数据,写索引需要回环到缓冲区开头,写入剩下的数据。对应代码就是 rt_memcpy(&rb->buffer_ptr[rb->write_index],&ptr[0],rb->buffer_size - rb->write_index);rt_memcpy(&rb->buffer_ptr[0],&ptr[rb->buffer_size - rb->write_index],length - (rb->buffer_size - rb->write_index)); 。因为写索引已经回环了,所以要将 write_mirror 做一下取反操作:rb->write_mirror = ~rb->write_mirror;

写操作接口 rt_ringbuffer_put_force 和上面介绍的基本一样,其实就是多了当传入的 length 大于缓冲区的剩余空间时,会将已有的元素覆盖掉。如果发生了元素覆盖,那缓冲区一定会变满,read_indexwrite_index 会相等,对应语句 if (length > space_length) rb->read_index = rb->write_index; 。因为会操作 read_index 元素,也要考虑其是否发生了回环,发生了回环后 read_mirror 需要取反,对应语句 rb->read_mirror = ~rb->read_mirror;

读接口 rt_ringbuffer_get 和写接口的操作逻辑基本一致,也是通过条件 if (rb->buffer_size - rb->write_index > length) 将读操作分成了两种情形,过程和写操作接口 rt_ringbuffer_put 没有差异。

/**
 * @brief Put a block of data into the ring buffer. If the capacity of ring buffer is insufficient, it will discard out-of-range data.
 *
 * @param rb            A pointer to the ring buffer object.
 * @param ptr           A pointer to the data buffer.
 * @param length        The size of data in bytes.
 *
 * @return Return the data size we put into the ring buffer.
 */
rt_size_t rt_ringbuffer_put(struct rt_ringbuffer *rb,
                            const rt_uint8_t     *ptr,
                            rt_uint16_t           length)
{
    rt_uint16_t size;

    RT_ASSERT(rb != RT_NULL);

    /* whether has enough space */
    size = rt_ringbuffer_space_len(rb);

    /* no space */
    if (size == 0)
        return 0;

    /* drop some data */
    if (size < length)
        length = size;

    if (rb->buffer_size - rb->write_index > length)
    {
        /* read_index - write_index = empty space */
        rt_memcpy(&rb->buffer_ptr[rb->write_index], ptr, length);
        /* this should not cause overflow because there is enough space for
         * length of data in current mirror */
        rb->write_index += length;
        return length;
    }

    rt_memcpy(&rb->buffer_ptr[rb->write_index],
           &ptr[0],
           rb->buffer_size - rb->write_index);
    rt_memcpy(&rb->buffer_ptr[0],
           &ptr[rb->buffer_size - rb->write_index],
           length - (rb->buffer_size - rb->write_index));

    /* we are going into the other side of the mirror */
    rb->write_mirror = ~rb->write_mirror;
    rb->write_index = length - (rb->buffer_size - rb->write_index);

    return length;
}
RTM_EXPORT(rt_ringbuffer_put);

/**
 * @brief Put a block of data into the ring buffer. If the capacity of ring buffer is insufficient, it will overwrite the existing data in the ring buffer.
 *
 * @param rb            A pointer to the ring buffer object.
 * @param ptr           A pointer to the data buffer.
 * @param length        The size of data in bytes.
 *
 * @return Return the data size we put into the ring buffer.
 */
rt_size_t rt_ringbuffer_put_force(struct rt_ringbuffer *rb,
                            const rt_uint8_t     *ptr,
                            rt_uint16_t           length)
{
    rt_uint16_t space_length;

    RT_ASSERT(rb != RT_NULL);

    space_length = rt_ringbuffer_space_len(rb);

    if (length > rb->buffer_size)
    {
        ptr = &ptr[length - rb->buffer_size];
        length = rb->buffer_size;
    }

    if (rb->buffer_size - rb->write_index > length)
    {
        /* read_index - write_index = empty space */
        rt_memcpy(&rb->buffer_ptr[rb->write_index], ptr, length);
        /* this should not cause overflow because there is enough space for
         * length of data in current mirror */
        rb->write_index += length;

        if (length > space_length)
            rb->read_index = rb->write_index;

        return length;
    }

    rt_memcpy(&rb->buffer_ptr[rb->write_index],
           &ptr[0],
           rb->buffer_size - rb->write_index);
    rt_memcpy(&rb->buffer_ptr[0],
           &ptr[rb->buffer_size - rb->write_index],
           length - (rb->buffer_size - rb->write_index));

    /* we are going into the other side of the mirror */
    rb->write_mirror = ~rb->write_mirror;
    rb->write_index = length - (rb->buffer_size - rb->write_index);

    if (length > space_length)
    {
        if (rb->write_index <= rb->read_index)
            rb->read_mirror = ~rb->read_mirror;
        rb->read_index = rb->write_index;
    }

    return length;
}
RTM_EXPORT(rt_ringbuffer_put_force);

/**
 * @brief Get data from the ring buffer.
 *
 * @param rb            A pointer to the ring buffer.
 * @param ptr           A pointer to the data buffer.
 * @param length        The size of the data we want to read from the ring buffer.
 *
 * @return Return the data size we read from the ring buffer.
 */
rt_size_t rt_ringbuffer_get(struct rt_ringbuffer *rb,
                            rt_uint8_t           *ptr,
                            rt_uint16_t           length)
{
    rt_size_t size;

    RT_ASSERT(rb != RT_NULL);

    /* whether has enough data  */
    size = rt_ringbuffer_data_len(rb);

    /* no data */
    if (size == 0)
        return 0;

    /* less data */
    if (size < length)
        length = size;

    if (rb->buffer_size - rb->read_index > length)
    {
        /* copy all of data */
        rt_memcpy(ptr, &rb->buffer_ptr[rb->read_index], length);
        /* this should not cause overflow because there is enough space for
         * length of data in current mirror */
        rb->read_index += length;
        return length;
    }

    rt_memcpy(&ptr[0],
           &rb->buffer_ptr[rb->read_index],
           rb->buffer_size - rb->read_index);
    rt_memcpy(&ptr[rb->buffer_size - rb->read_index],
           &rb->buffer_ptr[0],
           length - (rb->buffer_size - rb->read_index));

    /* we are going into the other side of the mirror */
    rb->read_mirror = ~rb->read_mirror;
    rb->read_index = length - (rb->buffer_size - rb->read_index);

    return length;
}
RTM_EXPORT(rt_ringbuffer_get);

# 1.4 判断缓冲区是否为空或满

判断缓冲区是否为空或满,通过函数 rt_ringbuffer_status 来实现。其逻辑是:在读写指针的值相同情况下,如果二者的指示位相同,说明缓冲区 为空 ;如果二者的指示位不同,说明缓冲区 为满 。注意这里的读写指针已经在读写( rt_ringbuffer_getrt_ringbuffer_put )过程中转换为了读写索引。

rt_inline enum rt_ringbuffer_state rt_ringbuffer_status(struct rt_ringbuffer *rb)
{
    if (rb->read_index == rb->write_index)
    {
        if (rb->read_mirror == rb->write_mirror)
            return RT_RINGBUFFER_EMPTY;
        else
            return RT_RINGBUFFER_FULL;
    }
    return RT_RINGBUFFER_HALFFULL;
}

# 小结

  • 在多线程中,对同一个环形缓冲区进行读写操作时,需要加上锁,不然存在访问不安全问题;
  • 当只有一个读线程和一个写线程(单生产者单消费者模型)时,用 rt_ringbuffer_putrt_ringbuffer_get 进行读写操作缓冲区是线程安全的,无需加锁;但是 rt_ringbuffer_put_force 不行,因为其可能对 读写索引 都进行操作的场景,这个时候再进行 rt_ringbuffer_get 读操作,就是不安全访问;
  • 读写指针已经在读写( rt_ringbuffer_getrt_ringbuffer_put )过程中转换为了读写索引。所以 read_index(读索引)和 write_index (写索引)可以直接用来操作缓冲区,无需转换;
  • read_indexwrite_index 的大小区间为$[0, n-1]$ , $n$ 为缓冲区大小;
  • RT-Thread 的环形缓冲区不需要 buffer 大小为 2 的幂

# 2. Linux 中实现的 ring buffer

linux 内核中, kfifo 就是 ring buffer 的经典实现方式,本文将介绍 linux 2.6 版本中的 ring buffer 实现方式,主要介绍 缓冲区结构体缓冲区初始化读操作写操作判断缓冲区是否为空或满

# 2.1 缓冲区结构体

kfiforing buffer 结构体定义如下,其中 buffer、size、in、out 环形缓冲区 4 个信息是一一对应的。但其中 inout 分别是 写指针读指针 ,而不是 写索引读索引 。参数 lock 是自旋锁,在多进程/线程对同一个环形缓冲区进行读写操作时,需要进行锁保护。和 RT-Thread 对比,可以看到其并没有读写的 镜像指示位

struct kfifo {
 unsigned char *buffer; /* the buffer holding the data */
 unsigned int size; /* the size of the allocated buffer */
 unsigned int in; /* data is added at offset (in % size) */
 unsigned int out; /* data is extracted from off. (out % size) */
 spinlock_t *lock; /* protects concurrent modifications */
};

# 2.2 缓冲区初始化

kfifo 的初始化 kfifo_init 中可以看出,其会对所传入的 size 大小进行扩展,使其满足 size2 的幂 。这样就可以使用性质: 如果缓冲区的长度是 2 的幂 ,则可以省略镜像指示位。如果读写指针的值相等,则缓冲区为空;如果读写指针相差 n(缓冲区大小),则缓冲区为满 。所以在传入 buffersize 大小时,最好开始就将其确定为 2 的幂

struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size,
    int gfp_mask, spinlock_t *lock)
{
 struct kfifo *fifo;

 /* size must be a power of 2 */
 BUG_ON(size & (size - 1));

 fifo = kmalloc(sizeof(struct kfifo), gfp_mask);
 if (!fifo)
  return ERR_PTR(-ENOMEM);

 fifo->buffer = buffer;
 fifo->size = size;
 fifo->in = fifo->out = 0;
 fifo->lock = lock;

 return fifo;
}

# 2.3 读操作和写操作

可以看到 kfifo 对读操作和写操作的实现非常简洁。在进行读操作和写操作时,其充分利用了无符号整型的性质。在 __kfifo_put (写操作)和 __kfifo_get (读操作)时, in (写指针)和 out (读指针)都是正向增加的,当达到最大值时,产生溢出,使得从 0 开始,进行循环使用。 in(写指针)out(读指针) 会恒定的保持如下关系:

  • 读指针 + 缓冲区已存储数据长度 = 写指针

其中读指针是 out ,写指针是 inout(读指针) 永远不会超过 in(写指针) 的大小,最多两者相等,相等就是缓冲区为空的时候。

先看 __kfifo_put 的源码。 len = min(len, fifo->size - fifo->in + fifo->out); 中表达的意思就是实际写入的长度一定要小于缓冲区的可用空间大小,防止发生覆盖已有元素的场景。来看这一句 l = min(len, fifo->size - (fifo->in & (fifo->size - 1))); ,其中 (fifo->in & (fifo->size - 1)) 就是将 in(写指针) 转换为写索引,整体表达的意思是从 写索引 到缓冲区结束位置这一段所能写入数据的大小,这一段写入操作的代码为 memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l); 。如果这一段还不够,需要折返到缓冲区的开始位置,将剩下的部分写入到缓冲区中,其代码为 memcpy(fifo->buffer, buffer + l, len - l); 。而且 $len >= l$, 当 $len=l$ 就说明第一段已经可以容纳所写入大小,缓冲区无需折返,第二个 memcpy 拷贝了零个字节,相当于什么也没有发生。

再看 __kfifo_get 的源码。其思路基本和 __kfifo_put 一致,了解了上面的转换关系,就比较好理解:

/*
 * __kfifo_put - puts some data into the FIFO, no locking version
 * @fifo: the fifo to be used.
 * @buffer: the data to be added.
 * @len: the length of the data to be added.
 *
 * This function copies at most 'len' bytes from the 'buffer' into
 * the FIFO depending on the free space, and returns the number of
 * bytes copied.
 *
 * Note that with only one concurrent reader and one concurrent
 * writer, you don't need extra locking to use these functions.
 */
unsigned int __kfifo_put(struct kfifo *fifo,
    unsigned char *buffer, unsigned int len)
{
 unsigned int l;

 len = min(len, fifo->size - fifo->in + fifo->out);

 /* first put the data starting from fifo->in to buffer end */
 l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
 memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);

 /* then put the rest (if any) at the beginning of the buffer */
 memcpy(fifo->buffer, buffer + l, len - l);

 fifo->in += len;

 return len;
}
EXPORT_SYMBOL(__kfifo_put);

unsigned int __kfifo_get(struct kfifo *fifo,
    unsigned char *buffer, unsigned int len)
{
 unsigned int l;

 len = min(len, fifo->in - fifo->out);

 /* first get the data from fifo->out until the end of the buffer */
 l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
 memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);

 /* then get the rest (if any) from the beginning of the buffer */
 memcpy(buffer + l, fifo->buffer, len - l);

 fifo->out += len;

 return len;
}
EXPORT_SYMBOL(__kfifo_get);

# 2.4 判断缓冲区是否为空或满

kfifo 中没有专门的函数判断缓冲区是否为空或满,但可以通过 __kfifo_len 函数获取 缓冲区已存储数据长度 。如果其值等于零就说明缓冲区为空,如果其值等于缓冲区大小,就说明缓冲区满。

static inline unsigned int __kfifo_len(struct kfifo *fifo)
{
 return fifo->in - fifo->out;
}

# 小结

  • linux 中环形缓冲区(ring buffer)的实现,其实是对 镜像指示位 策略的扩展,读指针和写指针区间范围不再局限在镜像区间 $[0, 2n-1]$ ,而是整个 unsigned int 大小的空间,对于 32 位机器,读指针和写指针的区间范围是 $[0, 2^{32} - 1]$ ;
  • 进行扩展后,还能维持如下的关系,是因为缓冲区大小 n 会被扩展为 2 的幂 ,那么 $2^32$ 肯定是 $n$ 扩展后的整数倍,所以还是能够满足如下关系;
    • 读索引 = 读指针 & (缓冲区长度 - 1)
    • 写索引 = 写指针 & (缓冲区长度 - 1)
  • 读索引和写索引的区间范围仍然是 $[0, n-1]$ , $n$ 为缓冲区大小;
  • 在多进程/线程中,对同一个环形缓冲区进行读写操作时,需要加上锁,不然存在访问不安全问题;
  • 当只有一个读进程/线程和一个写进程/线程时,无需加锁,也能保证访问安全。

# 3. 5.17+版本的 Linux 中实现的 ring buffer

linux5.17+ 版本中的 kfifo.ckfifo.h 中,其源码实现已经和 linux 2.6 版本有很大的不同,但是最新版本的 ring buffer 核心思想和 linux 2.6 版本并没有不同。

kfifo.h 文件中定义的 ring buffer 结构体,其中 inout 依然是 写指针读指针mask 是缓冲区大小减 1(做&操作,更方便的将读写指针转换为 读写索引 ), esize 缓冲区单个存储元素的字节大小(在 linux 2.6 版本中,一个元素就是一个字节大小,最新版本将其进行了扩展), data 缓冲区的逻辑起始地址(指针类型不再是字节)。其它的 初始化接口读接口写接口判断缓冲区是否为空或满接口逻辑linux 2.6 大致差不多,可以对照源码看一下。

struct __kfifo {
 unsigned int in;
 unsigned int out;
 unsigned int mask;
 unsigned int esize;
 void  *data;
};

# 四、总结

  • 环形缓冲区(ring buffer)适合于事先明确了缓冲区的最大容量的情形。缓冲区的容量(长度)一般固定,可以用一个静态数组来充当缓冲区,无需重复申请内存;
  • 如果缓冲区的大小需要经常调整,就不适合用环形缓存区,因为在扩展缓冲区大小时,需要搬移其中的数据,这种场合使用链表更加合适;
  • 因为缓冲区成头尾相连的环形,写操作可能会覆盖未及时读取的数据,有的场景允许这种情况发生,有的场景又严格限制这种情况发生。选择何种策略和具体应用场景相关;
  • 环形缓冲区(ring buffer)特别适合于通信双方循环发送数据的场景;
  • 镜像指示位是一种高效判断缓冲区是否为空或满的策略,在 RT-Threadlinux 中都使用了该策略(或者是该策略的扩展),其能够保证在只有一个读线程(或进程)和一个写线程(或进程)中无需锁也能做到线程安全;
  • 注意区分写指针和写索引,读指针和读索引,最终对缓冲区进行操作还是需要写索引和读索引;
  • 如果自己嵌入式项目中需要使用环形缓冲区(ring buffer),可以借鉴 linux 2.6 版本的 kfifo 实现,很容易改写,而且非常高效。

# 五、参考资料

Licensed under CC BY-NC-SA 4.0
本博客已稳定运行
总访客数: Loading
总访问量: Loading
发表了 73 篇文章 · 总计 323.75k

使用 Hugo 构建
主题 StackJimmy 设计
基于 v3.27.0 分支版本修改