Making the LF MPMC queue generic and isolating its code in a separate header file

This commit is contained in:
2026-03-09 01:14:24 +01:00
parent 75c2592bfe
commit b2f444af00
3 changed files with 204 additions and 190 deletions

View File

@@ -104,170 +104,6 @@ void platform_get_file_owner(const char *path, char *out_owner,
get_file_owner(path, out_owner, out_owner_size);
}
// --------------- parallel directory scanning ----------------
/*
 * Initialize a bounded MPMC queue backed by lazily-committed virtual memory.
 *
 * The full slot array for `max_capacity` entries is address-reserved up
 * front; physical pages are committed on demand in `commit_step` chunks
 * (see mpmc_commit_more). `max_capacity` must be a non-zero power of two
 * so that `pos & mask` can replace a modulo.
 *
 * Fixes vs. previous version:
 *  - rejects max_capacity == 0 (0 passes the bit test `x & (x-1)` alone);
 *  - clamps the initial commit to the capacity: small queues are smaller
 *    than one commit step, and committing past the reserved range fails,
 *    after which the init loop would write into uncommitted pages;
 *  - checks the MEM_COMMIT result instead of ignoring it.
 */
void mpmc_init(MPMCQueue *q, size_t max_capacity) {
  if (max_capacity == 0 || (max_capacity & (max_capacity - 1)) != 0) {
    fprintf(stderr, "capacity must be power of two\n");
    exit(1);
  }
  q->capacity = max_capacity;
  q->mask = max_capacity - 1;
  size_t bytes = sizeof(MPMCSlot) * max_capacity;
  /* Reserve only: no physical pages yet. */
  q->slots = VirtualAlloc(NULL, bytes, MEM_RESERVE, PAGE_READWRITE);
  if (!q->slots) {
    fprintf(stderr, "VirtualAlloc reserve failed\n");
    exit(1);
  }
  /* Grow in ~64 MiB chunks worth of slots. */
  q->commit_step = (64ull * 1024 * 1024) / sizeof(MPMCSlot);
  atomic_flag_clear(&q->commit_lock);
  size_t initial = q->commit_step;
  if (initial > max_capacity)
    initial = max_capacity; /* never commit past the reservation */
  q->committed = initial;
  if (!VirtualAlloc(q->slots, initial * sizeof(MPMCSlot), MEM_COMMIT,
                    PAGE_READWRITE)) {
    fprintf(stderr, "VirtualAlloc commit failed\n");
    exit(1);
  }
  /* Seed each committed slot's sequence number with its index: a slot whose
     seq equals the producer's ticket is free (Vyukov MPMC invariant). */
  for (size_t i = 0; i < initial; i++) {
    atomic_init(&q->slots[i].seq, i);
    q->slots[i].data = NULL;
  }
  atomic_init(&q->head, 0);
  atomic_init(&q->tail, 0);
}
/* Commit one more chunk of slot pages and initialize their seq numbers.
 * Serialized by `commit_lock` (a spin flag); a thread that loses the race
 * simply returns — the caller (mpmc_push) re-checks `committed` and retries,
 * so dropping out here is safe.
 * NOTE(review): the VirtualAlloc MEM_COMMIT result is not checked here —
 * on commit failure the init loop below would fault; worth confirming. */
static void mpmc_commit_more(MPMCQueue *q) {
  if (atomic_flag_test_and_set(&q->commit_lock))
    return;
  size_t start = atomic_load_explicit(&q->committed, memory_order_acquire);
  size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed);
  // another thread already committed enough
  if (tail < start) {
    atomic_flag_clear(&q->commit_lock);
    return;
  }
  /* Fully committed already — nothing left to grow. */
  if (start >= q->capacity) {
    atomic_flag_clear(&q->commit_lock);
    return;
  }
  size_t new_commit = start + q->commit_step;
  if (new_commit > q->capacity)
    new_commit = q->capacity;
  size_t count = new_commit - start;
  VirtualAlloc(&q->slots[start], count * sizeof(MPMCSlot), MEM_COMMIT,
               PAGE_READWRITE);
  /* Seed seq = index for the new slots (slot free when seq == ticket). */
  for (size_t i = start; i < new_commit; i++) {
    atomic_init(&q->slots[i].seq, i);
    q->slots[i].data = NULL;
  }
  /* Release pairs with the acquire load of `committed` in producers,
     publishing the initialized seq values along with the new watermark. */
  atomic_store_explicit(&q->committed, new_commit, memory_order_release);
  atomic_flag_clear(&q->commit_lock);
}
/*
 * Blocking multi-producer push (Vyukov-style bounded MPMC queue).
 *
 * Claims a tail ticket via CAS, writes the payload into the claimed slot,
 * then publishes it by storing seq = pos + 1 (release) for consumers.
 *
 * Fixes vs. previous version:
 *  - The grow check is gated on `committed < q->capacity`. `pos` is a
 *    monotonically increasing ticket while `committed` saturates at
 *    `capacity`, so the old unconditional `pos >= committed` test became
 *    permanently true after the first wrap and the producer spun forever
 *    in mpmc_commit_more (which early-returns once fully committed).
 *  - The `committed` load uses acquire to pair with the release store in
 *    mpmc_commit_more, so the newly initialized slot seq values are
 *    guaranteed visible before the slot is touched.
 */
void mpmc_push(MPMCQueue *q, FileEntry *item) {
  MPMCSlot *slot;
  size_t pos;
  for (;;) {
    pos = atomic_load_explicit(&q->tail, memory_order_relaxed);
    // ensure the slot is committed BEFORE accessing it
    size_t committed =
        atomic_load_explicit(&q->committed, memory_order_acquire);
    if (committed < q->capacity && pos >= committed) {
      mpmc_commit_more(q);
      continue;
    }
    slot = &q->slots[pos & q->mask];
    size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
    intptr_t diff = (intptr_t)seq - (intptr_t)pos;
    if (diff == 0) {
      /* Slot free and ours to claim if the CAS wins the ticket. */
      if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1,
                                                memory_order_relaxed,
                                                memory_order_relaxed))
        break;
    } else if (diff < 0) { // queue actually full
      Sleep(1000);
    } else { // another producer claimed this ticket first; retry
      Sleep(0);
    }
  }
  slot->data = item;
  /* Publish: consumers wait for seq == pos + 1 before reading data. */
  atomic_store_explicit(&slot->seq, pos + 1, memory_order_release);
}
/* Blocking multi-consumer pop. Claims a head ticket via CAS once the slot's
 * seq reaches pos + 1 (payload published by a producer), reads the payload,
 * then recycles the slot by storing seq = pos + capacity (release) so a
 * producer one lap later sees it as free. Never returns without an item:
 * on an empty queue it sleeps and retries indefinitely. */
FileEntry *mpmc_pop(MPMCQueue *q) {
  MPMCSlot *slot;
  size_t pos;
  int spins = 0;
  for (;;) {
    pos = atomic_load_explicit(&q->head, memory_order_relaxed);
    slot = &q->slots[pos & q->mask];
    /* Acquire pairs with the producer's release store of seq, making the
       slot's data field visible before we read it. */
    size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
    intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
    if (diff == 0) {
      if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1,
                                                memory_order_relaxed,
                                                memory_order_relaxed))
        break;
    } else if (diff < 0) { // queue is empty
      Sleep(500);
    } else { // slot is still transitioning (written by another thread)
      if (++spins > 10) {
        SwitchToThread(); // yield CPU
        spins = 0;
      } else {
        _mm_pause(); // busy waiting
      }
    }
  }
  FileEntry *data = slot->data;
  /* Recycle the slot for the next lap of producers. */
  atomic_store_explicit(&slot->seq, pos + q->capacity, memory_order_release);
  return data;
}
// --------------- parallel directory scanning ----------------
// Add queue helper functions
static void dirqueue_push(DirQueue *q, const char *path) {