For an application I'm developing, I need a system that allows a thread to submit messages to another thread. For this purpose I designed a message queue system with the following features.
- Any thread can submit a message
- And wait until it is done
- Or let the operation fail if the queue is full
- Any thread can claim the submitted messages
- And wait until there are messages
- Or continue with 0 messages if none were submitted
I implemented this using a fixed size buffer as storage which is swapped out when the messages are claimed. For synchronization I used a shared mutex with 2 condition variables as well as an atomic index to reserve a space in the buffer.
#ifndef __THREADING_COLLECTIONS__MESSAGE_QUEUE__
#define __THREADING_COLLECTIONS__MESSAGE_QUEUE__
#include <array>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <shared_mutex>
namespace Threading
{
    /** Fixed-capacity message queue: (producing) threads submit messages, (consuming) threads claim them in bulk.
     *
     * Messages are written into a heap-allocated std::array. A consumer claims everything submitted so far by
     * swapping its own buffer with the queue's buffer, so no message is copied while claiming.
     *
     * Synchronization:
     * - Producers hold the mutex in shared mode and reserve distinct slots through an atomic index,
     *   so several producers can write at the same time.
     * - Consumers hold the mutex in exclusive mode while swapping the buffers and resetting the index.
     *
     * @tparam T the type of messages submitted (must be default constructible and copy assignable)
     * @tparam Size the buffer size for unprocessed messages */
    template < typename T, std::size_t Size >
    class MessageQueue
    {
    public:
        /** Owning handle to a message buffer that can be exchanged with this queue */
        using ClaimBuffer = std::unique_ptr< std::array< T, Size > >;

        MessageQueue(): mBuffer( CreateClaimBuffer() ) {}

        /** Submit a new message to the queue
         * @attention will block until the message is successfully submitted
         * @param[in] inMessage the message to be submitted */
        void SubmitMessage( T const & inMessage )
        {
            // Whilst this (shared) lock is held the queue can't be emptied or swapped out
            std::shared_lock lock( mQueueReadWriteLock );

            std::size_t index = 0;
            while( !TryReserveSlot( index ) ) {
                // Queue is full: release the lock and sleep until a consumer emptied the queue.
                // A spurious wakeup is harmless, the reservation is simply attempted again.
                mQueueEmptiedCondition.wait( lock );
            }

            // The reserved slot belongs to this thread only
            ( *mBuffer )[index] = inMessage;

            // Release the lock before notifying so the woken consumer does not immediately block on it
            lock.unlock();
            mNewWorkCondition.notify_one();
        }

        /** Tries to submit a new message to the queue
         * @attention can fail if the queue is full
         * @param[in] inMessage the message to be submitted
         * @return true if the message was successfully submitted */
        bool TrySubmitMessage( T const & inMessage )
        {
            // Whilst this (shared) lock is held the queue can't be emptied or swapped out
            std::shared_lock lock( mQueueReadWriteLock );

            std::size_t index = 0;
            if( !TryReserveSlot( index ) ) {
                // Queue is full; the lock is released by its destructor
                return false;
            }

            // The reserved slot belongs to this thread only
            ( *mBuffer )[index] = inMessage;

            // Release the lock before notifying so the woken consumer does not immediately block on it
            lock.unlock();
            mNewWorkCondition.notify_one();
            return true;
        }

        /** Collect submitted messages
         * @attention will block until there is at least one message to be claimed
         * @param[in,out] outDestination in: a spare buffer handed to the queue (a new one is allocated when null),
         *                               out: the buffer holding the claimed messages in its first N elements;
         *                               the remaining elements hold unspecified (stale) values
         * @return the amount of claimed messages (N) */
        std::size_t WaitForMessages( ClaimBuffer & outDestination )
        {
            EnsureClaimBuffer( outDestination );

            // Exclusive lock: no producer can reserve or write a slot whilst it is held
            std::unique_lock lock( mQueueReadWriteLock );

            // The predicate covers messages submitted before this call as well as spurious wakeups
            mNewWorkCondition.wait( lock, [this] { return mNextFreeIndex.load() != 0; } );

            return TakeMessages( outDestination, lock );
        }

        /** Collect submitted messages
         * @attention never waits for messages; returns 0 if none were submitted
         * @param[in,out] outDestination in: a spare buffer handed to the queue (a new one is allocated when null),
         *                               out: the buffer holding the claimed messages in its first N elements;
         *                               the remaining elements hold unspecified (stale) values
         * @return the amount of claimed messages (N) */
        std::size_t ClaimMessages( ClaimBuffer & outDestination )
        {
            EnsureClaimBuffer( outDestination );

            // Exclusive lock: no producer can reserve or write a slot whilst it is held
            std::unique_lock lock( mQueueReadWriteLock );

            return TakeMessages( outDestination, lock );
        }

        /** Creates a storage buffer for messages compatible with this queue
         * @return a storage buffer */
        static std::unique_ptr< std::array< T, Size > > CreateClaimBuffer()
        {
            return std::make_unique< std::array< T, Size > >();
        }

    private:
        /** Makes sure the queue is never handed a null buffer, which the next submit would dereference */
        static void EnsureClaimBuffer( ClaimBuffer & ioBuffer )
        {
            if( !ioBuffer ) {
                ioBuffer = CreateClaimBuffer();
            }
        }

        /** Reserves the next free slot without ever moving the index past Size
         * @attention the caller must hold mQueueReadWriteLock (shared is sufficient)
         * @param[out] outIndex the reserved slot, only valid if true is returned
         * @return false if the queue is full */
        bool TryReserveSlot( std::size_t & outIndex )
        {
            std::size_t current = mNextFreeIndex.load();
            while( current < Size ) {
                // On failure 'current' is refreshed with the latest value and the bound is checked again
                if( mNextFreeIndex.compare_exchange_weak( current, current + 1 ) ) {
                    outIndex = current;
                    return true;
                }
            }
            return false;
        }

        /** Swaps the buffers, marks the queue as empty and wakes the producers blocked on a full queue
         * @attention inLock must hold mQueueReadWriteLock exclusively; it is released by this call
         * @return the amount of messages in the buffer handed to the caller */
        std::size_t TakeMessages( ClaimBuffer & outDestination, std::unique_lock< std::shared_mutex > & inLock )
        {
            // Swap out the queue for an "empty" one
            std::swap( outDestination, mBuffer );

            // The index never exceeds Size, so it is exactly the amount of written messages
            std::size_t const count = mNextFreeIndex.exchange( 0 );

            // We are done with the shared state; notify everyone that the queue is empty
            inLock.unlock();
            mQueueEmptiedCondition.notify_all();
            return count;
        }

    private:
        // Both conditions are waited on with locks over a std::shared_mutex,
        // which std::condition_variable does not support
        std::condition_variable_any mNewWorkCondition;
        std::condition_variable_any mQueueEmptiedCondition;
        std::shared_mutex mQueueReadWriteLock;
        std::atomic< std::size_t > mNextFreeIndex{ 0 };
        std::unique_ptr< std::array< T, Size > > mBuffer;
    };
}
#endif
The specific feedback I'm looking for is:
- Correctness ( safety, deadlocks, etc. )
- Performance ( General, and specific scenarios )
- General code ( documentation, names, etc. )