I am working on a concurrency system based on message queues with an underlying thread pool. A queue can target other queues. Also a queue can be targeted by multiple queues (i.e. children) to create a tree hierarchy and to prioritise some of the queues.
I want to avoid signalling threads each time a message is queued while the queue (and its children) is (are) already being drained.
So the queue knows if it's sleeping or running. Transition from sleeping to running is easy. That can be done when a new message is queued.
I am more concerned about the other way around. How can I avoid missing messages/events when I transition from running to sleeping? To clarify: this is not an atomic step. The queue also can't go to sleep when its children still need attention.
At the moment in the sleep function I set the state to sleeping, then check if the current queue or its children have any pending events that were missed. If so, I wake the queue back up. But doesn't that potentially cause an infinite recursion of awake - sleep - awake - sleep - ... ?
How can I avoid this, or what's the proper way to setup such a queue system?
In pseudo code:
atomic state = sleeping
target = anotherQueue
children = queues that target this queue
func Async(event)
{
get lock
push event onto queue (i.e. array/fifo/...)
release lock
this.wakeup()
}
func wakeup()
{
If (state == running) { return }
state = running
!target ? drain() : target.wakeup()
}
func drain()
{
while (event = next() )
{
event.run()
}
sleep()
}
func sleep()
{
state = sleeping
*** what if an event is posted during the transition from running to sleeping? ***
//check if events have been missed
If (event.count > 0)
{
this.wakeup()
}
//check children: don't sleep if they have pending events
If (children.haveEvents) { this.wakeup }
}
func next()
{
get lock
nextEvent = pop event from queue
release lock
If (!nextEvent)
{
//check each child queue for an event
children.nextEvent
}
return nextEvent
}
while(true) {drain();}and thensleepdoesn't need to callwakeup, it can just return without sleeping. To avoid the potential deadlock I'd use a condition variable forstate.