#pragma once #include "base.h" // Cache align abstraction #define CACHELINE 64 #if defined(_MSC_VER) #define CACHE_ALIGN __declspec(align(CACHELINE)) #else #define CACHE_ALIGN __attribute__((aligned(CACHELINE))) #endif // Mutex/Critical section abstraction #if defined(_WIN32) #include typedef CRITICAL_SECTION mtx_t; typedef CONDITION_VARIABLE cond_t; #define mtx_init(m) InitializeCriticalSection(m) #define mtx_lock(m) EnterCriticalSection(m) #define mtx_unlock(m) LeaveCriticalSection(m) #define mtx_destroy(m) DeleteCriticalSection(m) #define cond_init(c) InitializeConditionVariable(c) #define cond_wait(c, m) SleepConditionVariableCS(c, m, INFINITE) #define cond_signal(c) WakeConditionVariable(c) #define cond_broadcast(c) WakeAllConditionVariable(c) #else #include typedef pthread_mutex_t mtx_t; typedef pthread_cond_t cond_t; #define mtx_init(m) pthread_mutex_init(m, NULL) #define mtx_lock(m) pthread_mutex_lock(m) #define mtx_unlock(m) pthread_mutex_unlock(m) #define mtx_destroy(m) pthread_mutex_destroy(m) #define cond_init(c) pthread_cond_init(c, NULL) #define cond_wait(c, m) pthread_cond_wait(c, m) #define cond_signal(c) pthread_cond_signal(c) #define cond_broadcast(c) pthread_cond_broadcast(c) #endif typedef struct CACHE_ALIGN { void *data; char pad[64 - sizeof(void *)]; } MPMCSlot; typedef struct { CACHE_ALIGN size_t head; CACHE_ALIGN size_t tail; CACHE_ALIGN size_t count; CACHE_ALIGN size_t work_count; size_t capacity; size_t mask; size_t committed; size_t commit_step; mtx_t lock; cond_t not_empty; cond_t not_full; MPMCSlot *slots; } MPMCQueue; /* ----------------------------------------------------------- */ /* INIT */ /* ----------------------------------------------------------- */ static void mpmc_init(MPMCQueue *q, size_t max_capacity) { q->capacity = max_capacity; q->mask = max_capacity - 1; size_t pagesize = plat_get_pagesize(); size_t bytes = ALIGN_UP_POW2(sizeof(MPMCSlot) * max_capacity, pagesize); q->slots = (MPMCSlot *)plat_mem_reserve(bytes); if (!q->slots) { fprintf(stderr, "plat_mem_reserve failed\n"); exit(1); } size_t commit_bytes = ALIGN_UP_POW2(pagesize, pagesize); q->commit_step = commit_bytes / sizeof(MPMCSlot); q->committed = q->commit_step; plat_mem_commit(q->slots, commit_bytes); for (size_t i = 0; i < q->committed; i++) { q->slots[i].data = NULL; } q->head = 0; q->tail = 0; q->count = 0; q->work_count = 0; mtx_init(&q->lock); cond_init(&q->not_empty); cond_init(&q->not_full); } /* ----------------------------------------------------------- */ /* COMMIT MORE MEMORY */ /* ----------------------------------------------------------- */ static void mpmc_commit_more(MPMCQueue *q) { size_t start = q->committed; if (start >= q->capacity) return; size_t new_commit = start + q->commit_step; if (new_commit > q->capacity) new_commit = q->capacity; size_t count = new_commit - start; plat_mem_commit(&q->slots[start], count * sizeof(MPMCSlot)); for (size_t i = start; i < new_commit; i++) { q->slots[i].data = NULL; } q->committed = new_commit; } /* ----------------------------------------------------------- */ /* PUSH */ /* ----------------------------------------------------------- */ // Does not increment work static void mpmc_push(MPMCQueue *q, void *item) { mtx_lock(&q->lock); while (q->count == q->capacity) { cond_wait(&q->not_full, &q->lock); } // Ensure committed if (q->tail >= q->committed) { mpmc_commit_more(q); } size_t pos = q->tail & q->mask; q->slots[pos].data = item; q->tail++; q->count++; cond_signal(&q->not_empty); mtx_unlock(&q->lock); } // Increment work static void mpmc_push_work(MPMCQueue *q, void *item) { mtx_lock(&q->lock); while (q->count == q->capacity) { cond_wait(&q->not_full, &q->lock); } if (q->tail >= q->committed) { mpmc_commit_more(q); } size_t pos = q->tail & q->mask; q->slots[pos].data = item; q->tail++; q->count++; q->work_count++; cond_signal(&q->not_empty); mtx_unlock(&q->lock); } /* ----------------------------------------------------------- */ /* POP */ /* ----------------------------------------------------------- */ static void *mpmc_pop(MPMCQueue *q) { mtx_lock(&q->lock); while (q->count == 0) { cond_wait(&q->not_empty, &q->lock); } size_t pos = q->head & q->mask; void *data = q->slots[pos].data; q->head++; q->count--; cond_signal(&q->not_full); mtx_unlock(&q->lock); return data; } /* ----------------------------------------------------------- */ /* PUSH POISON */ /* ----------------------------------------------------------- */ static void mpmc_producers_finished(MPMCQueue *q, u8 consumer_count) { for (u8 i = 0; i < consumer_count; i++) { mpmc_push(q, NULL); } } /* ----------------------------------------------------------- */ /* Done */ /* ----------------------------------------------------------- */ static void mpmc_task_done(MPMCQueue *q, u8 consumer_count) { mtx_lock(&q->lock); if (--q->work_count == 0) { mpmc_producers_finished(q, consumer_count); } mtx_unlock(&q->lock); } /* ----------------------------------------------------------- */ /* MPMC Cleanup */ /* ----------------------------------------------------------- */ // static void mpmc_finish(MPMCQueue *q) { // Comment to prevent warning: unused function // if (!q) return; // // if (q->slots) { // plat_mem_release(q->slots, 0); // q->slots = NULL; // } // // mtx_destroy(&q->lock); // // #if !defined(_WIN32) // pthread_cond_destroy(&q->not_empty); // pthread_cond_destroy(&q->not_full); // #endif // // q->capacity = 0; // q->mask = 0; // }