MPMC queues implementation

Now we have 3 different API compatible MPMC queues that we can swap with
swapping the header.
mt_mpmc.h, a blocking queue that uses a mutex/critical section.
lf_mpmc.h, a lock free queue that uses atomics.
sm_mpmc.h, a hybrid queue that uses atomics and a semaphore to block
when the queue is empty.
In this program, for max performance it is recommanded to use sm_mpmc.h
or mt_mpmc.h, they are designed to avoid busy waiting which frees more
CPU time to do useful work.
This commit is contained in:
2026-05-04 14:06:48 +01:00
parent 759fdfda1e
commit b8104b0fc7
3 changed files with 395 additions and 80 deletions

View File

@@ -2,7 +2,6 @@
#include "base.h"
// Cache align abstraction
#define CACHELINE 64
#if defined(_MSC_VER)
@@ -11,7 +10,6 @@
#define CACHE_ALIGN __attribute__((aligned(CACHELINE)))
#endif
// Compiler hints
#if defined(__GNUC__) || defined(__clang__)
#define likely(x) __builtin_expect((x), 1)
#define unlikely(x) __builtin_expect((x), 0)
@@ -25,68 +23,6 @@ static void cpu_pause(void) {
_mm_pause();
#endif
}
// Semaphores
#if defined(_WIN32) || defined(_WIN64)
typedef struct plat_sem {
HANDLE handle;
} plat_sem;
static b32 plat_sem_init(plat_sem *s, u32 initial) {
s->handle = CreateSemaphore(NULL, initial, LONG_MAX, NULL);
return s->handle != NULL;
}
static void plat_sem_wait(plat_sem *s) {
WaitForSingleObject(s->handle, INFINITE);
}
// static b32 plat_sem_trywait(HANDLE sem) { // Comment to prevent warning: unused function
// DWORD r = WaitForSingleObject(sem, 0);
// return r == WAIT_OBJECT_0;
// }
static void plat_sem_post(plat_sem *s, u32 count) {
ReleaseSemaphore(s->handle, count, NULL);
}
// static void plat_sem_destroy(plat_sem *s) { // Comment to prevent warning: unused function
// if (s->handle) {
// CloseHandle(s->handle);
// s->handle = NULL;
// }
// }
#elif defined(__linux__)
#include <semaphore.h>
typedef struct plat_sem {
sem_t sem;
} plat_sem;
static b32 plat_sem_init(plat_sem *s, u32 initial) {
return sem_init(&s->sem, 0, initial) == 0;
}
static void plat_sem_wait(plat_sem *s) {
while (sem_wait(&s->sem) == -1 && errno == EINTR) {
}
}
// static b32 plat_sem_trywait(sem_t *sem) { return sem_trywait(sem) == 0; } // Comment to prevent warning: unused function
static void plat_sem_post(plat_sem *s, u32 count) {
for (u32 i = 0; i < count; i++) {
sem_post(&s->sem);
}
}
// static void plat_sem_destroy(plat_sem *s) { sem_destroy(&s->sem); } // Comment to prevent warning: unused function
#endif
typedef struct plat_sem plat_sem;
typedef struct CACHE_ALIGN {
atomic_size_t seq;
void *data;
@@ -106,8 +42,6 @@ typedef struct {
size_t commit_step;
atomic_flag commit_lock;
plat_sem items_sem;
MPMCSlot *slots;
} MPMCQueue;
@@ -155,8 +89,6 @@ static void mpmc_init(MPMCQueue *q, size_t max_capacity) {
atomic_init(&q->head, 0);
atomic_init(&q->tail, 0);
atomic_init(&q->work_count, 0);
plat_sem_init(&q->items_sem, 0);
}
/* ----------------------------------------------------------- */
@@ -202,7 +134,6 @@ static void mpmc_commit_more(MPMCQueue *q) {
/* ----------------------------------------------------------- */
/* PUSH */
/* ----------------------------------------------------------- */
// Does not increment work
static void mpmc_push(MPMCQueue *q, void *item) {
MPMCSlot *slot;
size_t pos;
@@ -245,8 +176,6 @@ static void mpmc_push(MPMCQueue *q, void *item) {
slot->data = item;
atomic_store_explicit(&slot->seq, pos + 1, memory_order_release);
plat_sem_post(&q->items_sem, 1);
}
// Increment work
@@ -294,16 +223,11 @@ static void mpmc_push_work(MPMCQueue *q, void *item) {
atomic_store_explicit(&slot->seq, pos + 1, memory_order_release);
atomic_fetch_add(&q->work_count, 1);
plat_sem_post(&q->items_sem, 1);
}
/* ----------------------------------------------------------- */
/* POP */
/* ----------------------------------------------------------- */
static void *mpmc_pop(MPMCQueue *q) {
plat_sem_wait(&q->items_sem);
MPMCSlot *slot;
size_t pos;
@@ -324,9 +248,14 @@ static void *mpmc_pop(MPMCQueue *q) {
memory_order_relaxed))
break;
} else if (diff < 0) { // queue is empty
Sleep(500);
} else { // slot is still transitioning (written by another thread)
if (++spins > 10) {
sleep_ms(0); // yield CPU
SwitchToThread(); // yield CPU
spins = 0;
} else {
cpu_pause();
@@ -377,8 +306,6 @@ static void mpmc_task_done(MPMCQueue *q, u8 consumer_count) {
// q->slots = NULL;
// }
//
// plat_sem_destroy(&q->items_sem);
//
// q->capacity = 0;
// q->mask = 0;
//