Bug fixes in the lock free mpmc queue

Fix bug slots used before initialization,compare and swap is protecting
updating committed, but it is not protecting the memory initialization.
Adding atomic_flag commit_lock to protect against that

Fix bug multiple threads committing at the same time, fixed by using
atomic_flag commit_lock and re-checking committed after acquiring the
lock
This commit is contained in:
2026-03-07 10:29:48 +01:00
parent 4967591ff8
commit 0cf0d6c26a
4 changed files with 39 additions and 15 deletions

View File

@@ -13,3 +13,6 @@ v3.1: Lock free MPMC queue
v3.2: Making the lock free MPMC queue growable v3.2: Making the lock free MPMC queue growable
Add padding to avoir false sharing Add padding to avoir false sharing
Add sleep() and SwitchToThread() to limit spinning Add sleep() and SwitchToThread() to limit spinning
v3.3: Fix bug slots used before initialization,compare and swap is protecting updating committed, but it is not protecting the memory initialization. Adding atomic_flag commit_lock to protect against that
Fix bug multiple threads committing at the same time, fixed by using atomic_flag commit_lock and re-checking committed after acquiring the lock

Binary file not shown.

View File

@@ -117,6 +117,7 @@ typedef struct {
atomic_size_t committed; atomic_size_t committed;
size_t commit_step; size_t commit_step;
atomic_flag commit_lock;
MPMCSlot *slots; MPMCSlot *slots;
} MPMCQueue; } MPMCQueue;

View File

@@ -96,6 +96,7 @@ void mpmc_init(MPMCQueue *q, size_t max_capacity) {
} }
q->commit_step = (64ull * 1024 * 1024) / sizeof(MPMCSlot); q->commit_step = (64ull * 1024 * 1024) / sizeof(MPMCSlot);
atomic_flag_clear(&q->commit_lock);
q->committed = q->commit_step; q->committed = q->commit_step;
@@ -112,15 +113,28 @@ void mpmc_init(MPMCQueue *q, size_t max_capacity) {
} }
static void mpmc_commit_more(MPMCQueue *q) { static void mpmc_commit_more(MPMCQueue *q) {
size_t start = q->committed;
if (atomic_flag_test_and_set(&q->commit_lock))
return;
size_t start = atomic_load_explicit(&q->committed, memory_order_acquire);
size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed);
// another thread already committed enough
if (tail < start) {
atomic_flag_clear(&q->commit_lock);
return;
}
if (start >= q->capacity) {
atomic_flag_clear(&q->commit_lock);
return;
}
size_t new_commit = start + q->commit_step; size_t new_commit = start + q->commit_step;
if (new_commit > q->capacity) if (new_commit > q->capacity)
new_commit = q->capacity; new_commit = q->capacity;
if (!atomic_compare_exchange_strong(&q->committed, &start, new_commit))
return; // another thread already committed
size_t count = new_commit - start; size_t count = new_commit - start;
VirtualAlloc(&q->slots[start], count * sizeof(MPMCSlot), MEM_COMMIT, VirtualAlloc(&q->slots[start], count * sizeof(MPMCSlot), MEM_COMMIT,
@@ -130,6 +144,10 @@ static void mpmc_commit_more(MPMCQueue *q) {
atomic_init(&q->slots[i].seq, i); atomic_init(&q->slots[i].seq, i);
q->slots[i].data = NULL; q->slots[i].data = NULL;
} }
atomic_store_explicit(&q->committed, new_commit, memory_order_release);
atomic_flag_clear(&q->commit_lock);
} }
void mpmc_push(MPMCQueue *q, FileEntry *item) { void mpmc_push(MPMCQueue *q, FileEntry *item) {
@@ -139,6 +157,16 @@ void mpmc_push(MPMCQueue *q, FileEntry *item) {
for (;;) { for (;;) {
pos = atomic_load_explicit(&q->tail, memory_order_relaxed); pos = atomic_load_explicit(&q->tail, memory_order_relaxed);
// ensure the slot is committed BEFORE accessing it
size_t committed =
atomic_load_explicit(&q->committed, memory_order_relaxed);
if (pos >= committed) {
mpmc_commit_more(q);
continue;
}
slot = &q->slots[pos & q->mask]; slot = &q->slots[pos & q->mask];
size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
@@ -153,19 +181,11 @@ void mpmc_push(MPMCQueue *q, FileEntry *item) {
} else if (diff < 0) { } else if (diff < 0) {
size_t committed = Sleep(100); // queue actually full
atomic_load_explicit(&q->committed, memory_order_relaxed);
if (pos >= committed) {
mpmc_commit_more(q);
continue;
}
Sleep(1000); // queue actually full
} else { } else {
Sleep(1000); Sleep(100);
} }
} }
@@ -197,7 +217,7 @@ FileEntry *mpmc_pop(MPMCQueue *q) {
} else if (diff < 0) { } else if (diff < 0) {
Sleep(1000); Sleep(500);
} else { } else {
if (++spins > 10) { if (++spins > 10) {