5
\$\begingroup\$

The code is for a single producer, single consumer scenario, where the consumer only cares about the latest information shared by the producer.

This is just a simple proof of concept created for linux. Synchronization between the processes is done with a single variable: xchg in the zcl3b_map_t struct. On update the consumer receives the last buffer from the producer and flags it as locked. The producer sees this flag while submitting the current buffer and chooses a third buffer for writing. The producer will switch between the non locked buffers until the lock flag reappears.

It looks good on my AMD Ryzen, but will it work on other CPU architectures? Maybe I missed something.

Based on this concept, I created a library: rtipc

#define _GNU_SOURCE

#include <stdatomic.h>
#include <stdint.h>
#include <stdbool.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>

#include <sys/mman.h>


#if ATOMIC_INT_LOCK_FREE == 2
typedef atomic_uint xchg_t;
#elif ATOMIC_SHORT_LOCK_FREE == 2
typedef atomic_ushort xchg_t;
#elif ATOMIC_CHAR_LOCK_FREE == 2
typedef atomic_uchar xchg_t;
#else
#warning "no suitable always lockfree datatype found"
typedef atomic_uint xchg_t;
#endif


#define SHM_ALIGNTO 8

#define SHM_ALIGN(len) (((len) + SHM_ALIGNTO - 1) & ~(SHM_ALIGNTO - 1))

#define ZCL3B_LOCK_FLAG 0x80


typedef enum {
    BUFFER_0 = 0,
    BUFFER_1,
    BUFFER_2,
    BUFFER_NONE,
} zcl3b_idx_t;


typedef struct {
    void *mem;
    xchg_t *xchg;
    void *buffers[3];
    unsigned buf_size;
    unsigned shm_size;
} zcl3b_map_t;


typedef struct {
    zcl3b_map_t map;
    zcl3b_idx_t current;
    zcl3b_idx_t locked;
} zcl3b_producer_t;


typedef struct {
    zcl3b_map_t map;
    unsigned  updated;
    unsigned  idle;
    unsigned  missed;
} zcl3b_consumer_t;


typedef struct {
    uint32_t data[128];
} msg_t;


static size_t get_shm_size(size_t buffer_size)
{
    return SHM_ALIGN(sizeof(xchg_t)) + 3 * SHM_ALIGN(buffer_size);
}


static int create_shm(size_t buf_size)
{
    size_t size = get_shm_size(buf_size);

    int r = memfd_create("zcl3b", MFD_ALLOW_SEALING);

    if (r < 0)
        return -errno;

    int fd = r;

    r = ftruncate(fd, size);

    if (r < 0) {
        r = -errno;
        goto fail;
    }

    r = fcntl(fd, F_ADD_SEALS, F_SEAL_GROW | F_SEAL_SHRINK | F_SEAL_SEAL);

    if (r < 0) {
        r = -errno;
        goto fail;
    }

    return fd;

fail:
    close(fd);
    return r;
}


static int zcl3b_map_init(zcl3b_map_t *map, int fd, size_t buf_size)
{
    map->buf_size = SHM_ALIGN(buf_size);
    map->shm_size = get_shm_size(map->buf_size);

    map->mem = mmap(NULL, map->shm_size, PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);

    if (map->mem == MAP_FAILED)
        return -errno;

    map->xchg = map->mem;

    void *offset = map->mem + SHM_ALIGN(sizeof(map->xchg));

    for (int i = 0; i < 3; i++)
        map->buffers[i] = offset + i * map->buf_size;

    atomic_store_explicit(map->xchg, BUFFER_NONE, memory_order_relaxed);

    return 0;
}


static void zcl3b_map_destroy(zcl3b_map_t *map)
{
    int r = munmap(map->mem, map->shm_size);

    if (r >= 0)
        *map = (zcl3b_map_t) { 0 };
}


static void* zcl3b_producer_update(zcl3b_producer_t *producer)
{
    unsigned old = atomic_exchange_explicit(producer->map.xchg, producer->current, memory_order_release);

    if (old & ZCL3B_LOCK_FLAG)
        producer->locked = (zcl3b_idx_t)(old & 0x3);

    producer->current = (producer->current + 1) % 3;

    if (producer->current == producer->locked)
        producer->current = (producer->current + 1) % 3;

    return producer->map.buffers[producer->current];
}


static void* zcl3b_consumer_update(zcl3b_consumer_t *consumer)
{
    unsigned old = atomic_fetch_or_explicit(consumer->map.xchg, ZCL3B_LOCK_FLAG, memory_order_consume);

    zcl3b_idx_t current = (zcl3b_idx_t)(old & 0x3);

    if (current == BUFFER_NONE)
        return NULL;

    return consumer->map.buffers[current];
}


#define NUM_CYCLES 10000000

void consumer_task(int fd)
{
    zcl3b_consumer_t consumer = { 0 };

    int r = zcl3b_map_init(&consumer.map, fd, sizeof(msg_t));

    if (r < 0) {
        printf("consumer: mmamp failed (%d)\n", r);
        return;
    }

    unsigned previous = 0;

    for (int cycle = 0; cycle < NUM_CYCLES; cycle++) {
        msg_t *msg = zcl3b_consumer_update(&consumer);

        if (msg == NULL) {
            usleep(1000);
            continue;
        }

        unsigned first = msg->data[0];
        int d = first - previous;

        if (d == 0) {
            consumer.idle++;
        } else if (d == 1) {
            consumer.updated++;
        } else if (d > 1) {
            consumer.missed += d - 1;
            consumer.updated++;
        } else if (d < 0) {
            printf("consumer: sync error first=%u previous=%u\n", first, previous);
        }

        previous = first;

        for (int i = 1; i < sizeof(msg->data) / sizeof(msg->data[0]); i++) {
            if (msg->data[i] != first) {
                printf("consumer: data error i=%u cycle=%u data=%u first=%u\n", i, cycle, msg->data[i], first);
                break;
            }
        }
    }
    zcl3b_map_destroy(&consumer.map);

    printf("consumer: idle=%u updated=%u missed=%u\n", consumer.idle, consumer.updated, consumer.missed);
}


static void producer_task(int fd)
{
    zcl3b_producer_t producer = { 0 };

    int r = zcl3b_map_init(&producer.map, fd, sizeof(msg_t));

    if (r < 0) {
        printf("producer: mmamp failed (%d)\n", r);
        return;
    }

    for (int cycle = 0; cycle < NUM_CYCLES ; cycle++) {
        msg_t *msg = zcl3b_producer_update(&producer);

        for (int i = 0; i < sizeof(msg->data) / sizeof(msg->data[0]); i++)
            msg->data[i] = cycle;
    }

    zcl3b_map_destroy(&producer.map);
}


int main()
{
    int fd = create_shm(sizeof(msg_t));

    pid_t pid = fork();

    if (pid == 0)
        consumer_task(fd);
    else
        producer_task(fd);

    close (fd);
    return 0;
}


\$\endgroup\$
1
  • 2
    \$\begingroup\$ mausys, why the - in -errno? How is return -errno; OK for a function that should return a file descriptor? \$\endgroup\$
    – chux
    Commented Jul 19, 2023 at 17:19

3 Answers 3

7
\$\begingroup\$

fork() lacks error-checking:

On success, the PID of the child process is returned in the parent, and 0 is returned in the child. On failure, -1 is returned in the parent, no child process is created, and errno is set to indicate the error.

 if (pid == 0) {
        consumer_task(fd);
 } else if (pid == -1) {      /* Add. */
        close(fd);
        return EXIT_FAILURE;  /* #include <stdlib.h> */
 } else
        producer_task(fd);
 }

Send error messages to stderr:

/* Spelling mistake? */
#if 0
    printf("producer: mmamp failed (%d)\n", r);
#else 
    fprintf (stderr, "Producer: mmap() failed (%d)\n", r);
#endif

We should use perror() to send error message to stderr, which has the advantage of producing a human-readable error message instead of the platform-specific value of -errno.

\$\endgroup\$
0
6
\$\begingroup\$

I think we should use more compiler warnings. With a sensibly strict GCC invocation, I get quite a few that are easy to fix:

gcc-13 -std=c17 -fPIC -gdwarf-4 -g -Wall -Wextra -Wwrite-strings -Wno-parentheses -Wpedantic -Warray-bounds -Wmissing-braces -Wconversion  -Wstrict-prototypes -fanalyzer       286140.c    -o 286140
286140.c: In function ‘get_shm_size’:
286140.c:29:51: warning: unsigned conversion from ‘int’ to ‘long unsigned int’ changes value from ‘-8’ to ‘18446744073709551608’ [-Wsign-conversion]
   29 | #define SHM_ALIGN(len) (((len) + SHM_ALIGNTO - 1) & ~(SHM_ALIGNTO - 1))
      |                                                   ^
286140.c:73:12: note: in expansion of macro ‘SHM_ALIGN’
   73 |     return SHM_ALIGN(sizeof(xchg_t)) + 3 * SHM_ALIGN(buffer_size);
      |            ^~~~~~~~~
286140.c:29:51: warning: unsigned conversion from ‘int’ to ‘size_t’ {aka ‘long unsigned int’} changes value from ‘-8’ to ‘18446744073709551608’ [-Wsign-conversion]
   29 | #define SHM_ALIGN(len) (((len) + SHM_ALIGNTO - 1) & ~(SHM_ALIGNTO - 1))
      |                                                   ^
286140.c:73:44: note: in expansion of macro ‘SHM_ALIGN’
   73 |     return SHM_ALIGN(sizeof(xchg_t)) + 3 * SHM_ALIGN(buffer_size);
      |                                            ^~~~~~~~~
286140.c: In function ‘create_shm’:
286140.c:88:23: warning: conversion to ‘__off_t’ {aka ‘long int’} from ‘size_t’ {aka ‘long unsigned int’} may change the sign of the result [-Wsign-conversion]
   88 |     r = ftruncate(fd, size);
      |                       ^~~~
286140.c: In function ‘zcl3b_map_init’:
286140.c:29:51: warning: unsigned conversion from ‘int’ to ‘size_t’ {aka ‘long unsigned int’} changes value from ‘-8’ to ‘18446744073709551608’ [-Wsign-conversion]
   29 | #define SHM_ALIGN(len) (((len) + SHM_ALIGNTO - 1) & ~(SHM_ALIGNTO - 1))
      |                                                   ^
286140.c:112:21: note: in expansion of macro ‘SHM_ALIGN’
  112 |     map->buf_size = SHM_ALIGN(buf_size);
      |                     ^~~~~~~~~
286140.c:29:24: warning: conversion from ‘size_t’ {aka ‘long unsigned int’} to ‘unsigned int’ may change value [-Wconversion]
   29 | #define SHM_ALIGN(len) (((len) + SHM_ALIGNTO - 1) & ~(SHM_ALIGNTO - 1))
      |                        ^
286140.c:112:21: note: in expansion of macro ‘SHM_ALIGN’
  112 |     map->buf_size = SHM_ALIGN(buf_size);
      |                     ^~~~~~~~~
286140.c:113:21: warning: conversion from ‘size_t’ {aka ‘long unsigned int’} to ‘unsigned int’ may change value [-Wconversion]
  113 |     map->shm_size = get_shm_size(map->buf_size);
      |                     ^~~~~~~~~~~~
286140.c:29:51: warning: unsigned conversion from ‘int’ to ‘long unsigned int’ changes value from ‘-8’ to ‘18446744073709551608’ [-Wsign-conversion]
   29 | #define SHM_ALIGN(len) (((len) + SHM_ALIGNTO - 1) & ~(SHM_ALIGNTO - 1))
      |                                                   ^
286140.c:122:31: note: in expansion of macro ‘SHM_ALIGN’
  122 |     void *offset = map->mem + SHM_ALIGN(sizeof(map->xchg));
      |                               ^~~~~~~~~
286140.c:122:29: warning: pointer of type ‘void *’ used in arithmetic [-Wpointer-arith]
  122 |     void *offset = map->mem + SHM_ALIGN(sizeof(map->xchg));
      |                             ^
286140.c:125:38: warning: conversion to ‘unsigned int’ from ‘int’ may change the sign of the result [-Wsign-conversion]
  125 |         map->buffers[i] = offset + i * map->buf_size;
      |                                      ^
286140.c:125:34: warning: pointer of type ‘void *’ used in arithmetic [-Wpointer-arith]
  125 |         map->buffers[i] = offset + i * map->buf_size;
      |                                  ^
286140.c: In function ‘consumer_task’:
286140.c:195:17: warning: conversion to ‘int’ from ‘unsigned int’ may change the sign of the result [-Wsign-conversion]
  195 |         int d = first - previous;
      |                 ^~~~~
286140.c:202:29: warning: conversion to ‘unsigned int’ from ‘int’ may change the sign of the result [-Wsign-conversion]
  202 |             consumer.missed += d - 1;
      |                             ^~
286140.c:210:27: warning: comparison of integer expressions of different signedness: ‘int’ and ‘long unsigned int’ [-Wsign-compare]
  210 |         for (int i = 1; i < sizeof(msg->data) / sizeof(msg->data[0]); i++) {
      |                           ^
286140.c: In function ‘producer_task’:
286140.c:237:27: warning: comparison of integer expressions of different signedness: ‘int’ and ‘long unsigned int’ [-Wsign-compare]
  237 |         for (int i = 0; i < sizeof(msg->data) / sizeof(msg->data[0]); i++)
      |                           ^
286140.c:238:28: warning: conversion to ‘uint32_t’ {aka ‘unsigned int’} from ‘int’ may change the sign of the result [-Wsign-conversion]
  238 |             msg->data[i] = cycle;
      |                            ^~~~~
286140.c: At top level:
286140.c:245:5: warning: function declaration isn’t a prototype [-Wstrict-prototypes]
  245 | int main()
      |     ^~~~

For example, simply changing the definition of SHM_ALIGNTO eliminates the first four of those:

#define SHM_ALIGNTO 8u
\$\endgroup\$
3
\$\begingroup\$

use braces

Even a single-line if body deserves { } braces.

Sooner or later, someone is going to maintain this code. It might not be you. They might fall into the trap you've laid for them.


cite your reference

You didn't mention any documentation URLs. It would really help the review context if you explain what a ZCL (zigbee cluster?) or a ZCL3B is. On first reading I did not get that this denotes a zero copy lock-free triple buffer, nor did google. In a double- or triple- buffering context, the zero copy aspect seems a bit redundant. Maybe shorten the identifier?


compound type

This just isn't enough:

#define ZCL3B_LOCK_FLAG 0x80

You really need to /* explain */ that you're adding 128 to what would otherwise be a valid buffer index. That is, we have {flag_bit, five_padding_zeros, index}. As written, I would expect this to go into a "flags byte". Alternatively, consider defining a 0x3 mask on the subsequent line, again with an eye on explaining the fields together in one place.

It is nice that you've grouped this with zcl3b_idx_t, thank you. I just had no idea they were deliberately grouped, upon my first reading.


magic buf len

typedef struct {
    uint32_t data[128];
} msg_t;
        ...
        for (int i = 1; i < sizeof(msg->data) / sizeof(msg->data[0]); i++) {
        ...
        for (int i = 0; i < sizeof(msg->data) / sizeof(msg->data[0]); i++)

The magic number 128 appears in three places, albeit two of them will be automatically dealt with if a maintainer e.g. doubles the buffer size.

Better to define a constant for the number of entries.


don't ignore errors

static int create_shm(size_t buf_size)
{
    ...
    int r = memfd_create("zcl3b", MFD_ALLOW_SEALING);
    if (r < 0)
        return -errno;
    ...
int main()
{
    int fd = create_shm(sizeof(msg_t));
    pid_t pid = fork();

and then eventually zcl3b_map_init will notice the negative file descriptor.

Prefer to deal with errors where they happen, rather than letting them cascade down the call stacks of two separate processes. The contracts of these functions would then be much simpler.

If we write bugs, we want to write shallow bugs that some poor maintainer down the road can easily diagnose.

The goto fail is very nice, it's a standard pattern.

Dealing with other failed sys calls, such as fork(), would also be good.


synchronous writes at init

    atomic_store_explicit(map->xchg, BUFFER_NONE, memory_order_relaxed);

This achieves atomic writes with relaxed_ordering where we probably wanted sequential consistency across processes. It only runs once per process so it would be hard to observe race lossage.


one more stat

        msg_t *msg = zcl3b_consumer_update(&consumer);

        if (msg == NULL) {
            usleep(1000);
            continue;
        }

In consumer_task, consider incrementing a separate statistic there. (I suppose you already have it, if you sum statistics and subtract that from NUM_CYCLES.)

Alternatively, consider writing a "wait for producer startup" loop that will only let us continue on to the main for loop once it has seen non-zero data from the producer.


torture test

The point of this code is to make a convincing argument that consumer never sees unlocked (inconsistent) data. So consider performing the verification reads according to some shuffled permuted order, rather than sequentially. Or more simply, do the reads in reverse order.

The producer may as well use memset, to go as fast as possible on whatever CPU technology we ran on.


This code achieves its design goals.

I would be willing to delegate or accept maintenance tasks on it.

\$\endgroup\$

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.