\$\begingroup\$

For an application I'm developing, I need a system which allows a thread to submit messages to another thread. For this purpose I designed a message queue system with the following features.

  • Any thread can submit a message
    • And wait until it is done
    • Or let the operation fail if the queue is full
  • Any thread can claim the submitted messages
    • And wait until there are messages
    • Or continue with 0 messages if none were submitted

I implemented this using a fixed-size buffer as storage, which is swapped out when the messages are claimed. For synchronization I used a shared mutex with 2 condition variables, as well as an atomic index to reserve a space in the buffer.

#ifndef __THREADING_COLLECTIONS__MESSAGE_QUEUE__
#define __THREADING_COLLECTIONS__MESSAGE_QUEUE__

#include <array>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <shared_mutex>

namespace Threading
{
    /** Allows (producing) threads to submit messages which can be handled by (consuming) threads
     * @tparam T the type of messages submitted
     * @tparam Size the buffer size for unprocessed messages */
    template < typename T, std::size_t Size >
    class MessageQueue
    {
        public:
            MessageQueue(): mBuffer( CreateClaimBuffer() ){};

        public:
            /** Submit a new message to the queue
             * @attention will block until the message is successfully submitted
             * @param[in] inMessage the message to be submitted */
            void SubmitMessage( T const & inMessage )
            {
                // Whilst this lock is held
                // - the queue can't be emptied
                std::shared_lock lock( mQueueReadWriteLock );

                while( true ) {
                    // Reserve a spot
                    auto index = mNextFreeIndex++;
                    // Queue is full
                    if( index >= Size ) {
                        // Wait until the Queue is emptied
                        mQueueEmptiedCondition.wait( lock ); // Also releases the lock until the queue is emptied
                        // Try again
                        continue;
                    }
                    // Actually write the message
                    ( *mBuffer )[index] = inMessage;
                    // We are done writing
                    lock.unlock();
                    // Notify of new work
                    mNewWorkCondition.notify_one();
                    // We are done
                    break;
                }
            }

            /** Tries to submit a new message to the queue
             * @attention can fail if the queue is full
             * @param[in] inMessage the message to be submitted
             * @return true if the message was successfully submitted */
            bool TrySubmitMessage( T const & inMessage )
            {
                // Whilst this lock is held
                // - the queue can't be emptied
                std::shared_lock lock( mQueueReadWriteLock );
                auto             index = mNextFreeIndex++;
                // Queue is full
                if( index >= Size ) {
                    lock.unlock();
                    return false;
                }
                // Actually write the message
                ( *mBuffer )[index] = inMessage;
                // We are done writing
                lock.unlock();
                // Notify of new work
                mNewWorkCondition.notify_one();
                // We are done
                return true;
            }

            /** Collect submitted messages
             * @attention will block until there are actual message to be claimed
             * @param[out] outDestination the storage for the collected messages
             * @return the amount of claimed messages */
            std::size_t WaitForMessages( std::unique_ptr< std::array< T, Size > > & outDestination )
            {
                // Get an exclusive lock to swap the buffers
                // Whilst this lock is held
                // - mNextFreeIndex cannot be modified
                std::unique_lock lock( mQueueReadWriteLock );
                // Wait until theres is actual work
                mNewWorkCondition.wait( lock ); // Also releases the lock until there is work
                // Swap out the queue for an "empty" one
                std::swap( outDestination, mBuffer );
                // Get the amount of messages
                std::size_t size = std::min< std::size_t >( Size, mNextFreeIndex );
                // Mark the queue as empty
                mNextFreeIndex = 0;
                // We are done with the shared buffer
                lock.unlock();
                // Notify everyone that the queue is empty
                mQueueEmptiedCondition.notify_all();

                return size;
            }

            /** Collect submitted messages
             * @param[out] outDestination the storage for the collected messages
             * @return the amount of claimed messages */
            std::size_t ClaimMessages( std::unique_ptr< std::array< T, Size > > & outDestination )
            {
                // Get an exclusive lock to swap the buffers
                // Whilst this lock is held
                // - mNextFreeIndex cannot be modified
                std::unique_lock lock( mQueueReadWriteLock );
                // Swap out the queue for an "empty" one
                std::swap( outDestination, mBuffer );
                // Get the amount of messages
                std::size_t size = std::min< std::size_t >( Size, mNextFreeIndex );
                // Mark the queue as empty
                mNextFreeIndex = 0;
                // We are done with the shared buffer
                lock.unlock();
                // Notify everyone that the queue is empty
                mQueueEmptiedCondition.notify_all();

                return size;
            }

            /** Creates a storage buffer for messages compatible with this queue
             * @return a storage buffer */
            static std::unique_ptr< std::array< T, Size > > CreateClaimBuffer()
            {
                return std::make_unique< std::array< T, Size > >();
            }

        private:
            std::condition_variable                  mNewWorkCondition;
            std::condition_variable_any              mQueueEmptiedCondition;
            std::shared_mutex                        mQueueReadWriteLock;
            std::atomic< std::size_t >               mNextFreeIndex = 0;
            std::unique_ptr< std::array< T, Size > > mBuffer;
    };
}

#endif

The specific feedback I'm looking for is:

  • Correctness ( safety, deadlocks, etc. )
  • Performance ( General, and specific scenarios )
  • General code ( documentation, names, etc. )
\$\endgroup\$

1 Answer

\$\begingroup\$

Use wait() with a predicate

I recommend that you always pass a predicate to wait() that checks whether the condition you are waiting for has become true. This often simplifies your code, as you no longer have to write a while-loop yourself. Without a predicate, forgetting that while-loop is a bug: wait() can return even if no other thread signaled it (a spurious wakeup), and it will keep blocking if the notification was sent before the wait started. WaitForMessages() has exactly this problem. So:

void SubmitMessage(T const& inMessage)
{
    std::shared_lock lock(mQueueReadWriteLock);
    std::size_t index;
    mQueueEmptiedCondition.wait(lock, [&]{
        index = mNextFreeIndex++;
        return index < Size;
    });
    (*mBuffer)[index] = inMessage;
    …
}

std::size_t WaitForMessages(…)
{
    std::unique_lock lock(mQueueReadWriteLock);
    // The lambda must capture `this` (here via [&]) to read mNextFreeIndex
    mNewWorkCondition.wait(lock, [&]{ return mNextFreeIndex > 0; });
    …
}

Consider using std::vector

Instead of a std::unique_ptr<std::array<…>>, just use a std::vector to store the messages. You can std::swap() two std::vectors just as you can swap two std::unique_ptrs, and it's just as efficient. As a bonus, you no longer have to fix the size of the buffer at compile time.
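To illustrate the swap, here is a minimal sketch. `ClaimAll` and `ExampleClaimAll` are hypothetical names, not part of your class, and the sketch assumes the caller already holds the exclusive lock. Note that push_back() is not safe from several threads at once, so with a std::vector you would either have to serialize submissions or pre-size the vector and keep indexing into it as you do now.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper: hands all pending items to the caller in O(1).
// Assumption: the caller holds the exclusive lock while this runs.
template <typename T>
std::size_t ClaimAll(std::vector<T>& sharedBuffer, std::vector<T>& outDestination)
{
    outDestination.clear();                  // destroys old items, keeps the capacity
    std::swap(sharedBuffer, outDestination); // exchanges the internal pointers only
    return outDestination.size();
}

// Hypothetical usage example.
inline void ExampleClaimAll()
{
    std::vector<int> shared;
    shared.reserve(64); // capacity chosen at run time instead of compile time
    shared.push_back(1);
    shared.push_back(2);

    std::vector<int> claimed;
    std::size_t count = ClaimAll(shared, claimed);
    assert(count == 2);
    assert(claimed[0] == 1 && claimed[1] == 2);
    assert(shared.empty());
}
```

If the queue should stay bounded, it would have to compare size() against a limit itself, since a std::vector grows on demand.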

Use a single condition variable

It is not possible for the queue to be both full and empty at the same time, so it will virtually never happen that you have two threads, one waiting for the queue to become empty, and another waiting for something being added to the queue. So a single condition variable should be enough.

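Also note that you should always use std::condition_variable_any here: std::condition_variable only works with std::unique_lock<std::mutex>, whereas std::condition_variable_any works with any lock type, including locks on a std::shared_mutex.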
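As a rough illustration of waiting on a std::shared_mutex, the following stand-alone sketch uses a single std::condition_variable_any with both a shared and an exclusive lock. `SharedGate` and `ExampleSharedGate` are made-up names for this demo and do not come from your code.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <shared_mutex>
#include <thread>

// Hypothetical stand-alone demo of condition_variable_any with a shared_mutex.
struct SharedGate
{
    std::shared_mutex           mutex;
    std::condition_variable_any changed; // accepts any lock type
    bool                        open = false;

    // Several threads may wait here at once, each holding only a shared lock.
    void WaitUntilOpen()
    {
        std::shared_lock<std::shared_mutex> lock(mutex);
        changed.wait(lock, [this] { return open; });
    }

    // The writer takes the exclusive lock, changes the state, then notifies.
    void Open()
    {
        {
            std::unique_lock<std::shared_mutex> lock(mutex);
            open = true;
        }
        changed.notify_all();
    }
};

// Hypothetical usage example: returns how many waiters got through the gate.
inline int ExampleSharedGate()
{
    SharedGate gate;
    std::atomic<int> passed{0};
    auto waiter = [&] { gate.WaitUntilOpen(); ++passed; };

    std::thread first(waiter);
    std::thread second(waiter);
    gate.Open();
    first.join();
    second.join();

    assert(passed.load() == 2);
    return passed.load();
}
```

Because the wait uses a predicate, it does not matter whether Open() runs before or after the waiters start waiting.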

Correctness

Your code seems correct in practice, but in theory you could have a huge number of threads calling SubmitMessage() or TrySubmitMessage(), such that mNextFreeIndex overflows and wraps back to zero. This would cause threads to overwrite earlier messages.
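One possible way to rule out the wrap-around is to never let the counter move past Size in the first place. The sketch below does that with a compare-and-swap loop; `TryReserveSlot` and `ExampleReserve` are hypothetical names, and this is only one of several ways to approach it.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <optional>

// Hypothetical helper, not part of the original class: reserves the next free
// slot only while the counter is below `capacity`, so the counter never
// exceeds `capacity` and therefore cannot wrap around.
inline std::optional<std::size_t> TryReserveSlot(std::atomic<std::size_t>& nextFreeIndex,
                                                 std::size_t capacity)
{
    std::size_t index = nextFreeIndex.load();
    while (index < capacity) {
        // On failure, compare_exchange_weak reloads `index` with the current value.
        if (nextFreeIndex.compare_exchange_weak(index, index + 1)) {
            return index; // this slot now belongs to the caller
        }
    }
    return std::nullopt; // queue is full; the counter was left untouched
}

// Hypothetical usage example.
inline void ExampleReserve()
{
    std::atomic<std::size_t> counter{0};
    auto first  = TryReserveSlot(counter, 2);
    auto second = TryReserveSlot(counter, 2);
    auto third  = TryReserveSlot(counter, 2);
    assert(first && *first == 0);
    assert(second && *second == 1);
    assert(!third);              // full: no slot handed out
    assert(counter.load() == 2); // and the counter stayed at the capacity
}
```

A side effect is that the std::min() clamp in the claim functions would no longer be needed, since the counter always equals the number of reserved slots.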

Add a way to stop the queue

Consider you have a consumer thread blocked in WaitForMessages(), but the producers are all done by now and will no longer submit any new messages. How can you ensure the consumer thread can shut down safely? Add some flag that indicates whether all work is done, a public member function to set it (which will also notify mNewWorkCondition), and make WaitForMessages() return immediately when this happens.
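A minimal sketch of such a stop flag could look like this. To keep it short, it assumes a plain std::mutex and an unbounded std::vector instead of your shared mutex and fixed-size buffer; `StoppableQueue`, `Stop()`, `WaitForItems()` and `ExampleDrainUntilStopped` are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical, simplified queue that can be told that no more work is coming.
template <typename T>
class StoppableQueue
{
public:
    void Submit(T item)
    {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mItems.push_back(std::move(item));
        }
        mNewWork.notify_one();
    }

    // Tell all waiting consumers that no more items will arrive.
    void Stop()
    {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mStopped = true;
        }
        mNewWork.notify_all();
    }

    // Blocks until there are items or Stop() was called.
    // Returns false only when the queue is stopped and fully drained.
    bool WaitForItems(std::vector<T>& outItems)
    {
        std::unique_lock<std::mutex> lock(mMutex);
        mNewWork.wait(lock, [this] { return mStopped || !mItems.empty(); });
        outItems.clear();
        std::swap(outItems, mItems);
        return !outItems.empty();
    }

private:
    std::mutex              mMutex;
    std::condition_variable mNewWork;
    std::vector<T>          mItems;
    bool                    mStopped = false;
};

// Hypothetical usage example: the consumer loop ends once Stop() was called
// and everything submitted before it has been handed out.
inline std::size_t ExampleDrainUntilStopped()
{
    StoppableQueue<int> queue;
    std::size_t received = 0;

    std::thread consumer([&] {
        std::vector<int> batch;
        while (queue.WaitForItems(batch)) {
            received += batch.size();
        }
    });

    for (int i = 0; i < 100; ++i) {
        queue.Submit(i);
    }
    queue.Stop();
    consumer.join();

    assert(received == 100);
    return received;
}
```

Messages submitted before Stop() are still delivered; only a stopped and empty queue makes the consumer return.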

Naming things

The names in your code are reasonably well chosen. However, I would remove Message from both the class name and the member function names. It's redundant; a queue already stores some type of items, and your code doesn't care whether that is a "message" or something else. So nothing is lost by removing it, and it saves some typing.

Allow messages to be emplaced into the buffer

Because SubmitMessage() takes the message by const reference, it will always make a copy of it when putting it into the buffer. However, this can either be inefficient, or even impossible, depending on the type. I recommend you make it a template that will allow you to "emplace" an element into the buffer:

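template<typename... Args>
void SubmitMessage(Args&&... args) {
    …
    // Construct the message from the forwarded arguments (this also works with
    // explicit constructors), then move-assign it into its slot
    (*mBuffer)[index] = T(std::forward<Args>(args)...);
    …
}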

When are constructors and destructors called?

There are some inefficiencies in your code. First, because you are using std::array<T, Size>, you always default-construct a lot of Ts at the start of your program: Size times one more than the number of consumer threads. In the long run, that might not matter; you wrote the code so the buffers are recycled. But therein lies the second problem:

In SubmitMessage(), when you overwrite (*mBuffer)[index], the assignment first has to dispose of the old value (releasing its resources, much like a destructor would) before the new value can be copied or moved into its place. You do this with the lock held, so if that cleanup takes a long time, it can hold up other threads. Cleaning up inside the lock is not strictly necessary, however. Ideally, you would let the consumer thread destroy each element of the buffer before passing the buffer back to WaitForMessages().

To do this, you'd have to implement your own container class that works like a mix between std::array and std::vector: having a fixed size, but only constructing/destructing the elements that are actually in use. This certainly is possible, but a bit of work.
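To give an idea of the amount of work involved, here is a rough sketch of such a container. `FixedBuffer`, `try_emplace_back()` and `ExampleFixedBuffer` are hypothetical names; the sketch is not thread-safe by itself, is not copyable, and leaves out iterators and const access.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <new>
#include <string>
#include <utility>

// Hypothetical container sketch: fixed capacity, but elements are constructed
// only when emplaced and destroyed only by clear() or the destructor.
template <typename T, std::size_t Capacity>
class FixedBuffer
{
    static_assert(Capacity > 0, "Capacity must be positive");

public:
    FixedBuffer() = default;
    FixedBuffer(FixedBuffer const&) = delete;
    FixedBuffer& operator=(FixedBuffer const&) = delete;
    ~FixedBuffer() { clear(); }

    // Constructs a T in place in the next free slot; returns false if full.
    template <typename... Args>
    bool try_emplace_back(Args&&... args)
    {
        if (mSize == Capacity) {
            return false;
        }
        ::new (static_cast<void*>(slot(mSize))) T(std::forward<Args>(args)...);
        ++mSize;
        return true;
    }

    T& operator[](std::size_t i) { return *std::launder(slot(i)); }
    std::size_t size() const { return mSize; }

    // Destroys only the elements that were actually constructed.
    void clear()
    {
        while (mSize > 0) {
            --mSize;
            std::destroy_at(std::launder(slot(mSize)));
        }
    }

private:
    T* slot(std::size_t i) { return reinterpret_cast<T*>(mStorage + i * sizeof(T)); }

    alignas(T) unsigned char mStorage[sizeof(T) * Capacity]; // raw, uninitialized
    std::size_t mSize = 0;
};

// Hypothetical usage example.
inline void ExampleFixedBuffer()
{
    FixedBuffer<std::string, 2> buffer;     // no std::string is constructed yet
    bool ok = buffer.try_emplace_back(3, 'x'); // builds std::string(3, 'x') in place
    assert(ok && buffer[0] == "xxx");
    ok = buffer.try_emplace_back("hi");
    assert(ok && buffer.size() == 2);
    ok = buffer.try_emplace_back("full");
    assert(!ok);                            // capacity reached
    buffer.clear();                         // consumer destroys the elements
    assert(buffer.size() == 0);
}
```

With something like this, the consumer could call clear() on a claimed buffer outside the lock, so producers only ever construct into empty slots.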

Consider simplifying the queue

Having a std::shared_mutex to allow concurrent submissions, and having consumers get whole buffers at a time, is a bit more complex than a thread-safe queue that just allows pushing and popping one message at a time. Also note that even though WaitForMessages() doesn't look that complicated, the caller has to deal with looping over the result correctly. Your method probably has some performance benefits, but there are also some costs: std::shared_mutex operations are a bit slower than those of a plain std::mutex, and the more consumers you have, the more memory you are using.

There is a tradeoff to be made: you can simplify the queue and the callers, at potentially the cost of a little bit of performance. I would recommend you keep things simple, and only go for more complex solutions if you have measured that you really need that for performance.
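For comparison, the simple variant could be sketched roughly as follows: one mutex, one condition variable, one item per pop. `SimpleQueue` and `ExampleSimpleQueue` are hypothetical names, and the sketch assumes an unbounded std::queue, so it drops the "fail when full" feature unless you add a size check.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>

// Hypothetical minimal thread-safe queue.
template <typename T>
class SimpleQueue
{
public:
    void Push(T item)
    {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mItems.push(std::move(item));
        }
        mNotEmpty.notify_one();
    }

    // Blocks until an item is available.
    T Pop()
    {
        std::unique_lock<std::mutex> lock(mMutex);
        mNotEmpty.wait(lock, [this] { return !mItems.empty(); });
        T item = std::move(mItems.front());
        mItems.pop();
        return item;
    }

    // Returns std::nullopt instead of blocking when the queue is empty.
    std::optional<T> TryPop()
    {
        std::lock_guard<std::mutex> lock(mMutex);
        if (mItems.empty()) {
            return std::nullopt;
        }
        std::optional<T> item(std::move(mItems.front()));
        mItems.pop();
        return item;
    }

private:
    std::mutex              mMutex;
    std::condition_variable mNotEmpty;
    std::queue<T>           mItems;
};

// Hypothetical usage example.
inline void ExampleSimpleQueue()
{
    SimpleQueue<int> queue;
    assert(!queue.TryPop().has_value()); // empty: does not block
    queue.Push(1);
    queue.Push(2);
    int first = queue.Pop();             // items come out in FIFO order
    assert(first == 1);
    std::optional<int> second = queue.TryPop();
    assert(second && *second == 2);
}
```

The caller no longer has to loop over a claimed buffer, at the price of taking the lock once per message.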

\$\endgroup\$
