Replacing Malloc and strdup in scan helper function with FileEntry and path arenas

This commit is contained in:
2026-03-11 16:17:22 +01:00
parent aef070192f
commit 0e3ec5b09c
4 changed files with 189 additions and 118 deletions

View File

@@ -35,3 +35,5 @@ Making the MPMC queue platform agnostic
Align the MPMC queue to pagesize
Getting file size from FindFirstFileA() instead of CreateFileA(): since we already call FindFirstFileA() and it returns the size, there is no need to open/close every file to get its size
Replacing Malloc and strdup in scan helper function with FileEntry and path arenas

149
lf_mpmc.h
View File

@@ -36,6 +36,8 @@ typedef struct {
CACHE_ALIGN atomic_size_t head;
CACHE_ALIGN atomic_size_t tail;
CACHE_ALIGN atomic_size_t work_count;
size_t capacity;
size_t mask;
@@ -91,6 +93,7 @@ static void mpmc_init(MPMCQueue *q, size_t max_capacity) {
atomic_init(&q->head, 0);
atomic_init(&q->tail, 0);
atomic_init(&q->work_count, 0);
plat_sem_init(&q->items_sem, 0);
}
@@ -138,6 +141,7 @@ static void mpmc_commit_more(MPMCQueue *q) {
/* ----------------------------------------------------------- */
/* PUSH */
/* ----------------------------------------------------------- */
// Does not increment work
static void mpmc_push(MPMCQueue *q, void *item) {
MPMCSlot *slot;
size_t pos;
@@ -184,8 +188,55 @@ static void mpmc_push(MPMCQueue *q, void *item) {
plat_sem_post(&q->items_sem, 1);
}
/*
 * Push `item` onto the queue and account for it as one unit of outstanding
 * work (q->work_count is incremented by one).
 *
 * The call does not return until a slot has been claimed:
 *   - if the tail has reached the committed region, mpmc_commit_more() is
 *     called and the attempt is retried;
 *   - if the slot's sequence is behind the tail ("queue actually full") the
 *     thread calls Sleep(1000) before retrying;
 *   - if the slot's sequence is ahead of the tail the thread calls Sleep(0)
 *     and retries.
 *
 * The matching decrement is done by mpmc_task_done().
 */
static void mpmc_push_work(MPMCQueue *q, void *item) {
  MPMCSlot *slot;
  size_t pos;

  for (;;) {
    pos = atomic_load_explicit(&q->tail, memory_order_relaxed);

    // ensure the slot is committed BEFORE accessing it
    size_t committed =
        atomic_load_explicit(&q->committed, memory_order_relaxed);
    if (unlikely(pos >= committed)) {
      mpmc_commit_more(q);
      continue;
    }

    slot = &q->slots[pos & q->mask];
    size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
    intptr_t diff = (intptr_t)seq - (intptr_t)pos;

    if (likely(diff == 0)) {
      // slot is free for this position: try to claim it by advancing tail
      if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1,
                                                memory_order_relaxed,
                                                memory_order_relaxed))
        break;
    } else if (diff < 0) { // queue actually full
      Sleep(1000);
    } else { // waiting to grow
      Sleep(0);
    }
  }

  slot->data = item;

  // Count the item BEFORE it becomes visible to consumers. If the slot were
  // published first, a consumer could pop it and run mpmc_task_done() ahead
  // of this increment; the counter would then reach the "prev == 1" case
  // early and mpmc_producers_finished() would fire while work is still
  // outstanding.
  atomic_fetch_add(&q->work_count, 1);

  // Publish the slot: seq == pos + 1 is what the pop side waits for.
  atomic_store_explicit(&slot->seq, pos + 1, memory_order_release);
  plat_sem_post(&q->items_sem, 1);
}
/* ----------------------------------------------------------- */
/* POP (blocking with semaphore) */
/* POP */
/* ----------------------------------------------------------- */
static void *mpmc_pop(MPMCQueue *q) {
@@ -213,7 +264,7 @@ static void *mpmc_pop(MPMCQueue *q) {
} else { // slot is still transitioning (written by another thread)
if (++spins > 10) {
SwitchToThread(); // yield CPU
Sleep(0); // yield CPU
spins = 0;
} else {
cpu_pause();
@@ -231,48 +282,48 @@ static void *mpmc_pop(MPMCQueue *q) {
/* ----------------------------------------------------------- */
/* TRY POP (non blocking) */
/* ----------------------------------------------------------- */
/*
 * Non-blocking pop.
 *
 * Returns false right away when plat_sem_trywait() on q->items_sem fails
 * (presumably: no item is currently available - confirm against the
 * platform semaphore wrapper). Otherwise claims the slot at the head,
 * stores its payload in *out and returns true.
 *
 * q   - queue to pop from
 * out - receives the popped item pointer; written only when true is returned
 */
static b32 mpmc_try_pop(MPMCQueue *q, void **out) {
if (!plat_sem_trywait(&q->items_sem))
return false;
MPMCSlot *slot;
size_t pos;
int spins = 0;
for (;;) {
pos = atomic_load_explicit(&q->head, memory_order_relaxed);
slot = &q->slots[pos & q->mask];
size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
// A slot is ready for position `pos` when its seq equals pos + 1, which is
// the value mpmc_push_work() stores after writing slot->data.
intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
if (likely(diff == 0)) {
// Claim the slot by advancing head; on CAS failure the loop reloads head.
if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1,
memory_order_relaxed,
memory_order_relaxed))
break;
} else {
// Slot not ready yet: pause the CPU for up to 10 rounds, then yield the
// thread and restart the spin count.
if (++spins > 10) {
SwitchToThread();
spins = 0;
} else {
cpu_pause();
}
}
}
*out = slot->data;
// Hand the slot back: the push side treats a slot as free for tail position
// p when seq == p, so this makes it usable again at position
// pos + q->capacity.
atomic_store_explicit(&slot->seq, pos + q->capacity, memory_order_release);
return true;
}
// static b32 mpmc_try_pop(MPMCQueue *q, void **out) {
//
// if (!plat_sem_trywait(&q->items_sem))
// return false;
//
// MPMCSlot *slot;
// size_t pos;
//
// int spins = 0;
//
// for (;;) {
//
// pos = atomic_load_explicit(&q->head, memory_order_relaxed);
// slot = &q->slots[pos & q->mask];
//
// size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
// intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
//
// if (likely(diff == 0)) {
//
// if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1,
// memory_order_relaxed,
// memory_order_relaxed))
// break;
//
// } else {
//
// if (++spins > 10) {
// SwitchToThread();
// spins = 0;
// } else {
// cpu_pause();
// }
// }
// }
//
// *out = slot->data;
//
// atomic_store_explicit(&slot->seq, pos + q->capacity, memory_order_release);
//
// return true;
// }
/* ----------------------------------------------------------- */
/* PUSH POISON */
@@ -288,6 +339,16 @@ static void mpmc_producers_finished(MPMCQueue *q, u8 consumer_count) {
}
}
/* ----------------------------------------------------------- */
/* Done */
/* ----------------------------------------------------------- */
/*
 * Mark one unit of work as finished by decrementing q->work_count.
 *
 * When the counter held exactly 1 before the decrement (i.e. this call took
 * it to zero), mpmc_producers_finished() is invoked with `consumer_count`.
 *
 * q              - queue whose work counter is decremented
 * consumer_count - forwarded unchanged to mpmc_producers_finished()
 */
static void mpmc_task_done(MPMCQueue *q, u8 consumer_count) {
  /* atomic_fetch_sub yields the value stored before the subtraction */
  const size_t before = atomic_fetch_sub(&q->work_count, 1);

  if (before != 1)
    return;

  mpmc_producers_finished(q, consumer_count);
}
/* ----------------------------------------------------------- */
/* MPMC Cleanup */
/* ----------------------------------------------------------- */

View File

@@ -45,23 +45,6 @@ static double timer_stop(HiResTimer *t) {
(double)g_qpc_freq.QuadPart;
}
// MPMC Queue
// File-scope queues. g_file_queue is the queue hash_worker pops FileEntry
// pointers from; g_dir_queue is presumably the directory work queue - its
// use is not visible here, TODO confirm.
static MPMCQueue g_dir_queue;
static MPMCQueue g_file_queue;
// Per-scanner state passed to the scanning code.
typedef struct {
// arena that path strings are pushed into
mem_arena *path_arena;
// arena that FileEntry records are pushed into
mem_arena *meta_arena;
// presumably the queue of directories still to scan - TODO confirm
MPMCQueue *dir_queue;
// presumably the queue discovered files are handed to - TODO confirm
MPMCQueue *file_queue;
} ScannerContext;
// Per-thread state handed to hash_worker as its thread argument.
typedef struct {
// queue the worker pops FileEntry pointers from
MPMCQueue *queue;
// arena the worker uses as its local arena
mem_arena *arena;
} WorkerContext;
/* Scan folders */
typedef struct DirQueue DirQueue;
@@ -83,6 +66,22 @@ typedef struct DirQueue {
#endif
} DirQueue;
// MPMC Queue
// File-scope queues. g_file_queue is the queue hash_worker pops FileEntry
// pointers from; g_dir_queue is not referenced in the code visible here -
// NOTE(review): confirm it is still needed.
static MPMCQueue g_dir_queue;
static MPMCQueue g_file_queue;
// Per-thread state handed to scan_worker as its thread argument.
typedef struct {
// directory queue the worker pops from and pushes sub-directories to
DirQueue *dir_queue;
// arena that normalized path strings are pushed into
mem_arena *path_arena;
// arena that FileEntry records are pushed into
mem_arena *meta_arena;
} ScannerContext;
// Per-thread state handed to hash_worker as its thread argument.
typedef struct {
// arena the worker uses as its local arena
mem_arena *arena;
} WorkerContext;
// void scan_folder_windows_parallel(const char *base, ScannerContext *ctx);
// void scan_folder_posix_parallel(const char *base, ScannerContext *ctx);
// NOTE(review): this prototype takes a DirQueue *, while a definition of
// scan_folder_windows_parallel in this change takes a ScannerContext *ctx.
// Confirm which signature is intended; the two cannot coexist in one
// translation unit.
void scan_folder_windows_parallel(const char *base, DirQueue *q);
//

View File

@@ -145,25 +145,9 @@ static void dirqueue_done(DirQueue *q) {
WakeAllConditionVariable(&q->cv);
LeaveCriticalSection(&q->cs);
}
/*
 * Thread entry point for a directory scanner.
 *
 * `arg` is the DirQueue shared by the scanners. The worker keeps popping
 * directory paths and scanning them until dirqueue_pop() returns NULL, then
 * exits with status 0. Each popped path is released with free() and reported
 * to the queue through dirqueue_done() once its scan call has returned.
 */
static DWORD WINAPI scan_worker(LPVOID arg) {
  DirQueue *queue = (DirQueue *)arg;
  char *path;

  while ((path = dirqueue_pop(queue)) != NULL) {
    scan_folder_windows_parallel(path, queue);
    free(path);
    dirqueue_done(queue);
  }

  return 0;
}
// Scanning directory function
void scan_folder_windows_parallel(const char *base, DirQueue *q) {
// ----------------------------- Scan helpers -----------------------------
void scan_folder_windows_parallel(const char *base, ScannerContext *ctx) {
char search[MAX_PATHLEN];
snprintf(search, sizeof(search), "%s\\*", base);
@@ -183,20 +167,24 @@ void scan_folder_windows_parallel(const char *base, DirQueue *q) {
continue;
if (fd.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) {
dirqueue_push(q, full);
dirqueue_push(ctx->dir_queue, full);
} else {
atomic_fetch_add(&g_files_found, 1);
FileEntry *fe = malloc(sizeof(FileEntry));
memset(fe, 0, sizeof(FileEntry));
FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true);
char norm[MAX_PATHLEN];
strncpy(norm, full, sizeof(norm) - 1);
norm[sizeof(norm) - 1] = 0;
normalize_path(norm);
fe->path = _strdup(norm);
size_t len = strlen(norm) + 1;
char *path = arena_push(&ctx->path_arena, len, false);
memcpy(path, norm, len);
fe->path = path;
platform_get_file_times(full, &fe->created_time, &fe->modified_time);
@@ -212,6 +200,25 @@ void scan_folder_windows_parallel(const char *base, DirQueue *q) {
FindClose(h);
}
// ------------------------- Scan worker --------------------------------
/*
 * Thread entry point for a directory scanner.
 *
 * `arg` points to this thread's ScannerContext. Directory paths are taken
 * from the context's dir_queue one at a time and scanned with
 * scan_folder_windows_parallel(); a NULL from dirqueue_pop() ends the loop
 * and the thread returns 0. Every popped path is freed here and then
 * acknowledged with dirqueue_done().
 */
static DWORD WINAPI scan_worker(LPVOID arg) {
  ScannerContext *scanner = arg;
  DirQueue *pending = scanner->dir_queue;

  for (char *path = dirqueue_pop(pending); path != NULL;
       path = dirqueue_pop(pending)) {
    scan_folder_windows_parallel(path, scanner);
    free(path);
    dirqueue_done(pending);
  }

  return 0;
}
// ----------------------------- Hashing helpers -----------------------------
static void xxh3_hash_file_stream(const char *path, char *out_hex, BYTE *buf) {
// compute XXH3_128 over file. POSIX and Windows use standard reads in this
@@ -245,12 +252,11 @@ static void xxh3_hash_file_stream(const char *path, char *out_hex, BYTE *buf) {
static DWORD WINAPI hash_worker(LPVOID arg) {
WorkerContext *ctx = (WorkerContext *)arg;
MPMCQueue *q = ctx->queue;
mem_arena *local_arena = ctx->arena;
BYTE *buf = (BYTE *)malloc(READ_BLOCK);
for (;;) {
FileEntry *fe = mpmc_pop(q);
FileEntry *fe = mpmc_pop(&g_file_queue);
if (!fe)
break;
@@ -273,9 +279,6 @@ static DWORD WINAPI hash_worker(LPVOID arg) {
memcpy(dst, stack_buf, len);
atomic_fetch_add(&g_files_hashed, 1);
free(fe->path);
free(fe);
}
free(buf);
@@ -443,7 +446,7 @@ int main(int argc, char **argv) {
mem_arena *gp_arena = arena_create(&params);
// -------------------------------
// Detect hardware threads (CPU cores)
// Detect hardware threads
// -------------------------------
size_t hw_threads = 1;
// --- Windows: detect PHYSICAL cores (not logical threads) ---
@@ -465,10 +468,8 @@ int main(int argc, char **argv) {
}
arena_free(&gp_arena, (u8 **)&buf, len);
// Add some extra threads to overlap I/O more aggressively
// Logical threads = CPU cores * 2
size_t num_threads = hw_threads * 2;
if (num_threads < 2)
num_threads = 2;
// -------------------------------
// Step 1: Scan all folders
@@ -487,47 +488,54 @@ int main(int argc, char **argv) {
}
// starting hash threads
WorkerContext workers[num_threads];
size_t num_hash_threads = num_threads;
for (int i = 0; i < num_threads; i++) {
workers[i].queue = &g_file_queue;
workers[i].arena = arena_create(&params);
}
WorkerContext workers[num_hash_threads];
HANDLE *hash_threads =
arena_push(&gp_arena, sizeof(HANDLE) * num_threads, true);
arena_push(&gp_arena, sizeof(HANDLE) * num_hash_threads, true);
for (size_t i = 0; i < num_hash_threads; ++i) {
workers[i].arena = arena_create(&params);
for (size_t i = 0; i < num_threads; ++i) {
hash_threads[i] = CreateThread(NULL, 0, hash_worker, &workers[i], 0, NULL);
}
// starting scan threads
// starting progress printing thread
HANDLE progress = CreateThread(NULL, 0, progress_thread, NULL, 0, NULL);
size_t scan_threads = hw_threads;
if (scan_threads < 2)
scan_threads = 2;
// starting scan threads
size_t num_scan_threads = num_threads;
HANDLE *scan_tids =
arena_push(&gp_arena, sizeof(HANDLE) * scan_threads, true);
ScannerContext scanners[num_scan_threads];
for (size_t i = 0; i < scan_threads; ++i) {
scan_tids[i] =
CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)scan_worker, &q, 0, NULL);
HANDLE *scan_threads =
arena_push(&gp_arena, sizeof(HANDLE) * num_scan_threads, true);
for (size_t i = 0; i < num_scan_threads; i++) {
scanners[i].dir_queue = &q;
scanners[i].path_arena = arena_create(&params);
scanners[i].meta_arena = arena_create(&params);
scan_threads[i] = CreateThread(NULL, 0, scan_worker, &scanners[i], 0, NULL);
}
WaitForMultipleObjects((DWORD)scan_threads, scan_tids, TRUE, INFINITE);
WaitForMultipleObjects((DWORD)num_scan_threads, scan_threads, TRUE, INFINITE);
for (size_t i = 0; i < num_threads; i++) {
for (size_t i = 0; i < num_hash_threads; i++) {
mpmc_push(&g_file_queue, NULL);
}
atomic_store(&g_scan_done, 1);
for (size_t i = 0; i < scan_threads; ++i)
CloseHandle(scan_tids[i]);
for (size_t i = 0; i < num_scan_threads; ++i)
CloseHandle(scan_threads[i]);
arena_free(&gp_arena, (u8 **)&scan_tids, sizeof(HANDLE) * scan_threads);
arena_free(&gp_arena, (u8 **)&scan_threads,
sizeof(HANDLE) * num_scan_threads);
double scan_seconds = timer_stop(&scan_timer);
size_t total_found = atomic_load(&g_files_found);
@@ -543,12 +551,13 @@ int main(int argc, char **argv) {
}
// stop hashing threads
WaitForMultipleObjects((DWORD)num_threads, hash_threads, TRUE, INFINITE);
WaitForMultipleObjects((DWORD)num_hash_threads, hash_threads, TRUE, INFINITE);
for (size_t i = 0; i < num_threads; ++i)
for (size_t i = 0; i < num_hash_threads; ++i)
CloseHandle(hash_threads[i]);
arena_free(&gp_arena, (u8 **)&hash_threads, sizeof(HANDLE) * num_threads);
arena_free(&gp_arena, (u8 **)&hash_threads,
sizeof(HANDLE) * num_hash_threads);
WaitForSingleObject(progress, INFINITE);
CloseHandle(progress);
@@ -570,7 +579,7 @@ int main(int argc, char **argv) {
HANDLE h = CreateFileA(FILE_HASHES_TXT, GENERIC_WRITE, 0, NULL, CREATE_ALWAYS,
FILE_ATTRIBUTE_NORMAL, NULL);
for (int i = 0; i < num_threads; i++) {
for (int i = 0; i < num_hash_threads; i++) {
mem_arena *local_hash_arena = workers[i].arena;