Get in Line (Part 2) - Vyukov's Queue and its specializations

In the last post we got a low-latency SPSC queue. But in practice you often need multi-producer or multi-consumer queues too, or both. For example, you might spin off a bunch of threads in a threadpool to do some work and then aggregate the results back on the main thread, or launch a bunch of async tasks and not wait on them immediately but still need their results eventually.

Unlike SPSC, for which most (all?) use cases basically have a single answer¹, the performance of any multi-producer or multi-consumer queue is heavily affected by the workload.

Still, there are some implementations which are “good enough” for most cases (i.e. the queue won’t be the performance bottleneck). One of the most famous ones is the MPMC queue by Dmitry Vyukov. This queue (with a few tweaks) is also used by crossbeam.

Vyukov’s MPMC

The core idea of Vyukov’s MPMC queue is that we store an atomic integer “epoch” (or “sequence”) on each slot of the queue’s shared ring buffer, and use the value of the epoch to mark each cell as either “free” or “used”. Each slot \(s_i\) for \(i \in \{0, 1, \ldots, N-1\}\) is initialised with its epoch \(e_i = i\). Head \(H\) and tail \(T\) start at \(0\). Head and tail are only ever incremented, so we take them modulo \(N\) to find the corresponding index into the buffer. The cost of the division is negligible compared to the cost of the atomic ops, but if you really want to avoid it you can keep the size a power of \(2\) and use a mask instead.
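A minimal sketch of this layout (field and method names are mine, not from any particular implementation), keeping the capacity a power of two so indexing can use a mask:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative layout only: each slot carries its own atomic epoch,
// initialised to the slot's index; head and tail are plain counters.
struct Slot<V> {
    epoch: AtomicUsize,
    data: std::cell::UnsafeCell<std::mem::MaybeUninit<V>>,
}

struct Queue<V> {
    slots: Box<[Slot<V>]>,
    head: AtomicUsize,
    tail: AtomicUsize,
    mask: usize, // capacity - 1, valid only for power-of-two capacities
}

impl<V> Queue<V> {
    fn new(capacity: usize) -> Self {
        assert!(capacity.is_power_of_two());
        let slots = (0..capacity)
            .map(|i| Slot {
                epoch: AtomicUsize::new(i), // slot i starts with epoch i
                data: std::cell::UnsafeCell::new(std::mem::MaybeUninit::uninit()),
            })
            .collect();
        Queue {
            slots,
            head: AtomicUsize::new(0),
            tail: AtomicUsize::new(0),
            mask: capacity - 1,
        }
    }

    fn idx(&self, counter: usize) -> usize {
        counter & self.mask // equivalent to counter % capacity
    }
}

fn main() {
    let q: Queue<u64> = Queue::new(8);
    assert_eq!(q.slots[5].epoch.load(Ordering::Relaxed), 5);
    assert_eq!(q.idx(13), 5); // 13 % 8 == 5
}
```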

Any producer can push to the queue as follows:

  1. Read \(t \leftarrow T\), find the corresponding slot using \(i \leftarrow t \bmod N\).
  2. Read \(e_i\), and compare it to \(t\).
  3. If \(e_i = t\), then there is a good chance that this slot was either never written to, or was written to but has since been read by a consumer. So we try to CAS \(T\) from \(t\) to \(t + 1\).
    1. If CAS succeeds, write the value and set \(e_i = t + 1\). Return with success.
    2. Otherwise, another producer beat us to it. But there is a chance that following slots might still be free, so we start from step 1 again.
  4. If \(e_i < t\), then some producer wrote to it before us and none of the consumers have read it yet. This also means all following slots are also written to but unread, so we return with failure (queue is full).
  5. If \(e_i > t\), some other producer wrote to this slot between when we read the tail and when we read the slot’s epoch. There is a chance that the next slot might still be free, so try again from step 1.

Any consumer can pop from the queue similarly:

  1. Read \(h \leftarrow H\), find the corresponding slot using \(i \leftarrow h \bmod N\).
  2. Read \(e_i\), and compare it to \(h + 1\).
  3. If \(e_i = h + 1\), a producer has written something here that hasn’t been consumed yet. Try to CAS \(H\) from \(h\) to \(h + 1\).
    1. If CAS succeeds, read the value out of the slot and set \(e_i = h + N\) marking it free for the next round of producers. Return the value.
    2. Otherwise, another consumer beat us to it. But there is a chance that following slots might still have something to read, so we start from step 1 again.
  4. If \(e_i < h + 1\), the producer hasn’t written here yet, so the queue is empty. Return with failure.
  5. If \(e_i > h + 1\), another consumer already read this slot between when we read the head and when we checked the epoch. There is a chance that next slot still might have some value, so we try again from step 1.

So the epoch of any given slot cycles like this:

  • \(i\), initialised, free for the first producer
  • \(i + 1\), written by a producer
  • \(i + N\), read by a consumer, free again for the next round
  • \(i + N + 1\), written by a producer again
  • \(i + 2N\), read by a consumer again

…and so on.
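The cycle can be pinned down with two tiny helper formulas (hypothetical names, just to make the arithmetic concrete): in round \(k\), slot \(i\) is free for a producer at epoch \(i + kN\) and holds an unread value at epoch \(i + kN + 1\).

```rust
// Hypothetical helpers for the epoch arithmetic: in round k, slot i is free
// at epoch i + k*n and written-but-unread at epoch i + k*n + 1. The consumer's
// store of h + n is what starts round k + 1.
fn free_epoch(i: usize, n: usize, k: usize) -> usize {
    i + k * n
}

fn written_epoch(i: usize, n: usize, k: usize) -> usize {
    i + k * n + 1
}

fn main() {
    let (i, n) = (3, 8);
    assert_eq!(free_epoch(i, n, 0), 3);     // initialised
    assert_eq!(written_epoch(i, n, 0), 4);  // written by a producer
    assert_eq!(free_epoch(i, n, 1), 11);    // read by a consumer: i + N
    assert_eq!(written_epoch(i, n, 1), 12); // written again: i + N + 1
    assert_eq!(free_epoch(i, n, 2), 19);    // read again: i + 2N
}
```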

There are two kinds of syncs going on here: the CAS on \(H\) and \(T\) resolves contention among consumers and among producers respectively, while the epoch on each slot coordinates the handoff between a producer and a consumer. Another way to put it: \(H\) and \(T\) let producers and consumers “reserve” a slot they are going to read or write from, while the epoch signals when that slot is ready for the other side.

You may have noticed we haven’t discussed the memory ordering for any of these atomic ops. Let’s walk through the producer’s write flow.

Steps 1 and 2 have a data dependency (we need the index from \(T\) to know which \(e_i\) to read), so they won’t be reordered.

Then we load \(e_i\), CAS on \(T\), write the value, and store \(e_i\). The CAS should not happen before we have compared \(e_i\), otherwise we are reserving a slot before checking if it is free. And the store on \(e_i\) should be after the write, otherwise a consumer could see the updated epoch and read garbage from the slot.

So we load \(e_i\) with acquire and store \(e_i\) with release. The key reason is cross-thread synchronisation: the producer’s acquire load of \(e_i\) pairs with the release store a consumer did when it freed this slot (setting \(e_i = h + N\)). This pairing establishes a happens-before edge, guaranteeing that the consumer’s read of the old value is complete before the producer overwrites it. Without acquire, we have no such guarantee, and the consumer’s read and our write would be a data race. Similarly, our release store of \(e_i = t + 1\) pairs with the acquire load a consumer will do later, ensuring our write is visible before the consumer reads. The CAS on \(T\) can stay relaxed because it only needs atomicity (no two producers grab the same slot). The same argument applies symmetrically to the consumer’s flow.

Can we drop the acquire on epoch’s load and use just release on epoch’s store?

I made this mistake initially, loading \(e_i\) relaxed. My reasoning was that the comparison introduces a control dependency, so the CAS cannot be reordered before the branch is resolved (or at least its effects cannot become visible to other threads). But I completely missed that the load of the value from slot \(i\) has no such dependency: it is entirely possible for the consumer to read the cell’s value, and before it can read the epoch, the producer writes to the slot and updates the epoch, so the consumer ends up returning a stale value.

In code, the producer’s try_send looks something like this:

```rust
fn try_send(&self, value: V) -> Result<(), V> {
    loop {
        let t = self.T.load(Relaxed);
        let i = t % self.N;
        let e = self.slots[i].epoch.load(Acquire);
        if e == t {
            // slot is free, try to reserve it
            if self.T.compare_exchange_weak(t, t + 1, Relaxed, Relaxed).is_ok() {
                self.slots[i].data.write(value);
                self.slots[i].epoch.store(t + 1, Release);
                return Ok(());
            }
            // CAS failed, another producer got here first, retry
        } else if e < t {
            // slot is still occupied, queue is full
            return Err(value);
        }
        // e > t: another producer moved tail past us, retry
    }
}
```

And the consumer’s try_recv:

```rust
fn try_recv(&self) -> Option<V> {
    loop {
        let h = self.H.load(Relaxed);
        let i = h % self.N;
        let e = self.slots[i].epoch.load(Acquire);
        if e == h + 1 {
            // producer has written here, try to reserve it
            if self.H.compare_exchange_weak(h, h + 1, Relaxed, Relaxed).is_ok() {
                let value = self.slots[i].data.read();
                self.slots[i].epoch.store(h + self.N, Release);
                return Some(value);
            }
            // CAS failed, another consumer got here first, retry
        } else if e < h + 1 {
            // producer hasn't written yet, queue is empty
            return None;
        }
        // e > h + 1: another consumer moved head past us, retry
    }
}
```

Optimisations

Local head/tail caching

The biggest time-sink is always the atomic ops, because they need to sync between threads, so it is best to avoid them where we can. One common optimisation is to keep a local copy of the last known value of head/tail and start the CAS directly from that, refreshing the local copy from the value a failed CAS hands back. This buys some performance under light contention, without much regression under heavy contention, because there you’ll likely be stuck in the retry loop for a while anyway.
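The caching pattern in isolation might look like this (a sketch only: it skips the epoch check a real queue does before the CAS, and `cached_tail` is a plain local where a real queue would keep it per producer handle):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Start the CAS from a locally cached tail instead of loading the shared
// atomic first. On failure, compare_exchange hands back the current value,
// refreshing the cache for free.
fn reserve(tail: &AtomicUsize, cached_tail: &mut usize) -> usize {
    loop {
        match tail.compare_exchange_weak(
            *cached_tail,
            *cached_tail + 1,
            Ordering::Relaxed,
            Ordering::Relaxed,
        ) {
            Ok(t) => {
                *cached_tail = t + 1; // remember where we left off
                return t;             // reserved index t
            }
            Err(current) => *cached_tail = current, // refresh cache, retry
        }
    }
}

fn main() {
    let tail = AtomicUsize::new(0);
    let mut cached = 0;
    assert_eq!(reserve(&tail, &mut cached), 0); // no shared load needed
    assert_eq!(reserve(&tail, &mut cached), 1);
    tail.store(10, Ordering::Relaxed);           // another producer raced ahead
    assert_eq!(reserve(&tail, &mut cached), 10); // cache refreshed on failure
    assert_eq!(cached, 11);
}
```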

Blocking send/recv

Right now we return with failure if the queue is full or empty, hoping that the caller has something better to do than just spin. But if spin looping is acceptable, we can get better performance by using fetch_add instead of CAS: we are looping anyway, so we can unconditionally reserve the slot and then spin until the other side is done with it.

Why is fetch_add better than CAS?

fetch_add always succeeds in reserving a slot, whereas CAS can fail when another thread moves head/tail first, forcing a retry. So we move from looping over 2 atomic ops to looping over 1 atomic op, and that one is on the slot’s epoch, which has lower contention than head/tail. This gives better performance under heavy contention.

But there is a pitfall: if for whatever reason the thread that reserved a slot is cancelled or aborted, that slot stays reserved forever, blocking every other thread from moving past it, and the queue is stuck.
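The reserve-then-spin idea, sketched on a bare slice of epochs (names are illustrative, and this single-threaded demo omits the value write itself):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Unconditionally claim a ticket with fetch_add, then spin on that slot's
// epoch until the other side is done with it. Unlike CAS, the reservation
// itself can never fail; the waiting moves to the (less contended) epoch.
fn send_blocking(tail: &AtomicUsize, epochs: &[AtomicUsize], n: usize) -> usize {
    let t = tail.fetch_add(1, Ordering::Relaxed); // ticket t is now ours
    let slot = &epochs[t % n];
    // Spin until the slot's epoch says it is free for this round.
    while slot.load(Ordering::Acquire) != t {
        std::hint::spin_loop();
    }
    // ... write the value into the slot here ...
    slot.store(t + 1, Ordering::Release); // publish to consumers
    t
}

fn main() {
    let n: usize = 4;
    let epochs: Vec<AtomicUsize> = (0..n).map(AtomicUsize::new).collect();
    let tail = AtomicUsize::new(0);
    assert_eq!(send_blocking(&tail, &epochs, n), 0);
    assert_eq!(epochs[0].load(Ordering::Relaxed), 1); // slot 0 now "written"
}
```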

Reducing contention with larger numbers of senders/receivers

With a larger number of senders/receivers, contention on head/tail becomes severe. When testing, I saw ~70% of the time being spent on the head/tail CAS, failing most of the time. In this case it can be better to back off after a failed CAS, in the hope that contention dies down, by either spinning or yielding (or even parking). I found that spinning for exponentially longer and then yielding worked best.
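That backoff strategy might be sketched like this (thresholds are illustrative; the return value exists only to make the behaviour observable):

```rust
// Spin for exponentially longer after each failed CAS, then fall back to
// yielding the thread. Returns true if it spun, false if it yielded.
fn backoff(attempt: u32) -> bool {
    const SPIN_LIMIT: u32 = 6;
    if attempt <= SPIN_LIMIT {
        // 2^attempt pause hints: cheap, keeps the core.
        for _ in 0..(1u32 << attempt) {
            std::hint::spin_loop();
        }
        true
    } else {
        // Contention is persistent; give the scheduler a chance.
        std::thread::yield_now();
        false
    }
}

fn main() {
    assert!(backoff(0));  // early attempts spin...
    assert!(backoff(6));
    assert!(!backoff(7)); // ...later attempts yield
}
```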

Specialising for Single Producer

Notice that quite a bit of the logic in try_send above deals with synchronising among producers, much of which can be stripped away if we know there is only one producer.

We know that the single producer is the only one writing to the tail, so it can just keep a local value of the tail. There is no need to read an atomic tail at the start, nor to update it atomically on success. This also means there is no need for a retry loop: if the epoch at the tail slot is not what we expect, return with failure; otherwise just write to the slot. The code ends up a lot simpler.

```rust
fn try_send(&mut self, value: V) -> Result<(), V> {
    let t = self.local_T;
    let i = t % self.N;
    let e = self.slots[i].epoch.load(Acquire);
    if e == t {
        // slot is free, write directly: no CAS needed with a single producer
        self.slots[i].data.write(value);
        self.slots[i].epoch.store(t + 1, Release);
        self.local_T = t + 1;
        return Ok(());
    }
    // slot is still occupied, queue is full
    Err(value)
}
```

The case for single consumer is similar.
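For completeness, here is what the two specialisations look like stitched together into a runnable single-producer, single-consumer toy queue. All names are illustrative; it is exercised from one thread here, so the local counters can live in plain Cells (a real SPSC queue would keep each counter in its own endpoint handle):

```rust
use std::cell::{Cell, UnsafeCell};
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering::{Acquire, Release}};

struct SpscQueue<V> {
    epochs: Vec<AtomicUsize>,
    data: Vec<UnsafeCell<MaybeUninit<V>>>,
    local_t: Cell<usize>, // only the producer touches this
    local_h: Cell<usize>, // only the consumer touches this
    n: usize,
}

impl<V> SpscQueue<V> {
    fn new(n: usize) -> Self {
        SpscQueue {
            epochs: (0..n).map(AtomicUsize::new).collect(),
            data: (0..n).map(|_| UnsafeCell::new(MaybeUninit::uninit())).collect(),
            local_t: Cell::new(0),
            local_h: Cell::new(0),
            n,
        }
    }

    fn try_send(&self, value: V) -> Result<(), V> {
        let t = self.local_t.get();
        let i = t % self.n;
        if self.epochs[i].load(Acquire) != t {
            return Err(value); // slot not free yet: queue is full
        }
        unsafe { (*self.data[i].get()).write(value) };
        self.epochs[i].store(t + 1, Release);
        self.local_t.set(t + 1);
        Ok(())
    }

    fn try_recv(&self) -> Option<V> {
        let h = self.local_h.get();
        let i = h % self.n;
        if self.epochs[i].load(Acquire) != h + 1 {
            return None; // nothing written here yet: queue is empty
        }
        let value = unsafe { (*self.data[i].get()).assume_init_read() };
        self.epochs[i].store(h + self.n, Release);
        self.local_h.set(h + 1);
        Some(value)
    }
}

fn main() {
    let q = SpscQueue::new(2);
    assert!(q.try_send(1).is_ok());
    assert!(q.try_send(2).is_ok());
    assert!(q.try_send(3).is_err()); // full
    assert_eq!(q.try_recv(), Some(1));
    assert!(q.try_send(3).is_ok()); // slot freed, counter wraps around
    assert_eq!(q.try_recv(), Some(2));
    assert_eq!(q.try_recv(), Some(3));
    assert_eq!(q.try_recv(), None); // empty
}
```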

Benchmarks

Throughput for 1M usize elements on an Apple M3, measured with Criterion (10 samples, 3s measurement time):

| Configuration           | Queue Size 512 | Queue Size 4096 |
|-------------------------|----------------|-----------------|
| 1 sender, 1 receiver    | ~300 Melem/s   | ~637 Melem/s    |
| 4 senders, 4 receivers  | ~111 Melem/s   | ~273 Melem/s    |


Footnotes

  1. what we discussed in the previous post, at least under the current C++-style memory model, which will probably stay the same for the foreseeable future, even on RISC-V, excluding any special hardware-level tricks like cldemote