Implementing simple MPMC queue

Rewrinting the pipeline and progress display
This commit is contained in:
2026-03-06 16:44:37 +01:00
parent ca1bbefeaf
commit 9b327c82a6
3 changed files with 221 additions and 363 deletions

3
.gitignore vendored
View File

@@ -1,3 +1,6 @@
file_hasher.pdb
file_hasher.ilk
file_hasher.rdi
file_hasher.exe
file_hashes.txt
file_list.txt

View File

@@ -26,14 +26,14 @@
#include <unistd.h>
#endif
#define XXH_VECTOR XXH_AVX2 // not recommanded to compile with gcc see xxhash.h line 4082
// Must compile with /arch:AVX2 in clang-cl or -mavx2 in clang/gcc
#define XXH_VECTOR \
XXH_AVX2 // not recommanded to compile with gcc see xxhash.h line 4082
// Must compile with /arch:AVX2 in clang-cl or -mavx2 in clang/gcc
#define XXH_INLINE_ALL
#include "xxhash.c"
#include "xxhash.h"
// ----------------------------- Config -------------------------------------
#define FILE_LIST_TXT "file_list.txt"
#define FILE_HASHES_TXT "file_hashes.txt"
#define HASH_STRLEN 33 // 128-bit hex (32 chars) + null
#define MAX_PATHLEN 4096
@@ -95,17 +95,32 @@ static double timer_stop(HiResTimer *t) {
(double)g_qpc_freq.QuadPart;
}
/* Scan folders */
typedef struct EntryBuffer {
FileEntry *entries;
size_t count;
size_t capacity;
} EntryBuffer;
// ============================================================
// Simple Mutex-Based MPMC Queue (FileEntry*)
// ============================================================
typedef struct {
FileEntry **items;
size_t capacity;
size_t head;
size_t tail;
size_t count;
int producers_active;
CRITICAL_SECTION cs;
CONDITION_VARIABLE not_empty;
CONDITION_VARIABLE not_full;
} MPMCQueue;
static MPMCQueue g_file_queue;
/* Scan folders */
typedef struct DirQueue DirQueue;
void scan_folder_windows_parallel(const char *base, DirQueue *q,
EntryBuffer *buf);
void scan_folder_windows_parallel(const char *base, DirQueue *q);
void scan_folder_posix_parallel(const char *base, DirQueue *q);
typedef struct DirJob {

View File

@@ -1,14 +1,11 @@
#include "platform.h"
// ----------------------------- Globals ------------------------------------
FileEntry *g_entries = NULL;
size_t g_entry_count = 0;
size_t g_entry_capacity = 0;
static atomic_int g_scan_done = 0;
static atomic_size_t g_files_found = 0;
static atomic_uint_fast64_t g_files_found = 0;
static atomic_uint_fast64_t g_files_hashed = 0;
static atomic_uint_fast64_t g_bytes_processed = 0;
static atomic_int g_scan_done = 0;
// __________________________________________________________________________
static CRITICAL_SECTION g_entries_cs;
// ----------------------------- Utils --------------------------------------
static void perror_exit(const char *msg) {
@@ -23,36 +20,6 @@ static void *xmalloc(size_t n) {
return p;
}
static void global_entries_push(const FileEntry *src) {
if (g_entry_count == g_entry_capacity) {
size_t newcap = g_entry_capacity ? g_entry_capacity * 2 : 1024;
g_entries = realloc(g_entries, newcap * sizeof(FileEntry));
if (!g_entries)
perror_exit("realloc");
g_entry_capacity = newcap;
}
FileEntry *dst = &g_entries[g_entry_count++];
memset(dst, 0, sizeof(*dst));
dst->size_bytes = src->size_bytes;
dst->created_time = src->created_time;
dst->modified_time = src->modified_time;
dst->path = strdup(src->path);
strncpy(dst->owner, src->owner, sizeof(dst->owner) - 1);
}
static void free_entries(void) {
for (size_t i = 0; i < g_entry_count; ++i) {
free(g_entries[i].path);
}
free(g_entries);
g_entries = NULL;
g_entry_count = 0;
g_entry_capacity = 0;
}
// ----------------------------- Convert filetime to epoch --------------
static uint64_t filetime_to_epoch(const FILETIME *ft) {
ULARGE_INTEGER ull;
@@ -110,29 +77,62 @@ static void format_time(uint64_t t, char *out, size_t out_sz) {
}
// --------------- parallel directory scanning ----------------
static void entrybuf_init(EntryBuffer *b) {
b->entries = NULL;
b->count = 0;
b->capacity = 0;
static void mpmc_init(MPMCQueue *q, size_t capacity) {
q->capacity = capacity;
q->items = xmalloc(sizeof(FileEntry *) * capacity);
q->head = 0;
q->tail = 0;
q->count = 0;
q->producers_active = 1;
InitializeCriticalSection(&q->cs);
InitializeConditionVariable(&q->not_empty);
InitializeConditionVariable(&q->not_full);
}
static void entrybuf_push(EntryBuffer *b, const FileEntry *src) {
if (b->count == b->capacity) {
size_t newcap = b->capacity ? b->capacity * 2 : 256;
b->entries = realloc(b->entries, newcap * sizeof(FileEntry));
if (!b->entries)
perror_exit("realloc");
b->capacity = newcap;
static void mpmc_push(MPMCQueue *q, FileEntry *item) {
EnterCriticalSection(&q->cs);
while (q->count == q->capacity) {
SleepConditionVariableCS(&q->not_full, &q->cs, INFINITE);
}
FileEntry *dst = &b->entries[b->count++];
memset(dst, 0, sizeof(*dst));
q->items[q->tail] = item;
q->tail = (q->tail + 1) % q->capacity;
q->count++;
dst->size_bytes = src->size_bytes;
dst->created_time = src->created_time;
dst->modified_time = src->modified_time;
dst->path = strdup(src->path);
strncpy(dst->owner, src->owner, sizeof(dst->owner) - 1);
WakeConditionVariable(&q->not_empty);
LeaveCriticalSection(&q->cs);
}
static FileEntry *mpmc_pop(MPMCQueue *q) {
EnterCriticalSection(&q->cs);
while (q->count == 0 && q->producers_active) {
SleepConditionVariableCS(&q->not_empty, &q->cs, INFINITE);
}
if (q->count == 0 && !q->producers_active) {
LeaveCriticalSection(&q->cs);
return NULL;
}
FileEntry *item = q->items[q->head];
q->head = (q->head + 1) % q->capacity;
q->count--;
WakeConditionVariable(&q->not_full);
LeaveCriticalSection(&q->cs);
return item;
}
static void mpmc_finish(MPMCQueue *q) {
EnterCriticalSection(&q->cs);
q->producers_active = 0;
WakeAllConditionVariable(&q->not_empty);
LeaveCriticalSection(&q->cs);
}
// Add queue helper functions
@@ -178,46 +178,22 @@ static void dirqueue_done(DirQueue *q) {
static DWORD WINAPI scan_worker(LPVOID arg) {
DirQueue *q = (DirQueue *)arg;
EntryBuffer local;
entrybuf_init(&local);
for (;;) {
char *dir = dirqueue_pop(q);
if (!dir)
break;
scan_folder_windows_parallel(dir, q, &local);
// debug
// printf("[T%lu] scanning %s\n", GetCurrentThreadId(), dir);
// debug
scan_folder_windows_parallel(dir, q);
free(dir);
dirqueue_done(q);
}
// merge once at end
EnterCriticalSection(&g_entries_cs);
if (g_entry_count + local.count > g_entry_capacity) {
g_entry_capacity = g_entry_count + local.count;
g_entries = realloc(g_entries, g_entry_capacity * sizeof(FileEntry));
if (!g_entries)
perror_exit("realloc");
}
memcpy(&g_entries[g_entry_count], local.entries,
local.count * sizeof(FileEntry));
g_entry_count += local.count;
LeaveCriticalSection(&g_entries_cs);
free(local.entries);
return 0;
}
// Scanning directory function
void scan_folder_windows_parallel(const char *base, DirQueue *q,
EntryBuffer *buf) {
void scan_folder_windows_parallel(const char *base, DirQueue *q) {
char search[MAX_PATHLEN];
snprintf(search, sizeof(search), "%s\\*", base);
@@ -242,18 +218,19 @@ void scan_folder_windows_parallel(const char *base, DirQueue *q,
atomic_fetch_add(&g_files_found, 1);
FileEntry fe;
memset(&fe, 0, sizeof(fe));
FileEntry *fe = malloc(sizeof(FileEntry));
memset(fe, 0, sizeof(FileEntry));
char norm[MAX_PATHLEN];
strncpy(norm, full, sizeof(norm) - 1);
norm[sizeof(norm) - 1] = 0;
normalize_path(norm);
fe.path = norm;
platform_get_file_times(full, &fe.created_time, &fe.modified_time);
fe->path = _strdup(norm);
platform_get_file_owner(full, fe.owner, sizeof(fe.owner));
platform_get_file_times(full, &fe->created_time, &fe->modified_time);
platform_get_file_owner(full, fe->owner, sizeof(fe->owner));
LARGE_INTEGER size;
HANDLE hf =
@@ -262,83 +239,18 @@ void scan_folder_windows_parallel(const char *base, DirQueue *q,
if (hf != INVALID_HANDLE_VALUE) {
if (GetFileSizeEx(hf, &size))
fe.size_bytes = (uint64_t)size.QuadPart;
fe->size_bytes = (uint64_t)size.QuadPart;
CloseHandle(hf);
}
entrybuf_push(buf, &fe);
mpmc_push(&g_file_queue, fe);
}
} while (FindNextFileA(h, &fd));
FindClose(h);
}
// Scan progress thread
static DWORD WINAPI scan_progress_thread(LPVOID arg) {
(void)arg;
for (;;) {
if (atomic_load(&g_scan_done))
break;
Sleep(100); // 0.2 seconds
size_t count = atomic_load(&g_files_found);
printf("\rScanning... %zu files found", count);
fflush(stdout);
}
return 0;
}
// ----------------------------- Job queue ----------------------------------
static void jobqueue_init(JobQueue *q) {
q->head = q->tail = NULL;
atomic_store(&q->count, 0);
q->stop = 0;
InitializeCriticalSection(&q->cs);
InitializeConditionVariable(&q->cv);
}
static void jobqueue_push(JobQueue *q, Job *job) {
EnterCriticalSection(&q->cs);
job->next = NULL;
if (q->tail)
q->tail->next = job;
else
q->head = job;
q->tail = job;
atomic_fetch_add(&q->count, 1);
WakeConditionVariable(&q->cv);
LeaveCriticalSection(&q->cs);
}
static Job *jobqueue_pop(JobQueue *q) {
EnterCriticalSection(&q->cs);
while (!q->head && !q->stop)
SleepConditionVariableCS(&q->cv, &q->cs, INFINITE);
if (q->stop && !q->head) {
LeaveCriticalSection(&q->cs);
return NULL;
}
Job *j = q->head;
q->head = j->next;
if (!q->head)
q->tail = NULL;
LeaveCriticalSection(&q->cs);
if (j)
atomic_fetch_sub(&q->count, 1);
return j;
}
static void jobqueue_stop(JobQueue *q) {
EnterCriticalSection(&q->cs);
q->stop = 1;
WakeAllConditionVariable(&q->cv);
LeaveCriticalSection(&q->cs);
}
// ----------------------------- Hashing helpers -----------------------------
static void xxh3_hash_file_stream(const char *path, char *out_hex) {
// compute XXH3_128 over file. POSIX and Windows use standard reads in this
@@ -371,79 +283,114 @@ static void xxh3_hash_file_stream(const char *path, char *out_hex) {
(unsigned long long)h.low64);
}
// ----------------------------- Worker --------------------------------------
static DWORD WINAPI worker_thread_windows(LPVOID argp) {
WorkerArg *w = (WorkerArg *)argp;
JobQueue *q = w->queue;
// ----------------------------- Hash worker
// --------------------------------------
static DWORD WINAPI hash_worker(LPVOID arg) {
MPMCQueue *q = (MPMCQueue *)arg;
for (;;) {
Job *job = jobqueue_pop(q);
if (!job)
FileEntry *fe = mpmc_pop(q);
if (!fe)
break;
char hex[HASH_STRLEN];
// On Windows we use overlapped ReadFile for large files would be better,
// but ReadFile with NULL overlapped is sufficient inside parallel threads.
xxh3_hash_file_stream(job->file->path, hex);
// append to hashes file using a critical section to avoid races
static CRITICAL_SECTION append_cs;
static LONG init = 0;
if (InterlockedCompareExchange(&init, 1, 1) == 0) {
// first time initialize
InitializeCriticalSection(&append_cs);
InterlockedExchange(&init, 1);
}
EnterCriticalSection(&append_cs);
FILE *hf = fopen(FILE_HASHES_TXT, "a");
if (hf) {
char created[32], modified[32];
char hash[HASH_STRLEN];
xxh3_hash_file_stream(fe->path, hash);
format_time(job->file->created_time, created, sizeof(created));
format_time(job->file->modified_time, modified, sizeof(modified));
double size_kib = (double)job->file->size_bytes / (1024.0);
atomic_fetch_add(&g_files_hashed, 1);
fprintf(hf, "%s\t%s\t%.2f\t%s\t%s\t%s\n", hex, job->file->path, size_kib,
created, modified, job->file->owner);
fclose(hf);
}
LeaveCriticalSection(&append_cs);
atomic_fetch_add(w->done_counter, 1);
free(job);
free(fe->path);
free(fe);
}
atomic_fetch_sub(w->live_workers, 1);
return 0;
}
// ----------------------------- Progress display ---------------------------
static void print_progress(size_t done, size_t total) {
const int barw = 40;
double pct = total ? (double)done / (double)total : 0.0;
int filled = (int)(pct * barw + 0.5);
printf("\r[");
for (int i = 0; i < filled; ++i)
putchar('#');
for (int i = filled; i < barw; ++i)
putchar(' ');
printf("] %6.2f%% (%zu / %zu) ", pct * 100.0, done, total);
fflush(stdout);
}
DWORD WINAPI progress_thread(void *arg) {
// ----------------------------- Helpers: load/save --------------------------
static int file_exists(const char *path) {
DWORD attr = GetFileAttributesA(path);
return attr != INVALID_FILE_ATTRIBUTES;
}
LARGE_INTEGER freq, start;
QueryPerformanceFrequency(&freq);
QueryPerformanceCounter(&start);
static void save_file_list(const char *list_path) {
FILE *f = fopen(list_path, "w");
if (!f) {
perror("fopen file_list");
return;
uint64_t last_bytes = atomic_load(&g_bytes_processed);
double last_time = 0.0;
double displayed_speed = 0.0;
const double sample_interval = 0.5;
for (;;) {
uint64_t found = atomic_load(&g_files_found);
uint64_t hashed = atomic_load(&g_files_hashed);
uint64_t bytes = atomic_load(&g_bytes_processed);
int scan_done = atomic_load(&g_scan_done);
LARGE_INTEGER now;
QueryPerformanceCounter(&now);
double t = (double)(now.QuadPart - start.QuadPart) / (double)freq.QuadPart;
if (last_time == 0.0) {
last_time = t;
last_bytes = bytes;
}
double dt = t - last_time;
if (dt >= sample_interval) {
uint64_t db = bytes - last_bytes;
if (db > 0 && dt > 0.0001) {
displayed_speed = (double)db / (1024.0 * 1024.0) / dt;
}
last_bytes = bytes;
last_time = t;
}
if (!scan_done) {
printf("\rScanning: %llu files | Hashed: %llu | %.2f MB/s",
(unsigned long long)found, (unsigned long long)hashed,
displayed_speed);
} else {
double pct = found ? (double)hashed / (double)found : 0.0;
int barw = 40;
int filled = (int)(pct * barw);
char bar[64];
int p = 0;
bar[p++] = '[';
for (int i = 0; i < filled; i++)
bar[p++] = '#';
for (int i = filled; i < barw; i++)
bar[p++] = '.';
bar[p++] = ']';
bar[p] = 0;
printf("\r%s %6.2f%% (%llu / %llu) %.2f MB/s", bar, pct * 100.0,
(unsigned long long)hashed, (unsigned long long)found,
displayed_speed);
}
fflush(stdout);
if (scan_done && hashed == found)
break;
Sleep(100);
}
for (size_t i = 0; i < g_entry_count; ++i) {
fprintf(f, "%s\n", g_entries[i].path);
}
fclose(f);
printf("\n");
return 0;
}
// ----------------------------- Get file metadata -------------------------
@@ -476,7 +423,6 @@ int main(int argc, char **argv) {
HiResTimer total_timer;
HiResTimer scan_timer;
HiResTimer hash_timer;
timer_start(&total_timer);
timer_start(&scan_timer);
@@ -551,7 +497,8 @@ int main(int argc, char **argv) {
// -------------------------------
// Step 1: Scan all folders
// -------------------------------
InitializeCriticalSection(&g_entries_cs);
mpmc_init(&g_file_queue, 65536);
DirQueue q;
memset(&q, 0, sizeof(q));
@@ -559,8 +506,16 @@ int main(int argc, char **argv) {
InitializeConditionVariable(&q.cv);
q.active = 0;
HANDLE scan_progress =
CreateThread(NULL, 0, scan_progress_thread, NULL, 0, NULL);
// starting hash threads
HANDLE *hash_threads = malloc(sizeof(HANDLE) * num_threads);
for (size_t i = 0; i < num_threads; ++i) {
hash_threads[i] =
CreateThread(NULL, 0, hash_worker, &g_file_queue, 0, NULL);
}
// starting scan threads
HANDLE progress = CreateThread(NULL, 0, progress_thread, NULL, 0, NULL);
for (int i = 0; i < folder_count; ++i) {
dirqueue_push(&q, folders[i]);
@@ -578,162 +533,47 @@ int main(int argc, char **argv) {
WaitForMultipleObjects((DWORD)scan_threads, scan_tids, TRUE, INFINITE);
mpmc_finish(&g_file_queue);
atomic_store(&g_scan_done, 1);
WaitForSingleObject(scan_progress, INFINITE);
CloseHandle(scan_progress);
for (size_t i = 0; i < scan_threads; ++i)
CloseHandle(scan_tids[i]);
free(scan_tids);
double scan_seconds = timer_stop(&scan_timer);
double scan_rate = (double)g_entry_count / scan_seconds;
size_t total_found = atomic_load(&g_files_found);
printf(". Scan rate : %.1f files/sec\n", scan_rate);
printf("Completed scanning in %.2f seconds. Saving to %s\n\n", scan_seconds,
FILE_LIST_TXT);
save_file_list(FILE_LIST_TXT);
printf("\r%*s\r", 120, ""); // clear_console_line
printf("Completed scanning in %.2f seconds, found %zu files\n\n",
scan_seconds, total_found);
if (g_entry_count == 0) {
printf("No files to process.\n");
// if no files found
if (total_found == 0) {
printf("No files found.\n");
return 0;
}
DeleteCriticalSection(&g_entries_cs);
// stop hashing threads
WaitForMultipleObjects((DWORD)num_threads, hash_threads, TRUE, INFINITE);
// Prepare job queue
JobQueue queue;
jobqueue_init(&queue);
size_t total_jobs = 0;
for (size_t i = 0; i < g_entry_count; ++i) {
Job *j = (Job *)malloc(sizeof(Job));
j->file = &g_entries[i];
j->next = NULL;
jobqueue_push(&queue, j);
++total_jobs;
}
if (total_jobs == 0) {
printf("Nothing to do — all files already hashed.\n");
return 0;
}
FILE *hf = fopen(FILE_HASHES_TXT, "w");
if (hf)
fclose(hf);
// Starting thread pool
atomic_size_t done_counter;
atomic_store(&done_counter, 0);
atomic_int live_workers;
atomic_store(&live_workers, (int)num_threads);
WorkerArg warg = {.queue = &queue,
.done_counter = &done_counter,
.total_jobs = total_jobs,
.live_workers = &live_workers};
printf("Starting thread pool: %zu threads (CPU cores: %zu)\n", num_threads,
hw_threads);
// Launch threads
HANDLE *tids = malloc(sizeof(HANDLE) * num_threads);
for (size_t i = 0; i < num_threads; ++i) {
tids[i] = CreateThread(NULL, 0, worker_thread_windows, &warg, 0, NULL);
}
// Progress / timer
struct timespec tstart, tnow;
// fallback for windows
LARGE_INTEGER freq, start_li;
QueryPerformanceFrequency(&freq);
QueryPerformanceCounter(&start_li);
size_t last_done = 0;
// --------------- Hashing speed MB/s ----------------
uint64_t last_bytes = atomic_load(&g_bytes_processed);
double last_time = 0.0;
double displayed_speed = 0.0;
const double sample_interval = 0.5;
char linebuf[256];
for (;;) {
size_t done = (size_t)atomic_load(&done_counter);
// ---- monotonic time ----
LARGE_INTEGER now_li;
QueryPerformanceCounter(&now_li);
double now =
(double)(now_li.QuadPart - start_li.QuadPart) / (double)freq.QuadPart;
// ---- total processed bytes ----
uint64_t bytes = atomic_load(&g_bytes_processed);
// ---- real sampler (independent of UI sleep) ----
if (last_time == 0.0) {
last_time = now;
last_bytes = bytes;
}
double dt = now - last_time;
if (dt >= sample_interval) {
uint64_t db = bytes - last_bytes;
if (db > 0 && dt > 0.0001) {
displayed_speed = (double)db / (1024.0 * 1024.0) / dt;
}
last_bytes = bytes;
last_time = now;
}
// ---- progress bar build ----
const int barw = 40;
double pct = total_jobs ? (double)done / (double)total_jobs : 0.0;
int filled = (int)(pct * barw + 0.5);
int p = 0;
p += snprintf(linebuf + p, sizeof(linebuf) - p, "[");
for (int i = 0; i < filled && p < (int)sizeof(linebuf); ++i)
p += snprintf(linebuf + p, sizeof(linebuf) - p, "#");
for (int i = filled; i < barw && p < (int)sizeof(linebuf); ++i)
p += snprintf(linebuf + p, sizeof(linebuf) - p, ".");
snprintf(linebuf + p, sizeof(linebuf) - p,
"] %6.2f%% (%zu / %zu) %8.2f MB/s", pct * 100.0, done, total_jobs,
displayed_speed);
printf("\r%s", linebuf);
fflush(stdout);
if (done >= total_jobs)
break;
Sleep(100);
}
printf("\n\n");
// stop queue and join threads
jobqueue_stop(&queue);
WaitForMultipleObjects((DWORD)num_threads, tids, TRUE, INFINITE);
for (size_t i = 0; i < num_threads; ++i)
CloseHandle(tids[i]);
CloseHandle(hash_threads[i]);
free(hash_threads);
free(g_file_queue.items);
WaitForSingleObject(progress, INFINITE);
CloseHandle(progress);
// done time
LARGE_INTEGER end_li;
QueryPerformanceCounter(&end_li);
double elapsed =
(double)(end_li.QuadPart - start_li.QuadPart) / (double)freq.QuadPart;
double total_seconds = timer_stop(&total_timer);
printf("Completed hashing %zu files in %.2f seconds\n", total_jobs, elapsed);
printf("Completed hashing %zu files\n", total_found);
uint64_t total_bytes = (uint64_t)atomic_load(&g_bytes_processed);
double total_mb = (double)total_bytes / (1024.0 * 1024.0);
double avg_mbps = total_mb / elapsed;
double avg_mbps = total_mb / total_seconds;
printf("Total: %.2f MB, Average: %.2f MB/s\n", total_mb, avg_mbps);
printf(" Total time : %.2f seconds\n", total_seconds);