Implementing lock-free MPMC queue

This commit is contained in:
2026-03-06 20:20:28 +01:00
parent 9b327c82a6
commit 7099c1ddd6
2 changed files with 92 additions and 59 deletions

View File

@@ -1,5 +1,6 @@
#pragma once #pragma once
#include <immintrin.h>
#include <stdatomic.h> #include <stdatomic.h>
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
@@ -100,19 +101,18 @@ static double timer_stop(HiResTimer *t) {
// ============================================================ // ============================================================
typedef struct { typedef struct {
FileEntry **items; atomic_size_t seq;
FileEntry *data;
} MPMCSlot;
typedef struct {
size_t capacity; size_t capacity;
size_t head; size_t mask;
size_t tail;
size_t count;
int producers_active; MPMCSlot *slots;
CRITICAL_SECTION cs;
CONDITION_VARIABLE not_empty;
CONDITION_VARIABLE not_full;
atomic_size_t head;
atomic_size_t tail;
} MPMCQueue; } MPMCQueue;
static MPMCQueue g_file_queue; static MPMCQueue g_file_queue;

View File

@@ -77,62 +77,90 @@ static void format_time(uint64_t t, char *out, size_t out_sz) {
} }
// --------------- parallel directory scanning ---------------- // --------------- parallel directory scanning ----------------
static void mpmc_init(MPMCQueue *q, size_t capacity) { void mpmc_init(MPMCQueue *q, size_t capacity) {
if ((capacity & (capacity - 1)) != 0) {
fprintf(stderr, "capacity must be power of two\n");
exit(1);
}
q->capacity = capacity; q->capacity = capacity;
q->items = xmalloc(sizeof(FileEntry *) * capacity); q->mask = capacity - 1;
q->head = 0; q->slots = malloc(sizeof(MPMCSlot) * capacity);
q->tail = 0;
q->count = 0;
q->producers_active = 1;
InitializeCriticalSection(&q->cs); for (size_t i = 0; i < capacity; i++) {
InitializeConditionVariable(&q->not_empty); atomic_init(&q->slots[i].seq, i);
InitializeConditionVariable(&q->not_full); q->slots[i].data = NULL;
} }
static void mpmc_push(MPMCQueue *q, FileEntry *item) { atomic_init(&q->head, 0);
EnterCriticalSection(&q->cs); atomic_init(&q->tail, 0);
while (q->count == q->capacity) {
SleepConditionVariableCS(&q->not_full, &q->cs, INFINITE);
} }
q->items[q->tail] = item; void mpmc_push(MPMCQueue *q, FileEntry *item) {
q->tail = (q->tail + 1) % q->capacity; MPMCSlot *slot;
q->count++; size_t pos;
WakeConditionVariable(&q->not_empty); for (;;) {
LeaveCriticalSection(&q->cs);
pos = atomic_load_explicit(&q->tail, memory_order_relaxed);
slot = &q->slots[pos & q->mask];
size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
intptr_t diff = (intptr_t)seq - (intptr_t)pos;
if (diff == 0) {
if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1,
memory_order_relaxed,
memory_order_relaxed))
break;
} else if (diff < 0) {
// queue full
_mm_pause();
} else {
_mm_pause();
}
} }
static FileEntry *mpmc_pop(MPMCQueue *q) { slot->data = item;
EnterCriticalSection(&q->cs);
while (q->count == 0 && q->producers_active) { atomic_store_explicit(&slot->seq, pos + 1, memory_order_release);
SleepConditionVariableCS(&q->not_empty, &q->cs, INFINITE);
} }
if (q->count == 0 && !q->producers_active) { FileEntry *mpmc_pop(MPMCQueue *q) {
LeaveCriticalSection(&q->cs); MPMCSlot *slot;
return NULL; size_t pos;
for (;;) {
pos = atomic_load_explicit(&q->head, memory_order_relaxed);
slot = &q->slots[pos & q->mask];
size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
if (diff == 0) {
if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1,
memory_order_relaxed,
memory_order_relaxed))
break;
// } else if (diff < 0) {
//
// _mm_pause(); // wait for producers
} else {
_mm_pause();
}
} }
FileEntry *item = q->items[q->head]; FileEntry *data = slot->data;
q->head = (q->head + 1) % q->capacity;
q->count--;
WakeConditionVariable(&q->not_full); atomic_store_explicit(&slot->seq, pos + q->capacity, memory_order_release);
LeaveCriticalSection(&q->cs);
return item; return data;
}
static void mpmc_finish(MPMCQueue *q) {
EnterCriticalSection(&q->cs);
q->producers_active = 0;
WakeAllConditionVariable(&q->not_empty);
LeaveCriticalSection(&q->cs);
} }
// Add queue helper functions // Add queue helper functions
@@ -283,15 +311,16 @@ static void xxh3_hash_file_stream(const char *path, char *out_hex) {
(unsigned long long)h.low64); (unsigned long long)h.low64);
} }
// ----------------------------- Hash worker // ------------------------- Hash worker --------------------------------
// --------------------------------------
static DWORD WINAPI hash_worker(LPVOID arg) { static DWORD WINAPI hash_worker(LPVOID arg) {
MPMCQueue *q = (MPMCQueue *)arg; MPMCQueue *q = (MPMCQueue *)arg;
for (;;) { for (;;) {
FileEntry *fe = mpmc_pop(q); FileEntry *fe = mpmc_pop(q);
if (!fe) if (!fe)
break; break; // poison pill
char hash[HASH_STRLEN]; char hash[HASH_STRLEN];
xxh3_hash_file_stream(fe->path, hash); xxh3_hash_file_stream(fe->path, hash);
@@ -533,7 +562,11 @@ int main(int argc, char **argv) {
WaitForMultipleObjects((DWORD)scan_threads, scan_tids, TRUE, INFINITE); WaitForMultipleObjects((DWORD)scan_threads, scan_tids, TRUE, INFINITE);
mpmc_finish(&g_file_queue); // mpmc_finish(&g_file_queue);
// debug
for (size_t i = 0; i < num_threads; i++) {
mpmc_push(&g_file_queue, NULL);
}
atomic_store(&g_scan_done, 1); atomic_store(&g_scan_done, 1);
@@ -561,7 +594,7 @@ int main(int argc, char **argv) {
CloseHandle(hash_threads[i]); CloseHandle(hash_threads[i]);
free(hash_threads); free(hash_threads);
free(g_file_queue.items); // free(g_file_queue.items);
WaitForSingleObject(progress, INFINITE); WaitForSingleObject(progress, INFINITE);
CloseHandle(progress); CloseHandle(progress);