diff --git a/.gitignore b/.gitignore index 6fc1500..fd00ec0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +file_hasher.pdb +file_hasher.ilk +file_hasher.rdi file_hasher.exe file_hashes.txt file_list.txt diff --git a/binaries/changlog.txt b/binaries/changlog.txt new file mode 100644 index 0000000..bfd17e6 --- /dev/null +++ b/binaries/changlog.txt @@ -0,0 +1,21 @@ +v1.0: Recursive scan + +v2.0: Multi-threaded scan +Collects more metadata + +v2.1: Uses AVX2 instead of SSE2 + +v3.0: Simple mutex/critical section based MPMC queue +Reusable hashing buffer + +v3.1: Lock-free Vyukov-style MPMC queue + +v3.2: Make the lock-free MPMC queue growable +Add padding to avoid false sharing +Add Sleep() and SwitchToThread() to limit spinning + +v3.3: Fix bug: slots were used before initialization. Compare-and-swap protects the update of committed, but it does not protect the memory initialization. Added atomic_flag commit_lock to protect against that +Fix bug: multiple threads committing at the same time, fixed by using atomic_flag commit_lock and re-checking committed after acquiring the lock +Reorder helper functions + +v3.4: Rewrite hash_worker() to export file_hashes.txt diff --git a/binaries/file_hasher_v1.0.exe b/binaries/file_hasher_v1.0.exe new file mode 100644 index 0000000..97b90a0 Binary files /dev/null and b/binaries/file_hasher_v1.0.exe differ diff --git a/binaries/file_hasher_v2.0.exe b/binaries/file_hasher_v2.0.exe new file mode 100644 index 0000000..f00eaf7 Binary files /dev/null and b/binaries/file_hasher_v2.0.exe differ diff --git a/binaries/file_hasher_v2.1.exe b/binaries/file_hasher_v2.1.exe new file mode 100644 index 0000000..e4c2ad2 Binary files /dev/null and b/binaries/file_hasher_v2.1.exe differ diff --git a/binaries/file_hasher_v3.0.exe b/binaries/file_hasher_v3.0.exe new file mode 100644 index 0000000..661ffa2 Binary files /dev/null and b/binaries/file_hasher_v3.0.exe differ diff --git a/binaries/file_hasher_v3.1.exe 
b/binaries/file_hasher_v3.1.exe new file mode 100644 index 0000000..205bafa Binary files /dev/null and b/binaries/file_hasher_v3.1.exe differ diff --git a/binaries/file_hasher_v3.2.exe b/binaries/file_hasher_v3.2.exe new file mode 100644 index 0000000..8364396 Binary files /dev/null and b/binaries/file_hasher_v3.2.exe differ diff --git a/binaries/file_hasher_v3.3.exe b/binaries/file_hasher_v3.3.exe new file mode 100644 index 0000000..1712169 Binary files /dev/null and b/binaries/file_hasher_v3.3.exe differ diff --git a/binaries/file_hasher_v3.4.exe b/binaries/file_hasher_v3.4.exe new file mode 100644 index 0000000..3ad25de Binary files /dev/null and b/binaries/file_hasher_v3.4.exe differ diff --git a/platform.h b/platform.h index 0a40fba..337ce54 100644 --- a/platform.h +++ b/platform.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -26,14 +27,14 @@ #include #endif -#define XXH_VECTOR XXH_AVX2 // not recommanded to compile with gcc see xxhash.h line 4082 - // Must compile with /arch:AVX2 in clang-cl or -mavx2 in clang/gcc +#define XXH_VECTOR \ + XXH_AVX2 // not recommended to compile with gcc see xxhash.h line 4082 + // Must compile with /arch:AVX2 in clang-cl or -mavx2 in clang/gcc #define XXH_INLINE_ALL #include "xxhash.c" #include "xxhash.h" // ----------------------------- Config ------------------------------------- -#define FILE_LIST_TXT "file_list.txt" #define FILE_HASHES_TXT "file_hashes.txt" #define HASH_STRLEN 33 // 128-bit hex (32 chars) + null #define MAX_PATHLEN 4096 @@ -95,17 +96,38 @@ static double timer_stop(HiResTimer *t) { (double)g_qpc_freq.QuadPart; } -/* Scan folders */ -typedef struct EntryBuffer { - FileEntry *entries; - size_t count; - size_t capacity; -} EntryBuffer; +// ============================================================ +// Growable Vyukov-style MPMC Queue (FileEntry*) +// ============================================================ +typedef struct { + atomic_size_t seq; + FileEntry *data; + char pad[64 - 
sizeof(atomic_size_t) - sizeof(FileEntry *)]; +} MPMCSlot; + +typedef struct { + atomic_size_t head; + char pad1[64]; + atomic_size_t tail; + char pad2[64]; + + size_t capacity; + size_t mask; + + atomic_size_t committed; + size_t commit_step; + atomic_flag commit_lock; + + MPMCSlot *slots; +} MPMCQueue; + +static MPMCQueue g_file_queue; + +/* Scan folders */ typedef struct DirQueue DirQueue; -void scan_folder_windows_parallel(const char *base, DirQueue *q, - EntryBuffer *buf); +void scan_folder_windows_parallel(const char *base, DirQueue *q); void scan_folder_posix_parallel(const char *base, DirQueue *q); typedef struct DirJob { diff --git a/platform_windows.c b/platform_windows.c index b106ca1..23c5897 100644 --- a/platform_windows.c +++ b/platform_windows.c @@ -1,14 +1,12 @@ #include "platform.h" +#include // ----------------------------- Globals ------------------------------------ -FileEntry *g_entries = NULL; -size_t g_entry_count = 0; -size_t g_entry_capacity = 0; -static atomic_int g_scan_done = 0; -static atomic_size_t g_files_found = 0; +static atomic_uint_fast64_t g_files_found = 0; +static atomic_uint_fast64_t g_files_hashed = 0; static atomic_uint_fast64_t g_bytes_processed = 0; +static atomic_int g_scan_done = 0; // __________________________________________________________________________ -static CRITICAL_SECTION g_entries_cs; // ----------------------------- Utils -------------------------------------- static void perror_exit(const char *msg) { @@ -23,36 +21,6 @@ static void *xmalloc(size_t n) { return p; } -static void global_entries_push(const FileEntry *src) { - if (g_entry_count == g_entry_capacity) { - size_t newcap = g_entry_capacity ? 
g_entry_capacity * 2 : 1024; - g_entries = realloc(g_entries, newcap * sizeof(FileEntry)); - if (!g_entries) - perror_exit("realloc"); - g_entry_capacity = newcap; - } - - FileEntry *dst = &g_entries[g_entry_count++]; - memset(dst, 0, sizeof(*dst)); - - dst->size_bytes = src->size_bytes; - dst->created_time = src->created_time; - dst->modified_time = src->modified_time; - dst->path = strdup(src->path); - strncpy(dst->owner, src->owner, sizeof(dst->owner) - 1); -} - -static void free_entries(void) { - for (size_t i = 0; i < g_entry_count; ++i) { - free(g_entries[i].path); - } - - free(g_entries); - g_entries = NULL; - g_entry_count = 0; - g_entry_capacity = 0; -} - // ----------------------------- Convert filetime to epoch -------------- static uint64_t filetime_to_epoch(const FILETIME *ft) { ULARGE_INTEGER ull; @@ -63,6 +31,25 @@ static uint64_t filetime_to_epoch(const FILETIME *ft) { return (ull.QuadPart - 116444736000000000ULL) / 10000000ULL; } +// ----------------------------- Format time helper ------------------------- +static void format_time(uint64_t t, char *out, size_t out_sz) { + if (t == 0) { + snprintf(out, out_sz, "N/A"); + return; + } + + time_t tt = (time_t)t; + struct tm tm; + +#if PLATFORM_WINDOWS + localtime_s(&tm, &tt); +#else + localtime_r(&tt, &tm); +#endif + + strftime(out, out_sz, "%Y-%m-%d %H:%M:%S", &tm); +} + // ----------------------------- Resolve file owner --------------------- static void get_file_owner(const char *path, char *out, size_t out_sz) { PSID sid = NULL; @@ -90,49 +77,186 @@ static void get_file_owner(const char *path, char *out, size_t out_sz) { LocalFree(sd); } -// ----------------------------- Format time helper ------------------------- -static void format_time(uint64_t t, char *out, size_t out_sz) { - if (t == 0) { - snprintf(out, out_sz, "N/A"); - return; +// ----------------------------- Get file metadata ------------------------- +void platform_get_file_times(const char *path, uint64_t *out_created, + uint64_t 
*out_modified) { + WIN32_FILE_ATTRIBUTE_DATA fad; + if (GetFileAttributesExA(path, GetFileExInfoStandard, &fad)) { + *out_created = filetime_to_epoch(&fad.ftCreationTime); + *out_modified = filetime_to_epoch(&fad.ftLastWriteTime); + } else { + *out_created = 0; + *out_modified = 0; } +} - time_t tt = (time_t)t; - struct tm tm; - -#if PLATFORM_WINDOWS - localtime_s(&tm, &tt); -#else - localtime_r(&tt, &tm); -#endif - - strftime(out, out_sz, "%Y-%m-%d %H:%M:%S", &tm); +void platform_get_file_owner(const char *path, char *out_owner, + size_t out_owner_size) { + get_file_owner(path, out_owner, out_owner_size); } // --------------- parallel directory scanning ---------------- -static void entrybuf_init(EntryBuffer *b) { - b->entries = NULL; - b->count = 0; - b->capacity = 0; -} - -static void entrybuf_push(EntryBuffer *b, const FileEntry *src) { - if (b->count == b->capacity) { - size_t newcap = b->capacity ? b->capacity * 2 : 256; - b->entries = realloc(b->entries, newcap * sizeof(FileEntry)); - if (!b->entries) - perror_exit("realloc"); - b->capacity = newcap; +void mpmc_init(MPMCQueue *q, size_t max_capacity) { + if ((max_capacity & (max_capacity - 1)) != 0) { + fprintf(stderr, "capacity must be power of two\n"); + exit(1); } - FileEntry *dst = &b->entries[b->count++]; - memset(dst, 0, sizeof(*dst)); + q->capacity = max_capacity; + q->mask = max_capacity - 1; - dst->size_bytes = src->size_bytes; - dst->created_time = src->created_time; - dst->modified_time = src->modified_time; - dst->path = strdup(src->path); - strncpy(dst->owner, src->owner, sizeof(dst->owner) - 1); + size_t bytes = sizeof(MPMCSlot) * max_capacity; + + q->slots = VirtualAlloc(NULL, bytes, MEM_RESERVE, PAGE_READWRITE); + + if (!q->slots) { + fprintf(stderr, "VirtualAlloc reserve failed\n"); + exit(1); + } + + q->commit_step = (64ull * 1024 * 1024) / sizeof(MPMCSlot); + atomic_flag_clear(&q->commit_lock); + + q->committed = q->commit_step; + + VirtualAlloc(q->slots, q->commit_step * sizeof(MPMCSlot), 
MEM_COMMIT, + PAGE_READWRITE); + + for (size_t i = 0; i < q->committed; i++) { + atomic_init(&q->slots[i].seq, i); + q->slots[i].data = NULL; + } + + atomic_init(&q->head, 0); + atomic_init(&q->tail, 0); +} + +static void mpmc_commit_more(MPMCQueue *q) { + + if (atomic_flag_test_and_set(&q->commit_lock)) + return; + + size_t start = atomic_load_explicit(&q->committed, memory_order_acquire); + size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed); + + // another thread already committed enough + if (tail < start) { + atomic_flag_clear(&q->commit_lock); + return; + } + + if (start >= q->capacity) { + atomic_flag_clear(&q->commit_lock); + return; + } + + size_t new_commit = start + q->commit_step; + if (new_commit > q->capacity) + new_commit = q->capacity; + + size_t count = new_commit - start; + + VirtualAlloc(&q->slots[start], count * sizeof(MPMCSlot), MEM_COMMIT, + PAGE_READWRITE); + + for (size_t i = start; i < new_commit; i++) { + atomic_init(&q->slots[i].seq, i); + q->slots[i].data = NULL; + } + + atomic_store_explicit(&q->committed, new_commit, memory_order_release); + + atomic_flag_clear(&q->commit_lock); +} + +void mpmc_push(MPMCQueue *q, FileEntry *item) { + MPMCSlot *slot; + size_t pos; + + for (;;) { + + pos = atomic_load_explicit(&q->tail, memory_order_relaxed); + + // ensure the slot is committed BEFORE accessing it + size_t committed = + atomic_load_explicit(&q->committed, memory_order_relaxed); + + if (pos >= committed) { + mpmc_commit_more(q); + continue; + } + + slot = &q->slots[pos & q->mask]; + + size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); + intptr_t diff = (intptr_t)seq - (intptr_t)pos; + + if (diff == 0) { + + if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1, + memory_order_relaxed, + memory_order_relaxed)) + break; + + } else if (diff < 0) { // queue actually full + + Sleep(1000); + + } else { // waiting to grow + + Sleep(0); + } + } + + slot->data = item; + + 
atomic_store_explicit(&slot->seq, pos + 1, memory_order_release); +} + +FileEntry *mpmc_pop(MPMCQueue *q) { + MPMCSlot *slot; + size_t pos; + + int spins = 0; + + for (;;) { + + pos = atomic_load_explicit(&q->head, memory_order_relaxed); + slot = &q->slots[pos & q->mask]; + + size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); + intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1); + + if (diff == 0) { + + if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1, + memory_order_relaxed, + memory_order_relaxed)) + break; + + } else if (diff < 0) { // queue is empty + + Sleep(500); + + } else { // slot is still transitioning (written by another thread) + + if (++spins > 10) { + + SwitchToThread(); // yield CPU + spins = 0; + + } else { + + _mm_pause(); // busy waiting + } + } + } + + FileEntry *data = slot->data; + + atomic_store_explicit(&slot->seq, pos + q->capacity, memory_order_release); + + return data; } // Add queue helper functions @@ -178,46 +302,22 @@ static void dirqueue_done(DirQueue *q) { static DWORD WINAPI scan_worker(LPVOID arg) { DirQueue *q = (DirQueue *)arg; - EntryBuffer local; - entrybuf_init(&local); - for (;;) { char *dir = dirqueue_pop(q); if (!dir) break; - scan_folder_windows_parallel(dir, q, &local); - // debug - // printf("[T%lu] scanning %s\n", GetCurrentThreadId(), dir); - // debug + scan_folder_windows_parallel(dir, q); free(dir); dirqueue_done(q); } - // merge once at end - EnterCriticalSection(&g_entries_cs); - - if (g_entry_count + local.count > g_entry_capacity) { - g_entry_capacity = g_entry_count + local.count; - g_entries = realloc(g_entries, g_entry_capacity * sizeof(FileEntry)); - if (!g_entries) - perror_exit("realloc"); - } - - memcpy(&g_entries[g_entry_count], local.entries, - local.count * sizeof(FileEntry)); - g_entry_count += local.count; - - LeaveCriticalSection(&g_entries_cs); - - free(local.entries); return 0; } // Scanning directory function -void scan_folder_windows_parallel(const char *base, 
DirQueue *q, - EntryBuffer *buf) { +void scan_folder_windows_parallel(const char *base, DirQueue *q) { char search[MAX_PATHLEN]; snprintf(search, sizeof(search), "%s\\*", base); @@ -242,18 +342,19 @@ void scan_folder_windows_parallel(const char *base, DirQueue *q, atomic_fetch_add(&g_files_found, 1); - FileEntry fe; - memset(&fe, 0, sizeof(fe)); + FileEntry *fe = malloc(sizeof(FileEntry)); + memset(fe, 0, sizeof(FileEntry)); char norm[MAX_PATHLEN]; strncpy(norm, full, sizeof(norm) - 1); norm[sizeof(norm) - 1] = 0; normalize_path(norm); - fe.path = norm; - platform_get_file_times(full, &fe.created_time, &fe.modified_time); + fe->path = _strdup(norm); - platform_get_file_owner(full, fe.owner, sizeof(fe.owner)); + platform_get_file_times(full, &fe->created_time, &fe->modified_time); + + platform_get_file_owner(full, fe->owner, sizeof(fe->owner)); LARGE_INTEGER size; HANDLE hf = @@ -262,83 +363,18 @@ void scan_folder_windows_parallel(const char *base, DirQueue *q, if (hf != INVALID_HANDLE_VALUE) { if (GetFileSizeEx(hf, &size)) - fe.size_bytes = (uint64_t)size.QuadPart; + fe->size_bytes = (uint64_t)size.QuadPart; CloseHandle(hf); } - entrybuf_push(buf, &fe); + mpmc_push(&g_file_queue, fe); } + } while (FindNextFileA(h, &fd)); FindClose(h); } -// Scan progress thread -static DWORD WINAPI scan_progress_thread(LPVOID arg) { - (void)arg; - - for (;;) { - if (atomic_load(&g_scan_done)) - break; - - Sleep(100); // 0.2 seconds - - size_t count = atomic_load(&g_files_found); - - printf("\rScanning... 
%zu files found", count); - fflush(stdout); - } - - return 0; -} - -// ----------------------------- Job queue ---------------------------------- -static void jobqueue_init(JobQueue *q) { - q->head = q->tail = NULL; - atomic_store(&q->count, 0); - q->stop = 0; - InitializeCriticalSection(&q->cs); - InitializeConditionVariable(&q->cv); -} - -static void jobqueue_push(JobQueue *q, Job *job) { - EnterCriticalSection(&q->cs); - job->next = NULL; - if (q->tail) - q->tail->next = job; - else - q->head = job; - q->tail = job; - atomic_fetch_add(&q->count, 1); - WakeConditionVariable(&q->cv); - LeaveCriticalSection(&q->cs); -} - -static Job *jobqueue_pop(JobQueue *q) { - EnterCriticalSection(&q->cs); - while (!q->head && !q->stop) - SleepConditionVariableCS(&q->cv, &q->cs, INFINITE); - if (q->stop && !q->head) { - LeaveCriticalSection(&q->cs); - return NULL; - } - Job *j = q->head; - q->head = j->next; - if (!q->head) - q->tail = NULL; - LeaveCriticalSection(&q->cs); - if (j) - atomic_fetch_sub(&q->count, 1); - return j; -} - -static void jobqueue_stop(JobQueue *q) { - EnterCriticalSection(&q->cs); - q->stop = 1; - WakeAllConditionVariable(&q->cv); - LeaveCriticalSection(&q->cs); -} - // ----------------------------- Hashing helpers ----------------------------- static void xxh3_hash_file_stream(const char *path, char *out_hex) { // compute XXH3_128 over file. POSIX and Windows use standard reads in this @@ -371,97 +407,137 @@ static void xxh3_hash_file_stream(const char *path, char *out_hex) { (unsigned long long)h.low64); } -// ----------------------------- Worker -------------------------------------- -static DWORD WINAPI worker_thread_windows(LPVOID argp) { - WorkerArg *w = (WorkerArg *)argp; - JobQueue *q = w->queue; - for (;;) { - Job *job = jobqueue_pop(q); - if (!job) - break; - char hex[HASH_STRLEN]; - // On Windows we use overlapped ReadFile for large files would be better, - // but ReadFile with NULL overlapped is sufficient inside parallel threads. 
- xxh3_hash_file_stream(job->file->path, hex); +// ------------------------- Hash worker -------------------------------- +static DWORD WINAPI hash_worker(LPVOID arg) { + MPMCQueue *q = (MPMCQueue *)arg; + + static CRITICAL_SECTION append_cs; + static LONG init = 0; + + if (InterlockedCompareExchange(&init, 1, 0) == 0) { + InitializeCriticalSection(&append_cs); + } + + for (;;) { + FileEntry *fe = mpmc_pop(q); + if (!fe) + break; + + char hash[HASH_STRLEN]; + xxh3_hash_file_stream(fe->path, hash); + + char created[32], modified[32]; + format_time(fe->created_time, created, sizeof(created)); + format_time(fe->modified_time, modified, sizeof(modified)); + + double size_kib = (double)fe->size_bytes / 1024.0; - // append to hashes file using a critical section to avoid races - static CRITICAL_SECTION append_cs; - static LONG init = 0; - if (InterlockedCompareExchange(&init, 1, 1) == 0) { - // first time initialize - InitializeCriticalSection(&append_cs); - InterlockedExchange(&init, 1); - } EnterCriticalSection(&append_cs); + FILE *hf = fopen(FILE_HASHES_TXT, "a"); if (hf) { - char created[32], modified[32]; - - format_time(job->file->created_time, created, sizeof(created)); - format_time(job->file->modified_time, modified, sizeof(modified)); - double size_kib = (double)job->file->size_bytes / (1024.0); - - fprintf(hf, "%s\t%s\t%.2f\t%s\t%s\t%s\n", hex, job->file->path, size_kib, - created, modified, job->file->owner); + fprintf(hf, "%s\t%s\t%.2f\t%s\t%s\t%s\n", hash, fe->path, size_kib, + created, modified, fe->owner); fclose(hf); } + LeaveCriticalSection(&append_cs); - atomic_fetch_add(w->done_counter, 1); - free(job); + atomic_fetch_add(&g_files_hashed, 1); + + free(fe->path); + free(fe); } - atomic_fetch_sub(w->live_workers, 1); + return 0; } // ----------------------------- Progress display --------------------------- -static void print_progress(size_t done, size_t total) { - const int barw = 40; - double pct = total ? 
(double)done / (double)total : 0.0; - int filled = (int)(pct * barw + 0.5); - printf("\r["); - for (int i = 0; i < filled; ++i) - putchar('#'); - for (int i = filled; i < barw; ++i) - putchar(' '); - printf("] %6.2f%% (%zu / %zu) ", pct * 100.0, done, total); - fflush(stdout); -} +DWORD WINAPI progress_thread(void *arg) { -// ----------------------------- Helpers: load/save -------------------------- -static int file_exists(const char *path) { - DWORD attr = GetFileAttributesA(path); - return attr != INVALID_FILE_ATTRIBUTES; -} + LARGE_INTEGER freq, start; + QueryPerformanceFrequency(&freq); + QueryPerformanceCounter(&start); -static void save_file_list(const char *list_path) { - FILE *f = fopen(list_path, "w"); - if (!f) { - perror("fopen file_list"); - return; + uint64_t last_bytes = atomic_load(&g_bytes_processed); + double last_time = 0.0; + + double displayed_speed = 0.0; + const double sample_interval = 0.5; + + for (;;) { + + uint64_t found = atomic_load(&g_files_found); + uint64_t hashed = atomic_load(&g_files_hashed); + uint64_t bytes = atomic_load(&g_bytes_processed); + int scan_done = atomic_load(&g_scan_done); + + LARGE_INTEGER now; + QueryPerformanceCounter(&now); + + double t = (double)(now.QuadPart - start.QuadPart) / (double)freq.QuadPart; + + if (last_time == 0.0) { + last_time = t; + last_bytes = bytes; + } + + double dt = t - last_time; + + if (dt >= sample_interval) { + uint64_t db = bytes - last_bytes; + + if (db > 0 && dt > 0.0001) { + displayed_speed = (double)db / (1024.0 * 1024.0) / dt; + } + + last_bytes = bytes; + last_time = t; + } + + if (!scan_done) { + + printf("\rScanning: %llu files | Hashed: %llu | %.2f MB/s ", + (unsigned long long)found, (unsigned long long)hashed, + displayed_speed); + + } else { + + double pct = found ? 
(double)hashed / (double)found : 0.0; + + int barw = 40; + int filled = (int)(pct * barw); + + char bar[64]; + int p = 0; + + bar[p++] = '['; + + for (int i = 0; i < filled; i++) + bar[p++] = '#'; + + for (int i = filled; i < barw; i++) + bar[p++] = '.'; + + bar[p++] = ']'; + bar[p] = 0; + + printf("\r%s %6.2f%% (%llu / %llu) %.2f MB/s ", bar, pct * 100.0, + (unsigned long long)hashed, (unsigned long long)found, + displayed_speed); + } + + fflush(stdout); + + if (scan_done && hashed == found) + break; + + Sleep(100); } - for (size_t i = 0; i < g_entry_count; ++i) { - fprintf(f, "%s\n", g_entries[i].path); - } - fclose(f); -} -// ----------------------------- Get file metadata ------------------------- -void platform_get_file_times(const char *path, uint64_t *out_created, - uint64_t *out_modified) { - WIN32_FILE_ATTRIBUTE_DATA fad; - if (GetFileAttributesExA(path, GetFileExInfoStandard, &fad)) { - *out_created = filetime_to_epoch(&fad.ftCreationTime); - *out_modified = filetime_to_epoch(&fad.ftLastWriteTime); - } else { - *out_created = 0; - *out_modified = 0; - } -} + printf("\n"); -void platform_get_file_owner(const char *path, char *out_owner, - size_t out_owner_size) { - get_file_owner(path, out_owner, out_owner_size); + return 0; } // ----------------------------- Main --------------------------------------- @@ -476,7 +552,6 @@ int main(int argc, char **argv) { HiResTimer total_timer; HiResTimer scan_timer; - HiResTimer hash_timer; timer_start(&total_timer); timer_start(&scan_timer); @@ -551,7 +626,8 @@ int main(int argc, char **argv) { // ------------------------------- // Step 1: Scan all folders // ------------------------------- - InitializeCriticalSection(&g_entries_cs); + + mpmc_init(&g_file_queue, 1024 * 1024 * 1024); DirQueue q; memset(&q, 0, sizeof(q)); @@ -559,8 +635,16 @@ int main(int argc, char **argv) { InitializeConditionVariable(&q.cv); q.active = 0; - HANDLE scan_progress = - CreateThread(NULL, 0, scan_progress_thread, NULL, 0, NULL); + // 
starting hash threads + HANDLE *hash_threads = malloc(sizeof(HANDLE) * num_threads); + + for (size_t i = 0; i < num_threads; ++i) { + hash_threads[i] = + CreateThread(NULL, 0, hash_worker, &g_file_queue, 0, NULL); + } + + // starting scan threads + HANDLE progress = CreateThread(NULL, 0, progress_thread, NULL, 0, NULL); for (int i = 0; i < folder_count; ++i) { dirqueue_push(&q, folders[i]); @@ -578,162 +662,51 @@ int main(int argc, char **argv) { WaitForMultipleObjects((DWORD)scan_threads, scan_tids, TRUE, INFINITE); + // mpmc_finish(&g_file_queue); + // debug + for (size_t i = 0; i < num_threads; i++) { + mpmc_push(&g_file_queue, NULL); + } + atomic_store(&g_scan_done, 1); - WaitForSingleObject(scan_progress, INFINITE); - CloseHandle(scan_progress); for (size_t i = 0; i < scan_threads; ++i) CloseHandle(scan_tids[i]); free(scan_tids); double scan_seconds = timer_stop(&scan_timer); - double scan_rate = (double)g_entry_count / scan_seconds; + size_t total_found = atomic_load(&g_files_found); - printf(". Scan rate : %.1f files/sec\n", scan_rate); - printf("Completed scanning in %.2f seconds. 
Saving to %s\n\n", scan_seconds, - FILE_LIST_TXT); - save_file_list(FILE_LIST_TXT); + printf("\r%*s\r", 120, ""); // clear_console_line + printf("Completed scanning in %.2f seconds, found %zu files\n\n", + scan_seconds, total_found); - if (g_entry_count == 0) { - printf("No files to process.\n"); + // if no files found + if (total_found == 0) { + printf("No files found.\n"); return 0; } - DeleteCriticalSection(&g_entries_cs); + // stop hashing threads + WaitForMultipleObjects((DWORD)num_threads, hash_threads, TRUE, INFINITE); - // Prepare job queue - JobQueue queue; - jobqueue_init(&queue); - - size_t total_jobs = 0; - for (size_t i = 0; i < g_entry_count; ++i) { - Job *j = (Job *)malloc(sizeof(Job)); - j->file = &g_entries[i]; - j->next = NULL; - jobqueue_push(&queue, j); - ++total_jobs; - } - - if (total_jobs == 0) { - printf("Nothing to do — all files already hashed.\n"); - return 0; - } - - FILE *hf = fopen(FILE_HASHES_TXT, "w"); - if (hf) - fclose(hf); - - // Starting thread pool - atomic_size_t done_counter; - atomic_store(&done_counter, 0); - atomic_int live_workers; - atomic_store(&live_workers, (int)num_threads); - - WorkerArg warg = {.queue = &queue, - .done_counter = &done_counter, - .total_jobs = total_jobs, - .live_workers = &live_workers}; - - printf("Starting thread pool: %zu threads (CPU cores: %zu)\n", num_threads, - hw_threads); - - // Launch threads - HANDLE *tids = malloc(sizeof(HANDLE) * num_threads); - for (size_t i = 0; i < num_threads; ++i) { - tids[i] = CreateThread(NULL, 0, worker_thread_windows, &warg, 0, NULL); - } - - // Progress / timer - struct timespec tstart, tnow; - // fallback for windows - LARGE_INTEGER freq, start_li; - QueryPerformanceFrequency(&freq); - QueryPerformanceCounter(&start_li); - - size_t last_done = 0; - - // --------------- Hashing speed MB/s ---------------- - uint64_t last_bytes = atomic_load(&g_bytes_processed); - double last_time = 0.0; - double displayed_speed = 0.0; - const double sample_interval = 0.5; - 
char linebuf[256]; - - for (;;) { - size_t done = (size_t)atomic_load(&done_counter); - - // ---- monotonic time ---- - LARGE_INTEGER now_li; - QueryPerformanceCounter(&now_li); - double now = - (double)(now_li.QuadPart - start_li.QuadPart) / (double)freq.QuadPart; - - // ---- total processed bytes ---- - uint64_t bytes = atomic_load(&g_bytes_processed); - - // ---- real sampler (independent of UI sleep) ---- - if (last_time == 0.0) { - last_time = now; - last_bytes = bytes; - } - - double dt = now - last_time; - if (dt >= sample_interval) { - uint64_t db = bytes - last_bytes; - - if (db > 0 && dt > 0.0001) { - displayed_speed = (double)db / (1024.0 * 1024.0) / dt; - } - - last_bytes = bytes; - last_time = now; - } - - // ---- progress bar build ---- - const int barw = 40; - double pct = total_jobs ? (double)done / (double)total_jobs : 0.0; - int filled = (int)(pct * barw + 0.5); - - int p = 0; - p += snprintf(linebuf + p, sizeof(linebuf) - p, "["); - for (int i = 0; i < filled && p < (int)sizeof(linebuf); ++i) - p += snprintf(linebuf + p, sizeof(linebuf) - p, "#"); - for (int i = filled; i < barw && p < (int)sizeof(linebuf); ++i) - p += snprintf(linebuf + p, sizeof(linebuf) - p, "."); - - snprintf(linebuf + p, sizeof(linebuf) - p, - "] %6.2f%% (%zu / %zu) %8.2f MB/s", pct * 100.0, done, total_jobs, - displayed_speed); - - printf("\r%s", linebuf); - fflush(stdout); - - if (done >= total_jobs) - break; - - Sleep(100); - } - - printf("\n\n"); - - // stop queue and join threads - jobqueue_stop(&queue); - WaitForMultipleObjects((DWORD)num_threads, tids, TRUE, INFINITE); for (size_t i = 0; i < num_threads; ++i) - CloseHandle(tids[i]); + CloseHandle(hash_threads[i]); + + free(hash_threads); + // free(g_file_queue.items); + + WaitForSingleObject(progress, INFINITE); + CloseHandle(progress); // done time - LARGE_INTEGER end_li; - QueryPerformanceCounter(&end_li); - double elapsed = - (double)(end_li.QuadPart - start_li.QuadPart) / (double)freq.QuadPart; double 
total_seconds = timer_stop(&total_timer); - printf("Completed hashing %zu files in %.2f seconds\n", total_jobs, elapsed); + printf("Completed hashing %zu files\n", total_found); uint64_t total_bytes = (uint64_t)atomic_load(&g_bytes_processed); double total_mb = (double)total_bytes / (1024.0 * 1024.0); - double avg_mbps = total_mb / elapsed; + double avg_mbps = total_mb / total_seconds; printf("Total: %.2f MB, Average: %.2f MB/s\n", total_mb, avg_mbps); printf(" Total time : %.2f seconds\n", total_seconds);