\$\begingroup\$

This is mostly an exercise for me to try to understand the differences between semaphores and locks. It's long and rambling because it took me quite a few tries to understand the concepts. Please bear with me. I am hoping you can either confirm that the lesson I learned is correct or point out my misunderstanding. Please jump to the last code section if you would just like to see my final code.

I read this blog post: https://vorbrodt.blog/2019/02/03/blocking-queue/ and it really confused me. If we are going to serialize access to the elements anyway, what's the point of the semaphore? I originally thought a semaphore was basically a counter protected by a lock, so I was having trouble understanding the difference. I decided to implement the queue myself without using a semaphore. Here is my first (incorrect) attempt at implementing a blocking queue with one producer and one consumer:

#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <queue>

template <typename T>
class OneToOneBlockingQueue {
private:
  unsigned int m_maxSize;
  std::queue <T> m_data;
  std::mutex m_mutex;
  std::condition_variable m_readCond;
  std::condition_variable m_writeCond;
public:
  OneToOneBlockingQueue(unsigned int size): m_maxSize(size) {
  }

  void push(T value) {
    std::unique_lock <std::mutex> myLock(m_mutex);
    m_writeCond.wait(myLock, [this]() { return m_data.size() < m_maxSize; });
    m_data.push(value);
    m_readCond.notify_one();
  }

  void pop(T& value) {
    std::unique_lock <std::mutex> myLock(m_mutex);
    m_readCond.wait(myLock, [this]() { return m_data.size() > 0; });

    value = m_data.front();
    m_data.pop();
    m_writeCond.notify_one();
  }
};

class Producer {
public:
  Producer(OneToOneBlockingQueue <int>& bq, int id):m_bq(bq), m_id(id) {
  }
  
  void operator()() {
    for (int i = 0; i < 10; i++) {
      m_bq.push(i);
    }
  }
private:
  OneToOneBlockingQueue<int> &m_bq;
  int m_id;
};

class Consumer {
public:
  Consumer(OneToOneBlockingQueue <int>& bq, int id):m_bq(bq), m_id(id) {
  }

  void operator()() {
    std::cout << "Reading from queue: ";
    for (int i = 0; i < 10; i++) {
      int value;
      m_bq.pop(value);
      std::cout << value << " ";
    }

    std::cout << std::endl;
  }
private:
  OneToOneBlockingQueue <int> &m_bq;
  int m_id;
};

int main() {
  OneToOneBlockingQueue <int>bq(2);

  std::thread producerThread (Producer(bq, 0));
  std::thread consumerThread (Consumer(bq, 0));

  producerThread.join();
  consumerThread.join(); 
} 

While it worked, I then realized it was not correct, since the producer and the consumer can't write and read at the same time. If the consumer is very slow, the producer is locked out until the consumer finishes reading, even though the queue is not full. The only critical section is the counter, not the data itself. However, using std::queue, I couldn't separate the two. Maybe that is why the other author used a circular array instead?

Here is my second attempt:

#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>

template <typename T>
class OneToOneBlockingQueue {
private:
  unsigned int m_maxSize;
  T *m_data;
  unsigned int m_size;
  std::mutex m_mutex;
  std::condition_variable m_readCond;
  std::condition_variable m_writeCond;
  unsigned int m_readLoc;
  unsigned int m_writeLoc;
public:
  OneToOneBlockingQueue(unsigned int size): m_maxSize(size), m_size(0), m_data(new T[size]), m_readLoc(0), m_writeLoc(0) {
  }

  void push(T value) {
    std::unique_lock <std::mutex> myLock(m_mutex);
    m_writeCond.wait(myLock, [this]() { return m_size < m_maxSize; });
    myLock.unlock();

    m_data[m_writeLoc++] = value;
    if (m_writeLoc == m_maxSize) {
      m_writeLoc = 0;
    }

    myLock.lock();
    m_size++;
    m_readCond.notify_one();
  }

  void pop(T& value) {
    std::unique_lock <std::mutex> myLock(m_mutex);
    m_readCond.wait(myLock, [this]() { return m_size > 0; });
    myLock.unlock();

    value = m_data[m_readLoc++];
    if (m_readLoc == m_maxSize) {
      m_readLoc = 0;
    }

    myLock.lock();
    m_size--;
    m_writeCond.notify_one();
  }
};

class Producer {
public:
  Producer(OneToOneBlockingQueue <int>& bq, int id):m_bq(bq), m_id(id) {
  }
  
  void operator()() {
    for (int i = 0; i < 10; i++) {
      m_bq.push(i);
    }
  }
private:
  OneToOneBlockingQueue<int> &m_bq;
  int m_id;
};

class Consumer {
public:
  Consumer(OneToOneBlockingQueue <int>& bq, int id):m_bq(bq), m_id(id) {
  }

  void operator()() {
    std::cout << "Reading from queue: ";
    for (int i = 0; i < 10; i++) {
      int value;
      m_bq.pop(value);
      std::cout << value << " ";
    }
    std::cout << std::endl;
  }
private:
  OneToOneBlockingQueue <int> &m_bq;
  int m_id;
};

int main() {
  OneToOneBlockingQueue <int>bq(2);

  std::thread producerThread (Producer(bq, 0));
  std::thread consumerThread (Consumer(bq, 0));

  producerThread.join();
  consumerThread.join();
}

I think the difference between a semaphore and a lock is that the semaphore by itself does not protect the elements, only the use count. The producer and the consumer must inherently work on different elements for it to work. Is that correct?

Here is the code after abstracting the counter into a semaphore class.

#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>

class Semaphore {
private:
  unsigned int m_counter;
  std::mutex m_mutex;
  std::condition_variable m_cond;
public:
  Semaphore(unsigned int counter):m_counter(counter) {
  }

  void P() {
    std::unique_lock <std::mutex> myLock(m_mutex);
    m_cond.wait(myLock, [this]() { return m_counter > 0; });
    m_counter--;
  }

  void V() {
    std::lock_guard <std::mutex> myLock(m_mutex);
    m_counter++;
    m_cond.notify_one();
  }
};

template <typename T>
class OneToOneBlockingQueue {
private:
  unsigned int m_maxSize;
  T *m_data;
  Semaphore m_filledSlots;
  Semaphore m_emptySlots;
  unsigned int m_readLoc;
  unsigned int m_writeLoc;
public:
  OneToOneBlockingQueue(unsigned int size): m_maxSize(size), m_data(new T[size]), m_filledSlots(0), m_emptySlots(size), m_readLoc(0), m_writeLoc(0) {
  }

  void push(T value) {
    m_emptySlots.P();

    m_data[m_writeLoc++] = value;
    if (m_writeLoc == m_maxSize) {
      m_writeLoc = 0;
    }

    m_filledSlots.V();
  }

  void pop(T& value) {
    m_filledSlots.P();

    value = m_data[m_readLoc++];
    if (m_readLoc == m_maxSize) {
      m_readLoc = 0;
    }

    m_emptySlots.V();
  }
};

class Producer {
public:
  Producer(OneToOneBlockingQueue <int>& bq, int id):m_bq(bq), m_id(id) {
  }
  
  void operator()() {
    for (int i = 0; i < 10; i++) {
      m_bq.push(i);
    }
  }
private:
  OneToOneBlockingQueue<int> &m_bq;
  int m_id;
};

class Consumer {
public:
  Consumer(OneToOneBlockingQueue <int>& bq, int id):m_bq(bq), m_id(id) {
  }

  void operator()() {
    std::cout << "Reading from queue: ";
    for (int i = 0; i < 10; i++) {
      int value;
      m_bq.pop(value);
      std::cout << value << " ";
    }
    std::cout << std::endl;
  }
private:
  OneToOneBlockingQueue <int> &m_bq;
  int m_id;
};

int main() {
  OneToOneBlockingQueue <int>bq(2);

  std::thread producerThread (Producer(bq, 0));
  std::thread consumerThread (Consumer(bq, 0));

  producerThread.join();
  consumerThread.join();  
}

And finally, to allow multiple producers and consumers, we only need to handle producers and consumers separately. Semaphores do not work between consumers (or between producers), since they do not provide exclusive access to individual elements. So I created a producerMutex and a consumerMutex. The reason the original blog post confused me was that it used a single mutex, which made me think the semaphore was unnecessary. Here is my final code:

#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <vector>
#include <queue>
#include <unistd.h>

class Semaphore {
private:
  unsigned int m_counter;
  std::mutex m_mutex;
  std::condition_variable m_cond;
public:
  Semaphore(unsigned int counter):m_counter(counter) {
  }

  void P() {
    std::unique_lock <std::mutex> myLock(m_mutex);
    m_cond.wait(myLock, [this]() { return m_counter > 0; });
    m_counter--;
  }

  void V() {
    std::lock_guard <std::mutex> myLock(m_mutex);
    m_counter++;
    m_cond.notify_one();
  }
};

template <typename T>
class ManyToManyBlockingQueue {
private:
  unsigned int m_maxSize;
  T *m_data;
  Semaphore m_filledSlots;
  Semaphore m_emptySlots;
  unsigned int m_readLoc;
  unsigned int m_writeLoc;
  std::mutex m_consumerMutex;
  std::mutex m_producerMutex;
public:
  ManyToManyBlockingQueue(unsigned int size): m_maxSize(size), m_data(new T[size]), m_filledSlots(0), m_emptySlots(size), m_readLoc(0), m_writeLoc(0) {
  }

  void push(T value) {
    m_emptySlots.P();

    std::unique_lock <std::mutex> producerLock(m_producerMutex);
    m_data[m_writeLoc++] = value;
    if (m_writeLoc == m_maxSize) {
      m_writeLoc = 0;
    }
    producerLock.unlock();

    m_filledSlots.V();
  }

  void pop(T& value) {
    m_filledSlots.P();

    std::unique_lock <std::mutex>consumerLock(m_consumerMutex);
    value = m_data[m_readLoc++];
    if (m_readLoc == m_maxSize) {
      m_readLoc = 0;
    }
    consumerLock.unlock();

    m_emptySlots.V();
  }
};

class Producer {
public:
  Producer(ManyToManyBlockingQueue <int>& bq, int id):m_bq(bq), m_id(id) {
  }
  
  void operator()() {
    for (int i = 0; i < 10; i++) {
      m_bq.push(m_id*10+i);
    }
  }
private:
  ManyToManyBlockingQueue<int> &m_bq;
  int m_id;
};

class Consumer {
public:
  Consumer(ManyToManyBlockingQueue <int>& bq, int id, std::queue <int>&output):m_bq(bq), m_id(id), m_output(output) {
  }

  void operator()() {
    for (int i = 0; i < 50; i++) {
      int value;
      m_bq.pop(value);
      m_output.push(value);
    }
  }
private:
  ManyToManyBlockingQueue <int> &m_bq;
  int m_id;
  std::queue<int> &m_output;
};

int main() {
  ManyToManyBlockingQueue <int>bq(10);

  std::vector <std::thread> producerThreads;
  std::vector <std::thread> consumerThreads;
  std::vector <std::queue<int>> outputs;

  for (int i = 0; i < 10; i++) {
    producerThreads.emplace_back(Producer(bq,i));
  }

  for (int i = 0; i < 2; i++) {
    outputs.emplace_back(std::queue<int>());
  }
  
  for (int i = 0; i < 2; i++) {
    consumerThreads.emplace_back(Consumer(bq,i,outputs[i]));
  }
  
  for (std::vector <std::thread>::iterator it = producerThreads.begin();
       it != producerThreads.end();
       it++) {
    it->join();
  }

  for (std::vector <std::thread>::iterator it = consumerThreads.begin();
       it != consumerThreads.end();
       it++) {
    it->join(); 
  }

  for (std::vector <std::queue<int>>::iterator it = outputs.begin();
       it != outputs.end();
       it++) {
    std::cout << "Number of elements: " << it->size() << " Data: ";
    while(!it->empty()) {
      std::cout << it->front() << " ";
      it->pop();
    }
    std::cout << std::endl;
  }
}

Am I doing this correctly?

I have a couple of other issues with this code. The pop() function bugs me. I would really like it to return the value so that the caller can use it directly instead of needing a temporary variable. However, I can't access the slot after I have called V() on the other semaphore, or a producer might overwrite it. Holding the lock longer would decrease parallelism. Is this the right way to do it, or is there a better way?

The other thing is that I am new to references in C++, as I mostly used pointers before. Originally, I allocated each output queue as I was creating its thread, and I was surprised that I wasn't getting any data from the first consumer. After a lot of debugging, I finally realized that the vector had moved its elements in order to grow. Therefore, it seems that passing a movable object by reference is dangerous. What's the best way to solve that?

Another issue is how best to let the producers signal the end of data. Would a "done" counter protected by another mutex be the right way?

Another issue is what to do if one partner does not respond for a while. I can't really free the queue, since there is no guarantee that the partner won't come back later and write into freed memory. What's the best way to handle that and abort the operation?

Sorry again about the long post. Thanks for your input.

P.S. I understand that semaphores can behave quite differently depending on the implementation (e.g. with interrupts). This is not meant to be production code, just a way to understand the concept.

\$\endgroup\$
  • \$\begingroup\$ "Am I doing this correctly?" Did you test it? Does the final code work the way it should, without running into race conditions and other interlock trouble? If not, it's not ready for review. Please take a look at the help center. \$\endgroup\$
    – Mast
    Commented Aug 4, 2020 at 18:50
  • \$\begingroup\$ @Mast Yes, I did test it, to the extent of running a bunch of tests with different queue sizes as well as introducing different timings. It seems to work. I guess I am mostly interested in whether this is the right way to do it rather than whether the code is 100% perfect. For example, I am not very interested in a queue of size 0, since handling it would add more code without illustrating the concept, although it is a valid test case (I did test with a queue of size 1, though). \$\endgroup\$
    – wgemini
    Commented Aug 4, 2020 at 20:51

1 Answer

\$\begingroup\$

There's too much state

Each queue has four mutexes, four counters and two condition variables. That is way too much. You could do this with just a single mutex and condition variable.

In your push() function, you first have to hold a mutex at least once to check if there are empty slots (if not, you have to wait for a condition variable to be signalled, which means multiple calls to the mutex lock and unlock functions), then you have to hold a mutex to update the write location, and then hold another mutex to increment the filled slots semaphore. Locking and unlocking a mutex, despite being quite optimized, is still not free.

Another issue is the duplication of information of the state of the queue. There's m_filledSlots, m_emptySlots (which should be the inverse), and the same information is also present in the difference between the read and write locations. And you have to keep everything updated.

If you just take one lock, check the read and write pointers to see how many free slots there are in the queue, wait for the condition variable if necessary, then update the read or write pointer, signal the condition variable if necessary, and then unlock, you spend far fewer cycles than with this semaphore-based approach.
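
The single-lock approach described above could look roughly like the following. This is a minimal sketch, not the asker's or the blog's code: the names SingleLockQueue, m_changed and sumThroughQueue are made up for illustration, the capacity is assumed to be greater than zero, and T is assumed to be default-constructible. It uses notify_all() because producers and consumers share one condition variable here, so notify_one() could wake a thread on the wrong side.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Bounded blocking queue guarded by one mutex and one condition variable.
// m_count is the only occupancy state; the write slot is derived from it.
template <typename T>
class SingleLockQueue {
public:
  explicit SingleLockQueue(std::size_t capacity)  // assumes capacity > 0
      : m_data(capacity), m_readLoc(0), m_count(0) {}

  void push(T value) {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_changed.wait(lock, [this] { return m_count < m_data.size(); });
    m_data[(m_readLoc + m_count) % m_data.size()] = std::move(value);
    ++m_count;
    m_changed.notify_all();  // shared condition variable: wake both sides
  }

  T pop() {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_changed.wait(lock, [this] { return m_count > 0; });
    T value = std::move(m_data[m_readLoc]);
    m_readLoc = (m_readLoc + 1) % m_data.size();
    --m_count;
    m_changed.notify_all();
    return value;
  }

private:
  std::vector<T> m_data;
  std::size_t m_readLoc;
  std::size_t m_count;
  std::mutex m_mutex;
  std::condition_variable m_changed;
};

// Hypothetical driver: 4 producers each push 1..perProducer,
// 2 consumers pop everything and the partial sums are added up.
long long sumThroughQueue(int perProducer) {
  const int producers = 4;
  const int consumers = 2;
  const int perConsumer = producers * perProducer / consumers;
  SingleLockQueue<int> queue(3);
  std::vector<long long> partial(consumers, 0);
  std::vector<std::thread> threads;
  for (int p = 0; p < producers; ++p) {
    threads.emplace_back([&queue, perProducer] {
      for (int i = 1; i <= perProducer; ++i) queue.push(i);
    });
  }
  for (int c = 0; c < consumers; ++c) {
    threads.emplace_back([&queue, &partial, c, perConsumer] {
      for (int i = 0; i < perConsumer; ++i) partial[c] += queue.pop();
    });
  }
  for (std::thread& t : threads) t.join();
  long long total = 0;
  for (long long s : partial) total += s;
  return total;
}
```

Compared with the semaphore version, each push() or pop() takes exactly one lock, and the occupancy is stored in a single place.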

Making pop() return the value

You can just write:

T pop() {
    ...
    T value = m_data[m_readLoc++];
    ...
    return value;
}

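Even though it looks like there is a temporary variable that would require an extra copy, the compiler can elide it through named return value optimization (NRVO), which most compilers have been doing for a long time. C++17 makes copy elision mandatory only when returning a prvalue, not a named local variable, but even when NRVO is not applied, the local is moved rather than copied on return if T has a move constructor.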
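
To check that returning a named local does not add a copy, one can count copies with a small instrumented type. The sketch below is illustrative only: Tracked, takeSlot and copiesMadeByTake are hypothetical names, the locking is left out, and the element is moved out of its slot with std::move, which is an assumption that goes slightly beyond the snippet above (which copies from the slot).

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical element type that counts how often it is copied.
struct Tracked {
  static int copies;
  int payload;
  explicit Tracked(int p = 0) : payload(p) {}
  Tracked(const Tracked& other) : payload(other.payload) { ++copies; }
  Tracked(Tracked&& other) noexcept : payload(other.payload) {}
  Tracked& operator=(const Tracked& other) {
    payload = other.payload;
    ++copies;
    return *this;
  }
  Tracked& operator=(Tracked&& other) noexcept {
    payload = other.payload;
    return *this;
  }
};
int Tracked::copies = 0;

// Same shape as a value-returning pop(), minus the locking:
// move the element into a local, then return the local.
Tracked takeSlot(std::vector<Tracked>& slots, std::size_t index) {
  Tracked value = std::move(slots[index]);
  return value;  // elided (NRVO) or moved, but not copied
}

// Returns the number of copy operations performed by one takeSlot() call.
int copiesMadeByTake() {
  std::vector<Tracked> slots;
  slots.reserve(2);
  slots.emplace_back(7);
  slots.emplace_back(9);
  Tracked::copies = 0;
  Tracked got = takeSlot(slots, 1);
  assert(got.payload == 9);
  return Tracked::copies;
}
```

Because the value is already in a local when the function returns, the slot can be released to producers before the return statement without risking an overwrite.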

Pointers moving when containers grow

Indeed, a std::vector will move its contents in memory if it grows. However, there are other container classes that you can use that do guarantee that elements already in the container will keep their address, even if it needs to allocate more memory. Amongst them are std::list and std::deque. There are also container adapter classes such as std::queue that by default use a std::deque for storage, and thus inherit its properties.
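
As a small illustration of that guarantee, the sketch below keeps a pointer to the first output queue in a std::deque and checks that it is still valid after many more queues are appended. The function name firstOutputStaysPut is made up for this example. With a std::vector, calling reserve() with the final size before creating the threads would be another option.

```cpp
#include <cassert>
#include <deque>
#include <queue>

// Returns true if the first output queue keeps its address (and contents)
// while more queues are appended to the end of the deque.
bool firstOutputStaysPut(int extraQueues) {
  std::deque<std::queue<int>> outputs;
  outputs.emplace_back();
  std::queue<int>* first = &outputs.front();
  first->push(42);
  for (int i = 0; i < extraQueues; ++i) {
    outputs.emplace_back();  // insertion at the end keeps references valid
  }
  return first == &outputs.front() && first->front() == 42;
}
```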

Signalling that production ended

There are two common ways to do this. First is to add a flag variable to your blocking queue class that signals that the producers finished. This flag is set, and then the condition variable that the consumers listen for is broadcast to. Consumers should check this flag each time they want to dequeue an item. If it's set, they can terminate.
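
A rough sketch of the flag approach follows. The names ClosableQueue, close() and countConsumed are assumptions made for illustration, and the queue is unbounded to keep the example short. One design choice that differs slightly from the description above: consumers here drain any remaining items before they stop, so nothing pushed before close() is lost.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

template <typename T>
class ClosableQueue {
public:
  void push(T value) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_items.push(std::move(value));
    }
    m_cond.notify_one();
  }

  // Call once, after all producers have finished.
  void close() {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_closed = true;
    }
    m_cond.notify_all();  // broadcast so every waiting consumer sees the flag
  }

  // Returns false only when the queue is closed and fully drained.
  bool pop(T& value) {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cond.wait(lock, [this] { return m_closed || !m_items.empty(); });
    if (m_items.empty()) return false;
    value = std::move(m_items.front());
    m_items.pop();
    return true;
  }

private:
  std::queue<T> m_items;
  std::mutex m_mutex;
  std::condition_variable m_cond;
  bool m_closed = false;
};

// Hypothetical driver: one producer, several consumers that stop on close().
int countConsumed(int items, int consumers) {
  ClosableQueue<int> queue;
  std::vector<int> counts(consumers, 0);
  std::vector<std::thread> threads;
  for (int c = 0; c < consumers; ++c) {
    threads.emplace_back([&queue, &counts, c] {
      int value = 0;
      while (queue.pop(value)) ++counts[c];
    });
  }
  for (int i = 0; i < items; ++i) queue.push(i);
  queue.close();
  for (std::thread& t : threads) t.join();
  int total = 0;
  for (int n : counts) total += n;
  return total;
}
```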

The other way is to have some way to enqueue an item that signals that no more data will be coming. If your queue contains pointers to objects, enqueueing a nullptr might suffice. Again, the condition variable should be broadcast, and a consumer should not actually pop this item from the queue, so that other consumers also get a chance to see it. Alternatively, you have to enqueue as many of these special items as there are consumer threads.
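
The variant with one sentinel per consumer can be sketched as below. PointerQueue and sumUntilSentinel are hypothetical names, the queue is unbounded for brevity, and a nullptr std::unique_ptr plays the role of the "no more data" item. With several producers, the sentinels would have to be enqueued only after all producers have finished.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Minimal unbounded queue of owning pointers; nullptr means "no more data".
class PointerQueue {
public:
  void push(std::unique_ptr<int> item) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_items.push(std::move(item));
    }
    m_cond.notify_one();
  }

  std::unique_ptr<int> pop() {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cond.wait(lock, [this] { return !m_items.empty(); });
    std::unique_ptr<int> item = std::move(m_items.front());
    m_items.pop();
    return item;
  }

private:
  std::queue<std::unique_ptr<int>> m_items;
  std::mutex m_mutex;
  std::condition_variable m_cond;
};

// Hypothetical driver: pushes 1..items, then one nullptr per consumer.
long long sumUntilSentinel(int items, int consumers) {
  PointerQueue queue;
  std::vector<long long> sums(consumers, 0);
  std::vector<std::thread> threads;
  for (int c = 0; c < consumers; ++c) {
    threads.emplace_back([&queue, &sums, c] {
      // Each consumer stops at the first nullptr it receives.
      while (std::unique_ptr<int> item = queue.pop()) sums[c] += *item;
    });
  }
  for (int i = 1; i <= items; ++i) queue.push(std::unique_ptr<int>(new int(i)));
  for (int c = 0; c < consumers; ++c) queue.push(nullptr);
  for (std::thread& t : threads) t.join();
  long long total = 0;
  for (long long s : sums) total += s;
  return total;
}
```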

Timeouts

"Another issue is what if one partner does not respond for a while. I can't really free the queue since there is no guarantee that the partner wouldn't come back later and write into bad memory. What's the best way to handle it and abort the operation?"

I'm not sure what you mean by "partner". Is it a consumer or a producer thread? In any case, you can only delete the queue if no threads are left that could read or write from it. You could kill threads that don't respond in time, but it is very hard to do this in a safe way. The best way is to ensure these threads never take too much time to produce or consume an item to begin with.
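
If the goal is only to stop waiting forever, one option that is not part of the advice above is a timed wait using std::condition_variable::wait_for. The sketch below is an assumption-laden illustration: TimedQueue, popFor and timedPopBehaves are made-up names, and the queue is unbounded. A timeout lets the caller give up, but it does not by itself make it safe to destroy the queue while another thread may still use it.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

template <typename T>
class TimedQueue {
public:
  void push(T value) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_items.push(std::move(value));
    }
    m_cond.notify_one();
  }

  // Returns false if no item became available within the timeout.
  bool popFor(T& value, std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lock(m_mutex);
    // wait_for with a predicate returns the predicate's final result.
    if (!m_cond.wait_for(lock, timeout, [this] { return !m_items.empty(); })) {
      return false;
    }
    value = std::move(m_items.front());
    m_items.pop();
    return true;
  }

private:
  std::queue<T> m_items;
  std::mutex m_mutex;
  std::condition_variable m_cond;
};

// Hypothetical check: an empty queue times out, a filled one delivers.
bool timedPopBehaves() {
  TimedQueue<int> queue;
  int value = 0;
  bool timedOut = !queue.popFor(value, std::chrono::milliseconds(20));
  queue.push(5);
  bool received = queue.popFor(value, std::chrono::milliseconds(20)) && value == 5;
  return timedOut && received;
}
```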

\$\endgroup\$
  • \$\begingroup\$ Thanks for the detailed answers. However, wouldn't having a single condition variable make notification tricky, since a producer could notify another producer instead of a consumer? NotifyAll would wake everybody up unnecessarily. Also, I think having a single mutex might lower parallelism, since producers would block consumers and vice versa. If the queue is neither full nor empty, they shouldn't block each other. \$\endgroup\$
    – wgemini
    Commented Aug 9, 2020 at 2:44
  • \$\begingroup\$ Yes, having two condition variables might help a little bit. But two mutexes are probably unnecessary: first, the mutex is only ever held for a brief moment, it's not held when waiting for a condition variable to be notified. Second, if the producers are waiting for consumers to free up the queue, then the consumers are obviously busy doing their stuff and not holding the lock. \$\endgroup\$
    – G. Sliepen
    Commented Aug 9, 2020 at 7:30
  • \$\begingroup\$ Sorry, I am not sure I understand. m_consumerMutex is only contended between consumers and m_producerMutex is only contended between producers. Thus, they need to be separate from the main mutex. The two semaphores also improve parallelism. Assuming we have two producers, one waiting on the full semaphore and one waiting to update the empty semaphore, if we had only one semaphore, a consumer could potentially be blocked twice. As you said, in practice it may not make much difference, since these locks are only held briefly, but semantically the two counters serve different purposes. \$\endgroup\$
    – wgemini
    Commented Aug 9, 2020 at 15:16
  • \$\begingroup\$ I'm retracting my statement about two condition variables helping. Because let's look at this from another angle: can it ever happen that both producers and consumers are waiting to be notified? No, because the queue cannot be both completely empty and completely full at the same time. So either it is empty, consumers are waiting for a condition variable to be notified, and the producers do not wait but merely notify it. Or if it's full, then it's the other way around. And finally if half-full, no side is blocked. \$\endgroup\$
    – G. Sliepen
    Commented Aug 9, 2020 at 20:41
  • \$\begingroup\$ Hmm... The whole point of the two counters was that the queue can be empty and full at the same time. Say we have 10 slots and 10 producers all copying data into the queue. The full counter would be full, as no other producer should be able to get in, but the empty counter would be empty, since no data is ready for consumption yet. However, on closer examination, my code does not work, since I can't release the producers_mutex until I have called V() on the empty semaphore, which largely negates any parallelism gains. Maybe I have to lock the entire push/pop section? \$\endgroup\$
    – wgemini
    Commented Aug 9, 2020 at 23:47
