I am trying to write some code for processing streams of asynchronous data from multiple sources (`Producer`
nodes), process them using a multistage pipeline of `Processor` nodes, and then forward them to multiple
`Consumer` nodes. The way I have thought of going about it is as follows: each `Node` has a `run` function
that does the work. A `Consumer` node has an `input` queue, a `Producer` node has an `output` queue, and a
`Processor` node has both. I then construct a `Pipeline` by 'connecting' the nodes' `input` and `output`
queues (pointing the input queue of each node at the output queue of the previous one) and `run` all nodes
asynchronously. Each shared queue between nodes is synchronized by a `mutex` and a `condition_variable`,
which are also 'connected'. Here is the code sample for `int` streams:
/// Lifecycle / validity tag carried by every packet.
enum class PacketState {
    START,
    VALID,
    CORRUPT,
    END,  // sentinel: no more packets will follow on this stream
};

/// A single unit of data flowing through the pipeline.
/// Defaults to data 0 in state VALID.
struct packet {
    int data{0};
    PacketState state{PacketState::VALID};

    packet() = default;
    /// A VALID packet carrying `x`. Explicit so a bare int never silently becomes a packet.
    explicit packet(int x) : data{x} {}
    /// A packet with explicit payload and state (e.g. {-1, PacketState::END}).
    packet(int d, PacketState s) : data{d}, state{s} {}
};

using PacketType = packet;
using OutputType = PacketType;
using InputType = OutputType;
/// Abstract base for every stage of a Pipeline.
///
/// Nodes are used polymorphically: run() is virtual and Pipeline stores nodes
/// as Node*. The destructor is therefore virtual, because destroying a derived
/// node through a Node pointer would otherwise be undefined behaviour.
class Node {
protected:
    /// Loop-continuation flag checked by run() implementations.
    /// NOTE(review): a plain bool is only safe while it is read and written by
    /// the thread executing run(). Stopping a node from another thread would
    /// need std::atomic<bool>, which would also make nodes non-copyable.
    bool stayAlive = true;

public:
    Node() = default;
    Node(const Node&) = default;
    Node& operator=(const Node&) = default;
    Node(Node&&) = default;
    Node& operator=(Node&&) = default;
    virtual ~Node() = default;

    /// Performs this node's work; the pipeline runs each node on its own thread.
    virtual void run() = 0;
};
/// Base for nodes that publish packets downstream (Producer, Processor).
///
/// Owns the output queue together with the mutex and condition variable that
/// guard it. All three are held through shared_ptr so a downstream node can
/// share them via the getters. It then waits on the same condition variable
/// this node notifies, under the same mutex.
///
/// Node is inherited *virtually*: every sibling base that also inherits Node
/// virtually shares this single Node subobject. A class deriving from both
/// such bases (e.g. Processor) is then unambiguously a Node and has one
/// stayAlive flag.
class NodesWithOutput : public virtual Node {
protected:
    std::shared_ptr<Queue<OutputType>> output = std::make_shared<Queue<OutputType>>();
    std::shared_ptr<std::condition_variable> cvout = std::make_shared<std::condition_variable>();
    std::shared_ptr<std::mutex> m_out = std::make_shared<std::mutex>();

public:
    NodesWithOutput() = default;

    /// @return shared handle to this node's output queue.
    auto getOutputQueue() const { return output; }
    /// @return shared handle to the condition variable notified after each push.
    auto getOutputCondition() const { return cvout; }
    /// @return shared handle to the mutex held while pushing to the output queue.
    auto getOutputMutex() const { return m_out; }
};
/// Base for nodes that read packets from an upstream node (Processor, Consumer).
///
/// The three handles start out null. Derived classes point them at an
/// upstream node's output queue, mutex and condition variable in setInput();
/// run() dereferences them, so setInput() must be called first.
///
/// Node is inherited *virtually*: every sibling base that also inherits Node
/// virtually shares this single Node subobject. This avoids two Node
/// subobjects (and two stayAlive flags) in a class such as Processor.
class NodesWithInput : public virtual Node {
protected:
    std::shared_ptr<Queue<InputType>> input;        // upstream node's output queue
    std::shared_ptr<std::condition_variable> cvin;  // upstream node's condition variable
    std::shared_ptr<std::mutex> m_in;               // upstream node's mutex guarding `input`
};
//template<typename OutputType>
class Producer: public NodesWithOutput{
public:
Producer() = default;
void run() override {
int i = 6;
while (stayAlive) {
//generate stream of ints..
OutputType dataPacket{--i};
std::this_thread::sleep_for(std::chrono::milliseconds(10));
// push to output queue
{
std::lock_guard<std::mutex> lock(*m_out);
output->push(dataPacket);
cvout->notify_all();
}
if(i <= 0) stayAlive = false;
}
OutputType dataPacket{-1,PacketState::END};
// push to output queue
{
std::lock_guard<std::mutex> lock(*m_out);
output->push(dataPacket);
}
// This is the right place to notify right?..
// **not** inside the guarded block
cvout->notify_all();
}
};
//template<typename InputType, typename OutputType>
// HOW DO I DO THIS PROPERLY?
class Processor:public NodesWithOutput, public NodesWithInput {
private:
int scale;
protected:
//HOW DO I FIX THIS?
//?? this doesn't seem right!
bool stayAlive = NodesWithOutput::stayAlive;
public:
Processor():scale{1}{}
void setScale(int s){ scale = s;}
void setInput(Producer& other){
input = other.getOutputQueue();
m_in = other.getOutputMutex();
cvin = other.getOutputCondition();
}
void run() override {
while(stayAlive) {
//wait for input
{
std::unique_lock<std::mutex> ulock(*m_in);
//Should I wait for notify from cvout of previous node?
//I mean this cvin will finally point to previous cvout!
cvin->wait(ulock, [this](){return !input->empty();});
}
//process input
{
std::unique_ptr<PacketType> ip;
//read from input queue
{
std::lock_guard<std::mutex> lock(*m_in);
ip = input->pop();
}
//process and
OutputType o;
o.state = ip->state;
if(ip->state != PacketState::END) {
o.data = (*ip).data * scale;
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}else{
stayAlive = false;
}
//write to output queue
{
std::lock_guard<std::mutex> lock(*m_out);
output->push(o);
}
cvout->notify_all();
}
}
}
};
//template<typename InputType>
class Consumer: public NodesWithInput {
public:
Consumer() = default;
void setInput(Processor& other){
input = other.getOutputQueue();
m_in = other.getOutputMutex();
cvin = other.getOutputCondition();
}
void run() override {
while(stayAlive) {
//wait for data
{
std::unique_lock<std::mutex> ulock(*m_in);
cvin->wait(ulock, [this](){ return !input->empty();});
}
//Consume data data
std::unique_ptr<InputType> ip;
{
std::lock_guard<std::mutex> lock(*m_in);
ip = input->pop();
}
if(ip->state == PacketState::END) break;
//process data
std::cout << "Consumed: " << ip->data << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}
};
class Pipeline {
private:
std::vector< Node* > nodes;
public:
Pipeline(){
nodes = std::vector< Node* >();
}
~Pipeline() = default;
void buildAndRun() {
// Node 1
Producer randInt;
//nodes.push_back(&randInt);
Processor multiplier;
//HOW DO IF FIX THIS?
//'Node' is an ambiguous base of 'Processor'
//nodes.push_back(&multiplier);
multiplier.setInput(randInt);
multiplier.setScale(2);
Consumer printer;
//nodes.push_back(&printer);
printer.setInput(multiplier);
//FIX move to separate function which operates on nodes vector....
auto run = [&](){
// Put these in a loop if nodes vector works out...
auto t1 = std::async(std::launch::async, &Producer::run, randInt);
auto t2 = std::async(std::launch::async, &Processor::run, multiplier);
auto t3 = std::async(std::launch::async, &Consumer::run, printer);
t1.get();
t2.get();
t3.get();
};
run();
}
};
Note: this is the first time I am writing async code, and I have a few questions! Some are in code comments, but here are the major ones:
- Is the general architecture of the code OK, or should I change it? Do you see any obvious (or non-obvious) issues I may run into using this pattern?
- The `Queue` I am using is a thread-safe queue. Is that overkill? Should I just use a plain `std::queue` for holding data packets?
- This may be more of a Stack Overflow question, but how do I properly inherit from two different classes (`NodesWithInput` and `NodesWithOutput`) so that I can have a `vector` of `Node`s in `Pipeline`? (I shout 'HOW DO I FIX...' in all-caps code comments for things I haven't figured out yet.)
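Here is the code for the `Queue` class, along with a test `main` function so the sample is runnable. (`Queue` is used as a class template, `Queue<T>`, and must be declared before the node classes that use it.)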
/// Thread-safe FIFO queue of T, storing each element as a std::unique_ptr<T>.
///
/// Every member function locks an internal mutex, so each individual call is
/// atomic. A check-then-act sequence such as `if (!q.empty()) q.pop();` is NOT
/// atomic as a whole. Callers that need that, e.g. waiting on a
/// condition_variable until the queue is non-empty and then popping, must hold
/// their own external lock across both steps, or handle pop() returning nullptr.
template <typename T>
class Queue {
private:
    std::queue<std::unique_ptr<T>> _queue;
    // mutable so the const observers size() and empty() can lock it.
    mutable std::mutex _mutex;
    // Idea: share a condition_variable (via shared_ptr) from the queue itself
    // and point the nodes' sync variables at the connecting queue's.

public:
    Queue() = default;
    Queue(const Queue<T>&) = delete;
    Queue& operator=(const Queue<T>&) = delete;

    /// Takes over the contents of `other`. The source is the object that other
    /// threads may still be touching, so its mutex is the one to lock; `*this`
    /// is still under construction and not yet shared.
    Queue(Queue<T>&& other) noexcept {
        std::lock_guard<std::mutex> lock(other._mutex);
        _queue = std::move(other._queue);
    }

    virtual ~Queue() = default;

    /// @return the number of queued elements at the moment of the call.
    size_t size() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.size();
    }

    /// @return true if the queue held no elements at the moment of the call.
    bool empty() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.empty();
    }

    /// Removes and returns the front element, or nullptr if the queue is empty.
    std::unique_ptr<T> pop() {
        std::lock_guard<std::mutex> lock(_mutex);
        // Check emptiness only while locked.
        if (_queue.empty()) {
            return nullptr;
        }
        auto front = std::move(_queue.front());
        _queue.pop();
        return front;
    }

    /// Appends a heap-allocated copy of `packet` to the back of the queue.
    void push(const T& packet) {
        std::lock_guard<std::mutex> lock(_mutex);
        _queue.push(std::make_unique<T>(packet));
    }
};
int main() {
std::cout << "Hello, World!" << std::endl;
Pipeline p;
p.buildAndRun();
std::cout << "Goodbye, World!" << std::endl;
return 0;
}
- Is there a way to simplify the code for the `Node`s by adding a `condition_variable` to the `Queue` and using it to synchronize and signal all output nodes to stop waiting? (I have some ideas for this in the comments of the `Queue` class, but I need to be sure about race conditions and blocking.)
- Finally, a bit more open-ended, for the future: I have to make the `Queue` bounded at some point. What would be the best way to handle this? The processing times of different nodes could differ, and large queues could form at some places but not others. Is it possible to have a global bound on the total number of queued items rather than a local bound on each queue?