Compare commits

...

2 Commits

Author SHA1 Message Date
0e3ec5b09c Replacing Malloc and strdup in scan helper function with FileEntry and path arenas 2026-03-11 16:17:22 +01:00
aef070192f Using FindFirstFileA() instead of CreateFileA() to get the file size
Since we already call FindFirstFileA() and it returns the size there is
no need to open/close every file to get it's size
2026-03-11 09:02:17 +01:00
5 changed files with 221 additions and 95 deletions

7
base.h
View File

@@ -137,6 +137,11 @@ static void plat_sem_wait(plat_sem *s) {
WaitForSingleObject(s->handle, INFINITE); WaitForSingleObject(s->handle, INFINITE);
} }
static b32 plat_sem_trywait(HANDLE sem) {
DWORD r = WaitForSingleObject(sem, 0);
return r == WAIT_OBJECT_0;
}
static void plat_sem_post(plat_sem *s, u32 count) { static void plat_sem_post(plat_sem *s, u32 count) {
ReleaseSemaphore(s->handle, count, NULL); ReleaseSemaphore(s->handle, count, NULL);
} }
@@ -203,6 +208,8 @@ static void plat_sem_wait(plat_sem *s) {
} }
} }
static b32 plat_sem_trywait(sem_t *sem) { return sem_trywait(sem) == 0; }
static void plat_sem_post(plat_sem *s, u32 count) { static void plat_sem_post(plat_sem *s, u32 count) {
for (u32 i = 0; i < count; i++) { for (u32 i = 0; i < count; i++) {
sem_post(&s->sem); sem_post(&s->sem);

View File

@@ -33,3 +33,7 @@ Making the LF MPMC queue generic and in a seperate header file
Making the MPMC queue platform agnostic Making the MPMC queue platform agnostic
Align the MPMC queue to pagesize Align the MPMC queue to pagesize
Getting file size from FindFirstFileA() instead of CreateFileA(), since we already call FindFirstFileA() and it returns the size there is no need to open/close every file to get it's size
Replacing Malloc and strdup in scan helper function with FileEntry and path arenas

109
lf_mpmc.h
View File

@@ -36,6 +36,8 @@ typedef struct {
CACHE_ALIGN atomic_size_t head; CACHE_ALIGN atomic_size_t head;
CACHE_ALIGN atomic_size_t tail; CACHE_ALIGN atomic_size_t tail;
CACHE_ALIGN atomic_size_t work_count;
size_t capacity; size_t capacity;
size_t mask; size_t mask;
@@ -91,6 +93,7 @@ static void mpmc_init(MPMCQueue *q, size_t max_capacity) {
atomic_init(&q->head, 0); atomic_init(&q->head, 0);
atomic_init(&q->tail, 0); atomic_init(&q->tail, 0);
atomic_init(&q->work_count, 0);
plat_sem_init(&q->items_sem, 0); plat_sem_init(&q->items_sem, 0);
} }
@@ -138,6 +141,7 @@ static void mpmc_commit_more(MPMCQueue *q) {
/* ----------------------------------------------------------- */ /* ----------------------------------------------------------- */
/* PUSH */ /* PUSH */
/* ----------------------------------------------------------- */ /* ----------------------------------------------------------- */
// Does not increment work
static void mpmc_push(MPMCQueue *q, void *item) { static void mpmc_push(MPMCQueue *q, void *item) {
MPMCSlot *slot; MPMCSlot *slot;
size_t pos; size_t pos;
@@ -184,6 +188,53 @@ static void mpmc_push(MPMCQueue *q, void *item) {
plat_sem_post(&q->items_sem, 1); plat_sem_post(&q->items_sem, 1);
} }
// Increment work
static void mpmc_push_work(MPMCQueue *q, void *item) {
MPMCSlot *slot;
size_t pos;
for (;;) {
pos = atomic_load_explicit(&q->tail, memory_order_relaxed);
// ensure the slot is committed BEFORE accessing it
size_t committed =
atomic_load_explicit(&q->committed, memory_order_relaxed);
if (unlikely(pos >= committed)) {
mpmc_commit_more(q);
continue;
}
slot = &q->slots[pos & q->mask];
size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
intptr_t diff = (intptr_t)seq - (intptr_t)pos;
if (likely(diff == 0)) {
if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1,
memory_order_relaxed,
memory_order_relaxed))
break;
} else if (diff < 0) { // queue actually full
Sleep(1000);
} else { // waiting to grow
Sleep(0);
}
}
slot->data = item;
atomic_store_explicit(&slot->seq, pos + 1, memory_order_release);
atomic_fetch_add(&q->work_count, 1);
plat_sem_post(&q->items_sem, 1);
}
/* ----------------------------------------------------------- */ /* ----------------------------------------------------------- */
/* POP */ /* POP */
/* ----------------------------------------------------------- */ /* ----------------------------------------------------------- */
@@ -213,7 +264,7 @@ static void *mpmc_pop(MPMCQueue *q) {
} else { // slot is still transitioning (written by another thread) } else { // slot is still transitioning (written by another thread)
if (++spins > 10) { if (++spins > 10) {
SwitchToThread(); // yield CPU Sleep(0); // yield CPU
spins = 0; spins = 0;
} else { } else {
cpu_pause(); cpu_pause();
@@ -228,6 +279,52 @@ static void *mpmc_pop(MPMCQueue *q) {
return data; return data;
} }
/* ----------------------------------------------------------- */
/* TRY POP (non blocking) */
/* ----------------------------------------------------------- */
// static b32 mpmc_try_pop(MPMCQueue *q, void **out) {
//
// if (!plat_sem_trywait(&q->items_sem))
// return false;
//
// MPMCSlot *slot;
// size_t pos;
//
// int spins = 0;
//
// for (;;) {
//
// pos = atomic_load_explicit(&q->head, memory_order_relaxed);
// slot = &q->slots[pos & q->mask];
//
// size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
// intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
//
// if (likely(diff == 0)) {
//
// if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1,
// memory_order_relaxed,
// memory_order_relaxed))
// break;
//
// } else {
//
// if (++spins > 10) {
// SwitchToThread();
// spins = 0;
// } else {
// cpu_pause();
// }
// }
// }
//
// *out = slot->data;
//
// atomic_store_explicit(&slot->seq, pos + q->capacity, memory_order_release);
//
// return true;
// }
/* ----------------------------------------------------------- */ /* ----------------------------------------------------------- */
/* PUSH POISON */ /* PUSH POISON */
/* ----------------------------------------------------------- */ /* ----------------------------------------------------------- */
@@ -242,6 +339,16 @@ static void mpmc_producers_finished(MPMCQueue *q, u8 consumer_count) {
} }
} }
/* ----------------------------------------------------------- */
/* Done */
/* ----------------------------------------------------------- */
static void mpmc_task_done(MPMCQueue *q, u8 consumer_count) {
size_t prev = atomic_fetch_sub(&q->work_count, 1);
if (prev == 1) {
mpmc_producers_finished(q, consumer_count);
}
}
/* ----------------------------------------------------------- */ /* ----------------------------------------------------------- */
/* MPMC Cleanup */ /* MPMC Cleanup */
/* ----------------------------------------------------------- */ /* ----------------------------------------------------------- */

View File

@@ -45,25 +45,10 @@ static double timer_stop(HiResTimer *t) {
(double)g_qpc_freq.QuadPart; (double)g_qpc_freq.QuadPart;
} }
// MPMC Queue
static MPMCQueue g_file_queue;
typedef struct {
MPMCQueue *queue;
mem_arena *arena;
} WorkerContext;
/* Scan folders */ /* Scan folders */
typedef struct DirQueue DirQueue; typedef struct DirQueue DirQueue;
void scan_folder_windows_parallel(const char *base, DirQueue *q);
void scan_folder_posix_parallel(const char *base, DirQueue *q);
typedef struct DirJob {
char *path;
struct DirJob *next;
} DirJob;
typedef struct DirQueue { typedef struct DirQueue {
char **items; char **items;
size_t count; size_t count;
@@ -80,3 +65,23 @@ typedef struct DirQueue {
pthread_cond_t cond; pthread_cond_t cond;
#endif #endif
} DirQueue; } DirQueue;
// MPMC Queue
static MPMCQueue g_dir_queue;
static MPMCQueue g_file_queue;
typedef struct {
DirQueue *dir_queue;
mem_arena *path_arena;
mem_arena *meta_arena;
} ScannerContext;
typedef struct {
mem_arena *arena;
} WorkerContext;
// void scan_folder_windows_parallel(const char *base, ScannerContext *ctx);
// void scan_folder_posix_parallel(const char *base, ScannerContext *ctx);
//

View File

@@ -1,3 +1,4 @@
#include "arena.h"
#include "platform.h" #include "platform.h"
// ----------------------------- Globals ------------------------------------ // ----------------------------- Globals ------------------------------------
@@ -144,25 +145,9 @@ static void dirqueue_done(DirQueue *q) {
WakeAllConditionVariable(&q->cv); WakeAllConditionVariable(&q->cv);
LeaveCriticalSection(&q->cs); LeaveCriticalSection(&q->cs);
} }
static DWORD WINAPI scan_worker(LPVOID arg) {
DirQueue *q = (DirQueue *)arg;
for (;;) { // ----------------------------- Scan helpers -----------------------------
char *dir = dirqueue_pop(q); void scan_folder_windows_parallel(const char *base, ScannerContext *ctx) {
if (!dir)
break;
scan_folder_windows_parallel(dir, q);
free(dir);
dirqueue_done(q);
}
return 0;
}
// Scanning directory function
void scan_folder_windows_parallel(const char *base, DirQueue *q) {
char search[MAX_PATHLEN]; char search[MAX_PATHLEN];
snprintf(search, sizeof(search), "%s\\*", base); snprintf(search, sizeof(search), "%s\\*", base);
@@ -182,35 +167,30 @@ void scan_folder_windows_parallel(const char *base, DirQueue *q) {
continue; continue;
if (fd.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { if (fd.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) {
dirqueue_push(q, full); dirqueue_push(ctx->dir_queue, full);
} else { } else {
atomic_fetch_add(&g_files_found, 1); atomic_fetch_add(&g_files_found, 1);
FileEntry *fe = malloc(sizeof(FileEntry)); FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true);
memset(fe, 0, sizeof(FileEntry));
char norm[MAX_PATHLEN]; char norm[MAX_PATHLEN];
strncpy(norm, full, sizeof(norm) - 1); strncpy(norm, full, sizeof(norm) - 1);
norm[sizeof(norm) - 1] = 0; norm[sizeof(norm) - 1] = 0;
normalize_path(norm); normalize_path(norm);
fe->path = _strdup(norm); size_t len = strlen(norm) + 1;
char *path = arena_push(&ctx->path_arena, len, false);
memcpy(path, norm, len);
fe->path = path;
platform_get_file_times(full, &fe->created_time, &fe->modified_time); platform_get_file_times(full, &fe->created_time, &fe->modified_time);
platform_get_file_owner(full, fe->owner, sizeof(fe->owner)); platform_get_file_owner(full, fe->owner, sizeof(fe->owner));
LARGE_INTEGER size; fe->size_bytes = ((uint64_t)fd.nFileSizeHigh << 32) | fd.nFileSizeLow;
HANDLE hf =
CreateFileA(full, GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE,
NULL, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL);
if (hf != INVALID_HANDLE_VALUE) {
if (GetFileSizeEx(hf, &size))
fe->size_bytes = (uint64_t)size.QuadPart;
CloseHandle(hf);
}
mpmc_push(&g_file_queue, fe); mpmc_push(&g_file_queue, fe);
} }
@@ -220,6 +200,25 @@ void scan_folder_windows_parallel(const char *base, DirQueue *q) {
FindClose(h); FindClose(h);
} }
// ------------------------- Scan worker --------------------------------
static DWORD WINAPI scan_worker(LPVOID arg) {
ScannerContext *ctx = arg;
DirQueue *q = ctx->dir_queue;
for (;;) {
char *dir = dirqueue_pop(q);
if (!dir)
break;
scan_folder_windows_parallel(dir, ctx);
free(dir);
dirqueue_done(q);
}
return 0;
}
// ----------------------------- Hashing helpers ----------------------------- // ----------------------------- Hashing helpers -----------------------------
static void xxh3_hash_file_stream(const char *path, char *out_hex, BYTE *buf) { static void xxh3_hash_file_stream(const char *path, char *out_hex, BYTE *buf) {
// compute XXH3_128 over file. POSIX and Windows use standard reads in this // compute XXH3_128 over file. POSIX and Windows use standard reads in this
@@ -253,12 +252,11 @@ static void xxh3_hash_file_stream(const char *path, char *out_hex, BYTE *buf) {
static DWORD WINAPI hash_worker(LPVOID arg) { static DWORD WINAPI hash_worker(LPVOID arg) {
WorkerContext *ctx = (WorkerContext *)arg; WorkerContext *ctx = (WorkerContext *)arg;
MPMCQueue *q = ctx->queue;
mem_arena *local_arena = ctx->arena; mem_arena *local_arena = ctx->arena;
BYTE *buf = (BYTE *)malloc(READ_BLOCK); BYTE *buf = (BYTE *)malloc(READ_BLOCK);
for (;;) { for (;;) {
FileEntry *fe = mpmc_pop(q); FileEntry *fe = mpmc_pop(&g_file_queue);
if (!fe) if (!fe)
break; break;
@@ -281,9 +279,6 @@ static DWORD WINAPI hash_worker(LPVOID arg) {
memcpy(dst, stack_buf, len); memcpy(dst, stack_buf, len);
atomic_fetch_add(&g_files_hashed, 1); atomic_fetch_add(&g_files_hashed, 1);
free(fe->path);
free(fe);
} }
free(buf); free(buf);
@@ -451,7 +446,7 @@ int main(int argc, char **argv) {
mem_arena *gp_arena = arena_create(&params); mem_arena *gp_arena = arena_create(&params);
// ------------------------------- // -------------------------------
// Detect hardware threads (CPU cores) // Detect hardware threads
// ------------------------------- // -------------------------------
size_t hw_threads = 1; size_t hw_threads = 1;
// --- Windows: detect PHYSICAL cores (not logical threads) --- // --- Windows: detect PHYSICAL cores (not logical threads) ---
@@ -473,10 +468,8 @@ int main(int argc, char **argv) {
} }
arena_free(&gp_arena, (u8 **)&buf, len); arena_free(&gp_arena, (u8 **)&buf, len);
// Add some extra threads to overlap I/O more aggressively // Logical threads = CPU cores * 2
u8 num_threads = hw_threads * 2; size_t num_threads = hw_threads * 2;
if (num_threads < 2)
num_threads = 2;
// ------------------------------- // -------------------------------
// Step 1: Scan all folders // Step 1: Scan all folders
@@ -490,50 +483,59 @@ int main(int argc, char **argv) {
InitializeConditionVariable(&q.cv); InitializeConditionVariable(&q.cv);
q.active = 0; q.active = 0;
// starting hash threads
WorkerContext workers[num_threads];
for (int i = 0; i < num_threads; i++) {
workers[i].queue = &g_file_queue;
workers[i].arena = arena_create(&params);
}
HANDLE *hash_threads =
arena_push(&gp_arena, sizeof(HANDLE) * num_threads, true);
for (size_t i = 0; i < num_threads; ++i) {
hash_threads[i] = CreateThread(NULL, 0, hash_worker, &workers[i], 0, NULL);
}
// starting scan threads
HANDLE progress = CreateThread(NULL, 0, progress_thread, NULL, 0, NULL);
for (int i = 0; i < folder_count; ++i) { for (int i = 0; i < folder_count; ++i) {
dirqueue_push(&q, folders[i]); dirqueue_push(&q, folders[i]);
} }
size_t scan_threads = hw_threads; // starting hash threads
if (scan_threads < 2) size_t num_hash_threads = num_threads;
scan_threads = 2;
HANDLE *scan_tids = WorkerContext workers[num_hash_threads];
arena_push(&gp_arena, sizeof(HANDLE) * scan_threads, true);
for (size_t i = 0; i < scan_threads; ++i) { HANDLE *hash_threads =
scan_tids[i] = arena_push(&gp_arena, sizeof(HANDLE) * num_hash_threads, true);
CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)scan_worker, &q, 0, NULL);
for (size_t i = 0; i < num_hash_threads; ++i) {
workers[i].arena = arena_create(&params);
hash_threads[i] = CreateThread(NULL, 0, hash_worker, &workers[i], 0, NULL);
} }
WaitForMultipleObjects((DWORD)scan_threads, scan_tids, TRUE, INFINITE); // starting progress printing thread
HANDLE progress = CreateThread(NULL, 0, progress_thread, NULL, 0, NULL);
mpmc_producers_finished(&g_file_queue, num_threads); // starting scan threads
size_t num_scan_threads = num_threads;
ScannerContext scanners[num_scan_threads];
HANDLE *scan_threads =
arena_push(&gp_arena, sizeof(HANDLE) * num_scan_threads, true);
for (size_t i = 0; i < num_scan_threads; i++) {
scanners[i].dir_queue = &q;
scanners[i].path_arena = arena_create(&params);
scanners[i].meta_arena = arena_create(&params);
scan_threads[i] = CreateThread(NULL, 0, scan_worker, &scanners[i], 0, NULL);
}
WaitForMultipleObjects((DWORD)num_scan_threads, scan_threads, TRUE, INFINITE);
for (size_t i = 0; i < num_hash_threads; i++) {
mpmc_push(&g_file_queue, NULL);
}
atomic_store(&g_scan_done, 1); atomic_store(&g_scan_done, 1);
for (size_t i = 0; i < scan_threads; ++i) for (size_t i = 0; i < num_scan_threads; ++i)
CloseHandle(scan_tids[i]); CloseHandle(scan_threads[i]);
arena_free(&gp_arena, (u8 **)&scan_tids, sizeof(HANDLE) * scan_threads); arena_free(&gp_arena, (u8 **)&scan_threads,
sizeof(HANDLE) * num_scan_threads);
double scan_seconds = timer_stop(&scan_timer); double scan_seconds = timer_stop(&scan_timer);
size_t total_found = atomic_load(&g_files_found); size_t total_found = atomic_load(&g_files_found);
@@ -549,12 +551,13 @@ int main(int argc, char **argv) {
} }
// stop hashing threads // stop hashing threads
WaitForMultipleObjects((DWORD)num_threads, hash_threads, TRUE, INFINITE); WaitForMultipleObjects((DWORD)num_hash_threads, hash_threads, TRUE, INFINITE);
for (size_t i = 0; i < num_threads; ++i) for (size_t i = 0; i < num_hash_threads; ++i)
CloseHandle(hash_threads[i]); CloseHandle(hash_threads[i]);
arena_free(&gp_arena, (u8 **)&hash_threads, sizeof(HANDLE) * num_threads); arena_free(&gp_arena, (u8 **)&hash_threads,
sizeof(HANDLE) * num_hash_threads);
WaitForSingleObject(progress, INFINITE); WaitForSingleObject(progress, INFINITE);
CloseHandle(progress); CloseHandle(progress);
@@ -576,7 +579,7 @@ int main(int argc, char **argv) {
HANDLE h = CreateFileA(FILE_HASHES_TXT, GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, HANDLE h = CreateFileA(FILE_HASHES_TXT, GENERIC_WRITE, 0, NULL, CREATE_ALWAYS,
FILE_ATTRIBUTE_NORMAL, NULL); FILE_ATTRIBUTE_NORMAL, NULL);
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_hash_threads; i++) {
mem_arena *local_hash_arena = workers[i].arena; mem_arena *local_hash_arena = workers[i].arena;
@@ -597,7 +600,7 @@ int main(int argc, char **argv) {
double total_mb = (double)total_bytes / (1024.0 * 1024.0); double total_mb = (double)total_bytes / (1024.0 * 1024.0);
double avg_mbps = total_mb / total_seconds; double avg_mbps = total_mb / total_seconds;
printf("Total: %.2f MB, Average: %.2f MB/s\n", total_mb, avg_mbps); printf("Total: %.2f MB, Average: %.2f MB/s\n", total_mb, avg_mbps);
printf(" Total time : %.2f seconds\n", total_seconds); printf(" Total time : %.2f seconds\n\n", total_seconds);
return 0; return 0;
} }