From b8104b0fc747253a79bf2a4b85ec0897929ef9c2 Mon Sep 17 00:00:00 2001 From: amir Date: Mon, 4 May 2026 14:06:48 +0100 Subject: [PATCH] MPMC queues implementation Now we have 3 different API compatible MPMC queues that we can swap with swapping the header. mt_mpmc.h, a blocking queue that uses a mutex/critical section. lf_mpmc.h, a lock free queue that uses atomics. sm_mpmc.h, a hybrid queue that uses atomics and a semaphore to block when the queue is empty. In this program, for max performance it is recommanded to use sm_mpmc.h or mt_mpmc.h, they are designed to avoid busy waiting which frees more CPU time to do useful work. --- lf_mpmc.h | 85 +----------- platform.c | 2 +- sm_mpmc.h | 388 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 395 insertions(+), 80 deletions(-) create mode 100644 sm_mpmc.h diff --git a/lf_mpmc.h b/lf_mpmc.h index 2f52f3b..ac0fbdb 100644 --- a/lf_mpmc.h +++ b/lf_mpmc.h @@ -2,7 +2,6 @@ #include "base.h" -// Cache align abstraction #define CACHELINE 64 #if defined(_MSC_VER) @@ -11,7 +10,6 @@ #define CACHE_ALIGN __attribute__((aligned(CACHELINE))) #endif -// Compiler hints #if defined(__GNUC__) || defined(__clang__) #define likely(x) __builtin_expect((x), 1) #define unlikely(x) __builtin_expect((x), 0) @@ -25,68 +23,6 @@ static void cpu_pause(void) { _mm_pause(); #endif } - -// Semaphores -#if defined(_WIN32) || defined(_WIN64) -typedef struct plat_sem { - HANDLE handle; -} plat_sem; - -static b32 plat_sem_init(plat_sem *s, u32 initial) { - s->handle = CreateSemaphore(NULL, initial, LONG_MAX, NULL); - return s->handle != NULL; -} - -static void plat_sem_wait(plat_sem *s) { - WaitForSingleObject(s->handle, INFINITE); -} - -// static b32 plat_sem_trywait(HANDLE sem) { // Comment to prevent warning: unused function -// DWORD r = WaitForSingleObject(sem, 0); -// return r == WAIT_OBJECT_0; -// } - -static void plat_sem_post(plat_sem *s, u32 count) { - ReleaseSemaphore(s->handle, count, NULL); -} - -// static void plat_sem_destroy(plat_sem *s) { // Comment to prevent warning: unused function -// if (s->handle) { -// CloseHandle(s->handle); -// s->handle = NULL; -// } -// } - -#elif defined(__linux__) -#include - -typedef struct plat_sem { - sem_t sem; -} plat_sem; - -static b32 plat_sem_init(plat_sem *s, u32 initial) { - return sem_init(&s->sem, 0, initial) == 0; -} - -static void plat_sem_wait(plat_sem *s) { - while (sem_wait(&s->sem) == -1 && errno == EINTR) { - } -} - -// static b32 plat_sem_trywait(sem_t *sem) { return sem_trywait(sem) == 0; } // Comment to prevent warning: unused function - -static void plat_sem_post(plat_sem *s, u32 count) { - for (u32 i = 0; i < count; i++) { - sem_post(&s->sem); - } -} - -// static void plat_sem_destroy(plat_sem *s) { sem_destroy(&s->sem); } // Comment to prevent warning: unused function - -#endif - -typedef struct plat_sem plat_sem; - typedef struct CACHE_ALIGN { atomic_size_t seq; void *data; @@ -106,8 +42,6 @@ typedef struct { size_t commit_step; atomic_flag commit_lock; - plat_sem items_sem; - MPMCSlot *slots; } MPMCQueue; @@ -155,8 +89,6 @@ static void mpmc_init(MPMCQueue *q, size_t max_capacity) { atomic_init(&q->head, 0); atomic_init(&q->tail, 0); atomic_init(&q->work_count, 0); - - plat_sem_init(&q->items_sem, 0); } /* ----------------------------------------------------------- */ @@ -202,7 +134,6 @@ static void mpmc_commit_more(MPMCQueue *q) { /* ----------------------------------------------------------- */ /* PUSH */ /* ----------------------------------------------------------- */ -// Does not increment work static void mpmc_push(MPMCQueue *q, void *item) { MPMCSlot *slot; size_t pos; @@ -245,8 +176,6 @@ static void mpmc_push(MPMCQueue *q, void *item) { slot->data = item; atomic_store_explicit(&slot->seq, pos + 1, memory_order_release); - - plat_sem_post(&q->items_sem, 1); } // Increment work @@ -294,16 +223,11 @@ static void mpmc_push_work(MPMCQueue *q, void *item) { atomic_store_explicit(&slot->seq, pos + 1, memory_order_release); atomic_fetch_add(&q->work_count, 1); - plat_sem_post(&q->items_sem, 1); } - /* ----------------------------------------------------------- */ /* POP */ /* ----------------------------------------------------------- */ static void *mpmc_pop(MPMCQueue *q) { - - plat_sem_wait(&q->items_sem); - MPMCSlot *slot; size_t pos; @@ -324,9 +248,14 @@ static void *mpmc_pop(MPMCQueue *q) { memory_order_relaxed)) break; + } else if (diff < 0) { // queue is empty + + Sleep(500); + } else { // slot is still transitioning (written by another thread) + if (++spins > 10) { - sleep_ms(0); // yield CPU + SwitchToThread(); // yield CPU spins = 0; } else { cpu_pause(); @@ -377,8 +306,6 @@ static void mpmc_task_done(MPMCQueue *q, u8 consumer_count) { // q->slots = NULL; // } // -// plat_sem_destroy(&q->items_sem); -// // q->capacity = 0; // q->mask = 0; // diff --git a/platform.c b/platform.c index 3b9f112..43f0d09 100644 --- a/platform.c +++ b/platform.c @@ -2,7 +2,7 @@ #include "arena.h" #include "base.h" -#include "lf_mpmc.h" +#include "sm_mpmc.h" #include "arena.c" #include diff --git a/sm_mpmc.h b/sm_mpmc.h new file mode 100644 index 0000000..2f52f3b --- /dev/null +++ b/sm_mpmc.h @@ -0,0 +1,388 @@ +#pragma once + +#include "base.h" + +// Cache align abstraction +#define CACHELINE 64 + +#if defined(_MSC_VER) +#define CACHE_ALIGN __declspec(align(CACHELINE)) +#else +#define CACHE_ALIGN __attribute__((aligned(CACHELINE))) +#endif + +// Compiler hints +#if defined(__GNUC__) || defined(__clang__) +#define likely(x) __builtin_expect((x), 1) +#define unlikely(x) __builtin_expect((x), 0) +#else +#define likely(x) (x) +#define unlikely(x) (x) +#endif + +static void cpu_pause(void) { +#if defined(_MSC_VER) || defined(__x86_64__) || defined(__i386__) + _mm_pause(); +#endif +} + +// Semaphores +#if defined(_WIN32) || defined(_WIN64) +typedef struct plat_sem { + HANDLE handle; +} plat_sem; + +static b32 plat_sem_init(plat_sem *s, u32 initial) { + s->handle = CreateSemaphore(NULL, initial, LONG_MAX, NULL); + return s->handle != NULL; +} + +static void plat_sem_wait(plat_sem *s) { + WaitForSingleObject(s->handle, INFINITE); +} + +// static b32 plat_sem_trywait(HANDLE sem) { // Comment to prevent warning: unused function +// DWORD r = WaitForSingleObject(sem, 0); +// return r == WAIT_OBJECT_0; +// } + +static void plat_sem_post(plat_sem *s, u32 count) { + ReleaseSemaphore(s->handle, count, NULL); +} + +// static void plat_sem_destroy(plat_sem *s) { // Comment to prevent warning: unused function +// if (s->handle) { +// CloseHandle(s->handle); +// s->handle = NULL; +// } +// } + +#elif defined(__linux__) +#include + +typedef struct plat_sem { + sem_t sem; +} plat_sem; + +static b32 plat_sem_init(plat_sem *s, u32 initial) { + return sem_init(&s->sem, 0, initial) == 0; +} + +static void plat_sem_wait(plat_sem *s) { + while (sem_wait(&s->sem) == -1 && errno == EINTR) { + } +} + +// static b32 plat_sem_trywait(sem_t *sem) { return sem_trywait(sem) == 0; } // Comment to prevent warning: unused function + +static void plat_sem_post(plat_sem *s, u32 count) { + for (u32 i = 0; i < count; i++) { + sem_post(&s->sem); + } +} + +// static void plat_sem_destroy(plat_sem *s) { sem_destroy(&s->sem); } // Comment to prevent warning: unused function + +#endif + +typedef struct plat_sem plat_sem; + +typedef struct CACHE_ALIGN { + atomic_size_t seq; + void *data; + char pad[64 - sizeof(atomic_size_t) - sizeof(void *)]; +} MPMCSlot; + +typedef struct { + CACHE_ALIGN atomic_size_t head; + CACHE_ALIGN atomic_size_t tail; + + CACHE_ALIGN atomic_size_t work_count; + + size_t capacity; + size_t mask; + + atomic_size_t committed; + size_t commit_step; + atomic_flag commit_lock; + + plat_sem items_sem; + + MPMCSlot *slots; +} MPMCQueue; + +// --------------- functions ---------------- +// static: each translation unit gets its own private copy this will solve the +// error: Function defined in a header file; function definitions in header +// files can lead to ODR violations (multiple definition errors if included in +// more than one file) + +/* ----------------------------------------------------------- */ +/* INIT */ +/* ----------------------------------------------------------- */ +static void mpmc_init(MPMCQueue *q, size_t max_capacity) { + + q->capacity = max_capacity; + q->mask = max_capacity - 1; + + u32 pagesize = plat_get_pagesize(); + + size_t bytes = ALIGN_UP_POW2(sizeof(MPMCSlot) * max_capacity, pagesize); + + q->slots = (MPMCSlot *)plat_mem_reserve(bytes); + + if (!q->slots) { + fprintf(stderr, "VirtualAlloc reserve failed\n"); + exit(1); + } + + u64 commit_bytes = pagesize; + commit_bytes = ALIGN_UP_POW2(commit_bytes, pagesize); + + q->commit_step = commit_bytes / sizeof(MPMCSlot); + + atomic_flag_clear(&q->commit_lock); + + q->committed = q->commit_step; + + plat_mem_commit(q->slots, commit_bytes); + + for (size_t i = 0; i < q->committed; i++) { + atomic_init(&q->slots[i].seq, i); + q->slots[i].data = NULL; + } + + atomic_init(&q->head, 0); + atomic_init(&q->tail, 0); + atomic_init(&q->work_count, 0); + + plat_sem_init(&q->items_sem, 0); +} + +/* ----------------------------------------------------------- */ +/* COMMIT MORE MEMORY */ +/* ----------------------------------------------------------- */ +static void mpmc_commit_more(MPMCQueue *q) { + + if (atomic_flag_test_and_set(&q->commit_lock)) + return; + + size_t start = atomic_load_explicit(&q->committed, memory_order_acquire); + size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed); + + // another thread already committed enough + if (tail < start) { + atomic_flag_clear(&q->commit_lock); + return; + } + + if (start >= q->capacity) { + atomic_flag_clear(&q->commit_lock); + return; + } + + size_t new_commit = start + q->commit_step; + if (new_commit > q->capacity) + new_commit = q->capacity; + + size_t count = new_commit - start; + + plat_mem_commit(&q->slots[start], count * sizeof(MPMCSlot)); + + for (size_t i = start; i < new_commit; i++) { + atomic_init(&q->slots[i].seq, i); + q->slots[i].data = NULL; + } + + atomic_store_explicit(&q->committed, new_commit, memory_order_release); + + atomic_flag_clear(&q->commit_lock); +} + +/* ----------------------------------------------------------- */ +/* PUSH */ +/* ----------------------------------------------------------- */ +// Does not increment work +static void mpmc_push(MPMCQueue *q, void *item) { + MPMCSlot *slot; + size_t pos; + + for (;;) { + + pos = atomic_load_explicit(&q->tail, memory_order_relaxed); + + // ensure the slot is committed BEFORE accessing it + size_t committed = + atomic_load_explicit(&q->committed, memory_order_relaxed); + + if (unlikely(pos >= committed)) { + mpmc_commit_more(q); + continue; + } + + slot = &q->slots[pos & q->mask]; + + size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); + intptr_t diff = (intptr_t)seq - (intptr_t)pos; + + if (likely(diff == 0)) { + + if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1, + memory_order_relaxed, + memory_order_relaxed)) + break; + + } else if (diff < 0) { // queue actually full + + sleep_ms(1000); + + } else { // waiting to grow + + sleep_ms(0); + } + } + + slot->data = item; + + atomic_store_explicit(&slot->seq, pos + 1, memory_order_release); + + plat_sem_post(&q->items_sem, 1); +} + +// Increment work +static void mpmc_push_work(MPMCQueue *q, void *item) { + MPMCSlot *slot; + size_t pos; + + for (;;) { + + pos = atomic_load_explicit(&q->tail, memory_order_relaxed); + + // ensure the slot is committed BEFORE accessing it + size_t committed = + atomic_load_explicit(&q->committed, memory_order_relaxed); + + if (unlikely(pos >= committed)) { + mpmc_commit_more(q); + continue; + } + + slot = &q->slots[pos & q->mask]; + + size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); + intptr_t diff = (intptr_t)seq - (intptr_t)pos; + + if (likely(diff == 0)) { + + if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1, + memory_order_relaxed, + memory_order_relaxed)) + break; + + } else if (diff < 0) { // queue actually full + + sleep_ms(1000); + + } else { // waiting to grow + + sleep_ms(0); + } + } + + slot->data = item; + + atomic_store_explicit(&slot->seq, pos + 1, memory_order_release); + + atomic_fetch_add(&q->work_count, 1); + plat_sem_post(&q->items_sem, 1); +} + +/* ----------------------------------------------------------- */ +/* POP */ +/* ----------------------------------------------------------- */ +static void *mpmc_pop(MPMCQueue *q) { + + plat_sem_wait(&q->items_sem); + + MPMCSlot *slot; + size_t pos; + + int spins = 0; + + for (;;) { + + pos = atomic_load_explicit(&q->head, memory_order_relaxed); + slot = &q->slots[pos & q->mask]; + + size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); + intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1); + + if (likely(diff == 0)) { + + if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1, + memory_order_relaxed, + memory_order_relaxed)) + break; + + } else { // slot is still transitioning (written by another thread) + if (++spins > 10) { + sleep_ms(0); // yield CPU + spins = 0; + } else { + cpu_pause(); + } + } + } + + void *data = slot->data; + + atomic_store_explicit(&slot->seq, pos + q->capacity, memory_order_release); + + return data; +} + +/* ----------------------------------------------------------- */ +/* PUSH POISON */ +/* ----------------------------------------------------------- */ +/*note: + After producers finishes, push N poison pills where N = number of consumer +threads, this is necessary to stop the consumers. +*/ + +static void mpmc_producers_finished(MPMCQueue *q, u8 consumer_count) { + for (u8 i = 0; i < consumer_count; i++) { + mpmc_push(q, NULL); + } +} + +/* ----------------------------------------------------------- */ +/* Done */ +/* ----------------------------------------------------------- */ +static void mpmc_task_done(MPMCQueue *q, u8 consumer_count) { + size_t prev = atomic_fetch_sub(&q->work_count, 1); + if (prev == 1) { + mpmc_producers_finished(q, consumer_count); + } +} + +/* ----------------------------------------------------------- */ +/* MPMC Cleanup */ +/* ----------------------------------------------------------- */ +// static void mpmc_finish(MPMCQueue *q) { // Comment to prevent warning: unused function +// if (!q) +// return; +// +// if (q->slots) { +// plat_mem_release(q->slots, 0); +// q->slots = NULL; +// } +// +// plat_sem_destroy(&q->items_sem); +// +// q->capacity = 0; +// q->mask = 0; +// +// atomic_store_explicit(&q->head, 0, memory_order_relaxed); +// atomic_store_explicit(&q->tail, 0, memory_order_relaxed); +// atomic_store_explicit(&q->committed, 0, memory_order_relaxed); +// }