/* forked from amir/filehasher
 *
 * MPMC queue that supports producers that are simultaneously consumers:
 * a `work_count` variable is incremented by mpmc_push_work() and
 * decremented by mpmc_task_done(); when it reaches zero,
 * mpmc_producers_finished() pushes poison pills (NULL) to wake sleeping
 * threads so mpmc_pop() returns NULL and consumers exit.
 *
 * This replaces DirQueue (a queue grown with realloc) with the MPMC queue.
 */
#pragma once
|
|
|
|
#include "base.h"
|
|
|
|
// Assumed cache-line size in bytes — TODO confirm per target architecture.
#define CACHELINE 64

// Align a type/variable to a cache line to avoid false sharing between
// fields touched by different threads.
#if defined(_MSC_VER)
#define CACHE_ALIGN __declspec(align(CACHELINE))
#else
#define CACHE_ALIGN __attribute__((aligned(CACHELINE)))
#endif

// Branch-prediction hints; compile to plain expressions on compilers
// without __builtin_expect.
#if defined(__GNUC__) || defined(__clang__)
#define likely(x) __builtin_expect((x), 1)
#define unlikely(x) __builtin_expect((x), 0)
#else
#define likely(x) (x)
#define unlikely(x) (x)
#endif
|
|
|
|
/* Hint the CPU that we are in a spin-wait loop (reduces power use and
 * frees execution resources for the SMT sibling). No-op on non-x86.
 *
 * On GCC/Clang the builtin is used instead of _mm_pause(): the intrinsic
 * requires <xmmintrin.h>/<immintrin.h>, which this header does not
 * include, while the builtin is always available on x86 targets. */
static void cpu_pause(void) {
#if defined(_MSC_VER)
  _mm_pause();
#elif defined(__x86_64__) || defined(__i386__)
  __builtin_ia32_pause();
#endif
}
|
|
|
|
// Opaque counting semaphore; defined by the platform layer (see base.h).
typedef struct plat_sem plat_sem;
|
|
|
|
// One ring slot, using the classic bounded-MPMC sequence handshake:
//   seq == pos            -> slot free for the producer claiming `pos`
//   seq == pos + 1        -> data published; ready for the consumer at `pos`
//   seq == pos + capacity -> consumed; free again for the next lap
// The pad (plus CACHE_ALIGN) keeps each slot on its own cache line so
// threads working on neighboring slots don't false-share.
typedef struct CACHE_ALIGN {
  atomic_size_t seq;
  void *data;
  // NOTE(review): hard-coded 64 here should presumably be CACHELINE — confirm.
  char pad[64 - sizeof(atomic_size_t) - sizeof(void *)];
} MPMCSlot;
|
|
|
|
// Bounded MPMC queue with lazily committed slot storage. Virtual address
// space for all `capacity` slots is reserved at init; physical pages are
// committed on demand (see mpmc_commit_more).
typedef struct {
  CACHE_ALIGN atomic_size_t head; // next position consumers will pop
  CACHE_ALIGN atomic_size_t tail; // next position producers will claim

  // Number of outstanding tracked work items (mpmc_push_work increments,
  // mpmc_task_done decrements); reaching zero triggers consumer shutdown.
  CACHE_ALIGN atomic_size_t work_count;

  size_t capacity; // total slot count; must be a power of two
  size_t mask;     // capacity - 1, for cheap index wrapping

  atomic_size_t committed; // slots [0, committed) have committed pages
  size_t commit_step;      // slots committed per mpmc_commit_more call
  atomic_flag commit_lock; // serializes committers (single winner)

  plat_sem items_sem; // counts published items; consumers wait on it

  MPMCSlot *slots; // reserved (not necessarily committed) slot array
} MPMCQueue;
|
|
|
|
// --------------- functions ----------------
// All functions here are `static`: each translation unit gets its own
// private copy. This avoids ODR violations (multiple-definition link
// errors) that non-static function definitions in a header would cause
// when the header is included from more than one file.
|
|
|
/* ----------------------------------------------------------- */
|
|
/* INIT */
|
|
/* ----------------------------------------------------------- */
|
|
/* Initialize the queue with a fixed maximum capacity.
 *
 * max_capacity MUST be a non-zero power of two: the ring index is
 * computed as `pos & mask` with mask = capacity - 1, so any other value
 * silently corrupts the ring. Validated below; fails fast on misuse.
 *
 * Address space for all slots is reserved up front; only the first page
 * is committed here, the rest on demand (mpmc_commit_more).
 *
 * Not thread-safe: call before any producer/consumer thread starts. */
static void mpmc_init(MPMCQueue *q, size_t max_capacity) {

  // A non-power-of-two capacity would break `pos & mask` indexing.
  if (max_capacity == 0 || (max_capacity & (max_capacity - 1)) != 0) {
    fprintf(stderr, "mpmc_init: capacity must be a non-zero power of two\n");
    exit(1);
  }

  q->capacity = max_capacity;
  q->mask = max_capacity - 1;

  u32 pagesize = plat_get_pagesize();

  size_t bytes = ALIGN_UP_POW2(sizeof(MPMCSlot) * max_capacity, pagesize);

  q->slots = (MPMCSlot *)plat_mem_reserve(bytes);

  if (!q->slots) {
    fprintf(stderr, "VirtualAlloc reserve failed\n");
    exit(1);
  }

  // Commit exactly one page now so the queue is usable immediately.
  u64 commit_bytes = pagesize;
  commit_bytes = ALIGN_UP_POW2(commit_bytes, pagesize);

  q->commit_step = commit_bytes / sizeof(MPMCSlot);

  atomic_flag_clear(&q->commit_lock);

  // No threads are running yet, so atomic_init is sufficient (and the
  // idiomatic way to give an atomic its first value).
  atomic_init(&q->committed, q->commit_step);

  plat_mem_commit(q->slots, commit_bytes);

  // Seed each committed slot: seq == index marks the slot free for the
  // producer that claims that position (first lap).
  for (size_t i = 0; i < q->commit_step; i++) {
    atomic_init(&q->slots[i].seq, i);
    q->slots[i].data = NULL;
  }

  atomic_init(&q->head, 0);
  atomic_init(&q->tail, 0);
  atomic_init(&q->work_count, 0);

  plat_sem_init(&q->items_sem, 0);
}
|
|
|
|
/* ----------------------------------------------------------- */
|
|
/* COMMIT MORE MEMORY */
|
|
/* ----------------------------------------------------------- */
|
|
/* Commit the next batch of slot pages (single committer at a time).
 *
 * Losers of the commit_lock race simply return; the caller re-checks
 * `committed` in its retry loop, so no waiting is needed here. */
static void mpmc_commit_more(MPMCQueue *q) {

  if (atomic_flag_test_and_set(&q->commit_lock))
    return;

  size_t start = atomic_load_explicit(&q->committed, memory_order_acquire);
  // NOTE(review): this compares the raw (unbounded) tail position against
  // the committed slot count — only meaningful during the first lap of
  // the ring; confirm positions never exceed capacity before relying on it.
  size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed);

  // another thread already committed enough
  if (tail < start) {
    atomic_flag_clear(&q->commit_lock);
    return;
  }

  // Everything is already committed; nothing left to grow.
  if (start >= q->capacity) {
    atomic_flag_clear(&q->commit_lock);
    return;
  }

  // Grow by one step, clamped to the reserved capacity.
  size_t new_commit = start + q->commit_step;
  if (new_commit > q->capacity)
    new_commit = q->capacity;

  size_t count = new_commit - start;

  plat_mem_commit(&q->slots[start], count * sizeof(MPMCSlot));

  // Initialize the new slots BEFORE publishing `committed` (release
  // store below pairs with the relaxed/acquire loads in push).
  for (size_t i = start; i < new_commit; i++) {
    atomic_init(&q->slots[i].seq, i);
    q->slots[i].data = NULL;
  }

  atomic_store_explicit(&q->committed, new_commit, memory_order_release);

  atomic_flag_clear(&q->commit_lock);
}
|
|
|
|
/* ----------------------------------------------------------- */
|
|
/* PUSH */
|
|
/* ----------------------------------------------------------- */
|
|
// Does not increment work
|
|
static void mpmc_push(MPMCQueue *q, void *item) {
|
|
MPMCSlot *slot;
|
|
size_t pos;
|
|
|
|
for (;;) {
|
|
|
|
pos = atomic_load_explicit(&q->tail, memory_order_relaxed);
|
|
|
|
// ensure the slot is committed BEFORE accessing it
|
|
size_t committed =
|
|
atomic_load_explicit(&q->committed, memory_order_relaxed);
|
|
|
|
if (unlikely(pos >= committed)) {
|
|
mpmc_commit_more(q);
|
|
continue;
|
|
}
|
|
|
|
slot = &q->slots[pos & q->mask];
|
|
|
|
size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
|
|
intptr_t diff = (intptr_t)seq - (intptr_t)pos;
|
|
|
|
if (likely(diff == 0)) {
|
|
|
|
if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1,
|
|
memory_order_relaxed,
|
|
memory_order_relaxed))
|
|
break;
|
|
|
|
} else if (diff < 0) { // queue actually full
|
|
|
|
Sleep(1000);
|
|
|
|
} else { // waiting to grow
|
|
|
|
Sleep(0);
|
|
}
|
|
}
|
|
|
|
slot->data = item;
|
|
|
|
atomic_store_explicit(&slot->seq, pos + 1, memory_order_release);
|
|
|
|
plat_sem_post(&q->items_sem, 1);
|
|
}
|
|
|
|
// Increment work
|
|
static void mpmc_push_work(MPMCQueue *q, void *item) {
|
|
MPMCSlot *slot;
|
|
size_t pos;
|
|
|
|
for (;;) {
|
|
|
|
pos = atomic_load_explicit(&q->tail, memory_order_relaxed);
|
|
|
|
// ensure the slot is committed BEFORE accessing it
|
|
size_t committed =
|
|
atomic_load_explicit(&q->committed, memory_order_relaxed);
|
|
|
|
if (unlikely(pos >= committed)) {
|
|
mpmc_commit_more(q);
|
|
continue;
|
|
}
|
|
|
|
slot = &q->slots[pos & q->mask];
|
|
|
|
size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
|
|
intptr_t diff = (intptr_t)seq - (intptr_t)pos;
|
|
|
|
if (likely(diff == 0)) {
|
|
|
|
if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1,
|
|
memory_order_relaxed,
|
|
memory_order_relaxed))
|
|
break;
|
|
|
|
} else if (diff < 0) { // queue actually full
|
|
|
|
Sleep(1000);
|
|
|
|
} else { // waiting to grow
|
|
|
|
Sleep(0);
|
|
}
|
|
}
|
|
|
|
slot->data = item;
|
|
|
|
atomic_store_explicit(&slot->seq, pos + 1, memory_order_release);
|
|
|
|
atomic_fetch_add(&q->work_count, 1);
|
|
plat_sem_post(&q->items_sem, 1);
|
|
}
|
|
/* ----------------------------------------------------------- */
|
|
/* POP */
|
|
/* ----------------------------------------------------------- */
|
|
/* Pop one item; blocks until an item (or a NULL poison pill) is available.
 *
 * The semaphore guarantees at least one published item exists somewhere
 * in the ring, so the loop below only has to win the claim race and wait
 * out producers that claimed a slot but have not finished publishing. */
static void *mpmc_pop(MPMCQueue *q) {

  plat_sem_wait(&q->items_sem);

  MPMCSlot *slot;
  size_t pos;

  int spins = 0;

  for (;;) {

    pos = atomic_load_explicit(&q->head, memory_order_relaxed);
    slot = &q->slots[pos & q->mask];

    // Acquire pairs with the producer's release store of seq, making the
    // slot's data visible once seq == pos + 1.
    size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
    intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);

    if (likely(diff == 0)) {

      // Item at `pos` is published; try to claim it.
      if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1,
                                                memory_order_relaxed,
                                                memory_order_relaxed))
        break;

    } else { // slot is still transitioning (written by another thread)

      // Spin briefly, then yield so the publishing producer can finish.
      if (++spins > 10) {
        Sleep(0); // yield CPU — NOTE(review): Windows-specific
        spins = 0;
      } else {
        cpu_pause();
      }
    }
  }

  void *data = slot->data;

  // Recycle the slot for the next lap: producers at this index wait for
  // seq == pos + capacity before reusing it.
  atomic_store_explicit(&slot->seq, pos + q->capacity, memory_order_release);

  return data;
}
|
|
|
|
/* ----------------------------------------------------------- */
|
|
/* PUSH POISON */
|
|
/* ----------------------------------------------------------- */
|
|
/*note:
|
|
After producers finishes, push N poison pills where N = number of consumer
|
|
threads, this is necessary to stop the consumers.
|
|
*/
|
|
|
|
static void mpmc_producers_finished(MPMCQueue *q, u8 consumer_count) {
|
|
for (u8 i = 0; i < consumer_count; i++) {
|
|
mpmc_push(q, NULL);
|
|
}
|
|
}
|
|
|
|
/* ----------------------------------------------------------- */
|
|
/* Done */
|
|
/* ----------------------------------------------------------- */
|
|
/* Mark one tracked work item as finished.
 *
 * fetch_sub returns the value BEFORE the decrement, so a return of 1
 * means this call retired the last outstanding item; that caller is the
 * unique thread responsible for shutting the consumers down via poison
 * pills. */
static void mpmc_task_done(MPMCQueue *q, u8 consumer_count) {
  if (atomic_fetch_sub(&q->work_count, 1) == 1) {
    mpmc_producers_finished(q, consumer_count);
  }
}
|
|
|
|
/* ----------------------------------------------------------- */
|
|
/* MPMC Cleanup */
|
|
/* ----------------------------------------------------------- */
|
|
/* Tear the queue down and reset its state.
 *
 * Callers must guarantee no other thread still touches the queue. Safe
 * to call with q == NULL or on an already-finished queue. */
static void mpmc_finish(MPMCQueue *q) {
  if (q == NULL)
    return;

  MPMCSlot *slots = q->slots;
  if (slots != NULL) {
    q->slots = NULL;
    plat_mem_release(slots, 0);
  }

  plat_sem_destroy(&q->items_sem);

  q->mask = 0;
  q->capacity = 0;

  atomic_store_explicit(&q->tail, 0, memory_order_relaxed);
  atomic_store_explicit(&q->head, 0, memory_order_relaxed);
  atomic_store_explicit(&q->committed, 0, memory_order_relaxed);
}
|