Files
filehasher/mt_mpmc.h
amir 16c6aeae65 Minor optimisations and bug fixes
Fix bug in mt_mpmc.c, in Linux mutexes are not recursive.
Add arena_trim_string() to the arena API
Removing arena->path, now paths are pushed to arena->metadata
Replacing fe->owner[128] with char *owner; the owner is now pushed as a
string to arena->metadata and trimmed with arena_trim_string()
Improving cache locality in arena->metadata, the memory layout is now
fe; fe->path; fe->owner.
Cache aligning all arenas except HasherContext->arena to sizeof(void *).
Pushing elements one by one instead of snprintf() in finalize_file() and
hash_worker().
Getting the full path of current directory instead of "."
Fixing bug in path formatting, this allows us to remove normalize_path()
from the hot loop.
2026-05-08 20:04:56 +01:00

255 lines
5.7 KiB
C

#pragma once
#include "base.h"
// Cache-line alignment abstraction.
// CACHELINE is the alignment, in bytes, that CACHE_ALIGN requests. It is a
// fixed constant here, not a value queried from the hardware.
#define CACHELINE 64
#if defined(_MSC_VER)
// MSVC spelling of the alignment specifier.
#define CACHE_ALIGN __declspec(align(CACHELINE))
#else
// Attribute spelling used for every compiler that is not MSVC.
#define CACHE_ALIGN __attribute__((aligned(CACHELINE)))
#endif
// Mutex / condition-variable abstraction.
// Both branches expose the same names: mtx_t, cond_t, mtx_init/lock/unlock/
// destroy and cond_init/wait/signal/broadcast. Every macro takes pointers.
// No cond_destroy wrapper is defined for either platform.
// NOTE(review): mtx_t, mtx_init, mtx_lock, mtx_unlock and mtx_destroy are also
// names declared by C11 <threads.h>; confirm that header is never included
// together with this one.
#if defined(_WIN32)
#include <windows.h>
typedef CRITICAL_SECTION mtx_t;
typedef CONDITION_VARIABLE cond_t;
#define mtx_init(m) InitializeCriticalSection(m)
#define mtx_lock(m) EnterCriticalSection(m)
#define mtx_unlock(m) LeaveCriticalSection(m)
#define mtx_destroy(m) DeleteCriticalSection(m)
#define cond_init(c) InitializeConditionVariable(c)
// Waits with no timeout (INFINITE); m must be the critical section held by the
// caller.
#define cond_wait(c, m) SleepConditionVariableCS(c, m, INFINITE)
#define cond_signal(c) WakeConditionVariable(c)
#define cond_broadcast(c) WakeAllConditionVariable(c)
#else
#include <pthread.h>
typedef pthread_mutex_t mtx_t;
typedef pthread_cond_t cond_t;
// NULL attributes: the mutex is created with the default pthread attributes,
// so it must not be locked again by the thread that already holds it.
#define mtx_init(m) pthread_mutex_init(m, NULL)
#define mtx_lock(m) pthread_mutex_lock(m)
#define mtx_unlock(m) pthread_mutex_unlock(m)
#define mtx_destroy(m) pthread_mutex_destroy(m)
// NULL attributes: default condition-variable attributes.
#define cond_init(c) pthread_cond_init(c, NULL)
#define cond_wait(c, m) pthread_cond_wait(c, m)
#define cond_signal(c) pthread_cond_signal(c)
#define cond_broadcast(c) pthread_cond_broadcast(c)
#endif
// One queue slot. The type is aligned to CACHELINE bytes through CACHE_ALIGN.
typedef struct CACHE_ALIGN {
// Item pointer written by the push functions and read by mpmc_pop(). Set to
// NULL when the slot's memory is first committed.
void *data;
// Explicit filler so that data + pad occupy 64 bytes. The literal 64 mirrors
// CACHELINE and has to be kept in sync with it by hand.
char pad[64 - sizeof(void *)];
} MPMCSlot;
// Queue state. Every function in this file that touches these fields after
// mpmc_init() does so while holding `lock`.
typedef struct {
// Running count of pops; the slot index is head & mask.
CACHE_ALIGN size_t head;
// Running count of pushes; the slot index is tail & mask.
CACHE_ALIGN size_t tail;
// Number of items currently stored (incremented on push, decremented on pop).
CACHE_ALIGN size_t count;
// Outstanding work items: incremented by mpmc_push_work(), decremented by
// mpmc_task_done().
CACHE_ALIGN size_t work_count;
// Maximum number of items; pushes block while count == capacity.
size_t capacity;
// capacity - 1, used to turn head/tail into a slot index.
size_t mask;
// Number of slots whose memory has been committed so far.
size_t committed;
// Number of slots committed per growth step (one page worth of slots).
size_t commit_step;
// Guards every field above and the slot contents.
mtx_t lock;
// Signalled after each push; mpmc_pop() waits on it while count == 0.
cond_t not_empty;
// Signalled after each pop; the push functions wait on it while the queue is
// full.
cond_t not_full;
// Base of the reserved slot array; only the first `committed` slots are
// committed.
MPMCSlot *slots;
} MPMCQueue;
/* ----------------------------------------------------------- */
/* INIT */
/* ----------------------------------------------------------- */
/*
 * Initialise the queue `q` for at most `max_capacity` items.
 *
 * Reserves address space for `max_capacity` slots (rounded up to a whole
 * number of pages), commits the first page of slots, clears their data
 * pointers, resets all counters and initialises the lock and both condition
 * variables.
 *
 * `max_capacity` must be a non-zero power of two: slot indices are computed
 * as `counter & (max_capacity - 1)`, and with any other value that mask skips
 * some slots and maps different counters onto the same slot, so queued items
 * would be overwritten before they are popped.
 *
 * Does not return on failure: prints a message to stderr and calls exit(1)
 * when the capacity is invalid or the reservation fails.
 */
static void mpmc_init(MPMCQueue *q, size_t max_capacity) {
  if (max_capacity == 0 || (max_capacity & (max_capacity - 1)) != 0) {
    fprintf(stderr, "mpmc_init: capacity must be a non-zero power of two\n");
    exit(1);
  }

  q->capacity = max_capacity;
  q->mask = max_capacity - 1;

  size_t pagesize = plat_get_pagesize();
  size_t bytes = ALIGN_UP_POW2(sizeof(MPMCSlot) * max_capacity, pagesize);

  q->slots = (MPMCSlot *)plat_mem_reserve(bytes);
  if (!q->slots) {
    fprintf(stderr, "plat_mem_reserve failed\n");
    exit(1);
  }

  // Commit a single page up front; mpmc_commit_more() grows the committed
  // region by the same step as the tail advances.
  size_t commit_bytes = ALIGN_UP_POW2(pagesize, pagesize);
  q->commit_step = commit_bytes / sizeof(MPMCSlot);
  q->committed = q->commit_step;
  plat_mem_commit(q->slots, commit_bytes);

  for (size_t i = 0; i < q->committed; i++) {
    q->slots[i].data = NULL;
  }

  q->head = 0;
  q->tail = 0;
  q->count = 0;
  q->work_count = 0;

  mtx_init(&q->lock);
  cond_init(&q->not_empty);
  cond_init(&q->not_full);
}
/* ----------------------------------------------------------- */
/* COMMIT MORE MEMORY */
/* ----------------------------------------------------------- */
/*
 * Commit the next batch of slots (at most `commit_step`, never past
 * `capacity`) and clear their data pointers. Does nothing once every slot up
 * to `capacity` is already committed. The push functions call this while
 * holding q->lock.
 */
static void mpmc_commit_more(MPMCQueue *q) {
  const size_t first = q->committed;
  if (first >= q->capacity) {
    return;
  }

  // Clamp the end of the new range to the queue capacity.
  size_t last = first + q->commit_step;
  if (last > q->capacity) {
    last = q->capacity;
  }

  MPMCSlot *range_begin = &q->slots[first];
  plat_mem_commit(range_begin, (last - first) * sizeof(MPMCSlot));

  for (size_t idx = first; idx < last; idx++) {
    q->slots[idx].data = NULL;
  }

  q->committed = last;
}
/* ----------------------------------------------------------- */
/* PUSH */
/* ----------------------------------------------------------- */
/*
 * Append `item` to the queue without touching work_count.
 * Blocks on not_full while the queue already holds `capacity` items, then
 * stores the item at the tail and signals not_empty.
 */
static void mpmc_push(MPMCQueue *q, void *item) {
  mtx_lock(&q->lock);

  // Wait for a free slot.
  for (;;) {
    if (q->count != q->capacity) {
      break;
    }
    cond_wait(&q->not_full, &q->lock);
  }

  // Grow the committed region when the tail has caught up with it.
  if (q->committed <= q->tail) {
    mpmc_commit_more(q);
  }

  MPMCSlot *slot = &q->slots[q->tail & q->mask];
  slot->data = item;
  q->tail += 1;
  q->count += 1;

  cond_signal(&q->not_empty);
  mtx_unlock(&q->lock);
}
/*
 * Append `item` to the queue and count it as outstanding work
 * (work_count is incremented under the same lock as the enqueue).
 * Blocks on not_full while the queue already holds `capacity` items.
 */
static void mpmc_push_work(MPMCQueue *q, void *item) {
  mtx_lock(&q->lock);

  // Wait for a free slot.
  for (;;) {
    if (q->count != q->capacity) {
      break;
    }
    cond_wait(&q->not_full, &q->lock);
  }

  // Grow the committed region when the tail has caught up with it.
  if (q->committed <= q->tail) {
    mpmc_commit_more(q);
  }

  MPMCSlot *slot = &q->slots[q->tail & q->mask];
  slot->data = item;
  q->tail += 1;
  q->count += 1;
  q->work_count += 1;

  cond_signal(&q->not_empty);
  mtx_unlock(&q->lock);
}
/* ----------------------------------------------------------- */
/* POP */
/* ----------------------------------------------------------- */
/*
 * Remove and return the item at the head of the queue.
 * Blocks on not_empty while the queue is empty and signals not_full after
 * taking an item. The returned pointer is whatever was pushed, including the
 * NULL entries queued by mpmc_producers_finished().
 */
static void *mpmc_pop(MPMCQueue *q) {
  void *item;

  mtx_lock(&q->lock);

  while (!q->count) {
    cond_wait(&q->not_empty, &q->lock);
  }

  item = q->slots[q->head & q->mask].data;
  q->head += 1;
  q->count -= 1;

  cond_signal(&q->not_full);
  mtx_unlock(&q->lock);

  return item;
}
/* ----------------------------------------------------------- */
/* PUSH POISON */
/* ----------------------------------------------------------- */
/*
 * Queue one NULL item per consumer (`consumer_count` in total) through
 * mpmc_push(), so the pushes do not affect work_count.
 */
static void mpmc_producers_finished(MPMCQueue *q, u8 consumer_count) {
  u8 remaining = consumer_count;
  while (remaining > 0) {
    mpmc_push(q, NULL);
    remaining--;
  }
}
/* ----------------------------------------------------------- */
/* Done */
/* ----------------------------------------------------------- */
/*
 * Mark one work item as finished. Decrements work_count under the lock and,
 * when it reaches zero, queues `consumer_count` NULL items via
 * mpmc_producers_finished().
 *
 * The lock is released before those items are pushed because mpmc_push()
 * takes the same lock itself.
 */
static void mpmc_task_done(MPMCQueue *q, u8 consumer_count) {
  mtx_lock(&q->lock);
  q->work_count -= 1;
  const bool drained = (q->work_count == 0);
  mtx_unlock(&q->lock);

  if (!drained) {
    return;
  }
  mpmc_producers_finished(q, consumer_count);
}
/* ----------------------------------------------------------- */
/* MPMC Cleanup */
/* ----------------------------------------------------------- */
// Kept commented out to avoid an unused-function warning.
// static void mpmc_finish(MPMCQueue *q) {
// if (!q) return;
//
// if (q->slots) {
// plat_mem_release(q->slots, 0);
// q->slots = NULL;
// }
//
// mtx_destroy(&q->lock);
//
// #if !defined(_WIN32)
// pthread_cond_destroy(&q->not_empty);
// pthread_cond_destroy(&q->not_full);
// #endif
//
// q->capacity = 0;
// q->mask = 0;
// }