I have a fast producer thread and a slow consumer thread. The consumer processes as many objects as it can and simply ignores the rest so as not to slow down the producer. Therefore, I used a one-element wait-free single-producer single-consumer queue to pass objects to the consumer in a thread-safe manner. After the consumer takes an object out, the producer should refill the slot before the consumer finishes processing that element. I wondered how to simplify this one-element edge case and came up with the following.
The spsc_object class reserves storage for a single object and achieves thread safety for a single writer and a single reader. It's inspired by boost::lockfree::spsc_queue and supports move semantics.
#ifndef SPSC_OBJECT_HH
#define SPSC_OBJECT_HH

#include <atomic>
#include <new>
#include <type_traits>
#include <utility> // std::move

// Holds at most one T; safe for exactly one writer thread and one reader thread.
template<typename T>
class spsc_object {
    using storage_t = typename std::aligned_storage<sizeof(T), alignof(T)>::type;
    storage_t storage_;
    std::atomic_bool occupied_{false}; // must start out empty

public:
    spsc_object() = default;
    spsc_object(const spsc_object&) = delete;
    spsc_object& operator=(const spsc_object&) = delete;

    // Destroy an object that was written but never read.
    ~spsc_object()
    {
        if (occupied_.load(std::memory_order_acquire))
            reinterpret_cast<T*>(&storage_)->~T();
    }

    bool write(const T& object)
    {
        // Acquire: the reader's destruction of the previous object must
        // happen-before we construct a new one in the same storage.
        if (occupied_.load(std::memory_order_acquire))
            return false;
        new (&storage_) T(object); // copy
        occupied_.store(true, std::memory_order_release); // publish
        return true;
    }

    bool write(T&& object)
    {
        if (occupied_.load(std::memory_order_acquire))
            return false;
        new (&storage_) T(std::move(object)); // move
        occupied_.store(true, std::memory_order_release); // publish
        return true;
    }

    // A forwarding reference accepts lvalue, const and rvalue functors,
    // replacing the separate Functor& and const Functor& overloads.
    template<typename Functor>
    bool read(Functor&& functor)
    {
        // Acquire: pairs with the writer's release store.
        if (!occupied_.load(std::memory_order_acquire))
            return false;
        T& object = *reinterpret_cast<T*>(&storage_);
        functor(object);
        object.~T();
        occupied_.store(false, std::memory_order_release); // hand the slot back
        return true;
    }
};

#endif
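For comparison, here is a sketch of the same slot assuming C++17 is available (the code above does not state a language version). It replaces std::aligned_storage, which is deprecated as of C++23, with an alignas byte array, accesses the object through std::launder, and folds the two write overloads into one perfect-forwarding template. The names spsc_slot and demo() are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <new>
#include <string>
#include <utility>

// Hypothetical C++17 variant of spsc_object (assumption: C++17 is available).
template<typename T>
class spsc_slot {
    alignas(T) unsigned char storage_[sizeof(T)]; // raw storage for one T
    std::atomic<bool> occupied_{false};           // starts out empty

    // std::launder yields a usable pointer to the placement-new'ed object.
    T* object() { return std::launder(reinterpret_cast<T*>(storage_)); }

public:
    spsc_slot() = default;
    spsc_slot(const spsc_slot&) = delete;
    spsc_slot& operator=(const spsc_slot&) = delete;

    // Destroy an object that was written but never read.
    ~spsc_slot()
    {
        if (occupied_.load(std::memory_order_acquire))
            object()->~T();
    }

    // One forwarding overload replaces write(const T&) and write(T&&).
    template<typename U>
    bool write(U&& value)
    {
        if (occupied_.load(std::memory_order_acquire))
            return false; // slot still full: drop the value
        ::new (static_cast<void*>(storage_)) T(std::forward<U>(value));
        occupied_.store(true, std::memory_order_release); // publish the object
        return true;
    }

    template<typename Functor>
    bool read(Functor&& functor)
    {
        if (!occupied_.load(std::memory_order_acquire))
            return false; // nothing to read
        T* obj = object();
        functor(*obj);
        obj->~T();
        occupied_.store(false, std::memory_order_release); // hand the slot back
        return true;
    }
};

// Single-threaded check of the slot's contract.
inline bool demo()
{
    spsc_slot<std::string> slot;
    const bool first = slot.write(std::string("a"));
    const bool second = slot.write(std::string("b")); // rejected: still full
    std::string got;
    const bool read_ok = slot.read([&](std::string& s) { got = std::move(s); });
    const bool read_again = slot.read([](std::string&) {}); // rejected: empty
    return first && !second && read_ok && !read_again && got == "a";
}
```

demo() only exercises the single-threaded contract (a full slot rejects writes, an empty slot rejects reads). The cross-thread guarantees still rest on the same acquire/release pairs as in the original.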
It seems to work in a small test case I wrote, but correctness is hard to verify. Regarding performance, boost::lockfree::spsc_queue uses a read index and a write index (placed in different cache lines!) plus one additional queue element to distinguish an empty queue from a full one. My container uses only a std::atomic_bool in addition to the object storage, so it is definitely smaller and hopefully faster. I am not certain whether there is a faster alternative to std::atomic_bool. Would a processor-word-sized variable be faster? Would two separate flags help reduce contention? Did I miss anything else?
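On the two-flag question, one option is a single-slot version of the boost::lockfree::spsc_queue layout: instead of one shared flag, the producer and the consumer each own a counter that only they write, and the slot is full exactly when the counts differ. This sketch assumes C++17 (for std::launder and is_always_lock_free) and a 64-byte cache line; spsc_counted_slot, demo_result and run_demo are hypothetical names. Whether it beats a single std::atomic_bool has to be measured, since with one slot the object's bytes still move between cores on every handoff.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <new>
#include <thread>
#include <utility>

// Hypothetical two-counter slot; assumes C++17 and 64-byte cache lines.
template<typename T>
class spsc_counted_slot {
    static constexpr std::size_t cache_line = 64; // assumed, not queried
    static_assert(std::atomic<std::size_t>::is_always_lock_free,
                  "counters should be lock-free");

    alignas(cache_line) std::atomic<std::size_t> written_{0}; // producer-owned
    alignas(cache_line) std::atomic<std::size_t> read_{0};    // consumer-owned
    alignas(alignof(T) > cache_line ? alignof(T) : cache_line)
        unsigned char storage_[sizeof(T)];

    T* object() { return std::launder(reinterpret_cast<T*>(storage_)); }

public:
    spsc_counted_slot() = default;
    spsc_counted_slot(const spsc_counted_slot&) = delete;
    spsc_counted_slot& operator=(const spsc_counted_slot&) = delete;
    ~spsc_counted_slot()
    {
        if (written_.load() != read_.load()) // an unread object is left over
            object()->~T();
    }

    // Producer side: the slot is free when both counters agree.
    template<typename U>
    bool write(U&& value)
    {
        const std::size_t w = written_.load(std::memory_order_relaxed); // own counter
        if (w != read_.load(std::memory_order_acquire))
            return false; // consumer has not taken the previous object yet
        ::new (static_cast<void*>(storage_)) T(std::forward<U>(value));
        written_.store(w + 1, std::memory_order_release); // publish
        return true;
    }

    // Consumer side: an object is present when the counters differ.
    template<typename Functor>
    bool read(Functor&& functor)
    {
        const std::size_t r = read_.load(std::memory_order_relaxed); // own counter
        if (r == written_.load(std::memory_order_acquire))
            return false; // empty
        T* obj = object();
        functor(*obj);
        obj->~T();
        read_.store(r + 1, std::memory_order_release); // hand the slot back
        return true;
    }
};

struct demo_result { int written; int consumed; bool increasing; };

// Producer offers 0..to_produce-1; the consumer takes what it can, then drains.
inline demo_result run_demo(int to_produce)
{
    spsc_counted_slot<int> slot;
    std::atomic<bool> done{false};
    int written = 0; // touched only by the producer until join()
    std::thread producer([&] {
        for (int i = 0; i < to_produce; ++i)
            if (slot.write(i))
                ++written;
        done.store(true);
    });
    int consumed = 0, last = -1;
    bool increasing = true;
    for (;;) {
        const bool finished = done.load(); // sample before reading
        int value = -1;
        if (slot.read([&](int& v) { value = v; })) {
            increasing = increasing && value > last;
            last = value;
            ++consumed;
        } else if (finished) {
            break; // producer done and slot empty
        }
    }
    producer.join();
    return {written, consumed, increasing};
}
```

Each counter has exactly one writer, so neither thread ever performs a read-modify-write, and each thread stores only to its own cache line. The size_t counters may wrap around, but only equality is compared, so wrap-around is harmless.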
Edit: Here's my test case (see Coliru) that produces dummy objects and outputs which of them were consumed. For some reason there are major gaps, i.e. long runs of consecutive objects are not consumed. I guess that happens whenever std::cout's buffer is flushed.
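One way to test the std::cout hypothesis is to keep output out of the measured loop: let the consumer append each mytest.x to a std::vector<int> reserved up front, and analyze the recorded indices only after both threads have finished. If the large gaps disappear, printing was the cause. A hypothetical helper for the analysis step:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: given the indices the consumer read (in reading order),
// return the largest number of produced objects skipped between two reads.
inline int largest_gap(const std::vector<int>& consumed)
{
    int largest = 0;
    for (std::size_t i = 1; i < consumed.size(); ++i) {
        const int skipped = consumed[i] - consumed[i - 1] - 1;
        if (skipped > largest)
            largest = skipped;
    }
    return largest;
}
```

Applied to the first three values of the sample output (9908795, 9908803, 9908812), it returns 8.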
#include "spsc_object.hpp"

#include <atomic>
#include <cassert>
#include <future>
#include <iostream>

struct test_t {
    int x, y;
    test_t(int x) : x(x), y(1) {}
    ~test_t() {
        y = 0; // mark destroyed objects; the consumer asserts it never copies one
    }
};

const int to_produce = 10000000;
spsc_object<test_t> object;
std::atomic_bool stop_consuming;

int producer()
{
    int written = 0;
    for (int i = 0; i < to_produce; i++) {
        if (object.write(test_t(i)))
            written++;
    }
    return written;
}

int consumer()
{
    int read = 0;
    for (;;) {
        // Sample the stop flag *before* trying to read: if the producer had
        // already finished, a failed read really means the slot is empty.
        // Checking it afterwards could drop a final object written in between.
        const bool stopping = stop_consuming;
        test_t mytest(3);
        if (object.read([&mytest](const test_t& test) {
            mytest = test;
        })) {
            read++;
            // Here go expensive calculations
            std::cout << mytest.x << '\n';
            assert(mytest.y != 0);
        } else if (stopping) {
            return read;
        } else {
            // Should not happen unless only a single hardware thread is
            // available for both producer and consumer, as seems to be the case
            // on Coliru for example.
            std::cout << "Oh boy, the producer was too slow!\n";
        }
    }
}

int main()
{
    stop_consuming = false;
    auto t1 = std::async(std::launch::async, producer);
    auto t2 = std::async(std::launch::async, consumer);
    int written = t1.get();
    stop_consuming = true;
    int read = t2.get();
    std::cout << "produced = " << to_produce << "\n"
                 "written = " << written << "\n"
                 "read = consumed = " << read << '\n';
}
Sample output:
...
9908795
9908803
9908812
9908822
9908832
9908842
9908852
9908861
9908872
9908881
9908891
9908898
9908907
9908917
9908928
9908938
9908948
produced = 10000000
written = 37374
read = consumed = 37374
occupied_. There needs to be a member initializer or a constructor that sets it to false. Also copy constructor and assignment operator should probably be deleted.