From 7d8b4addb7ce453e93e928fc078d3c5c27ac7a9f Mon Sep 17 00:00:00 2001 From: amir Date: Mon, 9 Mar 2026 16:44:43 +0100 Subject: [PATCH] Implementing a semaphore in the MPMC queue to wake up consumers --- binaries/changelog.txt | 2 ++ lf_mpmc.h | 38 ++++++++++++++++++++------------------ platform_windows.c | 2 +- 3 files changed, 23 insertions(+), 19 deletions(-) diff --git a/binaries/changelog.txt b/binaries/changelog.txt index d886bdb..13490ab 100644 --- a/binaries/changelog.txt +++ b/binaries/changelog.txt @@ -27,3 +27,5 @@ Making the hashing buffer reusable instead of malloc every file Implementing a general purpose arena to replace small allocations Small improvements of the LF MPMC queue Making the LF MPMC queue generic and in a seperate header file + +4.0: Implementing a semaphore in the LF MPMC queue to wake up consumers when there is items in the queue instead of spinning (busy waiting) or sleeping, this makes the queue spin only when the slots are transitionning (multiple consumers claiming the same slot) diff --git a/lf_mpmc.h b/lf_mpmc.h index 0d69996..b3f14c5 100644 --- a/lf_mpmc.h +++ b/lf_mpmc.h @@ -40,6 +40,8 @@ typedef struct { size_t commit_step; atomic_flag commit_lock; + HANDLE items_sem; + MPMCSlot *slots; } MPMCQueue; @@ -53,33 +55,29 @@ typedef struct { /* INIT */ /* ----------------------------------------------------------- */ static void mpmc_init(MPMCQueue *q, size_t max_capacity) { - if (!max_capacity) { - fprintf(stderr, "capacity must positive\n"); + if ((max_capacity & (max_capacity - 1)) != 0) { + fprintf(stderr, "capacity must be power of two\n"); exit(1); } - u32 pagesize = plat_get_pagesize(); - - max_capacity = ALIGN_UP_POW2(max_capacity, pagesize); - q->capacity = max_capacity; q->mask = max_capacity - 1; size_t bytes = sizeof(MPMCSlot) * max_capacity; - q->slots = (MPMCSlot *)VirtualAlloc(NULL, bytes, MEM_RESERVE, PAGE_READWRITE); + q->slots = (MPMCSlot *)plat_mem_reserve(bytes); if (!q->slots) { fprintf(stderr, "VirtualAlloc reserve failed\n"); exit(1); } - q->commit_step = pagesize; + q->commit_step = (64ull * 1024 * 1024) / sizeof(MPMCSlot); atomic_flag_clear(&q->commit_lock); q->committed = q->commit_step; - VirtualAlloc(q->slots, q->commit_step * sizeof(MPMCSlot), MEM_COMMIT, - PAGE_READWRITE); + + plat_mem_commit(q->slots, q->commit_step * sizeof(MPMCSlot)); for (size_t i = 0; i < q->committed; i++) { atomic_init(&q->slots[i].seq, i); @@ -88,6 +86,8 @@ static void mpmc_init(MPMCQueue *q, size_t max_capacity) { atomic_init(&q->head, 0); atomic_init(&q->tail, 0); + + q->items_sem = CreateSemaphore(NULL, 0, LONG_MAX, NULL); } /* ----------------------------------------------------------- */ @@ -118,8 +118,7 @@ static void mpmc_commit_more(MPMCQueue *q) { size_t count = new_commit - start; - VirtualAlloc(&q->slots[start], count * sizeof(MPMCSlot), MEM_COMMIT, - PAGE_READWRITE); + plat_mem_commit(&q->slots[start], count * sizeof(MPMCSlot)); for (size_t i = start; i < new_commit; i++) { atomic_init(&q->slots[i].seq, i); @@ -176,12 +175,17 @@ static void mpmc_push(MPMCQueue *q, void *item) { slot->data = item; atomic_store_explicit(&slot->seq, pos + 1, memory_order_release); + + ReleaseSemaphore(q->items_sem, 1, NULL); } /* ----------------------------------------------------------- */ /* POP */ /* ----------------------------------------------------------- */ static void *mpmc_pop(MPMCQueue *q) { + + WaitForSingleObject(q->items_sem, INFINITE); + MPMCSlot *slot; size_t pos; @@ -202,12 +206,7 @@ static void *mpmc_pop(MPMCQueue *q) { memory_order_relaxed)) break; - } else if (diff < 0) { // queue is empty - - Sleep(500); - } else { // slot is still transitioning (written by another thread) - if (++spins > 10) { SwitchToThread(); // yield CPU spins = 0; @@ -246,10 +245,13 @@ static void mpmc_finish(MPMCQueue *q) { return; if (q->slots) { - VirtualFree(q->slots, 0, MEM_RELEASE); + plat_mem_release(q->slots, 0); q->slots = NULL; } + if (q->items_sem) + CloseHandle(q->items_sem); + q->capacity = 0; q->mask = 0; diff --git a/platform_windows.c b/platform_windows.c index d55968a..a596297 100644 --- a/platform_windows.c +++ b/platform_windows.c @@ -482,7 +482,7 @@ int main(int argc, char **argv) { // Step 1: Scan all folders // ------------------------------- - mpmc_init(&g_file_queue, GiB(1)); + mpmc_init(&g_file_queue, MiB(1)); DirQueue q; memset(&q, 0, sizeof(q));