Bug fixes in lock free MPMC queue

Fix bug slots used before initialization,compare and swap is protecting
updating committed, but it is not protecting the memory initialization.
Adding atomic_flag commit_lock to protect against that

Fix bug multiple threads committing at the same time, fixed by using
atomic_flag commit_lock and re-checking committed after acquiring the
lock

Reorder helper functions
This commit is contained in:
2026-03-07 11:36:11 +01:00
parent 4967591ff8
commit 417dbad374
4 changed files with 82 additions and 55 deletions

View File

@@ -8,8 +8,12 @@ v2.1: Uses AVX2 instead of SSE2
v3.0: Simple mutex/critical section based MPMC queue v3.0: Simple mutex/critical section based MPMC queue
reusable hashing buffer reusable hashing buffer
v3.1: Lock free MPMC queue v3.1: Lock free MPMC queue Vyukov-style
v3.2: Making the lock free MPMC queue growable v3.2: Making the lock free MPMC queue growable
Add padding to avoir false sharing Add padding to avoir false sharing
Add sleep() and SwitchToThread() to limit spinning Add sleep() and SwitchToThread() to limit spinning
v3.3: Fix bug slots used before initialization,compare and swap is protecting updating committed, but it is not protecting the memory initialization. Adding atomic_flag commit_lock to protect against that
Fix bug multiple threads committing at the same time, fixed by using atomic_flag commit_lock and re-checking committed after acquiring the lock
Reorder helper functions

Binary file not shown.

View File

@@ -117,6 +117,7 @@ typedef struct {
atomic_size_t committed; atomic_size_t committed;
size_t commit_step; size_t commit_step;
atomic_flag commit_lock;
MPMCSlot *slots; MPMCSlot *slots;
} MPMCQueue; } MPMCQueue;

View File

@@ -1,4 +1,5 @@
#include "platform.h" #include "platform.h"
#include <stdio.h>
// ----------------------------- Globals ------------------------------------ // ----------------------------- Globals ------------------------------------
static atomic_uint_fast64_t g_files_found = 0; static atomic_uint_fast64_t g_files_found = 0;
@@ -30,6 +31,25 @@ static uint64_t filetime_to_epoch(const FILETIME *ft) {
return (ull.QuadPart - 116444736000000000ULL) / 10000000ULL; return (ull.QuadPart - 116444736000000000ULL) / 10000000ULL;
} }
// ----------------------------- Format time helper -------------------------
static void format_time(uint64_t t, char *out, size_t out_sz) {
if (t == 0) {
snprintf(out, out_sz, "N/A");
return;
}
time_t tt = (time_t)t;
struct tm tm;
#if PLATFORM_WINDOWS
localtime_s(&tm, &tt);
#else
localtime_r(&tt, &tm);
#endif
strftime(out, out_sz, "%Y-%m-%d %H:%M:%S", &tm);
}
// ----------------------------- Resolve file owner --------------------- // ----------------------------- Resolve file owner ---------------------
static void get_file_owner(const char *path, char *out, size_t out_sz) { static void get_file_owner(const char *path, char *out, size_t out_sz) {
PSID sid = NULL; PSID sid = NULL;
@@ -57,23 +77,22 @@ static void get_file_owner(const char *path, char *out, size_t out_sz) {
LocalFree(sd); LocalFree(sd);
} }
// ----------------------------- Format time helper ------------------------- // ----------------------------- Get file metadata -------------------------
static void format_time(uint64_t t, char *out, size_t out_sz) { void platform_get_file_times(const char *path, uint64_t *out_created,
if (t == 0) { uint64_t *out_modified) {
snprintf(out, out_sz, "N/A"); WIN32_FILE_ATTRIBUTE_DATA fad;
return; if (GetFileAttributesExA(path, GetFileExInfoStandard, &fad)) {
*out_created = filetime_to_epoch(&fad.ftCreationTime);
*out_modified = filetime_to_epoch(&fad.ftLastWriteTime);
} else {
*out_created = 0;
*out_modified = 0;
}
} }
time_t tt = (time_t)t; void platform_get_file_owner(const char *path, char *out_owner,
struct tm tm; size_t out_owner_size) {
get_file_owner(path, out_owner, out_owner_size);
#if PLATFORM_WINDOWS
localtime_s(&tm, &tt);
#else
localtime_r(&tt, &tm);
#endif
strftime(out, out_sz, "%Y-%m-%d %H:%M:%S", &tm);
} }
// --------------- parallel directory scanning ---------------- // --------------- parallel directory scanning ----------------
@@ -96,6 +115,7 @@ void mpmc_init(MPMCQueue *q, size_t max_capacity) {
} }
q->commit_step = (64ull * 1024 * 1024) / sizeof(MPMCSlot); q->commit_step = (64ull * 1024 * 1024) / sizeof(MPMCSlot);
atomic_flag_clear(&q->commit_lock);
q->committed = q->commit_step; q->committed = q->commit_step;
@@ -112,15 +132,28 @@ void mpmc_init(MPMCQueue *q, size_t max_capacity) {
} }
static void mpmc_commit_more(MPMCQueue *q) { static void mpmc_commit_more(MPMCQueue *q) {
size_t start = q->committed;
if (atomic_flag_test_and_set(&q->commit_lock))
return;
size_t start = atomic_load_explicit(&q->committed, memory_order_acquire);
size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed);
// another thread already committed enough
if (tail < start) {
atomic_flag_clear(&q->commit_lock);
return;
}
if (start >= q->capacity) {
atomic_flag_clear(&q->commit_lock);
return;
}
size_t new_commit = start + q->commit_step; size_t new_commit = start + q->commit_step;
if (new_commit > q->capacity) if (new_commit > q->capacity)
new_commit = q->capacity; new_commit = q->capacity;
if (!atomic_compare_exchange_strong(&q->committed, &start, new_commit))
return; // another thread already committed
size_t count = new_commit - start; size_t count = new_commit - start;
VirtualAlloc(&q->slots[start], count * sizeof(MPMCSlot), MEM_COMMIT, VirtualAlloc(&q->slots[start], count * sizeof(MPMCSlot), MEM_COMMIT,
@@ -130,6 +163,10 @@ static void mpmc_commit_more(MPMCQueue *q) {
atomic_init(&q->slots[i].seq, i); atomic_init(&q->slots[i].seq, i);
q->slots[i].data = NULL; q->slots[i].data = NULL;
} }
atomic_store_explicit(&q->committed, new_commit, memory_order_release);
atomic_flag_clear(&q->commit_lock);
} }
void mpmc_push(MPMCQueue *q, FileEntry *item) { void mpmc_push(MPMCQueue *q, FileEntry *item) {
@@ -139,6 +176,16 @@ void mpmc_push(MPMCQueue *q, FileEntry *item) {
for (;;) { for (;;) {
pos = atomic_load_explicit(&q->tail, memory_order_relaxed); pos = atomic_load_explicit(&q->tail, memory_order_relaxed);
// ensure the slot is committed BEFORE accessing it
size_t committed =
atomic_load_explicit(&q->committed, memory_order_relaxed);
if (pos >= committed) {
mpmc_commit_more(q);
continue;
}
slot = &q->slots[pos & q->mask]; slot = &q->slots[pos & q->mask];
size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
@@ -151,21 +198,13 @@ void mpmc_push(MPMCQueue *q, FileEntry *item) {
memory_order_relaxed)) memory_order_relaxed))
break; break;
} else if (diff < 0) { } else if (diff < 0) { // queue actually full
size_t committed =
atomic_load_explicit(&q->committed, memory_order_relaxed);
if (pos >= committed) {
mpmc_commit_more(q);
continue;
}
Sleep(1000); // queue actually full
} else {
Sleep(1000); Sleep(1000);
} else { // waiting to grow
Sleep(0);
} }
} }
@@ -195,11 +234,12 @@ FileEntry *mpmc_pop(MPMCQueue *q) {
memory_order_relaxed)) memory_order_relaxed))
break; break;
} else if (diff < 0) { } else if (diff < 0) { // queue is empty
Sleep(1000); Sleep(500);
} else { // slot is still transitioning (written by another thread)
} else {
if (++spins > 10) { if (++spins > 10) {
SwitchToThread(); // yield CPU SwitchToThread(); // yield CPU
@@ -207,7 +247,7 @@ FileEntry *mpmc_pop(MPMCQueue *q) {
} else { } else {
_mm_pause(); _mm_pause(); // busy waiting
} }
} }
} }
@@ -478,24 +518,6 @@ DWORD WINAPI progress_thread(void *arg) {
return 0; return 0;
} }
// ----------------------------- Get file metadata -------------------------
void platform_get_file_times(const char *path, uint64_t *out_created,
uint64_t *out_modified) {
WIN32_FILE_ATTRIBUTE_DATA fad;
if (GetFileAttributesExA(path, GetFileExInfoStandard, &fad)) {
*out_created = filetime_to_epoch(&fad.ftCreationTime);
*out_modified = filetime_to_epoch(&fad.ftLastWriteTime);
} else {
*out_created = 0;
*out_modified = 0;
}
}
void platform_get_file_owner(const char *path, char *out_owner,
size_t out_owner_size) {
get_file_owner(path, out_owner, out_owner_size);
}
// ----------------------------- Main --------------------------------------- // ----------------------------- Main ---------------------------------------
int main(int argc, char **argv) { int main(int argc, char **argv) {
char folders[64][MAX_PATHLEN]; // up to 64 input folders char folders[64][MAX_PATHLEN]; // up to 64 input folders