5
\$\begingroup\$

The title describes pretty well what the algorithm is for. It will be used for a realtime inter-process communication library that only uses shared memory for exchanging data. For realtime systems, it's usually more important to have the current data than a lossless communication and there it gets tricky. The function producer_force_put should discard the oldest message when the producer adds a new one and the queue is already full. This is just a proof of concept, so code style, variable names and API may change. I want to focus on the correctness of the algorithm and find ways to verify it. For testing I started to write unit tests and using fibers for some race conditions. Code and test can be found here:

https://github.com/mausys/message-queue.git

But testing alone is not enough because, the order of memory accesses depends on compiler and CPU. Are there other ways to verify the correctness of a wait-free algorithm?

Implementation:


#include <stdbool.h>
#include <stdint.h>
#include <limits.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <assert.h>


typedef struct producer producer_t;
typedef struct consumer consumer_t;

typedef unsigned int index_t;
typedef atomic_uint atomic_index_t;

#define INDEX_END UINT_MAX

#define CONSUMED_FLAG ((index_t)1 << (UINT_WIDTH - 1))

#define ORIGIN_MASK CONSUMED_FLAG

#define INDEX_MASK (~ORIGIN_MASK)

typedef struct msgq {
    unsigned n;
    size_t msg_size;
    uintptr_t msgs_buffer;
    /* producer and consumer can change the tail
    *  the MSB shows who has last modified the tail */
    atomic_index_t *tail;
    /* head is only written by producer and only used
    * in consumer_get_head */
    atomic_index_t *head;
    /* circular queue for ordering the messages,
    * initialized simple as queue[i] = (i + 1) % n,
    * but due to overruns might get scrambled.
    * only producer can modify the queue */
    atomic_index_t *queue;
} msgq_t;


typedef struct producer {
    msgq_t msgq;
    index_t head; /* last message in chain that can be used by consumer, chain[head] is always INDEX_END */
    index_t current; /* message used by producer, will become head  */
    index_t overrun; /* message used by consumer when tail moved away by producer, will become current when released by consumer */
} producer_t;


typedef struct consumer {
    msgq_t msgq;
    index_t current;
} consumer_t;


static void* get_message(const msgq_t *msgq, index_t index)
{
    if (index >= msgq->n) {
        return NULL;
    }

    return (void*)(msgq->msgs_buffer + (index * msgq->msg_size));
}


static index_t get_next(msgq_t *msgq, index_t current)
{
    return atomic_load(&msgq->queue[current]);
}

/* set the current message as head */
static index_t append_msg(producer_t *producer)
{
    msgq_t *msgq = &producer->msgq;

    index_t next = get_next(msgq, producer->current);

    /* current message is the new end of chain*/
    atomic_store(&msgq->queue[producer->current], INDEX_END);

    if (producer->head == INDEX_END) {
        /* first message */
        atomic_store(msgq->tail, producer->current);
    } else {
        /* append current message to the chain */
        atomic_store(&msgq->queue[producer->head], producer->current);
    }

    producer->head = producer->current;

    /* announce the new head for consumer_get_head */
    atomic_store(msgq->head, producer->head);

    return next;
}


static bool producer_move_tail(producer_t *producer, index_t tail)
{
    msgq_t *msgq = &producer->msgq;
    index_t next = get_next(msgq, tail & INDEX_MASK);

    return atomic_compare_exchange_weak(producer->msgq.tail, &tail, next);
}


/* try to jump over tail blocked by consumer */
static void producer_overrun(producer_t *producer, index_t tail)
{
    msgq_t *msgq = &producer->msgq;
    index_t new_current = get_next(msgq, tail & INDEX_MASK); /* next */
    index_t new_tail = get_next(msgq, new_current); /* after next */

    /* if atomic_compare_exchange_weak fails expected will be overwritten */
    index_t expected = tail;

    if (atomic_compare_exchange_weak(producer->msgq.tail, &expected, new_tail)) {
        producer->current = new_current;
        producer->overrun = tail & INDEX_MASK;
    } else {
        /* consumer just released tail, so use it */
        producer->current = tail & INDEX_MASK;
    }
}

/* inserts the current message into the queue and
 * if the queue is full, discard the last message that is not
 * used by consumer. Returns pointer to new message */
void* producer_force_put(producer_t *producer)
{
    msgq_t *msgq = &producer->msgq;

    if (producer->current == INDEX_END) {
        producer->current = 0;
         return get_message(msgq, producer->current);
    }

    index_t next = append_msg(producer);

    index_t tail = atomic_load(msgq->tail);

    bool consumed = !!(tail & CONSUMED_FLAG);

    bool full = (next == (tail & INDEX_MASK));

    /* only for testing */
    index_t old_current = producer->current;

    if (producer->overrun != INDEX_END) {
        /* we overran the consumer and moved the tail, use overran message as
        * soon as the consumer releases it */
        if (consumed) {
            /* consumer released overrun message, so we can use it */
            /* requeue overrun */
            atomic_store(&msgq->queue[producer->overrun], next);

            producer->current = producer->overrun;
            producer->overrun = INDEX_END;

        } else {
            /* consumer still blocks overran message, move the tail again,
             * because the message queue is still full */
            if (producer_move_tail(producer, tail)) {
                producer->current = tail & INDEX_MASK;
            } else {
                /* consumer just released overrun message, so we can use it */
                /* requeue overrun */
                atomic_store(&msgq->queue[producer->overrun], next);

                producer->current = producer->overrun;
                producer->overrun = INDEX_END;
            }
        }
    } else {
        /* no previous overrun, use next or after next message */
        if (!full) {
            /* message queue not full, simply use next */
            producer->current = next;
        } else {
            if (!consumed) {
                /* message queue is full, but no message is consumed yet, so try to move tail */
                if (producer_move_tail(producer, tail)) {
                    producer->current = tail & INDEX_MASK;
                } else {
                   /* consumer just started and consumed tail
                      if consumer already moved on, we will use tail  */
                    producer_overrun(producer, tail | CONSUMED_FLAG);
                }
            } else {
                /* overrun the consumer, if the consumer keeps tail*/
                producer_overrun(producer, tail);
            }
        }
    }

    assert(old_current != producer->current);

    return get_message(msgq, producer->current);
}


void* consumer_get_head(consumer_t *consumer)
{
    msgq_t *msgq = &consumer->msgq;

    for (;;) {
        index_t tail = atomic_fetch_or(msgq->tail, CONSUMED_FLAG);

        if (tail == INDEX_END) {
            /* or CONSUMED_FLAG doesn't change INDEX_END*/
            return NULL;
        }

        index_t head = atomic_load(msgq->head);

        tail |= CONSUMED_FLAG;

        if (atomic_compare_exchange_weak(msgq->tail, &tail, head | CONSUMED_FLAG)) {
            /* only accept head if producer didn't move tail,
            *  otherwise the producer could fill the whole queue and the head could be the
            *  producers current message  */
            consumer->current = head;
            break;
        }
    }

    return get_message(msgq, consumer->current);
}


void* consumer_get_tail(consumer_t *consumer)
{
    msgq_t *msgq = &consumer->msgq;
    index_t tail = atomic_fetch_or(msgq->tail, CONSUMED_FLAG);

    if (tail == INDEX_END)
        return NULL;

    if (tail == (consumer->current | CONSUMED_FLAG)) {
        /* try to get next message */
        index_t next = get_next(msgq, consumer->current);

        if (next != INDEX_END) {
            if (atomic_compare_exchange_weak(msgq->tail, &tail, next | CONSUMED_FLAG)) {
                consumer->current = next;
            } else {
                /* producer just moved tail, use it */
                 consumer->current = atomic_fetch_or(msgq->tail, CONSUMED_FLAG);
            }
        }
    } else {
        /* producer moved tail, use it*/
        consumer->current = tail;
    }

    if (consumer->current == INDEX_END) {
        /* nothing produced yet */
        return NULL;
    }

    return get_message(msgq, consumer->current);
}

static void msgq_init(msgq_t *msgq, msgq_shm_t *shm)
{
    *msgq = (msgq_t) {
        .n = shm->n,
        .msg_size = shm->msg_size,
        .tail = msgq_shm_get_tail(shm),
        .head = msgq_shm_get_head(shm),
        .queue = msgq_shm_get_list(shm),
        .msgs_buffer = msgq_shm_get_buffer(shm),
    };
}

producer_t *producer_new(msgq_shm_t *shm)
{
    producer_t *producer = malloc(sizeof(producer_t));

    if (!producer)
        return NULL;

    msgq_init(&producer->msgq, shm);

    producer->current = INDEX_END;
    producer->overrun = INDEX_END;
    producer->head = INDEX_END;

    return producer;
}

void producer_delete(producer_t *producer)
{
    free(producer);
}

consumer_t* consumer_new(msgq_shm_t *shm)
{
    consumer_t *consumer = malloc(sizeof(consumer_t));

    if (!consumer)
        return NULL;

    msgq_init(&consumer->msgq, shm);

    consumer->current = INDEX_END;

    return consumer;
}

void consumer_delete(consumer_t *consumer)
{
    free(consumer);
}
\$\endgroup\$
0

3 Answers 3

6
\$\begingroup\$

design document

We need to understand the constraints you're working within, so you should write them down. There are some comments in the implementation, which are helpful, but that's no substitute for presenting the use case and theory of operation at a high level.

In concurrency and in crypto, the usual rule of thumb is "don't reinvent the wheel", because likely you will do so badly. Better to exploit mature, well tested building blocks.

It would help to know the name of at least one RT OS this library is targeting. Producing an element with possible overwrite, due to slow consumer, is a pretty commonly desired feature. We see a core library implement it here, for example:
https://github.com/RIOT-OS/RIOT/blob/master/core/lib/include/ringbuffer.h#L68

If the libraries available to you are somehow unsuitable or lacking in features, it's important to write down those details. Otherwise a maintenance engineer might come along in a few months, survey the codebase, and conclude that it's simpler to rip out some code and replace it with calls to a well-tested publicly available library.

learning

If you deliberately wish to reinvent the wheel as a learning exercise, then state that explicitly. It makes a big difference to the engineering tradeoffs.

scheduling

If instead of OS scheduled threads we're relying on app scheduled fibers, well, that's a pretty important assumption to document. Presumably benchmark results would be one of the considerations informing such a design decision, but we don't see any performance discussion or observed timings in the codebase.

It's possible that this fiber library is mature, solid, and well tested. But there's scant evidence of that in the source file or associated documentation files.

It's not obvious to me that atomic_load() is a big win over protecting variables with CV or mutex.

extra identifier

nit: naming ORIGIN_MASK is perhaps not worth it. Consider just using a (~CONSUMED_FLAG) expression instead.

good comments

Thank you for including many very helpful comments in the code. Ideally we would see several datastructure invariants listed in the design document, and then the comments would echo those, assuring us that the invariants always hold.

I would also like to see a paragraph describing an overrun event. Given that producer will adjust head on each overrun, I don't think there's any interesting difference between single element overrun, multiple overrun, single wrap, or multiple wrap around events prior to consumer making its next request. But an example would help to put such concerns to rest.

statistics

Consider maintaining counters for overrun and wraparound events. An application might like to syslog them at hourly intervals, for example. Automated unit tests which deliberately consume "too slowly" could use the counters to verify that the desired rates had been achieved.

cache line

It appears to me that both of these go in the same cache line, which might bounce around from core to core to maintain cache coherency:

typedef struct msgq {
    ...
    atomic_index_t *tail;
    ...
    atomic_index_t *head;

Consider padding, so they're in distinct cache lines. Benchmarking results should inform any such changes.

If your production workload will include diverse threads / fibers besides just the producer and consumer, be sure to include such "distractor" threads in your benchmark workload, as well.

interleavings

other ways to verify the correctness

The SPIN model checker offers one approach for testing diverse interleavings of producer and consumer.

Just as there are "malloc debug" libraries, there are debug schedulers, that deliberately choose diverse interleavings during a test run. The fiber library you're currently linking against does not appear to support such a debug mode.

\$\endgroup\$
2
  • \$\begingroup\$ Thank you very much for your response. The algorithm is intended primary for a linux library. I was looking for an existing algorithm, but couldn't find one. The example you mentioned is a simple ringbuffer, but I'm looking for a message queue. The fiber library is neither mature nor solid, nor well tested. Its sole purpose is to test the algorithm. As I wrote, the library is for inter-process communication, so producer and consumer are running in different processes and all the atomic variables and messages are located in a shared memory and different cache lines. \$\endgroup\$
    – mausys
    Commented Apr 25 at 19:38
  • \$\begingroup\$ Thank you for the hint with SPIN. I will look into it. \$\endgroup\$
    – mausys
    Commented Apr 25 at 19:40
5
\$\begingroup\$

Be careful claiming it is wait-free

You claim your queue is wait-free, but you have a potentially unbounded for-loop doing a compare-exchange operation. At the very least, this means consumer_get_head()'s time complexity is actually \$O(N)\$. Sure, you are unlikely to hit that case, but the whole point of a wait-free algorithm is that you never have to wait.

Confusing API

The API is very confusing. As you mention in the comments, the producer already has to know the pointer to the first free element, and then producer_force_put() will mark that element as produced and returns a pointer to the next free element. And something similar is going on with the consumer functions. For me, that was very unexpected. It also means the caller now shares some of the bookkeeping.

I would rather have a function producer_get_head() that returns a pointer to the first free element, and a producer_force_get_head() to force it to free up an element if there is none, and then a producer_put() that returns void and that merely marks the element as produced and increments the head index.

\$\endgroup\$
1
  • 1
    \$\begingroup\$ Thank you for your response. That's not how the algorithm works, producer_force_put adds a message to the queue which pointers was retrieved on the previous call. The pointer returned is for the empty spot. consumer_get_tail has no for loop and consumer_get_head can only be stuck in the loop if the producer keeps creating messages very fast with high scheduling priority, but then something is already wrong in the system. \$\endgroup\$
    – mausys
    Commented Apr 26 at 10:14
2
\$\begingroup\$

Just a little review.

... code style, variable names and API may change.
(OP)

Array indexing

Code has typedef unsigned int index_t;, which I take as later code may use:

typedef size_t             index_t; // or
typedef unsigned long      index_t; // or
typedef uint32_t           index_t; // or
typedef unsigned char      index_t; // or
typedef some_unsigned_type index_t; // or
...

Given usage as an array index, size_t makes the most sense as it is the Goldilocks type for general array indexing, not too narrow nor too wide.


Consider maintenance

Yet look how code may break if that type definition is changed:

// What if we change this line?
// typedef unsigned int index_t;
typedef some_updated_unsigned_type index_t;

// This line needs updating
#define INDEX_END UINT_MAX

// So does this
#define CONSUMED_FLAG ((index_t)1 << (UINT_WIDTH - 1))

// Maybe this as ~ORIGIN_MASK is extra wide if `index_t` was narrower than `unsigned`.
#define INDEX_MASK (~ORIGIN_MASK)

// And this
if (index >= msgq->n) {

// More ??

Sometimes, that is life, a type changed involves re-assessing many lines of code.

Yet how to reduce the impact?

INDEX_END (which I think should be called INDEX_MAX) can use ((index_t)-1) as that computation yields the correct value as the unsigned type definition of index_t changes.

//#define INDEX_END UINT_MAX
#define INDEX_END ((index_t)-1)

CONSUMED_FLAG is a little tricky:

//#define CONSUMED_FLAG ((index_t)1 << (UINT_WIDTH - 1))
#define CONSUMED_FLAG (INDEX_END - INDEX_END/2)

... or #define CONSUMED_FLAG ((index_t)1 << (IMAX_BITS(INDEX_END) - 1))

Use a consistent type:

typedef struct msgq {
    // unsigned n;
    index_t n;  // <-----------
    ...

    if (index >= msgq->n) {
        return NULL;
    }

Some code checkers like MISRA may whine about casting, so we could use other techniques.

Or add comments that describe how index_t impacts INDEX_END and CONSUMED_FLAG.

\$\endgroup\$
3
  • \$\begingroup\$ index_t needs to be the same size as atomic_uint. INDEX_END is used to mark the end of the queue and also for an invalid index. \$\endgroup\$
    – mausys
    Commented yesterday
  • \$\begingroup\$ @mausys "index_t needs to be the same size as atomic_uint." --> OK. Why then even define typedef unsigned int index_t; and typedef atomic_uint atomic_index_t; and instead use unsigned and atomic_uint? Perhaps that rational needs to be commented in code, else those typedefs look like alias abstractions for potential future change - and hence the usefulness of using code that readily adjusts to change. \$\endgroup\$
    – chux
    Commented yesterday
  • \$\begingroup\$ Yes, in the future those definition will depend on ATOMIC_INT_LOCK_FREE == 2 (The atomic type is always lock-free). But as I wrote, this is just a proof of concept. I need first a verification, that this algorithm works, before I start porting it to different CPU architectures. \$\endgroup\$
    – mausys
    Commented yesterday

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.