Skip to main content
Became Hot Network Question
added 2 characters in body
Source Link
toolic
  • 11.2k
  • 4
  • 25
  • 159

#include <stdbool.h>
#include <stdint.h>
#include <limits.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <assert.h>


typedef struct producer producer_t;
typedef struct consumer consumer_t;

typedef unsigned int index_t;
typedef atomic_uint atomic_index_t;

#define INDEX_END UINT_MAX

#define CONSUMED_FLAG ((index_t)1 << (UINT_WIDTH - 1))

#define ORIGIN_MASK CONSUMED_FLAG

#define INDEX_MASK (~ORIGIN_MASK)

typedef struct msgq {
    unsigned n;
    size_t msg_size;
    uintptr_t msgs_buffer;
    /* producer and consumer can change the tail
    *  the MSB shows who has last modified the tail */
    atomic_index_t *tail;
    /* head is only written by producer and only used
    * in consumer_get_head */
    atomic_index_t *head;
    /* circular queue for ordering the messages,
    * initialized simple as queue[i] = (i + 1) % n,
    * but due to overruns might get scrambled.
    * only producer can modify the queue */
    atomic_index_t *queue;
} msgq_t;


typedef struct producer {
    msgq_t msgq;
    index_t head; /* last message in chain that can be used by consumer, chain[head] is always INDEX_END */
    index_t current; /* message used by producer, will become head  */
    index_t overrun; /* message used by consumer when tail moved away by producer, will become current when released by consumer */
} producer_t;


typedef struct consumer {
    msgq_t msgq;
    index_t current;
} consumer_t;


static void* get_message(const msgq_t *msgq, index_t index)
{
    if (index >= msgq->n) {
        return NULL;
    }

    return (void*)(msgq->msgs_buffer + (index * msgq->msg_size));
}


static index_t get_next(msgq_t *msgq, index_t current)
{
    return atomic_load(&msgq->queue[current]);
}

/* set the current message as head */
static index_t append_msg(producer_t *producer)
{
    msgq_t *msgq = &producer->msgq;

    index_t next = get_next(msgq, producer->current);

    /* current message is the new end of chain*/
    atomic_store(&msgq->queue[producer->current], INDEX_END);

    if (producer->head == INDEX_END) {
        /* first message */
        atomic_store(msgq->tail, producer->current);
    } else {
        /* append current message to the chain */
        atomic_store(&msgq->queue[producer->head], producer->current);
    }

    producer->head = producer->current;

    /* announce the new head for consumer_get_head */
    atomic_store(msgq->head, producer->head);

    return next;
}


static bool producer_move_tail(producer_t *producer, index_t tail)
{
    msgq_t *msgq = &producer->msgq;
    index_t next = get_next(msgq, tail & INDEX_MASK);

    return atomic_compare_exchange_weak(producer->msgq.tail, &tail, next);
}


/* try to jump over tail blocked by consumer */
static void producer_overrun(producer_t *producer, index_t tail)
{
    msgq_t *msgq = &producer->msgq;
    index_t new_current = get_next(msgq, tail & INDEX_MASK); /* next */
    index_t new_tail = get_next(msgq, new_current); /* after next */

    /* if atomic_compare_exchange_weak fails expected will be overwritten */
    index_t expected = tail;

    if (atomic_compare_exchange_weak(producer->msgq.tail, &expected, new_tail)) {
        producer->current = new_current;
        producer->overrun = tail & INDEX_MASK;
    } else {
        /* consumer just released tail, so use it */
        producer->current = tail & INDEX_MASK;
    }
}

/* inserts the current message into the queue and
 * if the queue is full, discard the last message that is not
 * used by consumer. Returns pointer to new message */
void* producer_force_put(producer_t *producer)
{
    msgq_t *msgq = &producer->msgq;

    if (producer->current == INDEX_END) {
        producer->current = 0;
         return get_message(msgq, producer->current);
    }

    index_t next = append_msg(producer);

    index_t tail = atomic_load(msgq->tail);

    bool consumed = !!(tail & CONSUMED_FLAG);

    bool full = (next == (tail & INDEX_MASK));

    /* only for testing */
    index_t old_current = producer->current;

    if (producer->overrun != INDEX_END) {
        /* we overran the consumer and moved the tail, use overran message as
        * soon as the consumer releases it */
        if (consumed) {
            /* consumer released overrun message, so we can use it */
            /* requeue overrun */
            atomic_store(&msgq->queue[producer->overrun], next);

            producer->current = producer->overrun;
            producer->overrun = INDEX_END;

        } else {
            /* consumer still blocks overran message, move the tail again,
             * because the message queue is still full */
            if (producer_move_tail(producer, tail)) {
                producer->current = tail & INDEX_MASK;
            } else {
                /* consumer just released overrun message, so we can use it */
                /* requeue overrun */
                atomic_store(&msgq->queue[producer->overrun], next);

                producer->current = producer->overrun;
                producer->overrun = INDEX_END;
            }
        }
    } else {
        /* no previous overrun, use next or after next message */
        if (!full) {
            /* message queue not full, simply use next */
            producer->current = next;
        } else {
            if (!consumed) {
                /* message queue is full, but no message is consumed yet, so try to move tail */
                if (producer_move_tail(producer, tail)) {
                    producer->current = tail & INDEX_MASK;
                } else {
                   /* consumer just started and consumed tail
                      if consumer already moved on, we will use tail  */
                    producer_overrun(producer, tail | CONSUMED_FLAG);
                }
            } else {
                /* overrun the consumer, if the consumer keeps tail*/
                producer_overrun(producer, tail);
            }
        }
    }

    assert(old_current != producer->current);

    return get_message(msgq, producer->current);
}


void* consumer_get_head(consumer_t *consumer)
{
    msgq_t *msgq = &consumer->msgq;

    for (;;) {
        index_t tail = atomic_fetch_or(msgq->tail, CONSUMED_FLAG);

        if (tail == INDEX_END) {
            /* or CONSUMED_FLAG doesn't change INDEX_END*/
            return NULL;
        }

        index_t head = atomic_load(msgq->head);

        tail |= CONSUMED_FLAG;

        if (atomic_compare_exchange_weak(msgq->tail, &tail, head | CONSUMED_FLAG)) {
            /* only accept head if producer didn't move tail,
            *  otherwise the producer could fill the whole queue and the head could be the
            *  producers current message  */
            consumer->current = head;
            break;
        }
    }

    return get_message(msgq, consumer->current);
}


void* consumer_get_tail(consumer_t *consumer)
{
    msgq_t *msgq = &consumer->msgq;
    index_t tail = atomic_fetch_or(msgq->tail, CONSUMED_FLAG);

    if (tail == INDEX_END)
        return NULL;

    if (tail == (consumer->current | CONSUMED_FLAG)) {
        /* try to get next message */
        index_t next = get_next(msgq, consumer->current);

        if (next != INDEX_END) {
            if (atomic_compare_exchange_weak(msgq->tail, &tail, next | CONSUMED_FLAG)) {
                consumer->current = next;
            } else {
                /* producer just moved tail, use it */
                 consumer->current = atomic_fetch_or(msgq->tail, CONSUMED_FLAG);
            }
        }
    } else {
        /* producer moved tail, use it*/
        consumer->current = tail;
    }

    if (consumer->current == INDEX_END) {
        /* nothing produced yet */
        return NULL;
    }

    return get_message(msgq, consumer->current);
}

static void msgq_init(msgq_t *msgq, msgq_shm_t *shm)
{
    *msgq = (msgq_t) {
        .n = shm->n,
        .msg_size = shm->msg_size,
        .tail = msgq_shm_get_tail(shm),
        .head = msgq_shm_get_head(shm),
        .queue = msgq_shm_get_list(shm),
        .msgs_buffer = msgq_shm_get_buffer(shm),
    };
}

producer_t *producer_new(msgq_shm_t *shm)
{
    producer_t *producer = malloc(sizeof(producer_t));

    if (!producer)
        return NULL;

    msgq_init(&producer->msgq, shm);

    producer->current = INDEX_END;
    producer->overrun = INDEX_END;
    producer->head = INDEX_END;

    return producer;
}

void producer_delete(producer_t *producer)
{
    free(producer);
}

consumer_t* consumer_new(msgq_shm_t *shm)
{
    consumer_t *consumer = malloc(sizeof(consumer_t));

    if (!consumer)
        return NULL;

    msgq_init(&consumer->msgq, shm);

    consumer->current = INDEX_END;

    return consumer;
}

void consumer_delete(consumer_t *consumer)
{
    free(consumer);
}
```

#include <stdbool.h>
#include <stdint.h>
#include <limits.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <assert.h>


typedef struct producer producer_t;
typedef struct consumer consumer_t;

typedef unsigned int index_t;
typedef atomic_uint atomic_index_t;

#define INDEX_END UINT_MAX

#define CONSUMED_FLAG ((index_t)1 << (UINT_WIDTH - 1))

#define ORIGIN_MASK CONSUMED_FLAG

#define INDEX_MASK (~ORIGIN_MASK)

typedef struct msgq {
    unsigned n;
    size_t msg_size;
    uintptr_t msgs_buffer;
    /* producer and consumer can change the tail
    *  the MSB shows who has last modified the tail */
    atomic_index_t *tail;
    /* head is only written by producer and only used
    * in consumer_get_head */
    atomic_index_t *head;
    /* circular queue for ordering the messages,
    * initialized simple as queue[i] = (i + 1) % n,
    * but due to overruns might get scrambled.
    * only producer can modify the queue */
    atomic_index_t *queue;
} msgq_t;


typedef struct producer {
    msgq_t msgq;
    index_t head; /* last message in chain that can be used by consumer, chain[head] is always INDEX_END */
    index_t current; /* message used by producer, will become head  */
    index_t overrun; /* message used by consumer when tail moved away by producer, will become current when released by consumer */
} producer_t;


typedef struct consumer {
    msgq_t msgq;
    index_t current;
} consumer_t;


static void* get_message(const msgq_t *msgq, index_t index)
{
    if (index >= msgq->n) {
        return NULL;
    }

    return (void*)(msgq->msgs_buffer + (index * msgq->msg_size));
}


static index_t get_next(msgq_t *msgq, index_t current)
{
    return atomic_load(&msgq->queue[current]);
}

/* set the current message as head */
static index_t append_msg(producer_t *producer)
{
    msgq_t *msgq = &producer->msgq;

    index_t next = get_next(msgq, producer->current);

    /* current message is the new end of chain*/
    atomic_store(&msgq->queue[producer->current], INDEX_END);

    if (producer->head == INDEX_END) {
        /* first message */
        atomic_store(msgq->tail, producer->current);
    } else {
        /* append current message to the chain */
        atomic_store(&msgq->queue[producer->head], producer->current);
    }

    producer->head = producer->current;

    /* announce the new head for consumer_get_head */
    atomic_store(msgq->head, producer->head);

    return next;
}


static bool producer_move_tail(producer_t *producer, index_t tail)
{
    msgq_t *msgq = &producer->msgq;
    index_t next = get_next(msgq, tail & INDEX_MASK);

    return atomic_compare_exchange_weak(producer->msgq.tail, &tail, next);
}


/* try to jump over tail blocked by consumer */
static void producer_overrun(producer_t *producer, index_t tail)
{
    msgq_t *msgq = &producer->msgq;
    index_t new_current = get_next(msgq, tail & INDEX_MASK); /* next */
    index_t new_tail = get_next(msgq, new_current); /* after next */

    /* if atomic_compare_exchange_weak fails expected will be overwritten */
    index_t expected = tail;

    if (atomic_compare_exchange_weak(producer->msgq.tail, &expected, new_tail)) {
        producer->current = new_current;
        producer->overrun = tail & INDEX_MASK;
    } else {
        /* consumer just released tail, so use it */
        producer->current = tail & INDEX_MASK;
    }
}

/* inserts the current message into the queue and
 * if the queue is full, discard the last message that is not
 * used by consumer. Returns pointer to new message */
void* producer_force_put(producer_t *producer)
{
    msgq_t *msgq = &producer->msgq;

    if (producer->current == INDEX_END) {
        producer->current = 0;
         return get_message(msgq, producer->current);
    }

    index_t next = append_msg(producer);

    index_t tail = atomic_load(msgq->tail);

    bool consumed = !!(tail & CONSUMED_FLAG);

    bool full = (next == (tail & INDEX_MASK));

    /* only for testing */
    index_t old_current = producer->current;

    if (producer->overrun != INDEX_END) {
        /* we overran the consumer and moved the tail, use overran message as
        * soon as the consumer releases it */
        if (consumed) {
            /* consumer released overrun message, so we can use it */
            /* requeue overrun */
            atomic_store(&msgq->queue[producer->overrun], next);

            producer->current = producer->overrun;
            producer->overrun = INDEX_END;

        } else {
            /* consumer still blocks overran message, move the tail again,
             * because the message queue is still full */
            if (producer_move_tail(producer, tail)) {
                producer->current = tail & INDEX_MASK;
            } else {
                /* consumer just released overrun message, so we can use it */
                /* requeue overrun */
                atomic_store(&msgq->queue[producer->overrun], next);

                producer->current = producer->overrun;
                producer->overrun = INDEX_END;
            }
        }
    } else {
        /* no previous overrun, use next or after next message */
        if (!full) {
            /* message queue not full, simply use next */
            producer->current = next;
        } else {
            if (!consumed) {
                /* message queue is full, but no message is consumed yet, so try to move tail */
                if (producer_move_tail(producer, tail)) {
                    producer->current = tail & INDEX_MASK;
                } else {
                   /* consumer just started and consumed tail
                      if consumer already moved on, we will use tail  */
                    producer_overrun(producer, tail | CONSUMED_FLAG);
                }
            } else {
                /* overrun the consumer, if the consumer keeps tail*/
                producer_overrun(producer, tail);
            }
        }
    }

    assert(old_current != producer->current);

    return get_message(msgq, producer->current);
}


void* consumer_get_head(consumer_t *consumer)
{
    msgq_t *msgq = &consumer->msgq;

    for (;;) {
        index_t tail = atomic_fetch_or(msgq->tail, CONSUMED_FLAG);

        if (tail == INDEX_END) {
            /* or CONSUMED_FLAG doesn't change INDEX_END*/
            return NULL;
        }

        index_t head = atomic_load(msgq->head);

        tail |= CONSUMED_FLAG;

        if (atomic_compare_exchange_weak(msgq->tail, &tail, head | CONSUMED_FLAG)) {
            /* only accept head if producer didn't move tail,
            *  otherwise the producer could fill the whole queue and the head could be the
            *  producers current message  */
            consumer->current = head;
            break;
        }
    }

    return get_message(msgq, consumer->current);
}


void* consumer_get_tail(consumer_t *consumer)
{
    msgq_t *msgq = &consumer->msgq;
    index_t tail = atomic_fetch_or(msgq->tail, CONSUMED_FLAG);

    if (tail == INDEX_END)
        return NULL;

    if (tail == (consumer->current | CONSUMED_FLAG)) {
        /* try to get next message */
        index_t next = get_next(msgq, consumer->current);

        if (next != INDEX_END) {
            if (atomic_compare_exchange_weak(msgq->tail, &tail, next | CONSUMED_FLAG)) {
                consumer->current = next;
            } else {
                /* producer just moved tail, use it */
                 consumer->current = atomic_fetch_or(msgq->tail, CONSUMED_FLAG);
            }
        }
    } else {
        /* producer moved tail, use it*/
        consumer->current = tail;
    }

    if (consumer->current == INDEX_END) {
        /* nothing produced yet */
        return NULL;
    }

    return get_message(msgq, consumer->current);
}

static void msgq_init(msgq_t *msgq, msgq_shm_t *shm)
{
    *msgq = (msgq_t) {
        .n = shm->n,
        .msg_size = shm->msg_size,
        .tail = msgq_shm_get_tail(shm),
        .head = msgq_shm_get_head(shm),
        .queue = msgq_shm_get_list(shm),
        .msgs_buffer = msgq_shm_get_buffer(shm),
    };
}

producer_t *producer_new(msgq_shm_t *shm)
{
    producer_t *producer = malloc(sizeof(producer_t));

    if (!producer)
        return NULL;

    msgq_init(&producer->msgq, shm);

    producer->current = INDEX_END;
    producer->overrun = INDEX_END;
    producer->head = INDEX_END;

    return producer;
}

void producer_delete(producer_t *producer)
{
    free(producer);
}

consumer_t* consumer_new(msgq_shm_t *shm)
{
    consumer_t *consumer = malloc(sizeof(consumer_t));

    if (!consumer)
        return NULL;

    msgq_init(&consumer->msgq, shm);

    consumer->current = INDEX_END;

    return consumer;
}

void consumer_delete(consumer_t *consumer)
{
    free(consumer);
}
```

#include <stdbool.h>
#include <stdint.h>
#include <limits.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <assert.h>


typedef struct producer producer_t;
typedef struct consumer consumer_t;

typedef unsigned int index_t;
typedef atomic_uint atomic_index_t;

#define INDEX_END UINT_MAX

#define CONSUMED_FLAG ((index_t)1 << (UINT_WIDTH - 1))

#define ORIGIN_MASK CONSUMED_FLAG

#define INDEX_MASK (~ORIGIN_MASK)

typedef struct msgq {
    unsigned n;
    size_t msg_size;
    uintptr_t msgs_buffer;
    /* producer and consumer can change the tail
    *  the MSB shows who has last modified the tail */
    atomic_index_t *tail;
    /* head is only written by producer and only used
    * in consumer_get_head */
    atomic_index_t *head;
    /* circular queue for ordering the messages,
    * initialized simple as queue[i] = (i + 1) % n,
    * but due to overruns might get scrambled.
    * only producer can modify the queue */
    atomic_index_t *queue;
} msgq_t;


typedef struct producer {
    msgq_t msgq;
    index_t head; /* last message in chain that can be used by consumer, chain[head] is always INDEX_END */
    index_t current; /* message used by producer, will become head  */
    index_t overrun; /* message used by consumer when tail moved away by producer, will become current when released by consumer */
} producer_t;


typedef struct consumer {
    msgq_t msgq;
    index_t current;
} consumer_t;


static void* get_message(const msgq_t *msgq, index_t index)
{
    if (index >= msgq->n) {
        return NULL;
    }

    return (void*)(msgq->msgs_buffer + (index * msgq->msg_size));
}


static index_t get_next(msgq_t *msgq, index_t current)
{
    return atomic_load(&msgq->queue[current]);
}

/* set the current message as head */
static index_t append_msg(producer_t *producer)
{
    msgq_t *msgq = &producer->msgq;

    index_t next = get_next(msgq, producer->current);

    /* current message is the new end of chain*/
    atomic_store(&msgq->queue[producer->current], INDEX_END);

    if (producer->head == INDEX_END) {
        /* first message */
        atomic_store(msgq->tail, producer->current);
    } else {
        /* append current message to the chain */
        atomic_store(&msgq->queue[producer->head], producer->current);
    }

    producer->head = producer->current;

    /* announce the new head for consumer_get_head */
    atomic_store(msgq->head, producer->head);

    return next;
}


static bool producer_move_tail(producer_t *producer, index_t tail)
{
    msgq_t *msgq = &producer->msgq;
    index_t next = get_next(msgq, tail & INDEX_MASK);

    return atomic_compare_exchange_weak(producer->msgq.tail, &tail, next);
}


/* try to jump over tail blocked by consumer */
static void producer_overrun(producer_t *producer, index_t tail)
{
    msgq_t *msgq = &producer->msgq;
    index_t new_current = get_next(msgq, tail & INDEX_MASK); /* next */
    index_t new_tail = get_next(msgq, new_current); /* after next */

    /* if atomic_compare_exchange_weak fails expected will be overwritten */
    index_t expected = tail;

    if (atomic_compare_exchange_weak(producer->msgq.tail, &expected, new_tail)) {
        producer->current = new_current;
        producer->overrun = tail & INDEX_MASK;
    } else {
        /* consumer just released tail, so use it */
        producer->current = tail & INDEX_MASK;
    }
}

/* inserts the current message into the queue and
 * if the queue is full, discard the last message that is not
 * used by consumer. Returns pointer to new message */
void* producer_force_put(producer_t *producer)
{
    msgq_t *msgq = &producer->msgq;

    if (producer->current == INDEX_END) {
        producer->current = 0;
         return get_message(msgq, producer->current);
    }

    index_t next = append_msg(producer);

    index_t tail = atomic_load(msgq->tail);

    bool consumed = !!(tail & CONSUMED_FLAG);

    bool full = (next == (tail & INDEX_MASK));

    /* only for testing */
    index_t old_current = producer->current;

    if (producer->overrun != INDEX_END) {
        /* we overran the consumer and moved the tail, use overran message as
        * soon as the consumer releases it */
        if (consumed) {
            /* consumer released overrun message, so we can use it */
            /* requeue overrun */
            atomic_store(&msgq->queue[producer->overrun], next);

            producer->current = producer->overrun;
            producer->overrun = INDEX_END;

        } else {
            /* consumer still blocks overran message, move the tail again,
             * because the message queue is still full */
            if (producer_move_tail(producer, tail)) {
                producer->current = tail & INDEX_MASK;
            } else {
                /* consumer just released overrun message, so we can use it */
                /* requeue overrun */
                atomic_store(&msgq->queue[producer->overrun], next);

                producer->current = producer->overrun;
                producer->overrun = INDEX_END;
            }
        }
    } else {
        /* no previous overrun, use next or after next message */
        if (!full) {
            /* message queue not full, simply use next */
            producer->current = next;
        } else {
            if (!consumed) {
                /* message queue is full, but no message is consumed yet, so try to move tail */
                if (producer_move_tail(producer, tail)) {
                    producer->current = tail & INDEX_MASK;
                } else {
                   /* consumer just started and consumed tail
                      if consumer already moved on, we will use tail  */
                    producer_overrun(producer, tail | CONSUMED_FLAG);
                }
            } else {
                /* overrun the consumer, if the consumer keeps tail*/
                producer_overrun(producer, tail);
            }
        }
    }

    assert(old_current != producer->current);

    return get_message(msgq, producer->current);
}


void* consumer_get_head(consumer_t *consumer)
{
    msgq_t *msgq = &consumer->msgq;

    for (;;) {
        index_t tail = atomic_fetch_or(msgq->tail, CONSUMED_FLAG);

        if (tail == INDEX_END) {
            /* or CONSUMED_FLAG doesn't change INDEX_END*/
            return NULL;
        }

        index_t head = atomic_load(msgq->head);

        tail |= CONSUMED_FLAG;

        if (atomic_compare_exchange_weak(msgq->tail, &tail, head | CONSUMED_FLAG)) {
            /* only accept head if producer didn't move tail,
            *  otherwise the producer could fill the whole queue and the head could be the
            *  producers current message  */
            consumer->current = head;
            break;
        }
    }

    return get_message(msgq, consumer->current);
}


void* consumer_get_tail(consumer_t *consumer)
{
    msgq_t *msgq = &consumer->msgq;
    index_t tail = atomic_fetch_or(msgq->tail, CONSUMED_FLAG);

    if (tail == INDEX_END)
        return NULL;

    if (tail == (consumer->current | CONSUMED_FLAG)) {
        /* try to get next message */
        index_t next = get_next(msgq, consumer->current);

        if (next != INDEX_END) {
            if (atomic_compare_exchange_weak(msgq->tail, &tail, next | CONSUMED_FLAG)) {
                consumer->current = next;
            } else {
                /* producer just moved tail, use it */
                 consumer->current = atomic_fetch_or(msgq->tail, CONSUMED_FLAG);
            }
        }
    } else {
        /* producer moved tail, use it*/
        consumer->current = tail;
    }

    if (consumer->current == INDEX_END) {
        /* nothing produced yet */
        return NULL;
    }

    return get_message(msgq, consumer->current);
}

static void msgq_init(msgq_t *msgq, msgq_shm_t *shm)
{
    *msgq = (msgq_t) {
        .n = shm->n,
        .msg_size = shm->msg_size,
        .tail = msgq_shm_get_tail(shm),
        .head = msgq_shm_get_head(shm),
        .queue = msgq_shm_get_list(shm),
        .msgs_buffer = msgq_shm_get_buffer(shm),
    };
}

producer_t *producer_new(msgq_shm_t *shm)
{
    producer_t *producer = malloc(sizeof(producer_t));

    if (!producer)
        return NULL;

    msgq_init(&producer->msgq, shm);

    producer->current = INDEX_END;
    producer->overrun = INDEX_END;
    producer->head = INDEX_END;

    return producer;
}

void producer_delete(producer_t *producer)
{
    free(producer);
}

consumer_t* consumer_new(msgq_shm_t *shm)
{
    consumer_t *consumer = malloc(sizeof(consumer_t));

    if (!consumer)
        return NULL;

    msgq_init(&consumer->msgq, shm);

    consumer->current = INDEX_END;

    return consumer;
}

void consumer_delete(consumer_t *consumer)
{
    free(consumer);
}
Source Link
mausys
  • 117
  • 3

Single Consumer Single Producer Wait-Free Zero-Copy Circular Message Queue

The title describes pretty well what the algorithm is for. It will be used for a realtime inter-process communication library that only uses shared memory for exchanging data. For realtime systems, it's usually more important to have the current data than a lossless communication and there it gets tricky. The function producer_force_put should discard the oldest message when the producer adds a new one and the queue is already full. This is just a proof of concept, so code style, variable names and API may change. I want to focus on the correctness of the algorithm and find ways to verify it. For testing I started to write unit tests and using fibers for some race conditions. Code and test can be found here:

https://github.com/mausys/message-queue.git

But testing alone is not enough because, the order of memory accesses depends on compiler and CPU. Are there other ways to verify the correctness of a wait-free algorithm?

Implementation:


#include <stdbool.h>
#include <stdint.h>
#include <limits.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <assert.h>


typedef struct producer producer_t;
typedef struct consumer consumer_t;

typedef unsigned int index_t;
typedef atomic_uint atomic_index_t;

#define INDEX_END UINT_MAX

#define CONSUMED_FLAG ((index_t)1 << (UINT_WIDTH - 1))

#define ORIGIN_MASK CONSUMED_FLAG

#define INDEX_MASK (~ORIGIN_MASK)

typedef struct msgq {
    unsigned n;
    size_t msg_size;
    uintptr_t msgs_buffer;
    /* producer and consumer can change the tail
    *  the MSB shows who has last modified the tail */
    atomic_index_t *tail;
    /* head is only written by producer and only used
    * in consumer_get_head */
    atomic_index_t *head;
    /* circular queue for ordering the messages,
    * initialized simple as queue[i] = (i + 1) % n,
    * but due to overruns might get scrambled.
    * only producer can modify the queue */
    atomic_index_t *queue;
} msgq_t;


typedef struct producer {
    msgq_t msgq;
    index_t head; /* last message in chain that can be used by consumer, chain[head] is always INDEX_END */
    index_t current; /* message used by producer, will become head  */
    index_t overrun; /* message used by consumer when tail moved away by producer, will become current when released by consumer */
} producer_t;


typedef struct consumer {
    msgq_t msgq;
    index_t current;
} consumer_t;


static void* get_message(const msgq_t *msgq, index_t index)
{
    if (index >= msgq->n) {
        return NULL;
    }

    return (void*)(msgq->msgs_buffer + (index * msgq->msg_size));
}


static index_t get_next(msgq_t *msgq, index_t current)
{
    return atomic_load(&msgq->queue[current]);
}

/* set the current message as head */
static index_t append_msg(producer_t *producer)
{
    msgq_t *msgq = &producer->msgq;

    index_t next = get_next(msgq, producer->current);

    /* current message is the new end of chain*/
    atomic_store(&msgq->queue[producer->current], INDEX_END);

    if (producer->head == INDEX_END) {
        /* first message */
        atomic_store(msgq->tail, producer->current);
    } else {
        /* append current message to the chain */
        atomic_store(&msgq->queue[producer->head], producer->current);
    }

    producer->head = producer->current;

    /* announce the new head for consumer_get_head */
    atomic_store(msgq->head, producer->head);

    return next;
}


static bool producer_move_tail(producer_t *producer, index_t tail)
{
    msgq_t *msgq = &producer->msgq;
    index_t next = get_next(msgq, tail & INDEX_MASK);

    return atomic_compare_exchange_weak(producer->msgq.tail, &tail, next);
}


/* try to jump over tail blocked by consumer */
static void producer_overrun(producer_t *producer, index_t tail)
{
    msgq_t *msgq = &producer->msgq;
    index_t new_current = get_next(msgq, tail & INDEX_MASK); /* next */
    index_t new_tail = get_next(msgq, new_current); /* after next */

    /* if atomic_compare_exchange_weak fails expected will be overwritten */
    index_t expected = tail;

    if (atomic_compare_exchange_weak(producer->msgq.tail, &expected, new_tail)) {
        producer->current = new_current;
        producer->overrun = tail & INDEX_MASK;
    } else {
        /* consumer just released tail, so use it */
        producer->current = tail & INDEX_MASK;
    }
}

/* inserts the current message into the queue and
 * if the queue is full, discard the last message that is not
 * used by consumer. Returns pointer to new message */
void* producer_force_put(producer_t *producer)
{
    msgq_t *msgq = &producer->msgq;

    if (producer->current == INDEX_END) {
        producer->current = 0;
         return get_message(msgq, producer->current);
    }

    index_t next = append_msg(producer);

    index_t tail = atomic_load(msgq->tail);

    bool consumed = !!(tail & CONSUMED_FLAG);

    bool full = (next == (tail & INDEX_MASK));

    /* only for testing */
    index_t old_current = producer->current;

    if (producer->overrun != INDEX_END) {
        /* we overran the consumer and moved the tail, use overran message as
        * soon as the consumer releases it */
        if (consumed) {
            /* consumer released overrun message, so we can use it */
            /* requeue overrun */
            atomic_store(&msgq->queue[producer->overrun], next);

            producer->current = producer->overrun;
            producer->overrun = INDEX_END;

        } else {
            /* consumer still blocks overran message, move the tail again,
             * because the message queue is still full */
            if (producer_move_tail(producer, tail)) {
                producer->current = tail & INDEX_MASK;
            } else {
                /* consumer just released overrun message, so we can use it */
                /* requeue overrun */
                atomic_store(&msgq->queue[producer->overrun], next);

                producer->current = producer->overrun;
                producer->overrun = INDEX_END;
            }
        }
    } else {
        /* no previous overrun, use next or after next message */
        if (!full) {
            /* message queue not full, simply use next */
            producer->current = next;
        } else {
            if (!consumed) {
                /* message queue is full, but no message is consumed yet, so try to move tail */
                if (producer_move_tail(producer, tail)) {
                    producer->current = tail & INDEX_MASK;
                } else {
                   /* consumer just started and consumed tail
                      if consumer already moved on, we will use tail  */
                    producer_overrun(producer, tail | CONSUMED_FLAG);
                }
            } else {
                /* overrun the consumer, if the consumer keeps tail*/
                producer_overrun(producer, tail);
            }
        }
    }

    assert(old_current != producer->current);

    return get_message(msgq, producer->current);
}


void* consumer_get_head(consumer_t *consumer)
{
    msgq_t *msgq = &consumer->msgq;

    for (;;) {
        index_t tail = atomic_fetch_or(msgq->tail, CONSUMED_FLAG);

        if (tail == INDEX_END) {
            /* or CONSUMED_FLAG doesn't change INDEX_END*/
            return NULL;
        }

        index_t head = atomic_load(msgq->head);

        tail |= CONSUMED_FLAG;

        if (atomic_compare_exchange_weak(msgq->tail, &tail, head | CONSUMED_FLAG)) {
            /* only accept head if producer didn't move tail,
            *  otherwise the producer could fill the whole queue and the head could be the
            *  producers current message  */
            consumer->current = head;
            break;
        }
    }

    return get_message(msgq, consumer->current);
}


void* consumer_get_tail(consumer_t *consumer)
{
    msgq_t *msgq = &consumer->msgq;
    index_t tail = atomic_fetch_or(msgq->tail, CONSUMED_FLAG);

    if (tail == INDEX_END)
        return NULL;

    if (tail == (consumer->current | CONSUMED_FLAG)) {
        /* try to get next message */
        index_t next = get_next(msgq, consumer->current);

        if (next != INDEX_END) {
            if (atomic_compare_exchange_weak(msgq->tail, &tail, next | CONSUMED_FLAG)) {
                consumer->current = next;
            } else {
                /* producer just moved tail, use it */
                 consumer->current = atomic_fetch_or(msgq->tail, CONSUMED_FLAG);
            }
        }
    } else {
        /* producer moved tail, use it*/
        consumer->current = tail;
    }

    if (consumer->current == INDEX_END) {
        /* nothing produced yet */
        return NULL;
    }

    return get_message(msgq, consumer->current);
}

static void msgq_init(msgq_t *msgq, msgq_shm_t *shm)
{
    *msgq = (msgq_t) {
        .n = shm->n,
        .msg_size = shm->msg_size,
        .tail = msgq_shm_get_tail(shm),
        .head = msgq_shm_get_head(shm),
        .queue = msgq_shm_get_list(shm),
        .msgs_buffer = msgq_shm_get_buffer(shm),
    };
}

producer_t *producer_new(msgq_shm_t *shm)
{
    producer_t *producer = malloc(sizeof(producer_t));

    if (!producer)
        return NULL;

    msgq_init(&producer->msgq, shm);

    producer->current = INDEX_END;
    producer->overrun = INDEX_END;
    producer->head = INDEX_END;

    return producer;
}

void producer_delete(producer_t *producer)
{
    free(producer);
}

consumer_t* consumer_new(msgq_shm_t *shm)
{
    consumer_t *consumer = malloc(sizeof(consumer_t));

    if (!consumer)
        return NULL;

    msgq_init(&consumer->msgq, shm);

    consumer->current = INDEX_END;

    return consumer;
}

void consumer_delete(consumer_t *consumer)
{
    free(consumer);
}
```