I have a fast producer thread and a slow consumer thread. The consumer processes as many objects as it can and simply ignores the rest so that it never slows down the producer. Therefore, I used a one-element wait-free single-producer single-consumer queue for passing objects to the consumer in a thread-safe manner. After the consumer takes an element out, the producer will typically have refilled the slot before the consumer finishes processing that element. I was wondering how to simplify this one-element edge case and came up with the following.

The spsc_object class reserves storage for a single object and achieves thread safety for a single writer and a single reader. It's inspired by boost::lockfree::spsc_queue and supports move semantics.

#ifndef SPSC_OBJECT_HH
#define SPSC_OBJECT_HH

#include <type_traits>
#include <new>
#include <atomic>

template<typename T>
class spsc_object {
    using storage_t = typename std::aligned_storage<sizeof(T), alignof(T)>::type;

    storage_t storage_;
    std::atomic_bool occupied_;

public:
    bool write(const T& object)
    {
        const bool occupied = occupied_.load(std::memory_order_acquire);
        if (!occupied) {
            new (&storage_) T(object); // copy
            occupied_.store(true, std::memory_order_release);
            return true;
        }

        return false;
    }

    bool write(T&& object)
    {
        const bool occupied = occupied_.load(std::memory_order_acquire);
        if (!occupied) {
            new (&storage_) T(std::move(object)); // move
            occupied_.store(true, std::memory_order_release);
            return true;
        }

        return false;
    }

    template<typename Functor>
    bool read(Functor& functor)
    {
        const bool occupied = occupied_.load(std::memory_order_acquire);
        if (!occupied)
            return false;

        T& object = reinterpret_cast<T&>(storage_);
        functor(object);
        object.~T();

        occupied_.store(false, std::memory_order_release);
        return true;
    }

    template<typename Functor>
    bool read(const Functor& functor)
    {
        const bool occupied = occupied_.load(std::memory_order_acquire);
        if (!occupied)
            return false;

        T& object = reinterpret_cast<T&>(storage_);
        functor(object);
        object.~T();

        occupied_.store(false, std::memory_order_release);
        return true;
    }
};

#endif

It seems to work in a small test case I wrote, but correctness is hard to verify. Regarding performance, boost::lockfree::spsc_queue uses a read index and a write index (placed in different cache lines!) plus one extra queue element to distinguish an empty queue from a full one. My container only uses an std::atomic_bool in addition to the object storage, so it is definitely smaller and hopefully faster. I am not certain whether there is a faster alternative to the std::atomic_bool, though. Maybe a processor-word-sized variable is faster? Would two separate flags, as sketched below, be useful to reduce contention? Did I miss anything else?
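To make that last idea concrete, here is a rough, unbenchmarked sketch of the two-flag variant I have in mind: each flag is only ever stored to by one thread, the slot counts as full exactly when the flags differ, and the 64-byte alignment is merely my assumption about the cache line size. The name spsc_object_two_flags is just a placeholder.

#include <atomic>
#include <new>
#include <type_traits>

template<typename T>
class spsc_object_two_flags {
    using storage_t = typename std::aligned_storage<sizeof(T), alignof(T)>::type;

    // Each flag lives on its own (assumed 64-byte) cache line and is only ever
    // stored to by one thread: written_ by the producer, read_ by the consumer.
    // The slot is full exactly when the two flags differ.
    alignas(64) std::atomic_bool written_{false};
    alignas(64) std::atomic_bool read_{false};
    alignas(64) storage_t storage_;

public:
    bool write(const T& object)
    {
        const bool w = written_.load(std::memory_order_relaxed); // own flag
        if (w != read_.load(std::memory_order_acquire))          // slot still full
            return false;
        new (&storage_) T(object);
        written_.store(!w, std::memory_order_release);           // publish the object
        return true;
    }

    template<typename Functor>
    bool read(Functor&& functor)
    {
        const bool r = read_.load(std::memory_order_relaxed);    // own flag
        if (r == written_.load(std::memory_order_acquire))       // slot empty
            return false;
        T& object = reinterpret_cast<T&>(storage_);
        functor(object);
        object.~T();
        read_.store(!r, std::memory_order_release);              // release the slot
        return true;
    }
};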

Edit: Here's my test case (see Coliru) that produces dummy objects and prints which of them were consumed. For some reason there are major gaps, i.e. long runs of consecutive objects are not consumed. I guess that happens whenever std::cout's buffer is flushed.

#include "spsc_object.hpp"

#include <cassert>
#include <atomic>
#include <future>
#include <iostream>

struct test_t {
    int x, y;
    test_t(int x) : x(x), y(1) {}
    ~test_t() {
        y = 0;
    }
};

const int to_produce = 10000000;

spsc_object<test_t> object;
std::atomic_bool stop_consuming;

int producer()
{
    int written = 0;
    for (int i = 0; i < to_produce; i++) {
        if (object.write(test_t(i)))
            written++;
    }
    return written;
}

int consumer()
{
    int read = 0;
    for (;;) {
        test_t mytest(3);
        if (object.read([&mytest](const test_t& test) {
            mytest = test;
        })) {
            read++;
            // Here go expensive calculations
            std::cout << mytest.x << '\n';
            assert(mytest.y != 0);
        } else if (stop_consuming) {
            return read;
        } else {
            // Should not happen unless only a single hardware thread is
            // available for both producer and consumer, as seems to be the case
            // on Coliru for example.
            std::cout << "Oh boy, the producer was too slow!\n";
        }
    }
}

int main()
{
    stop_consuming = false;
    auto t1 = std::async(std::launch::async, producer);
    auto t2 = std::async(std::launch::async, consumer);
    int written = t1.get();
    stop_consuming = true;
    int read = t2.get();

    std::cout << "produced = " << to_produce << "\n"
        "written = " << written << "\n"
        "read = consumed = " << read << '\n';
}

Sample output:

...
9908795
9908803
9908812
9908822
9908832
9908842
9908852
9908861
9908872
9908881
9908891
9908898
9908907
9908917
9908928
9908938
9908948
produced = 10000000
written = 37374
read = consumed = 37374
  • "Your question might be valid as is, but you should really provide a minimal complete working program, and not just your class." – papagaga, Oct 12, 2018 at 9:09
  • "You're right, I'll add my test case." – Oct 12, 2018 at 10:05
  • "Oh wow I forgot to initialize occupied_. There needs to be a member initializer or a constructor that sets it to false. Also copy constructor and assignment operator should probably be deleted." – Oct 16, 2018 at 8:13

1 Answer


Build on top of the standard library, not alongside it

You didn't specify which version of the standard you target. If it is the most recent one, then std::optional gives you the tool to avoid explicit memory management while keeping the benefit of local storage that is only initialized on demand.

Even if not, a self-contained object handler probably won't bring enough benefit to be worth renouncing standard tools such as std::unique_ptr. The fact that you didn't notice that your class leaks resources if the given Functor or T's constructor throws an exception, or even if a write is never read, is proof of this wise principle: don't reinvent memory management tools.
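To illustrate the last point, here is a minimal sketch of the "written but never read" leak; it assumes your class is saved as spsc_object.hpp, as in your test case, and that the write succeeds:

#include "spsc_object.hpp" // the class under review

#include <memory>

int main()
{
    spsc_object<std::shared_ptr<int>> slot;
    // A successful write() placement-constructs a shared_ptr inside storage_.
    slot.write(std::make_shared<int>(42));
    // No read() follows, and spsc_object has no destructor that destroys the
    // stored object, so ~shared_ptr never runs and the allocation leaks.
}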

If you really want to provide your own memory manager, make it a dedicated, RAII-based class, and then compose your spsc_object from it and an atomic.
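Such a building block might look roughly like this; it is only a sketch, and the name manual_storage is made up for illustration:

#include <new>
#include <type_traits>
#include <utility>

// Sketch of a dedicated RAII holder for manually managed storage. spsc_object
// would own one of these next to its atomic flag and never call placement-new
// or a destructor directly.
template<typename T>
class manual_storage {
    typename std::aligned_storage<sizeof(T), alignof(T)>::type storage;
    bool constructed = false;

public:
    manual_storage() = default;
    manual_storage(const manual_storage&) = delete;
    manual_storage& operator=(const manual_storage&) = delete;
    ~manual_storage() { destroy(); }

    // Constructs a T in place, destroying any previous value first.
    template<typename... Args>
    T& emplace(Args&&... args)
    {
        destroy();
        T* p = new (&storage) T(std::forward<Args>(args)...);
        constructed = true;
        return *p;
    }

    // Destroys the stored value, if any.
    void destroy()
    {
        if (constructed) {
            reinterpret_cast<T&>(storage).~T();
            constructed = false;
        }
    }

    T& get() { return reinterpret_cast<T&>(storage); }
};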

Avoid code duplication

You don't need two write methods, and you need two read methods even less. read already takes a template argument: a single overload taking a forwarding reference (Functor&&) binds to const and non-const callables alike, whether they are passed as lvalues or as temporaries. For the write method, you can introduce a template argument to get into a deduction context and benefit from a forwarding reference as well.

Don't clutter variable names with underscores

Classes are meant to qualify names and avoid conflicts, so keep your names informative and beautiful.

With all that in mind, here's what I'd suggest:

#include <type_traits>
#include <atomic>
#include <optional>

template<typename T>
class spsc_object {

    std::optional<T> value;
    std::atomic_bool occupied{ false };

public:
    template <typename U>
    bool write(U&& object) {
        static_assert(std::is_convertible_v<U, T>);
        if (!occupied.load(std::memory_order_acquire)) {
            value = std::forward<U>(object);
            occupied.store(true, std::memory_order_release);
            return true;
        }
        return false;
    }

    template<typename Functor>
    bool read(Functor&& functor) {
        if (!occupied.load(std::memory_order_acquire)) 
            return false;
        functor(*value);
        value.reset();
        occupied.store(false, std::memory_order_release);
        return true;
    }
};
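Usage would then look something like this (a small sketch, assuming the class template above is defined in the same file):

#include <iostream>
#include <string>

int main()
{
    spsc_object<std::string> slot;

    std::string s = "hello";
    slot.write(s);                     // U deduced as std::string& -> copies
    slot.write(std::string("world"));  // slot still occupied, so this returns false

    slot.read([](std::string& v) {     // temporary lambda binds to Functor&&
        std::cout << v << '\n';        // prints "hello"
    });
}
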
  • "Re 'Don't clutter [member] variable names with [trailing] underscores': I'd disagree. The advantage of uglifying your member-data names is that you can write e.g. void set_value(T value) { value_ = value; } in a natural way. It's a really easy way to never worry about shadowing again, and thereby save those few extra brain cells to devote to more productive tasks." – Oct 13, 2018 at 22:11
  • "Also, complete nit, but std::is_convertible_v<U, T> raises a tiny red flag because it's using a perfect-forwarding reference in a non-perfect-forwarding context. You meant std::is_convertible_v<U&&, T>, or even more relevantly, std::is_assignable_v<T&, U&&>. The further you go down that road, though, the more you can't avoid worrying about the behavior of mySpscObject.write(std::nullopt)... :)" – Oct 13, 2018 at 22:18
  • "@papagaga I guess you are right, I should put more thought into object lifetime and exception safety. But I don't quite like the std::optional approach as it incurs additional overhead because the 'occupied' state is duplicated. Also it requires T to have an assignment operator which is a stronger requirement than being copyable or movable by construction in my opinion." – Oct 13, 2018 at 22:53
  • "What I really do like though is the template approach to reduce code duplication!" – Oct 13, 2018 at 22:56
  • "Regarding exception safety I think that exceptions in write or read are still not safe because the atomic is not released afterwards. But I could be wrong. One would have to make a RAII wrapper around the atomic or catch and re-throw. Both seem like quite a hassle for tiny reward, so I will likely settle with additional requirements for the user: noexcept copy/move construction, noexcept functors, reading the last value before destruction. I could statically enforce the noexcepts using std::is_nothrow_.... Not always being destructible reminds me of std::thread which should be fine imo." – Oct 13, 2018 at 23:22
