#pragma once #include "base.h" #define CACHELINE 64 #if defined(_MSC_VER) #define CACHE_ALIGN __declspec(align(CACHELINE)) #else #define CACHE_ALIGN __attribute__((aligned(CACHELINE))) #endif #if defined(__GNUC__) || defined(__clang__) #define likely(x) __builtin_expect((x), 1) #define unlikely(x) __builtin_expect((x), 0) #else #define likely(x) (x) #define unlikely(x) (x) #endif static void cpu_pause(void) { #if defined(_MSC_VER) || defined(__x86_64__) || defined(__i386__) _mm_pause(); #endif } typedef struct plat_sem plat_sem; typedef struct CACHE_ALIGN { atomic_size_t seq; void *data; char pad[64 - sizeof(atomic_size_t) - sizeof(void *)]; } MPMCSlot; typedef struct { CACHE_ALIGN atomic_size_t head; CACHE_ALIGN atomic_size_t tail; CACHE_ALIGN atomic_size_t work_count; size_t capacity; size_t mask; atomic_size_t committed; size_t commit_step; atomic_flag commit_lock; plat_sem items_sem; MPMCSlot *slots; } MPMCQueue; // --------------- functions ---------------- // static: each translation unit gets its own private copy this will solve the // error: Function defined in a header file; function definitions in header // files can lead to ODR violations (multiple definition errors if included in // more than one file) /* ----------------------------------------------------------- */ /* INIT */ /* ----------------------------------------------------------- */ static void mpmc_init(MPMCQueue *q, size_t max_capacity) { q->capacity = max_capacity; q->mask = max_capacity - 1; u32 pagesize = plat_get_pagesize(); size_t bytes = ALIGN_UP_POW2(sizeof(MPMCSlot) * max_capacity, pagesize); q->slots = (MPMCSlot *)plat_mem_reserve(bytes); if (!q->slots) { fprintf(stderr, "VirtualAlloc reserve failed\n"); exit(1); } u64 commit_bytes = pagesize; commit_bytes = ALIGN_UP_POW2(commit_bytes, pagesize); q->commit_step = commit_bytes / sizeof(MPMCSlot); atomic_flag_clear(&q->commit_lock); q->committed = q->commit_step; plat_mem_commit(q->slots, commit_bytes); for (size_t i = 0; i < q->committed; i++) { atomic_init(&q->slots[i].seq, i); q->slots[i].data = NULL; } atomic_init(&q->head, 0); atomic_init(&q->tail, 0); atomic_init(&q->work_count, 0); plat_sem_init(&q->items_sem, 0); } /* ----------------------------------------------------------- */ /* COMMIT MORE MEMORY */ /* ----------------------------------------------------------- */ static void mpmc_commit_more(MPMCQueue *q) { if (atomic_flag_test_and_set(&q->commit_lock)) return; size_t start = atomic_load_explicit(&q->committed, memory_order_acquire); size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed); // another thread already committed enough if (tail < start) { atomic_flag_clear(&q->commit_lock); return; } if (start >= q->capacity) { atomic_flag_clear(&q->commit_lock); return; } size_t new_commit = start + q->commit_step; if (new_commit > q->capacity) new_commit = q->capacity; size_t count = new_commit - start; plat_mem_commit(&q->slots[start], count * sizeof(MPMCSlot)); for (size_t i = start; i < new_commit; i++) { atomic_init(&q->slots[i].seq, i); q->slots[i].data = NULL; } atomic_store_explicit(&q->committed, new_commit, memory_order_release); atomic_flag_clear(&q->commit_lock); } /* ----------------------------------------------------------- */ /* PUSH */ /* ----------------------------------------------------------- */ // Does not increment work static void mpmc_push(MPMCQueue *q, void *item) { MPMCSlot *slot; size_t pos; for (;;) { pos = atomic_load_explicit(&q->tail, memory_order_relaxed); // ensure the slot is committed BEFORE accessing it size_t committed = atomic_load_explicit(&q->committed, memory_order_relaxed); if (unlikely(pos >= committed)) { mpmc_commit_more(q); continue; } slot = &q->slots[pos & q->mask]; size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); intptr_t diff = (intptr_t)seq - (intptr_t)pos; if (likely(diff == 0)) { if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1, memory_order_relaxed, memory_order_relaxed)) break; } else if (diff < 0) { // queue actually full sleep_ms(1000); } else { // waiting to grow sleep_ms(0); } } slot->data = item; atomic_store_explicit(&slot->seq, pos + 1, memory_order_release); plat_sem_post(&q->items_sem, 1); } // Increment work static void mpmc_push_work(MPMCQueue *q, void *item) { MPMCSlot *slot; size_t pos; for (;;) { pos = atomic_load_explicit(&q->tail, memory_order_relaxed); // ensure the slot is committed BEFORE accessing it size_t committed = atomic_load_explicit(&q->committed, memory_order_relaxed); if (unlikely(pos >= committed)) { mpmc_commit_more(q); continue; } slot = &q->slots[pos & q->mask]; size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); intptr_t diff = (intptr_t)seq - (intptr_t)pos; if (likely(diff == 0)) { if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1, memory_order_relaxed, memory_order_relaxed)) break; } else if (diff < 0) { // queue actually full sleep_ms(1000); } else { // waiting to grow sleep_ms(0); } } slot->data = item; atomic_store_explicit(&slot->seq, pos + 1, memory_order_release); atomic_fetch_add(&q->work_count, 1); plat_sem_post(&q->items_sem, 1); } /* ----------------------------------------------------------- */ /* POP */ /* ----------------------------------------------------------- */ static void *mpmc_pop(MPMCQueue *q) { plat_sem_wait(&q->items_sem); MPMCSlot *slot; size_t pos; int spins = 0; for (;;) { pos = atomic_load_explicit(&q->head, memory_order_relaxed); slot = &q->slots[pos & q->mask]; size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1); if (likely(diff == 0)) { if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1, memory_order_relaxed, memory_order_relaxed)) break; } else { // slot is still transitioning (written by another thread) if (++spins > 10) { sleep_ms(0); // yield CPU spins = 0; } else { cpu_pause(); } } } void *data = slot->data; atomic_store_explicit(&slot->seq, pos + q->capacity, memory_order_release); return data; } /* ----------------------------------------------------------- */ /* PUSH POISON */ /* ----------------------------------------------------------- */ /*note: After producers finishes, push N poison pills where N = number of consumer threads, this is necessary to stop the consumers. */ static void mpmc_producers_finished(MPMCQueue *q, u8 consumer_count) { for (u8 i = 0; i < consumer_count; i++) { mpmc_push(q, NULL); } } /* ----------------------------------------------------------- */ /* Done */ /* ----------------------------------------------------------- */ static void mpmc_task_done(MPMCQueue *q, u8 consumer_count) { size_t prev = atomic_fetch_sub(&q->work_count, 1); if (prev == 1) { mpmc_producers_finished(q, consumer_count); } } /* ----------------------------------------------------------- */ /* MPMC Cleanup */ /* ----------------------------------------------------------- */ static void mpmc_finish(MPMCQueue *q) { if (!q) return; if (q->slots) { plat_mem_release(q->slots, 0); q->slots = NULL; } plat_sem_destroy(&q->items_sem); q->capacity = 0; q->mask = 0; atomic_store_explicit(&q->head, 0, memory_order_relaxed); atomic_store_explicit(&q->tail, 0, memory_order_relaxed); atomic_store_explicit(&q->committed, 0, memory_order_relaxed); }