Project reordering and mpmc code
This commit is contained in:
246
mt_mpmc.h
Normal file
246
mt_mpmc.h
Normal file
@@ -0,0 +1,246 @@
|
||||
#pragma once
|
||||
|
||||
#include "base.h"
|
||||
|
||||
// Cache align abstraction
|
||||
#define CACHELINE 64
|
||||
|
||||
#if defined(_MSC_VER)
|
||||
#define CACHE_ALIGN __declspec(align(CACHELINE))
|
||||
#else
|
||||
#define CACHE_ALIGN __attribute__((aligned(CACHELINE)))
|
||||
#endif
|
||||
|
||||
// Mutex/Critical section abstraction
|
||||
#if defined(_WIN32)
|
||||
#include <windows.h>
|
||||
typedef CRITICAL_SECTION mtx_t;
|
||||
typedef CONDITION_VARIABLE cond_t;
|
||||
|
||||
#define mtx_init(m) InitializeCriticalSection(m)
|
||||
#define mtx_lock(m) EnterCriticalSection(m)
|
||||
#define mtx_unlock(m) LeaveCriticalSection(m)
|
||||
#define mtx_destroy(m) DeleteCriticalSection(m)
|
||||
|
||||
#define cond_init(c) InitializeConditionVariable(c)
|
||||
#define cond_wait(c, m) SleepConditionVariableCS(c, m, INFINITE)
|
||||
#define cond_signal(c) WakeConditionVariable(c)
|
||||
#define cond_broadcast(c) WakeAllConditionVariable(c)
|
||||
|
||||
#else
|
||||
#include <pthread.h>
|
||||
typedef pthread_mutex_t mtx_t;
|
||||
typedef pthread_cond_t cond_t;
|
||||
|
||||
#define mtx_init(m) pthread_mutex_init(m, NULL)
|
||||
#define mtx_lock(m) pthread_mutex_lock(m)
|
||||
#define mtx_unlock(m) pthread_mutex_unlock(m)
|
||||
#define mtx_destroy(m) pthread_mutex_destroy(m)
|
||||
|
||||
#define cond_init(c) pthread_cond_init(c, NULL)
|
||||
#define cond_wait(c, m) pthread_cond_wait(c, m)
|
||||
#define cond_signal(c) pthread_cond_signal(c)
|
||||
#define cond_broadcast(c) pthread_cond_broadcast(c)
|
||||
|
||||
#endif
|
||||
|
||||
typedef struct CACHE_ALIGN {
|
||||
void *data;
|
||||
char pad[64 - sizeof(void *)];
|
||||
} MPMCSlot;
|
||||
|
||||
typedef struct {
|
||||
CACHE_ALIGN size_t head;
|
||||
CACHE_ALIGN size_t tail;
|
||||
|
||||
CACHE_ALIGN size_t count;
|
||||
CACHE_ALIGN size_t work_count;
|
||||
|
||||
size_t capacity;
|
||||
size_t mask;
|
||||
|
||||
size_t committed;
|
||||
size_t commit_step;
|
||||
|
||||
mtx_t lock;
|
||||
cond_t not_empty;
|
||||
cond_t not_full;
|
||||
|
||||
MPMCSlot *slots;
|
||||
} MPMCQueue;
|
||||
|
||||
/* ----------------------------------------------------------- */
|
||||
/* INIT */
|
||||
/* ----------------------------------------------------------- */
|
||||
static void mpmc_init(MPMCQueue *q, size_t max_capacity) {
|
||||
q->capacity = max_capacity;
|
||||
q->mask = max_capacity - 1;
|
||||
|
||||
size_t pagesize = plat_get_pagesize();
|
||||
size_t bytes = ALIGN_UP_POW2(sizeof(MPMCSlot) * max_capacity, pagesize);
|
||||
|
||||
q->slots = (MPMCSlot *)plat_mem_reserve(bytes);
|
||||
if (!q->slots) {
|
||||
fprintf(stderr, "plat_mem_reserve failed\n");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
size_t commit_bytes = ALIGN_UP_POW2(pagesize, pagesize);
|
||||
q->commit_step = commit_bytes / sizeof(MPMCSlot);
|
||||
|
||||
q->committed = q->commit_step;
|
||||
plat_mem_commit(q->slots, commit_bytes);
|
||||
|
||||
for (size_t i = 0; i < q->committed; i++) {
|
||||
q->slots[i].data = NULL;
|
||||
}
|
||||
|
||||
q->head = 0;
|
||||
q->tail = 0;
|
||||
q->count = 0;
|
||||
q->work_count = 0;
|
||||
|
||||
mtx_init(&q->lock);
|
||||
cond_init(&q->not_empty);
|
||||
cond_init(&q->not_full);
|
||||
}
|
||||
|
||||
/* ----------------------------------------------------------- */
|
||||
/* COMMIT MORE MEMORY */
|
||||
/* ----------------------------------------------------------- */
|
||||
static void mpmc_commit_more(MPMCQueue *q) {
|
||||
size_t start = q->committed;
|
||||
|
||||
if (start >= q->capacity)
|
||||
return;
|
||||
|
||||
size_t new_commit = start + q->commit_step;
|
||||
if (new_commit > q->capacity)
|
||||
new_commit = q->capacity;
|
||||
|
||||
size_t count = new_commit - start;
|
||||
|
||||
plat_mem_commit(&q->slots[start], count * sizeof(MPMCSlot));
|
||||
|
||||
for (size_t i = start; i < new_commit; i++) {
|
||||
q->slots[i].data = NULL;
|
||||
}
|
||||
|
||||
q->committed = new_commit;
|
||||
}
|
||||
|
||||
/* ----------------------------------------------------------- */
|
||||
/* PUSH */
|
||||
/* ----------------------------------------------------------- */
|
||||
// Does not increment work
|
||||
static void mpmc_push(MPMCQueue *q, void *item) {
|
||||
mtx_lock(&q->lock);
|
||||
|
||||
while (q->count == q->capacity) {
|
||||
cond_wait(&q->not_full, &q->lock);
|
||||
}
|
||||
|
||||
// Ensure committed
|
||||
if (q->tail >= q->committed) {
|
||||
mpmc_commit_more(q);
|
||||
}
|
||||
|
||||
size_t pos = q->tail & q->mask;
|
||||
|
||||
q->slots[pos].data = item;
|
||||
q->tail++;
|
||||
q->count++;
|
||||
|
||||
cond_signal(&q->not_empty);
|
||||
mtx_unlock(&q->lock);
|
||||
}
|
||||
|
||||
// Increment work
|
||||
static void mpmc_push_work(MPMCQueue *q, void *item) {
|
||||
mtx_lock(&q->lock);
|
||||
|
||||
while (q->count == q->capacity) {
|
||||
cond_wait(&q->not_full, &q->lock);
|
||||
}
|
||||
|
||||
if (q->tail >= q->committed) {
|
||||
mpmc_commit_more(q);
|
||||
}
|
||||
|
||||
size_t pos = q->tail & q->mask;
|
||||
|
||||
q->slots[pos].data = item;
|
||||
q->tail++;
|
||||
q->count++;
|
||||
q->work_count++;
|
||||
|
||||
cond_signal(&q->not_empty);
|
||||
mtx_unlock(&q->lock);
|
||||
}
|
||||
|
||||
/* ----------------------------------------------------------- */
|
||||
/* POP */
|
||||
/* ----------------------------------------------------------- */
|
||||
static void *mpmc_pop(MPMCQueue *q) {
|
||||
mtx_lock(&q->lock);
|
||||
|
||||
while (q->count == 0) {
|
||||
cond_wait(&q->not_empty, &q->lock);
|
||||
}
|
||||
|
||||
size_t pos = q->head & q->mask;
|
||||
|
||||
void *data = q->slots[pos].data;
|
||||
|
||||
q->head++;
|
||||
q->count--;
|
||||
|
||||
cond_signal(&q->not_full);
|
||||
mtx_unlock(&q->lock);
|
||||
|
||||
return data;
|
||||
}
|
||||
|
||||
/* ----------------------------------------------------------- */
|
||||
/* PUSH POISON */
|
||||
/* ----------------------------------------------------------- */
|
||||
static void mpmc_producers_finished(MPMCQueue *q, u8 consumer_count) {
|
||||
for (u8 i = 0; i < consumer_count; i++) {
|
||||
mpmc_push(q, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
/* ----------------------------------------------------------- */
|
||||
/* Done */
|
||||
/* ----------------------------------------------------------- */
|
||||
static void mpmc_task_done(MPMCQueue *q, u8 consumer_count) {
|
||||
mtx_lock(&q->lock);
|
||||
|
||||
if (--q->work_count == 0) {
|
||||
mpmc_producers_finished(q, consumer_count);
|
||||
}
|
||||
|
||||
mtx_unlock(&q->lock);
|
||||
}
|
||||
|
||||
/* ----------------------------------------------------------- */
|
||||
/* MPMC Cleanup */
|
||||
/* ----------------------------------------------------------- */
|
||||
// static void mpmc_finish(MPMCQueue *q) { // Comment to prevent warning: unused function
|
||||
// if (!q) return;
|
||||
//
|
||||
// if (q->slots) {
|
||||
// plat_mem_release(q->slots, 0);
|
||||
// q->slots = NULL;
|
||||
// }
|
||||
//
|
||||
// mtx_destroy(&q->lock);
|
||||
//
|
||||
// #if !defined(_WIN32)
|
||||
// pthread_cond_destroy(&q->not_empty);
|
||||
// pthread_cond_destroy(&q->not_full);
|
||||
// #endif
|
||||
//
|
||||
// q->capacity = 0;
|
||||
// q->mask = 0;
|
||||
// }
|
||||
Reference in New Issue
Block a user