Merge branch 'mpmc_queue'

This commit is contained in:
2026-03-07 17:12:09 +01:00
12 changed files with 412 additions and 393 deletions

3
.gitignore vendored
View File

@@ -1,3 +1,6 @@
file_hasher.pdb
file_hasher.ilk
file_hasher.rdi
file_hasher.exe
file_hashes.txt
file_list.txt

21
binaries/changlog.txt Normal file
View File

@@ -0,0 +1,21 @@
V1.0: Recursive scan
v2.0: Multi threaded scan
Collects more metadata
v2.1: Uses AVX2 instead of SSE2
v3.0: Simple mutex/critical section based MPMC queue
reusable hashing buffer
v3.1: Lock free MPMC queue Vyukov-style
v3.2: Making the lock free MPMC queue growable
Add padding to avoir false sharing
Add sleep() and SwitchToThread() to limit spinning
v3.3: Fix bug slots used before initialization,compare and swap is protecting updating committed, but it is not protecting the memory initialization. Adding atomic_flag commit_lock to protect against that
Fix bug multiple threads committing at the same time, fixed by using atomic_flag commit_lock and re-checking committed after acquiring the lock
Reorder helper functions
v3.4: Rewriting hash_worker() to export file_hashes.txt

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -1,5 +1,6 @@
#pragma once
#include <immintrin.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>
@@ -26,14 +27,14 @@
#include <unistd.h>
#endif
#define XXH_VECTOR XXH_AVX2 // not recommanded to compile with gcc see xxhash.h line 4082
// Must compile with /arch:AVX2 in clang-cl or -mavx2 in clang/gcc
#define XXH_VECTOR \
XXH_AVX2 // not recommanded to compile with gcc see xxhash.h line 4082
// Must compile with /arch:AVX2 in clang-cl or -mavx2 in clang/gcc
#define XXH_INLINE_ALL
#include "xxhash.c"
#include "xxhash.h"
// ----------------------------- Config -------------------------------------
#define FILE_LIST_TXT "file_list.txt"
#define FILE_HASHES_TXT "file_hashes.txt"
#define HASH_STRLEN 33 // 128-bit hex (32 chars) + null
#define MAX_PATHLEN 4096
@@ -95,17 +96,38 @@ static double timer_stop(HiResTimer *t) {
(double)g_qpc_freq.QuadPart;
}
/* Scan folders */
typedef struct EntryBuffer {
FileEntry *entries;
size_t count;
size_t capacity;
} EntryBuffer;
// ============================================================
// Simple Mutex-Based MPMC Queue (FileEntry*)
// ============================================================
typedef struct {
atomic_size_t seq;
FileEntry *data;
char pad[64 - sizeof(atomic_size_t) - sizeof(FileEntry *)];
} MPMCSlot;
typedef struct {
atomic_size_t head;
char pad1[64];
atomic_size_t tail;
char pad2[64];
size_t capacity;
size_t mask;
atomic_size_t committed;
size_t commit_step;
atomic_flag commit_lock;
MPMCSlot *slots;
} MPMCQueue;
static MPMCQueue g_file_queue;
/* Scan folders */
typedef struct DirQueue DirQueue;
void scan_folder_windows_parallel(const char *base, DirQueue *q,
EntryBuffer *buf);
void scan_folder_windows_parallel(const char *base, DirQueue *q);
void scan_folder_posix_parallel(const char *base, DirQueue *q);
typedef struct DirJob {

View File

@@ -1,14 +1,12 @@
#include "platform.h"
#include <stdio.h>
// ----------------------------- Globals ------------------------------------
FileEntry *g_entries = NULL;
size_t g_entry_count = 0;
size_t g_entry_capacity = 0;
static atomic_int g_scan_done = 0;
static atomic_size_t g_files_found = 0;
static atomic_uint_fast64_t g_files_found = 0;
static atomic_uint_fast64_t g_files_hashed = 0;
static atomic_uint_fast64_t g_bytes_processed = 0;
static atomic_int g_scan_done = 0;
// __________________________________________________________________________
static CRITICAL_SECTION g_entries_cs;
// ----------------------------- Utils --------------------------------------
static void perror_exit(const char *msg) {
@@ -23,36 +21,6 @@ static void *xmalloc(size_t n) {
return p;
}
static void global_entries_push(const FileEntry *src) {
if (g_entry_count == g_entry_capacity) {
size_t newcap = g_entry_capacity ? g_entry_capacity * 2 : 1024;
g_entries = realloc(g_entries, newcap * sizeof(FileEntry));
if (!g_entries)
perror_exit("realloc");
g_entry_capacity = newcap;
}
FileEntry *dst = &g_entries[g_entry_count++];
memset(dst, 0, sizeof(*dst));
dst->size_bytes = src->size_bytes;
dst->created_time = src->created_time;
dst->modified_time = src->modified_time;
dst->path = strdup(src->path);
strncpy(dst->owner, src->owner, sizeof(dst->owner) - 1);
}
static void free_entries(void) {
for (size_t i = 0; i < g_entry_count; ++i) {
free(g_entries[i].path);
}
free(g_entries);
g_entries = NULL;
g_entry_count = 0;
g_entry_capacity = 0;
}
// ----------------------------- Convert filetime to epoch --------------
static uint64_t filetime_to_epoch(const FILETIME *ft) {
ULARGE_INTEGER ull;
@@ -63,6 +31,25 @@ static uint64_t filetime_to_epoch(const FILETIME *ft) {
return (ull.QuadPart - 116444736000000000ULL) / 10000000ULL;
}
// ----------------------------- Format time helper -------------------------
static void format_time(uint64_t t, char *out, size_t out_sz) {
if (t == 0) {
snprintf(out, out_sz, "N/A");
return;
}
time_t tt = (time_t)t;
struct tm tm;
#if PLATFORM_WINDOWS
localtime_s(&tm, &tt);
#else
localtime_r(&tt, &tm);
#endif
strftime(out, out_sz, "%Y-%m-%d %H:%M:%S", &tm);
}
// ----------------------------- Resolve file owner ---------------------
static void get_file_owner(const char *path, char *out, size_t out_sz) {
PSID sid = NULL;
@@ -90,49 +77,186 @@ static void get_file_owner(const char *path, char *out, size_t out_sz) {
LocalFree(sd);
}
// ----------------------------- Format time helper -------------------------
static void format_time(uint64_t t, char *out, size_t out_sz) {
if (t == 0) {
snprintf(out, out_sz, "N/A");
return;
// ----------------------------- Get file metadata -------------------------
void platform_get_file_times(const char *path, uint64_t *out_created,
uint64_t *out_modified) {
WIN32_FILE_ATTRIBUTE_DATA fad;
if (GetFileAttributesExA(path, GetFileExInfoStandard, &fad)) {
*out_created = filetime_to_epoch(&fad.ftCreationTime);
*out_modified = filetime_to_epoch(&fad.ftLastWriteTime);
} else {
*out_created = 0;
*out_modified = 0;
}
}
time_t tt = (time_t)t;
struct tm tm;
#if PLATFORM_WINDOWS
localtime_s(&tm, &tt);
#else
localtime_r(&tt, &tm);
#endif
strftime(out, out_sz, "%Y-%m-%d %H:%M:%S", &tm);
void platform_get_file_owner(const char *path, char *out_owner,
size_t out_owner_size) {
get_file_owner(path, out_owner, out_owner_size);
}
// --------------- parallel directory scanning ----------------
static void entrybuf_init(EntryBuffer *b) {
b->entries = NULL;
b->count = 0;
b->capacity = 0;
}
static void entrybuf_push(EntryBuffer *b, const FileEntry *src) {
if (b->count == b->capacity) {
size_t newcap = b->capacity ? b->capacity * 2 : 256;
b->entries = realloc(b->entries, newcap * sizeof(FileEntry));
if (!b->entries)
perror_exit("realloc");
b->capacity = newcap;
void mpmc_init(MPMCQueue *q, size_t max_capacity) {
if ((max_capacity & (max_capacity - 1)) != 0) {
fprintf(stderr, "capacity must be power of two\n");
exit(1);
}
FileEntry *dst = &b->entries[b->count++];
memset(dst, 0, sizeof(*dst));
q->capacity = max_capacity;
q->mask = max_capacity - 1;
dst->size_bytes = src->size_bytes;
dst->created_time = src->created_time;
dst->modified_time = src->modified_time;
dst->path = strdup(src->path);
strncpy(dst->owner, src->owner, sizeof(dst->owner) - 1);
size_t bytes = sizeof(MPMCSlot) * max_capacity;
q->slots = VirtualAlloc(NULL, bytes, MEM_RESERVE, PAGE_READWRITE);
if (!q->slots) {
fprintf(stderr, "VirtualAlloc reserve failed\n");
exit(1);
}
q->commit_step = (64ull * 1024 * 1024) / sizeof(MPMCSlot);
atomic_flag_clear(&q->commit_lock);
q->committed = q->commit_step;
VirtualAlloc(q->slots, q->commit_step * sizeof(MPMCSlot), MEM_COMMIT,
PAGE_READWRITE);
for (size_t i = 0; i < q->committed; i++) {
atomic_init(&q->slots[i].seq, i);
q->slots[i].data = NULL;
}
atomic_init(&q->head, 0);
atomic_init(&q->tail, 0);
}
static void mpmc_commit_more(MPMCQueue *q) {
if (atomic_flag_test_and_set(&q->commit_lock))
return;
size_t start = atomic_load_explicit(&q->committed, memory_order_acquire);
size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed);
// another thread already committed enough
if (tail < start) {
atomic_flag_clear(&q->commit_lock);
return;
}
if (start >= q->capacity) {
atomic_flag_clear(&q->commit_lock);
return;
}
size_t new_commit = start + q->commit_step;
if (new_commit > q->capacity)
new_commit = q->capacity;
size_t count = new_commit - start;
VirtualAlloc(&q->slots[start], count * sizeof(MPMCSlot), MEM_COMMIT,
PAGE_READWRITE);
for (size_t i = start; i < new_commit; i++) {
atomic_init(&q->slots[i].seq, i);
q->slots[i].data = NULL;
}
atomic_store_explicit(&q->committed, new_commit, memory_order_release);
atomic_flag_clear(&q->commit_lock);
}
void mpmc_push(MPMCQueue *q, FileEntry *item) {
MPMCSlot *slot;
size_t pos;
for (;;) {
pos = atomic_load_explicit(&q->tail, memory_order_relaxed);
// ensure the slot is committed BEFORE accessing it
size_t committed =
atomic_load_explicit(&q->committed, memory_order_relaxed);
if (pos >= committed) {
mpmc_commit_more(q);
continue;
}
slot = &q->slots[pos & q->mask];
size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
intptr_t diff = (intptr_t)seq - (intptr_t)pos;
if (diff == 0) {
if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1,
memory_order_relaxed,
memory_order_relaxed))
break;
} else if (diff < 0) { // queue actually full
Sleep(1000);
} else { // waiting to grow
Sleep(0);
}
}
slot->data = item;
atomic_store_explicit(&slot->seq, pos + 1, memory_order_release);
}
FileEntry *mpmc_pop(MPMCQueue *q) {
MPMCSlot *slot;
size_t pos;
int spins = 0;
for (;;) {
pos = atomic_load_explicit(&q->head, memory_order_relaxed);
slot = &q->slots[pos & q->mask];
size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
if (diff == 0) {
if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1,
memory_order_relaxed,
memory_order_relaxed))
break;
} else if (diff < 0) { // queue is empty
Sleep(500);
} else { // slot is still transitioning (written by another thread)
if (++spins > 10) {
SwitchToThread(); // yield CPU
spins = 0;
} else {
_mm_pause(); // busy waiting
}
}
}
FileEntry *data = slot->data;
atomic_store_explicit(&slot->seq, pos + q->capacity, memory_order_release);
return data;
}
// Add queue helper functions
@@ -178,46 +302,22 @@ static void dirqueue_done(DirQueue *q) {
static DWORD WINAPI scan_worker(LPVOID arg) {
DirQueue *q = (DirQueue *)arg;
EntryBuffer local;
entrybuf_init(&local);
for (;;) {
char *dir = dirqueue_pop(q);
if (!dir)
break;
scan_folder_windows_parallel(dir, q, &local);
// debug
// printf("[T%lu] scanning %s\n", GetCurrentThreadId(), dir);
// debug
scan_folder_windows_parallel(dir, q);
free(dir);
dirqueue_done(q);
}
// merge once at end
EnterCriticalSection(&g_entries_cs);
if (g_entry_count + local.count > g_entry_capacity) {
g_entry_capacity = g_entry_count + local.count;
g_entries = realloc(g_entries, g_entry_capacity * sizeof(FileEntry));
if (!g_entries)
perror_exit("realloc");
}
memcpy(&g_entries[g_entry_count], local.entries,
local.count * sizeof(FileEntry));
g_entry_count += local.count;
LeaveCriticalSection(&g_entries_cs);
free(local.entries);
return 0;
}
// Scanning directory function
void scan_folder_windows_parallel(const char *base, DirQueue *q,
EntryBuffer *buf) {
void scan_folder_windows_parallel(const char *base, DirQueue *q) {
char search[MAX_PATHLEN];
snprintf(search, sizeof(search), "%s\\*", base);
@@ -242,18 +342,19 @@ void scan_folder_windows_parallel(const char *base, DirQueue *q,
atomic_fetch_add(&g_files_found, 1);
FileEntry fe;
memset(&fe, 0, sizeof(fe));
FileEntry *fe = malloc(sizeof(FileEntry));
memset(fe, 0, sizeof(FileEntry));
char norm[MAX_PATHLEN];
strncpy(norm, full, sizeof(norm) - 1);
norm[sizeof(norm) - 1] = 0;
normalize_path(norm);
fe.path = norm;
platform_get_file_times(full, &fe.created_time, &fe.modified_time);
fe->path = _strdup(norm);
platform_get_file_owner(full, fe.owner, sizeof(fe.owner));
platform_get_file_times(full, &fe->created_time, &fe->modified_time);
platform_get_file_owner(full, fe->owner, sizeof(fe->owner));
LARGE_INTEGER size;
HANDLE hf =
@@ -262,83 +363,18 @@ void scan_folder_windows_parallel(const char *base, DirQueue *q,
if (hf != INVALID_HANDLE_VALUE) {
if (GetFileSizeEx(hf, &size))
fe.size_bytes = (uint64_t)size.QuadPart;
fe->size_bytes = (uint64_t)size.QuadPart;
CloseHandle(hf);
}
entrybuf_push(buf, &fe);
mpmc_push(&g_file_queue, fe);
}
} while (FindNextFileA(h, &fd));
FindClose(h);
}
// Scan progress thread
static DWORD WINAPI scan_progress_thread(LPVOID arg) {
(void)arg;
for (;;) {
if (atomic_load(&g_scan_done))
break;
Sleep(100); // 0.2 seconds
size_t count = atomic_load(&g_files_found);
printf("\rScanning... %zu files found", count);
fflush(stdout);
}
return 0;
}
// ----------------------------- Job queue ----------------------------------
static void jobqueue_init(JobQueue *q) {
q->head = q->tail = NULL;
atomic_store(&q->count, 0);
q->stop = 0;
InitializeCriticalSection(&q->cs);
InitializeConditionVariable(&q->cv);
}
static void jobqueue_push(JobQueue *q, Job *job) {
EnterCriticalSection(&q->cs);
job->next = NULL;
if (q->tail)
q->tail->next = job;
else
q->head = job;
q->tail = job;
atomic_fetch_add(&q->count, 1);
WakeConditionVariable(&q->cv);
LeaveCriticalSection(&q->cs);
}
static Job *jobqueue_pop(JobQueue *q) {
EnterCriticalSection(&q->cs);
while (!q->head && !q->stop)
SleepConditionVariableCS(&q->cv, &q->cs, INFINITE);
if (q->stop && !q->head) {
LeaveCriticalSection(&q->cs);
return NULL;
}
Job *j = q->head;
q->head = j->next;
if (!q->head)
q->tail = NULL;
LeaveCriticalSection(&q->cs);
if (j)
atomic_fetch_sub(&q->count, 1);
return j;
}
static void jobqueue_stop(JobQueue *q) {
EnterCriticalSection(&q->cs);
q->stop = 1;
WakeAllConditionVariable(&q->cv);
LeaveCriticalSection(&q->cs);
}
// ----------------------------- Hashing helpers -----------------------------
static void xxh3_hash_file_stream(const char *path, char *out_hex) {
// compute XXH3_128 over file. POSIX and Windows use standard reads in this
@@ -371,97 +407,137 @@ static void xxh3_hash_file_stream(const char *path, char *out_hex) {
(unsigned long long)h.low64);
}
// ----------------------------- Worker --------------------------------------
static DWORD WINAPI worker_thread_windows(LPVOID argp) {
WorkerArg *w = (WorkerArg *)argp;
JobQueue *q = w->queue;
for (;;) {
Job *job = jobqueue_pop(q);
if (!job)
break;
char hex[HASH_STRLEN];
// On Windows we use overlapped ReadFile for large files would be better,
// but ReadFile with NULL overlapped is sufficient inside parallel threads.
xxh3_hash_file_stream(job->file->path, hex);
// ------------------------- Hash worker --------------------------------
static DWORD WINAPI hash_worker(LPVOID arg) {
MPMCQueue *q = (MPMCQueue *)arg;
static CRITICAL_SECTION append_cs;
static LONG init = 0;
if (InterlockedCompareExchange(&init, 1, 0) == 0) {
InitializeCriticalSection(&append_cs);
}
for (;;) {
FileEntry *fe = mpmc_pop(q);
if (!fe)
break;
char hash[HASH_STRLEN];
xxh3_hash_file_stream(fe->path, hash);
char created[32], modified[32];
format_time(fe->created_time, created, sizeof(created));
format_time(fe->modified_time, modified, sizeof(modified));
double size_kib = (double)fe->size_bytes / 1024.0;
// append to hashes file using a critical section to avoid races
static CRITICAL_SECTION append_cs;
static LONG init = 0;
if (InterlockedCompareExchange(&init, 1, 1) == 0) {
// first time initialize
InitializeCriticalSection(&append_cs);
InterlockedExchange(&init, 1);
}
EnterCriticalSection(&append_cs);
FILE *hf = fopen(FILE_HASHES_TXT, "a");
if (hf) {
char created[32], modified[32];
format_time(job->file->created_time, created, sizeof(created));
format_time(job->file->modified_time, modified, sizeof(modified));
double size_kib = (double)job->file->size_bytes / (1024.0);
fprintf(hf, "%s\t%s\t%.2f\t%s\t%s\t%s\n", hex, job->file->path, size_kib,
created, modified, job->file->owner);
fprintf(hf, "%s\t%s\t%.2f\t%s\t%s\t%s\n", hash, fe->path, size_kib,
created, modified, fe->owner);
fclose(hf);
}
LeaveCriticalSection(&append_cs);
atomic_fetch_add(w->done_counter, 1);
free(job);
atomic_fetch_add(&g_files_hashed, 1);
free(fe->path);
free(fe);
}
atomic_fetch_sub(w->live_workers, 1);
return 0;
}
// ----------------------------- Progress display ---------------------------
static void print_progress(size_t done, size_t total) {
const int barw = 40;
double pct = total ? (double)done / (double)total : 0.0;
int filled = (int)(pct * barw + 0.5);
printf("\r[");
for (int i = 0; i < filled; ++i)
putchar('#');
for (int i = filled; i < barw; ++i)
putchar(' ');
printf("] %6.2f%% (%zu / %zu) ", pct * 100.0, done, total);
fflush(stdout);
}
DWORD WINAPI progress_thread(void *arg) {
// ----------------------------- Helpers: load/save --------------------------
static int file_exists(const char *path) {
DWORD attr = GetFileAttributesA(path);
return attr != INVALID_FILE_ATTRIBUTES;
}
LARGE_INTEGER freq, start;
QueryPerformanceFrequency(&freq);
QueryPerformanceCounter(&start);
static void save_file_list(const char *list_path) {
FILE *f = fopen(list_path, "w");
if (!f) {
perror("fopen file_list");
return;
uint64_t last_bytes = atomic_load(&g_bytes_processed);
double last_time = 0.0;
double displayed_speed = 0.0;
const double sample_interval = 0.5;
for (;;) {
uint64_t found = atomic_load(&g_files_found);
uint64_t hashed = atomic_load(&g_files_hashed);
uint64_t bytes = atomic_load(&g_bytes_processed);
int scan_done = atomic_load(&g_scan_done);
LARGE_INTEGER now;
QueryPerformanceCounter(&now);
double t = (double)(now.QuadPart - start.QuadPart) / (double)freq.QuadPart;
if (last_time == 0.0) {
last_time = t;
last_bytes = bytes;
}
double dt = t - last_time;
if (dt >= sample_interval) {
uint64_t db = bytes - last_bytes;
if (db > 0 && dt > 0.0001) {
displayed_speed = (double)db / (1024.0 * 1024.0) / dt;
}
last_bytes = bytes;
last_time = t;
}
if (!scan_done) {
printf("\rScanning: %llu files | Hashed: %llu | %.2f MB/s ",
(unsigned long long)found, (unsigned long long)hashed,
displayed_speed);
} else {
double pct = found ? (double)hashed / (double)found : 0.0;
int barw = 40;
int filled = (int)(pct * barw);
char bar[64];
int p = 0;
bar[p++] = '[';
for (int i = 0; i < filled; i++)
bar[p++] = '#';
for (int i = filled; i < barw; i++)
bar[p++] = '.';
bar[p++] = ']';
bar[p] = 0;
printf("\r%s %6.2f%% (%llu / %llu) %.2f MB/s ", bar, pct * 100.0,
(unsigned long long)hashed, (unsigned long long)found,
displayed_speed);
}
fflush(stdout);
if (scan_done && hashed == found)
break;
Sleep(100);
}
for (size_t i = 0; i < g_entry_count; ++i) {
fprintf(f, "%s\n", g_entries[i].path);
}
fclose(f);
}
// ----------------------------- Get file metadata -------------------------
void platform_get_file_times(const char *path, uint64_t *out_created,
uint64_t *out_modified) {
WIN32_FILE_ATTRIBUTE_DATA fad;
if (GetFileAttributesExA(path, GetFileExInfoStandard, &fad)) {
*out_created = filetime_to_epoch(&fad.ftCreationTime);
*out_modified = filetime_to_epoch(&fad.ftLastWriteTime);
} else {
*out_created = 0;
*out_modified = 0;
}
}
printf("\n");
void platform_get_file_owner(const char *path, char *out_owner,
size_t out_owner_size) {
get_file_owner(path, out_owner, out_owner_size);
return 0;
}
// ----------------------------- Main ---------------------------------------
@@ -476,7 +552,6 @@ int main(int argc, char **argv) {
HiResTimer total_timer;
HiResTimer scan_timer;
HiResTimer hash_timer;
timer_start(&total_timer);
timer_start(&scan_timer);
@@ -551,7 +626,8 @@ int main(int argc, char **argv) {
// -------------------------------
// Step 1: Scan all folders
// -------------------------------
InitializeCriticalSection(&g_entries_cs);
mpmc_init(&g_file_queue, 1024 * 1024 * 1024);
DirQueue q;
memset(&q, 0, sizeof(q));
@@ -559,8 +635,16 @@ int main(int argc, char **argv) {
InitializeConditionVariable(&q.cv);
q.active = 0;
HANDLE scan_progress =
CreateThread(NULL, 0, scan_progress_thread, NULL, 0, NULL);
// starting hash threads
HANDLE *hash_threads = malloc(sizeof(HANDLE) * num_threads);
for (size_t i = 0; i < num_threads; ++i) {
hash_threads[i] =
CreateThread(NULL, 0, hash_worker, &g_file_queue, 0, NULL);
}
// starting scan threads
HANDLE progress = CreateThread(NULL, 0, progress_thread, NULL, 0, NULL);
for (int i = 0; i < folder_count; ++i) {
dirqueue_push(&q, folders[i]);
@@ -578,162 +662,51 @@ int main(int argc, char **argv) {
WaitForMultipleObjects((DWORD)scan_threads, scan_tids, TRUE, INFINITE);
// mpmc_finish(&g_file_queue);
// debug
for (size_t i = 0; i < num_threads; i++) {
mpmc_push(&g_file_queue, NULL);
}
atomic_store(&g_scan_done, 1);
WaitForSingleObject(scan_progress, INFINITE);
CloseHandle(scan_progress);
for (size_t i = 0; i < scan_threads; ++i)
CloseHandle(scan_tids[i]);
free(scan_tids);
double scan_seconds = timer_stop(&scan_timer);
double scan_rate = (double)g_entry_count / scan_seconds;
size_t total_found = atomic_load(&g_files_found);
printf(". Scan rate : %.1f files/sec\n", scan_rate);
printf("Completed scanning in %.2f seconds. Saving to %s\n\n", scan_seconds,
FILE_LIST_TXT);
save_file_list(FILE_LIST_TXT);
printf("\r%*s\r", 120, ""); // clear_console_line
printf("Completed scanning in %.2f seconds, found %zu files\n\n",
scan_seconds, total_found);
if (g_entry_count == 0) {
printf("No files to process.\n");
// if no files found
if (total_found == 0) {
printf("No files found.\n");
return 0;
}
DeleteCriticalSection(&g_entries_cs);
// stop hashing threads
WaitForMultipleObjects((DWORD)num_threads, hash_threads, TRUE, INFINITE);
// Prepare job queue
JobQueue queue;
jobqueue_init(&queue);
size_t total_jobs = 0;
for (size_t i = 0; i < g_entry_count; ++i) {
Job *j = (Job *)malloc(sizeof(Job));
j->file = &g_entries[i];
j->next = NULL;
jobqueue_push(&queue, j);
++total_jobs;
}
if (total_jobs == 0) {
printf("Nothing to do — all files already hashed.\n");
return 0;
}
FILE *hf = fopen(FILE_HASHES_TXT, "w");
if (hf)
fclose(hf);
// Starting thread pool
atomic_size_t done_counter;
atomic_store(&done_counter, 0);
atomic_int live_workers;
atomic_store(&live_workers, (int)num_threads);
WorkerArg warg = {.queue = &queue,
.done_counter = &done_counter,
.total_jobs = total_jobs,
.live_workers = &live_workers};
printf("Starting thread pool: %zu threads (CPU cores: %zu)\n", num_threads,
hw_threads);
// Launch threads
HANDLE *tids = malloc(sizeof(HANDLE) * num_threads);
for (size_t i = 0; i < num_threads; ++i) {
tids[i] = CreateThread(NULL, 0, worker_thread_windows, &warg, 0, NULL);
}
// Progress / timer
struct timespec tstart, tnow;
// fallback for windows
LARGE_INTEGER freq, start_li;
QueryPerformanceFrequency(&freq);
QueryPerformanceCounter(&start_li);
size_t last_done = 0;
// --------------- Hashing speed MB/s ----------------
uint64_t last_bytes = atomic_load(&g_bytes_processed);
double last_time = 0.0;
double displayed_speed = 0.0;
const double sample_interval = 0.5;
char linebuf[256];
for (;;) {
size_t done = (size_t)atomic_load(&done_counter);
// ---- monotonic time ----
LARGE_INTEGER now_li;
QueryPerformanceCounter(&now_li);
double now =
(double)(now_li.QuadPart - start_li.QuadPart) / (double)freq.QuadPart;
// ---- total processed bytes ----
uint64_t bytes = atomic_load(&g_bytes_processed);
// ---- real sampler (independent of UI sleep) ----
if (last_time == 0.0) {
last_time = now;
last_bytes = bytes;
}
double dt = now - last_time;
if (dt >= sample_interval) {
uint64_t db = bytes - last_bytes;
if (db > 0 && dt > 0.0001) {
displayed_speed = (double)db / (1024.0 * 1024.0) / dt;
}
last_bytes = bytes;
last_time = now;
}
// ---- progress bar build ----
const int barw = 40;
double pct = total_jobs ? (double)done / (double)total_jobs : 0.0;
int filled = (int)(pct * barw + 0.5);
int p = 0;
p += snprintf(linebuf + p, sizeof(linebuf) - p, "[");
for (int i = 0; i < filled && p < (int)sizeof(linebuf); ++i)
p += snprintf(linebuf + p, sizeof(linebuf) - p, "#");
for (int i = filled; i < barw && p < (int)sizeof(linebuf); ++i)
p += snprintf(linebuf + p, sizeof(linebuf) - p, ".");
snprintf(linebuf + p, sizeof(linebuf) - p,
"] %6.2f%% (%zu / %zu) %8.2f MB/s", pct * 100.0, done, total_jobs,
displayed_speed);
printf("\r%s", linebuf);
fflush(stdout);
if (done >= total_jobs)
break;
Sleep(100);
}
printf("\n\n");
// stop queue and join threads
jobqueue_stop(&queue);
WaitForMultipleObjects((DWORD)num_threads, tids, TRUE, INFINITE);
for (size_t i = 0; i < num_threads; ++i)
CloseHandle(tids[i]);
CloseHandle(hash_threads[i]);
free(hash_threads);
// free(g_file_queue.items);
WaitForSingleObject(progress, INFINITE);
CloseHandle(progress);
// done time
LARGE_INTEGER end_li;
QueryPerformanceCounter(&end_li);
double elapsed =
(double)(end_li.QuadPart - start_li.QuadPart) / (double)freq.QuadPart;
double total_seconds = timer_stop(&total_timer);
printf("Completed hashing %zu files in %.2f seconds\n", total_jobs, elapsed);
printf("Completed hashing %zu files\n", total_found);
uint64_t total_bytes = (uint64_t)atomic_load(&g_bytes_processed);
double total_mb = (double)total_bytes / (1024.0 * 1024.0);
double avg_mbps = total_mb / elapsed;
double avg_mbps = total_mb / total_seconds;
printf("Total: %.2f MB, Average: %.2f MB/s\n", total_mb, avg_mbps);
printf(" Total time : %.2f seconds\n", total_seconds);