Implementing a semaphore in the MPMC queue to wake up consumers
This commit is contained in:
38
lf_mpmc.h
38
lf_mpmc.h
@@ -40,6 +40,8 @@ typedef struct {
|
||||
size_t commit_step;
|
||||
atomic_flag commit_lock;
|
||||
|
||||
HANDLE items_sem;
|
||||
|
||||
MPMCSlot *slots;
|
||||
} MPMCQueue;
|
||||
|
||||
@@ -53,33 +55,29 @@ typedef struct {
|
||||
/* INIT */
|
||||
/* ----------------------------------------------------------- */
|
||||
static void mpmc_init(MPMCQueue *q, size_t max_capacity) {
|
||||
if (!max_capacity) {
|
||||
fprintf(stderr, "capacity must positive\n");
|
||||
if ((max_capacity & (max_capacity - 1)) != 0) {
|
||||
fprintf(stderr, "capacity must be power of two\n");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
u32 pagesize = plat_get_pagesize();
|
||||
|
||||
max_capacity = ALIGN_UP_POW2(max_capacity, pagesize);
|
||||
|
||||
q->capacity = max_capacity;
|
||||
q->mask = max_capacity - 1;
|
||||
|
||||
size_t bytes = sizeof(MPMCSlot) * max_capacity;
|
||||
|
||||
q->slots = (MPMCSlot *)VirtualAlloc(NULL, bytes, MEM_RESERVE, PAGE_READWRITE);
|
||||
q->slots = (MPMCSlot *)plat_mem_reserve(bytes);
|
||||
|
||||
if (!q->slots) {
|
||||
fprintf(stderr, "VirtualAlloc reserve failed\n");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
q->commit_step = pagesize;
|
||||
q->commit_step = (64ull * 1024 * 1024) / sizeof(MPMCSlot);
|
||||
atomic_flag_clear(&q->commit_lock);
|
||||
|
||||
q->committed = q->commit_step;
|
||||
VirtualAlloc(q->slots, q->commit_step * sizeof(MPMCSlot), MEM_COMMIT,
|
||||
PAGE_READWRITE);
|
||||
|
||||
plat_mem_commit(q->slots, q->commit_step * sizeof(MPMCSlot));
|
||||
|
||||
for (size_t i = 0; i < q->committed; i++) {
|
||||
atomic_init(&q->slots[i].seq, i);
|
||||
@@ -88,6 +86,8 @@ static void mpmc_init(MPMCQueue *q, size_t max_capacity) {
|
||||
|
||||
atomic_init(&q->head, 0);
|
||||
atomic_init(&q->tail, 0);
|
||||
|
||||
q->items_sem = CreateSemaphore(NULL, 0, LONG_MAX, NULL);
|
||||
}
|
||||
|
||||
/* ----------------------------------------------------------- */
|
||||
@@ -118,8 +118,7 @@ static void mpmc_commit_more(MPMCQueue *q) {
|
||||
|
||||
size_t count = new_commit - start;
|
||||
|
||||
VirtualAlloc(&q->slots[start], count * sizeof(MPMCSlot), MEM_COMMIT,
|
||||
PAGE_READWRITE);
|
||||
plat_mem_commit(&q->slots[start], count * sizeof(MPMCSlot));
|
||||
|
||||
for (size_t i = start; i < new_commit; i++) {
|
||||
atomic_init(&q->slots[i].seq, i);
|
||||
@@ -176,12 +175,17 @@ static void mpmc_push(MPMCQueue *q, void *item) {
|
||||
slot->data = item;
|
||||
|
||||
atomic_store_explicit(&slot->seq, pos + 1, memory_order_release);
|
||||
|
||||
ReleaseSemaphore(q->items_sem, 1, NULL);
|
||||
}
|
||||
|
||||
/* ----------------------------------------------------------- */
|
||||
/* POP */
|
||||
/* ----------------------------------------------------------- */
|
||||
static void *mpmc_pop(MPMCQueue *q) {
|
||||
|
||||
WaitForSingleObject(q->items_sem, INFINITE);
|
||||
|
||||
MPMCSlot *slot;
|
||||
size_t pos;
|
||||
|
||||
@@ -202,12 +206,7 @@ static void *mpmc_pop(MPMCQueue *q) {
|
||||
memory_order_relaxed))
|
||||
break;
|
||||
|
||||
} else if (diff < 0) { // queue is empty
|
||||
|
||||
Sleep(500);
|
||||
|
||||
} else { // slot is still transitioning (written by another thread)
|
||||
|
||||
if (++spins > 10) {
|
||||
SwitchToThread(); // yield CPU
|
||||
spins = 0;
|
||||
@@ -246,10 +245,13 @@ static void mpmc_finish(MPMCQueue *q) {
|
||||
return;
|
||||
|
||||
if (q->slots) {
|
||||
VirtualFree(q->slots, 0, MEM_RELEASE);
|
||||
plat_mem_release(q->slots, 0);
|
||||
q->slots = NULL;
|
||||
}
|
||||
|
||||
if (q->items_sem)
|
||||
CloseHandle(q->items_sem);
|
||||
|
||||
q->capacity = 0;
|
||||
q->mask = 0;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user