#pragma once #include "base.h" // Cache align abstraction #define CACHELINE 64 #if defined(_MSC_VER) #define CACHE_ALIGN __declspec(align(CACHELINE)) #else #define CACHE_ALIGN __attribute__((aligned(CACHELINE))) #endif // Compiler hints #if defined(__GNUC__) || defined(__clang__) #define likely(x) __builtin_expect((x), 1) #define unlikely(x) __builtin_expect((x), 0) #else #define likely(x) (x) #define unlikely(x) (x) #endif static void cpu_pause(void) { #if defined(_MSC_VER) || defined(__x86_64__) || defined(__i386__) _mm_pause(); #endif } // Semaphores #if defined(_WIN32) || defined(_WIN64) typedef struct plat_sem { HANDLE handle; } plat_sem; static b32 plat_sem_init(plat_sem *s, u32 initial) { s->handle = CreateSemaphore(NULL, initial, LONG_MAX, NULL); return s->handle != NULL; } static void plat_sem_wait(plat_sem *s) { WaitForSingleObject(s->handle, INFINITE); } // static b32 plat_sem_trywait(HANDLE sem) { // Comment to prevent warning: unused function // DWORD r = WaitForSingleObject(sem, 0); // return r == WAIT_OBJECT_0; // } static void plat_sem_post(plat_sem *s, u32 count) { ReleaseSemaphore(s->handle, count, NULL); } // static void plat_sem_destroy(plat_sem *s) { // Comment to prevent warning: unused function // if (s->handle) { // CloseHandle(s->handle); // s->handle = NULL; // } // } #elif defined(__linux__) #include typedef struct plat_sem { sem_t sem; } plat_sem; static b32 plat_sem_init(plat_sem *s, u32 initial) { return sem_init(&s->sem, 0, initial) == 0; } static void plat_sem_wait(plat_sem *s) { while (sem_wait(&s->sem) == -1 && errno == EINTR) { } } // static b32 plat_sem_trywait(sem_t *sem) { return sem_trywait(sem) == 0; } // Comment to prevent warning: unused function static void plat_sem_post(plat_sem *s, u32 count) { for (u32 i = 0; i < count; i++) { sem_post(&s->sem); } } // static void plat_sem_destroy(plat_sem *s) { sem_destroy(&s->sem); } // Comment to prevent warning: unused function #endif typedef struct plat_sem plat_sem; typedef struct CACHE_ALIGN { atomic_size_t seq; void *data; char pad[64 - sizeof(atomic_size_t) - sizeof(void *)]; } MPMCSlot; typedef struct { CACHE_ALIGN atomic_size_t head; CACHE_ALIGN atomic_size_t tail; CACHE_ALIGN atomic_size_t work_count; size_t capacity; size_t mask; atomic_size_t committed; size_t commit_step; atomic_flag commit_lock; plat_sem items_sem; MPMCSlot *slots; } MPMCQueue; // --------------- functions ---------------- // static: each translation unit gets its own private copy this will solve the // error: Function defined in a header file; function definitions in header // files can lead to ODR violations (multiple definition errors if included in // more than one file) /* ----------------------------------------------------------- */ /* INIT */ /* ----------------------------------------------------------- */ static void mpmc_init(MPMCQueue *q, size_t max_capacity) { q->capacity = max_capacity; q->mask = max_capacity - 1; u32 pagesize = plat_get_pagesize(); size_t bytes = ALIGN_UP_POW2(sizeof(MPMCSlot) * max_capacity, pagesize); q->slots = (MPMCSlot *)plat_mem_reserve(bytes); if (!q->slots) { fprintf(stderr, "VirtualAlloc reserve failed\n"); exit(1); } u64 commit_bytes = pagesize; commit_bytes = ALIGN_UP_POW2(commit_bytes, pagesize); q->commit_step = commit_bytes / sizeof(MPMCSlot); atomic_flag_clear(&q->commit_lock); q->committed = q->commit_step; plat_mem_commit(q->slots, commit_bytes); for (size_t i = 0; i < q->committed; i++) { atomic_init(&q->slots[i].seq, i); q->slots[i].data = NULL; } atomic_init(&q->head, 0); atomic_init(&q->tail, 0); atomic_init(&q->work_count, 0); plat_sem_init(&q->items_sem, 0); } /* ----------------------------------------------------------- */ /* COMMIT MORE MEMORY */ /* ----------------------------------------------------------- */ static void mpmc_commit_more(MPMCQueue *q) { if (atomic_flag_test_and_set(&q->commit_lock)) return; size_t start = atomic_load_explicit(&q->committed, memory_order_acquire); size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed); // another thread already committed enough if (tail < start) { atomic_flag_clear(&q->commit_lock); return; } if (start >= q->capacity) { atomic_flag_clear(&q->commit_lock); return; } size_t new_commit = start + q->commit_step; if (new_commit > q->capacity) new_commit = q->capacity; size_t count = new_commit - start; plat_mem_commit(&q->slots[start], count * sizeof(MPMCSlot)); for (size_t i = start; i < new_commit; i++) { atomic_init(&q->slots[i].seq, i); q->slots[i].data = NULL; } atomic_store_explicit(&q->committed, new_commit, memory_order_release); atomic_flag_clear(&q->commit_lock); } /* ----------------------------------------------------------- */ /* PUSH */ /* ----------------------------------------------------------- */ // Does not increment work static void mpmc_push(MPMCQueue *q, void *item) { MPMCSlot *slot; size_t pos; for (;;) { pos = atomic_load_explicit(&q->tail, memory_order_relaxed); // ensure the slot is committed BEFORE accessing it size_t committed = atomic_load_explicit(&q->committed, memory_order_relaxed); if (unlikely(pos >= committed)) { mpmc_commit_more(q); continue; } slot = &q->slots[pos & q->mask]; size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); intptr_t diff = (intptr_t)seq - (intptr_t)pos; if (likely(diff == 0)) { if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1, memory_order_relaxed, memory_order_relaxed)) break; } else if (diff < 0) { // queue actually full sleep_ms(1000); } else { // waiting to grow sleep_ms(0); } } slot->data = item; atomic_store_explicit(&slot->seq, pos + 1, memory_order_release); plat_sem_post(&q->items_sem, 1); } // Increment work static void mpmc_push_work(MPMCQueue *q, void *item) { MPMCSlot *slot; size_t pos; for (;;) { pos = atomic_load_explicit(&q->tail, memory_order_relaxed); // ensure the slot is committed BEFORE accessing it size_t committed = atomic_load_explicit(&q->committed, memory_order_relaxed); if (unlikely(pos >= committed)) { mpmc_commit_more(q); continue; } slot = &q->slots[pos & q->mask]; size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); intptr_t diff = (intptr_t)seq - (intptr_t)pos; if (likely(diff == 0)) { if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1, memory_order_relaxed, memory_order_relaxed)) break; } else if (diff < 0) { // queue actually full sleep_ms(1000); } else { // waiting to grow sleep_ms(0); } } slot->data = item; atomic_store_explicit(&slot->seq, pos + 1, memory_order_release); atomic_fetch_add(&q->work_count, 1); plat_sem_post(&q->items_sem, 1); } /* ----------------------------------------------------------- */ /* POP */ /* ----------------------------------------------------------- */ static void *mpmc_pop(MPMCQueue *q) { plat_sem_wait(&q->items_sem); MPMCSlot *slot; size_t pos; int spins = 0; for (;;) { pos = atomic_load_explicit(&q->head, memory_order_relaxed); slot = &q->slots[pos & q->mask]; size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1); if (likely(diff == 0)) { if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1, memory_order_relaxed, memory_order_relaxed)) break; } else { // slot is still transitioning (written by another thread) if (++spins > 10) { sleep_ms(0); // yield CPU spins = 0; } else { cpu_pause(); } } } void *data = slot->data; atomic_store_explicit(&slot->seq, pos + q->capacity, memory_order_release); return data; } /* ----------------------------------------------------------- */ /* PUSH POISON */ /* ----------------------------------------------------------- */ /*note: After producers finishes, push N poison pills where N = number of consumer threads, this is necessary to stop the consumers. */ static void mpmc_producers_finished(MPMCQueue *q, u8 consumer_count) { for (u8 i = 0; i < consumer_count; i++) { mpmc_push(q, NULL); } } /* ----------------------------------------------------------- */ /* Done */ /* ----------------------------------------------------------- */ static void mpmc_task_done(MPMCQueue *q, u8 consumer_count) { size_t prev = atomic_fetch_sub(&q->work_count, 1); if (prev == 1) { mpmc_producers_finished(q, consumer_count); } } /* ----------------------------------------------------------- */ /* MPMC Cleanup */ /* ----------------------------------------------------------- */ // static void mpmc_finish(MPMCQueue *q) { // Comment to prevent warning: unused function // if (!q) // return; // // if (q->slots) { // plat_mem_release(q->slots, 0); // q->slots = NULL; // } // // plat_sem_destroy(&q->items_sem); // // q->capacity = 0; // q->mask = 0; // // atomic_store_explicit(&q->head, 0, memory_order_relaxed); // atomic_store_explicit(&q->tail, 0, memory_order_relaxed); // atomic_store_explicit(&q->committed, 0, memory_order_relaxed); // }