From 9b327c82a6474bee163ecdaa8eb48c3b10a107a6 Mon Sep 17 00:00:00 2001 From: amir Date: Fri, 6 Mar 2026 16:44:37 +0100 Subject: [PATCH] Implementing simple MPMC queue Rewriting the pipeline and progress display --- .gitignore | 3 + platform.h | 37 ++- platform_windows.c | 544 ++++++++++++++++----------------------------- 3 files changed, 221 insertions(+), 363 deletions(-) diff --git a/.gitignore b/.gitignore index 6fc1500..fd00ec0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +file_hasher.pdb +file_hasher.ilk +file_hasher.rdi file_hasher.exe file_hashes.txt file_list.txt diff --git a/platform.h b/platform.h index 0a40fba..0153237 100644 --- a/platform.h +++ b/platform.h @@ -26,14 +26,14 @@ #include #endif -#define XXH_VECTOR XXH_AVX2 // not recommanded to compile with gcc see xxhash.h line 4082 - // Must compile with /arch:AVX2 in clang-cl or -mavx2 in clang/gcc +#define XXH_VECTOR \ + XXH_AVX2 // not recommended to compile with gcc see xxhash.h line 4082 + // Must compile with /arch:AVX2 in clang-cl or -mavx2 in clang/gcc #define XXH_INLINE_ALL #include "xxhash.c" #include "xxhash.h" // ----------------------------- Config ------------------------------------- -#define FILE_LIST_TXT "file_list.txt" #define FILE_HASHES_TXT "file_hashes.txt" #define HASH_STRLEN 33 // 128-bit hex (32 chars) + null #define MAX_PATHLEN 4096 @@ -95,17 +95,32 @@ static double timer_stop(HiResTimer *t) { (double)g_qpc_freq.QuadPart; } -/* Scan folders */ -typedef struct EntryBuffer { - FileEntry *entries; - size_t count; - size_t capacity; -} EntryBuffer; +// ============================================================ +// Simple Mutex-Based MPMC Queue (FileEntry*) +// ============================================================ +typedef struct { + FileEntry **items; + + size_t capacity; + size_t head; + size_t tail; + size_t count; + + int producers_active; + + CRITICAL_SECTION cs; + CONDITION_VARIABLE not_empty; + CONDITION_VARIABLE not_full; + +} MPMCQueue; + +static
MPMCQueue g_file_queue; + +/* Scan folders */ typedef struct DirQueue DirQueue; -void scan_folder_windows_parallel(const char *base, DirQueue *q, - EntryBuffer *buf); +void scan_folder_windows_parallel(const char *base, DirQueue *q); void scan_folder_posix_parallel(const char *base, DirQueue *q); typedef struct DirJob { diff --git a/platform_windows.c b/platform_windows.c index b106ca1..ad0b6de 100644 --- a/platform_windows.c +++ b/platform_windows.c @@ -1,14 +1,11 @@ #include "platform.h" // ----------------------------- Globals ------------------------------------ -FileEntry *g_entries = NULL; -size_t g_entry_count = 0; -size_t g_entry_capacity = 0; -static atomic_int g_scan_done = 0; -static atomic_size_t g_files_found = 0; +static atomic_uint_fast64_t g_files_found = 0; +static atomic_uint_fast64_t g_files_hashed = 0; static atomic_uint_fast64_t g_bytes_processed = 0; +static atomic_int g_scan_done = 0; // __________________________________________________________________________ -static CRITICAL_SECTION g_entries_cs; // ----------------------------- Utils -------------------------------------- static void perror_exit(const char *msg) { @@ -23,36 +20,6 @@ static void *xmalloc(size_t n) { return p; } -static void global_entries_push(const FileEntry *src) { - if (g_entry_count == g_entry_capacity) { - size_t newcap = g_entry_capacity ? 
g_entry_capacity * 2 : 1024; - g_entries = realloc(g_entries, newcap * sizeof(FileEntry)); - if (!g_entries) - perror_exit("realloc"); - g_entry_capacity = newcap; - } - - FileEntry *dst = &g_entries[g_entry_count++]; - memset(dst, 0, sizeof(*dst)); - - dst->size_bytes = src->size_bytes; - dst->created_time = src->created_time; - dst->modified_time = src->modified_time; - dst->path = strdup(src->path); - strncpy(dst->owner, src->owner, sizeof(dst->owner) - 1); -} - -static void free_entries(void) { - for (size_t i = 0; i < g_entry_count; ++i) { - free(g_entries[i].path); - } - - free(g_entries); - g_entries = NULL; - g_entry_count = 0; - g_entry_capacity = 0; -} - // ----------------------------- Convert filetime to epoch -------------- static uint64_t filetime_to_epoch(const FILETIME *ft) { ULARGE_INTEGER ull; @@ -110,29 +77,62 @@ static void format_time(uint64_t t, char *out, size_t out_sz) { } // --------------- parallel directory scanning ---------------- -static void entrybuf_init(EntryBuffer *b) { - b->entries = NULL; - b->count = 0; - b->capacity = 0; +static void mpmc_init(MPMCQueue *q, size_t capacity) { + q->capacity = capacity; + q->items = xmalloc(sizeof(FileEntry *) * capacity); + + q->head = 0; + q->tail = 0; + q->count = 0; + q->producers_active = 1; + + InitializeCriticalSection(&q->cs); + InitializeConditionVariable(&q->not_empty); + InitializeConditionVariable(&q->not_full); } -static void entrybuf_push(EntryBuffer *b, const FileEntry *src) { - if (b->count == b->capacity) { - size_t newcap = b->capacity ? 
b->capacity * 2 : 256; - b->entries = realloc(b->entries, newcap * sizeof(FileEntry)); - if (!b->entries) - perror_exit("realloc"); - b->capacity = newcap; +static void mpmc_push(MPMCQueue *q, FileEntry *item) { + EnterCriticalSection(&q->cs); + + while (q->count == q->capacity) { + SleepConditionVariableCS(&q->not_full, &q->cs, INFINITE); } - FileEntry *dst = &b->entries[b->count++]; - memset(dst, 0, sizeof(*dst)); + q->items[q->tail] = item; + q->tail = (q->tail + 1) % q->capacity; + q->count++; - dst->size_bytes = src->size_bytes; - dst->created_time = src->created_time; - dst->modified_time = src->modified_time; - dst->path = strdup(src->path); - strncpy(dst->owner, src->owner, sizeof(dst->owner) - 1); + WakeConditionVariable(&q->not_empty); + LeaveCriticalSection(&q->cs); +} + +static FileEntry *mpmc_pop(MPMCQueue *q) { + EnterCriticalSection(&q->cs); + + while (q->count == 0 && q->producers_active) { + SleepConditionVariableCS(&q->not_empty, &q->cs, INFINITE); + } + + if (q->count == 0 && !q->producers_active) { + LeaveCriticalSection(&q->cs); + return NULL; + } + + FileEntry *item = q->items[q->head]; + q->head = (q->head + 1) % q->capacity; + q->count--; + + WakeConditionVariable(&q->not_full); + LeaveCriticalSection(&q->cs); + + return item; +} + +static void mpmc_finish(MPMCQueue *q) { + EnterCriticalSection(&q->cs); + q->producers_active = 0; + WakeAllConditionVariable(&q->not_empty); + LeaveCriticalSection(&q->cs); } // Add queue helper functions @@ -178,46 +178,22 @@ static void dirqueue_done(DirQueue *q) { static DWORD WINAPI scan_worker(LPVOID arg) { DirQueue *q = (DirQueue *)arg; - EntryBuffer local; - entrybuf_init(&local); - for (;;) { char *dir = dirqueue_pop(q); if (!dir) break; - scan_folder_windows_parallel(dir, q, &local); - // debug - // printf("[T%lu] scanning %s\n", GetCurrentThreadId(), dir); - // debug + scan_folder_windows_parallel(dir, q); free(dir); dirqueue_done(q); } - // merge once at end - EnterCriticalSection(&g_entries_cs); - - 
if (g_entry_count + local.count > g_entry_capacity) { - g_entry_capacity = g_entry_count + local.count; - g_entries = realloc(g_entries, g_entry_capacity * sizeof(FileEntry)); - if (!g_entries) - perror_exit("realloc"); - } - - memcpy(&g_entries[g_entry_count], local.entries, - local.count * sizeof(FileEntry)); - g_entry_count += local.count; - - LeaveCriticalSection(&g_entries_cs); - - free(local.entries); return 0; } // Scanning directory function -void scan_folder_windows_parallel(const char *base, DirQueue *q, - EntryBuffer *buf) { +void scan_folder_windows_parallel(const char *base, DirQueue *q) { char search[MAX_PATHLEN]; snprintf(search, sizeof(search), "%s\\*", base); @@ -242,18 +218,19 @@ void scan_folder_windows_parallel(const char *base, DirQueue *q, atomic_fetch_add(&g_files_found, 1); - FileEntry fe; - memset(&fe, 0, sizeof(fe)); + FileEntry *fe = malloc(sizeof(FileEntry)); + memset(fe, 0, sizeof(FileEntry)); char norm[MAX_PATHLEN]; strncpy(norm, full, sizeof(norm) - 1); norm[sizeof(norm) - 1] = 0; normalize_path(norm); - fe.path = norm; - platform_get_file_times(full, &fe.created_time, &fe.modified_time); + fe->path = _strdup(norm); - platform_get_file_owner(full, fe.owner, sizeof(fe.owner)); + platform_get_file_times(full, &fe->created_time, &fe->modified_time); + + platform_get_file_owner(full, fe->owner, sizeof(fe->owner)); LARGE_INTEGER size; HANDLE hf = @@ -262,83 +239,18 @@ void scan_folder_windows_parallel(const char *base, DirQueue *q, if (hf != INVALID_HANDLE_VALUE) { if (GetFileSizeEx(hf, &size)) - fe.size_bytes = (uint64_t)size.QuadPart; + fe->size_bytes = (uint64_t)size.QuadPart; CloseHandle(hf); } - entrybuf_push(buf, &fe); + mpmc_push(&g_file_queue, fe); } + } while (FindNextFileA(h, &fd)); FindClose(h); } -// Scan progress thread -static DWORD WINAPI scan_progress_thread(LPVOID arg) { - (void)arg; - - for (;;) { - if (atomic_load(&g_scan_done)) - break; - - Sleep(100); // 0.2 seconds - - size_t count = atomic_load(&g_files_found); - - 
printf("\rScanning... %zu files found", count); - fflush(stdout); - } - - return 0; -} - -// ----------------------------- Job queue ---------------------------------- -static void jobqueue_init(JobQueue *q) { - q->head = q->tail = NULL; - atomic_store(&q->count, 0); - q->stop = 0; - InitializeCriticalSection(&q->cs); - InitializeConditionVariable(&q->cv); -} - -static void jobqueue_push(JobQueue *q, Job *job) { - EnterCriticalSection(&q->cs); - job->next = NULL; - if (q->tail) - q->tail->next = job; - else - q->head = job; - q->tail = job; - atomic_fetch_add(&q->count, 1); - WakeConditionVariable(&q->cv); - LeaveCriticalSection(&q->cs); -} - -static Job *jobqueue_pop(JobQueue *q) { - EnterCriticalSection(&q->cs); - while (!q->head && !q->stop) - SleepConditionVariableCS(&q->cv, &q->cs, INFINITE); - if (q->stop && !q->head) { - LeaveCriticalSection(&q->cs); - return NULL; - } - Job *j = q->head; - q->head = j->next; - if (!q->head) - q->tail = NULL; - LeaveCriticalSection(&q->cs); - if (j) - atomic_fetch_sub(&q->count, 1); - return j; -} - -static void jobqueue_stop(JobQueue *q) { - EnterCriticalSection(&q->cs); - q->stop = 1; - WakeAllConditionVariable(&q->cv); - LeaveCriticalSection(&q->cs); -} - // ----------------------------- Hashing helpers ----------------------------- static void xxh3_hash_file_stream(const char *path, char *out_hex) { // compute XXH3_128 over file. 
POSIX and Windows use standard reads in this @@ -371,79 +283,114 @@ static void xxh3_hash_file_stream(const char *path, char *out_hex) { (unsigned long long)h.low64); } -// ----------------------------- Worker -------------------------------------- -static DWORD WINAPI worker_thread_windows(LPVOID argp) { - WorkerArg *w = (WorkerArg *)argp; - JobQueue *q = w->queue; +// ----------------------------- Hash worker +// -------------------------------------- +static DWORD WINAPI hash_worker(LPVOID arg) { + MPMCQueue *q = (MPMCQueue *)arg; + for (;;) { - Job *job = jobqueue_pop(q); - if (!job) + FileEntry *fe = mpmc_pop(q); + if (!fe) break; - char hex[HASH_STRLEN]; - // On Windows we use overlapped ReadFile for large files would be better, - // but ReadFile with NULL overlapped is sufficient inside parallel threads. - xxh3_hash_file_stream(job->file->path, hex); - // append to hashes file using a critical section to avoid races - static CRITICAL_SECTION append_cs; - static LONG init = 0; - if (InterlockedCompareExchange(&init, 1, 1) == 0) { - // first time initialize - InitializeCriticalSection(&append_cs); - InterlockedExchange(&init, 1); - } - EnterCriticalSection(&append_cs); - FILE *hf = fopen(FILE_HASHES_TXT, "a"); - if (hf) { - char created[32], modified[32]; + char hash[HASH_STRLEN]; + xxh3_hash_file_stream(fe->path, hash); - format_time(job->file->created_time, created, sizeof(created)); - format_time(job->file->modified_time, modified, sizeof(modified)); - double size_kib = (double)job->file->size_bytes / (1024.0); + atomic_fetch_add(&g_files_hashed, 1); - fprintf(hf, "%s\t%s\t%.2f\t%s\t%s\t%s\n", hex, job->file->path, size_kib, - created, modified, job->file->owner); - fclose(hf); - } - LeaveCriticalSection(&append_cs); - - atomic_fetch_add(w->done_counter, 1); - free(job); + free(fe->path); + free(fe); } - atomic_fetch_sub(w->live_workers, 1); + return 0; } // ----------------------------- Progress display --------------------------- -static void 
print_progress(size_t done, size_t total) { - const int barw = 40; - double pct = total ? (double)done / (double)total : 0.0; - int filled = (int)(pct * barw + 0.5); - printf("\r["); - for (int i = 0; i < filled; ++i) - putchar('#'); - for (int i = filled; i < barw; ++i) - putchar(' '); - printf("] %6.2f%% (%zu / %zu) ", pct * 100.0, done, total); - fflush(stdout); -} +DWORD WINAPI progress_thread(void *arg) { -// ----------------------------- Helpers: load/save -------------------------- -static int file_exists(const char *path) { - DWORD attr = GetFileAttributesA(path); - return attr != INVALID_FILE_ATTRIBUTES; -} + LARGE_INTEGER freq, start; + QueryPerformanceFrequency(&freq); + QueryPerformanceCounter(&start); -static void save_file_list(const char *list_path) { - FILE *f = fopen(list_path, "w"); - if (!f) { - perror("fopen file_list"); - return; + uint64_t last_bytes = atomic_load(&g_bytes_processed); + double last_time = 0.0; + + double displayed_speed = 0.0; + const double sample_interval = 0.5; + + for (;;) { + + uint64_t found = atomic_load(&g_files_found); + uint64_t hashed = atomic_load(&g_files_hashed); + uint64_t bytes = atomic_load(&g_bytes_processed); + int scan_done = atomic_load(&g_scan_done); + + LARGE_INTEGER now; + QueryPerformanceCounter(&now); + + double t = (double)(now.QuadPart - start.QuadPart) / (double)freq.QuadPart; + + if (last_time == 0.0) { + last_time = t; + last_bytes = bytes; + } + + double dt = t - last_time; + + if (dt >= sample_interval) { + uint64_t db = bytes - last_bytes; + + if (db > 0 && dt > 0.0001) { + displayed_speed = (double)db / (1024.0 * 1024.0) / dt; + } + + last_bytes = bytes; + last_time = t; + } + + if (!scan_done) { + + printf("\rScanning: %llu files | Hashed: %llu | %.2f MB/s", + (unsigned long long)found, (unsigned long long)hashed, + displayed_speed); + + } else { + + double pct = found ? 
(double)hashed / (double)found : 0.0; + + int barw = 40; + int filled = (int)(pct * barw); + + char bar[64]; + int p = 0; + + bar[p++] = '['; + + for (int i = 0; i < filled; i++) + bar[p++] = '#'; + + for (int i = filled; i < barw; i++) + bar[p++] = '.'; + + bar[p++] = ']'; + bar[p] = 0; + + printf("\r%s %6.2f%% (%llu / %llu) %.2f MB/s", bar, pct * 100.0, + (unsigned long long)hashed, (unsigned long long)found, + displayed_speed); + } + + fflush(stdout); + + if (scan_done && hashed == found) + break; + + Sleep(100); } - for (size_t i = 0; i < g_entry_count; ++i) { - fprintf(f, "%s\n", g_entries[i].path); - } - fclose(f); + + printf("\n"); + + return 0; } // ----------------------------- Get file metadata ------------------------- @@ -476,7 +423,6 @@ int main(int argc, char **argv) { HiResTimer total_timer; HiResTimer scan_timer; - HiResTimer hash_timer; timer_start(&total_timer); timer_start(&scan_timer); @@ -551,7 +497,8 @@ int main(int argc, char **argv) { // ------------------------------- // Step 1: Scan all folders // ------------------------------- - InitializeCriticalSection(&g_entries_cs); + + mpmc_init(&g_file_queue, 65536); DirQueue q; memset(&q, 0, sizeof(q)); @@ -559,8 +506,16 @@ int main(int argc, char **argv) { InitializeConditionVariable(&q.cv); q.active = 0; - HANDLE scan_progress = - CreateThread(NULL, 0, scan_progress_thread, NULL, 0, NULL); + // starting hash threads + HANDLE *hash_threads = malloc(sizeof(HANDLE) * num_threads); + + for (size_t i = 0; i < num_threads; ++i) { + hash_threads[i] = + CreateThread(NULL, 0, hash_worker, &g_file_queue, 0, NULL); + } + + // starting scan threads + HANDLE progress = CreateThread(NULL, 0, progress_thread, NULL, 0, NULL); for (int i = 0; i < folder_count; ++i) { dirqueue_push(&q, folders[i]); @@ -578,162 +533,47 @@ int main(int argc, char **argv) { WaitForMultipleObjects((DWORD)scan_threads, scan_tids, TRUE, INFINITE); + mpmc_finish(&g_file_queue); + atomic_store(&g_scan_done, 1); - 
WaitForSingleObject(scan_progress, INFINITE); - CloseHandle(scan_progress); for (size_t i = 0; i < scan_threads; ++i) CloseHandle(scan_tids[i]); free(scan_tids); double scan_seconds = timer_stop(&scan_timer); - double scan_rate = (double)g_entry_count / scan_seconds; + size_t total_found = atomic_load(&g_files_found); - printf(". Scan rate : %.1f files/sec\n", scan_rate); - printf("Completed scanning in %.2f seconds. Saving to %s\n\n", scan_seconds, - FILE_LIST_TXT); - save_file_list(FILE_LIST_TXT); + printf("\r%*s\r", 120, ""); // clear_console_line + printf("Completed scanning in %.2f seconds, found %zu files\n\n", + scan_seconds, total_found); - if (g_entry_count == 0) { - printf("No files to process.\n"); + // if no files found + if (total_found == 0) { + printf("No files found.\n"); return 0; } - DeleteCriticalSection(&g_entries_cs); + // stop hashing threads + WaitForMultipleObjects((DWORD)num_threads, hash_threads, TRUE, INFINITE); - // Prepare job queue - JobQueue queue; - jobqueue_init(&queue); - - size_t total_jobs = 0; - for (size_t i = 0; i < g_entry_count; ++i) { - Job *j = (Job *)malloc(sizeof(Job)); - j->file = &g_entries[i]; - j->next = NULL; - jobqueue_push(&queue, j); - ++total_jobs; - } - - if (total_jobs == 0) { - printf("Nothing to do — all files already hashed.\n"); - return 0; - } - - FILE *hf = fopen(FILE_HASHES_TXT, "w"); - if (hf) - fclose(hf); - - // Starting thread pool - atomic_size_t done_counter; - atomic_store(&done_counter, 0); - atomic_int live_workers; - atomic_store(&live_workers, (int)num_threads); - - WorkerArg warg = {.queue = &queue, - .done_counter = &done_counter, - .total_jobs = total_jobs, - .live_workers = &live_workers}; - - printf("Starting thread pool: %zu threads (CPU cores: %zu)\n", num_threads, - hw_threads); - - // Launch threads - HANDLE *tids = malloc(sizeof(HANDLE) * num_threads); - for (size_t i = 0; i < num_threads; ++i) { - tids[i] = CreateThread(NULL, 0, worker_thread_windows, &warg, 0, NULL); - } - - // 
Progress / timer - struct timespec tstart, tnow; - // fallback for windows - LARGE_INTEGER freq, start_li; - QueryPerformanceFrequency(&freq); - QueryPerformanceCounter(&start_li); - - size_t last_done = 0; - - // --------------- Hashing speed MB/s ---------------- - uint64_t last_bytes = atomic_load(&g_bytes_processed); - double last_time = 0.0; - double displayed_speed = 0.0; - const double sample_interval = 0.5; - char linebuf[256]; - - for (;;) { - size_t done = (size_t)atomic_load(&done_counter); - - // ---- monotonic time ---- - LARGE_INTEGER now_li; - QueryPerformanceCounter(&now_li); - double now = - (double)(now_li.QuadPart - start_li.QuadPart) / (double)freq.QuadPart; - - // ---- total processed bytes ---- - uint64_t bytes = atomic_load(&g_bytes_processed); - - // ---- real sampler (independent of UI sleep) ---- - if (last_time == 0.0) { - last_time = now; - last_bytes = bytes; - } - - double dt = now - last_time; - if (dt >= sample_interval) { - uint64_t db = bytes - last_bytes; - - if (db > 0 && dt > 0.0001) { - displayed_speed = (double)db / (1024.0 * 1024.0) / dt; - } - - last_bytes = bytes; - last_time = now; - } - - // ---- progress bar build ---- - const int barw = 40; - double pct = total_jobs ? 
(double)done / (double)total_jobs : 0.0; - int filled = (int)(pct * barw + 0.5); - - int p = 0; - p += snprintf(linebuf + p, sizeof(linebuf) - p, "["); - for (int i = 0; i < filled && p < (int)sizeof(linebuf); ++i) - p += snprintf(linebuf + p, sizeof(linebuf) - p, "#"); - for (int i = filled; i < barw && p < (int)sizeof(linebuf); ++i) - p += snprintf(linebuf + p, sizeof(linebuf) - p, "."); - - snprintf(linebuf + p, sizeof(linebuf) - p, - "] %6.2f%% (%zu / %zu) %8.2f MB/s", pct * 100.0, done, total_jobs, - displayed_speed); - - printf("\r%s", linebuf); - fflush(stdout); - - if (done >= total_jobs) - break; - - Sleep(100); - } - - printf("\n\n"); - - // stop queue and join threads - jobqueue_stop(&queue); - WaitForMultipleObjects((DWORD)num_threads, tids, TRUE, INFINITE); for (size_t i = 0; i < num_threads; ++i) - CloseHandle(tids[i]); + CloseHandle(hash_threads[i]); + + free(hash_threads); + free(g_file_queue.items); + + WaitForSingleObject(progress, INFINITE); + CloseHandle(progress); // done time - LARGE_INTEGER end_li; - QueryPerformanceCounter(&end_li); - double elapsed = - (double)(end_li.QuadPart - start_li.QuadPart) / (double)freq.QuadPart; double total_seconds = timer_stop(&total_timer); - printf("Completed hashing %zu files in %.2f seconds\n", total_jobs, elapsed); + printf("Completed hashing %zu files\n", total_found); uint64_t total_bytes = (uint64_t)atomic_load(&g_bytes_processed); double total_mb = (double)total_bytes / (1024.0 * 1024.0); - double avg_mbps = total_mb / elapsed; + double avg_mbps = total_mb / total_seconds; printf("Total: %.2f MB, Average: %.2f MB/s\n", total_mb, avg_mbps); printf(" Total time : %.2f seconds\n", total_seconds);