LF MPMC queue improvements

Small improvements of the LF MPMC queue

Making the LF MPMC queue generic and in a separate header file
This commit is contained in:
2026-03-09 13:21:45 +01:00
parent b2f444af00
commit a299c4a1e1
9 changed files with 230 additions and 148 deletions

105
lf_mpmc.h
View File

@@ -1,26 +1,37 @@
#pragma once
/*note:
After all producers finish, push N poison pills, where N = number of consumer
threads.
for (size_t i = 0; i < num_threads; i++) {
mpmc_push(&g_file_queue, NULL);
}
*/
#include "base.h"
typedef struct {
/* Cache-line size in bytes; used for alignment and padding so that
 * producer- and consumer-hot fields never share a line (false sharing). */
#define CACHELINE 64
#if defined(_MSC_VER)
#define CACHE_ALIGN __declspec(align(CACHELINE))
#else
#define CACHE_ALIGN __attribute__((aligned(CACHELINE)))
#endif
/* Branch-prediction hints; degrade to plain expressions on compilers
 * without __builtin_expect. */
#if defined(__GNUC__) || defined(__clang__)
#define likely(x) __builtin_expect((x), 1)
#define unlikely(x) __builtin_expect((x), 0)
#else
#define likely(x) (x)
#define unlikely(x) (x)
#endif
/* Relax the CPU inside a spin loop. Emits PAUSE on x86/x64; no-op on
 * other architectures.
 * Fix: the previous condition tested _MSC_VER alone, which would try to
 * use _mm_pause() on MSVC ARM/ARM64 builds where that intrinsic does not
 * exist; test the x86 architecture macros for both toolchains instead. */
static void cpu_pause(void) {
#if defined(_M_X64) || defined(_M_IX86) || defined(__x86_64__) || defined(__i386__)
  _mm_pause();
#endif
}
typedef struct CACHE_ALIGN {
atomic_size_t seq;
void *data;
char pad[64 - sizeof(atomic_size_t) - sizeof(void *)];
} MPMCSlot;
typedef struct {
atomic_size_t head;
char pad1[64];
atomic_size_t tail;
char pad2[64];
CACHE_ALIGN atomic_size_t head;
CACHE_ALIGN atomic_size_t tail;
size_t capacity;
size_t mask;
@@ -38,12 +49,19 @@ typedef struct {
// files can lead to ODR violations (multiple definition errors if included in
// more than one file)
/* ----------------------------------------------------------- */
/* INIT */
/* ----------------------------------------------------------- */
static void mpmc_init(MPMCQueue *q, size_t max_capacity) {
if ((max_capacity & (max_capacity - 1)) != 0) {
fprintf(stderr, "capacity must be power of two\n");
if (!max_capacity) {
fprintf(stderr, "capacity must positive\n");
exit(1);
}
u32 pagesize = plat_get_pagesize();
max_capacity = ALIGN_UP_POW2(max_capacity, pagesize);
q->capacity = max_capacity;
q->mask = max_capacity - 1;
@@ -56,11 +74,10 @@ static void mpmc_init(MPMCQueue *q, size_t max_capacity) {
exit(1);
}
q->commit_step = (64ull * 1024 * 1024) / sizeof(MPMCSlot);
q->commit_step = pagesize;
atomic_flag_clear(&q->commit_lock);
q->committed = q->commit_step;
VirtualAlloc(q->slots, q->commit_step * sizeof(MPMCSlot), MEM_COMMIT,
PAGE_READWRITE);
@@ -73,6 +90,9 @@ static void mpmc_init(MPMCQueue *q, size_t max_capacity) {
atomic_init(&q->tail, 0);
}
/* ----------------------------------------------------------- */
/* COMMIT MORE MEMORY */
/* ----------------------------------------------------------- */
static void mpmc_commit_more(MPMCQueue *q) {
if (atomic_flag_test_and_set(&q->commit_lock))
@@ -111,6 +131,9 @@ static void mpmc_commit_more(MPMCQueue *q) {
atomic_flag_clear(&q->commit_lock);
}
/* ----------------------------------------------------------- */
/* PUSH */
/* ----------------------------------------------------------- */
static void mpmc_push(MPMCQueue *q, void *item) {
MPMCSlot *slot;
size_t pos;
@@ -123,7 +146,7 @@ static void mpmc_push(MPMCQueue *q, void *item) {
size_t committed =
atomic_load_explicit(&q->committed, memory_order_relaxed);
if (pos >= committed) {
if (unlikely(pos >= committed)) {
mpmc_commit_more(q);
continue;
}
@@ -133,7 +156,7 @@ static void mpmc_push(MPMCQueue *q, void *item) {
size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
intptr_t diff = (intptr_t)seq - (intptr_t)pos;
if (diff == 0) {
if (likely(diff == 0)) {
if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1,
memory_order_relaxed,
@@ -155,6 +178,9 @@ static void mpmc_push(MPMCQueue *q, void *item) {
atomic_store_explicit(&slot->seq, pos + 1, memory_order_release);
}
/* ----------------------------------------------------------- */
/* POP */
/* ----------------------------------------------------------- */
static void *mpmc_pop(MPMCQueue *q) {
MPMCSlot *slot;
size_t pos;
@@ -169,7 +195,7 @@ static void *mpmc_pop(MPMCQueue *q) {
size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
if (diff == 0) {
if (likely(diff == 0)) {
if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1,
memory_order_relaxed,
@@ -183,13 +209,10 @@ static void *mpmc_pop(MPMCQueue *q) {
} else { // slot is still transitioning (written by another thread)
if (++spins > 10) {
SwitchToThread(); // yield CPU
spins = 0;
} else {
_mm_pause(); // busy waiting
cpu_pause();
}
}
}
@@ -200,3 +223,37 @@ static void *mpmc_pop(MPMCQueue *q) {
return data;
}
/* ----------------------------------------------------------- */
/* PUSH POISON */
/* ----------------------------------------------------------- */
/*note:
After all producers finish, push N poison pills, where N = number of consumer
threads; this is necessary to stop the consumers.
*/
/* Signal end-of-stream: enqueue one NULL "poison pill" per consumer so
 * that each of the consumer_count consumers pops exactly one and stops. */
static void mpmc_producers_finished(MPMCQueue *q, u8 consumer_count) {
  u8 remaining = consumer_count;
  while (remaining != 0) {
    mpmc_push(q, NULL);
    remaining--;
  }
}
/* ----------------------------------------------------------- */
/* MPMC Cleanup */
/* ----------------------------------------------------------- */
/* Release the queue's backing storage and reset it to an empty, zeroed
 * state. Safe to call with q == NULL or on an already-finished queue. */
static void mpmc_finish(MPMCQueue *q) {
  if (q == NULL)
    return;
  MPMCSlot *ring = q->slots;
  if (ring != NULL) {
    q->slots = NULL;
    VirtualFree(ring, 0, MEM_RELEASE);
  }
  q->mask = 0;
  q->capacity = 0;
  atomic_store_explicit(&q->tail, 0, memory_order_relaxed);
  atomic_store_explicit(&q->head, 0, memory_order_relaxed);
  atomic_store_explicit(&q->committed, 0, memory_order_relaxed);
}