Fix bug in mt_mpmc.c, in Linux mutexes are not recursive. Add arena_trim_string() to the arena API Removing arena->path, now paths are pushed to arena->metadata Replacing fe->owner[128] with char *owner; the owner is not pushed as a string to arena->metadata and trimed with arena_trim_string() Improving cache locality in arena->metadata, the memory layout is not fe; fe->path; fe->owner. Cache aligning all arenas except HasherContext->arena to sizeof(void *). Pushing elements one by one instead of snprintf() in finalize_file() and hash_worker(). Getting the full path of current directory instead of "." Fixing bug in path formating, this allow us to remove normalize_path() from the hot loop.
255 lines
5.7 KiB
C
255 lines
5.7 KiB
C
#pragma once
|
|
|
|
#include "base.h"
|
|
|
|
// Cache align abstraction
|
|
#define CACHELINE 64
|
|
|
|
#if defined(_MSC_VER)
|
|
#define CACHE_ALIGN __declspec(align(CACHELINE))
|
|
#else
|
|
#define CACHE_ALIGN __attribute__((aligned(CACHELINE)))
|
|
#endif
|
|
|
|
// Mutex/Critical section abstraction
|
|
#if defined(_WIN32)
|
|
#include <windows.h>
|
|
typedef CRITICAL_SECTION mtx_t;
|
|
typedef CONDITION_VARIABLE cond_t;
|
|
|
|
#define mtx_init(m) InitializeCriticalSection(m)
|
|
#define mtx_lock(m) EnterCriticalSection(m)
|
|
#define mtx_unlock(m) LeaveCriticalSection(m)
|
|
#define mtx_destroy(m) DeleteCriticalSection(m)
|
|
|
|
#define cond_init(c) InitializeConditionVariable(c)
|
|
#define cond_wait(c, m) SleepConditionVariableCS(c, m, INFINITE)
|
|
#define cond_signal(c) WakeConditionVariable(c)
|
|
#define cond_broadcast(c) WakeAllConditionVariable(c)
|
|
|
|
#else
|
|
#include <pthread.h>
|
|
typedef pthread_mutex_t mtx_t;
|
|
typedef pthread_cond_t cond_t;
|
|
|
|
#define mtx_init(m) pthread_mutex_init(m, NULL)
|
|
#define mtx_lock(m) pthread_mutex_lock(m)
|
|
#define mtx_unlock(m) pthread_mutex_unlock(m)
|
|
#define mtx_destroy(m) pthread_mutex_destroy(m)
|
|
|
|
#define cond_init(c) pthread_cond_init(c, NULL)
|
|
#define cond_wait(c, m) pthread_cond_wait(c, m)
|
|
#define cond_signal(c) pthread_cond_signal(c)
|
|
#define cond_broadcast(c) pthread_cond_broadcast(c)
|
|
|
|
#endif
|
|
|
|
typedef struct CACHE_ALIGN {
|
|
void *data;
|
|
char pad[64 - sizeof(void *)];
|
|
} MPMCSlot;
|
|
|
|
typedef struct {
|
|
CACHE_ALIGN size_t head;
|
|
CACHE_ALIGN size_t tail;
|
|
|
|
CACHE_ALIGN size_t count;
|
|
CACHE_ALIGN size_t work_count;
|
|
|
|
size_t capacity;
|
|
size_t mask;
|
|
|
|
size_t committed;
|
|
size_t commit_step;
|
|
|
|
mtx_t lock;
|
|
cond_t not_empty;
|
|
cond_t not_full;
|
|
|
|
MPMCSlot *slots;
|
|
} MPMCQueue;
|
|
|
|
/* ----------------------------------------------------------- */
|
|
/* INIT */
|
|
/* ----------------------------------------------------------- */
|
|
static void mpmc_init(MPMCQueue *q, size_t max_capacity) {
|
|
q->capacity = max_capacity;
|
|
q->mask = max_capacity - 1;
|
|
|
|
size_t pagesize = plat_get_pagesize();
|
|
size_t bytes = ALIGN_UP_POW2(sizeof(MPMCSlot) * max_capacity, pagesize);
|
|
|
|
q->slots = (MPMCSlot *)plat_mem_reserve(bytes);
|
|
if (!q->slots) {
|
|
fprintf(stderr, "plat_mem_reserve failed\n");
|
|
exit(1);
|
|
}
|
|
|
|
size_t commit_bytes = ALIGN_UP_POW2(pagesize, pagesize);
|
|
q->commit_step = commit_bytes / sizeof(MPMCSlot);
|
|
|
|
q->committed = q->commit_step;
|
|
plat_mem_commit(q->slots, commit_bytes);
|
|
|
|
for (size_t i = 0; i < q->committed; i++) {
|
|
q->slots[i].data = NULL;
|
|
}
|
|
|
|
q->head = 0;
|
|
q->tail = 0;
|
|
q->count = 0;
|
|
q->work_count = 0;
|
|
|
|
mtx_init(&q->lock);
|
|
cond_init(&q->not_empty);
|
|
cond_init(&q->not_full);
|
|
}
|
|
|
|
/* ----------------------------------------------------------- */
|
|
/* COMMIT MORE MEMORY */
|
|
/* ----------------------------------------------------------- */
|
|
static void mpmc_commit_more(MPMCQueue *q) {
|
|
size_t start = q->committed;
|
|
|
|
if (start >= q->capacity)
|
|
return;
|
|
|
|
size_t new_commit = start + q->commit_step;
|
|
if (new_commit > q->capacity)
|
|
new_commit = q->capacity;
|
|
|
|
size_t count = new_commit - start;
|
|
|
|
plat_mem_commit(&q->slots[start], count * sizeof(MPMCSlot));
|
|
|
|
for (size_t i = start; i < new_commit; i++) {
|
|
q->slots[i].data = NULL;
|
|
}
|
|
|
|
q->committed = new_commit;
|
|
}
|
|
|
|
/* ----------------------------------------------------------- */
|
|
/* PUSH */
|
|
/* ----------------------------------------------------------- */
|
|
// Does not increment work
|
|
static void mpmc_push(MPMCQueue *q, void *item) {
|
|
mtx_lock(&q->lock);
|
|
|
|
while (q->count == q->capacity) {
|
|
cond_wait(&q->not_full, &q->lock);
|
|
}
|
|
|
|
// Ensure committed
|
|
if (q->tail >= q->committed) {
|
|
mpmc_commit_more(q);
|
|
}
|
|
|
|
size_t pos = q->tail & q->mask;
|
|
|
|
q->slots[pos].data = item;
|
|
q->tail++;
|
|
q->count++;
|
|
|
|
cond_signal(&q->not_empty);
|
|
mtx_unlock(&q->lock);
|
|
}
|
|
|
|
// Increment work
|
|
static void mpmc_push_work(MPMCQueue *q, void *item) {
|
|
mtx_lock(&q->lock);
|
|
|
|
while (q->count == q->capacity) {
|
|
cond_wait(&q->not_full, &q->lock);
|
|
}
|
|
|
|
if (q->tail >= q->committed) {
|
|
mpmc_commit_more(q);
|
|
}
|
|
|
|
size_t pos = q->tail & q->mask;
|
|
|
|
q->slots[pos].data = item;
|
|
q->tail++;
|
|
q->count++;
|
|
q->work_count++;
|
|
|
|
cond_signal(&q->not_empty);
|
|
mtx_unlock(&q->lock);
|
|
}
|
|
|
|
/* ----------------------------------------------------------- */
|
|
/* POP */
|
|
/* ----------------------------------------------------------- */
|
|
static void *mpmc_pop(MPMCQueue *q) {
|
|
mtx_lock(&q->lock);
|
|
|
|
while (q->count == 0) {
|
|
cond_wait(&q->not_empty, &q->lock);
|
|
}
|
|
|
|
size_t pos = q->head & q->mask;
|
|
|
|
void *data = q->slots[pos].data;
|
|
|
|
q->head++;
|
|
q->count--;
|
|
|
|
cond_signal(&q->not_full);
|
|
mtx_unlock(&q->lock);
|
|
|
|
return data;
|
|
}
|
|
|
|
/* ----------------------------------------------------------- */
|
|
/* PUSH POISON */
|
|
/* ----------------------------------------------------------- */
|
|
static void mpmc_producers_finished(MPMCQueue *q, u8 consumer_count) {
|
|
for (u8 i = 0; i < consumer_count; i++) {
|
|
mpmc_push(q, NULL);
|
|
}
|
|
}
|
|
|
|
/* ----------------------------------------------------------- */
|
|
/* Done */
|
|
/* ----------------------------------------------------------- */
|
|
static void mpmc_task_done(MPMCQueue *q, u8 consumer_count) {
|
|
|
|
bool finished = false;
|
|
|
|
mtx_lock(&q->lock);
|
|
|
|
if (--q->work_count == 0) {
|
|
finished = true;
|
|
}
|
|
|
|
mtx_unlock(&q->lock);
|
|
|
|
if (finished) {
|
|
mpmc_producers_finished(q, consumer_count);
|
|
}
|
|
}
|
|
|
|
/* ----------------------------------------------------------- */
|
|
/* MPMC Cleanup */
|
|
/* ----------------------------------------------------------- */
|
|
// static void mpmc_finish(MPMCQueue *q) { // Comment to prevent warning: unused
|
|
// function
|
|
// if (!q) return;
|
|
//
|
|
// if (q->slots) {
|
|
// plat_mem_release(q->slots, 0);
|
|
// q->slots = NULL;
|
|
// }
|
|
//
|
|
// mtx_destroy(&q->lock);
|
|
//
|
|
// #if !defined(_WIN32)
|
|
// pthread_cond_destroy(&q->not_empty);
|
|
// pthread_cond_destroy(&q->not_full);
|
|
// #endif
|
|
//
|
|
// q->capacity = 0;
|
|
// q->mask = 0;
|
|
// }
|