I have implemented a fast lock-free single-producer single-consumer (SPSC) queue in C++ to practice concurrency and low-level design concepts. I also benchmarked the code on a 2.3 GHz 4-core Intel x86_64 CPU. Please review the queue implementation and the benchmark code (inspired by moodycamel's benchmarks) for correctness:
- lfqueue.h
#include <atomic>
#include <cstddef>
#include <unistd.h>
#include <cassert>
#include <limits>
#define CACHE_LINE_SIZE 64
template<typename T>
class LFQueue{
public:
    LFQueue(std::size_t size) : write_id(0), read_id(0), capacity_(size) {
        buf = new T[capacity_];
    }
    ~LFQueue() { delete[] buf; }

    template<typename... Args>
    bool write(Args... args){
        auto curr_read = read_id.load(std::memory_order_acquire);
        auto curr_write = write_id.load(std::memory_order_relaxed);
        if(__builtin_expect(curr_write + 1 == curr_read, 0)){
            return false;
        }
        new (&buf[curr_write]) T(std::forward<Args>(args)...);
        curr_write++;
        if(__builtin_expect(curr_write == capacity_, 0)){
            curr_write = 0;
        }
        write_id.store(curr_write, std::memory_order_release);
        return true;
    }

    bool read(T &val){
        auto curr_write = write_id.load(std::memory_order_acquire);
        auto curr_read = read_id.load(std::memory_order_relaxed);
        if(__builtin_expect(curr_read == curr_write, 0))
            return false;
        val = std::move(buf[curr_read]);
        curr_read++;
        if(__builtin_expect(curr_read == capacity_, 0)){
            curr_read = 0;
        }
        read_id.store(curr_read, std::memory_order_release);
        return true;
    }

    T* frontPtr() {
        auto curr_read = read_id.load(std::memory_order_acquire);
        auto curr_write = write_id.load(std::memory_order_acquire);
        if(curr_read == curr_write)
            return nullptr;
        return buf + curr_read;
    }

    void popFront() {
        auto curr_read = read_id.load(std::memory_order_acquire);
        auto curr_write = write_id.load(std::memory_order_acquire);
        if(curr_read != curr_write){
            curr_read++;
            if(curr_read == capacity_)
                curr_read = 0;
            read_id.store(curr_read, std::memory_order_release);
        }
    }

    bool isEmpty() const {
        return write_id.load(std::memory_order_acquire) == read_id.load(std::memory_order_acquire);
    }
    bool isFull() const {
        return write_id.load(std::memory_order_acquire) + 1 == read_id.load(std::memory_order_acquire);
    }
    size_t size() const {
        auto curr_read = read_id.load(std::memory_order_acquire);
        auto curr_write = write_id.load(std::memory_order_acquire);
        if(curr_write >= curr_read){
            return curr_write - curr_read;
        }
        return curr_write + capacity_ - curr_read;
    }
    size_t capacity() const { return capacity_; }

private:
    alignas(CACHE_LINE_SIZE) std::atomic_size_t write_id;
    alignas(CACHE_LINE_SIZE) std::atomic_size_t read_id;
    alignas(CACHE_LINE_SIZE) const std::size_t capacity_;
    alignas(CACHE_LINE_SIZE) T* buf;

    __always_inline bool is_capacity(size_t id){
        return (id & capacity_) && !(id & (capacity_ - 1));
    }
};
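The intended usage is one producer thread calling write() and one consumer thread calling read(). A minimal sketch (capacity, element count and include path are illustrative only):

#include "lfqueue.h"
#include <thread>

int main(){
    LFQueue<int> q(1024);               // illustrative capacity
    std::thread producer([&]{
        for(int i = 0; i < 1000; i++){
            while(!q.write(i));          // write() returns false when the queue is full
        }
    });
    std::thread consumer([&]{
        int v;
        for(int i = 0; i < 1000; i++){
            while(!q.read(v));           // read() returns false when the queue is empty
        }
    });
    producer.join();
    consumer.join();
}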
- benchmark_helpers.h
#include <array>
#include <cstddef>
#include <cstdint>
#include <pthread.h>
#include <random>
#include <stdexcept>
#include <x86intrin.h>
constexpr int QUEUE_SIZE = 1000000;
constexpr int LIMIT = QUEUE_SIZE;
constexpr size_t FREQ = 2300000000;
static std::array<bool, LIMIT> rnd_vals1{}, rnd_vals2{};
#define MAIN_THREAD_CPU 0
#define TEST_THREAD_CPU 1
#define PRODUCER_THREAD_CPU 2
#define CONSUMER_THREAD_CPU 3
namespace detail {
    void pin_thread_to_cpu(int cpu_id) {
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        CPU_SET(cpu_id, &cpuset);
        int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
        if (rc != 0) {
            throw std::runtime_error("Error setting thread affinity");
        }
    }

    inline uint64_t rdtsc(){
        return __rdtsc();
    }

    void init_rand_array(std::array<bool, LIMIT> &rnd_vals){
        std::random_device rd;
        std::mt19937 gen(rd());
        std::uniform_int_distribution<int> dist(0, 1);
        for(size_t i = 0; i < LIMIT; i++){
            rnd_vals[i] = dist(gen);
        }
    }
}
- benchmark_functions.h
#include "benchmark_helpers.h"
#include <iostream>
#include <thread>
template<typename QueueType, typename T>
void sanity_checks(){
    std::cout << "Sanity checks:\n";
    QueueType queue(LIMIT + 1);
    std::thread producer([&]{
        for(int i = 0; i < LIMIT; i++){
            queue.write(i);
        }
    });
    std::thread consumer([&]{
        T val;
        for(int i = 0; i < LIMIT; i++){
            while(!queue.read(val));
            assert(val == i && "Dequeued element doesn't match");
        }
    });
    producer.join();
    consumer.join();
    std::cout << "OK\n";
}
template<typename QueueType, typename T>
static void raw_add() {
    QueueType queue(QUEUE_SIZE + 1);
    size_t total_ops = 0;
    detail::pin_thread_to_cpu(TEST_THREAD_CPU); // OK for x86_64 where the TSC is synchronized across all cores. Not portable.
    auto start = detail::rdtsc();
    for (int i = 0; i < LIMIT; i++) {
        if(!queue.write(i)){
            break;
        }
        total_ops++;
    }
    auto cycles = detail::rdtsc() - start;
    double avg_time = (static_cast<double>(cycles)) * 1e9 / (FREQ * total_ops);
    std::cout << "raw_add\t" << avg_time << "ns, total_ops: " << total_ops << "\n";
}
template<typename QueueType, typename T>
static void raw_remove() {
    detail::pin_thread_to_cpu(TEST_THREAD_CPU);
    QueueType queue(QUEUE_SIZE + 1);
    size_t total_ops = 0;
    for(int i = 0; i < QUEUE_SIZE; i++){
        queue.write(i);
    }
    auto start = detail::rdtsc();
    for (int i = 0; i < LIMIT; i++) {
        T val;
        if(!queue.read(val)){
            break;
        }
        total_ops++;
    }
    auto cycles = detail::rdtsc() - start;
    double avg_time = (static_cast<double>(cycles)) * 1e9 / (FREQ * total_ops);
    std::cout << "raw_remove\t" << avg_time << "ns, total_ops: " << total_ops << "\n";
}
template<typename QueueType, typename T>
static void single_threaded() {
    detail::pin_thread_to_cpu(TEST_THREAD_CPU);
    QueueType queue(QUEUE_SIZE + 1);
    size_t total_ops = 0;
    for(int i = 0; i < QUEUE_SIZE; i++){
        queue.write(i);
    }
    auto start = detail::rdtsc();
    for (int i = 0; i < LIMIT; i++) {
        if(rnd_vals1[i]){
            queue.write(i);
        }
        else{
            T val;
            queue.read(val);
        }
    }
    auto cycles = detail::rdtsc() - start;
    total_ops = LIMIT;
    double avg_time = (static_cast<double>(cycles)) * 1e9 / (FREQ * total_ops);
    std::cout << "single_threaded\t" << avg_time << "ns, total_ops: " << total_ops << "\n";
}
template<typename QueueType, typename T>
static void mostly_add() {
    detail::pin_thread_to_cpu(TEST_THREAD_CPU);
    QueueType queue(QUEUE_SIZE + 1);
    std::atomic_size_t total_reads = 0, total_writes = 0;
    auto start = detail::rdtsc();
    std::thread producer([&]{
        detail::pin_thread_to_cpu(PRODUCER_THREAD_CPU);
        for(int i = 0; i < LIMIT; i++){
            queue.write(i);
            total_writes++;
        }
    });
    std::thread consumer([&]{
        detail::pin_thread_to_cpu(CONSUMER_THREAD_CPU);
        T val;
        for(int i = 0; i < LIMIT/10; i++){
            if(rnd_vals1[i]){
                queue.read(val);
                total_reads++;
            }
        }
    });
    producer.join();
    consumer.join();
    auto cycles = detail::rdtsc() - start;
    auto total_ops = total_reads + total_writes;
    double avg_time = (static_cast<double>(cycles)) * 1e9 / (FREQ * total_ops);
    std::cout << "mostly_add\t" << avg_time << "ns, total_ops: " << total_ops << "\n";
}
template<typename QueueType, typename T>
static void mostly_remove() {
    detail::pin_thread_to_cpu(TEST_THREAD_CPU);
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<int> dist(0, 3);
    QueueType queue(QUEUE_SIZE + 1);
    std::atomic_size_t total_reads = 0, total_writes = 0;
    auto start = detail::rdtsc();
    std::thread producer([&]{
        detail::pin_thread_to_cpu(PRODUCER_THREAD_CPU);
        for(int i = 0; i < LIMIT/10; i++){
            if(rnd_vals1[i]){
                queue.write(i);
                total_writes++;
            }
        }
    });
    std::thread consumer([&]{
        detail::pin_thread_to_cpu(CONSUMER_THREAD_CPU);
        T val;
        for(int i = 0; i < LIMIT; i++){
            queue.read(val);
        }
    });
    total_reads = LIMIT;
    producer.join();
    consumer.join();
    auto cycles = detail::rdtsc() - start;
    auto total_ops = total_reads + total_writes;
    double avg_time = (static_cast<double>(cycles)) * 1e9 / (FREQ * total_ops);
    std::cout << "mostly_remove\t" << avg_time << "ns, total_ops: " << total_ops << "\n";
}
template<typename QueueType, typename T>
static void heavy_concurrent() {
    detail::pin_thread_to_cpu(TEST_THREAD_CPU);
    QueueType queue(QUEUE_SIZE + 1);
    std::atomic_size_t total_reads = LIMIT, total_writes = LIMIT;
    auto start = detail::rdtsc();
    std::thread producer([&]{
        detail::pin_thread_to_cpu(PRODUCER_THREAD_CPU);
        for(int i = 0; i < LIMIT; i++){
            queue.write(i);
        }
    });
    std::thread consumer([&]{
        detail::pin_thread_to_cpu(CONSUMER_THREAD_CPU);
        T val;
        for(int i = 0; i < LIMIT; i++){
            queue.read(val);
        }
    });
    producer.join();
    consumer.join();
    auto cycles = detail::rdtsc() - start;
    auto total_ops = total_reads + total_writes;
    double avg_time = (static_cast<double>(cycles)) * 1e9 / (FREQ * total_ops);
    std::cout << "heavy_concurrent\t" << avg_time << "ns, total_ops: " << total_ops << "\n";
}
template<typename QueueType, typename T>
static void random_concurrent() {
    detail::pin_thread_to_cpu(TEST_THREAD_CPU);
    QueueType queue(QUEUE_SIZE + 1);
    std::atomic_size_t total_reads = 0, total_writes = 0;
    auto start = detail::rdtsc();
    std::thread producer([&]{
        detail::pin_thread_to_cpu(PRODUCER_THREAD_CPU);
        for(int i = 0; i < LIMIT; i++){
            if(rnd_vals1[i]){
                queue.write(i);
                total_writes++;
            }
        }
    });
    std::thread consumer([&]{
        detail::pin_thread_to_cpu(CONSUMER_THREAD_CPU);
        T val;
        for(int i = 0; i < LIMIT; i++){
            if(rnd_vals2[i]){
                queue.read(val);
                total_reads++;
            }
        }
    });
    producer.join();
    consumer.join();
    auto cycles = detail::rdtsc() - start;
    auto total_ops = total_reads + total_writes;
    double avg_time = (static_cast<double>(cycles)) * 1e9 / (FREQ * total_ops);
    std::cout << "random_concurrent\t" << avg_time << "ns, total_ops: " << total_ops << "\n";
}
template<typename QueueType, typename T = int>
void run_all_benchmarks(std::string queue_name = "Queue"){
    detail::pin_thread_to_cpu(MAIN_THREAD_CPU);
    std::cout << "\nBenchmarking " << queue_name << "\n";
    sanity_checks<QueueType, T>();
    std::thread th(raw_add<QueueType, T>);
    th.join();
    th = std::thread(raw_remove<QueueType, T>);
    th.join();
    th = std::thread(single_threaded<QueueType, T>);
    th.join();
    th = std::thread(mostly_add<QueueType, T>);
    th.join();
    th = std::thread(mostly_remove<QueueType, T>);
    th.join();
    th = std::thread(heavy_concurrent<QueueType, T>);
    th.join();
    th = std::thread(random_concurrent<QueueType, T>);
    th.join();
}
And the driver code:
- spsc_raw_benchmark.cpp
#include <folly/ProducerConsumerQueue.h>
#include "../src/lfqueue.h"
#include "benchmark_functions.h"
struct ComplexStruct{
    bool first;
    int second;
    double last;
    ComplexStruct(int val) : second(val) {}
    ComplexStruct() = default;
} __attribute__((packed));

int main(){
    detail::init_rand_array(rnd_vals1);
    detail::init_rand_array(rnd_vals2);
    // using DATA_TYPE = int;
    using DATA_TYPE = ComplexStruct;
    run_all_benchmarks<folly::ProducerConsumerQueue<DATA_TYPE>, DATA_TYPE>("Folly");
    run_all_benchmarks<LFQueue<DATA_TYPE>, DATA_TYPE>("LFQueue");
    return 0;
}
This is a sample benchmark result for the non-cache-aligned ComplexStruct:
Benchmarking Folly
Sanity checks:
OK
raw_add 10.6289ns, total_ops: 1000000
raw_remove 2.46765ns, total_ops: 1000000
single_threaded 6.36481ns, total_ops: 1000000
mostly_add 13.5758ns, total_ops: 1050105
mostly_remove 3.49217ns, total_ops: 1050105
heavy_concurrent 3.57867ns, total_ops: 2000000
random_concurrent 28.8257ns, total_ops: 1000665
Benchmarking LFQueue
Sanity checks:
OK
raw_add 3.5452ns, total_ops: 1000000
raw_remove 2.74462ns, total_ops: 1000000
single_threaded 5.90889ns, total_ops: 1000000
mostly_add 14.2668ns, total_ops: 1050105
mostly_remove 2.31141ns, total_ops: 1050105
heavy_concurrent 3.46162ns, total_ops: 2000000
random_concurrent 38.9555ns, total_ops: 1000665
And this is one sample result for int:
Benchmarking Folly
Sanity checks:
OK
raw_add 4.81129ns, total_ops: 1000000
raw_remove 2.78987ns, total_ops: 1000000
single_threaded 6.13505ns, total_ops: 1000000
mostly_add 12.5706ns, total_ops: 1049680
mostly_remove 2.69507ns, total_ops: 1049680
heavy_concurrent 2.90395ns, total_ops: 2000000
random_concurrent 36.6664ns, total_ops: 1000798
Benchmarking LFQueue
Sanity checks:
OK
raw_add 2.55449ns, total_ops: 1000000
raw_remove 2.63848ns, total_ops: 1000000
single_threaded 5.68764ns, total_ops: 1000000
mostly_add 13.2418ns, total_ops: 1049680
mostly_remove 2.70407ns, total_ops: 1049680
heavy_concurrent 2.86895ns, total_ops: 2000000
random_concurrent 52.6381ns, total_ops: 1000798
If the implementations are correct, I have some questions about the benchmark results:
- I notice across multiple runs that the random_concurrent results vary from 35-50 ns and are usually slower than the same run for Folly. Why is that, even though the raw benchmarks are consistently faster?
- I tried to improve the cache hit rate for writes by explicitly default-constructing the buf array in LFQueue using buf(new T[size]()) (see the sketch below). But that doesn't improve the write performance or reduce the cache misses (I analysed it with cachegrind, and it still shows 62500 hits on the new (&buf[curr_write]) line inside the write function). Why is that, even though the entire buf (4 MB for the int case) can fit inside the cache (L3 = 48 MB)?
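For reference, the value-initializing constructor variant mentioned in the second question looks roughly like this (only the member initializer for buf changes; the rest of LFQueue is unchanged):

// Value-initialize the buffer so every element is default-constructed
// (zero-initialized for types without a user-provided default constructor) up front.
LFQueue(std::size_t size)
    : write_id(0), read_id(0), capacity_(size), buf(new T[size]()) {}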
bufarray inLFQueueusingbuf(new T[size]()). But that doesn't improve the write performance or reduce the cache misses (I analysed using cachegrind and they still show 62500 hits onnew (&buf[curr_write])line inside write function). Why is that, even though the entire buf (4MB for int case) can fit inside the cache (L3=48MB).