Get in Line - superfast SPSC Queue

4-ish years back I started writing a bounded single-producer single-consumer queue for some project I don't remember, but I couldn't get a correct implementation. Earlier this week I needed the same queue again, and instead of relying on the popular, well-tested libraries out there and being a productive developer, I decided to write one myself. The final implementation gives about 40GiB/s throughput and 80ns latency (though not both at the same time), which is about as fast as you can get on my M3 Mac. I only tested this on Apple Silicon CPUs, but it should work on other architectures too, as there isn't really anything ARM-specific in the implementation.

You can find the crate here to see the final code directly. I also rewrote the queue in a separate repo to show the exact code for the benchmarks at each stage (linked after each section).

Starting Simple

The simplest queue you could write would just be a VecDeque wrapped in Arc<Mutex<..>>:

struct Queue<T> {
    buffer: VecDeque<T>,
    capacity: usize,
}

struct Sender<T> {
    inner: Arc<Mutex<Queue<T>>>,
}

impl<T> Sender<T> {
    fn send(&self, item: T) {
        loop {
            let mut lock = self.inner.lock().unwrap();
            if lock.buffer.len() < lock.capacity {
                lock.buffer.push_back(item);
                break;
            }
        }
    }
}

struct Receiver<T> {
    inner: Arc<Mutex<Queue<T>>>,
}

impl<T> Receiver<T> {
    fn recv(&self) -> T {
        loop {
            if let Some(val) = self.inner.lock().unwrap().buffer.pop_front() {
                return val;
            }
        }
    }
}
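For completeness, here is a hedged sketch of how the two halves might be wired together and used, building on the structs above; the channel constructor isn't from the crate, it's just for illustration:

// Hypothetical constructor: both endpoints share the same Arc<Mutex<Queue<T>>>.
fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    let inner = Arc::new(Mutex::new(Queue {
        buffer: VecDeque::with_capacity(capacity),
        capacity,
    }));
    (Sender { inner: inner.clone() }, Receiver { inner })
}

fn main() {
    // One producer thread, one consumer on the main thread.
    let (tx, rx) = channel::<u64>(512);
    let producer = std::thread::spawn(move || {
        for i in 0..1_000 {
            tx.send(i);
        }
    });
    for _ in 0..1_000 {
        let _ = rx.recv();
    }
    producer.join().unwrap();
}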

The performance (…is terrible compared to what I claimed, we will get there):

latency/size_512 time: [1.3195 µs 1.5799 µs 2.1086 µs]
latency/size_4096 time: [1.9729 µs 3.1022 µs 3.9848 µs]
throughput/size_512 time: [10.119 ms 10.686 ms 11.473 ms]
thrpt: [8.7164 Melem/s 9.3578 Melem/s 9.8820 Melem/s]
thrpt: [133.00 MiB/s 142.79 MiB/s 150.79 MiB/s]
throughput/size_4096 time: [7.4096 ms 7.8135 ms 8.1513 ms]
thrpt: [12.268 Melem/s 12.798 Melem/s 13.496 Melem/s]
thrpt: [187.19 MiB/s 195.29 MiB/s 205.93 MiB/s]

Note that even though this is round-trip latency, one-way latency would not be much different, maybe slightly lower, because the shared state still has to be synchronised and travel from core to core.

The same test will be used later as well, for consistency. size here is the capacity with which the queue was created.

We could try using RwLock (link) to avoid holding an exclusive lock when only reading head/tail, but that wouldn't bring any improvement: there are only two threads working over this queue, and every push or pop needs write access anyway, so the write lock behaves like a Mutex and the contention is effectively the same.

Another major point is that these loops spin, which inflates the benchmarks but is really bad in real applications where the CPU could be doing other work in the background. If waiting on the queue is the only thing the thread has to do, though, spinning is fine.

Code at this point.

More details about the RwLock queue, if you want to see how it performs compared to the Mutex one:

latency/size_512 time: [1.4813 µs 3.1652 µs 4.9153 µs]
change: [+5.9038% +71.260% +155.23%] (p = 0.05 > 0.05)
No change in performance detected.
latency/size_4096 time: [1.8180 µs 2.5355 µs 3.2314 µs]
change: [−25.547% +19.284% +92.522%] (p = 0.50 > 0.05)
No change in performance detected.
throughput/size_512 time: [26.399 ms 31.534 ms 35.914 ms]
thrpt: [2.7844 Melem/s 3.1712 Melem/s 3.7880 Melem/s]
thrpt: [42.487 MiB/s 48.389 MiB/s 57.800 MiB/s]
change:
time: [+155.24% +186.10% +221.52%] (p = 0.00 < 0.05)
thrpt: [−68.898% −65.047% −60.821%]
Performance has regressed.
throughput/size_4096 time: [4.8831 ms 8.7747 ms 12.557 ms]
thrpt: [7.9635 Melem/s 11.396 Melem/s 20.479 Melem/s]
thrpt: [121.51 MiB/s 173.89 MiB/s 312.48 MiB/s]
change:
time: [−33.908% +2.2474% +49.381%] (p = 0.93 > 0.05)
thrpt: [−33.057% −2.1980% +51.304%]
No change in performance detected.

Performance has dropped slightly because sending now locks twice.

Code with RwLock.

Condition Variables

What if we only wake the other thread up when there is actually some data to consume or some space to send into, instead of locking and unlocking in a loop? We can use condition variables, which wake a thread up only when the other thread signals it. Another nice thing is that we won't be wasting CPU time spin-looping (benchmarks can be misleading here: good numbers in a benchmark don't mean the queue will play well with the rest of your application).

struct Queue<T> {
    buffer: Mutex<VecDeque<T>>,
    condvar: Condvar,
    capacity: usize,
}

impl<T> Sender<T> {
    fn send(&self, item: T) {
        let mut buffer_lock = self.inner.buffer.lock().unwrap();
        loop {
            if buffer_lock.len() < self.inner.capacity {
                buffer_lock.push_back(item);
                break;
            }
            buffer_lock = self.inner.condvar.wait(buffer_lock).unwrap();
        }
        self.inner.condvar.notify_one();
    }
}

impl<T> Receiver<T> {
    fn recv(&self) -> T {
        let mut buffer_lock = self.inner.buffer.lock().unwrap();
        loop {
            if let Some(val) = buffer_lock.pop_front() {
                self.inner.condvar.notify_one();
                return val;
            }
            buffer_lock = self.inner.condvar.wait(buffer_lock).unwrap();
        }
    }
}

Performance:

latency/size_512 time: [2.2194 µs 2.2302 µs 2.2409 µs]
change: [+14.922% +42.752% +75.530%] (p = 0.00 < 0.05)
Performance has regressed.
latency/size_4096 time: [2.2335 µs 2.2378 µs 2.2408 µs]
change: [−32.970% −10.855% +27.263%] (p = 0.55 > 0.05)
No change in performance detected.
throughput/size_512 time: [3.4748 ms 3.4836 ms 3.4912 ms]
thrpt: [28.643 Melem/s 28.706 Melem/s 28.778 Melem/s]
thrpt: [437.06 MiB/s 438.02 MiB/s 439.12 MiB/s]
change:
time: [−68.884% −66.884% −63.926%] (p = 0.00 < 0.05)
thrpt: [+177.21% +201.97% +221.38%]
Performance has improved.
throughput/size_4096 time: [3.4268 ms 3.4720 ms 3.5407 ms]
thrpt: [28.243 Melem/s 28.802 Melem/s 29.182 Melem/s]
thrpt: [430.95 MiB/s 439.48 MiB/s 445.27 MiB/s]
change:
time: [−59.404% −57.004% −54.640%] (p = 0.00 < 0.05)
thrpt: [+120.46% +132.58% +146.33%]
Performance has improved.

Compared to the plain Mutex, this is a massive improvement in throughput. Latency is worse, which is expected: we are doing a bit more work per operation and no longer waiting in a tight(er) spin loop. But we are also not burning CPU cycles.

If I wanted a simple, easy-to-understand, easy-to-maintain version, I would've stopped here. The performance is bad (especially compared to what we will see later on), but this is enough for most small projects that just need a queue and don't want to pull in an external library, and the code is dead simple.

This is not good enough for production systems, though, and we can still do a lot better with only a small jump in complexity.

Code till now.

Separating/Sharding the Shared State

If we really think about it, do we need to lock the entire queue just to push or pop? The buffer size is constant, so there is no reallocation and no pointer ever changes. The sender only needs the tail, and only looks at the head to check whether the queue is full; the receiver only needs the head, and only looks at the tail to check whether the queue is empty. So we could lock just the head and tail pointers instead.

struct Queue<T> {
    buffer: *mut MaybeUninit<T>,
    head: Mutex<usize>,
    tail: Mutex<usize>,
    capacity: usize,
}

impl<T> Sender<T> {
    fn send(&self, item: T) {
        loop {
            let mut tail = self.inner.tail.lock().unwrap();
            let head = *self.inner.head.lock().unwrap();
            let next_tail = (*tail + 1) % self.inner.capacity;
            if next_tail != head {
                unsafe {
                    self.inner.buffer.add(*tail).write(std::mem::MaybeUninit::new(item));
                }
                *tail = next_tail;
                break;
            }
        }
    }
}

impl<T> Receiver<T> {
    fn recv(&self) -> T {
        loop {
            let mut head = self.inner.head.lock().unwrap();
            let tail = *self.inner.tail.lock().unwrap();
            if *head != tail {
                let val = unsafe { self.inner.buffer.add(*head).read().assume_init() };
                *head = (*head + 1) % self.inner.capacity;
                return val;
            }
        }
    }
}

You may have noticed already, but this causes a deadlock when:

  1. sender locks the tail
  2. the OS yeets (preempts) the sender thread
  3. receiver locks the head
  4. now neither thread can proceed.

We can fix that by always locking the tail first and then the head, in both send and recv.
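A sketch of the receiver with the fixed lock order (same fields as above); the tail guard is a temporary that is dropped at the end of its statement, so the receiver never holds head while waiting for tail:

impl<T> Receiver<T> {
    fn recv(&self) -> T {
        loop {
            // Touch the locks in the same order as the sender: tail, then head.
            let tail = *self.inner.tail.lock().unwrap();
            let mut head = self.inner.head.lock().unwrap();
            if *head != tail {
                let val = unsafe { self.inner.buffer.add(*head).read().assume_init() };
                *head = (*head + 1) % self.inner.capacity;
                return val;
            }
        }
    }
}

Still, this is just doing more work, locking two things instead of one. Performance is actually the worst so far: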

latency/size_512 time: [4.2295 µs 4.3851 µs 4.6308 µs]
change: [+87.467% +95.116% +103.98%] (p = 0.00 < 0.05)
Performance has regressed.
latency/size_4096 time: [4.1396 µs 4.3851 µs 4.5334 µs]
change: [+88.570% +95.194% +101.23%] (p = 0.00 < 0.05)
Performance has regressed.
throughput/size_512 time: [805.15 ms 1.2389 s 1.6952 s]
thrpt: [58.991 Kelem/s 80.718 Kelem/s 124.20 Kelem/s]
thrpt: [921.74 KiB/s 1.2317 MiB/s 1.8952 MiB/s]
change:
time: [+23076% +35408% +49451%] (p = 0.00 < 0.05)
thrpt: [−99.798% −99.718% −99.569%]
Performance has regressed.
throughput/size_4096 time: [443.20 ms 581.29 ms 712.75 ms]
thrpt: [140.30 Kelem/s 172.03 Kelem/s 225.63 Kelem/s]
thrpt: [2.1408 MiB/s 2.6250 MiB/s 3.4429 MiB/s]
change:
time: [+13038% +16725% +20942%] (p = 0.00 < 0.05)
thrpt: [−99.525% −99.406% −99.239%]
Performance has regressed.

But this is the right direction (it isn't obvious yet why, but it will become clear later). If we can avoid locking both of them, or at least avoid locking both most of the time, we have a shot at better performance.

Code till now.

Waiting to Sync (Shadow Variables)

The main cost after sharding is acquiring the locks, or more generally, synchronisation. One way to reduce that cost is to sync less often. The producer doesn't need to know exactly where the head is at this moment: the consumer only ever moves the head forward, so a stale value can only make the queue look more full than it really is, which is safe. The same argument applies to the tail and the consumer, where a stale tail can only make the queue look more empty than it really is. So each side keeps a "shadow" copy of the other side's index, and only syncs it with the real head/tail when the shadow says no more data can be pushed/popped.

struct Sender<T> {
    inner: Arc<Queue<T>>,
    tail: usize,
    head: usize,
}

impl<T> Sender<T> {
    fn send(&mut self, item: T) {
        loop {
            let next_tail = (self.tail + 1) % self.inner.capacity;
            if next_tail != self.head {
                unsafe {
                    self.inner.buffer.add(self.tail).write(std::mem::MaybeUninit::new(item));
                }
                let mut tail = self.inner.tail.lock().unwrap();
                *tail = next_tail;
                self.tail = next_tail;
                break;
            }
            self.head = *self.inner.head.lock().unwrap();
        }
    }
}
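The receiver mirrors this: it keeps its own head plus a shadow of the tail, and only syncs the shadow when the queue looks empty. Roughly:

struct Receiver<T> {
    inner: Arc<Queue<T>>,
    head: usize,
    tail: usize,
}

impl<T> Receiver<T> {
    fn recv(&mut self) -> T {
        loop {
            if self.head != self.tail {
                let val = unsafe { self.inner.buffer.add(self.head).read().assume_init() };
                let next_head = (self.head + 1) % self.inner.capacity;
                *self.inner.head.lock().unwrap() = next_head;
                self.head = next_head;
                return val;
            }
            // Queue looks empty according to the shadow: sync with the real tail.
            self.tail = *self.inner.tail.lock().unwrap();
        }
    }
}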

Unfortunately this does not lead to any increase in performance compared to the simple version with just a single Mutex:

latency/size_512 time: [3.8723 µs 5.0873 µs 5.8944 µs]
change: [+58.229% +92.926% +132.40%] (p = 0.00 < 0.05)
Performance has regressed.
latency/size_4096 time: [4.3719 µs 5.0350 µs 5.7585 µs]
change: [+57.399% +115.36% +185.58%] (p = 0.00 < 0.05)
Performance has regressed.
throughput/size_512 time: [3.6511 ms 3.7586 ms 3.8652 ms]
thrpt: [25.872 Melem/s 26.606 Melem/s 27.389 Melem/s]
thrpt: [394.77 MiB/s 405.98 MiB/s 417.92 MiB/s]
change:
time: [+1.8870% +5.5301% +8.9882%] (p = 0.01 < 0.05)
thrpt: [−8.2470% −5.2403% −1.8521%]
Performance has regressed.
throughput/size_4096 time: [3.3533 ms 3.4144 ms 3.4818 ms]
thrpt: [28.721 Melem/s 29.287 Melem/s 29.822 Melem/s]
thrpt: [438.24 MiB/s 446.89 MiB/s 455.04 MiB/s]
change:
time: [−3.7307% −0.8104% +2.5110%] (p = 0.62 > 0.05)
thrpt: [−2.4495% +0.8170% +3.8752%]
No change in performance detected.

Code till now.

At least we got the throughput back. Looking at the flamegraph, we can still clearly see that waiting for locks consumes the bulk of the time.

Flamegraph showing lock contention

There are various things that we can do to improve it further, like batching and increasing the size of the inner queue, but there aren’t any major “algorithmic” improvements left to do. We have exhausted the pure software logic; now we need to rely on better hardware primitives.

Atomics

We are wrapping integers in a Mutex, which is quite a stupid and expensive thing to do for small, Copy-able types like usize. Atomics behave like a Mutex<usize>, but exploit the fact that modern hardware can do many integer operations atomically, so we don't need any separate mutual-exclusion machinery.

The standard way to update a shared atomic is a compare-and-swap (CAS) loop: read the value, modify it, and swap it back only if it hasn't changed in the meantime, retrying otherwise. But because in our case only one thread modifies each integer (only the sender writes the tail, only the receiver writes the head), we don't need to compare at all and can store the new value directly from the owning thread.

struct Queue<T> {
    buffer: *mut MaybeUninit<T>,
    head: AtomicUsize,
    tail: AtomicUsize,
    capacity: usize,
}

impl<T> Sender<T> {
    fn send(&mut self, item: T) {
        loop {
            let head = self.inner.head.load(Ordering::Acquire);
            let tail = self.inner.tail.load(Ordering::Acquire);
            let next_tail = (tail + 1) % self.inner.capacity;
            if next_tail != head {
                unsafe {
                    self.inner
                        .buffer
                        .add(tail)
                        .write(std::mem::MaybeUninit::new(item));
                }
                self.inner.tail.store(next_tail, Ordering::Release);
                break;
            }
        }
    }
}

impl<T> Receiver<T> {
    fn recv(&mut self) -> T {
        loop {
            let tail = self.inner.tail.load(Ordering::Acquire);
            let head = self.inner.head.load(Ordering::Acquire);
            if head != tail {
                let val = unsafe { self.inner.buffer.add(head).read().assume_init() };
                let next_head = (head + 1) % self.inner.capacity;
                self.inner.head.store(next_head, Ordering::Release);
                return val;
            }
        }
    }
}

Understanding Ordering in atomics: I am assuming that you know about atomics and memory ordering, but if you don't (or need a refresher), here are some links:

  • Jon Gjengset’s video - Crust of Rust: Atomics and Memory Ordering - long video but explains everything, including the reasoning of why atomics are useful. Great beginner resource.
  • The Rust standard library's official docs - if you prefer reading instead. Still goes into great depth.
  • cppreference.com’s page on std::memory_order (link) - if you want concise and dense docs for a quick refresher.

Just directly replacing Mutex with AtomicUsize in the basic version without shadowing gives great improvements:

latency/size_512 time: [128.05 ns 131.43 ns 137.80 ns]
change: [−97.355% −96.871% −96.202%] (p = 0.00 < 0.05)
Performance has improved.
latency/size_4096 time: [147.79 ns 151.98 ns 154.64 ns]
change: [−97.614% −96.935% −95.814%] (p = 0.00 < 0.05)
Performance has improved.
throughput/size_512 time: [2.0787 ms 2.0964 ms 2.1164 ms]
thrpt: [47.250 Melem/s 47.701 Melem/s 48.106 Melem/s]
thrpt: [720.98 MiB/s 727.86 MiB/s 734.04 MiB/s]
change:
time: [−44.700% −42.724% −40.683%] (p = 0.00 < 0.05)
thrpt: [+68.586% +74.593% +80.832%]
Performance has improved.
throughput/size_4096 time: [2.0233 ms 2.0299 ms 2.0360 ms]
thrpt: [49.117 Melem/s 49.264 Melem/s 49.423 Melem/s]
thrpt: [749.46 MiB/s 751.70 MiB/s 754.14 MiB/s]
change:
time: [−42.189% −40.825% −39.583%] (p = 0.00 < 0.05)
thrpt: [+65.517% +68.990% +72.976%]
Performance has improved.

We can still avoid some of the atomic syncing by keeping a local copy of the tail in the receiver (and of the head in the sender). Unfortunately this again makes no difference:

latency/size_512 time: [133.67 ns 136.56 ns 139.14 ns]
change: [−4.4807% −1.0442% +2.7171%] (p = 0.60 > 0.05)
No change in performance detected.
latency/size_4096 time: [130.44 ns 134.32 ns 137.54 ns]
change: [−14.212% −9.6386% −4.3449%] (p = 0.01 < 0.05)
Performance has improved.
throughput/size_512 time: [1.9316 ms 2.1315 ms 2.4965 ms]
thrpt: [40.055 Melem/s 46.915 Melem/s 51.772 Melem/s]
thrpt: [611.20 MiB/s 715.87 MiB/s 789.97 MiB/s]
change:
time: [−4.9192% +9.4505% +27.938%] (p = 0.32 > 0.05)
thrpt: [−21.837% −8.6345% +5.1737%]
No change in performance detected.
throughput/size_4096 time: [1.9885 ms 2.0915 ms 2.1651 ms]
thrpt: [46.188 Melem/s 47.812 Melem/s 50.290 Melem/s]
thrpt: [704.77 MiB/s 729.55 MiB/s 767.36 MiB/s]
change:
time: [−5.6552% −0.5175% +5.6131%] (p = 0.86 > 0.05)
thrpt: [−5.3148% +0.5202% +5.9942%]
No change in performance detected.

This is the code with just atomics, and this is the code with shadowing.
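For reference, the shadowed send over atomics looks roughly like this (cached_head being the sender's possibly-stale copy of the receiver's head):

fn send(&mut self, item: T) {
    loop {
        let tail = self.inner.tail.load(Ordering::Acquire);
        let next_tail = (tail + 1) % self.inner.capacity;
        if next_tail != self.cached_head {
            unsafe { self.inner.buffer.add(tail).write(std::mem::MaybeUninit::new(item)) };
            self.inner.tail.store(next_tail, Ordering::Release);
            break;
        }
        // Queue looks full according to the cached head: reload the real one.
        self.cached_head = self.inner.head.load(Ordering::Acquire);
    }
}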

Why did the shadowing not help? Honestly, I've got no clue. Logically, reducing the number of atomic operations should have some effect on performance. My best guess is that, in this benchmark, the shadowing didn't actually reduce the number of atomic loads/stores.

EDIT: Also, if you look under Extra stuff -> More shadowing of atomics for performance, you'll see that my benchmarks at this point were biased towards contention-heavy usage, so this shadowing probably would have helped a lot with low-contention usage, but I never benchmarked that.

Spin Loop Hint

Notice that when the queue is empty/full we are effectively busy-looping. It is possible that the same thread keeps loading the same atomic over and over and never gives the other thread a chance to store to or read from it. We can mitigate this with std::hint::spin_loop, placed inside the loop right before we go around and load again.
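In the send loop that looks roughly like this (recv gets the same treatment):

fn send(&mut self, item: T) {
    loop {
        let head = self.inner.head.load(Ordering::Acquire);
        let tail = self.inner.tail.load(Ordering::Acquire);
        let next_tail = (tail + 1) % self.inner.capacity;
        if next_tail != head {
            unsafe { self.inner.buffer.add(tail).write(std::mem::MaybeUninit::new(item)) };
            self.inner.tail.store(next_tail, Ordering::Release);
            break;
        }
        // Queue looks full: back off for a moment so the receiver's store to
        // `head` can make it across before we load again.
        std::hint::spin_loop();
    }
}

This leads to slightly better performance: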

latency/size_512 time: [134.81 ns 142.88 ns 151.85 ns]
change: [−1.3404% +3.5886% +9.2331%] (p = 0.23 > 0.05)
No change in performance detected.
latency/size_4096 time: [118.24 ns 129.58 ns 138.32 ns]
change: [−13.630% −8.3261% −2.5195%] (p = 0.02 < 0.05)
Performance has improved.
throughput/size_512 time: [1.8775 ms 1.8978 ms 1.9248 ms]
thrpt: [51.953 Melem/s 52.693 Melem/s 53.264 Melem/s]
thrpt: [792.74 MiB/s 804.03 MiB/s 812.74 MiB/s]
change:
time: [−29.164% −17.840% −5.7520%] (p = 0.02 < 0.05)
thrpt: [+6.1030% +21.714% +41.170%]
Performance has improved.
throughput/size_4096 time: [1.7486 ms 1.7744 ms 1.8362 ms]
thrpt: [54.460 Melem/s 56.358 Melem/s 57.189 Melem/s]
thrpt: [830.99 MiB/s 859.95 MiB/s 872.63 MiB/s]
change:
time: [−13.679% −8.3516% −2.0571%] (p = 0.02 < 0.05)
thrpt: [+2.1003% +9.1127% +15.847%]
Performance has improved.

Using the spin_loop hint is usually bad for latency, but here the tight loop was actually starving the other thread and blocking the data from coming in, so we even got a slight improvement (or at least no change) in latency.

Code at this point.

Relaxing the Ordering

When Sender::send first loads the current tail, it uses Ordering::Acquire. That would be necessary if other threads were also updating the tail, but here the sender thread is the only one writing to it. And because of the data dependency (the current tail is used to calculate the next tail), the CPU cannot reorder this load past the later store anyway. So we can get by with Ordering::Relaxed for the sender's load of tail. The same argument applies to the first load of head in Receiver::recv.
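Concretely, only the load of the index the thread itself owns gets relaxed; the load of the other side's index keeps Acquire. A sketch for the sender:

// Sender: we are the only writer of `tail`, so a Relaxed load of it is enough.
let tail = self.inner.tail.load(Ordering::Relaxed);
// `head` is written by the receiver, so this load stays Acquire to pair with
// the receiver's Release store.
let head = self.inner.head.load(Ordering::Acquire);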

latency/size_512 time: [117.64 ns 124.73 ns 133.65 ns]
change: [−15.854% −10.442% −4.5425%] (p = 0.00 < 0.05)
Performance has improved.
latency/size_4096 time: [111.22 ns 114.58 ns 117.24 ns]
change: [−13.109% −7.6552% −2.1833%] (p = 0.02 < 0.05)
Performance has improved.
throughput/size_512 time: [723.39 µs 740.06 µs 751.51 µs]
thrpt: [133.07 Melem/s 135.12 Melem/s 138.24 Melem/s]
thrpt: [1.9828 GiB/s 2.0135 GiB/s 2.0599 GiB/s]
change:
time: [−61.733% −60.886% −60.138%] (p = 0.00 < 0.05)
thrpt: [+150.86% +155.66% +161.32%]
Performance has improved.
throughput/size_4096 time: [865.55 µs 883.62 µs 914.36 µs]
thrpt: [109.37 Melem/s 113.17 Melem/s 115.53 Melem/s]
thrpt: [1.6297 GiB/s 1.6864 GiB/s 1.7216 GiB/s]
change:
time: [−53.307% −50.817% −48.491%] (p = 0.00 < 0.05)
thrpt: [+94.141% +103.32% +114.16%]
Performance has improved.

Code at this point.

Removing False Sharing

Profiling the binary with the Instruments tool on macOS (with its shitty UI and nothing like perf's export support on Linux), we see that we are getting a lot of cache misses:

Sample      | L1 Cache Misses (Load) | L1 Cache Misses (Store) | L2 Cache Misses | Function
6 (100.0%)  | 85,777 (100.0%)        | 19,410 (100.0%)         | 1,350 (100.0%)  | test
1 (16.7%)   | 49,060 (57.2%)         | 11,772 (60.6%)          | 805 (59.6%)     | core::sync::atomic::atomic_load
1 (16.7%)   | 30,107 (35.1%)         | 4,170 (21.5%)           | 58 (4.3%)       | core::ptr::non_null::NonNull::as_ref

If we can avoid these cache misses, we can probably squeeze out a bit more performance. Right now head and tail sit on the same cache line, so every store to one of them invalidates the other core's cached copy of both; that is the false sharing in this section's title. If we could keep the tail permanently in the cache of the core running Sender::send, the stores to tail would stop missing, and the only misses left would be the loads from the Receiver::recv thread. Same for keeping the head in Receiver::recv's core. We can (almost) achieve this by adding padding after each atomic, enough to completely fill out the cache line.

A cache line on Apple Silicon Macs is 128 bytes:

sysctl -a hw machdep.cpu | rg cachelinesize
> hw.cachelinesize: 128

If you are on Linux you can use

lscpu -C
> NAME ONE-SIZE ALL-SIZE WAYS TYPE LEVEL SETS PHY-LINE COHERENCY-SIZE
> L1d 32K 512K 8 Data 1 64 1 64
> L1i 32K 512K 8 Instruction 1 64 1 64
> L2 1M 16M 8 Unified 2 2048 1 64
> L3 96M 128M 16 Unified 3 98304 1 64

and see under COHERENCY-SIZE.

What is the actual cache-line size on Apple Silicon? While querying it via software reports 128 bytes, I think it is physically 64 bytes. In this benchmark, even if you change the alignment to 64, the performance stays exactly the same. You can read more details in this paper's Section 3 (or read the rest of the paper too, just for fun). Apple probably had a good reason to report 128, and the larger alignment doesn't really affect us anyway, except for maybe the size of the structs.

So we want to pad after head and tail just enough so that the atomics live on separate cache lines. C++ has a convenient static constexpr std::hardware_destructive_interference_size, but I couldn’t find an equivalent in Rust so I am just hardcoding 128 bytes to be safe.

#[repr(align(128))]
struct Padding<T> {
    value: T,
}

pub(crate) struct Queue<T> {
    head: Padding<AtomicUsize>,
    tail: Padding<AtomicUsize>,
    capacity: usize,
    buffer: *mut MaybeUninit<T>,
}

latency/size_512 time: [88.922 ns 90.830 ns 94.060 ns]
change: [−31.258% −26.918% −22.850%] (p = 0.00 < 0.05)
Performance has improved.
latency/size_4096 time: [80.777 ns 81.812 ns 83.805 ns]
change: [−27.990% −24.698% −21.139%] (p = 0.00 < 0.05)
Performance has improved.
throughput/size_512 time: [258.84 µs 261.57 µs 265.48 µs]
thrpt: [376.68 Melem/s 382.30 Melem/s 386.34 Melem/s]
thrpt: [2.8065 GiB/s 2.8484 GiB/s 2.8784 GiB/s]
change:
time: [−65.388% −64.812% −63.999%] (p = 0.00 < 0.05)
thrpt: [+177.77% +184.19% +188.91%]
Performance has improved.
throughput/size_4096 time: [249.95 µs 251.46 µs 254.87 µs]
thrpt: [392.35 Melem/s 397.67 Melem/s 400.07 Melem/s]
thrpt: [2.9233 GiB/s 2.9629 GiB/s 2.9808 GiB/s]
change:
time: [−73.051% −72.075% −71.074%] (p = 0.00 < 0.05)
thrpt: [+245.71% +258.10% +271.08%]
Performance has improved.

Code at this point.

Now this is the point at which the queue is probably good enough for most applications. If you want a wait-free one, you can allow send/recv to fail after a single refresh of the shadow variable instead of looping while the queue is full/empty. You could make it more power efficient, at the cost of some latency, by using std::thread::yield_now(). And even though these benchmarks only test how fast you can send usizes, you can keep latency low for bigger payloads by sending something small through the queue, e.g. indices into an arena allocator. Even the ProducerConsumerQueue in Facebook's folly stops here, and if it's good enough for them it is probably good enough for most people.
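For illustration, the wait-free variant could look roughly like this (try_send and the field names are mine, not the crate's API; head/tail are the padded atomics from above):

fn try_send(&mut self, item: T) -> Result<(), T> {
    let tail = self.inner.tail.value.load(Ordering::Relaxed);
    let next_tail = (tail + 1) % self.inner.capacity;
    if next_tail == self.cached_head {
        // Refresh the shadow once; if the queue still looks full, fail
        // instead of spinning.
        self.cached_head = self.inner.head.value.load(Ordering::Acquire);
        if next_tail == self.cached_head {
            return Err(item);
        }
    }
    unsafe { self.inner.buffer.add(tail).write(std::mem::MaybeUninit::new(item)) };
    self.inner.tail.value.store(next_tail, Ordering::Release);
    Ok(())
}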

I suspect this latency is as low as you can get. According to core-to-core-latency:

Num cores: 8
Num iterations per samples: 1000
Num samples: 300
1) CAS latency on a single shared cache line
0 1 2 3 4 5 6 7
0
1 53±1
2 43±0 36±0
3 35±0 35±0 35±0
4 35±0 36±0 34±0 34±0
5 35±0 34±0 34±0 34±0 35±0
6 34±0 34±0 34±0 34±0 34±0 35±0
7 35±0 35±0 35±0 34±0 34±0 34±0 34±0
Min latency: 33.6ns ±0.1 cores: (7,5)
Max latency: 53.3ns ±1.2 cores: (1,0)
Mean latency: 35.5ns

About the core-to-core-latency app: if we look inside, its read_write benchmark is basically another SPSC, and it runs almost the same round-trip latency benchmark that we do, except it divides by 2 to get a one-way latency. So I'm not sure it really tells us more than we already know, other than confirming that we are on the right track with the benchmarks.

The actual minimum latency should be about twice the core-to-core latency, because in the best case a thread does two operations that need to be synced between the cores: fetching the data at the head/tail position, and the final atomic store to head/tail itself.

As you can see, twice the ~34-35ns core-to-core latency puts the floor at roughly 70ns, and we are already quite close to it. If we really, really care about latency we could remove the hint::spin_loop():

latency/size_512 time: [66.599 ns 68.148 ns 70.187 ns]
change: [−36.215% −27.721% −19.234%] (p = 0.00 < 0.05)
Performance has improved.
latency/size_4096 time: [65.401 ns 68.008 ns 70.571 ns]
change: [−41.446% −38.184% −34.219%] (p = 0.00 < 0.05)
Performance has improved.

I wouldn’t recommend that for real applications as this artificially improves benchmarks and blocks CPU for doing other useful works. Also has no effect on throughput anyway. This gives us as low as we can get in latency.

But we are still an order of magnitude away from saturating the throughput. Apple claims a memory bandwidth of 100GB/s (that is just the bandwidth; it does not mean a single producer-consumer pair can actually saturate it). If I paid for 100GB/s, I will use 100GB/s (I'll try).

Batching

If we look at the flamegraph for the version after we fix false sharing, we get this:

Flamegraph for final unbatched version

The biggest time sink is loading/storing the atomics. If we can reduce the number of atomic ops per message, we may be able to get even higher throughput. What if, instead of sending one item and immediately "commit"-ing it by updating the tail, we write multiple items into the buffer before updating the tail? That would amortise the cost of the atomic loads/stores over multiple items. We can do this by splitting the API: first "reserve" a run of spots in the buffer (possibly many at once), then commit the ones we actually filled. We take a hit on latency in exchange, which is fine: latency stays the same if you commit one element at a time, and most applications send data in batches anyway.

impl<T> Sender<T> {
    pub fn write_buffer(&mut self) -> &mut [MaybeUninit<T>] {
        let tail_ref = unsafe { &*(&raw const (*self.inner.as_ptr()).tail.value) };
        let tail = tail_ref.load(Ordering::Relaxed);
        let next_tail = if tail + 1 == self.capacity { 0 } else { tail + 1 };
        if next_tail == self.cached_head {
            // Looks full: refresh our cached view of the receiver's head.
            let head_ref = unsafe { &*(&raw const (*self.inner.as_ptr()).head.value) };
            self.cached_head = head_ref.load(Ordering::Acquire);
        }
        // Hand out a contiguous run of free slots, stopping either one slot
        // before the head or at the end of the buffer (the wrap-around is
        // handled on the next call).
        let end = if self.cached_head > tail {
            self.cached_head - 1
        } else if self.cached_head == 0 {
            self.capacity - 1
        } else {
            self.capacity
        };
        unsafe {
            let ptr = self.buffer.add(tail).cast();
            std::slice::from_raw_parts_mut(ptr.as_ptr(), end - tail)
        }
    }

    pub unsafe fn commit(&mut self, len: usize) {
        // Caller guarantees the first `len` slots handed out by `write_buffer`
        // have been initialised.
        let tail_ref = unsafe { &*(&raw const (*self.inner.as_ptr()).tail.value) };
        let tail = tail_ref.load(Ordering::Relaxed);
        let mut new_tail = tail + len;
        if new_tail >= self.capacity {
            new_tail -= self.capacity;
        }
        tail_ref.store(new_tail, Ordering::Release);
    }
}

You can implement read_buffer/advance on the receiver side in the same way.

batching/size_512 time: [1.2599 ms 1.2869 ms 1.3238 ms]
thrpt: [755.39 Melem/s 777.05 Melem/s 793.69 Melem/s]
thrpt: [5.6281 GiB/s 5.7895 GiB/s 5.9134 GiB/s]
batching/size_4096 time: [1.5422 ms 1.5532 ms 1.5650 ms]
thrpt: [638.99 Melem/s 643.83 Melem/s 648.41 Melem/s]
thrpt: [4.7609 GiB/s 4.7969 GiB/s 4.8310 GiB/s]
batching/size_65536 time: [1.7904 ms 1.8044 ms 1.8121 ms]
thrpt: [551.84 Melem/s 554.20 Melem/s 558.54 Melem/s]
thrpt: [4.1115 GiB/s 4.1291 GiB/s 4.1614 GiB/s]

Not quite the 40GiB/s! But if we look closely at the benchmark, we are writing into the returned slice one element at a time, which is not the fastest way. Using std::ptr::copy_nonoverlapping (which lowers to memcpy) is much better.
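For illustration, a producer could fill a whole batch like this (send_batch is not part of the crate, just a sketch on top of the write_buffer/commit API above):

// Copies as many of `items` as currently fit, in one memcpy, and returns how
// many were pushed.
fn send_batch(sender: &mut Sender<u64>, items: &[u64]) -> usize {
    let buf = sender.write_buffer();
    let n = buf.len().min(items.len());
    unsafe {
        // MaybeUninit<u64> has the same layout as u64, so a raw copy is fine.
        std::ptr::copy_nonoverlapping(items.as_ptr(), buf.as_mut_ptr().cast::<u64>(), n);
        // Safety: the first `n` slots are now initialised.
        sender.commit(n);
    }
    n
}

With that change: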

batching/size_512 time: [250.29 µs 252.37 µs 255.05 µs]
thrpt: [3.9208 Gelem/s 3.9624 Gelem/s 3.9954 Gelem/s]
thrpt: [29.212 GiB/s 29.522 GiB/s 29.768 GiB/s]
batching/size_4096 time: [219.35 µs 223.51 µs 228.63 µs]
thrpt: [4.3739 Gelem/s 4.4740 Gelem/s 4.5588 Gelem/s]
thrpt: [32.588 GiB/s 33.334 GiB/s 33.966 GiB/s]
batching/size_65536 time: [177.43 µs 178.58 µs 179.60 µs]
thrpt: [5.5679 Gelem/s 5.5997 Gelem/s 5.6359 Gelem/s]
thrpt: [41.484 GiB/s 41.721 GiB/s 41.991 GiB/s]

Final code.

There we go! Note that you can probably go above 60GiB/s by tuning the item and queue sizes (though items larger than 16 bytes will have higher latency). You can run the benchmarks on the official crate (not this example one), and some of them reach around 60GiB/s on an M3 chip.

Extra Stuff

More shadowing of atomics for performance

When comparing my implementation with other crates, I came across rtrb, which is also an SPSC queue. Running my benchmarks on it, mine was of course (slightly) faster. Running their benchmarks on my implementation, theirs was faster, by a lot :'( Oh, the joys of benchmarking.

Anyway, after some digging I found that their implementation also "shadows" the tail in the sender and the head in the receiver, but for the owning side's own loads. This makes sense: if the sender is the only one writing to the tail, why should it have to load the atomic every time? Its local copy is always correct and up to date; we just need to update it whenever we update the atomic. After implementing this, mine was just as fast (within a reasonable margin) on their benchmarks too.
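A rough sketch of the idea for the sender (local_tail and cached_head are my names; the receiver does the same with its head):

fn send(&mut self, item: T) {
    loop {
        // No atomic load of `tail` at all: only this thread ever writes it,
        // so the local copy is always up to date.
        let tail = self.local_tail;
        let next_tail = (tail + 1) % self.inner.capacity;
        if next_tail == self.cached_head {
            self.cached_head = self.inner.head.value.load(Ordering::Acquire);
            if next_tail == self.cached_head {
                std::hint::spin_loop();
                continue;
            }
        }
        unsafe { self.inner.buffer.add(tail).write(std::mem::MaybeUninit::new(item)) };
        self.inner.tail.value.store(next_tail, Ordering::Release);
        self.local_tail = next_tail;
        break;
    }
}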

My benchmarks were heavily biased towards contention-heavy usage, where the queue is often empty or full. For low-contention usage (for example when the queue is huge and/or almost never completely empty/full) it is better to avoid even that first Relaxed load and shadow your own head/tail as well.

Also, if you survey SPSC queues in general, almost every one of them does something like this anyway.

ISB vs YIELD on ARM

On ARM, std::hint::spin_loop() compiles to an ISB instruction, whereas std::thread::yield_now() actually does a syscall (which is why it is slower). The former is better for latency, the latter for power efficiency.

Intel cldemote

Intel has a special instruction called cldemote (Cache Line Demote) which could probably give fantastic performance here. It demotes a cache line from the core's private L1/L2 straight into the shared L3. If I test it out, I will update this section.

Pointer Projections and Aliasing

I used raw pointer projections (the &raw const) instead of simply creating a reference to the whole struct to access its fields. It is undefined behaviour in Rust to have multiple mutable references to the same object at the same time, and here the two threads would each be creating one. Rust marks &mut T as noalias in the LLVM IR, which means LLVM is free to optimise as it pleases, which can be problematic. For example:

  • it might read a value once, keep it in a register, and never check memory again, assuming the value cannot change
  • it might reorder the writes, or defer them all to the very end of the function, and do other things we do not want.

Creating multiple immutable references to the same object is fine though, as long as the underlying object is Sync. AtomicUsize is Sync, so creating references to it via pointer projection is fine.
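To make that concrete, here is the shape of the pattern used in the batching code above:

// Risky: materialising a &mut Queue<T> here, while the other thread does the
// same, would be two aliasing mutable references, which is UB.
// let queue = unsafe { &mut *self.inner.as_ptr() };

// Fine: take the address of just the field we need and only ever create a
// shared reference to the AtomicUsize, which is Sync and meant to be shared.
let tail_ref = unsafe { &*(&raw const (*self.inner.as_ptr()).tail.value) };
let tail = tail_ref.load(Ordering::Relaxed);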

discuss this on Hacker News, Twitter, Bluesky or Reddit.

EDIT: thanks to u/matthieum and u/The_8472 on Reddit for pointing out mistakes with this post, which have now been fixed.