From b2f444af0008ea587971f3013315f23b7792ffe6 Mon Sep 17 00:00:00 2001 From: amir Date: Mon, 9 Mar 2026 01:14:24 +0100 Subject: [PATCH] Making the LF MPMC queue generic and isolating it's code in a separate header file --- lf_mpmc.h | 202 +++++++++++++++++++++++++++++++++++++++++++++ platform.h | 28 +------ platform_windows.c | 164 ------------------------------------ 3 files changed, 204 insertions(+), 190 deletions(-) create mode 100644 lf_mpmc.h diff --git a/lf_mpmc.h b/lf_mpmc.h new file mode 100644 index 0000000..fb52c8a --- /dev/null +++ b/lf_mpmc.h @@ -0,0 +1,202 @@ +#pragma once +/*note: + After all producers have finished, push N poison pills, where N = number of +consumer threads: + +for (size_t i = 0; i < num_threads; i++) { + mpmc_push(&g_file_queue, NULL); +} + */ + +#include "base.h" + +typedef struct { + atomic_size_t seq; /* == pos: free for the push at pos; == pos + 1: holds the item for the pop at pos */ + void *data; + char pad[64 - sizeof(atomic_size_t) - sizeof(void *)]; /* sized so seq + data + pad add up to 64 bytes */ +} MPMCSlot; + +typedef struct { + atomic_size_t head; /* next position mpmc_pop() will claim */ + char pad1[64]; + atomic_size_t tail; /* next position mpmc_push() will claim */ + char pad2[64]; + + size_t capacity; /* slot count given to mpmc_init() */ + size_t mask; /* capacity - 1, maps a position to a slot index */ + + atomic_size_t committed; /* number of leading slots committed and initialised */ + size_t commit_step; /* slots added per commit batch */ + atomic_flag commit_lock; /* set while a thread is inside mpmc_commit_more() */ + + MPMCSlot *slots; +} MPMCQueue; + +// --------------- functions ---------------- +// The functions below are `static` so that every translation unit including +// this header gets its own private copy. Non-static function definitions in a +// header lead to ODR violations (multiple-definition errors) as soon as the +// header is included in more than one file. + +static void mpmc_init(MPMCQueue *q, size_t max_capacity) { /* reserves address space for max_capacity slots, commits and initialises the first batch; calls exit(1) on a non-power-of-two capacity or a failed reservation */ + if ((max_capacity & (max_capacity - 1)) != 0) { /* NOTE(review): max_capacity == 0 also passes this test */ + fprintf(stderr, "capacity must be power of two\n"); + exit(1); + } + + q->capacity = max_capacity; + q->mask = max_capacity - 1; + + size_t bytes = sizeof(MPMCSlot) * max_capacity; + + q->slots = (MPMCSlot *)VirtualAlloc(NULL, bytes, MEM_RESERVE, PAGE_READWRITE); + + if (!q->slots) { + fprintf(stderr, "VirtualAlloc reserve failed\n"); + exit(1); + } + + q->commit_step = (64ull * 1024 * 1024) / 
sizeof(MPMCSlot); /* number of slots that fit in 64 MiB */ + atomic_flag_clear(&q->commit_lock); + + q->committed = q->commit_step; /* NOTE(review): not clamped to capacity; larger than the reservation when max_capacity < commit_step */ + + VirtualAlloc(q->slots, q->commit_step * sizeof(MPMCSlot), MEM_COMMIT, + PAGE_READWRITE); /* NOTE(review): the result of this commit is not checked */ + + for (size_t i = 0; i < q->committed; i++) { + atomic_init(&q->slots[i].seq, i); + q->slots[i].data = NULL; + } + + atomic_init(&q->head, 0); + atomic_init(&q->tail, 0); +} + +static void mpmc_commit_more(MPMCQueue *q) { /* commits and initialises up to commit_step further slots, never past capacity */ + + if (atomic_flag_test_and_set(&q->commit_lock)) + return; /* another thread holds commit_lock; mpmc_push() retries after this returns */ + + size_t start = atomic_load_explicit(&q->committed, memory_order_acquire); + size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed); + + // another thread already committed enough + if (tail < start) { + atomic_flag_clear(&q->commit_lock); + return; + } + + if (start >= q->capacity) { + atomic_flag_clear(&q->commit_lock); + return; + } + + size_t new_commit = start + q->commit_step; + if (new_commit > q->capacity) + new_commit = q->capacity; + + size_t count = new_commit - start; + + VirtualAlloc(&q->slots[start], count * sizeof(MPMCSlot), MEM_COMMIT, + PAGE_READWRITE); /* NOTE(review): the result of this commit is not checked */ + + for (size_t i = start; i < new_commit; i++) { + atomic_init(&q->slots[i].seq, i); + q->slots[i].data = NULL; + } + + atomic_store_explicit(&q->committed, new_commit, memory_order_release); + + atomic_flag_clear(&q->commit_lock); +} + +static void mpmc_push(MPMCQueue *q, void *item) { /* enqueues item; loops, sleeping between attempts, until a position has been claimed */ + MPMCSlot *slot; + size_t pos; + + for (;;) { + + pos = atomic_load_explicit(&q->tail, memory_order_relaxed); + + // ensure the slot is committed BEFORE accessing it + size_t committed = + atomic_load_explicit(&q->committed, memory_order_relaxed); /* NOTE(review): relaxed load, while the store in mpmc_commit_more() is a release - confirm this ordering is sufficient */ + + if (pos >= committed) { /* NOTE(review): pos is the unmasked tail; once committed == capacity and tail >= capacity, mpmc_commit_more() returns without committing and this loop retries forever - confirm wrap-around behaviour */ + mpmc_commit_more(q); + continue; + } + + slot = &q->slots[pos & q->mask]; + + size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); + intptr_t diff = (intptr_t)seq - (intptr_t)pos; + + if (diff == 0) { + + if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1, + memory_order_relaxed, + memory_order_relaxed)) + break; + + } else if (diff < 0) 
{ // queue actually full + + Sleep(1000); + + } else { // seq is already past pos: this position was claimed by another push, retry + + Sleep(0); + } + } + + slot->data = item; + + atomic_store_explicit(&slot->seq, pos + 1, memory_order_release); +} + +static void *mpmc_pop(MPMCQueue *q) { /* dequeues and returns one item; loops, sleeping or spinning, until a position has been claimed */ + MPMCSlot *slot; + size_t pos; + + int spins = 0; + + for (;;) { + + pos = atomic_load_explicit(&q->head, memory_order_relaxed); + slot = &q->slots[pos & q->mask]; /* NOTE(review): unlike mpmc_push(), there is no check against q->committed before slot->seq is read */ + + size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); + intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1); + + if (diff == 0) { + + if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1, + memory_order_relaxed, + memory_order_relaxed)) + break; + + } else if (diff < 0) { // queue is empty + + Sleep(500); + + } else { // seq is already past pos + 1: this position was claimed by another pop, retry + + if (++spins > 10) { + + SwitchToThread(); // yield CPU + spins = 0; + + } else { + + _mm_pause(); // busy waiting + } + } + } + + void *data = slot->data; + + atomic_store_explicit(&slot->seq, pos + q->capacity, memory_order_release); + + return data; +} diff --git a/platform.h b/platform.h index db5b7fa..8e77590 100644 --- a/platform.h +++ b/platform.h @@ -3,6 +3,7 @@ #include "arena.h" #include "base.h" +#include "lf_mpmc.h" #include "arena.c" // ----------------------------- Config ------------------------------------- @@ -44,32 +45,7 @@ static double timer_stop(HiResTimer *t) { (double)g_qpc_freq.QuadPart; } -// ============================================================ -// Simple lock free MPMC Queue -// ============================================================ - -typedef struct { - atomic_size_t seq; - FileEntry *data; - char pad[64 - sizeof(atomic_size_t) - sizeof(FileEntry *)]; -} MPMCSlot; - -typedef struct { - atomic_size_t head; - char pad1[64]; - atomic_size_t tail; - char pad2[64]; - - size_t capacity; - size_t mask; - - atomic_size_t committed; - size_t commit_step; - atomic_flag commit_lock; - - MPMCSlot *slots; -} MPMCQueue; - +// MPMC Queue 
static MPMCQueue g_file_queue; typedef struct { diff --git a/platform_windows.c b/platform_windows.c index 8894cab..550f731 100644 --- a/platform_windows.c +++ b/platform_windows.c @@ -104,170 +104,6 @@ void platform_get_file_owner(const char *path, char *out_owner, get_file_owner(path, out_owner, out_owner_size); } -// --------------- parallel directory scanning ---------------- -void mpmc_init(MPMCQueue *q, size_t max_capacity) { - if ((max_capacity & (max_capacity - 1)) != 0) { - fprintf(stderr, "capacity must be power of two\n"); - exit(1); - } - - q->capacity = max_capacity; - q->mask = max_capacity - 1; - - size_t bytes = sizeof(MPMCSlot) * max_capacity; - - q->slots = VirtualAlloc(NULL, bytes, MEM_RESERVE, PAGE_READWRITE); - - if (!q->slots) { - fprintf(stderr, "VirtualAlloc reserve failed\n"); - exit(1); - } - - q->commit_step = (64ull * 1024 * 1024) / sizeof(MPMCSlot); - atomic_flag_clear(&q->commit_lock); - - q->committed = q->commit_step; - - VirtualAlloc(q->slots, q->commit_step * sizeof(MPMCSlot), MEM_COMMIT, - PAGE_READWRITE); - - for (size_t i = 0; i < q->committed; i++) { - atomic_init(&q->slots[i].seq, i); - q->slots[i].data = NULL; - } - - atomic_init(&q->head, 0); - atomic_init(&q->tail, 0); -} - -static void mpmc_commit_more(MPMCQueue *q) { - - if (atomic_flag_test_and_set(&q->commit_lock)) - return; - - size_t start = atomic_load_explicit(&q->committed, memory_order_acquire); - size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed); - - // another thread already committed enough - if (tail < start) { - atomic_flag_clear(&q->commit_lock); - return; - } - - if (start >= q->capacity) { - atomic_flag_clear(&q->commit_lock); - return; - } - - size_t new_commit = start + q->commit_step; - if (new_commit > q->capacity) - new_commit = q->capacity; - - size_t count = new_commit - start; - - VirtualAlloc(&q->slots[start], count * sizeof(MPMCSlot), MEM_COMMIT, - PAGE_READWRITE); - - for (size_t i = start; i < new_commit; i++) { - 
atomic_init(&q->slots[i].seq, i); - q->slots[i].data = NULL; - } - - atomic_store_explicit(&q->committed, new_commit, memory_order_release); - - atomic_flag_clear(&q->commit_lock); -} - -void mpmc_push(MPMCQueue *q, FileEntry *item) { - MPMCSlot *slot; - size_t pos; - - for (;;) { - - pos = atomic_load_explicit(&q->tail, memory_order_relaxed); - - // ensure the slot is committed BEFORE accessing it - size_t committed = - atomic_load_explicit(&q->committed, memory_order_relaxed); - - if (pos >= committed) { - mpmc_commit_more(q); - continue; - } - - slot = &q->slots[pos & q->mask]; - - size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); - intptr_t diff = (intptr_t)seq - (intptr_t)pos; - - if (diff == 0) { - - if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1, - memory_order_relaxed, - memory_order_relaxed)) - break; - - } else if (diff < 0) { // queue actually full - - Sleep(1000); - - } else { // waiting to grow - - Sleep(0); - } - } - - slot->data = item; - - atomic_store_explicit(&slot->seq, pos + 1, memory_order_release); -} - -FileEntry *mpmc_pop(MPMCQueue *q) { - MPMCSlot *slot; - size_t pos; - - int spins = 0; - - for (;;) { - - pos = atomic_load_explicit(&q->head, memory_order_relaxed); - slot = &q->slots[pos & q->mask]; - - size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); - intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1); - - if (diff == 0) { - - if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1, - memory_order_relaxed, - memory_order_relaxed)) - break; - - } else if (diff < 0) { // queue is empty - - Sleep(500); - - } else { // slot is still transitioning (written by another thread) - - if (++spins > 10) { - - SwitchToThread(); // yield CPU - spins = 0; - - } else { - - _mm_pause(); // busy waiting - } - } - } - - FileEntry *data = slot->data; - - atomic_store_explicit(&slot->seq, pos + q->capacity, memory_order_release); - - return data; -} - // --------------- parallel directory 
scanning ---------------- // Add queue helper functions static void dirqueue_push(DirQueue *q, const char *path) {