译文 · 原文: Friday Q&A 2012-02-17: Ring Buffers and Mirrored Memory: Part II · 作者 Mike Ash
原文:https://www.mikeash.com/pyblog/friday-qa-2012-02-17-ring-buffers-and-mirrored-memory-part-ii.html 发布:2012-02-17 作者:Mike Ash 译者:MiMo(mimo-v2.5-pro);代码块保留英文原样
上次的周五技术问答中,我开始讨论如何通过虚拟内存技巧实现镜像内存的环形缓冲区(ring buffer)。第一篇文章集中介绍了那些虚拟内存技巧。今天,我将补充拼图的后半部分,展示如何在我们开发的镜像内存分配器(mirrored memory allocator)之上实现环形缓冲区。如果你还没读过上一篇文章,我强烈推荐你先读一下,否则镜像内存的部分可能会令人困惑。
代码 和上次一样,我们将讨论的代码已发布在 GitHub 上:
https://github.com/mikeash/MAMirroredQueue
目标 镜像内存技巧允许向外部世界暴露指向内部缓冲区的指针,因为数据和空闲空间始终是连续的。因此,目标就是创建一个易于使用的 API。在默认操作中,环形缓冲区也应该能够增长以容纳新写入的数据。对于多线程使用,可以锁定环形缓冲区的大小,此时缓冲区对于单个并发的读取器和写入器是线程安全的。这种情况下,线程安全应该无需使用锁即可实现。
API
环形缓冲区实现在一个名为 MAMirroredQueue 的类中:
@interface MAMirroredQueue : NSObject读取操作有三个方法。一个方法用于检索可读取的数据量,一个方法返回指向数据的指针,另一个方法用于移动指针:
- (size_t)availableBytes; - (void *)readPointer; - (void)advanceReadPointer: (size_t)howmuch;这样,客户端就能知道可以读取多少数据,可以访问数据,完成操作后还能通过向前推进指针来从环形缓冲区中移除这些数据。
对于写入数据,接口设计是类似的。不过,这里不是提供查询数据量的方法,而是提供一个简单确保所需空闲空间可用的方法:
- (BOOL)ensureWriteSpace: (size_t)howmuch; - (void *)writePointer; - (void)advanceWritePointer: (size_t)howmuch;ensureWriteSpace: 方法返回成功或失败。当分配未被锁定(默认状态)时,该方法总会成功。当分配被锁定(以确保线程安全)时,仅当缓冲区中有足够的空闲空间才会成功,否则返回 NO。
写入 API 中的另外两个方法与读取 API 中的相同:一个用于获取数据指针,另一个用于在数据写入后推进该指针。
关于锁定分配的讨论这么多,我们可能需要一些方法来实际管理它:
- (void)lockAllocation; - (void)unlockAllocation;队列初始处于未锁定状态。若需要线程安全的操作,可调用 ensureWriteSpace: 来分配所需的空间大小,随后 lockAllocation 可确保缓冲区不被重新分配。若未来需要扩展缓冲区,可先调用 unlockAllocation 解锁,再通过 ensureWriteSpace: 实现扩容。
最后,我还为上述功能编写了一组 UNIX 风格的封装函数:
- (size_t)read: (void *)buf count: (size_t)howmuch; - (size_t)write: (const void *)buf count: (size_t)howmuch;这些操作并非必要,实际上效率较低,因为 API 要求数据在环形缓冲区(ring buffer)中进出时必须进行复制。但拥有一个共享 POSIX read和write系统调用语义的 API 可能是个不错的选择。
实例变量
缓冲区本身由三个实例变量(Instance Variables)描述:
char *_buf; size_t _bufSize; BOOL _allocationLocked;所有这些结构体,我希望都是不言自明的。请注意 _buf 被声明为 char *,以便进行简单的指针运算(pointer arithmetic),这在后续处理中会很有用。尽管 gcc 和 clang 编译器允许对 void * 指针进行指针运算,但技术上这是不被允许的,因此对于按字节寻址的实体(byte-addressed entities),char * 是一个便捷的选择。
除了缓冲区之外,我们还需要一个读指针和一个写指针:
char *_readPointer; char *_writePointer;页面大小对这段代码至关重要,它需要能够将数字向上或向下舍入到页面大小的倍数。我为此编写了两个简单的包装器,它们基于 mach 宏来实现这一功能:
static size_t RoundUpToPageSize(size_t n) { return round_page(n); }
static void *RoundDownToPageSize(void *ptr) { return (void *)trunc_page((intptr_t)ptr); }第一个函数将字节大小向上取整至页面大小的最近倍数,第二个函数则将指针向下取整至最近的页面边界。
代码 首先是 dealloc 方法。我假设使用了 ARC(自动引用计数),因此无需调用 super。它所做的就是,如果缓冲区已被分配,则释放它:
- (void)dealloc { if(_buf) free_mirrored(_buf, _bufSize, MIRROR_COUNT); }MIRROR_COUNT 是一个简单的 #define,用于定义需要分配多少份镜像副本。有趣的是,它被设置为 3 而非你可能预想的 2,这正是我的镜像内存分配器支持任意数量镜像而非硬编码两份的原因。其背后的逻辑稍后会进一步说明。
由于只需将所有实例变量(instance variables)置为 0 或 NULL 即可完成初始化,因此没有专门的初始化方法。环形缓冲区(ring buffer)起始为空状态,而全零的初始化值正好描述了这一点。
接下来是 availableBytes 方法。该方法首先执行的操作是:将写指针(write pointer)减去读指针(read pointer)。
- (size_t)availableBytes { ptrdiff_t amount = _writePointer - _readPointer;通常,这仅仅是缓冲区中数据字节的数量。然而,如果另一个线程在我们计算此值时正在修改此缓冲区,指针可能正在移动。如果它们只是通过一次读写操作的大小移动,那么没问题。我们可能最终计算出可用字节数的旧值或新值,但两者都能正常工作。
然而,指针也可能按缓冲区的大小移动。当读指针进入第二个镜像区域时,它会被重置回第一个区域,写指针也会随之重置。正因为如此,这里计算的大小可能小于零(如果我们看到了写指针的更新但没看到读指针的),也可能大于缓冲区大小(如果我们看到了读指针的更新但没看到写指针的)。由于我们知道可用字节数必须介于零和缓冲区大小之间,这很容易修正:只需检查这些情况,并相应地调整数量即可:
if(amount < 0) amount += _bufSize; else if((size_t)amount > _bufSize) amount -= _bufSize;
return amount; }接下来,我们有 readPointer。它简单地返回「ivar(实例变量)」。
- (void *)readPointer { return _readPointer; }接下来是 advanceReadPointer: 方法。这个方法只是简单地将指定数量加到读指针上:
- (void)advanceReadPointer: (size_t)howmuch { _readPointer += howmuch;然而,此处操作尚未完成。若此操作使读指针(read pointer)越过首个镜像区域的边界,则需将读指针与写指针(write pointer)同时回拨。对于读指针,只需执行减去 _bufSize 的操作即可。由于写指针可能同时被读线程(通过此方法)和写线程(在 advanceWritePointer: 中)修改,因此必须通过原子操作(atomic operation)进行更新。我使用内置的 __sync_sub_and_fetch 函数来完成此操作:
if((size_t)(_readPointer - _buf) >= _bufSize) { _readPointer -= _bufSize; __sync_sub_and_fetch(&_writePointer, _bufSize); } }接下来是 ensureWriteSpace: 方法。该方法的前半部分很简单:通过从总缓冲区大小中减去 [self availableBytes](可用字节数)来计算剩余空闲空间,如果请求的写入量小于该空闲空间,则一切就绪:
- (BOOL)ensureWriteSpace: (size_t)howmuch { size_t contentLength = [self availableBytes]; if(howmuch <= _bufSize - contentLength) return YES;否则,我们知道空闲空间不足以满足该请求。如果分配过程处于锁定状态,那就此结束,返回 NO:
else if(_allocationLocked) return NO;如果分配未被锁定,那么现在就是重新分配缓冲区的时候了。
首先需要计算要分配多少内存,然后分配一个该大小的新缓冲区。回想一下,由于镜像分配器(mirrored allocator)使用虚拟内存技巧,它必须分配页面大小(page size)的整数倍。我们至少需要 contentLength + howmuch 的内存,因此通过将该数字向上取整到最近的页面大小来确定新缓冲区的大小:
size_t newBufferLength = RoundUpToPageSize(contentLength + howmuch);接下来,分配新的缓冲区:
char *newBuf = allocate_mirrored(newBufferLength, MIRROR_COUNT);现在我们有了新的缓冲区,代码会有所分支。如果已经存在一个已有的缓冲区,那么我们就是在重新分配内存,必须将数据从旧缓冲区复制到新缓冲区中:
if(_bufSize > 0) {接下来,我们要再次玩虚拟内存的把戏。Mach 提供了一个 vm_copy 函数,它可以在不实际复制数据的情况下复制页对齐(page-aligned)的内存。相反,这些页面会被重新映射,并设置为写时复制(copy on write)。对于这种将立即释放旧内存的场景,这意味着数据实际上从未被复制,系统只是通过一些虚拟内存技巧让它看起来像是被复制了。
我们希望从读指针开始复制,但由于所有操作都必须页对齐,因此复制操作必须从包含读指针的页面的起始位置开始:
char *copyStart = RoundDownToPageSize(_readPointer);同样,长度也需要是页大小(page size)的整数倍。从 copyStart 开始,我们需要复制 _writePointer - copyStart 字节,但这个值需要向上舍入以符合页大小要求:
size_t copyLength = RoundUpToPageSize(_writePointer - copyStart);现在这一点已设置好,我们可以将这些数据” 复制” 到新缓冲区中:
vm_copy(mach_task_self(), (vm_address_t)copyStart, copyLength, (vm_address_t)newBuf);既然数据已经拷贝完毕,我们需要计算新的读指针位置。由于将 _readPointer 向下取整到 copyStart 导致多拷贝了一些字节,新的读指针位置就等于 newBuf 加上这些附加字节数:
char *newReadPointer = newBuf + (_readPointer - copyStart);这段代码在开发时对我来说特别棘手,因此我添加了一个断言(assert)来确保问题能尽早且明显地暴露出来:
if(*newReadPointer != *_readPointer) abort();现在我们可以释放旧缓冲区并重新分配读指针:
free_mirrored(_buf, _bufSize, MIRROR_COUNT); _readPointer = newReadPointer;写指针被设置为等于读指针加上之前计算过的内容长度:
_writePointer = _readPointer + contentLength; }对于不存在先前缓冲区(buffer)的情况,代码很简单:只需将读写指针(read and write pointer)设置到新缓冲区的开头:
else { _readPointer = newBuf; _writePointer = newBuf; }新的缓冲区(buffer)被分配,如果需要的话,数据(data)被复制,现在剩下的就是设置 _buf 和 _bufSize 实例变量(ivar),然后向调用者返回 YES。
_buf = newBuf; _bufSize = newBufferLength;
return YES; }接下来是 writePointer 方法,这再次仅仅是一个简单的访问器。
- (void *)writePointer { return _writePointer; }advanceWritePointer: 方法同样简单,对 _writePointer: 执行一次原子加(atomic add)操作:
- (void)advanceWritePointer: (size_t)howmuch { __sync_add_and_fetch(&_writePointer, howmuch); }请注意,与 advanceReadPointer: 方法不同,此方法无需执行任何检查来将指针回绕至第一个镜像数据区。advanceReadPointer: 方法会同时处理读指针和写指针的回绕。由于此处不会发生回绕,写指针可能长时间停留在第二个镜像数据区,但这完全没问题。
通过让写指针始终领先于读指针,此代码避免了一个令人困扰的歧义。使用读写指针的环形缓冲区(ring buffer)通常会在读写指针相等时遇到问题。无法简单区分缓冲区是空的(两者指针相等,因为它们之间没有数据)还是满的(两者指针相等,因为它们之间没有空闲空间)。
环形缓冲区实现通常通过以下两种方式避免此问题:要么禁止缓冲区被完全填满,而是将” 满” 定义为比真实满容量少一个单位;要么使用一个指针加一个计数器,而非使用两个指针。
在这个案例中,两种替代方案都不够理想。人为让缓冲区少一个字节的做法,在处理复杂的虚拟内存操作时尤其令人头疼,因为使用该环形缓冲区的代码很可能也需要处理整页内存,而这丢失的单个字节实质上会损失整个 4KB 页面。若采用指针加计数器的方案,要实现无锁线程安全将变得极其困难甚至完全不可能 —— 写指针会成为由另外两个值推导出的派生量,而这两个值都会被读线程修改,当两个值同时被更新时,从其他线程观察可能瞬间出现不一致状态。
镜像内存分配(mirrored allocation)所采用的虚拟内存技巧提供了第三种更优解:只需使用两个指针,并通过” 写指针等于读指针加上缓冲区大小” 来表示缓冲区已满。这种方式自然直观,得益于镜像内存分配的特性工作效果极佳,且处理起来非常简便。
接下来是分配锁定方法。这些方法仅仅操作_allocationLocked实例变量(译注:ivar,即 instance variable)。这些方法中不需要执行其他操作,因为它们只是修改了ensureWriteSpace:的行为。代码如下:
- (void)lockAllocation { _allocationLocked = YES; }
- (void)unlockAllocation { _allocationLocked = NO; }接下来,我们有 UNIX 类似的兼容包装器。这些有助于说明更原始、直接的 API(应用程序编程接口)是如何使用的。read:count: 方法使用 availableBytes 确定读取量,从 readPointer 复制出数据,然后调用 advanceReadPointer: 以标记数据为已读:
- (size_t)read: (void *)buf count: (size_t)howmuch { size_t toRead = MIN(howmuch, [self availableBytes]); memcpy(buf, [self readPointer], toRead); [self advanceReadPointer: toRead]; return toRead; }write:count
,因为它的行为会因分配是否被锁定而有所不同。若分配被锁定,则仅写入缓冲区剩余空间能容纳的数据量;否则会调用 ensureWriteSpace:(确保写入空间)方法,按需扩展缓冲区至合适大小: - (size_t)write: (const void *)buf count: (size_t)howmuch { if(_allocationLocked) howmuch = MIN(howmuch, _bufSize - [self availableBytes]); else [self ensureWriteSpace: howmuch];其余部分则很直接。它将数据复制到 writePointer,推进写指针,并返回已写入的数据量:
memcpy([self writePointer], buf, howmuch); [self advanceWritePointer: howmuch]; return howmuch; }这便完成了镜像队列的实现。
线程安全性
本实现的目标之一是保证线程安全,适用于单读取线程与单写入线程并存的情况。上述代码实现了这一目标,但其安全性原理并非显而易见。代码中未使用锁,读取指针(_readPointer)甚至没有通过原子操作(atomic operations)进行更新。
首先需注意,代码仅在分配操作加锁(allocation locked)的情况下才是线程安全的。这意味着 ensureWriteSpace: 中那些复杂的重新分配代码不会参与运作。这很关键,因为若不加锁,将该部分代码实现为线程安全的难度会极大。鉴于此,我们可以将 _buf 和 _bufSize 视为常量。唯一可能被一个线程修改的同时又被另一个线程读取的变量,只剩下 _readPointer 和 _writePointer。
最简单的分析方式是将其视为两个独立情况。第一,即使写入线程正在修改这些值,读取线程也必须保持正确性。第二,即使读取线程正在修改这些值,写入线程也必须保持正确性。若两者均成立,则整个系统即正确。
我们来看第一种情况,即确保在写入线程进行修改时,读取线程仍能正确读取数据。写入线程仅在 _writePointer 处进行修改。读取线程依赖于 _writePointer 值的唯一位置是在 availableBytes: 方法中:
ptrdiff_t amount = _writePointer - _readPointer;这种非同步访问是完全安全的。虽然存在竞争条件(race condition),无法确定读线程会看到 _writePointer 的旧值还是新值,但这并不重要。_writePointer 只能增加,可用字节数同样只能增加。如果它看到旧值,计算出的可用字节数仍然正确,只是略微过时。如果它看到新值,那情况会更好。因此,读操作是安全的。
现在来看写操作在面对读线程修改时的安全性。读线程可以修改两个指针,因此分析稍复杂一些。写操作代码也调用了 availableBytes(名义上属于读线程侧的方法),因此该方法必须对两者都安全。
读线程修改 _writePointer 的唯一方式是通过这行代码:
__sync_sub_and_fetch(&_writePointer, _bufSize);由于底层缓冲区的镜像结构,_writePointer 的旧值和新值在此处都是正确的。availableBytes 可以处理任一值,并在两种情况下都能返回正确结果。同样地,当 writePointer 返回该值时,无论是旧值还是新值都无关紧要,因为两者在数据写入时的作用是等价的。最后,advanceWritePointer: 是安全的,因为它也使用了 __sync 内置函数(译注:指 GCC / Clang 的原子操作内置指令)来修改 _writePointer,确保两次更新会以某种顺序应用,且具体顺序并不重要。
写入线程唯一使用 _readPointer 的地方是在 availableBytes 中。就像写入线程修改其指针而读取线程调用 availableBytes 的对应情况一样,当写入线程正在计算 availableBytes 时,读取线程修改 _readPointer 是安全的。推进读取指针会减少可用字节数,从而增加写入空间。如果写入线程在这里看到的是 _readPointer 的旧值,它会计算出较旧的、更小的写入空间,这仍然是安全的,只是稍微过时了。
写入线程面对读取线程的变更时是安全的,反之亦然。因此,这段代码确实是线程安全的。
三副本设计
我曾承诺要解释 MAMirroredQueue 为何分配三份镜像缓冲区副本,现在正是时候。
通常情况下,连续环形缓冲区(contiguous ring buffer)只需两份副本即可。但请注意,本实现相比常规镜像环形缓冲区有些特殊:它使用独立的读写指针,并允许写指针指向第二个镜像区域。这既实现了无锁(lockless)线程安全,又能充分利用整个缓冲区,避免了两个指针相等时出现的奇异歧义性。不过,这也要求必须存在第三个镜像区域。
这种情况极其罕见,但确实可能发生。首先,我们需要一个数据部分已缠绕回起始位置的缓冲区。在普通环形缓冲区中,它看起来是这样的:
read | v +----------+ |xxx xxxxx| +----------+ ^ | write在镜像环形缓冲区中,它看起来像是这样:
read | v +----------+----------+ |xxx xxxxx|xxx xxxxx| +----------+----------+ ^ | write到目前为止一切顺利。写指针有效,向空白区域写入数据是正确的。现在,假设存在独立的读线程和写线程同时操作这个缓冲区。读线程将读指针向前移动:
read | v +----------+----------+ | x | x | +----------+----------+ ^ | write读线程接下来的步骤是将读指针和写指针都移动到第一个镜像区域。但是!在发生这种情况之前,假设写线程突然要写入一大段数据。(记住,在抢占式线程环境中,两个线程可能以各种奇怪的顺序执行。)在读线程能够移动任何指针之前,写线程计算了可用空间,将数据写入缓冲区,然后崩溃了:
read | v +----------+----------+ | x | xXXXXXXX|XX +----------+----------+ ^ | write写入操作越过了第二个镜像段的末尾!它探测到大量可用空间 —— 这一点是正确的,但问题在于这段空闲空间的末尾只能从驻留在第一个镜像段的写指针访问。由于我们允许写指针停留在第二个镜像段中,就存在了出现问题的可能性。这是一个短暂的竞态窗口,需要非常特定的条件才能触发,但确实可能发生,因此需要防护措施。
幸运的是,这个问题可以通过简单地分配第三个镜像段来轻松解决。在这种情况下,操作序列如下所示:
read | v +----------+----------+----------+ |xxx xxxxx|xxx xxxxx|xxx xxxxx| +----------+----------+----------+ ^ | write
read | v +----------+----------+----------+ | x | x | x | +----------+----------+----------+ ^ | write
read | v +----------+----------+----------+ |XXxXXXXXXX|XXxXXXXXXX|XXxXXXXXXX| +----------+----------+----------+ ^ | write通过末尾的额外镜像区域,多余数据被写入安全位置,所有操作都能正确执行。此时读取线程会介入,将指针向下移动,一切照常运行。
结论 以上便是我们对镜像内存环形缓冲区的完整探索。希望这段技术之旅能带来启发。顺带一提,所有源代码均以标准 BSD 许可协议提供,方便您在自己的应用中使用。
欢迎下周继续收看精彩章节。Friday Q & A 栏目始终以读者反馈为驱动,若有主题建议请随时发送。
Original (English)
Source: https://www.mikeash.com/pyblog/friday-qa-2012-02-17-ring-buffers-and-mirrored-memory-part-ii.html
Last time on Friday Q&A, I started talking about implementing a ring buffer using virtual memory tricks to mirror memory. The first article concentrated on those virtual memory tricks. Today, I’m going to fill out the second half of the puzzle and show how to implement the ring buffer on top of the mirrored memory allocator we developed. If you haven’t read the previous article yet, I strongly recommend you so so, otherwise the memory mirroring is likely to be confusing.
CodeJust like last time, the code we’re going to discuss is available on GitHub:
https://github.com/mikeash/MAMirroredQueue
GoalsThe mirrored memory trick allows exposing pointers to the interior buffer to the outside world, since both data and free space are always contiguous. The goal then is to create an API which makes this easy to use. In default operation, the ring buffer should also grow to accommodate newly written data. For multithreaded use, the ring buffer’s size can be locked, at which point the buffer becomes thread safe for one simultaneous reader and writer. For that case, thread safety should be acheived with no locks.
APIThe ring buffer is implemented in a class called MAMirroredQueue:
@interface MAMirroredQueue : NSObjectFor reading, there are three methods. One method retrieves how much data is available to be read, one method returns a pointer to the data, and one method advances the pointer:
- (size_t)availableBytes; - (void *)readPointer; - (void)advanceReadPointer: (size_t)howmuch;This way a client can find out how much data it can read, it can access the data, and then when it’s finished it can remove that data from the ring buffer by advancing the pointer.
For writing data, the interface is similar. However, instead of a method to query the amount of data, there’s a method to simply ensure that the necessary amount of free space is available:
- (BOOL)ensureWriteSpace: (size_t)howmuch; - (void *)writePointer; - (void)advanceWritePointer: (size_t)howmuch;The ensureWriteSpace: method returns success or failure. When allocation is not locked (the default state), it will always succeed. When allocation is locked (to ensure thread safety), it will succeed only if there is enough free space in the buffer, and return NO otherwise.
The other two methods in write API are the same as in the read API: one to retrieve the data pointer, and one to advance it once data is written.
With all this talk of locking allocations, we probably want some methods to actually manage that:
- (void)lockAllocation; - (void)unlockAllocation;The queue starts out unlocked. If thread-safe operation is needed, the desired amount of space can be allocated by calling ensureWriteSpace:, and afterwards lockAllocation ensures that the buffer doesn’t get reallocated. If the buffer ever needs to be expanded in the future, unlockAllocation followed by another ensureWriteSpace: can be used to accomplish that.
Finally, I also wrote a pair of UNIX-like wrappers around the above functionality:
- (size_t)read: (void *)buf count: (size_t)howmuch; - (size_t)write: (const void *)buf count: (size_t)howmuch;These aren’t necessary, and are actually inefficient because the API requires copying data into and out of the ring buffer, but an API that shares the semantics of the POSIX read and write calls can be nice to have.
Instance VariablesThe buffer itself is described by three instance variables:
char *_buf; size_t _bufSize; BOOL _allocationLocked;All of which are, I hope, self explanatory. Note that _buf is a char * to allow for easy pointer arithmetic, which will come in handy later. Although gcc and clang allow it, technically pointer arithmetic is not allowed on void * pointers, so char * is a convenient choice for byte-addressed entities.
In addition to the buffer, we also need a read pointer and a write pointer:
char *_readPointer; char *_writePointer;Utility FunctionsPage size is important for this code, and it needs to be able to round numbers up and down to a multiple of the page size. I wrote two simple wrappers around mach macros which manage this:
static size_t RoundUpToPageSize(size_t n) { return round_page(n); }
static void *RoundDownToPageSize(void *ptr) { return (void *)trunc_page((intptr_t)ptr); }The first one simply rounds a byte size up to the nearest multiple of page size, and the second one rounds a pointer down to the nearest page boundary.
CodeFirst up is the dealloc method. I’m assuming ARC, so no need to call super. All it does is free the buffer if it’s been allocated:
- (void)dealloc { if(_buf) free_mirrored(_buf, _bufSize, MIRROR_COUNT); }MIRROR_COUNT is simply a #define which describes how many mirrored copies to allocate. Interestingly, it’s set to 3, not 2 as you might expect, which is why my mirrored allocator supports an arbitrary number of mirrorings instead of just hardcoding two of them. More on the reasoning for this later.
There is no initializer method, as simply having all of the instance variables set to 0 or NULL is sufficient. The ring buffer starts out empty, and all zeroes describes that just fine.
Next up, we have the availableBytes method. The first thing it does is subtract the read pointer from the write pointer:
- (size_t)availableBytes { ptrdiff_t amount = _writePointer - _readPointer;Normally, this would just be the number of data bytes in the buffer. However, in the event that another thread is modifying this buffer while we’re computing this value, the pointers could be moving around. If they’re just moving around by a read or write amount, then that’s no problem. We may end up computing the old value or the new value for the number of available bytes, but either one works fine.
However, the pointers can also be moved around by the size of the buffer. When the read pointer goes into the second mirrored region, it gets reset back into the first one, and the write pointer follows. Because of this, the computed size here may be less than zero (if we see the update to the write pointer but not the read pointer), or may be greater than the buffer size (if we see the read pointer update but not the write pointer). Since we know that the number of available bytes must be between zero and the buffer size, this is easy to correct: just check for these cases, and adjust the amount accordingly:
if(amount < 0) amount += _bufSize; else if((size_t)amount > _bufSize) amount -= _bufSize;
return amount; }Next up, we have readPointer. This simply returns the ivar:
- (void *)readPointer { return _readPointer; }Next, advanceReadPointer:. This simply adds the amount to the read pointer:
- (void)advanceReadPointer: (size_t)howmuch { _readPointer += howmuch;However, it’s not done here. In the event that this advanced the read pointer past the end of the first mirrored region, both it and the write pointer need to be pulled back. For the read pointer, this is simply a matter of subtracting _bufSize. Since the write pointer can be modified by both the reader thread (with this method) and the writer thread (in advanceWritePointer:) simultaneously, it needs to be updated using an atomic operation. I use the built-in __sync_sub_and_fetch function to do this:
if((size_t)(_readPointer - _buf) >= _bufSize) { _readPointer -= _bufSize; __sync_sub_and_fetch(&_writePointer, _bufSize); } }Next comes ensureWriteSpace:. The first part of this is trivial: find out how much free space is available, by subtracting [self availableBytes] from the total buffer size, and if the requested amount is less, everything is all set:
- (BOOL)ensureWriteSpace: (size_t)howmuch { size_t contentLength = [self availableBytes]; if(howmuch <= _bufSize - contentLength) return YES;Otherwise, we know that the free space is not sufficient to meet the request. If allocation is locked, that’s it, game over, return NO:
else if(_allocationLocked) return NO;If allocation is not locked, then it’s time to reallocate the buffer.
The first thing to do is figure out how much memory to allocate, then allocate a new buffer of that size. Recall that, because the mirrored allocator uses virtual memory tricks, it must allocate a multiple of the page size. We need at least contentLength + howmuch memory, so the new buffer size is found by rounding that number up to the nearest page size:
size_t newBufferLength = RoundUpToPageSize(contentLength + howmuch);Next, allocate the new buffer:
char *newBuf = allocate_mirrored(newBufferLength, MIRROR_COUNT);Now that we have the new buffer, the code branches a bit. If there’s already an existing buffer, then we’re reallocating memory, and have to copy the data out of the old buffer and into the new:
if(_bufSize > 0) {Once again, we’re going to play virtual memory games. Mach provides a vm_copy function which copies page-aligned memory without actually copying it. Instead, the pages are remapped and set up to copy on write. For this case, where we’re going to immediately deallocate the old memory, this means that no data is ever actually copied, and the system just plays some virtual memory tricks to make it look like it was.
We want to copy starting from the read pointer, but because everything has to be page-aligned, the copy has to start at the beginning of the page containing the read pointer:
char *copyStart = RoundDownToPageSize(_readPointer);Likewise, the length needs to be a multiple of the page size. Starting from copyStart, we need to copy _writePointer - copyStart bytes, but this needs to be rounded up fit the page size:
size_t copyLength = RoundUpToPageSize(_writePointer - copyStart);Now that this is set, we can “copy” this data into the new buffer:
vm_copy(mach_task_self(), (vm_address_t)copyStart, copyLength, (vm_address_t)newBuf);Now that the data is copied, we need to compute the location of the new read pointer. We copied additional bytes by rounding _readPointer down to copyStart. The new read pointer is equal to newBuf plus that number of additional bytes:
char *newReadPointer = newBuf + (_readPointer - copyStart);This spot was particularly troublesome for me when I was developing the code, so I tossed in an assert to make sure that it failed early and loudly:
if(*newReadPointer != *_readPointer) abort();Now we can free the old buffer and reassign the read pointer:
free_mirrored(_buf, _bufSize, MIRROR_COUNT); _readPointer = newReadPointer;The write pointer is set to equal the read pointer plus the previously computed content length:
_writePointer = _readPointer + contentLength; }For the case where no previous buffer exists, the code is simple: just set the read and write pointer to the beginning of the new buffer:
else { _readPointer = newBuf; _writePointer = newBuf; }The new buffer is allocated, data is copied if necessary, and now all that remains is to set the _buf and _bufSize ivars and then return YES to the caller:
_buf = newBuf; _bufSize = newBufferLength;
return YES; }Next up, the writePointer method, which is once again just a simple accessor:
- (void *)writePointer { return _writePointer; }The advanceWritePointer: method is also simple, performing an atomic add of _writePointer:
- (void)advanceWritePointer: (size_t)howmuch { __sync_add_and_fetch(&_writePointer, howmuch); }Note that, unlike advanceReadPointer:, this method doesn’t need any checks to wrap pointers back to the first mirrored data section. The advanceReadPointer: method handles wrapping both read and write pointers. Since no wrapping occurs here, the write pointer can often sit in the second mirrored data section for long periods of time, but that’s perfectly fine.
By having the write pointer always be ahead of the read pointer, this code avoids an annoying ambiguity. A ring buffer which uses read and write pointers usually suffers from a problem when the read and write pointers are equal. There’s no simple way to distinguish between the buffer being empty (where both pointers are equal because there is no data between them) and the buffer being full (where the pointers are equal becaus there’s no free space between them).
Ring buffer implementations typically avoid this either by prohibiting the buffer from becoming completely full, and instead defining “full” as being one unit less than true full capacity, or by using a pointer plus a count rather than using two pointers.
Neither alternative is attractive in this case. Having the buffer be artifically one byte small is particularly painful when playing crazy virtual memory games, since it’s likely that code using this ring buffer will want to deal in whole pages as well, and by losing a single byte out of the buffer, it essentially loses an entire 4kB page. Using a pointer and a count makes achieving lockless thread safety much more difficult if not completely impossible, since the write pointer becomes a derived value computed from two other values, both of which get modified by the read thread, and which can be momentarily inconsistent with each other as seen from another thread when both are updated simultaneously.
The virtual memory games played with mirrored allocation give a third, better way: simply use two pointers, and express “full” by having the write pointer equal the read pointer plus the buffer size. It’s natural, it works great due to the mirrored allocation, and it’s easy to deal with.
Next up are the allocation locking methods. These simply manipulate the _allocationLocked ivar. Nothing else needs to be done in these methods, since they just modify the behavior of ensureWriteSpace:. Here’s the code:
- (void)lockAllocation { _allocationLocked = YES; }
- (void)unlockAllocation { _allocationLocked = NO; }Next, we have the UNIX-like compatibility wrappers. These help illustrate how the more primitive, direct API is used. The read:count: method figures out how much to read using availableBytes, copies data out from readPointer, then calls advanceReadPointer: to mark the data as read:
- (size_t)read: (void *)buf count: (size_t)howmuch { size_t toRead = MIN(howmuch, [self availableBytes]); memcpy(buf, [self readPointer], toRead); [self advanceReadPointer: toRead]; return toRead; }The story for write:count: is a little more complex, as it behaves differently depending on whether allocation is locked. If allocation is locked, then it only writes as much data as will fit in the buffer’s remaining space. Otherwise, it uses ensureWriteSpace: to grow the buffer to the appropriate size, if needed:
- (size_t)write: (const void *)buf count: (size_t)howmuch { if(_allocationLocked) howmuch = MIN(howmuch, _bufSize - [self availableBytes]); else [self ensureWriteSpace: howmuch];The rest is straightforward. It copies the data into writePointer, advances the write pointer, and returns the amount of data written:
memcpy([self writePointer], buf, howmuch); [self advanceWritePointer: howmuch]; return howmuch; }And that completes the implementation of the mirrored queue.
Thread SafetyOne of the goals of this implementation is to be thread safe, for the case where there is one reader thread and one writer thread. The above code achieves this, but it’s not entirely clear why it’s safe. There are no locks, and the read pointer doesn’t even use atomic operations to update.
First, note that the code is only thread safe for the case when allocation is locked. That means that all of the tricky reallocation code in ensureWriteSpace: doesn’t come into play. This is good, because it would be incredibly difficult to make that code thread safe without locking. Given that, we can consider _buf and _bufSize to be constant. The only variables which could be potentially modified by one thread while simultaneouly being read by another thread are _readPointer and _writePointer.
It’s easiest to consider this as two separate cases. First, the reader thread needs to be correct even when the writer thread is modifying these values. Second, the writer thread needs to be correct even when the reader thread is modifying these values. If both hold, then the entire hting is correct.
Let’s look at the first case, of making sure the reader thread is correct in the face of modifications from the writer thread. The writer thread only ever modifies _writePointer. The only place where the reader thread depends on the value of _writePointer is in availableBytes:
ptrdiff_t amount = _writePointer - _readPointer;This unsynchronized access is perfectly safe. There’s a race condition in that it’s not certain whether the read thread will see the old or new value of _writePointer. However, it doesn’t matter. _writePointer can only increase, and the number of available bytes can likewise only increase. If it sees the old value, it still computes a number of available bytes that’s correct, just slightly out of date. If it sees the new value, then so much the better. Therefore, the reader is safe.
Let’s look at the writer’s safety in the face of changes from the reader now. The reader can alter both pointers, so the analysis is a little more complex. The writer code also calls availableBytes, a method nominally on the reader side, so that method has to be safe for both.
The only way that the reader thread can alter _writePointer is with this line:
__sync_sub_and_fetch(&_writePointer, _bufSize);Because of the mirrored structure of the underlying buffer, both the old and new values of _writePointer are correct here. availableBytes can deal with either value, and will return the right answer in either case. Likewise, when writePointer returns the value, it doesn’t matter whether it’s the old or new value, as they are both equivalent in terms of what happens when data is written to them. Finally, advanceWritePointer: is safe, since it also uses a __sync builtin to modify _writePointer, ensuring that both updates will be applied in some order, and the particular order is not important.
The only place where the writer thread uses _readPointer is in availableBytes. Just like the corresponding case when the writer thread modifies its pointer and the reader calls availableBytes, it’s safe for the reader thread to modify _readPointer while the writer is calculating availableBytes. Advancing the read pointer decreases the number of available bytes, which increases the amount of write space. If the writer thread sees the old value for _readPointer here, it computes the older, smaller amount of write space, which is still safe, just slightly stale.
The writer thread is safe in the face of changes from the reader thread, and vice versa. Therefore, this code really is thread safe.
TriplicateI promised that I would explain why MAMirroredQueue allocates three mirrored copies of its buffer, and now is the time.
Normally, two copies is enough for the contiguous ring buffer. However, remember that this implementation is a little odd, even for a mirrored ring buffer, in using separate read and write pointers and allowing the write pointer to sit in the second mirrored area. This enables lockless thread safety and use of the full buffer without the strange ambiguity when the two pointers are equal. However, it also requires the existence of a third mirrored area.
The need is extremely rare, but it can happen. To start with, we need a buffer where the data portion has wrapped around to the beginning. In a normal ring buffer, that would look like this:
read | v +----------+ |xxx xxxxx| +----------+ ^ | writeIn the mirrored ring buffer, it instead looks like this:
read | v +----------+----------+ |xxx xxxxx|xxx xxxxx| +----------+----------+ ^ | writeSo far so good, still. The write pointer is valid, and writing data into the empty area is correct. Now, let’s say there are separate reader and writer threads manipulating this buffer simultaneously. The reader thread moves the read pointer up:
read | v +----------+----------+ | x | x | +----------+----------+ ^ | writeThe next step for the reader thread is to move both read and write pointers down into the first mirrored area. But! Before that can happen, let’s say that the writer suddenly goes to write a bunch of data. (Remember, in the world of preemptive threading, the two threads can run in all sorts of weird orders.) Before the read thread can move any pointers around, the write thread computes the available space, writes data into the buffer, and crashes:
read | v +----------+----------+ | x | xXXXXXXX|XX +----------+----------+ ^ | writeThe writer wrote off the end of the second mirrored segment! It saw a large amount of free space available, which is correct, but the trouble is that the end of this free space can only be accessed from a write pointer residing in the first mirrored segment. Since we allow the write pointer to hang out in the second mirrored segment, there’s the possibility for trouble. This is a brief race window requiring very specific circumstances to trigger, but it can happen, and it needs to be protected against.
Fortunately, this problem is easy to solve by simply allocating a third mirrored segment. In that case, the sequence looks like this:
read | v +----------+----------+----------+ |xxx xxxxx|xxx xxxxx|xxx xxxxx| +----------+----------+----------+ ^ | write
read | v +----------+----------+----------+ | x | x | x | +----------+----------+----------+ ^ | write
read | v +----------+----------+----------+ |XXxXXXXXXX|XXxXXXXXXX|XXxXXXXXXX| +----------+----------+----------+ ^ | writeWith the extra mirrored region at the end, the surplus data is written to a safe spot and everything works correctly. At this point, the read thread will jump in, shift the pointers down, and life continues as usual.
ConclusionThat wraps up our exploration of a mirrored-memory ring buffer. I hope the journey was enlightening. Oh, and the source code is all available under a standard BSD license in case you have a use for it in your own apps.
Come back next time for another exciting episode. Friday Q&A is, as always, driven by reader input, so send in your ideas for topics in the meantime.