diff --git a/binaries/changelog.txt b/binaries/changelog.txt index d464b0c..f346008 100644 --- a/binaries/changelog.txt +++ b/binaries/changelog.txt @@ -37,3 +37,7 @@ Align the MPMC queue to pagesize Getting file size from FindFirstFileA() instead of CreateFileA(), since we already call FindFirstFileA() and it returns the size there is no need to open/close every file to get it's size Replacing Malloc and strdup in scan helper function with FileEntry and path arenas + +Making the MPMC queue support when producers are consumers at the same time by adding a variable work, mpmc_push_work() that increments work and mpmc_task_done() that decrements work, and if work = 0 calls mpmc_producers_finished() that pushes poisons to wake up sleeping threads and make them return NULL + +Replacing DirQueue (a queue growable with realloc) with the MPMC queue diff --git a/binaries/file_hasher_v4.0.exe b/binaries/file_hasher_v4.0.exe new file mode 100644 index 0000000..75e93c8 Binary files /dev/null and b/binaries/file_hasher_v4.0.exe differ diff --git a/lf_mpmc.h b/lf_mpmc.h index 640b874..0524923 100644 --- a/lf_mpmc.h +++ b/lf_mpmc.h @@ -279,52 +279,6 @@ static void *mpmc_pop(MPMCQueue *q) { return data; } -/* ----------------------------------------------------------- */ -/* TRY POP (non blocking) */ -/* ----------------------------------------------------------- */ -// static b32 mpmc_try_pop(MPMCQueue *q, void **out) { -// -// if (!plat_sem_trywait(&q->items_sem)) -// return false; -// -// MPMCSlot *slot; -// size_t pos; -// -// int spins = 0; -// -// for (;;) { -// -// pos = atomic_load_explicit(&q->head, memory_order_relaxed); -// slot = &q->slots[pos & q->mask]; -// -// size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); -// intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1); -// -// if (likely(diff == 0)) { -// -// if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1, -// memory_order_relaxed, -// memory_order_relaxed)) -// break; -// -// } 
else { -// -// if (++spins > 10) { -// SwitchToThread(); -// spins = 0; -// } else { -// cpu_pause(); -// } -// } -// } -// -// *out = slot->data; -// -// atomic_store_explicit(&slot->seq, pos + q->capacity, memory_order_release); -// -// return true; -// } - /* ----------------------------------------------------------- */ /* PUSH POISON */ /* ----------------------------------------------------------- */ diff --git a/platform.h b/platform.h index 088d564..20b3b3b 100644 --- a/platform.h +++ b/platform.h @@ -45,35 +45,12 @@ static double timer_stop(HiResTimer *t) { (double)g_qpc_freq.QuadPart; } -/* Scan folders */ - -typedef struct DirQueue DirQueue; - -typedef struct DirQueue { - char **items; - size_t count; - size_t cap; - size_t active; - - int stop; - -#if PLATFORM_WINDOWS - CRITICAL_SECTION cs; - CONDITION_VARIABLE cv; -#else - pthread_mutex_t mutex; - pthread_cond_t cond; -#endif -} DirQueue; - -// MPMC Queue +// Workers context static MPMCQueue g_dir_queue; static MPMCQueue g_file_queue; typedef struct { - DirQueue *dir_queue; - mem_arena *path_arena; mem_arena *meta_arena; } ScannerContext; @@ -81,7 +58,3 @@ typedef struct { typedef struct { mem_arena *arena; } WorkerContext; - -// void scan_folder_windows_parallel(const char *base, ScannerContext *ctx); -// void scan_folder_posix_parallel(const char *base, ScannerContext *ctx); -// diff --git a/platform_windows.c b/platform_windows.c index e54312f..ba89039 100644 --- a/platform_windows.c +++ b/platform_windows.c @@ -1,4 +1,3 @@ -#include "arena.h" #include "platform.h" // ----------------------------- Globals ------------------------------------ @@ -105,47 +104,6 @@ void platform_get_file_owner(const char *path, char *out_owner, } // --------------- parallel directory scanning ---------------- -// Add queue helper functions -static void dirqueue_push(DirQueue *q, const char *path) { - EnterCriticalSection(&q->cs); - - if (q->count + 1 > q->cap) { - q->cap = q->cap ? 
q->cap * 2 : 1024; - q->items = realloc(q->items, q->cap * sizeof(char *)); - } - - q->items[q->count++] = _strdup(path); - - WakeConditionVariable(&q->cv); - LeaveCriticalSection(&q->cs); -} - -static char *dirqueue_pop(DirQueue *q) { - EnterCriticalSection(&q->cs); - - while (q->count == 0 && q->active > 0) { - SleepConditionVariableCS(&q->cv, &q->cs, INFINITE); - } - - if (q->count == 0 && q->active == 0) { - LeaveCriticalSection(&q->cs); - return NULL; // truly done - } - - char *dir = q->items[--q->count]; - q->active++; - - LeaveCriticalSection(&q->cs); - return dir; -} - -static void dirqueue_done(DirQueue *q) { - EnterCriticalSection(&q->cs); - q->active--; - WakeAllConditionVariable(&q->cv); - LeaveCriticalSection(&q->cs); -} - // ----------------------------- Scan helpers ----------------------------- void scan_folder_windows_parallel(const char *base, ScannerContext *ctx) { char search[MAX_PATHLEN]; @@ -167,7 +125,12 @@ void scan_folder_windows_parallel(const char *base, ScannerContext *ctx) { continue; if (fd.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { - dirqueue_push(ctx->dir_queue, full); + size_t len = strlen(full) + 1; + + char *dir = arena_push(&ctx->path_arena, len, false); + memcpy(dir, full, len); + + mpmc_push_work(&g_dir_queue, dir); } else { atomic_fetch_add(&g_files_found, 1); @@ -203,19 +166,16 @@ void scan_folder_windows_parallel(const char *base, ScannerContext *ctx) { // ------------------------- Scan worker -------------------------------- static DWORD WINAPI scan_worker(LPVOID arg) { ScannerContext *ctx = arg; - DirQueue *q = ctx->dir_queue; for (;;) { - char *dir = dirqueue_pop(q); + char *dir = mpmc_pop(&g_dir_queue); if (!dir) break; scan_folder_windows_parallel(dir, ctx); - free(dir); - dirqueue_done(q); + mpmc_task_done(&g_dir_queue, 12); } - return 0; } @@ -420,9 +380,7 @@ int main(int argc, char **argv) { folder_count = 1; } - // ------------------------------- // Display selected folders - // ------------------------------- 
printf("Processing %d folder(s):\n", folder_count); for (int i = 0; i < folder_count; ++i) { printf(" - %s\n", folders[i]); @@ -472,21 +430,12 @@ int main(int argc, char **argv) { size_t num_threads = hw_threads * 2; // ------------------------------- - // Step 1: Scan all folders + // Scanning and hashing // ------------------------------- + mpmc_init(&g_dir_queue, MiB(1)); mpmc_init(&g_file_queue, MiB(1)); - DirQueue q; - memset(&q, 0, sizeof(q)); - InitializeCriticalSection(&q.cs); - InitializeConditionVariable(&q.cv); - q.active = 0; - - for (int i = 0; i < folder_count; ++i) { - dirqueue_push(&q, folders[i]); - } - // starting hash threads size_t num_hash_threads = num_threads; @@ -514,26 +463,35 @@ int main(int argc, char **argv) { arena_push(&gp_arena, sizeof(HANDLE) * num_scan_threads, true); for (size_t i = 0; i < num_scan_threads; i++) { - - scanners[i].dir_queue = &q; - scanners[i].path_arena = arena_create(¶ms); scanners[i].meta_arena = arena_create(¶ms); scan_threads[i] = CreateThread(NULL, 0, scan_worker, &scanners[i], 0, NULL); } + // Initial folder push + for (int i = 0; i < folder_count; i++) { + + size_t len = strlen(folders[i]) + 1; + + char *path = arena_push(&scanners[0].path_arena, len, false); + memcpy(path, folders[i], len); + + mpmc_push_work(&g_dir_queue, path); + } + + // Stop scan threads WaitForMultipleObjects((DWORD)num_scan_threads, scan_threads, TRUE, INFINITE); + for (size_t i = 0; i < num_scan_threads; ++i) + CloseHandle(scan_threads[i]); + for (size_t i = 0; i < num_hash_threads; i++) { mpmc_push(&g_file_queue, NULL); } atomic_store(&g_scan_done, 1); - for (size_t i = 0; i < num_scan_threads; ++i) - CloseHandle(scan_threads[i]); - arena_free(&gp_arena, (u8 **)&scan_threads, sizeof(HANDLE) * num_scan_threads); @@ -544,13 +502,13 @@ int main(int argc, char **argv) { printf("Completed scanning in %.2f seconds, found %zu files\n\n", scan_seconds, total_found); - // if no files found + // If no files found if (total_found == 0) { 
printf("No files found.\n"); return 0; } - // stop hashing threads + // Stop hashing threads WaitForMultipleObjects((DWORD)num_hash_threads, hash_threads, TRUE, INFINITE); for (size_t i = 0; i < num_hash_threads; ++i) @@ -559,10 +517,13 @@ int main(int argc, char **argv) { arena_free(&gp_arena, (u8 **)&hash_threads, sizeof(HANDLE) * num_hash_threads); + // Stop progress printing thread WaitForSingleObject(progress, INFINITE); CloseHandle(progress); - // write file_hashes.txt + // ------------------------------- + // Export file_hashes.txt + // ------------------------------- // FILE *f = fopen(FILE_HASHES_TXT, "wb"); // @@ -591,7 +552,9 @@ int main(int argc, char **argv) { WriteFile(h, arena_base, (DWORD)local_hash_arena->pos, &written, NULL); } - // done time + // ------------------------------- + // Print summary + // ------------------------------- double total_seconds = timer_stop(&total_timer); printf("Completed hashing %zu files\n", total_found);