Hashers now use a thread-local arena

Instead of writing directly to file_hashes.txt, the hash workers now each
write into a local arena, and everything is written out once at the end.

Use #pragma once to ensure that a given header file is included only
once in a single compilation unit.
This commit is contained in:
2026-03-08 10:46:05 +01:00
parent ee02b83094
commit dd0797df79
7 changed files with 177 additions and 73 deletions

View File

@@ -1,5 +1,4 @@
#include "platform.h"
#include <stdio.h>
// ----------------------------- Globals ------------------------------------
static atomic_uint_fast64_t g_files_found = 0;
@@ -409,14 +408,10 @@ static void xxh3_hash_file_stream(const char *path, char *out_hex) {
// ------------------------- Hash worker --------------------------------
static DWORD WINAPI hash_worker(LPVOID arg) {
MPMCQueue *q = (MPMCQueue *)arg;
static CRITICAL_SECTION append_cs;
static LONG init = 0;
if (InterlockedCompareExchange(&init, 1, 0) == 0) {
InitializeCriticalSection(&append_cs);
}
WorkerContext *ctx = (WorkerContext *)arg;
MPMCQueue *q = ctx->queue;
mem_arena *local_arena = ctx->arena;
for (;;) {
FileEntry *fe = mpmc_pop(q);
@@ -432,16 +427,14 @@ static DWORD WINAPI hash_worker(LPVOID arg) {
double size_kib = (double)fe->size_bytes / 1024.0;
EnterCriticalSection(&append_cs);
char stack_buf[1024];
FILE *hf = fopen(FILE_HASHES_TXT, "a");
if (hf) {
fprintf(hf, "%s\t%s\t%.2f\t%s\t%s\t%s\n", hash, fe->path, size_kib,
created, modified, fe->owner);
fclose(hf);
}
int len =
snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n",
hash, fe->path, size_kib, created, modified, fe->owner);
LeaveCriticalSection(&append_cs);
char *dst = arena_push(&local_arena, len, false);
memcpy(dst, stack_buf, len);
atomic_fetch_add(&g_files_hashed, 1);
@@ -627,7 +620,7 @@ int main(int argc, char **argv) {
// Step 1: Scan all folders
// -------------------------------
mpmc_init(&g_file_queue, 1024 * 1024 * 1024);
mpmc_init(&g_file_queue, GiB(1));
DirQueue q;
memset(&q, 0, sizeof(q));
@@ -636,11 +629,30 @@ int main(int argc, char **argv) {
q.active = 0;
// starting hash threads
arena_params params = {
.reserve_size = GiB(1),
.commit_size = MiB(16),
.align = 0,
.push_size = 0,
.allow_free_list = true,
.allow_swapback = false,
.growth_policy = ARENA_GROWTH_NORMAL,
.commit_policy = ARENA_COMMIT_LAZY,
.max_nbre_blocks = 0,
};
WorkerContext workers[num_threads];
for (int i = 0; i < num_threads; i++) {
workers[i].queue = &g_file_queue;
workers[i].arena = arena_create(&params);
}
HANDLE *hash_threads = malloc(sizeof(HANDLE) * num_threads);
for (size_t i = 0; i < num_threads; ++i) {
hash_threads[i] =
CreateThread(NULL, 0, hash_worker, &g_file_queue, 0, NULL);
hash_threads[i] = CreateThread(NULL, 0, hash_worker, &workers[i], 0, NULL);
}
// starting scan threads
@@ -662,7 +674,6 @@ int main(int argc, char **argv) {
WaitForMultipleObjects((DWORD)scan_threads, scan_tids, TRUE, INFINITE);
// mpmc_finish(&g_file_queue);
// debug
for (size_t i = 0; i < num_threads; i++) {
mpmc_push(&g_file_queue, NULL);
@@ -694,11 +705,39 @@ int main(int argc, char **argv) {
CloseHandle(hash_threads[i]);
free(hash_threads);
// free(g_file_queue.items);
WaitForSingleObject(progress, INFINITE);
CloseHandle(progress);
// write file_hashes.txt
// FILE *f = fopen(FILE_HASHES_TXT, "wb");
//
// for (int i = 0; i < num_threads; i++) {
// mem_arena *arena = workers[i].arena;
//
// u8 *arena_base =
// (u8 *)arena + ALIGN_UP_POW2(sizeof(mem_arena), arena->align);
// fwrite(arena_base, 1, arena->pos, f);
// }
//
// fclose(f);
HANDLE h = CreateFileA(FILE_HASHES_TXT, GENERIC_WRITE, 0, NULL, CREATE_ALWAYS,
FILE_ATTRIBUTE_NORMAL, NULL);
for (int i = 0; i < num_threads; i++) {
mem_arena *arena = workers[i].arena;
DWORD written;
u8 *arena_base =
(u8 *)arena + ALIGN_UP_POW2(sizeof(mem_arena), arena->align);
WriteFile(h, arena_base, (DWORD)arena->pos, &written, NULL);
}
// done time
double total_seconds = timer_stop(&total_timer);