From 7099c1ddd6ab1cf1b9fdbfca966eef0d4085273e Mon Sep 17 00:00:00 2001 From: amir Date: Fri, 6 Mar 2026 20:20:28 +0100 Subject: [PATCH] Implementing lock free MPMC queue --- platform.h | 18 +++--- platform_windows.c | 133 ++++++++++++++++++++++++++++----------------- 2 files changed, 92 insertions(+), 59 deletions(-) diff --git a/platform.h b/platform.h index 0153237..30ce896 100644 --- a/platform.h +++ b/platform.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -100,19 +101,18 @@ static double timer_stop(HiResTimer *t) { // ============================================================ typedef struct { - FileEntry **items; + atomic_size_t seq; + FileEntry *data; +} MPMCSlot; +typedef struct { size_t capacity; - size_t head; - size_t tail; - size_t count; + size_t mask; - int producers_active; - - CRITICAL_SECTION cs; - CONDITION_VARIABLE not_empty; - CONDITION_VARIABLE not_full; + MPMCSlot *slots; + atomic_size_t head; + atomic_size_t tail; } MPMCQueue; static MPMCQueue g_file_queue; diff --git a/platform_windows.c b/platform_windows.c index ad0b6de..e552849 100644 --- a/platform_windows.c +++ b/platform_windows.c @@ -77,62 +77,90 @@ static void format_time(uint64_t t, char *out, size_t out_sz) { } // --------------- parallel directory scanning ---------------- -static void mpmc_init(MPMCQueue *q, size_t capacity) { +void mpmc_init(MPMCQueue *q, size_t capacity) { + if ((capacity & (capacity - 1)) != 0) { + fprintf(stderr, "capacity must be power of two\n"); + exit(1); + } + q->capacity = capacity; - q->items = xmalloc(sizeof(FileEntry *) * capacity); + q->mask = capacity - 1; - q->head = 0; - q->tail = 0; - q->count = 0; - q->producers_active = 1; + q->slots = malloc(sizeof(MPMCSlot) * capacity); - InitializeCriticalSection(&q->cs); - InitializeConditionVariable(&q->not_empty); - InitializeConditionVariable(&q->not_full); -} - -static void mpmc_push(MPMCQueue *q, FileEntry *item) { - EnterCriticalSection(&q->cs); - - while (q->count == q->capacity) { - SleepConditionVariableCS(&q->not_full, &q->cs, INFINITE); + for (size_t i = 0; i < capacity; i++) { + atomic_init(&q->slots[i].seq, i); + q->slots[i].data = NULL; } - q->items[q->tail] = item; - q->tail = (q->tail + 1) % q->capacity; - q->count++; - - WakeConditionVariable(&q->not_empty); - LeaveCriticalSection(&q->cs); + atomic_init(&q->head, 0); + atomic_init(&q->tail, 0); } -static FileEntry *mpmc_pop(MPMCQueue *q) { - EnterCriticalSection(&q->cs); +void mpmc_push(MPMCQueue *q, FileEntry *item) { + MPMCSlot *slot; + size_t pos; - while (q->count == 0 && q->producers_active) { - SleepConditionVariableCS(&q->not_empty, &q->cs, INFINITE); + for (;;) { + + pos = atomic_load_explicit(&q->tail, memory_order_relaxed); + slot = &q->slots[pos & q->mask]; + + size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); + intptr_t diff = (intptr_t)seq - (intptr_t)pos; + + if (diff == 0) { + if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1, + memory_order_relaxed, + memory_order_relaxed)) + break; + } else if (diff < 0) { + // queue full + _mm_pause(); + } else { + _mm_pause(); + } } - if (q->count == 0 && !q->producers_active) { - LeaveCriticalSection(&q->cs); - return NULL; - } + slot->data = item; - FileEntry *item = q->items[q->head]; - q->head = (q->head + 1) % q->capacity; - q->count--; - - WakeConditionVariable(&q->not_full); - LeaveCriticalSection(&q->cs); - - return item; + atomic_store_explicit(&slot->seq, pos + 1, memory_order_release); } -static void mpmc_finish(MPMCQueue *q) { - EnterCriticalSection(&q->cs); - q->producers_active = 0; - WakeAllConditionVariable(&q->not_empty); - LeaveCriticalSection(&q->cs); +FileEntry *mpmc_pop(MPMCQueue *q) { + MPMCSlot *slot; + size_t pos; + + for (;;) { + + pos = atomic_load_explicit(&q->head, memory_order_relaxed); + slot = &q->slots[pos & q->mask]; + + size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); + intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1); + + if (diff == 0) { + + if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1, + memory_order_relaxed, + memory_order_relaxed)) + break; + + // } else if (diff < 0) { + // + // _mm_pause(); // wait for producers + + } else { + + _mm_pause(); + } + } + + FileEntry *data = slot->data; + + atomic_store_explicit(&slot->seq, pos + q->capacity, memory_order_release); + + return data; } // Add queue helper functions @@ -283,15 +311,16 @@ static void xxh3_hash_file_stream(const char *path, char *out_hex) { (unsigned long long)h.low64); } -// ----------------------------- Hash worker -// -------------------------------------- +// ------------------------- Hash worker -------------------------------- static DWORD WINAPI hash_worker(LPVOID arg) { MPMCQueue *q = (MPMCQueue *)arg; for (;;) { + FileEntry *fe = mpmc_pop(q); + if (!fe) - break; + break; // poison pill char hash[HASH_STRLEN]; xxh3_hash_file_stream(fe->path, hash); @@ -350,7 +379,7 @@ DWORD WINAPI progress_thread(void *arg) { if (!scan_done) { - printf("\rScanning: %llu files | Hashed: %llu | %.2f MB/s", + printf("\rScanning: %llu files | Hashed: %llu | %.2f MB/s ", (unsigned long long)found, (unsigned long long)hashed, displayed_speed); @@ -375,7 +404,7 @@ DWORD WINAPI progress_thread(void *arg) { bar[p++] = ']'; bar[p] = 0; - printf("\r%s %6.2f%% (%llu / %llu) %.2f MB/s", bar, pct * 100.0, + printf("\r%s %6.2f%% (%llu / %llu) %.2f MB/s ", bar, pct * 100.0, (unsigned long long)hashed, (unsigned long long)found, displayed_speed); } @@ -533,7 +562,11 @@ int main(int argc, char **argv) { WaitForMultipleObjects((DWORD)scan_threads, scan_tids, TRUE, INFINITE); - mpmc_finish(&g_file_queue); + // mpmc_finish(&g_file_queue); + // debug + for (size_t i = 0; i < num_threads; i++) { + mpmc_push(&g_file_queue, NULL); + } atomic_store(&g_scan_done, 1); @@ -561,7 +594,7 @@ int main(int argc, char **argv) { CloseHandle(hash_threads[i]); free(hash_threads); - free(g_file_queue.items); + // free(g_file_queue.items); WaitForSingleObject(progress, INFINITE); CloseHandle(progress);