2
\$\begingroup\$

I am trying to write some code for processing streams of asynchronous data from multiple sources (Producer Nodes), process them using a multistage pipeline of Processors Nodes and then forward them to multiple Consumers Nodes. So the way I have thought of going about it is the follows: Each Node has a run function which does stuff. A Consumer node has an input queue, Producer node has an output queue and the Processor node has both. I then Construct a Pipeline by 'connecting' input and output queues (by pointing input queue of next node to output queue of previous) of nodes and run all nodes asynchronously. Each shared queue between nodes is synchronized by a mutex and condition_variable variable which are also 'connected'. Here is the code sample for int streams:

    enum class PacketState{
        START,
        VALID,
        CORRUPT,
        END,
    };
    typedef struct packet{
        int data;
        PacketState state;
        explicit packet(int x):data{x},state{PacketState::VALID}{};
        packet(int d, PacketState s):data{d},state{s}{};
        packet():data{0},state{PacketState::VALID}{};
    } PacketType;
    using OutputType = PacketType;
    using InputType = OutputType;

    class Node{
    protected:
        bool stayAlive = true;
    public:
        virtual void run() = 0;
    };

    class NodesWithOutput: public Node{
    protected:
        //declare output pointers
        std::shared_ptr<Queue<OutputType>> output;
        std::shared_ptr<std::condition_variable> cvout;
        std::shared_ptr<std::mutex> m_out;
    public:
        NodesWithOutput(){
            //setup output apparatus.....
            output = std::make_shared< Queue<OutputType> >();
            cvout = std::make_shared<std::condition_variable>();
            m_out = std::make_shared<std::mutex>();
        }
        auto getOutputQueue(){return output;}
        auto getOutputCondition(){return cvout;}
        auto getOutputMutex(){return m_out;};
    };

    class NodesWithInput: public Node{
    protected:
        //declare input pointers
        std::shared_ptr<Queue<InputType>> input;
        std::shared_ptr<std::condition_variable> cvin;
        std::shared_ptr<std::mutex> m_in;
    };

    //template<typename OutputType>
    class Producer: public NodesWithOutput{
    public:
        Producer() = default;

        void run() override {
            int i = 6;
                while (stayAlive) {
                    //generate stream of ints..
                    OutputType dataPacket{--i};
                    std::this_thread::sleep_for(std::chrono::milliseconds(10));
                    // push to output queue
                    {
                        std::lock_guard<std::mutex> lock(*m_out);
                        output->push(dataPacket);
                        cvout->notify_all();
                    }
                    if(i <= 0) stayAlive = false;
                }

            OutputType dataPacket{-1,PacketState::END};
            // push to output queue
            {
                std::lock_guard<std::mutex> lock(*m_out);
                output->push(dataPacket);
            }
            // This is the right place to notify right?..
            // **not** inside the guarded block
            cvout->notify_all();
        }
    };

    //template<typename InputType, typename OutputType>
    // HOW DO I DO THIS PROPERLY?
    class Processor:public NodesWithOutput, public NodesWithInput {
    private:
        int scale;
    protected:
        //HOW DO I FIX THIS?
        //?? this doesn't seem right!
        bool stayAlive = NodesWithOutput::stayAlive;
    public:
        Processor():scale{1}{}

        void setScale(int s){ scale = s;}

        void setInput(Producer& other){
            input = other.getOutputQueue();
            m_in = other.getOutputMutex();
            cvin = other.getOutputCondition();
        }

        void run() override {
            while(stayAlive) {
                //wait for input
                {
                    std::unique_lock<std::mutex> ulock(*m_in);
                    //Should I wait for notify from cvout of previous node?
                    //I mean this cvin will finally point to previous cvout!
                    cvin->wait(ulock, [this](){return !input->empty();});
                }
                //process input
                {
                    std::unique_ptr<PacketType> ip;
                    //read from input queue
                    {
                        std::lock_guard<std::mutex> lock(*m_in);
                        ip = input->pop();
                    }
                    //process and
                    OutputType o;
                    o.state = ip->state;
                    if(ip->state != PacketState::END) {
                        o.data = (*ip).data * scale;
                        std::this_thread::sleep_for(std::chrono::milliseconds(50));
                    }else{
                        stayAlive = false;
                    }

                    //write to output queue
                    {
                        std::lock_guard<std::mutex> lock(*m_out);
                        output->push(o);
                    }
                    cvout->notify_all();
                }
            }
        }
    };

    //template<typename InputType>
    class Consumer: public NodesWithInput {
    public:
        Consumer() = default;
        void setInput(Processor& other){
            input = other.getOutputQueue();
            m_in = other.getOutputMutex();
            cvin = other.getOutputCondition();
        }
        void run() override {
            while(stayAlive) {
                //wait for data
                {
                    std::unique_lock<std::mutex> ulock(*m_in);
                    cvin->wait(ulock, [this](){ return !input->empty();});
                }
                //Consume data data
                std::unique_ptr<InputType> ip;
                {
                    std::lock_guard<std::mutex> lock(*m_in);
                    ip = input->pop();
                }
                if(ip->state == PacketState::END) break;

                //process data
                std::cout << "Consumed: " << ip->data << std::endl;
                std::this_thread::sleep_for(std::chrono::milliseconds(5));
            }
        }
    };

    class Pipeline {
    private:
        std::vector< Node* > nodes;
    public:
        Pipeline(){
            nodes = std::vector< Node* >();
        }

        ~Pipeline() = default;

        void buildAndRun() {
            // Node 1
            Producer randInt;
            //nodes.push_back(&randInt);

            Processor multiplier;
            //HOW DO IF FIX THIS?
            //'Node' is an ambiguous base of 'Processor'
            //nodes.push_back(&multiplier);
            multiplier.setInput(randInt);
            multiplier.setScale(2);


            Consumer printer;
            //nodes.push_back(&printer);
            printer.setInput(multiplier);

            //FIX move to separate function which operates on nodes vector....
            auto run = [&](){
                // Put these in a loop if nodes vector works out...
                auto t1 = std::async(std::launch::async, &Producer::run, randInt);
                auto t2 = std::async(std::launch::async, &Processor::run, multiplier);
                auto t3 = std::async(std::launch::async, &Consumer::run, printer);

                t1.get();
                t2.get();
                t3.get();
            };
            run();

        }


    };

Not this is the first time I am writing async code and I have a questions! Some are in code comments but here are some major ones:

  1. Is the general architecture of the code OK or should I change it? Do you see any obvious (or non-obvious) issues I may will run into using this pattern?
  2. The Queue I am using is a thread safe queue Is that an overkill? Should I just use simple std::queue for holding data packets?
  3. This may be more of a stackoverflow question, but how do I properly inherit from two different classes (NodesWithInput and Nodeswithoutput) so that I can have a vector of Nodes in Pipeline? (I shout 'how to fix...' in code comments for stuff that I haven't figure out yet.)

Here is the code for the Queue class along with test main function for runnable sample.

    class Queue {
    private:
        std::queue< std::unique_ptr<T> > _queue;
        std::mutex _mutex;
        //?? change this to a shared ptr 
        // add a shared ptr to condition and point 
        // node sync variables to connecting queue sync variables..
    public:
        Queue() = default;
        Queue(const Queue<T> &) = delete;
        Queue& operator=(const Queue<T>& ) =delete;

        Queue(Queue<T> && other) noexcept {
            std::lock_guard<std::mutex> lock(_mutex);
            _queue = std::move(other._queue);
        }

        virtual ~Queue(){}

        size_t size() const {
            std::lock_guard<std::mutex> lock(_mutex);
            return _queue.size();
        }

        bool empty() const {
            std::lock_guard<std::mutex> lock(_mutex);
            return _queue.empty();
        }


        std::unique_ptr<T> pop() {
            std::lock_guard<std::mutex> lock(_mutex);
            //check empty only when locked
            if(_queue.empty()){
                return nullptr;
            }
            auto front = std::move(_queue.front());
            _queue.pop();
            return front;
        }

        void push(const T& packet){
            std::lock_guard<std::mutex> lock(_mutex);
            _queue.push(std::make_unique<T>(packet));
        }
    };


    int main() {
        std::cout << "Hello, World!" << std::endl;
        Pipeline p;
        p.buildAndRun();
        std::cout << "Goodbye, World!" << std::endl;

        return 0;
    }
  1. Is there a way to simplify the Code for Nodes by somehow adding condition_variable to the Queue and using that to synchronize and signal all output nodes to stop wait? (I have some ideas for this in the comments of the Queue class.. but need to be sure about race conditions and blocking stuff...)
  2. Finally, a bit more open ended for the future.. I have to make the ``Queue` bounded at some point. What would be the most optimal way to handle this? The processing times of different nodes could be different and large queues could form at 'some' places but not others. Is it possible to have a global bound on total number of queued items rather than local bound on each queue?
\$\endgroup\$
1
  • 1
    \$\begingroup\$ Please do not update the code in your question to incorporate feedback from answers, doing so goes against the Question + Answer style of Code Review. This is not a forum where you should keep the most updated version in your question. Please see what you may and may not do after receiving answers. \$\endgroup\$
    – Mast
    Commented May 6, 2023 at 16:44

2 Answers 2

3
\$\begingroup\$

General

  1. typedef is not needed in C++ for structs
  2. In C++, one tends to have public members first and then private members. similarly constructors and functions generally come before member variables.
  3. All classes with virtual functions must have a virtual destructor.
  4. It's considered bad practice to have protected members. It is also considered bad practice to have member variables in base classes that are pure virtual
  5. Avoid having _ as a prefix to before any names. Its against C++ guidelines.

Architecture

  1. Your hierarchies are too deep. There is very little to be gained from having a Node class. Also, it is the cause of the ambigous base error in Processor. Look up diamond dependency issue.
  2. The NodesWithOutput and NodesWithInput is leaking variables. Everytime, the getters are called a new shared_ptr is created. Looking at the usage you could have easily made it into a struct.
  3. Further there is no point having NodesWithOutput and NodesWithInput, why not just create a class called Pipe and let the Nodes have a shared_ptr to it?
  4. Do you intend to write a new Producer, Consumer and Processor class everytime you have to have a new way to produce, process or consume something? This is too much broilerplate code to achieve something mundane.
  5. Since the Queues are shared it which means that if you have multiple consumers then only one of the queue will get the packet.

Final word

There are many other issue. But the main issue is the architecture. It is needlessly complicated. I would start with something like this. Disclaimer: The code below is not not tested. It only serves as a rough sketch of how I would rearchitect the code

#include <memory>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>

template<typename T>
class Queue
{
public:
    T pop()
    {
        auto lock = std::unique_lock(mMutex);
        mConditionVariable.wait(lock, [this]{return !mQueue.empty();});    
        auto element = mQueue.front(); 
        mQueue.pop();
        return element;
    }

    void push(T&& t)
    {
        auto lock = std::unique_lock(mMutex);
        mQueue.push(t);
    }

    bool empty()
    {
        auto lock = std::unique_lock(mMutex);
        return true;
    }

private:
    std::condition_variable mConditionVariable;
    std::queue<T> mQueue;
    std::mutex mMutex;
};

template<typename T>
class Producer
{
public:

    void run()
    {
        while(1)
        {
            mOutputQueue->push(mProduce());
        }
    }

    void connect(std::shared_ptr<Queue<T>> q)
    {
        mOutputQueue = q;
    }

private:    
    std::function<T()> mProduce;
    std::shared_ptr<Queue<T>> mOutputQueue;
};

template<typename T>
class Processor
{
public:

    void run()
    {
        while(1)
        {
            mOutputQueue->push(
                mProcess(
                    mInputQueue.pop()
                )
            );
        }
    }

    void connectInput(std::shared_ptr<Queue<T>> q)
    {
        mInputQueue = q;
    }

    void connectOutput(std::shared_ptr<Queue<T>> q)
    {
        mOutputQueue = q;
    }

private:
    std::function<void(T)> mProcess;
    std::shared_ptr<Queue<T>> mInputQueue;
    std::shared_ptr<Queue<T>> mOutputQueue;
};

template<typename T>
class Consumer
{
public:

    void run()
    {
        while(1)
        {
            auto value = mInputQueue->pop();
            mConsume(std::move(value));
        }
    }

    void connectInput(std::shared_ptr<Queue<T>> q)
    {
        mInputQueue = q;
    }


private:
    std::function<void(T&)> mConsume;
    std::shared_ptr<Queue<T>> mInputQueue;
};

If you can refactor the code to look something like this and repost it. I can review it and make it robust. I suspect there are race conditions which we are unaware of but they will start to be apparent only after the code is readable.

\$\endgroup\$
2
  • 2
    \$\begingroup\$ You correctly point out several mistakes, like “All classes with virtual functions must have a virtual destructor.” However, it's quite possible the poster doesn't know why you should do that. It helps if you could add links to other StackExchange questions (like this one) or other online resources (like this one) that add some context. \$\endgroup\$
    – G. Sliepen
    Commented May 5, 2023 at 9:00
  • \$\begingroup\$ @G.Sliepen Yes. I forgot to give rationle for a lot of my suggestions \$\endgroup\$ Commented May 12, 2023 at 21:04
2
\$\begingroup\$

Architecture

I agree with Ajinkya Kamat that the architecture of your code is too complex and can be simplified. I would even go further than Ajinkya suggested, and consider not making classes for the producers, consumers and processors at all. Instead, those can be made stand-alone functions. You pass in any objects needed as arguments. So for example, a really simplified version of the consumer could look like:

template<typename T>
void run_consumer(std::shared_ptr<Queue<T>> q, std::function<void(T&)> consume)
{
    while (true)
    {
        consume(q->pop());
    }
}

You don't need std::shared_ptrs

While you can use std::shared_ptr<Queue<T>> to share queues between nodes, you don't actually need all the features std::shared_ptr provides. If you can create the Queue objects before creating the producers/processors/consumers, then you can just pass references to the queues to the latter. So for example:

Queue<int> randint_queue;
Queue<int> multiplied_queue;

{
    auto t1 = std::async(std::launch::async,
                         run_producer<int>, randint,
                         randint_queue);

    auto t2 = std::async(std::launch::async,
                         run_processor<int, int>, multiplier,
                         randint_queue, multiplied_queue);

    auto t3 = std::async(std::launch::async,
                         run_consumer<int>, printer,
                         multiplied_queue);
}

Where now something like run_consumer() looks like:

template<typename T>
void run_consumer(Queue<T>& queue, std::function<void(T&)> consume)
{
    while (true)
    {
        consume(queue.pop());
    }
}

And the actual functions that do something can look like:

void printer(const int& value) {
    std::cout << "Consumed: " << value << '\n';
}

Or even be passed in as lambdas.

Make building a pipeline easier

Your code and even my suggestions so far don't really make building the pipeline itself much easier. There is still the need to manually create nodes (in whatever form), and to connect those nodes together. Wouldn't it be much nicer if you could make a whole pipeline in one statement?

// With a variadic function:
auto pipeline = make_pipeline(randint, multiplier, printer);

// Or perhaps even using operator overloading:
auto pipeline = randint | multiplier | printer;

You can actually make something like the above. The result of make_pipeline() would be of a type that holds the Queues and std::futures returned by the std::async calls.

I am leaving this as an excercise for the reader, but the main point I want to make with this is that you should try to find ways to make building and using pipelines as easy as possible, and avoid the caller from having to create complicated classes.

Issues with Queue

There are some issues with your implementation of a thread-safe queue:

The move-constructor is unsafe

In the move-constructor, you lock the mutex of this, but that does not do anything useful. It would be much better to lock other._mutex, to prevent other threads from manipulating the queue while you are moving it. But that raises even more questions: what happens after other._queue has been moved from and the mutex is unlocked again? It's undefined what the other threads will see.

You could say that no other thread should use or even hold a reference to a Queue that is being moved from, but the compiler won't enforce that. It's much better to avoid this altogether and to delete the move-constructor.

Remove size() and empty()

This is a common mistake. While you do lock the mutex inside the size() and empty() member functions, consider that by the time the value is returned, the mutex is no longer locked, so the actual size can already have been changed before the caller looks at the return value. This means the caller cannot trust that value, and should not rely on it. I strongly recommend you remove these functions to avoid that issue.

That brings me to the following:

Let the Queue do the waiting

Instead of having callers check empty() and waiting until something is pushed before calling pop(), move the logic for waiting for the queue to be non-empty into Queue itself.

Add a way to signal that work is finished

Typically, a queue will have a producer and a consumer. At some point, the producer is finished. You then want the queue to be drained by the consumer, and the consumer to know when all the work has been done so it can quit itself. To do this, you need some way to tell the queue that it is done, and after the final item has been popped, it should be able to notify the consumer about this somehow.

One way would be to have a way to set a flag, and let pop() return a std::optional<T> which contains a T when there was still something in the queue, and std::nullopt if the queue is empty and the flag is set. This way, the consumer loop can simply look like:

void run_consumer(Queue<T>& queue, std::function<void(T&)> consume)
{
    while (auto item = queue.pop())
    {
        consume(*item);
    }
}

Think about error handling

What if something goes wrong somewhere in the pipeline? It could be at the beginning, in the middle or at the end. If the consumer dies, should the producer keep producing? The queues would grow and eventually run out of memory. You should have some way to detect errors, and have some way to propagate them so the whole pipeline will terminate.

\$\endgroup\$

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.