This code targets a single-producer, single-consumer scenario in which the consumer only cares about the latest data published by the producer.
This is a simple proof of concept written for Linux. The two processes synchronize through a single variable: `xchg` in the `zcl3b_map_t` struct. On each update, the consumer takes the most recently published buffer and flags it as locked. When the producer submits its current buffer, it sees this flag and picks the third buffer for writing. The producer then alternates between the two unlocked buffers until the consumer sets the lock flag again.
It seems to work on my AMD Ryzen (x86-64), but will it also work on other CPU architectures? Maybe I missed something.
Based on this concept, I created a library: rtipc
#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <unistd.h>
/*
 * Type of the shared exchange word that hands buffer indices between the
 * producer and consumer processes.
 *
 * The word lives in memory shared by two processes, so the atomic type must
 * be always lock-free.  A non-lock-free atomic is typically implemented with
 * a process-local lock, which would not synchronize the two processes (see
 * the C11 note on lock-free operations being address-free).  Such a build is
 * therefore rejected instead of silently producing a broken program.
 *
 * Only 8 bits are needed: a 2-bit buffer index plus ZCL3B_LOCK_FLAG (0x80).
 */
#if ATOMIC_INT_LOCK_FREE == 2
typedef atomic_uint xchg_t;
#elif ATOMIC_SHORT_LOCK_FREE == 2
typedef atomic_ushort xchg_t;
#elif ATOMIC_CHAR_LOCK_FREE == 2
typedef atomic_uchar xchg_t;
#else
#error "no always-lock-free atomic type available; cross-process synchronization would be unsafe"
#endif
/* Every region inside the shared mapping starts on this boundary (power of two). */
enum { SHM_ALIGNTO = 8 };

/* Round len up to the next multiple of SHM_ALIGNTO. */
#define SHM_ALIGN(len) (((len) + (SHM_ALIGNTO - 1)) & ~(SHM_ALIGNTO - 1))

/* Bit the consumer ORs into xchg to mark the published buffer as taken. */
enum { ZCL3B_LOCK_FLAG = 0x80 };
/*
 * Index of one of the three shared buffers.  BUFFER_NONE means "no buffer"
 * and is the value zcl3b_map_init() stores into xchg.  All values fit in
 * the 2-bit index mask (0x3) used when decoding xchg.
 */
typedef enum {
    BUFFER_0    = 0,
    BUFFER_1    = 1,
    BUFFER_2    = 2,
    BUFFER_NONE = 3,
} zcl3b_idx_t;
/* One process's view of the shared memory region. */
typedef struct zcl3b_map {
    void *mem;          /* start of the mmap()ed region */
    xchg_t *xchg;       /* exchange word at the start of the region */
    void *buffers[3];   /* the three payload buffers that follow xchg */
    unsigned buf_size;  /* size of each buffer, rounded up with SHM_ALIGN */
    unsigned shm_size;  /* total size of the mapping */
} zcl3b_map_t;
/* Producer-side state. */
typedef struct zcl3b_producer {
    zcl3b_map_t map;
    zcl3b_idx_t current; /* buffer the producer is filling right now */
    zcl3b_idx_t locked;  /* buffer most recently seen with ZCL3B_LOCK_FLAG */
} zcl3b_producer_t;
/* Consumer-side state plus the statistics gathered by consumer_task(). */
typedef struct zcl3b_consumer {
    zcl3b_map_t map;
    unsigned updated, idle, missed; /* new, unchanged and skipped messages */
} zcl3b_consumer_t;
/* Test payload: producer_task() fills every word with the same cycle number. */
typedef struct msg {
    uint32_t data[128];
} msg_t;
/*
 * Total bytes of the shared region: an aligned exchange word followed by
 * three aligned buffers of buffer_size bytes each.
 */
static size_t get_shm_size(size_t buffer_size)
{
    const size_t header = SHM_ALIGN(sizeof(xchg_t));
    const size_t payload = SHM_ALIGN(buffer_size);

    return header + payload * 3;
}
/*
 * Create an anonymous memfd sized for the exchange word plus three buffers
 * of buf_size bytes, then seal it against growing, shrinking and further
 * sealing.
 *
 * Returns the file descriptor (>= 0) on success, or a negative errno value
 * on failure.  A valid descriptor is never negative, so callers can tell
 * the two outcomes apart with a simple `< 0` check.
 */
static int create_shm(size_t buf_size)
{
    const size_t size = get_shm_size(buf_size);

    const int fd = memfd_create("zcl3b", MFD_ALLOW_SEALING);
    if (fd < 0)
        return -errno;

    /* Fix the size first, then forbid any later resizing. */
    if (ftruncate(fd, size) < 0 ||
        fcntl(fd, F_ADD_SEALS, F_SEAL_GROW | F_SEAL_SHRINK | F_SEAL_SEAL) < 0) {
        const int err = -errno; /* capture before close() can overwrite errno */
        close(fd);
        return err;
    }

    return fd;
}
/*
 * Map the shared region referred to by fd and compute the buffer pointers.
 *
 * buf_size is rounded up with SHM_ALIGN.  The layout matches
 * get_shm_size(): one aligned xchg_t header followed by three buffers.
 *
 * Returns 0 on success or a negative errno value if mmap() fails.
 *
 * NOTE(review): this also resets *xchg to BUFFER_NONE.  Both producer and
 * consumer call it after fork().  If one side maps the region after the
 * producer has already published a buffer, that publication is discarded.
 * The consumer then sees no buffer until the producer's next update.
 * Initialising xchg once, before fork(), would remove this race.
 */
static int zcl3b_map_init(zcl3b_map_t *map, int fd, size_t buf_size)
{
    map->buf_size = SHM_ALIGN(buf_size);
    map->shm_size = get_shm_size(map->buf_size);
    map->mem = mmap(NULL, map->shm_size, PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
    if (map->mem == MAP_FAILED)
        return -errno;

    map->xchg = map->mem;

    /*
     * Size the header from the pointed-to object, as get_shm_size() does.
     * sizeof(map->xchg) would be the size of a pointer.  Use char * for the
     * byte arithmetic; arithmetic on void * is a GNU extension.
     */
    char *base = (char *)map->mem + SHM_ALIGN(sizeof(*map->xchg));
    for (int i = 0; i < 3; i++)
        map->buffers[i] = base + (size_t)i * map->buf_size;

    atomic_store_explicit(map->xchg, BUFFER_NONE, memory_order_relaxed);
    return 0;
}
/*
 * Unmap the shared region.  On success *map is cleared to all zeros.  On
 * failure *map is left unchanged.
 */
static void zcl3b_map_destroy(zcl3b_map_t *map)
{
    if (munmap(map->mem, map->shm_size) < 0)
        return;

    *map = (zcl3b_map_t){ 0 };
}
/*
 * Publish the buffer the producer has just filled (producer->current) and
 * return the buffer to fill next.
 *
 * The exchange returns the previously published value.  If that value
 * carries ZCL3B_LOCK_FLAG, the consumer took that buffer, and its index is
 * remembered in producer->locked.  The next buffer is current+1 (mod 3),
 * skipping producer->locked.  So it is never the buffer just published and
 * never the one recorded as locked.
 *
 * Memory order must be acq_rel:
 *  - release: the writes into the published buffer become visible to a
 *    consumer that reads xchg with acquire (or stronger) ordering;
 *  - acquire: after seeing the consumer's lock on a new buffer, the
 *    producer may overwrite the buffer the consumer just released.  Those
 *    writes must not be reordered before this exchange, or they could race
 *    with the consumer's last reads of that buffer.
 * On x86 every atomic RMW is a full barrier, which hides a missing acquire.
 * Weakly ordered CPUs (ARM, POWER, RISC-V) need it.
 */
static void* zcl3b_producer_update(zcl3b_producer_t *producer)
{
    unsigned old = atomic_exchange_explicit(producer->map.xchg, producer->current,
                                            memory_order_acq_rel);

    if (old & ZCL3B_LOCK_FLAG)
        producer->locked = (zcl3b_idx_t)(old & 0x3);

    producer->current = (producer->current + 1) % 3;
    if (producer->current == producer->locked)
        producer->current = (producer->current + 1) % 3;

    return producer->map.buffers[producer->current];
}
/*
 * Take the most recently published buffer and mark it as locked by setting
 * ZCL3B_LOCK_FLAG in xchg.
 *
 * Returns a pointer to that buffer.  Returns NULL if xchg holds BUFFER_NONE,
 * i.e. nothing was published since zcl3b_map_init() reset it.  The producer
 * avoids the returned buffer until a later call locks a different one.
 *
 * Memory order must be acq_rel:
 *  - acquire: makes the producer's writes to the returned buffer visible;
 *  - release: the consumer's reads of the previously returned buffer must
 *    complete before the producer can observe that buffer as free again.
 * memory_order_consume is not used.  Compilers implement it as acquire, and
 * its use is discouraged while its specification is being revised.
 */
static void* zcl3b_consumer_update(zcl3b_consumer_t *consumer)
{
    unsigned old = atomic_fetch_or_explicit(consumer->map.xchg, ZCL3B_LOCK_FLAG,
                                            memory_order_acq_rel);
    zcl3b_idx_t current = (zcl3b_idx_t)(old & 0x3);

    if (current == BUFFER_NONE)
        return NULL;

    return consumer->map.buffers[current];
}
/* Number of loop iterations run by both producer_task() and consumer_task(). */
enum { NUM_CYCLES = 10000000 };
void consumer_task(int fd)
{
zcl3b_consumer_t consumer = { 0 };
int r = zcl3b_map_init(&consumer.map, fd, sizeof(msg_t));
if (r < 0) {
printf("consumer: mmamp failed (%d)\n", r);
return;
}
unsigned previous = 0;
for (int cycle = 0; cycle < NUM_CYCLES; cycle++) {
msg_t *msg = zcl3b_consumer_update(&consumer);
if (msg == NULL) {
usleep(1000);
continue;
}
unsigned first = msg->data[0];
int d = first - previous;
if (d == 0) {
consumer.idle++;
} else if (d == 1) {
consumer.updated++;
} else if (d > 1) {
consumer.missed += d - 1;
consumer.updated++;
} else if (d < 0) {
printf("consumer: sync error first=%u previous=%u\n", first, previous);
}
previous = first;
for (int i = 1; i < sizeof(msg->data) / sizeof(msg->data[0]); i++) {
if (msg->data[i] != first) {
printf("consumer: data error i=%u cycle=%u data=%u first=%u\n", i, cycle, msg->data[i], first);
break;
}
}
}
zcl3b_map_destroy(&consumer.map);
printf("consumer: idle=%u updated=%u missed=%u\n", consumer.idle, consumer.updated, consumer.missed);
}
/*
 * Producer process body: for each of NUM_CYCLES cycles, fill a whole
 * message with the cycle number.
 *
 * zcl3b_producer_update() publishes the buffer filled in the previous
 * iteration and returns the next one to fill.  As a result, the first call
 * publishes a buffer the producer has not written yet.  The buffer written
 * in the final iteration needs one more update after the loop to become
 * visible to the consumer.
 */
static void producer_task(int fd)
{
    zcl3b_producer_t producer = { 0 };
    int r = zcl3b_map_init(&producer.map, fd, sizeof(msg_t));
    if (r < 0) {
        printf("producer: mmap failed (%d)\n", r);
        return;
    }

    for (int cycle = 0; cycle < NUM_CYCLES; cycle++) {
        msg_t *msg = zcl3b_producer_update(&producer);
        for (size_t i = 0; i < sizeof(msg->data) / sizeof(msg->data[0]); i++)
            msg->data[i] = (uint32_t)cycle;
    }

    /* Publish the message written in the last iteration. */
    zcl3b_producer_update(&producer);

    zcl3b_map_destroy(&producer.map);
}
/*
 * Create the shared region, fork, and run the consumer in the child and the
 * producer in the parent.  The parent waits for the child, so the program
 * does not exit while the consumer is still running.
 *
 * Returns 0 on success and EXIT_FAILURE if setup or waiting fails.
 */
int main(void)
{
    int fd = create_shm(sizeof(msg_t));
    if (fd < 0) {
        /* create_shm() returns a negative errno value on failure. */
        printf("create_shm failed (%d)\n", fd);
        return EXIT_FAILURE;
    }

    pid_t pid = fork();
    if (pid < 0) {
        perror("fork");
        close(fd);
        return EXIT_FAILURE;
    }

    if (pid == 0) {
        consumer_task(fd);
        close(fd);
        return 0;
    }

    producer_task(fd);
    close(fd);

    /* Reap the consumer; retry if the wait is interrupted by a signal. */
    int status;
    while (waitpid(pid, &status, 0) < 0) {
        if (errno != EINTR) {
            perror("waitpid");
            return EXIT_FAILURE;
        }
    }
    return 0;
}
Reviewer comment: How is `return -errno;` OK for a function that should return a file descriptor?