2
\$\begingroup\$

I have implemented a message queue for inter-thread communication, similar to a POSIX message queue. I would like to know whether there is any scope for performance improvement, and whether there are any bugs that need to be corrected. This is my code:

#include <iostream>
#include <cstring>
#include <list>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex mu;
std::condition_variable cv;
std::list<std::vector<char>> msg_queue;

void recv() {
    char buff[2048];
    
    while(true) {
        std::unique_lock<std::mutex> ul { mu };
        
        while(msg_queue.empty())
            cv.wait(ul);
            
        std::vector<char>& queue_msg = msg_queue.front();
        if(queue_msg.size() < sizeof(buff)) {
            std::memcpy(buff, queue_msg.data(), queue_msg.size());
            buff[queue_msg.size()] = '\0';
        }
        
        msg_queue.pop_front();
        
        ul.unlock();
        
        std::cout << buff << std::endl;
    }
}

int32_t main() {
    const char *data = "Hello world!";
    std::thread th(recv);
    
    while(true) {
        mu.lock();
        
        msg_queue.emplace_back(std::vector<char>(data, data + strlen(data)));
        cv.notify_one();
        
        mu.unlock();
        
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    
    th.join();
    
    return EXIT_SUCCESS;
}
\$\endgroup\$
1
  • 2
    \$\begingroup\$ Welcome to Code Review! It would help reviewers if you could provide an example of how this code is intended to be used. \$\endgroup\$
    – Edward
    Commented Dec 19, 2020 at 17:23

1 Answer

2
\$\begingroup\$

Since you’re primarily interested in efficiency, that’s what I’m going to focus on in my review.

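However, first I do need to point out that int32_t main() is not portable C++. The standard requires main() to have a declared return type of int, so this presumably compiles only because int32_t happens to be an alias for int on your platform. There are only two forms of main() that every implementation is required to accept:

  • int main(); and
  • int main(int, char*[]).

Implementations are allowed to accept other signatures as extensions (int main(int argc, char** argv, char** envp) is a common one), but you can’t rely on them.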

I should also mention that a message queue is probably better done as a class, with the mutex and other implementation details as private data members, rather than as global variables, and details like notifying the condition variable handled automatically. But that isn’t a performance issue, just a design one.
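To give a rough idea of what I mean by that, here is a minimal sketch. The class name MessageQueue and its push()/pop() interface are my own invention for illustration, and I have assumed a std::deque for storage; treat it as a starting point, not a finished design.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical wrapper: the name MessageQueue and its interface are
// illustrative, not part of the original code.
class MessageQueue
{
public:
    using Message = std::vector<char>;

    // Add a message and wake one waiting consumer. The caller never has to
    // remember to touch the mutex or the condition variable.
    void push(Message msg)
    {
        {
            std::lock_guard<std::mutex> lock(mu_);
            queue_.push_back(std::move(msg));
        }
        cv_.notify_one(); // notify after the lock has been released
    }

    // Block until a message is available, then move it out of the queue.
    Message pop()
    {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return !queue_.empty(); });
        Message msg = std::move(queue_.front());
        queue_.pop_front();
        return msg;
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    std::deque<Message> queue_; // assumption: a deque instead of a list
};
```

With something like this, main() would only call queue.push(...) and recv() would only call queue.pop(); neither would need to know that a mutex exists.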

std::list<std::vector<char>> msg_queue;

Using a std::list for the actual queue is an odd choice. It’s not wrong; it’ll work perfectly fine. std::list is not the most performant container, but its inefficiencies will be absolutely dwarfed by all the thread context switching and locking/unlocking.

Normally for a message queue like this you’d use a std::vector, or maybe a std::deque. Or sometimes you’d actually use a std::array (or a std::vector with a fixed size), and either treat it like a ring buffer—so if there are too many unhandled messages, older messages get lost—or block any new messages until some older messages are handled.
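As a rough illustration of the ring buffer variant (the one that loses older messages), here is a sketch. The name MessageRing and its interface are hypothetical, and I have left the locking out to keep it short; in a real queue it would sit behind the same mutex and condition variable as before.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical fixed-capacity ring buffer of messages. When it is full,
// push() overwrites the oldest unhandled message.
// Precondition: capacity must be greater than zero.
class MessageRing
{
public:
    using Message = std::vector<char>;

    explicit MessageRing(std::size_t capacity) : slots_(capacity) {}

    // Returns true if an older message was dropped to make room.
    bool push(Message msg)
    {
        // When the ring is full, the next free slot is the oldest message.
        std::size_t const tail = (head_ + count_) % slots_.size();
        slots_[tail] = std::move(msg);

        if (count_ == slots_.size())
        {
            head_ = (head_ + 1) % slots_.size(); // oldest was overwritten
            return true;
        }

        ++count_;
        return false;
    }

    // Moves the oldest message into out; returns false if the ring is empty.
    bool pop(Message& out)
    {
        if (count_ == 0)
            return false;

        out = std::move(slots_[head_]);
        head_ = (head_ + 1) % slots_.size();
        --count_;
        return true;
    }

    std::size_t size() const { return count_; }

private:
    std::vector<Message> slots_; // fixed size, allocated once
    std::size_t head_ = 0;       // index of the oldest message
    std::size_t count_ = 0;      // number of unhandled messages
};
```

The blocking variant would have push() wait on a second condition variable while the ring is full, instead of overwriting.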

As I said, std::list isn’t wrong. It’ll work. I just don’t see any benefits to using it, because at least in the example code, you don’t really benefit from iterator stability (which is the primary reason to prefer a std::list), and you’ll pay a lot more for allocations and cache misses.

char buff[2048];

Is there a reason you use a fixed-size buffer? I don’t see any performance gains from it, and you’ll just lose any messages longer than 2k.

I’ll offer alternative suggestions later.

while(true) {

You have a problem in your program in that the loops are infinite. To be precise, infinite loops are UB only if they have no observable side effects, and these loops do have side effects (like printing to standard out, and locking mutexes), so they are well-defined. The real issue is that there is no portable way to end the program or, more importantly (because it’s the subject of the review), the message queue. If you tried to put this message queue as-is in an existing program that does end, unlike the example one, it would never shut down cleanly; it would block forever while waiting for the thread to join, while the thread loops and waits on the condition variable.

What you really need is a way for the recv() function to realize the program’s ending. For example, you could use a message "done" as a signal to wind things up:

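// in recv()
while (true)
{
    // ... [snip] ...

    // buff is a char array, so compare the contents with std::strcmp();
    // buff == "done" would only compare pointers.
    if (std::strcmp(buff, "done") == 0)
        break;
}

// in main()
for (auto n = 0; n < 10000; ++n) // instead of while (true)
{
    // ... [snip] ...
}

// artificial scope
{
    auto lock = std::scoped_lock(mu);
    // push_back() rather than emplace_back(): emplace_back() cannot deduce
    // a type from a braced list.
    msg_queue.push_back({'d', 'o', 'n', 'e'});
}

cv.notify_one();

th.join();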

It doesn’t need to be a special message to signal the end, but if you use something else, like a flag, then you might need to be sure you finish handling all the messages before ending the recv() loop, and you need to consider what happens if the flag is set while the queue is empty (and the condition variable is waiting on it to not be empty).
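Here is one way the flag approach could look, as a sketch only. The names ClosableQueue, close(), and the bool-returning pop() are all hypothetical. The two points from the paragraph above are handled by putting the flag into the wait predicate (so a consumer waiting on an empty queue wakes up) and by only reporting the end once the queue has been drained.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical names throughout: ClosableQueue, close(), and pop() are
// illustrative, not part of the original code.
class ClosableQueue
{
public:
    using Message = std::vector<char>;

    void push(Message msg)
    {
        {
            std::lock_guard<std::mutex> lock(mu_);
            queue_.push_back(std::move(msg));
        }
        cv_.notify_one();
    }

    // Set the flag under the lock, then wake every waiter so that a consumer
    // blocked on an empty queue gets a chance to see it.
    void close()
    {
        {
            std::lock_guard<std::mutex> lock(mu_);
            closed_ = true;
        }
        cv_.notify_all();
    }

    // Returns false only when the queue is closed AND empty, so messages
    // pushed before close() are still delivered.
    bool pop(Message& out)
    {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return closed_ || !queue_.empty(); });

        if (queue_.empty())
            return false; // we were woken by close(), nothing left to do

        out = std::move(queue_.front());
        queue_.pop_front();
        return true;
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    std::deque<Message> queue_;
    bool closed_ = false;
};
```

The consumer loop then becomes while (queue.pop(msg)) { ... }, and main() calls queue.close() just before th.join().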

while(msg_queue.empty())
    cv.wait(ul);

Rather than a manual loop like this, it’s cleaner and less likely to cause problems if you use the other form of wait() that takes a lambda:

cv.wait(ul, [] { return not msg_queue.empty(); });

This removes the temptation for some later code monkey to fool with the loop in some way. Raw loops are a code smell, because you should generally prefer algorithms. They’re fine in the use cases here, but you never know what some future coder might think to do. Also, this reads much clearer, even to someone who doesn’t understand condition variables (who might ask, “why a loop and a call to a wait function?”).

std::vector<char>& queue_msg = msg_queue.front();
if(queue_msg.size() < sizeof(buff)) {
    std::memcpy(buff, queue_msg.data(), queue_msg.size());
    buff[queue_msg.size()] = '\0';
}

msg_queue.pop_front();

Okay, here’s where the biggest inefficiencies lie. I don’t see a reason—at least in this code—why you copy the message from the message queue into a buffer, then delete the message from the queue. Why not just take the message right from the queue and use it?

In other words, why not do this:

void recv()
{
    while (true)
    {
        std::unique_lock<std::mutex> ul { mu };
        cv.wait(ul, [] { return not msg_queue.empty(); });

        // *TAKE* the message from the queue. This will be *LIGHTNING* quick;
        // possibly *thousands* of times faster than copying the message into a
        // buffer.
        auto queue_msg = std::move(msg_queue.front());

        // Remove it from the list.
        msg_queue.pop_front();

        // No need to hold the lock anymore.
        ul.unlock();

        // Now you have the message as queue_msg, and you can do whatever you want
        // with it. For example, print it:
        std::cout.write(queue_msg.data(), queue_msg.size());
        std::cout << '\n';
    }
}

But that’s just the tip of the performance iceberg. It’s very wasteful to do all that locking and so on for ONE message at a time. What if there are several messages in the queue? Why not handle them all at once?

For example:

void recv()
{
    while (true)
    {
        std::unique_lock<std::mutex> ul { mu };
        cv.wait(ul, [] { return not msg_queue.empty(); });

        // Take the ENTIRE QUEUE. Even if there are multiple messages, this could
        // still be thousands of times faster than copying a SINGLE message into a
        // buffer, and that's not even taking into account the gains you get from
        // avoiding multiple locking and unlocking and waiting cycles.
        auto messages = std::vector<std::vector<char>>{};
        messages.resize(msg_queue.size()); // make space for the messages
        std::move(msg_queue.begin(), msg_queue.end(), messages.begin()); // move them all out of the queue
        msg_queue.clear(); // clear the queue of the (now empty, moved-from) messages

        // No need for the lock anymore.
        ul.unlock();

        // Now we can handle all the messages at our leisure.
        std::for_each(messages.begin(), messages.end(),
            [](auto&& message)
            {
                std::cout.write(message.data(), message.size());
                std::cout << '\n';
            });
    }
}

As an optimization, you could also declare messages outside the loop, and call messages.clear() at the end of each iteration. The advantage of this is that messages.resize() will almost never need to allocate, because the vector keeps its memory between iterations and can reuse it. That means that the entire block of code in the middle of the function above will amount to a few measly pointer copies per message… which will be FAST… far faster than copying entire strings, even with std::memcpy(). And that in turn means much less time spent with the mutex locked, which means you should be able to get a MUCH higher throughput for your messages.
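A variation on the same idea, as suggested in the comments, is to swap the whole container out instead of moving the messages one by one. The sketch below assumes the queue is changed to a std::vector so that both containers have the same type, and take_all() is a hypothetical helper name of mine.

```cpp
#include <condition_variable>
#include <mutex>
#include <vector>

using Message = std::vector<char>;

std::mutex mu;
std::condition_variable cv;
std::vector<Message> msg_queue; // assumption: a vector, not the original std::list

// Hypothetical helper: waits until there is at least one message, then
// exchanges the shared queue with the caller's batch buffer.
void take_all(std::vector<Message>& batch)
{
    // Destroy the previous batch BEFORE taking the lock. clear() keeps the
    // outer vector's allocation, which the queue inherits after the swap.
    batch.clear();

    std::unique_lock<std::mutex> ul { mu };
    cv.wait(ul, [] { return !msg_queue.empty(); });

    // Constant time: only the two vectors' internal pointers are exchanged.
    batch.swap(msg_queue);
}
```

In recv() you would declare a std::vector<Message> batch before the loop, call take_all(batch) at the top of each iteration, and then process batch without holding the lock.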

\$\endgroup\$
9
  • 2
    \$\begingroup\$ "only two legal forms of main()" is not quite true. Compilers are permitted to accept other signatures for the main function - a common extension is int main(int argc, char **argv, char **envp) for example. What is true is that the code presented is needlessly non-portable. \$\endgroup\$ Commented Dec 19, 2020 at 20:20
  • 3
    \$\begingroup\$ An infinite loop is not UB. An infinite loop without a side effect is. \$\endgroup\$
    – vnp
    Commented Dec 19, 2020 at 21:12
  • \$\begingroup\$ -1 because of the incorrect statements about main() and infinite loops, will +1 once that's fixed :) I would keep the bit about std::list short, just say that std::vector or std::deque might be faster because they avoid a lot of allocations and keep things together in memory. As for copying the contents of the queue so you can process multiple messages in one go: why not just messages.swap(msg_queue)? \$\endgroup\$
    – G. Sliepen
    Commented Dec 20, 2020 at 11:06
  • 1
    \$\begingroup\$ 3) Thing is, because there are no comments, I’m not sure there isn’t a valid reason to use std::list. I’ve made concurrent message queues using (singly) linked lists before (so they can be lock-free). Maybe std::list is perfect for whatever the ultimate intended application here is. 🤷🏼 I don’t like to presume cluelessness wantonly, so I try to explain the usual reasons why std::list isn’t right, so the reader can at least balance them with whatever reasons for using it they have in their head. \$\endgroup\$
    – indi
    Commented Dec 21, 2020 at 21:05
  • 1
    \$\begingroup\$ As for 4), good point about the allocated memory. If both the queue itself and the temporary are a vector, then you could do something like std::vector<...> messages; messages.reserve(msg_queue.capacity()); messages.swap(msg_queue). If you also declare messages outside the loop, you need to call msg_queue.clear() somewhere in the loop, but you avoid unnecessary memory (re)allocations. If a std::list is used for both the queue and temporary, you can also swap(). \$\endgroup\$
    – G. Sliepen
    Commented Dec 22, 2020 at 0:21
