Improving the lock free mpmc queue

Making the queue growable
Add padding to avoid false sharing
Add sleep() and SwitchToThread() to limit spinning
This commit is contained in:
2026-03-07 01:40:31 +01:00
parent 86ad30788a
commit 4967591ff8
4 changed files with 83 additions and 17 deletions

View File

@@ -9,3 +9,7 @@ v3.0: Simple mutex/critical section based MPMC queue
reusable hashing buffer
v3.1: Lock free MPMC queue
v3.2: Making the lock free MPMC queue growable
Add padding to avoid false sharing
Add sleep() and SwitchToThread() to limit spinning

Binary file not shown.

View File

@@ -103,16 +103,22 @@ static double timer_stop(HiResTimer *t) {
// One slot of the MPMC queue's slot array.
typedef struct {
// Sequence counter; set to the slot's own index when the slot is
// initialised (see mpmc_init / mpmc_commit_more).
atomic_size_t seq;
// Queued item pointer; NULL when the slot is initialised.
FileEntry *data;
// Filler sized so the three members add up to 64 bytes, keeping neighbouring
// slots from sharing a cache line (assumes 64-byte cache lines and no extra
// compiler-inserted padding -- TODO confirm for the target).
char pad[64 - sizeof(atomic_size_t) - sizeof(FileEntry *)];
} MPMCSlot;
typedef struct {
atomic_size_t head;
char pad1[64];
atomic_size_t tail;
char pad2[64];
size_t capacity;
size_t mask;
MPMCSlot *slots;
atomic_size_t committed;
size_t commit_step;
atomic_size_t head;
atomic_size_t tail;
MPMCSlot *slots;
} MPMCQueue;
static MPMCQueue g_file_queue;

View File

@@ -77,18 +77,32 @@ static void format_time(uint64_t t, char *out, size_t out_sz) {
}
// --------------- parallel directory scanning ----------------
void mpmc_init(MPMCQueue *q, size_t capacity) {
if ((capacity & (capacity - 1)) != 0) {
void mpmc_init(MPMCQueue *q, size_t max_capacity) {
if ((max_capacity & (max_capacity - 1)) != 0) {
fprintf(stderr, "capacity must be power of two\n");
exit(1);
}
q->capacity = capacity;
q->mask = capacity - 1;
q->capacity = max_capacity;
q->mask = max_capacity - 1;
q->slots = malloc(sizeof(MPMCSlot) * capacity);
size_t bytes = sizeof(MPMCSlot) * max_capacity;
for (size_t i = 0; i < capacity; i++) {
q->slots = VirtualAlloc(NULL, bytes, MEM_RESERVE, PAGE_READWRITE);
if (!q->slots) {
fprintf(stderr, "VirtualAlloc reserve failed\n");
exit(1);
}
q->commit_step = (64ull * 1024 * 1024) / sizeof(MPMCSlot);
q->committed = q->commit_step;
VirtualAlloc(q->slots, q->commit_step * sizeof(MPMCSlot), MEM_COMMIT,
PAGE_READWRITE);
for (size_t i = 0; i < q->committed; i++) {
atomic_init(&q->slots[i].seq, i);
q->slots[i].data = NULL;
}
@@ -97,6 +111,27 @@ void mpmc_init(MPMCQueue *q, size_t capacity) {
atomic_init(&q->tail, 0);
}
/*
 * Grow the usable part of the reserved slot array by one commit_step
 * (clamped to q->capacity).
 *
 * The new pages are committed and every new slot is initialised BEFORE the
 * larger q->committed value is published, so a thread that observes the new
 * value never touches uncommitted or uninitialised memory.
 *
 * If another thread is already growing the queue this call returns
 * immediately without doing anything; the caller is expected to re-check
 * q->committed and retry. A failed commit is fatal (message + exit(1)),
 * matching the error handling in mpmc_init.
 */
static void mpmc_commit_more(MPMCQueue *q) {
    // Elects a single committer at a time. The flag is function-local, so it
    // is shared by every queue; that only serialises growth, which is rare.
    static atomic_flag commit_lock = ATOMIC_FLAG_INIT;

    if (atomic_flag_test_and_set_explicit(&commit_lock, memory_order_acquire)) {
        return; // another thread is committing right now
    }

    // Re-read under the lock: a previous committer may already have grown it.
    size_t start = atomic_load_explicit(&q->committed, memory_order_relaxed);

    // Overflow-safe "min(start + commit_step, capacity)".
    size_t new_commit = q->capacity;
    if (start < q->capacity && q->commit_step < q->capacity - start) {
        new_commit = start + q->commit_step;
    }

    if (new_commit > start) {
        size_t count = new_commit - start;

        if (!VirtualAlloc(&q->slots[start], count * sizeof(MPMCSlot),
                          MEM_COMMIT, PAGE_READWRITE)) {
            fprintf(stderr, "VirtualAlloc commit failed\n");
            exit(1);
        }

        for (size_t i = start; i < new_commit; i++) {
            atomic_init(&q->slots[i].seq, i);
            q->slots[i].data = NULL;
        }

        // Publish only after the slots are fully set up.
        atomic_store_explicit(&q->committed, new_commit, memory_order_release);
    }
    // else: the whole reservation is already committed, nothing to do.

    atomic_flag_clear_explicit(&commit_lock, memory_order_release);
}
void mpmc_push(MPMCQueue *q, FileEntry *item) {
MPMCSlot *slot;
size_t pos;
@@ -110,15 +145,27 @@ void mpmc_push(MPMCQueue *q, FileEntry *item) {
intptr_t diff = (intptr_t)seq - (intptr_t)pos;
if (diff == 0) {
if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1,
memory_order_relaxed,
memory_order_relaxed))
break;
} else if (diff < 0) {
// queue full
_mm_pause();
size_t committed =
atomic_load_explicit(&q->committed, memory_order_relaxed);
if (pos >= committed) {
mpmc_commit_more(q);
continue;
}
Sleep(1000); // queue actually full
} else {
_mm_pause();
Sleep(1000);
}
}
@@ -131,6 +178,8 @@ FileEntry *mpmc_pop(MPMCQueue *q) {
MPMCSlot *slot;
size_t pos;
int spins = 0;
for (;;) {
pos = atomic_load_explicit(&q->head, memory_order_relaxed);
@@ -146,13 +195,20 @@ FileEntry *mpmc_pop(MPMCQueue *q) {
memory_order_relaxed))
break;
// } else if (diff < 0) {
//
// _mm_pause(); // wait for producers
} else if (diff < 0) {
Sleep(1000);
} else {
if (++spins > 10) {
_mm_pause();
SwitchToThread(); // yield CPU
spins = 0;
} else {
_mm_pause();
}
}
}
@@ -527,7 +583,7 @@ int main(int argc, char **argv) {
// Step 1: Scan all folders
// -------------------------------
mpmc_init(&g_file_queue, 65536);
mpmc_init(&g_file_queue, 1024 * 1024 * 1024);
DirQueue q;
memset(&q, 0, sizeof(q));