diff --git a/binaries/changelog.txt b/binaries/changelog.txt index e6058eb..d464b0c 100644 --- a/binaries/changelog.txt +++ b/binaries/changelog.txt @@ -35,3 +35,5 @@ Making the MPMC queue platform agnostic Align the MPMC queue to pagesize Getting file size from FindFirstFileA() instead of CreateFileA(), since we already call FindFirstFileA() and it returns the size there is no need to open/close every file to get it's size + +Replacing Malloc and strdup in scan helper function with FileEntry and path arenas diff --git a/lf_mpmc.h b/lf_mpmc.h index 9c51abd..640b874 100644 --- a/lf_mpmc.h +++ b/lf_mpmc.h @@ -36,6 +36,8 @@ typedef struct { CACHE_ALIGN atomic_size_t head; CACHE_ALIGN atomic_size_t tail; + CACHE_ALIGN atomic_size_t work_count; + size_t capacity; size_t mask; @@ -91,6 +93,7 @@ static void mpmc_init(MPMCQueue *q, size_t max_capacity) { atomic_init(&q->head, 0); atomic_init(&q->tail, 0); + atomic_init(&q->work_count, 0); plat_sem_init(&q->items_sem, 0); } @@ -138,6 +141,7 @@ static void mpmc_commit_more(MPMCQueue *q) { /* ----------------------------------------------------------- */ /* PUSH */ /* ----------------------------------------------------------- */ +// Does not increment work static void mpmc_push(MPMCQueue *q, void *item) { MPMCSlot *slot; size_t pos; @@ -184,8 +188,55 @@ static void mpmc_push(MPMCQueue *q, void *item) { plat_sem_post(&q->items_sem, 1); } +// Increment work +static void mpmc_push_work(MPMCQueue *q, void *item) { + MPMCSlot *slot; + size_t pos; + + for (;;) { + + pos = atomic_load_explicit(&q->tail, memory_order_relaxed); + + // ensure the slot is committed BEFORE accessing it + size_t committed = + atomic_load_explicit(&q->committed, memory_order_relaxed); + + if (unlikely(pos >= committed)) { + mpmc_commit_more(q); + continue; + } + + slot = &q->slots[pos & q->mask]; + + size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); + intptr_t diff = (intptr_t)seq - (intptr_t)pos; + + if (likely(diff == 0)) { + + if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1, + memory_order_relaxed, + memory_order_relaxed)) + break; + + } else if (diff < 0) { // queue actually full + + Sleep(1000); + + } else { // waiting to grow + + Sleep(0); + } + } + + slot->data = item; + + atomic_store_explicit(&slot->seq, pos + 1, memory_order_release); + + atomic_fetch_add(&q->work_count, 1); + plat_sem_post(&q->items_sem, 1); +} /* ----------------------------------------------------------- */ -/* POP (blocking with semaphore) */ +/* POP */ /* ----------------------------------------------------------- */ static void *mpmc_pop(MPMCQueue *q) { @@ -213,7 +264,7 @@ static void *mpmc_pop(MPMCQueue *q) { } else { // slot is still transitioning (written by another thread) if (++spins > 10) { - SwitchToThread(); // yield CPU + Sleep(0); // yield CPU spins = 0; } else { cpu_pause(); @@ -231,48 +282,48 @@ static void *mpmc_pop(MPMCQueue *q) { /* ----------------------------------------------------------- */ /* TRY POP (non blocking) */ /* ----------------------------------------------------------- */ -static b32 mpmc_try_pop(MPMCQueue *q, void **out) { - - if (!plat_sem_trywait(&q->items_sem)) - return false; - - MPMCSlot *slot; - size_t pos; - - int spins = 0; - - for (;;) { - - pos = atomic_load_explicit(&q->head, memory_order_relaxed); - slot = &q->slots[pos & q->mask]; - - size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); - intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1); - - if (likely(diff == 0)) { - - if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1, - memory_order_relaxed, - memory_order_relaxed)) - break; - - } else { - - if (++spins > 10) { - SwitchToThread(); - spins = 0; - } else { - cpu_pause(); - } - } - } - - *out = slot->data; - - atomic_store_explicit(&slot->seq, pos + q->capacity, memory_order_release); - - return true; -} +// static b32 mpmc_try_pop(MPMCQueue *q, void **out) { +// +// if (!plat_sem_trywait(&q->items_sem)) +// return false; +// +// MPMCSlot *slot; +// size_t pos; +// +// int spins = 0; +// +// for (;;) { +// +// pos = atomic_load_explicit(&q->head, memory_order_relaxed); +// slot = &q->slots[pos & q->mask]; +// +// size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); +// intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1); +// +// if (likely(diff == 0)) { +// +// if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1, +// memory_order_relaxed, +// memory_order_relaxed)) +// break; +// +// } else { +// +// if (++spins > 10) { +// SwitchToThread(); +// spins = 0; +// } else { +// cpu_pause(); +// } +// } +// } +// +// *out = slot->data; +// +// atomic_store_explicit(&slot->seq, pos + q->capacity, memory_order_release); +// +// return true; +// } /* ----------------------------------------------------------- */ /* PUSH POISON */ @@ -288,6 +339,16 @@ static void mpmc_producers_finished(MPMCQueue *q, u8 consumer_count) { } } +/* ----------------------------------------------------------- */ +/* Done */ +/* ----------------------------------------------------------- */ +static void mpmc_task_done(MPMCQueue *q, u8 consumer_count) { + size_t prev = atomic_fetch_sub(&q->work_count, 1); + if (prev == 1) { + mpmc_producers_finished(q, consumer_count); + } +} + /* ----------------------------------------------------------- */ /* MPMC Cleanup */ /* ----------------------------------------------------------- */ diff --git a/platform.h b/platform.h index 572793e..088d564 100644 --- a/platform.h +++ b/platform.h @@ -45,23 +45,6 @@ static double timer_stop(HiResTimer *t) { (double)g_qpc_freq.QuadPart; } -// MPMC Queue -static MPMCQueue g_dir_queue; -static MPMCQueue g_file_queue; - -typedef struct { - mem_arena *path_arena; - mem_arena *meta_arena; - - MPMCQueue *dir_queue; - MPMCQueue *file_queue; -} ScannerContext; - -typedef struct { - MPMCQueue *queue; - mem_arena *arena; -} WorkerContext; - /* Scan folders */ typedef struct DirQueue DirQueue; @@ -83,6 +66,22 @@ typedef struct DirQueue { #endif } DirQueue; +// MPMC Queue + +static MPMCQueue g_dir_queue; +static MPMCQueue g_file_queue; + +typedef struct { + DirQueue *dir_queue; + + mem_arena *path_arena; + mem_arena *meta_arena; +} ScannerContext; + +typedef struct { + mem_arena *arena; +} WorkerContext; + // void scan_folder_windows_parallel(const char *base, ScannerContext *ctx); // void scan_folder_posix_parallel(const char *base, ScannerContext *ctx); -void scan_folder_windows_parallel(const char *base, DirQueue *q); +// diff --git a/platform_windows.c b/platform_windows.c index 2392593..e54312f 100644 --- a/platform_windows.c +++ b/platform_windows.c @@ -145,25 +145,9 @@ static void dirqueue_done(DirQueue *q) { WakeAllConditionVariable(&q->cv); LeaveCriticalSection(&q->cs); } -static DWORD WINAPI scan_worker(LPVOID arg) { - DirQueue *q = (DirQueue *)arg; - for (;;) { - char *dir = dirqueue_pop(q); - if (!dir) - break; - - scan_folder_windows_parallel(dir, q); - - free(dir); - dirqueue_done(q); - } - - return 0; -} - -// Scanning directory function -void scan_folder_windows_parallel(const char *base, DirQueue *q) { +// ----------------------------- Scan helpers ----------------------------- +void scan_folder_windows_parallel(const char *base, ScannerContext *ctx) { char search[MAX_PATHLEN]; snprintf(search, sizeof(search), "%s\\*", base); @@ -183,20 +167,24 @@ void scan_folder_windows_parallel(const char *base, DirQueue *q) { continue; if (fd.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { - dirqueue_push(q, full); + dirqueue_push(ctx->dir_queue, full); } else { atomic_fetch_add(&g_files_found, 1); - FileEntry *fe = malloc(sizeof(FileEntry)); - memset(fe, 0, sizeof(FileEntry)); + FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true); char norm[MAX_PATHLEN]; strncpy(norm, full, sizeof(norm) - 1); norm[sizeof(norm) - 1] = 0; normalize_path(norm); - fe->path = _strdup(norm); + size_t len = strlen(norm) + 1; + + char *path = arena_push(&ctx->path_arena, len, false); + memcpy(path, norm, len); + + fe->path = path; platform_get_file_times(full, &fe->created_time, &fe->modified_time); @@ -212,6 +200,25 @@ void scan_folder_windows_parallel(const char *base, DirQueue *q) { FindClose(h); } +// ------------------------- Scan worker -------------------------------- +static DWORD WINAPI scan_worker(LPVOID arg) { + ScannerContext *ctx = arg; + DirQueue *q = ctx->dir_queue; + + for (;;) { + char *dir = dirqueue_pop(q); + if (!dir) + break; + + scan_folder_windows_parallel(dir, ctx); + + free(dir); + dirqueue_done(q); + } + + return 0; +} + // ----------------------------- Hashing helpers ----------------------------- static void xxh3_hash_file_stream(const char *path, char *out_hex, BYTE *buf) { // compute XXH3_128 over file. POSIX and Windows use standard reads in this @@ -245,12 +252,11 @@ static void xxh3_hash_file_stream(const char *path, char *out_hex, BYTE *buf) { static DWORD WINAPI hash_worker(LPVOID arg) { WorkerContext *ctx = (WorkerContext *)arg; - MPMCQueue *q = ctx->queue; mem_arena *local_arena = ctx->arena; BYTE *buf = (BYTE *)malloc(READ_BLOCK); for (;;) { - FileEntry *fe = mpmc_pop(q); + FileEntry *fe = mpmc_pop(&g_file_queue); if (!fe) break; @@ -273,9 +279,6 @@ static DWORD WINAPI hash_worker(LPVOID arg) { memcpy(dst, stack_buf, len); atomic_fetch_add(&g_files_hashed, 1); - - free(fe->path); - free(fe); } free(buf); @@ -443,7 +446,7 @@ int main(int argc, char **argv) { mem_arena *gp_arena = arena_create(¶ms); // ------------------------------- - // Detect hardware threads (CPU cores) + // Detect hardware threads // ------------------------------- size_t hw_threads = 1; // --- Windows: detect PHYSICAL cores (not logical threads) --- @@ -465,10 +468,8 @@ int main(int argc, char **argv) { } arena_free(&gp_arena, (u8 **)&buf, len); - // Add some extra threads to overlap I/O more aggressively + // Logical threads = CPU cores * 2 size_t num_threads = hw_threads * 2; - if (num_threads < 2) - num_threads = 2; // ------------------------------- // Step 1: Scan all folders @@ -487,47 +488,54 @@ int main(int argc, char **argv) { } // starting hash threads - WorkerContext workers[num_threads]; + size_t num_hash_threads = num_threads; - for (int i = 0; i < num_threads; i++) { - workers[i].queue = &g_file_queue; - workers[i].arena = arena_create(¶ms); - } + WorkerContext workers[num_hash_threads]; HANDLE *hash_threads = - arena_push(&gp_arena, sizeof(HANDLE) * num_threads, true); + arena_push(&gp_arena, sizeof(HANDLE) * num_hash_threads, true); + + for (size_t i = 0; i < num_hash_threads; ++i) { + + workers[i].arena = arena_create(¶ms); - for (size_t i = 0; i < num_threads; ++i) { hash_threads[i] = CreateThread(NULL, 0, hash_worker, &workers[i], 0, NULL); } - // starting scan threads + // starting progress printing thread HANDLE progress = CreateThread(NULL, 0, progress_thread, NULL, 0, NULL); - size_t scan_threads = hw_threads; - if (scan_threads < 2) - scan_threads = 2; + // starting scan threads + size_t num_scan_threads = num_threads; - HANDLE *scan_tids = - arena_push(&gp_arena, sizeof(HANDLE) * scan_threads, true); + ScannerContext scanners[num_scan_threads]; - for (size_t i = 0; i < scan_threads; ++i) { - scan_tids[i] = - CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)scan_worker, &q, 0, NULL); + HANDLE *scan_threads = + arena_push(&gp_arena, sizeof(HANDLE) * num_scan_threads, true); + + for (size_t i = 0; i < num_scan_threads; i++) { + + scanners[i].dir_queue = &q; + + scanners[i].path_arena = arena_create(¶ms); + scanners[i].meta_arena = arena_create(¶ms); + + scan_threads[i] = CreateThread(NULL, 0, scan_worker, &scanners[i], 0, NULL); } - WaitForMultipleObjects((DWORD)scan_threads, scan_tids, TRUE, INFINITE); + WaitForMultipleObjects((DWORD)num_scan_threads, scan_threads, TRUE, INFINITE); - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < num_hash_threads; i++) { mpmc_push(&g_file_queue, NULL); } atomic_store(&g_scan_done, 1); - for (size_t i = 0; i < scan_threads; ++i) - CloseHandle(scan_tids[i]); + for (size_t i = 0; i < num_scan_threads; ++i) + CloseHandle(scan_threads[i]); - arena_free(&gp_arena, (u8 **)&scan_tids, sizeof(HANDLE) * scan_threads); + arena_free(&gp_arena, (u8 **)&scan_threads, + sizeof(HANDLE) * num_scan_threads); double scan_seconds = timer_stop(&scan_timer); size_t total_found = atomic_load(&g_files_found); @@ -543,12 +551,13 @@ int main(int argc, char **argv) { } // stop hashing threads - WaitForMultipleObjects((DWORD)num_threads, hash_threads, TRUE, INFINITE); + WaitForMultipleObjects((DWORD)num_hash_threads, hash_threads, TRUE, INFINITE); - for (size_t i = 0; i < num_threads; ++i) + for (size_t i = 0; i < num_hash_threads; ++i) CloseHandle(hash_threads[i]); - arena_free(&gp_arena, (u8 **)&hash_threads, sizeof(HANDLE) * num_threads); + arena_free(&gp_arena, (u8 **)&hash_threads, + sizeof(HANDLE) * num_hash_threads); WaitForSingleObject(progress, INFINITE); CloseHandle(progress); @@ -570,7 +579,7 @@ int main(int argc, char **argv) { HANDLE h = CreateFileA(FILE_HASHES_TXT, GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL); - for (int i = 0; i < num_threads; i++) { + for (int i = 0; i < num_hash_threads; i++) { mem_arena *local_hash_arena = workers[i].arena;