Updating the LF MPMC queue and replacing DirQueue with it

Making the MPMC queue support producers that are also consumers by
adding a work counter: mpmc_push_work() increments it and
mpmc_task_done() decrements it; when the counter reaches 0,
mpmc_producers_finished() is called, which pushes poison values to wake
up sleeping threads and make them return NULL

Replacing DirQueue, a queue grown with realloc, with the MPMC queue
This commit is contained in:
2026-03-11 17:48:36 +01:00
parent 0e3ec5b09c
commit b2dc2d3b91
6 changed files with 58 additions and 155 deletions

2
.gitignore vendored
View File

@@ -4,4 +4,4 @@ file_hasher.rdi
file_hasher.exe
file_hashes.txt
file_list.txt
temp.c
temp_code.c

View File

@@ -37,3 +37,7 @@ Align the MPMC queue to pagesize
Getting the file size from FindFirstFileA() instead of CreateFileA(): we already call FindFirstFileA() and it returns the size, so there is no need to open and close every file just to get its size
Replacing Malloc and strdup in scan helper function with FileEntry and path arenas
Making the MPMC queue support producers that are also consumers by adding a work counter: mpmc_push_work() increments it and mpmc_task_done() decrements it; when the counter reaches 0, mpmc_producers_finished() is called, which pushes poison values to wake up sleeping threads and make them return NULL
Replacing DirQueue, a queue grown with realloc, with the MPMC queue

Binary file not shown.

View File

@@ -279,52 +279,6 @@ static void *mpmc_pop(MPMCQueue *q) {
return data;
}
/* ----------------------------------------------------------- */
/* TRY POP (non blocking) */
/* ----------------------------------------------------------- */
// static b32 mpmc_try_pop(MPMCQueue *q, void **out) {
//
// if (!plat_sem_trywait(&q->items_sem))
// return false;
//
// MPMCSlot *slot;
// size_t pos;
//
// int spins = 0;
//
// for (;;) {
//
// pos = atomic_load_explicit(&q->head, memory_order_relaxed);
// slot = &q->slots[pos & q->mask];
//
// size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
// intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
//
// if (likely(diff == 0)) {
//
// if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1,
// memory_order_relaxed,
// memory_order_relaxed))
// break;
//
// } else {
//
// if (++spins > 10) {
// SwitchToThread();
// spins = 0;
// } else {
// cpu_pause();
// }
// }
// }
//
// *out = slot->data;
//
// atomic_store_explicit(&slot->seq, pos + q->capacity, memory_order_release);
//
// return true;
// }
/* ----------------------------------------------------------- */
/* PUSH POISON */
/* ----------------------------------------------------------- */

View File

@@ -45,43 +45,19 @@ static double timer_stop(HiResTimer *t) {
(double)g_qpc_freq.QuadPart;
}
/* Scan folders */
// Directory work list shared by the scan threads. All fields are accessed
// under the platform lock declared at the end of the struct.
typedef struct DirQueue {
  char **items;  // heap array of pending directory paths (grown with realloc in dirqueue_push)
  size_t count;  // number of paths currently stored in items
  size_t cap;    // allocated length of items, in elements
  size_t active; // entries handed out by dirqueue_pop() and not yet reported via dirqueue_done()
  int stop;      // NOTE(review): not referenced by the code shown here; confirm whether it is still used
#if PLATFORM_WINDOWS
  CRITICAL_SECTION cs;
  CONDITION_VARIABLE cv;
#else
  pthread_mutex_t mutex;
  pthread_cond_t cond;
#endif
} DirQueue;
// MPMC Queue
static MPMCQueue g_dir_queue;
static MPMCQueue g_file_queue;
// Workers context
// Per-thread state handed to scan_worker() as its thread argument.
// NOTE(review): two members named dir_queue (DirQueue * and MPMCQueue *) are
// listed below; presumably the removed and added lines of the diff are shown
// interleaved. Confirm that only the MPMCQueue member exists in the real file.
typedef struct {
DirQueue *dir_queue;
// Thread count passed to mpmc_task_done() by scan_worker().
// NOTE(review): main assigns this from num_scan_threads; if that is a size_t
// and u8 is an 8-bit type, counts above 255 would be truncated. Confirm.
u8 num_threads;
// Arena that receives the copies of directory paths pushed onto dir_queue.
mem_arena *path_arena;
// NOTE(review): usage is not visible in this view; presumably backs FileEntry
// allocations. Confirm.
mem_arena *meta_arena;
// Queue of directories still to be scanned; workers both pop from and push to it.
MPMCQueue *dir_queue;
// Queue that receives one FileEntry per file found, consumed by hash_worker().
MPMCQueue *file_queue;
} ScannerContext;
// Per-thread state handed to hash_worker() as its thread argument.
typedef struct {
// Arena the worker pushes its formatted output lines into.
mem_arena *arena;
// Queue the worker pops FileEntry pointers from; a NULL pop ends the worker.
MPMCQueue *file_queue;
} WorkerContext;
// void scan_folder_windows_parallel(const char *base, ScannerContext *ctx);
// void scan_folder_posix_parallel(const char *base, ScannerContext *ctx);
//

View File

@@ -1,4 +1,3 @@
#include "arena.h"
#include "platform.h"
// ----------------------------- Globals ------------------------------------
@@ -104,48 +103,6 @@ void platform_get_file_owner(const char *path, char *out_owner,
get_file_owner(path, out_owner, out_owner_size);
}
// --------------- parallel directory scanning ----------------
// Add queue helper functions
// Appends a private copy of `path` to the queue and wakes one waiting worker.
//
// q:    queue to push to; q->cs and q->cv must already be initialized.
// path: NUL-terminated directory path; it is duplicated, so the caller keeps
//       ownership of its own buffer. The copy is later released by whoever
//       pops it.
//
// On allocation failure the path is dropped and a message is written to
// stderr. A NULL must never be stored: scan_worker() treats a NULL pop as the
// end-of-work signal and would exit without calling dirqueue_done().
static void dirqueue_push(DirQueue *q, const char *path) {
  // Duplicate before taking the lock so the critical section stays short.
  char *copy = _strdup(path);
  if (!copy) {
    fprintf(stderr, "dirqueue_push: out of memory, skipping %s\n", path);
    return;
  }

  EnterCriticalSection(&q->cs);

  if (q->count + 1 > q->cap) {
    size_t new_cap = q->cap ? q->cap * 2 : 1024;
    // Keep the old array reachable until realloc is known to have succeeded.
    char **grown = realloc(q->items, new_cap * sizeof(char *));
    if (!grown) {
      LeaveCriticalSection(&q->cs);
      fprintf(stderr, "dirqueue_push: out of memory, skipping %s\n", path);
      free(copy);
      return;
    }
    q->items = grown;
    q->cap = new_cap;
  }

  q->items[q->count++] = copy;

  WakeConditionVariable(&q->cv);
  LeaveCriticalSection(&q->cs);
}
// Removes and returns the most recently pushed path, blocking while the queue
// is empty but some worker is still active (and may therefore push more).
//
// Returns NULL once the queue is empty and no worker is active, i.e. there is
// no work left. A successful pop marks the caller as active; the caller must
// pair it with dirqueue_done().
static char *dirqueue_pop(DirQueue *q) {
  char *next_dir = NULL;

  EnterCriticalSection(&q->cs);

  for (;;) {
    if (q->count != 0) {
      // Work available: take the last entry and count ourselves as active.
      next_dir = q->items[--q->count];
      q->active++;
      break;
    }

    if (q->active == 0) {
      // Empty and nobody left who could produce more: truly done.
      break;
    }

    SleepConditionVariableCS(&q->cv, &q->cs, INFINITE);
  }

  LeaveCriticalSection(&q->cs);
  return next_dir;
}
// Reports that the caller finished the entry it obtained from dirqueue_pop().
// Every waiter is woken so that threads blocked in dirqueue_pop() can
// re-evaluate whether any active worker remains.
static void dirqueue_done(DirQueue *q) {
  EnterCriticalSection(&q->cs);

  q->active -= 1;
  WakeAllConditionVariable(&q->cv);

  LeaveCriticalSection(&q->cs);
}
// ----------------------------- Scan helpers -----------------------------
void scan_folder_windows_parallel(const char *base, ScannerContext *ctx) {
char search[MAX_PATHLEN];
@@ -167,7 +124,12 @@ void scan_folder_windows_parallel(const char *base, ScannerContext *ctx) {
continue;
if (fd.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) {
dirqueue_push(ctx->dir_queue, full);
size_t len = strlen(full) + 1;
char *dir = arena_push(&ctx->path_arena, len, false);
memcpy(dir, full, len);
mpmc_push_work(ctx->dir_queue, dir);
} else {
atomic_fetch_add(&g_files_found, 1);
@@ -192,7 +154,7 @@ void scan_folder_windows_parallel(const char *base, ScannerContext *ctx) {
fe->size_bytes = ((uint64_t)fd.nFileSizeHigh << 32) | fd.nFileSizeLow;
mpmc_push(&g_file_queue, fe);
mpmc_push(ctx->file_queue, fe);
}
} while (FindNextFileA(h, &fd));
@@ -203,19 +165,16 @@ void scan_folder_windows_parallel(const char *base, ScannerContext *ctx) {
// ------------------------- Scan worker --------------------------------
// Thread entry point for a directory-scanning worker: repeatedly takes a
// directory path from the queue and scans it, until the pop returns NULL.
// arg: pointer to this worker's ScannerContext. Always returns 0.
// NOTE(review): two `char *dir` declarations and both dirqueue_* and mpmc_*
// calls appear below; presumably the removed and added lines of the diff are
// shown interleaved. Confirm which lines the real file keeps.
static DWORD WINAPI scan_worker(LPVOID arg) {
ScannerContext *ctx = arg;
DirQueue *q = ctx->dir_queue;
for (;;) {
char *dir = dirqueue_pop(q);
char *dir = mpmc_pop(ctx->dir_queue);
// A NULL pop means there is no work left for this worker.
if (!dir)
break;
// Scanning may push newly discovered subdirectories back onto dir_queue.
scan_folder_windows_parallel(dir, ctx);
// NOTE(review): paths pushed with mpmc_push_work() are allocated with
// arena_push(), so free(dir) is only valid for the old _strdup-based
// DirQueue path. Confirm it is gone from the MPMC version.
free(dir);
dirqueue_done(q);
// Reports this directory as finished; per the commit description this
// decrements the queue's work counter.
mpmc_task_done(ctx->dir_queue, ctx->num_threads);
}
return 0;
}
@@ -252,11 +211,10 @@ static void xxh3_hash_file_stream(const char *path, char *out_hex, BYTE *buf) {
static DWORD WINAPI hash_worker(LPVOID arg) {
WorkerContext *ctx = (WorkerContext *)arg;
mem_arena *local_arena = ctx->arena;
BYTE *buf = (BYTE *)malloc(READ_BLOCK);
for (;;) {
FileEntry *fe = mpmc_pop(&g_file_queue);
FileEntry *fe = mpmc_pop(ctx->file_queue);
if (!fe)
break;
@@ -275,12 +233,12 @@ static DWORD WINAPI hash_worker(LPVOID arg) {
snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n",
hash, fe->path, size_kib, created, modified, fe->owner);
char *dst = arena_push(&local_arena, len, false);
char *dst = arena_push(&ctx->arena, len, false);
memcpy(dst, stack_buf, len);
atomic_fetch_add(&g_files_hashed, 1);
}
free(buf);
// free(buf); It will be freed by the system when the program exits
return 0;
}
@@ -420,9 +378,7 @@ int main(int argc, char **argv) {
folder_count = 1;
}
// -------------------------------
// Display selected folders
// -------------------------------
printf("Processing %d folder(s):\n", folder_count);
for (int i = 0; i < folder_count; ++i) {
printf(" - %s\n", folders[i]);
@@ -472,20 +428,13 @@ int main(int argc, char **argv) {
size_t num_threads = hw_threads * 2;
// -------------------------------
// Step 1: Scan all folders
// Scanning and hashing
// -------------------------------
MPMCQueue dir_queue;
mpmc_init(&dir_queue, MiB(1));
mpmc_init(&g_file_queue, MiB(1));
DirQueue q;
memset(&q, 0, sizeof(q));
InitializeCriticalSection(&q.cs);
InitializeConditionVariable(&q.cv);
q.active = 0;
for (int i = 0; i < folder_count; ++i) {
dirqueue_push(&q, folders[i]);
}
MPMCQueue file_queue;
mpmc_init(&file_queue, MiB(1));
// starting hash threads
size_t num_hash_threads = num_threads;
@@ -498,6 +447,7 @@ int main(int argc, char **argv) {
for (size_t i = 0; i < num_hash_threads; ++i) {
workers[i].arena = arena_create(&params);
workers[i].file_queue = &file_queue;
hash_threads[i] = CreateThread(NULL, 0, hash_worker, &workers[i], 0, NULL);
}
@@ -514,26 +464,40 @@ int main(int argc, char **argv) {
arena_push(&gp_arena, sizeof(HANDLE) * num_scan_threads, true);
for (size_t i = 0; i < num_scan_threads; i++) {
scanners[i].dir_queue = &q;
scanners[i].num_threads = num_scan_threads;
scanners[i].path_arena = arena_create(&params);
scanners[i].meta_arena = arena_create(&params);
scanners[i].dir_queue = &dir_queue;
scanners[i].file_queue = &file_queue;
scan_threads[i] = CreateThread(NULL, 0, scan_worker, &scanners[i], 0, NULL);
}
WaitForMultipleObjects((DWORD)num_scan_threads, scan_threads, TRUE, INFINITE);
// Initial folder push
for (int i = 0; i < folder_count; i++) {
for (size_t i = 0; i < num_hash_threads; i++) {
mpmc_push(&g_file_queue, NULL);
size_t len = strlen(folders[i]) + 1;
char *path = arena_push(&scanners[0].path_arena, len, false);
memcpy(path, folders[i], len);
mpmc_push_work(&dir_queue, path);
}
atomic_store(&g_scan_done, 1);
// Stop scan threads
WaitForMultipleObjects((DWORD)num_scan_threads, scan_threads, TRUE, INFINITE);
for (size_t i = 0; i < num_scan_threads; ++i)
CloseHandle(scan_threads[i]);
for (size_t i = 0; i < num_hash_threads; i++) {
mpmc_push(&file_queue, NULL);
}
atomic_store(&g_scan_done, 1);
arena_free(&gp_arena, (u8 **)&scan_threads,
sizeof(HANDLE) * num_scan_threads);
@@ -544,13 +508,13 @@ int main(int argc, char **argv) {
printf("Completed scanning in %.2f seconds, found %zu files\n\n",
scan_seconds, total_found);
// if no files found
// If no files found
if (total_found == 0) {
printf("No files found.\n");
return 0;
}
// stop hashing threads
// Stop hashing threads
WaitForMultipleObjects((DWORD)num_hash_threads, hash_threads, TRUE, INFINITE);
for (size_t i = 0; i < num_hash_threads; ++i)
@@ -559,10 +523,13 @@ int main(int argc, char **argv) {
arena_free(&gp_arena, (u8 **)&hash_threads,
sizeof(HANDLE) * num_hash_threads);
// Stop progress printing thread
WaitForSingleObject(progress, INFINITE);
CloseHandle(progress);
// write file_hashes.txt
// -------------------------------
// Export file_hashes.txt
// -------------------------------
// FILE *f = fopen(FILE_HASHES_TXT, "wb");
//
@@ -591,7 +558,9 @@ int main(int argc, char **argv) {
WriteFile(h, arena_base, (DWORD)local_hash_arena->pos, &written, NULL);
}
// done time
// -------------------------------
// Print summary
// -------------------------------
double total_seconds = timer_stop(&total_timer);
printf("Completed hashing %zu files\n", total_found);