Making the hashing buffer reusable instead of calling malloc for every file

This commit is contained in:
2026-03-08 10:59:13 +01:00
parent c846952cbf
commit 75c2592bfe
4 changed files with 65 additions and 90 deletions

View File

@@ -293,7 +293,8 @@ typedef enum arena_commit_policy {
typedef struct arena_params { typedef struct arena_params {
u64 reserve_size; // size of one arena block u64 reserve_size; // size of one arena block
u64 commit_size; // initial commit size u64 commit_size; // initial commit size
u64 align; // allocation alignment (0 = default) u64 align; // allocation alignment, 0 to disable and ARENA_ALIGN to align
// according to architecture
// Element size rules: // Element size rules:
// - stack mode : push_size > 0 (mandatory) // - stack mode : push_size > 0 (mandatory)

View File

@@ -7,7 +7,6 @@ v2.0: Multi threaded scan
v2.1: Uses AVX2 instead of SSE2 v2.1: Uses AVX2 instead of SSE2
v3.0: Simple mutex/critical section based MPMC queue v3.0: Simple mutex/critical section based MPMC queue
reusable hashing buffer
v3.1: Lock free MPMC queue Vyukov-style v3.1: Lock free MPMC queue Vyukov-style
@@ -22,5 +21,7 @@ Reorder helper functions
v3.4: Rewriting hash_worker() to export file_hashes.txt v3.4: Rewriting hash_worker() to export file_hashes.txt
v4.0: Instead of writing directly to file_hashes.txt, hash_workers now are using a local arena, writing everything once at the end v4.0: Instead of writing directly to file_hashes.txt, hash_workers now are using a local arena, writing everything once at the end
using #pragma once to ensure that a given header file is included only once in a single compilation unit Using #pragma once to ensure that a given header file is included only once in a single compilation unit
forcing xxhash to use the stack instead of the heap Forcing xxhash to use the stack instead of the heap
Making the hashing buffer reusable instead of calling malloc for every file
Implementing a general purpose arena to replace small allocations

View File

@@ -21,29 +21,6 @@ typedef struct FileEntry {
char owner[128]; // resolved owner name char owner[128]; // resolved owner name
} FileEntry; } FileEntry;
/* File path and metadata */
/*
 * Canonicalises path separators in place.
 *
 * Every '\\' is rewritten as '/', and any run of consecutive separators
 * (of either kind, possibly mixed) is reduced to a single '/'. All other
 * characters are copied through unchanged. The output is never longer than
 * the input, so the string is compacted inside its own buffer.
 *
 * p: writable, NUL-terminated string; it is dereferenced unconditionally,
 *    so it must not be NULL.
 *
 * NOTE(review): a leading double separator (e.g. a "\\\\server\\share" UNC
 * prefix) is collapsed like any other run -- confirm callers never rely on
 * such prefixes surviving.
 */
static void normalize_path(char *p) {
  char *out = p;      // next position to write
  int in_sep_run = 0; // non-zero while the last emitted char was a separator

  for (const char *in = p; *in != '\0'; ++in) {
    const char ch = *in;
    const int is_sep = (ch == '/' || ch == '\\');

    if (is_sep && in_sep_run)
      continue; // drop repeated separators

    *out++ = is_sep ? '/' : ch;
    in_sep_run = is_sep;
  }

  *out = '\0';
}
void platform_get_file_times(const char *path, uint64_t *out_created, void platform_get_file_times(const char *path, uint64_t *out_created,
uint64_t *out_modified); uint64_t *out_modified);
void platform_get_file_owner(const char *path, char *out_owner, void platform_get_file_owner(const char *path, char *out_owner,
@@ -68,7 +45,7 @@ static double timer_stop(HiResTimer *t) {
} }
// ============================================================ // ============================================================
// Simple Mutex-Based MPMC Queue (FileEntry*) // Simple lock free MPMC Queue
// ============================================================ // ============================================================
typedef struct { typedef struct {
@@ -127,25 +104,3 @@ typedef struct DirQueue {
pthread_cond_t cond; pthread_cond_t cond;
#endif #endif
} DirQueue; } DirQueue;
/* Hashing */
// One node of a singly linked list of jobs: a pointer to a file entry plus a
// link to the following node.
typedef struct Job {
FileEntry *file; // file entry carried by this node
struct Job *next; // following node in the list
} Job;
// Linked queue of Job nodes, tracked through head and tail pointers and paired
// with a Win32 critical section and condition variable.
// NOTE(review): the functions operating on this queue are not visible here;
// the per-field notes marked "presumably" are assumptions to confirm.
typedef struct {
Job *head; // first node of the list
Job *tail; // last node of the list
CRITICAL_SECTION cs; // presumably guards head/tail -- confirm against users
CONDITION_VARIABLE cv; // presumably signalled when a job is queued -- confirm
atomic_size_t count; // queued jobs
int stop; // presumably a shutdown flag for consumers -- confirm
} JobQueue;
// Bundle of pointers and counters grouped into a single argument struct.
// NOTE(review): presumably handed to each hashing worker thread; the consumer
// is not visible in this view, so field meanings below are inferred from the
// declarations only.
typedef struct {
JobQueue *queue; // job queue this argument refers to
atomic_size_t *done_counter; // pointer to an atomic counter (presumably completed jobs)
size_t total_jobs; // job total (presumably used for progress reporting -- confirm)
atomic_int *live_workers; // pointer to an atomic int (presumably running-worker count)
} WorkerArg;

View File

@@ -1,3 +1,4 @@
#include "arena.h"
#include "platform.h" #include "platform.h"
// ----------------------------- Globals ------------------------------------ // ----------------------------- Globals ------------------------------------
@@ -5,19 +6,29 @@ static atomic_uint_fast64_t g_files_found = 0;
static atomic_uint_fast64_t g_files_hashed = 0; static atomic_uint_fast64_t g_files_hashed = 0;
static atomic_uint_fast64_t g_bytes_processed = 0; static atomic_uint_fast64_t g_bytes_processed = 0;
static atomic_int g_scan_done = 0; static atomic_int g_scan_done = 0;
// __________________________________________________________________________
// ----------------------------- Utils -------------------------------------- // ============================= Utils ======================================
static void perror_exit(const char *msg) { // ----------------------------- Normalize path --------------
perror(msg); static void normalize_path(char *p) {
exit(1); char *src = p;
char *dst = p;
int prev_slash = 0;
while (*src) {
char c = *src++;
if (c == '\\' || c == '/') {
if (!prev_slash) {
*dst++ = '/';
prev_slash = 1;
}
} else {
*dst++ = c;
prev_slash = 0;
}
} }
static void *xmalloc(size_t n) { *dst = '\0';
void *p = malloc(n);
if (!p)
perror_exit("malloc");
return p;
} }
// ----------------------------- Convert filetime to epoch -------------- // ----------------------------- Convert filetime to epoch --------------
@@ -29,7 +40,6 @@ static uint64_t filetime_to_epoch(const FILETIME *ft) {
// Windows epoch (1601) → Unix epoch (1970) // Windows epoch (1601) → Unix epoch (1970)
return (ull.QuadPart - 116444736000000000ULL) / 10000000ULL; return (ull.QuadPart - 116444736000000000ULL) / 10000000ULL;
} }
// ----------------------------- Format time helper ------------------------- // ----------------------------- Format time helper -------------------------
static void format_time(uint64_t t, char *out, size_t out_sz) { static void format_time(uint64_t t, char *out, size_t out_sz) {
if (t == 0) { if (t == 0) {
@@ -258,6 +268,7 @@ FileEntry *mpmc_pop(MPMCQueue *q) {
return data; return data;
} }
// --------------- parallel directory scanning ----------------
// Add queue helper functions // Add queue helper functions
static void dirqueue_push(DirQueue *q, const char *path) { static void dirqueue_push(DirQueue *q, const char *path) {
EnterCriticalSection(&q->cs); EnterCriticalSection(&q->cs);
@@ -375,7 +386,7 @@ void scan_folder_windows_parallel(const char *base, DirQueue *q) {
} }
// ----------------------------- Hashing helpers ----------------------------- // ----------------------------- Hashing helpers -----------------------------
static void xxh3_hash_file_stream(const char *path, char *out_hex) { static void xxh3_hash_file_stream(const char *path, char *out_hex, BYTE *buf) {
// compute XXH3_128 over file. POSIX and Windows use standard reads in this // compute XXH3_128 over file. POSIX and Windows use standard reads in this
// helper. // helper.
// On Windows try to use overlapped synchronous chunked reads for higher // On Windows try to use overlapped synchronous chunked reads for higher
@@ -391,7 +402,6 @@ static void xxh3_hash_file_stream(const char *path, char *out_hex) {
XXH3_state_t state; XXH3_state_t state;
XXH3_128bits_reset(&state); XXH3_128bits_reset(&state);
BYTE *buf = (BYTE *)malloc(READ_BLOCK);
DWORD read = 0; DWORD read = 0;
BOOL ok; BOOL ok;
while (ReadFile(hFile, buf, READ_BLOCK, &read, NULL) && read > 0) { while (ReadFile(hFile, buf, READ_BLOCK, &read, NULL) && read > 0) {
@@ -400,7 +410,6 @@ static void xxh3_hash_file_stream(const char *path, char *out_hex) {
} }
h = XXH3_128bits_digest(&state); h = XXH3_128bits_digest(&state);
CloseHandle(hFile); CloseHandle(hFile);
free(buf);
snprintf(out_hex, HASH_STRLEN, "%016llx%016llx", (unsigned long long)h.high64, snprintf(out_hex, HASH_STRLEN, "%016llx%016llx", (unsigned long long)h.high64,
(unsigned long long)h.low64); (unsigned long long)h.low64);
} }
@@ -411,6 +420,7 @@ static DWORD WINAPI hash_worker(LPVOID arg) {
WorkerContext *ctx = (WorkerContext *)arg; WorkerContext *ctx = (WorkerContext *)arg;
MPMCQueue *q = ctx->queue; MPMCQueue *q = ctx->queue;
mem_arena *local_arena = ctx->arena; mem_arena *local_arena = ctx->arena;
BYTE *buf = (BYTE *)malloc(READ_BLOCK);
for (;;) { for (;;) {
FileEntry *fe = mpmc_pop(q); FileEntry *fe = mpmc_pop(q);
@@ -418,7 +428,7 @@ static DWORD WINAPI hash_worker(LPVOID arg) {
break; break;
char hash[HASH_STRLEN]; char hash[HASH_STRLEN];
xxh3_hash_file_stream(fe->path, hash); xxh3_hash_file_stream(fe->path, hash, buf);
char created[32], modified[32]; char created[32], modified[32];
format_time(fe->created_time, created, sizeof(created)); format_time(fe->created_time, created, sizeof(created));
@@ -440,6 +450,7 @@ static DWORD WINAPI hash_worker(LPVOID arg) {
free(fe->path); free(fe->path);
free(fe); free(fe);
} }
free(buf);
return 0; return 0;
} }
@@ -587,6 +598,23 @@ int main(int argc, char **argv) {
printf(" - %s\n", folders[i]); printf(" - %s\n", folders[i]);
} }
// -------------------------------
// Creating a general purpose arena
// -------------------------------
arena_params params = {
.reserve_size = GiB(1),
.commit_size = MiB(16),
.align = 0,
.push_size = 0,
.allow_free_list = true,
.allow_swapback = false,
.growth_policy = ARENA_GROWTH_NORMAL,
.commit_policy = ARENA_COMMIT_LAZY,
.max_nbre_blocks = 1,
};
mem_arena *gp_arena = arena_create(&params);
// ------------------------------- // -------------------------------
// Detect hardware threads (CPU cores) // Detect hardware threads (CPU cores)
// ------------------------------- // -------------------------------
@@ -596,7 +624,7 @@ int main(int argc, char **argv) {
GetLogicalProcessorInformation(NULL, &len); GetLogicalProcessorInformation(NULL, &len);
SYSTEM_LOGICAL_PROCESSOR_INFORMATION *buf = SYSTEM_LOGICAL_PROCESSOR_INFORMATION *buf =
(SYSTEM_LOGICAL_PROCESSOR_INFORMATION *)malloc(len); (SYSTEM_LOGICAL_PROCESSOR_INFORMATION *)arena_push(&gp_arena, len, true);
if (GetLogicalProcessorInformation(buf, &len)) { if (GetLogicalProcessorInformation(buf, &len)) {
DWORD count = 0; DWORD count = 0;
@@ -608,7 +636,7 @@ int main(int argc, char **argv) {
if (count > 0) if (count > 0)
hw_threads = count; hw_threads = count;
} }
free(buf); arena_free(&gp_arena, (u8 **)&buf, len);
// Add some extra threads to overlap I/O more aggressively // Add some extra threads to overlap I/O more aggressively
size_t num_threads = hw_threads * 2; size_t num_threads = hw_threads * 2;
@@ -628,19 +656,6 @@ int main(int argc, char **argv) {
q.active = 0; q.active = 0;
// starting hash threads // starting hash threads
arena_params params = {
.reserve_size = GiB(1),
.commit_size = MiB(16),
.align = 0,
.push_size = 0,
.allow_free_list = true,
.allow_swapback = false,
.growth_policy = ARENA_GROWTH_NORMAL,
.commit_policy = ARENA_COMMIT_LAZY,
.max_nbre_blocks = 0,
};
WorkerContext workers[num_threads]; WorkerContext workers[num_threads];
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
@@ -648,7 +663,8 @@ int main(int argc, char **argv) {
workers[i].arena = arena_create(&params); workers[i].arena = arena_create(&params);
} }
HANDLE *hash_threads = malloc(sizeof(HANDLE) * num_threads); HANDLE *hash_threads =
arena_push(&gp_arena, sizeof(HANDLE) * num_threads, true);
for (size_t i = 0; i < num_threads; ++i) { for (size_t i = 0; i < num_threads; ++i) {
hash_threads[i] = CreateThread(NULL, 0, hash_worker, &workers[i], 0, NULL); hash_threads[i] = CreateThread(NULL, 0, hash_worker, &workers[i], 0, NULL);
@@ -665,7 +681,9 @@ int main(int argc, char **argv) {
if (scan_threads < 2) if (scan_threads < 2)
scan_threads = 2; scan_threads = 2;
HANDLE *scan_tids = malloc(sizeof(HANDLE) * scan_threads); HANDLE *scan_tids =
arena_push(&gp_arena, sizeof(HANDLE) * scan_threads, true);
for (size_t i = 0; i < scan_threads; ++i) { for (size_t i = 0; i < scan_threads; ++i) {
scan_tids[i] = scan_tids[i] =
CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)scan_worker, &q, 0, NULL); CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)scan_worker, &q, 0, NULL);
@@ -673,7 +691,6 @@ int main(int argc, char **argv) {
WaitForMultipleObjects((DWORD)scan_threads, scan_tids, TRUE, INFINITE); WaitForMultipleObjects((DWORD)scan_threads, scan_tids, TRUE, INFINITE);
// debug
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < num_threads; i++) {
mpmc_push(&g_file_queue, NULL); mpmc_push(&g_file_queue, NULL);
} }
@@ -682,7 +699,8 @@ int main(int argc, char **argv) {
for (size_t i = 0; i < scan_threads; ++i) for (size_t i = 0; i < scan_threads; ++i)
CloseHandle(scan_tids[i]); CloseHandle(scan_tids[i]);
free(scan_tids);
arena_free(&gp_arena, (u8 **)&scan_tids, sizeof(HANDLE) * scan_threads);
double scan_seconds = timer_stop(&scan_timer); double scan_seconds = timer_stop(&scan_timer);
size_t total_found = atomic_load(&g_files_found); size_t total_found = atomic_load(&g_files_found);
@@ -703,7 +721,7 @@ int main(int argc, char **argv) {
for (size_t i = 0; i < num_threads; ++i) for (size_t i = 0; i < num_threads; ++i)
CloseHandle(hash_threads[i]); CloseHandle(hash_threads[i]);
free(hash_threads); arena_free(&gp_arena, (u8 **)&hash_threads, sizeof(HANDLE) * num_threads);
WaitForSingleObject(progress, INFINITE); WaitForSingleObject(progress, INFINITE);
CloseHandle(progress); CloseHandle(progress);
@@ -727,14 +745,14 @@ int main(int argc, char **argv) {
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
mem_arena *arena = workers[i].arena; mem_arena *local_hash_arena = workers[i].arena;
DWORD written; DWORD written;
u8 *arena_base = u8 *arena_base = (u8 *)local_hash_arena +
(u8 *)arena + ALIGN_UP_POW2(sizeof(mem_arena), arena->align); ALIGN_UP_POW2(sizeof(mem_arena), local_hash_arena->align);
WriteFile(h, arena_base, (DWORD)arena->pos, &written, NULL); WriteFile(h, arena_base, (DWORD)local_hash_arena->pos, &written, NULL);
} }
// done time // done time