diff --git a/binaries/changelog.txt b/binaries/changelog.txt index 1be6243..0db7f6c 100644 --- a/binaries/changelog.txt +++ b/binaries/changelog.txt @@ -50,6 +50,8 @@ Fixing user prompt parsing Reorganising the code Improving the scan function -5.0: Implementing the IO Ring instead of buffered hashing, huge performance gains. +5.0: Implementing the IO Ring instead of buffered hashing, huge performance gains. The IO Ring is thread local, uses DMA and direct disk I/O, bypassing the OS cash completely, it supports bashing multiple submissions and can handle multiple files at the same time. +Hashing small files using XXH3_128bits() instead of the streaming pipeline(XXH3_128bits_reset(), XXH3_128bits_update(), XXH3_128bits_digest()), this reduses the overhead of creating a state and digest, coupled with the IO Ring it improves the hashing of small files whose size is inferior to the size of IO Ring buffers fixing the xxh_x86dispatch warnings Updating the progress printing function + diff --git a/file_hasher.c b/file_hasher.c index 772559d..9bbe3d2 100644 --- a/file_hasher.c +++ b/file_hasher.c @@ -87,7 +87,7 @@ int main(int argc, char **argv) { printf(" Selected instruction set: %s\n", get_xxhash_instruction_set()); // Align IO Ring block size to the system page size - u64 g_ioring_read_block = ALIGN_UP_POW2(IORING_READ_BLOCK, g_pagesize); + g_ioring_buffer_size = ALIGN_UP_POW2(IORING_BUFFER_SIZE, g_pagesize); // ------------------------------- // Scanning and hashing // ------------------------------- @@ -120,6 +120,7 @@ int main(int argc, char **argv) { // Starting hash threads size_t num_hash_threads = num_threads; + // size_t num_hash_threads = 1; WorkerContext workers[num_hash_threads]; Thread *hash_threads = diff --git a/platform.c b/platform.c index 0d611e7..c9cce03 100644 --- a/platform.c +++ b/platform.c @@ -874,14 +874,48 @@ static THREAD_RETURN progress_thread(void *arg) { // ======================== Hash worker IO Ring ======================== // -------------------------- Configuration --------------------------- +#define IORING_BUFFER_SIZE (KiB(512)) #define NUM_BUFFERS_PER_THREAD 16 +#define MAX_ACTIVE_FILES 8 #define SUBMIT_TIMEOUT_MS 30000 #define USERDATA_REGISTER 1 -#define IORING_READ_BLOCK (KiB(1024)) + // Globals -u64 g_ioring_read_block = 4096 * 64; +u64 g_ioring_buffer_size = 4096 * 64; static atomic_uint_fast64_t g_io_ring_fallbacks = 0; +// -------------------------- File context --------------------------- +typedef struct IoBuffer IoBuffer; + +typedef struct FileReadContext { + HANDLE hFile; + uint64_t file_size; + bool use_incremental_hash; + union { + XXH3_state_t hash_state; // For incremental hash (large files) + XXH128_hash_t single_hash; // For single-shot hash (small files) + }; + FileEntry *fe; + + // Completion tracking + int reads_submitted; + int reads_completed; + + int active_reads; + int failed_reads; + + int reads_hashed; + uint64_t bytes_hashed; + + // For in-order hashing + uint64_t next_hash_offset; + uint64_t next_read_offset; + + IoBuffer *head; + IoBuffer *tail; + +} FileReadContext; + // -------------------------- Buffer structure --------------------------- typedef struct IoBuffer { void *data; @@ -891,44 +925,55 @@ typedef struct IoBuffer { HRESULT result; int buffer_id; int completed; + + FileReadContext *file; + + struct IoBuffer *next; } IoBuffer; -// ============================================================================ // Thread-local I/O Ring context -// ============================================================================ typedef struct ThreadIoContext { HIORING ring; HANDLE completion_event; IoBuffer buffers[NUM_BUFFERS_PER_THREAD]; int buffer_pool[NUM_BUFFERS_PER_THREAD]; int free_count; + int num_submissions; + int active_files; int initialized; } ThreadIoContext; -// -------------------------- File context --------------------------- -typedef struct FileReadContext { - HANDLE hFile; - uint64_t file_size; - XXH3_state_t hash_state; - char path[MAX_PATH]; - - // Completion tracking - int reads_submitted; - int reads_completed; - int reads_hashed; - uint64_t bytes_hashed; - int failed_reads; - int active_reads; - int buffers_ready; // Count of buffers ready to hash - - // For in-order hashing - uint64_t next_hash_offset; // Next offset that needs to be hashed - uint64_t next_read_offset; // Next offset to submit for reading - -} FileReadContext; - static _Thread_local ThreadIoContext *g_thread_ctx = NULL; +typedef struct FileQueue { + FileReadContext files[MAX_ACTIVE_FILES]; + int head; + int tail; + int count; +} FileQueue; + +// ---------------------- FIFO queue operations --------------------------- +static FileReadContext *fq_push(FileQueue *fq) { + if (fq->count == MAX_ACTIVE_FILES) + return NULL; + + FileReadContext *f = &fq->files[fq->tail]; + fq->tail = (fq->tail + 1) % MAX_ACTIVE_FILES; + fq->count++; + return f; +} + +static FileReadContext *fq_peek(FileQueue *fq) { + if (fq->count == 0) + return NULL; + return &fq->files[fq->head]; +} + +static void fq_pop(FileQueue *fq) { + fq->head = (fq->head + 1) % MAX_ACTIVE_FILES; + fq->count--; +} + // ---------------------- Initialize thread context --------------------------- static ThreadIoContext *io_ring_init_thread(void) { if (g_thread_ctx && g_thread_ctx->initialized) { @@ -960,7 +1005,7 @@ static ThreadIoContext *io_ring_init_thread(void) { // Initialize buffer pool IORING_BUFFER_INFO buf_info[NUM_BUFFERS_PER_THREAD]; - u64 buf_pool_size = g_ioring_read_block * NUM_BUFFERS_PER_THREAD; + u64 buf_pool_size = g_ioring_buffer_size * NUM_BUFFERS_PER_THREAD; // Reserve and Commit the entire memory chunk void *base_ptr = plat_mem_reserve(buf_pool_size); @@ -975,13 +1020,13 @@ static ThreadIoContext *io_ring_init_thread(void) { for (int i = 0; i < NUM_BUFFERS_PER_THREAD; i++) { - g_thread_ctx->buffers[i].data = (u8 *)base_ptr + (i * g_ioring_read_block); + g_thread_ctx->buffers[i].data = (u8 *)base_ptr + (i * g_ioring_buffer_size); g_thread_ctx->buffer_pool[i] = i; g_thread_ctx->buffers[i].buffer_id = i; buf_info[i].Address = g_thread_ctx->buffers[i].data; - buf_info[i].Length = (ULONG)g_ioring_read_block; + buf_info[i].Length = (ULONG)g_ioring_buffer_size; } g_thread_ctx->free_count = NUM_BUFFERS_PER_THREAD; @@ -997,6 +1042,9 @@ static ThreadIoContext *io_ring_init_thread(void) { // Submit registration SubmitIoRing(g_thread_ctx->ring, 0, 0, NULL); + g_thread_ctx->num_submissions = 0; + g_thread_ctx->active_files = 0; + g_thread_ctx->initialized = 1; return g_thread_ctx; @@ -1015,7 +1063,7 @@ static void io_ring_cleanup_thread(void) { g_thread_ctx = NULL; } -// ---------------------- Get a free buffer from pool ------------------------ +// ---------------------- Buffer get and return ------------------------ static IoBuffer *get_free_buffer(ThreadIoContext *ctx) { if (ctx->free_count == 0) { @@ -1043,6 +1091,7 @@ static HRESULT submit_read(ThreadIoContext *ctx, FileReadContext *file_ctx, IoBuffer *buf, uint64_t offset, size_t size) { buf->offset = offset; buf->size = size; + buf->file = file_ctx; IORING_HANDLE_REF file_ref = IoRingHandleRefFromHandle(file_ctx->hFile); IORING_BUFFER_REF buffer_ref = @@ -1055,6 +1104,7 @@ static HRESULT submit_read(ThreadIoContext *ctx, FileReadContext *file_ctx, if (SUCCEEDED(hr)) { file_ctx->active_reads++; file_ctx->reads_submitted++; + ctx->num_submissions++; } else { buf->completed = 1; return_buffer(ctx, buf); @@ -1062,11 +1112,40 @@ static HRESULT submit_read(ThreadIoContext *ctx, FileReadContext *file_ctx, return hr; } +// ------------ Link completed buffers in an ordered list ------------- +static void insert_buffer_ordered(FileReadContext *file, IoBuffer *buf) { + buf->next = NULL; + + // empty list + if (!file->head) { + file->head = file->tail = buf; + return; + } + + // insert at head + if (buf->offset < file->head->offset) { + buf->next = file->head; + file->head = buf; + return; + } + + // find position + IoBuffer *cur = file->head; + + while (cur->next && cur->next->offset < buf->offset) { + cur = cur->next; + } + + buf->next = cur->next; + cur->next = buf; + + if (!buf->next) + file->tail = buf; +} + // -------------------------- Process completions --------------------------- -static int process_completions(ThreadIoContext *ctx, - FileReadContext *file_ctx) { +static void process_completions(ThreadIoContext *ctx, FileQueue *fq) { IORING_CQE cqe; - int processed = 0; while (PopIoRingCompletion(ctx->ring, &cqe) == S_OK) { @@ -1074,273 +1153,294 @@ static int process_completions(ThreadIoContext *ctx, continue; IoBuffer *buf = (IoBuffer *)cqe.UserData; + FileReadContext *file = buf->file; - if (buf && !buf->completed) { - buf->result = cqe.ResultCode; - buf->bytes_read = (DWORD)cqe.Information; - buf->completed = 1; + buf->result = cqe.ResultCode; + buf->bytes_read = (DWORD)cqe.Information; + buf->completed = 1; - if (SUCCEEDED(cqe.ResultCode) && cqe.Information > 0) { - file_ctx->active_reads--; - file_ctx->reads_completed++; - file_ctx->buffers_ready++; - } else { - file_ctx->failed_reads++; - file_ctx->active_reads--; - file_ctx->reads_completed++; - } + file->active_reads--; + file->reads_completed++; + ctx->num_submissions--; - processed++; + if (SUCCEEDED(cqe.ResultCode) && cqe.Information > 0) { + + buf->next = NULL; + + insert_buffer_ordered(file, buf); + + } else { + file->failed_reads++; + return_buffer(ctx, buf); } } - - return processed; } -// -------------------- Hash buffer in sequential order ----------------------- -static int hash_sequential_buffers(ThreadIoContext *ctx, - FileReadContext *file_ctx) { - int hashed = 0; +// -------------------- File operations ----------------------- +static int init_file(FileReadContext *f, FileEntry *fe) { + memset(f, 0, sizeof(*f)); - // Keep hashing while the next buffer in sequence is ready - while (file_ctx->next_hash_offset < file_ctx->file_size) { - // Find the buffer that contains next_hash_offset - IoBuffer *found_buf = NULL; + f->fe = fe; + f->file_size = fe->size_bytes; - for (int i = 0; i < NUM_BUFFERS_PER_THREAD; i++) { - IoBuffer *buf = &ctx->buffers[i]; - if (buf->completed && buf->offset == file_ctx->next_hash_offset) { - found_buf = buf; - break; - } - } + f->hFile = CreateFileA( + fe->path, GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, + OPEN_EXISTING, FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING, NULL); - if (!found_buf) - break; // Buffer not ready yet - - // Found the correct buffer in order - hash it - if (SUCCEEDED(found_buf->result) && found_buf->bytes_read > 0) { - XXH3_128bits_update(&file_ctx->hash_state, found_buf->data, - found_buf->bytes_read); - atomic_fetch_add(&g_bytes_processed, found_buf->bytes_read); - - // Update bytes_hashed for this file! - file_ctx->bytes_hashed += found_buf->bytes_read; - file_ctx->reads_hashed++; - file_ctx->buffers_ready--; - - // Mark as processed and return buffer to pool - found_buf->completed = 0; - return_buffer(ctx, found_buf); - - // Move to next offset - file_ctx->next_hash_offset += found_buf->size; - hashed++; - } else if (found_buf->bytes_read == 0 && SUCCEEDED(found_buf->result)) { - // Read operation failed with an error code - file_ctx->reads_hashed++; - file_ctx->buffers_ready--; - found_buf->completed = 0; - return_buffer(ctx, found_buf); - file_ctx->next_hash_offset += found_buf->size; - hashed++; - } else { - // Read failed - file_ctx->failed_reads++; - file_ctx->reads_hashed++; - file_ctx->buffers_ready--; - found_buf->completed = 0; - return_buffer(ctx, found_buf); - file_ctx->next_hash_offset += found_buf->size; - hashed++; - } + if (f->hFile == INVALID_HANDLE_VALUE) { + return 0; } - return hashed; + // Determine hash method based on file size + if (f->file_size > g_ioring_buffer_size) { + f->use_incremental_hash = true; + XXH3_128bits_reset(&f->hash_state); + } else { + f->use_incremental_hash = false; + } + return 1; +} + +static void finalize_file(FileReadContext *file, WorkerContext *ctx) { + + FileEntry *fe = file->fe; + + char hash[HASH_STRLEN]; + + if (file->failed_reads == 0 && file->bytes_hashed == file->file_size) { + if (file->use_incremental_hash) { + // Large file: digest the accumulated hash state + XXH128_hash_t h = XXH3_128bits_digest(&file->hash_state); + snprintf(hash, HASH_STRLEN, "%016llx%016llx", + (unsigned long long)h.high64, (unsigned long long)h.low64); + } else { + // Small file: hash already computed, stored directly in single_hash + snprintf(hash, HASH_STRLEN, "%016llx%016llx", + (unsigned long long)file->single_hash.high64, + (unsigned long long)file->single_hash.low64); + } + } else { + atomic_fetch_add(&g_io_ring_fallbacks, 1); + xxh3_hash_file_stream(fe->path, hash, NULL); + printf("Fallback for path: %s\n", fe->path); + } + + char created[32], modified[32]; + format_time(fe->created_time, created, sizeof(created)); + format_time(fe->modified_time, modified, sizeof(modified)); + + double size_kib = (double)fe->size_bytes / 1024.0; + + char stack_buf[1024]; + int len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n", + hash, fe->path, size_kib, created, modified, fe->owner); + + char *dst = arena_push(&ctx->arena, len, false); + memcpy(dst, stack_buf, len); + + atomic_fetch_add(&g_files_hashed, 1); +} + +// -------------------- Hash head file ----------------------- +static void hash_head_file(ThreadIoContext *ctx, FileQueue *fq, + WorkerContext *wctx) { + + FileReadContext *file = fq_peek(fq); + if (!file) + return; + + while (file->head) { + + IoBuffer *buf = file->head; + + // Check ordering + if (buf->offset != file->bytes_hashed) + return; + + // Consume + file->head = buf->next; + if (!file->head) + file->tail = NULL; + + // Calculate actual bytes to hash (handle last partial sector) + size_t bytes_to_hash = buf->bytes_read; + + // If this is the last buffer and we read beyond file size, trim it + if (buf->offset + buf->bytes_read > file->file_size) { + bytes_to_hash = file->file_size - buf->offset; + } + + if (bytes_to_hash > 0) { + if (file->use_incremental_hash) { + // Large file: update incremental hash state + XXH3_128bits_update(&file->hash_state, buf->data, bytes_to_hash); + } else { + // Small file: single-shot hash + file->single_hash = XXH3_128bits(buf->data, bytes_to_hash); + } + + file->bytes_hashed += bytes_to_hash; + atomic_fetch_add(&g_bytes_processed, bytes_to_hash); + } + + return_buffer(ctx, buf); + } + + // Finalize + if (file->active_reads == 0 && file->bytes_hashed >= file->file_size) { + finalize_file(file, wctx); + CloseHandle(file->hFile); + fq_pop(fq); + ctx->active_files--; + } } // ------------- Submit pending reads - fill all free buffers ----------------- -static int submit_pending_reads(ThreadIoContext *ctx, - FileReadContext *file_ctx) { - int submitted = 0; +static void submit_pending_reads(ThreadIoContext *ctx, FileQueue *fq, + WorkerContext *worker_ctx, int *submitting) { - while (1) { - // Check if we have more data to read - uint64_t current_offset = file_ctx->next_read_offset; - int has_data = (current_offset < file_ctx->file_size); + MPMCQueue *file_queue = worker_ctx->file_queue; - if (!has_data) - break; + // Try to submit reads for the current head file + FileReadContext *f = fq_peek(fq); - // Get a free buffer - IoBuffer *buf = get_free_buffer(ctx); - if (!buf) - break; + for (;;) { - size_t remaining = file_ctx->file_size - current_offset; + if (f) { + while (f->next_read_offset < f->file_size) { - size_t bytes_to_read; + IoBuffer *buf = get_free_buffer(ctx); + if (!buf) + return; - if (remaining >= g_ioring_read_block) { - bytes_to_read = g_ioring_read_block; - } else { - // Round UP to sector size (4096) - bytes_to_read = ALIGN_UP_POW2(remaining, g_pagesize); - } + size_t remaining = f->file_size - f->next_read_offset; + size_t size; - HRESULT hr = submit_read(ctx, file_ctx, buf, current_offset, bytes_to_read); + // Check if this is the last read and the file size is not + // sector-aligned + BOOL is_last_read = (remaining <= g_ioring_buffer_size); - if (SUCCEEDED(hr)) { - submitted++; + if (remaining >= g_ioring_buffer_size) { + // Normal full read + size = g_ioring_buffer_size; + } else { + // Last read - handle partial sector + if (remaining % g_pagesize != 0) { + size = ALIGN_UP_POW2(remaining, g_pagesize); - file_ctx->next_read_offset += bytes_to_read; - } else { - return_buffer(ctx, buf); - break; - } - } + } else { + size = remaining; + } + } - return submitted; -} + HRESULT hr = submit_read(ctx, f, buf, f->next_read_offset, size); -// -------------------------- Wait for completions --------------------------- -static void wait_for_completions(ThreadIoContext *ctx, - FileReadContext *file_ctx) { - int has_active = (file_ctx->active_reads > 0); + if (FAILED(hr)) { + return_buffer(ctx, buf); + f->failed_reads++; + f->active_reads = 0; + f->reads_submitted = 0; + f->next_read_offset = f->file_size; + break; + } - if (has_active && ctx->completion_event) { - WaitForSingleObject(ctx->completion_event, SUBMIT_TIMEOUT_MS); - } -} + f->next_read_offset += size; -// ---------------------- Main parallel hashing function ---------------------- -static void xxh3_hash_file_parallel(ThreadIoContext *ctx, const char *path, - char *out_hex, unsigned char *temp_buffer) { - - // Validate I/O Ring - if (!ctx || !ctx->ring) { - xxh3_hash_file_stream(path, out_hex, temp_buffer); - return; - } - - HANDLE hFile = CreateFileA( - path, GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, - OPEN_EXISTING, FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING, NULL); - - if (hFile == INVALID_HANDLE_VALUE) { - xxh3_hash_file_stream(path, out_hex, temp_buffer); - return; - } - - LARGE_INTEGER file_size; - if (!GetFileSizeEx(hFile, &file_size)) { - xxh3_hash_file_stream(path, out_hex, temp_buffer); - CloseHandle(hFile); - return; - } - - FileReadContext file_ctx; - memset(&file_ctx, 0, sizeof(file_ctx)); - file_ctx.hFile = hFile; - file_ctx.file_size = file_size.QuadPart; - file_ctx.next_hash_offset = 0; - file_ctx.next_read_offset = 0; - strncpy(file_ctx.path, path, MAX_PATH - 1); - file_ctx.path[MAX_PATH - 1] = 0; - XXH3_128bits_reset(&file_ctx.hash_state); - - // Submit initial reads - submit_pending_reads(ctx, &file_ctx); - - if (file_ctx.reads_submitted > 0) { - UINT32 submitted = 0; - SubmitIoRing(ctx->ring, 0, 0, &submitted); - } - - // Main loop - while (file_ctx.reads_hashed < file_ctx.reads_submitted) { - // Process completions - process_completions(ctx, &file_ctx); - - // Hash buffers in sequential order (critical!) - hash_sequential_buffers(ctx, &file_ctx); - - // Submit more reads if needed - if (file_ctx.active_reads < NUM_BUFFERS_PER_THREAD && - file_ctx.next_read_offset < file_ctx.file_size) { - int new_reads = submit_pending_reads(ctx, &file_ctx); - if (new_reads > 0) { - UINT32 submitted = 0; - SubmitIoRing(ctx->ring, 0, 0, &submitted); + if (ctx->num_submissions >= NUM_BUFFERS_PER_THREAD) + return; } } - // Wait if nothing to hash and active reads exist - if (file_ctx.active_reads > 0 && file_ctx.buffers_ready == 0) { - wait_for_completions(ctx, &file_ctx); + // Add new file if possible + if (!*submitting) + return; + + if (ctx->active_files >= MAX_ACTIVE_FILES) + return; + + FileEntry *fe = mpmc_pop(file_queue); + if (!fe) { + *submitting = 0; + return; } + + FileReadContext *newf = fq_push(fq); + + if (!init_file(newf, fe)) { + // File can't be opened with NO_BUFFERING, process with fallback + char hash[HASH_STRLEN]; + finalize_file(newf, worker_ctx); + fq_pop(fq); + continue; + } + + f = newf; + ctx->active_files++; + } +} + +// -------------------------- Wait for completions --------------------------- +static void wait_for_completions(ThreadIoContext *ctx) { + + // If there are in-flight I/O requests → wait for completion + if (ctx->num_submissions > 0) { + WaitForSingleObject(ctx->completion_event, SUBMIT_TIMEOUT_MS); + return; } - // Final verification - if (file_ctx.bytes_hashed == file_ctx.file_size && - file_ctx.failed_reads == 0) { - XXH128_hash_t h = XXH3_128bits_digest(&file_ctx.hash_state); - snprintf(out_hex, HASH_STRLEN, "%016llx%016llx", - (unsigned long long)h.high64, (unsigned long long)h.low64); - } else { - if (file_ctx.bytes_hashed != file_ctx.file_size) { - atomic_fetch_add(&g_io_ring_fallbacks, 1); - } - xxh3_hash_file_stream(path, out_hex, temp_buffer); - } - - CloseHandle(hFile); + // Sleep(1); } // -------------------------- Hash worker I/O Ring --------------------------- static THREAD_RETURN hash_worker_io_ring(void *arg) { WorkerContext *ctx = (WorkerContext *)arg; - unsigned char *temp_buffer = (unsigned char *)malloc(READ_BLOCK); - char hash[HASH_STRLEN]; - if (!temp_buffer) - return THREAD_RETURN_VALUE; - - // Initialize I/O Ring for this thread + // Init IO ring ThreadIoContext *ring_ctx = io_ring_init_thread(); if (!ring_ctx || !ring_ctx->ring) { printf("Thread %lu: I/O Ring unavailable, using buffered I/O\n", GetCurrentThreadId()); - free(temp_buffer); return hash_worker(arg); } + // Initialize pipeline state + FileQueue fq; + memset(&fq, 0, sizeof(fq)); + + int submitting = 1; + + // Main pipeline loop for (;;) { - FileEntry *fe = mpmc_pop(ctx->file_queue); - if (!fe) + + // 1. Submit new reads + submit_pending_reads(ring_ctx, &fq, ctx, &submitting); + + UINT32 submitted = 0; + SubmitIoRing(ring_ctx->ring, 0, 0, &submitted); + + // 5. Avoid busy witing + wait_for_completions(ring_ctx); + + // 2. Process completions + process_completions(ring_ctx, &fq); + + // 3. Hash files + for (int i = 0; i < fq.count; i++) { + hash_head_file(ring_ctx, &fq, ctx); + } + + // debug + // printf("Free buffers: %d, Submissions: %d, Active files: %d\n", + // ring_ctx->free_count, ring_ctx->num_submissions, + // ring_ctx->active_files); + + // 4. Exit condition + if (!submitting && ring_ctx->active_files == 0 && + ring_ctx->num_submissions == 0) { break; - - // Pass the I/O Ring context to the hashing function - xxh3_hash_file_parallel(ring_ctx, fe->path, hash, temp_buffer); - - char created[32], modified[32]; - format_time(fe->created_time, created, sizeof(created)); - format_time(fe->modified_time, modified, sizeof(modified)); - - double size_kib = (double)fe->size_bytes / 1024.0; - - char stack_buf[1024]; - int len = - snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n", - hash, fe->path, size_kib, created, modified, fe->owner); - - char *dst = arena_push(&ctx->arena, len, false); - memcpy(dst, stack_buf, len); - atomic_fetch_add(&g_files_hashed, 1); + } } io_ring_cleanup_thread(); - free(temp_buffer); - return THREAD_RETURN_VALUE; }