#pragma once // ensure that a given header file is included only once in a // single compilation unit #include "arena.h" #include "base.h" #include "lf_mpmc.h" #include "arena.c" // xxhash include #define XXH_STATIC_LINKING_ONLY #include "xxh_x86dispatch.h" // ----------------------------- Config ------------------------------------- #define FILE_HASHES_TXT "file_hashes.txt" #define HASH_STRLEN 33 // 128-bit hex (32 chars) + null #define MAX_PATHLEN 4096 #define READ_BLOCK (KiB(64)) // ----------------------------- Globals ------------------------------------ static atomic_uint_fast64_t g_files_found = 0; static atomic_uint_fast64_t g_files_hashed = 0; static atomic_uint_fast64_t g_bytes_processed = 0; static atomic_int g_scan_done = 0; // ================== OS-agnostic functions abstraction ===================== // ----------------------------- Timer functions -------------- typedef struct { u64 start; u64 now; } HiResTimer; #if defined(_WIN32) || defined(_WIN64) static LARGE_INTEGER g_freq; static void timer_init(void) { QueryPerformanceFrequency(&g_freq); } static void timer_start(HiResTimer *t) { LARGE_INTEGER v; QueryPerformanceCounter(&v); t->start = v.QuadPart; } static double timer_elapsed(HiResTimer *t) { LARGE_INTEGER v; QueryPerformanceCounter(&v); t->now = v.QuadPart; return (double)(t->now - t->start) / (double)g_freq.QuadPart; } #elif defined(__linux__) void timer_init(void) {} void timer_start(HiResTimer *t) { struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); t->start = ts.tv_sec * 1000000000ULL + ts.tv_nsec; } double timer_elapsed(HiResTimer *t) { struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); uint64_t now = ts.tv_sec * 1000000000ULL + ts.tv_nsec; return (double)(now - t->start) / 1e9; } #endif // ----------------------------- Get HW info -------------- #if defined(_WIN32) || defined(_WIN64) size_t platform_physical_cores(void) { DWORD len = 0; GetLogicalProcessorInformation(NULL, &len); SYSTEM_LOGICAL_PROCESSOR_INFORMATION buf[len]; GetLogicalProcessorInformation(buf, &len); DWORD count = 0; DWORD n = len / sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION); for (DWORD i = 0; i < n; i++) { if (buf[i].Relationship == RelationProcessorCore) count++; } return count ? count : 1; } #elif defined(__linux__) size_t platform_physical_cores(void) { long n = sysconf(_SC_NPROCESSORS_ONLN); return n > 0 ? (size_t)n : 1; } #endif const char *get_xxhash_instruction_set(void) { int vecID = XXH_featureTest(); switch (vecID) { case XXH_SCALAR: return "Scalar (portable C)"; case XXH_SSE2: return "SSE2"; case XXH_AVX2: return "AVX2"; case XXH_AVX512: return "AVX-512"; default: return "Unknown"; } } // -------------------- File IO ------------------- #if defined(_WIN32) || defined(_WIN64) typedef HANDLE FileHandle; #define INVALID_FILE_HANDLE INVALID_HANDLE_VALUE // File open function static FileHandle os_file_open(const char *path) { return CreateFileA(path, GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING, FILE_FLAG_SEQUENTIAL_SCAN, NULL); } // File read function static int os_file_read(FileHandle handle, void *buf, size_t count, uint64_t *bytes_read) { DWORD read = 0; BOOL result = ReadFile(handle, buf, (DWORD)count, &read, NULL); *bytes_read = read; return (result && read > 0) ? 0 : -1; } // File close function static void os_file_close(FileHandle handle) { CloseHandle(handle); } #elif defined(__linux__) typedef int FileHandle; #define INVALID_FILE_HANDLE (-1) // File open function static FileHandle os_file_open(const char *path) { return open(path, O_RDONLY | O_NOFOLLOW); } // File read function static int os_file_read(FileHandle handle, void *buf, size_t count, uint64_t *bytes_read) { ssize_t result = read(handle, buf, count); if (result >= 0) { *bytes_read = (uint64_t)result; return 0; } *bytes_read = 0; return -1; } // File close function static void os_file_close(FileHandle handle) { close(handle); } #endif // -------------------- Thread abstraction ------------------- // Threads context typedef struct { u8 num_threads; mem_arena *path_arena; mem_arena *meta_arena; MPMCQueue *dir_queue; MPMCQueue *file_queue; } ScannerContext; typedef struct { mem_arena *arena; MPMCQueue *file_queue; } WorkerContext; #if defined(_WIN32) || defined(_WIN64) typedef HANDLE ThreadHandle; typedef DWORD(WINAPI *ThreadFunc)(void *); #define THREAD_RETURN DWORD WINAPI #define THREAD_RETURN_VALUE 0; typedef struct { ThreadHandle handle; int valid; // Track if thread was successfully created } Thread; // Thread function wrapper to handle different return types #define THREAD_FUNCTION(name) DWORD WINAPI name(LPVOID arg) // Thread creation function static int thread_create(Thread *thread, ThreadFunc func, void *arg) { thread->handle = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)func, arg, 0, NULL); return (thread->handle != NULL) ? 0 : -1; } // Thread join function static int thread_join(Thread *thread) { return (WaitForSingleObject(thread->handle, INFINITE) == WAIT_OBJECT_0) ? 0 : -1; } // Thread close/detach function static void thread_close(Thread *thread) { CloseHandle(thread->handle); } // Wait for multiple threads static int thread_wait_multiple(Thread *threads, size_t count) { HANDLE handles[64]; // Max 64 threads for Windows for (size_t i = 0; i < count; i++) { handles[i] = threads[i].handle; } return (WaitForMultipleObjects((DWORD)count, handles, TRUE, INFINITE) == WAIT_OBJECT_0) ? 0 : -1; } #elif defined(__linux__) typedef pthread_t ThreadHandle; typedef void *(*ThreadFunc)(void *); #define THREAD_RETURN void * #define THREAD_RETURN_VALUE NULL; typedef struct { ThreadHandle handle; int valid; // Track if thread was successfully created } Thread; // Thread function wrapper to handle different return types typedef struct { void *(*func)(void *); void *arg; } ThreadWrapper; static void *thread_start_routine(void *arg) { ThreadWrapper *wrapper = (ThreadWrapper *)arg; void *result = wrapper->func(wrapper->arg); free(wrapper); return result; } // Thread creation function static int thread_create(Thread *thread, ThreadFunc func, void *arg) { int ret = pthread_create(&thread->handle, NULL, func, arg); if (ret == 0) { thread->valid = 1; } return ret; } // Thread join function static int thread_join(Thread *thread) { int ret = pthread_join(thread->handle, NULL); thread->valid = 0; return ret; } // Thread close/detach function static void thread_close(Thread *thread) { if (thread->valid) { pthread_detach(thread->handle); thread->valid = 0; } } // Wait for multiple threads static int thread_wait_multiple(Thread *threads, size_t count) { for (size_t i = 0; i < count; i++) { if (thread_join(&threads[i]) != 0) { return -1; } } return 0; } #endif // ======================== Get file metadata ======================== // -------------------- Path parsing ------------------- static void normalize_path(char *p) { char *src = p; char *dst = p; int prev_slash = 0; while (*src) { char c = *src++; if (c == '\\' || c == '/') { if (!prev_slash) { *dst++ = '/'; prev_slash = 1; } } else { *dst++ = c; prev_slash = 0; } } *dst = '\0'; } static int parse_paths(char *line, char folders[][MAX_PATHLEN], int max_folders) { int count = 0; char *p = line; while (*p && count < max_folders) { while (*p && isspace((unsigned char)*p)) p++; if (!*p) break; char *start; char quote = 0; if (*p == '"' || *p == '\'') { quote = *p++; start = p; while (*p && *p != quote) p++; } else { start = p; while (*p && !isspace((unsigned char)*p)) p++; } size_t len = p - start; if (len >= MAX_PATHLEN) len = MAX_PATHLEN - 1; memcpy(folders[count], start, len); folders[count][len] = 0; normalize_path(folders[count]); count++; if (quote && *p == quote) p++; } return count; } // ----------------------------- File time ------------------------- #if defined(_WIN32) || defined(_WIN64) static void format_time(uint64_t t, char *out, size_t out_sz) { if (t == 0) { snprintf(out, out_sz, "N/A"); return; } time_t tt = (time_t)t; struct tm tm; localtime_s(&tm, &tt); strftime(out, out_sz, "%Y-%m-%d %H:%M:%S", &tm); } // ----------------------------- Convert filetime to epoch -------------- static uint64_t filetime_to_epoch(const FILETIME *ft) { ULARGE_INTEGER ull; ull.LowPart = ft->dwLowDateTime; ull.HighPart = ft->dwHighDateTime; // Windows epoch (1601) ¬ニメ Unix epoch (1970) return (ull.QuadPart - 116444736000000000ULL) / 10000000ULL; } void platform_get_file_times(const char *path, uint64_t *out_created, uint64_t *out_modified) { WIN32_FILE_ATTRIBUTE_DATA fad; if (GetFileAttributesExA(path, GetFileExInfoStandard, &fad)) { *out_created = filetime_to_epoch(&fad.ftCreationTime); *out_modified = filetime_to_epoch(&fad.ftLastWriteTime); } else { *out_created = 0; *out_modified = 0; } } #elif defined(__linux__) static void format_time(uint64_t t, char *out, size_t out_sz) { if (t == 0) { snprintf(out, out_sz, "N/A"); return; } time_t tt = (time_t)t; struct tm tm; localtime_r(&tt, &tm); strftime(out, out_sz, "%Y-%m-%d %H:%M:%S", &tm); } void platform_get_file_times(const char *path, uint64_t *out_created, uint64_t *out_modified) { struct stat st; if (stat(path, &st) == 0) { *out_created = (uint64_t)st.st_ctime; *out_modified = (uint64_t)st.st_mtime; } else { *out_created = 0; *out_modified = 0; } } #endif // ----------------------------- File owner --------------------- #if defined(_WIN32) || defined(_WIN64) static void get_file_owner(const char *path, char *out, size_t out_sz) { PSID sid = NULL; PSECURITY_DESCRIPTOR sd = NULL; if (GetNamedSecurityInfoA(path, SE_FILE_OBJECT, OWNER_SECURITY_INFORMATION, &sid, NULL, NULL, NULL, &sd) == ERROR_SUCCESS) { char name[64], domain[64]; DWORD name_len = sizeof(name); DWORD domain_len = sizeof(domain); SID_NAME_USE use; if (LookupAccountSidA(NULL, sid, name, &name_len, domain, &domain_len, &use)) { snprintf(out, out_sz, "%s\\%s", domain, name); } else { snprintf(out, out_sz, "UNKNOWN"); } } else { snprintf(out, out_sz, "UNKNOWN"); } if (sd) LocalFree(sd); } void platform_get_file_owner(const char *path, char *out_owner, size_t out_owner_size) { get_file_owner(path, out_owner, out_owner_size); } #elif defined(__linux__) static void get_file_owner(uid_t uid, char *out, size_t out_sz) { struct passwd *pw = getpwuid(uid); if (pw) { snprintf(out, out_sz, "%s", pw->pw_name); } else { snprintf(out, out_sz, "UNKNOWN"); } } void platform_get_file_owner(const char *path, char *out_owner, size_t out_owner_size) { struct stat st; if (stat(path, &st) == 0) { get_file_owner(st.st_uid, out_owner, out_owner_size); } else { snprintf(out_owner, out_owner_size, "UNKNOWN"); } } #endif // ----------------------------- Scan helpers ----------------------------- typedef struct FileEntry { char *path; uint64_t size_bytes; uint64_t created_time; // epoch uint64_t modified_time; // epoch seconds char owner[128]; // resolved owner name } FileEntry; typedef struct { char buffer[MAX_PATHLEN]; char *base_end; // Points to end of base path char *filename_pos; // Points to where filename should be written size_t base_len; } PathBuilder; static void path_builder_init(PathBuilder *pb, const char *base) { pb->base_len = strlen(base); memcpy(pb->buffer, base, pb->base_len); pb->base_end = pb->buffer + pb->base_len; #if defined(_WIN32) || defined(_WIN64) *pb->base_end = '\\'; #elif defined(__linux__) *pb->base_end = '/'; #endif // Ensure null termination *(pb->base_end + 1) = '\0'; pb->filename_pos = pb->base_end + 1; } static void path_builder_set_filename(PathBuilder *pb, const char *filename, size_t name_len) { memcpy(pb->filename_pos, filename, name_len); pb->filename_pos[name_len] = '\0'; // Ensure null termination } static char *path_builder_dup_arena(PathBuilder *pb, mem_arena *arena, bool zero) { // Calculate total length including base + separator + filename + null // terminator size_t total_len = (pb->filename_pos - pb->buffer) + strlen(pb->filename_pos) + 1; char *dup = arena_push(&arena, total_len, zero); memcpy(dup, pb->buffer, total_len); return dup; } #if defined(_WIN32) || defined(_WIN64) void scan_folder(const char *base, ScannerContext *ctx) { PathBuilder pb; path_builder_init(&pb, base); char search[MAX_PATHLEN]; memcpy(search, pb.buffer, pb.base_len + 1); // Copy base + separator memcpy(search + pb.base_len + 1, "*", 2); // Add "*" and null WIN32_FIND_DATAA fd; HANDLE h = FindFirstFileA(search, &fd); if (h == INVALID_HANDLE_VALUE) return; do { // Skip . and .. if (fd.cFileName[0] == '.' && (fd.cFileName[1] == 0 || (fd.cFileName[1] == '.' && fd.cFileName[2] == 0))) continue; if (fd.dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) continue; size_t name_len = strlen(fd.cFileName); path_builder_set_filename(&pb, fd.cFileName, name_len); if (fd.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { char *dir = path_builder_dup_arena(&pb, ctx->path_arena, false); mpmc_push_work(ctx->dir_queue, dir); } else { atomic_fetch_add(&g_files_found, 1); FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true); // Create a temporary copy for normalization to avoid corrupting pb.buffer char temp_path[MAX_PATHLEN]; memcpy(temp_path, pb.buffer, (pb.filename_pos - pb.buffer) + name_len + 1); normalize_path(temp_path); fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, false); strcpy(fe->path, temp_path); platform_get_file_times(pb.buffer, &fe->created_time, &fe->modified_time); platform_get_file_owner(pb.buffer, fe->owner, sizeof(fe->owner)); fe->size_bytes = ((uint64_t)fd.nFileSizeHigh << 32) | fd.nFileSizeLow; mpmc_push(ctx->file_queue, fe); } } while (FindNextFileA(h, &fd)); FindClose(h); } #elif defined(__linux__) static int platform_get_file_times_fd(int dir_fd, const char *name, time_t *created, time_t *modified) { struct stat st; if (fstatat(dir_fd, name, &st, 0) == 0) { *created = st.st_ctime; // or st.st_birthtime on systems that support it *modified = st.st_mtime; return 0; } return -1; } static int platform_get_file_owner_fd(int dir_fd, const char *name, char *owner, size_t owner_size) { struct stat st; if (fstatat(dir_fd, name, &st, 0) == 0) { struct passwd pw; struct passwd *result; char buffer[4096]; // Sufficiently large buffer for passwd data // Reentrant version (thread-safe) if (getpwuid_r(st.st_uid, &pw, buffer, sizeof(buffer), &result) == 0 && result != NULL && result->pw_name != NULL) { strncpy(owner, result->pw_name, owner_size - 1); owner[owner_size - 1] = '\0'; } else { // Fallback to uid snprintf(owner, owner_size, "uid:%d", st.st_uid); } return 0; } return -1; void scan_folder(const char *base, ScannerContext *ctx) { PathBuilder pb; path_builder_init(&pb, base); int dir_fd = open(base, O_RDONLY | O_DIRECTORY | O_NOFOLLOW); if (dir_fd == -1) return; DIR *dir = fdopendir(dir_fd); if (!dir) { close(dir_fd); return; } struct dirent *entry; while ((entry = readdir(dir)) != NULL) { if (entry->d_name[0] == '.' && (entry->d_name[1] == 0 || (entry->d_name[1] == '.' && entry->d_name[2] == 0))) continue; size_t name_len = strlen(entry->d_name); path_builder_set_filename(&pb, entry->d_name, name_len); int file_type = DT_UNKNOWN; #ifdef _DIRENT_HAVE_D_TYPE file_type = entry->d_type; #endif // Fast path using d_type if (file_type != DT_UNKNOWN) { if (file_type == DT_LNK) continue; // Skip symlinks if (file_type == DT_DIR) { char *dir_path = path_builder_dup_arena(&pb, ctx->path_arena, false); mpmc_push_work(ctx->dir_queue, dir_path); continue; } if (file_type == DT_REG) { atomic_fetch_add(&g_files_found, 1); FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true); // Use fstatat for file info struct stat st; if (fstatat(dir_fd, entry->d_name, &st, 0) == 0) { // Convert times using fd variant platform_get_file_times_fd(dir_fd, entry->d_name, &fe->created_time, &fe->modified_time); platform_get_file_owner_fd(dir_fd, entry->d_name, fe->owner, sizeof(fe->owner)); fe->size_bytes = (uint64_t)st.st_size; // Normalize path char temp_path[MAX_PATHLEN]; memcpy(temp_path, pb.buffer, (pb.filename_pos - pb.buffer) + name_len + 1); normalize_path(temp_path); fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, false); strcpy(fe->path, temp_path); mpmc_push(ctx->file_queue, fe); } continue; } } // Fallback for unknown types struct stat st; if (fstatat(dir_fd, entry->d_name, &st, AT_SYMLINK_NOFOLLOW) == 0) { if (S_ISLNK(st.st_mode)) continue; if (S_ISDIR(st.st_mode)) { char *dir_path = path_builder_dup_arena(&pb, ctx->path_arena, false); mpmc_push_work(ctx->dir_queue, dir_path); } else if (S_ISREG(st.st_mode)) { atomic_fetch_add(&g_files_found, 1); FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true); platform_get_file_times(pb.buffer, &fe->created_time, &fe->modified_time); platform_get_file_owner(pb.buffer, fe->owner, sizeof(fe->owner)); fe->size_bytes = (uint64_t)st.st_size; char temp_path[MAX_PATHLEN]; memcpy(temp_path, pb.buffer, (pb.filename_pos - pb.buffer) + name_len + 1); normalize_path(temp_path); fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, false); strcpy(fe->path, temp_path); mpmc_push(ctx->file_queue, fe); } } } closedir(dir); // Closes dir_fd automatically } #endif // ------------------------- Scan worker -------------------------------- static THREAD_RETURN scan_worker(void *arg) { ScannerContext *ctx = (ScannerContext *)arg; for (;;) { char *dir = mpmc_pop(ctx->dir_queue); if (!dir) break; scan_folder(dir, ctx); mpmc_task_done(ctx->dir_queue, ctx->num_threads); } return THREAD_RETURN_VALUE; } // ----------------------------- Hashing helpers ----------------------------- static void xxh3_hash_file_stream(const char *path, char *out_hex, unsigned char *buf) { XXH128_hash_t h; XXH3_state_t state; XXH3_128bits_reset(&state); FileHandle handle = os_file_open(path); if (handle == INVALID_FILE_HANDLE) { strcpy(out_hex, "ERROR"); return; } uint64_t bytes_read; while (os_file_read(handle, buf, READ_BLOCK, &bytes_read) == 0 && bytes_read > 0) { XXH3_128bits_update(&state, buf, (size_t)bytes_read); atomic_fetch_add(&g_bytes_processed, bytes_read); } os_file_close(handle); h = XXH3_128bits_digest(&state); snprintf(out_hex, HASH_STRLEN, "%016llx%016llx", (unsigned long long)h.high64, (unsigned long long)h.low64); } // ------------------------- Hash worker -------------------------------- static THREAD_RETURN hash_worker(void *arg) { WorkerContext *ctx = (WorkerContext *)arg; unsigned char *buf = (unsigned char *)malloc(READ_BLOCK); for (;;) { FileEntry *fe = mpmc_pop(ctx->file_queue); if (!fe) break; char hash[HASH_STRLEN]; xxh3_hash_file_stream(fe->path, hash, buf); char created[32], modified[32]; format_time(fe->created_time, created, sizeof(created)); format_time(fe->modified_time, modified, sizeof(modified)); double size_kib = (double)fe->size_bytes / 1024.0; char stack_buf[1024]; int len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n", hash, fe->path, size_kib, created, modified, fe->owner); char *dst = arena_push(&ctx->arena, len, false); memcpy(dst, stack_buf, len); atomic_fetch_add(&g_files_hashed, 1); } free(buf); return THREAD_RETURN_VALUE; } // ------------------------- Progress display --------------------------- static THREAD_RETURN progress_thread(void *arg) { (void)arg; HiResTimer progress_timer; timer_start(&progress_timer); uint64_t last_bytes = 0; double last_time = 0.0; double displayed_speed = 0.0; const double sample_interval = 0.5; // Hide cursor to prevent flickering printf("\033[?25l"); for (;;) { uint64_t found = atomic_load(&g_files_found); uint64_t hashed = atomic_load(&g_files_hashed); uint64_t bytes = atomic_load(&g_bytes_processed); int scan_done = atomic_load(&g_scan_done); double t = timer_elapsed(&progress_timer); double dt = t - last_time; if (dt >= sample_interval) { uint64_t db = (bytes > last_bytes) ? bytes - last_bytes : 0; displayed_speed = (double)db / (1024.0 * 1024.0) / dt; last_bytes = bytes; last_time = t; } printf("\r"); if (!scan_done) { printf("\033[1mScanning:\033[0m %llu files | Hashed: %llu | \033[32m%.2f " "MB/s\033[0m ", (unsigned long long)found, (unsigned long long)hashed, displayed_speed); } else { double pct = found ? (double)hashed / (double)found : 0.0; int barw = 40; int filled = (int)(pct * barw); printf("["); // Print filled part in Green (\033[32m) printf("\033[32m"); for (int i = 0; i < filled; i++) putchar('#'); // Reset color for empty part printf("\033[0m"); for (int i = filled; i < barw; i++) putchar('.'); printf("] %6.2f%% (%llu/%llu) \033[32m%.2f MB/s\033[0m ", pct * 100.0, (unsigned long long)hashed, (unsigned long long)found, displayed_speed); } fflush(stdout); if (scan_done && hashed == found) break; sleep_ms(100); } // Restore cursor (\033[?25h) and move to next line printf("\033[?25h\n"); return THREAD_RETURN_VALUE; } // ======================== Hash worker IO Ring ======================== // -------------------------- Configuration --------------------------- #define NUM_BUFFERS_PER_THREAD 16 #define SUBMIT_TIMEOUT_MS 30000 #define USERDATA_REGISTER 1 #define IORING_READ_BLOCK (KiB(1024)) // Globals u64 g_ioring_read_block = 4096 * 64; static atomic_uint_fast64_t g_io_ring_fallbacks = 0; // -------------------------- Buffer structure --------------------------- typedef struct IoBuffer { void *data; uint64_t offset; size_t size; size_t bytes_read; HRESULT result; int buffer_id; int completed; } IoBuffer; // ============================================================================ // Thread-local I/O Ring context // ============================================================================ typedef struct ThreadIoContext { HIORING ring; HANDLE completion_event; IoBuffer buffers[NUM_BUFFERS_PER_THREAD]; int buffer_pool[NUM_BUFFERS_PER_THREAD]; int free_count; int initialized; } ThreadIoContext; // -------------------------- File context --------------------------- typedef struct FileReadContext { HANDLE hFile; uint64_t file_size; XXH3_state_t hash_state; char path[MAX_PATH]; // Completion tracking int reads_submitted; int reads_completed; int reads_hashed; uint64_t bytes_hashed; int failed_reads; int active_reads; int buffers_ready; // Count of buffers ready to hash // For in-order hashing uint64_t next_hash_offset; // Next offset that needs to be hashed uint64_t next_read_offset; // Next offset to submit for reading } FileReadContext; static _Thread_local ThreadIoContext *g_thread_ctx = NULL; // ---------------------- Initialize thread context --------------------------- static ThreadIoContext *io_ring_init_thread(void) { if (g_thread_ctx && g_thread_ctx->initialized) { return g_thread_ctx; } if (!g_thread_ctx) { g_thread_ctx = (ThreadIoContext *)calloc(1, sizeof(ThreadIoContext)); if (!g_thread_ctx) return NULL; } // Create I/O Ring IORING_CAPABILITIES caps; QueryIoRingCapabilities(&caps); UINT32 queue_size = min(4096, caps.MaxSubmissionQueueSize); IORING_CREATE_FLAGS flags = {0}; HRESULT hr = CreateIoRing(caps.MaxVersion, flags, queue_size, queue_size * 2, &g_thread_ctx->ring); // Create completion event g_thread_ctx->completion_event = CreateEvent(NULL, FALSE, FALSE, NULL); if (g_thread_ctx->completion_event) { SetIoRingCompletionEvent(g_thread_ctx->ring, g_thread_ctx->completion_event); } // Initialize buffer pool IORING_BUFFER_INFO buf_info[NUM_BUFFERS_PER_THREAD]; u64 buf_pool_size = g_ioring_read_block * NUM_BUFFERS_PER_THREAD; // Reserve and Commit the entire memory chunk void *base_ptr = plat_mem_reserve(buf_pool_size); if (base_ptr) { if (!plat_mem_commit(base_ptr, buf_pool_size)) { plat_mem_release(base_ptr, 0); return NULL; } } else { return NULL; } for (int i = 0; i < NUM_BUFFERS_PER_THREAD; i++) { g_thread_ctx->buffers[i].data = (u8 *)base_ptr + (i * g_ioring_read_block); g_thread_ctx->buffer_pool[i] = i; g_thread_ctx->buffers[i].buffer_id = i; buf_info[i].Address = g_thread_ctx->buffers[i].data; buf_info[i].Length = (ULONG)g_ioring_read_block; } g_thread_ctx->free_count = NUM_BUFFERS_PER_THREAD; HRESULT hb = BuildIoRingRegisterBuffers( g_thread_ctx->ring, NUM_BUFFERS_PER_THREAD, buf_info, USERDATA_REGISTER); if (FAILED(hb)) { printf("Buffer registration failed: 0x%lx\n", hb); return NULL; } // Submit registration SubmitIoRing(g_thread_ctx->ring, 0, 0, NULL); g_thread_ctx->initialized = 1; return g_thread_ctx; } static void io_ring_cleanup_thread(void) { if (!g_thread_ctx) return; if (g_thread_ctx->completion_event) CloseHandle(g_thread_ctx->completion_event); if (g_thread_ctx->ring) CloseIoRing(g_thread_ctx->ring); plat_mem_release(g_thread_ctx->buffers[0].data, 0); free(g_thread_ctx); g_thread_ctx = NULL; } // ---------------------- Get a free buffer from pool ------------------------ static IoBuffer *get_free_buffer(ThreadIoContext *ctx) { if (ctx->free_count == 0) { return NULL; } int idx = ctx->buffer_pool[--ctx->free_count]; IoBuffer *buf = &ctx->buffers[idx]; buf->completed = 0; buf->bytes_read = 0; buf->result = E_PENDING; return buf; } static void return_buffer(ThreadIoContext *ctx, IoBuffer *buf) { if (!buf) return; ctx->buffer_pool[ctx->free_count++] = buf->buffer_id; } // -------------------------- Submit async read --------------------------- static HRESULT submit_read(ThreadIoContext *ctx, FileReadContext *file_ctx, IoBuffer *buf, uint64_t offset, size_t size) { buf->offset = offset; buf->size = size; IORING_HANDLE_REF file_ref = IoRingHandleRefFromHandle(file_ctx->hFile); IORING_BUFFER_REF buffer_ref = IoRingBufferRefFromIndexAndOffset(buf->buffer_id, 0); HRESULT hr = BuildIoRingReadFile(ctx->ring, file_ref, buffer_ref, (UINT32)size, offset, (UINT_PTR)buf, IOSQE_FLAGS_NONE); if (SUCCEEDED(hr)) { file_ctx->active_reads++; file_ctx->reads_submitted++; } else { buf->completed = 1; return_buffer(ctx, buf); } return hr; } // -------------------------- Process completions --------------------------- static int process_completions(ThreadIoContext *ctx, FileReadContext *file_ctx) { IORING_CQE cqe; int processed = 0; while (PopIoRingCompletion(ctx->ring, &cqe) == S_OK) { if (cqe.UserData == USERDATA_REGISTER || cqe.UserData == 0) continue; IoBuffer *buf = (IoBuffer *)cqe.UserData; if (buf && !buf->completed) { buf->result = cqe.ResultCode; buf->bytes_read = (DWORD)cqe.Information; buf->completed = 1; if (SUCCEEDED(cqe.ResultCode) && cqe.Information > 0) { file_ctx->active_reads--; file_ctx->reads_completed++; file_ctx->buffers_ready++; } else { file_ctx->failed_reads++; file_ctx->active_reads--; file_ctx->reads_completed++; } processed++; } } return processed; } // -------------------- Hash buffer in sequential order ----------------------- static int hash_sequential_buffers(ThreadIoContext *ctx, FileReadContext *file_ctx) { int hashed = 0; // Keep hashing while the next buffer in sequence is ready while (file_ctx->next_hash_offset < file_ctx->file_size) { // Find the buffer that contains next_hash_offset IoBuffer *found_buf = NULL; for (int i = 0; i < NUM_BUFFERS_PER_THREAD; i++) { IoBuffer *buf = &ctx->buffers[i]; if (buf->completed && buf->offset == file_ctx->next_hash_offset) { found_buf = buf; break; } } if (!found_buf) break; // Buffer not ready yet // Found the correct buffer in order - hash it if (SUCCEEDED(found_buf->result) && found_buf->bytes_read > 0) { XXH3_128bits_update(&file_ctx->hash_state, found_buf->data, found_buf->bytes_read); atomic_fetch_add(&g_bytes_processed, found_buf->bytes_read); // Update bytes_hashed for this file! file_ctx->bytes_hashed += found_buf->bytes_read; file_ctx->reads_hashed++; file_ctx->buffers_ready--; // Mark as processed and return buffer to pool found_buf->completed = 0; return_buffer(ctx, found_buf); // Move to next offset file_ctx->next_hash_offset += found_buf->size; hashed++; } else if (found_buf->bytes_read == 0 && SUCCEEDED(found_buf->result)) { // Read operation failed with an error code file_ctx->reads_hashed++; file_ctx->buffers_ready--; found_buf->completed = 0; return_buffer(ctx, found_buf); file_ctx->next_hash_offset += found_buf->size; hashed++; } else { // Read failed file_ctx->failed_reads++; file_ctx->reads_hashed++; file_ctx->buffers_ready--; found_buf->completed = 0; return_buffer(ctx, found_buf); file_ctx->next_hash_offset += found_buf->size; hashed++; } } return hashed; } // ------------- Submit pending reads - fill all free buffers ----------------- static int submit_pending_reads(ThreadIoContext *ctx, FileReadContext *file_ctx) { int submitted = 0; while (1) { // Check if we have more data to read uint64_t current_offset = file_ctx->next_read_offset; int has_data = (current_offset < file_ctx->file_size); if (!has_data) break; // Get a free buffer IoBuffer *buf = get_free_buffer(ctx); if (!buf) break; size_t remaining = file_ctx->file_size - current_offset; size_t bytes_to_read; if (remaining >= g_ioring_read_block) { bytes_to_read = g_ioring_read_block; } else { // Round UP to sector size (4096) bytes_to_read = ALIGN_UP_POW2(remaining, g_pagesize); } HRESULT hr = submit_read(ctx, file_ctx, buf, current_offset, bytes_to_read); if (SUCCEEDED(hr)) { submitted++; file_ctx->next_read_offset += bytes_to_read; } else { return_buffer(ctx, buf); break; } } return submitted; } // -------------------------- Wait for completions --------------------------- static void wait_for_completions(ThreadIoContext *ctx, FileReadContext *file_ctx) { int has_active = (file_ctx->active_reads > 0); if (has_active && ctx->completion_event) { WaitForSingleObject(ctx->completion_event, SUBMIT_TIMEOUT_MS); } } // ---------------------- Main parallel hashing function ---------------------- static void xxh3_hash_file_parallel(ThreadIoContext *ctx, const char *path, char *out_hex, unsigned char *temp_buffer) { // Validate I/O Ring if (!ctx || !ctx->ring) { xxh3_hash_file_stream(path, out_hex, temp_buffer); return; } HANDLE hFile = CreateFileA( path, GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING, NULL); if (hFile == INVALID_HANDLE_VALUE) { xxh3_hash_file_stream(path, out_hex, temp_buffer); return; } LARGE_INTEGER file_size; if (!GetFileSizeEx(hFile, &file_size)) { xxh3_hash_file_stream(path, out_hex, temp_buffer); CloseHandle(hFile); return; } FileReadContext file_ctx; memset(&file_ctx, 0, sizeof(file_ctx)); file_ctx.hFile = hFile; file_ctx.file_size = file_size.QuadPart; file_ctx.next_hash_offset = 0; file_ctx.next_read_offset = 0; strncpy(file_ctx.path, path, MAX_PATH - 1); file_ctx.path[MAX_PATH - 1] = 0; XXH3_128bits_reset(&file_ctx.hash_state); // Submit initial reads submit_pending_reads(ctx, &file_ctx); if (file_ctx.reads_submitted > 0) { UINT32 submitted = 0; SubmitIoRing(ctx->ring, 0, 0, &submitted); } // Main loop while (file_ctx.reads_hashed < file_ctx.reads_submitted) { // Process completions process_completions(ctx, &file_ctx); // Hash buffers in sequential order (critical!) hash_sequential_buffers(ctx, &file_ctx); // Submit more reads if needed if (file_ctx.active_reads < NUM_BUFFERS_PER_THREAD && file_ctx.next_read_offset < file_ctx.file_size) { int new_reads = submit_pending_reads(ctx, &file_ctx); if (new_reads > 0) { UINT32 submitted = 0; SubmitIoRing(ctx->ring, 0, 0, &submitted); } } // Wait if nothing to hash and active reads exist if (file_ctx.active_reads > 0 && file_ctx.buffers_ready == 0) { wait_for_completions(ctx, &file_ctx); } } // Final verification if (file_ctx.bytes_hashed == file_ctx.file_size && file_ctx.failed_reads == 0) { XXH128_hash_t h = XXH3_128bits_digest(&file_ctx.hash_state); snprintf(out_hex, HASH_STRLEN, "%016llx%016llx", (unsigned long long)h.high64, (unsigned long long)h.low64); } else { if (file_ctx.bytes_hashed != file_ctx.file_size) { atomic_fetch_add(&g_io_ring_fallbacks, 1); } xxh3_hash_file_stream(path, out_hex, temp_buffer); } CloseHandle(hFile); } // -------------------------- Hash worker I/O Ring --------------------------- static THREAD_RETURN hash_worker_io_ring(void *arg) { WorkerContext *ctx = (WorkerContext *)arg; unsigned char *temp_buffer = (unsigned char *)malloc(READ_BLOCK); char hash[HASH_STRLEN]; if (!temp_buffer) return THREAD_RETURN_VALUE; // Initialize I/O Ring for this thread ThreadIoContext *ring_ctx = io_ring_init_thread(); if (!ring_ctx || !ring_ctx->ring) { printf("Thread %lu: I/O Ring unavailable, using buffered I/O\n", GetCurrentThreadId()); free(temp_buffer); return hash_worker(arg); } for (;;) { FileEntry *fe = mpmc_pop(ctx->file_queue); if (!fe) break; // Pass the I/O Ring context to the hashing function xxh3_hash_file_parallel(ring_ctx, fe->path, hash, temp_buffer); char created[32], modified[32]; format_time(fe->created_time, created, sizeof(created)); format_time(fe->modified_time, modified, sizeof(modified)); double size_kib = (double)fe->size_bytes / 1024.0; char stack_buf[1024]; int len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n", hash, fe->path, size_kib, created, modified, fe->owner); char *dst = arena_push(&ctx->arena, len, false); memcpy(dst, stack_buf, len); atomic_fetch_add(&g_files_hashed, 1); } io_ring_cleanup_thread(); free(temp_buffer); return THREAD_RETURN_VALUE; }