I'm new to C and am currently following Stanford CS107 - Programming Paradigms. For assignment 6, I found it better to isolate the thread management from the service logic.
The following code implements a producer-consumer pattern that uses semaphores to limit the maximum number of threads. recycle_threads() itself runs in a thread, which automatically waits for previously created threads to exit and frees their resources. Users can call the create_thread() function whenever they want to run a function in a new thread.
Feel free to leave any comments about the coding style, design, or utility. In particular, I would like to know what purpose a real-life thread pool is designed for, and which features it must provide.
threads_pool.h
/*
 * threads_pool.h - bounded creation of threads with automatic joining.
 *
 * The guard macro deliberately avoids a leading underscore followed by an
 * upper-case letter: such identifiers are reserved for the implementation.
 */
#ifndef THREADS_POOL_H
#define THREADS_POOL_H

/* Return codes of threads_pool_init(). */
#define THREAD_POOL_INIT_OK 0
#define THREAD_POOL_INIT_ERR 1

/**
 * Signature of a function that can be run in a pool thread.
 * It is the same signature pthread_create() expects for a start routine.
 */
typedef void *(*thread_fn)(void *arg);

/**
 * Create a new thread pool and start its recycler thread.
 *
 * @param size  maximum number of threads that may exist at the same time
 *              (started but not yet joined by the recycler).
 * @return THREAD_POOL_INIT_OK on success, THREAD_POOL_INIT_ERR otherwise.
 */
int threads_pool_init(unsigned size);

/**
 * Run thd_fn(arg) in a new thread of the pool.
 *
 * Blocks while `size` threads are already outstanding, until the recycler
 * has joined one of them. The value returned by thd_fn is discarded.
 *
 * @param thd_fn  function executed by the new thread.
 * @param arg     argument handed to thd_fn; it must stay valid for as long
 *                as the thread uses it.
 */
void create_thread(thread_fn thd_fn, void *arg);

/**
 * Announce that no more threads will be created.
 *
 * Blocks until the recycler thread has exited, then releases the memory and
 * the semaphores owned by the pool. Call it after the last create_thread().
 */
void threads_pool_close(void);

#endif /* THREADS_POOL_H */
threads_pool.c
#include "threads_pool.h"
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
/*
 * Threads pool state. `pool` is an array of `pool_size` thread ids that is
 * used as a ring: both indices below only ever grow and are reduced modulo
 * pool_size when a slot is addressed.
 */
static pthread_t *pool;
static unsigned pool_size;
/* running index of the next slot create_thread() fills */
static unsigned create_p;
/* running index of the next slot recycle_threads() joins */
static unsigned recycle_p;
/*
 * Semaphores (POSIX named semaphores, opened in threads_pool_init()).
 * create_sem starts at the pool size and is waited on before each thread is
 * created; recycle_sem starts at 0 and is posted after each thread is created.
 */
static sem_t *create_sem;
static sem_t *recycle_sem;
static const char *kThreadPoolCreateSem = "/create_sem";
static const char *kThreadPoolRecycleSem = "/recycle_sem";
/* id of the thread running recycle_threads() */
static pthread_t recycle_tid;
/* flag set by threads_pool_close() to tell the recycle thread to exit */
static volatile sig_atomic_t recycle_sig;
#define RECYCLE_RUN 0
#define RECYCLE_TO_EXIT 1
/**
 * Producer of the producer-consumer pattern.
 *
 * Waits for a free slot, starts thd_fn(arg) in a new thread stored in that
 * slot and tells the recycler that one more thread is waiting to be joined.
 *
 * If the thread cannot be started, the slot is handed back, a message is
 * written to stderr and nothing is queued for the recycler; the function has
 * no return value, so this is the only way the failure is reported.
 *
 * @param thd_fn  start routine of the new thread.
 * @param arg     argument passed to thd_fn.
 */
void create_thread(thread_fn thd_fn, void *arg) {
    /* A signal handler may interrupt the wait; only EINTR is retried. */
    while (0 != sem_wait(create_sem)) {
        if (EINTR != errno) {
            fprintf(stderr, "create_thread: sem_wait failed (errno %d)\n", errno);
            return;
        }
    }

    int rc = pthread_create(pool + create_p % pool_size, NULL, thd_fn, arg);
    if (0 != rc) {
        /*
         * No thread exists in this slot: posting recycle_sem would make the
         * recycler join a stale thread id, so give the slot back instead.
         */
        fprintf(stderr, "create_thread: pthread_create failed (error %d)\n", rc);
        sem_post(create_sem);
        return;
    }

    /* Advance only after success so the slot index matches the joins. */
    create_p++;
    sem_post(recycle_sem);
}
/**
 * Consumer of the producer-consumer pattern.
 * Automatically joins the threads started by create_thread(), in the order
 * they were created, and hands each slot back to the producer.
 *
 * Every token taken from recycle_sem stands for one thread to join, except
 * the single extra token threads_pool_close() posts after raising
 * recycle_sig. That token arrives last, so once the flag is set and every
 * created thread has been joined (recycle_p == create_p) the loop ends.
 * create_p is only read after the flag is set, i.e. when the producer is
 * expected to have stopped creating threads.
 *
 * @param arg  unused.
 * @return always NULL.
 */
static void *recycle_threads(void *arg) {
    (void)arg;
    for (;;) {
        if (0 != sem_wait(recycle_sem)) {
            if (EINTR == errno) continue; /* interrupted by a signal: retry */
            break;                        /* semaphore is unusable */
        }
        if (RECYCLE_TO_EXIT == recycle_sig && recycle_p == create_p) {
            break; /* wake-up token from threads_pool_close(), nothing left */
        }
        pthread_join(pool[recycle_p % pool_size], NULL);
        recycle_p++;
        sem_post(create_sem);
    }
    return NULL;
}

/**
 * Close one named semaphore (if it was opened) and remove its name.
 *
 * @param sem   address of the handle; reset to SEM_FAILED afterwards.
 * @param name  name the semaphore was opened with.
 */
static void release_sem(sem_t **sem, const char *name) {
    /* SEM_FAILED is not NULL on every platform, so test for both. */
    if (NULL != *sem && SEM_FAILED != *sem) {
        sem_close(*sem);
    }
    *sem = SEM_FAILED;
    sem_unlink(name);
}

/**
 * Dispose resources: the slot array and both named semaphores.
 * Safe to call on a partially initialised pool.
 */
static void threads_pool_destroy(void) {
    free(pool);
    pool = NULL;
    release_sem(&create_sem, kThreadPoolCreateSem);
    release_sem(&recycle_sem, kThreadPoolRecycleSem);
}

/**
 * Create the threads pool, and run the recycle_threads() thread.
 *
 * @param size  number of slots, i.e. the maximum number of threads that may
 *              be outstanding at once; must be greater than zero because
 *              slots are addressed modulo size.
 * @return THREAD_POOL_INIT_OK on success; THREAD_POOL_INIT_ERR if size is 0,
 *         or if the slot array, a semaphore or the recycler thread cannot be
 *         created. On failure nothing stays allocated.
 */
int threads_pool_init(unsigned size) {
    if (0 == size) return THREAD_POOL_INIT_ERR;

    create_sem = SEM_FAILED;
    recycle_sem = SEM_FAILED;
    create_p = 0;
    recycle_p = 0;
    recycle_sig = RECYCLE_RUN;
    pool_size = size;

    /* calloc checks size * sizeof *pool for overflow. */
    pool = calloc(size, sizeof *pool);
    if (NULL == pool) return THREAD_POOL_INIT_ERR;

    /*
     * A run that died before threads_pool_close() leaves the names behind,
     * and O_EXCL would then fail forever; drop stale names first.
     */
    sem_unlink(kThreadPoolCreateSem);
    sem_unlink(kThreadPoolRecycleSem);
    create_sem = sem_open(kThreadPoolCreateSem, O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, size);
    recycle_sem = sem_open(kThreadPoolRecycleSem, O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, 0);
    if (SEM_FAILED == create_sem || SEM_FAILED == recycle_sem) goto fail;

    if (0 != pthread_create(&recycle_tid, NULL, recycle_threads, NULL)) goto fail;
    return THREAD_POOL_INIT_OK;

fail:
    threads_pool_destroy();
    return THREAD_POOL_INIT_ERR;
}

/**
 * Signal recycle_threads() to exit, and wait for it to finish its job.
 *
 * Must be called after the last create_thread() has returned. The extra
 * sem_post() wakes the recycler when it is idle in sem_wait(); without it
 * the join below would never return once every thread is already joined.
 */
void threads_pool_close(void) {
    recycle_sig = RECYCLE_TO_EXIT;
    sem_post(recycle_sem);
    pthread_join(recycle_tid, NULL);
    threads_pool_destroy();
}
A simple use case that simulates selling tickets: agents_tickets.c
/**
* Using semaphore to limit maximum thread number.
* https://stackoverflow.com/questions/66404929/always-unlink-the-posix-named-semaphore-in-shared-memory?noredirect=1
*/
#include "threads_pool.h"
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
/**
 * State of one simulated ticket agent; one instance is handed to each thread.
 */
typedef struct {
    /* identifier of the simulated agent */
    unsigned agent_id;
    /* agent's personal goal of the day: tickets still to sell */
    unsigned tickets_tosell;
    /* shared tickets pool (same counter for every agent) */
    unsigned *tickets_pool;
    /* mutex taken while the shared tickets pool is accessed */
    pthread_mutex_t *pool_lock;
} agent;
/**
 * Constructor: fill in every field of *a.
 *
 * @param a            agent to initialise.
 * @param agentid      identifier stored in agent_id.
 * @param tickets_num  number of tickets this agent has to sell.
 * @param pool         shared tickets counter.
 * @param lock         mutex guarding the shared counter.
 */
static void new_agent(agent *a, unsigned agentid, unsigned tickets_num, unsigned *pool, pthread_mutex_t *lock) {
    *a = (agent){
        .agent_id = agentid,
        .tickets_tosell = tickets_num,
        .tickets_pool = pool,
        .pool_lock = lock,
    };
}
/**
 * Implement void *(*start_rtn)(void *);
 * -------------------------------------
 * Each thread executes this function: it sells tickets one at a time until
 * the agent's personal goal is reached or the shared pool is empty.
 *
 * @param agent_addr  address of the agent driven by this thread.
 * @return address of the agent's agent_id.
 */
static void *sell_tickets(void *agent_addr) {
    agent *a = agent_addr;
    while (a->tickets_tosell > 0) {
        pthread_mutex_lock(a->pool_lock); /* begin of critical section */
        if (0 == *a->tickets_pool) {
            /* Nothing left to sell: decrementing would wrap the unsigned counter. */
            pthread_mutex_unlock(a->pool_lock);
            break;
        }
        (*a->tickets_pool)--;
        /* %u: both values are unsigned */
        fprintf(stdout, "agent@%u sells a ticket, %u tickets left in pool.\n", a->agent_id, *a->tickets_pool);
        fflush(stdout);
        pthread_mutex_unlock(a->pool_lock); /* end of critical section */
        /* tickets_tosell belongs to this agent only, so no lock is needed */
        a->tickets_tosell--;
        fprintf(stdout, "agent@%u has %u tickets to sell.\n", a->agent_id, a->tickets_tosell);
        fflush(stdout);
    }
    return &a->agent_id;
}
/**
 * Parameters of one simulation run.
 */
typedef struct {
    /* number of agents, one thread is started per agent */
    unsigned num_agents;
    /* initial size of the shared tickets pool */
    unsigned num_tickets;
} project;
/**
 * Run one simulation: start one selling thread per agent through a threads
 * pool of 10 slots and wait until the pool has been closed.
 *
 * Each agent gets num_tickets / num_agents tickets to sell (integer
 * division). Nothing is done when p is NULL or there are no agents; a
 * message is written to stderr when the threads pool cannot be created.
 *
 * @param p  number of agents and number of tickets of the run.
 */
void run(project *p) {
    /* Zero agents would mean a zero-length array and a division by zero. */
    if (NULL == p || 0 == p->num_agents) return;

    unsigned tickets_pool = p->num_tickets; /* shared resource */
    unsigned quota = p->num_tickets / p->num_agents;
    agent agents[p->num_agents];
    pthread_mutex_t lock;

    pthread_mutex_init(&lock, NULL);
    if (THREAD_POOL_INIT_OK != threads_pool_init(10)) {
        fprintf(stderr, "run: cannot initialise the threads pool\n");
        pthread_mutex_destroy(&lock);
        return;
    }

    for (unsigned i = 0; i < p->num_agents; i++) {
        /* agent ids start at 1 */
        new_agent(&agents[i], i + 1, quota, &tickets_pool, &lock);
        create_thread(sell_tickets, &agents[i]);
    }

    /* agents[] and lock live on this stack frame: wait before leaving it. */
    threads_pool_close();
    pthread_mutex_destroy(&lock);
}
int main(void) {
project p;
p.num_agents = 30;
p.num_tickets = 300;
run(&p);
}