#pragma once #include "arena.h" #include "base.h" #include "sm_mpmc.h" #include "arena.c" #include #include // xxhash include #define XXH_STATIC_LINKING_ONLY #include "xxh_x86dispatch.h" #include "config.h" // ----------------------------- Globals ------------------------------------ static atomic_uint_fast64_t g_files_found = 0; static atomic_uint_fast64_t g_files_hashed = 0; static atomic_uint_fast64_t g_bytes_processed = 0; static atomic_int g_scan_done = 0; #define HASH_STRLEN 33 // 128-bit hex (32 chars) + null terminator #define MAX_PATHLEN KiB(4) // ================== OS-agnostic functions abstraction ===================== // --------------------- Timer functions --------------------- typedef struct { u64 start; u64 now; } HiResTimer; #if defined(_WIN32) || defined(_WIN64) static LARGE_INTEGER g_freq; static void timer_init(void) { QueryPerformanceFrequency(&g_freq); } static void timer_start(HiResTimer *t) { LARGE_INTEGER v; QueryPerformanceCounter(&v); t->start = v.QuadPart; } static double timer_elapsed(HiResTimer *t) { LARGE_INTEGER v; QueryPerformanceCounter(&v); t->now = v.QuadPart; return (double)(t->now - t->start) / (double)g_freq.QuadPart; } #elif defined(__linux__) void timer_init(void) {} void timer_start(HiResTimer *t) { struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); t->start = ts.tv_sec * 1000000000ULL + ts.tv_nsec; } double timer_elapsed(HiResTimer *t) { struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); uint64_t now = ts.tv_sec * 1000000000ULL + ts.tv_nsec; return (double)(now - t->start) / 1e9; } #endif // ------------------- Get HW info -------------------- #if defined(_WIN32) || defined(_WIN64) size_t platform_physical_cores(void) { DWORD len = 0; GetLogicalProcessorInformation(NULL, &len); SYSTEM_LOGICAL_PROCESSOR_INFORMATION buf[len]; GetLogicalProcessorInformation(buf, &len); DWORD count = 0; DWORD n = len / sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION); for (DWORD i = 0; i < n; i++) { if (buf[i].Relationship == RelationProcessorCore) count++; } return count ? count : 1; } #elif defined(__linux__) size_t platform_physical_cores(void) { long n = sysconf(_SC_NPROCESSORS_ONLN); return n > 0 ? (size_t)n : 1; } #endif const char *get_xxhash_instruction_set(void) { int vecID = XXH_featureTest(); switch (vecID) { case XXH_SCALAR: return "Scalar (portable C)"; case XXH_SSE2: return "SSE2"; case XXH_AVX2: return "AVX2"; case XXH_AVX512: return "AVX-512"; default: return "Unknown"; } } // -------------------- File IO ------------------- #if defined(_WIN32) || defined(_WIN64) typedef HANDLE FileHandle; #define FLAG_SEQUENTIAL_READ FILE_FLAG_SEQUENTIAL_SCAN #define FLAG_ASYNC_DIRECT_READ (FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING) #define INVALID_FILE_HANDLE INVALID_HANDLE_VALUE // File open function static FileHandle os_file_open(const char *path, DWORD flags) { return CreateFileA(path, GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING, flags, NULL); } // File read function static int os_file_read(FileHandle handle, void *buf, size_t count, uint64_t *bytes_read) { DWORD read = 0; BOOL result = ReadFile(handle, buf, (DWORD)count, &read, NULL); *bytes_read = read; return (result && read > 0) ? 0 : -1; } // File close function static void os_file_close(FileHandle handle) { CloseHandle(handle); } #elif defined(__linux__) typedef int FileHandle; #define FLAG_SEQUENTIAL_READ (0) #define FLAG_ASYNC_DIRECT_READ (O_DIRECT) #define INVALID_FILE_HANDLE (-1) // File open function static FileHandle os_file_open(const char *path, int flags) { // Combine your mandatory flags with the user-provided flag int fd = open(path, O_RDONLY | O_NOFOLLOW | flags); // If sequential was requested, advise the kernel if (fd != -1 && (flags == FLAG_SEQUENTIAL_READ)) { posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL); } return fd; } // File read function static int os_file_read(FileHandle handle, void *buf, size_t count, uint64_t *bytes_read) { ssize_t result = read(handle, buf, count); if (result >= 0) { *bytes_read = (uint64_t)result; return 0; } *bytes_read = 0; return -1; } // File close function static void os_file_close(FileHandle handle) { close(handle); } #endif // -------------------- Thread abstraction ------------------- // Threads context typedef struct { mem_arena *path_arena; mem_arena *meta_arena; MPMCQueue *dir_queue; MPMCQueue *file_queue; u8 num_threads; } ScannerContext; typedef struct { mem_arena *arena; MPMCQueue *file_queue; } WorkerContext; #if defined(_WIN32) || defined(_WIN64) typedef HANDLE ThreadHandle; typedef DWORD(WINAPI *ThreadFunc)(void *); #define THREAD_RETURN DWORD WINAPI #define THREAD_RETURN_VALUE 0; typedef struct { ThreadHandle handle; int valid; // Track if thread was successfully created } Thread; // Thread function wrapper to handle different return types #define THREAD_FUNCTION(name) DWORD WINAPI name(LPVOID arg) // Thread creation function static int thread_create(Thread *thread, ThreadFunc func, void *arg) { thread->handle = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)func, arg, 0, NULL); return (thread->handle != NULL) ? 0 : -1; } // Thread join function static int thread_join(Thread *thread) { return (WaitForSingleObject(thread->handle, INFINITE) == WAIT_OBJECT_0) ? 0 : -1; } // Thread close/detach function static void thread_close(Thread *thread) { CloseHandle(thread->handle); } // Wait for multiple threads static int thread_wait_multiple(Thread *threads, size_t count) { HANDLE handles[64]; // Max 64 threads for Windows for (size_t i = 0; i < count; i++) { handles[i] = threads[i].handle; } return (WaitForMultipleObjects((DWORD)count, handles, TRUE, INFINITE) == WAIT_OBJECT_0) ? 0 : -1; } #elif defined(__linux__) typedef pthread_t ThreadHandle; typedef void *(*ThreadFunc)(void *); #define THREAD_RETURN void * #define THREAD_RETURN_VALUE NULL; typedef struct { ThreadHandle handle; int valid; // Track if thread was successfully created } Thread; // Thread function wrapper to handle different return types typedef struct { void *(*func)(void *); void *arg; } ThreadWrapper; // Thread creation function static int thread_create(Thread *thread, ThreadFunc func, void *arg) { int ret = pthread_create(&thread->handle, NULL, func, arg); if (ret == 0) { thread->valid = 1; } return ret; } // Thread join function static int thread_join(Thread *thread) { int ret = pthread_join(thread->handle, NULL); thread->valid = 0; return ret; } // Thread close/detach function static void thread_close(Thread *thread) { if (thread->valid) { pthread_detach(thread->handle); thread->valid = 0; } } // Wait for multiple threads static int thread_wait_multiple(Thread *threads, size_t count) { for (size_t i = 0; i < count; i++) { if (thread_join(&threads[i]) != 0) { return -1; } } return 0; } #endif // ======================== Get file metadata ======================== // -------------------- Path parsing ------------------- static void normalize_path(char *p) { char *src = p; char *dst = p; int prev_slash = 0; while (*src) { char c = *src++; if (c == '\\' || c == '/') { if (!prev_slash) { *dst++ = '/'; prev_slash = 1; } } else { *dst++ = c; prev_slash = 0; } } *dst = '\0'; } static int parse_paths(char *line, char folders[][MAX_PATHLEN], int max_folders) { int count = 0; char *p = line; while (*p && count < max_folders) { while (*p && isspace((unsigned char)*p)) p++; if (!*p) break; char *start; char quote = 0; if (*p == '"' || *p == '\'') { quote = *p++; start = p; while (*p && *p != quote) p++; } else { start = p; while (*p && !isspace((unsigned char)*p)) p++; } size_t len = p - start; if (len >= MAX_PATHLEN) len = MAX_PATHLEN - 1; memcpy(folders[count], start, len); folders[count][len] = 0; normalize_path(folders[count]); count++; if (quote && *p == quote) p++; } return count; } // ------------------------- File time ------------------------- #if FILE_TIMES #if defined(_WIN32) || defined(_WIN64) static void format_time(uint64_t t, char *out, size_t out_sz) { if (t == 0) { snprintf(out, out_sz, "N/A"); return; } time_t tt = (time_t)t; struct tm tm; localtime_s(&tm, &tt); strftime(out, out_sz, "%Y-%m-%d %H:%M:%S", &tm); } // ------------------ Convert filetime to epoch ------------------- static uint64_t filetime_to_epoch(const FILETIME *ft) { ULARGE_INTEGER ull; ull.LowPart = ft->dwLowDateTime; ull.HighPart = ft->dwHighDateTime; // Windows epoch (1601) ¬ニメ Unix epoch (1970) return (ull.QuadPart - 116444736000000000ULL) / 10000000ULL; } void platform_get_file_times(const char *path, uint64_t *out_created, uint64_t *out_modified) { WIN32_FILE_ATTRIBUTE_DATA fad; if (GetFileAttributesExA(path, GetFileExInfoStandard, &fad)) { *out_created = filetime_to_epoch(&fad.ftCreationTime); *out_modified = filetime_to_epoch(&fad.ftLastWriteTime); } else { *out_created = 0; *out_modified = 0; } } #elif defined(__linux__) static void format_time(uint64_t t, char *out, size_t out_sz) { if (t == 0) { snprintf(out, out_sz, "N/A"); return; } time_t tt = (time_t)t; struct tm tm; localtime_r(&tt, &tm); strftime(out, out_sz, "%Y-%m-%d %H:%M:%S", &tm); } void platform_get_file_times(const char *path, uint64_t *out_created, uint64_t *out_modified) { struct stat st; if (stat(path, &st) == 0) { *out_created = (uint64_t)st.st_ctime; *out_modified = (uint64_t)st.st_mtime; } else { *out_created = 0; *out_modified = 0; } } static int platform_get_file_times_fd(int dir_fd, const char *name, uint64_t *created, uint64_t *modified) { struct stat st; if (fstatat(dir_fd, name, &st, 0) == 0) { *created = st.st_ctime; // or st.st_birthtime on systems that support it *modified = st.st_mtime; return 0; } return -1; } #endif #endif // -------------------- File owner --------------------- #if defined(_WIN32) || defined(_WIN64) #if FILE_OWNER void platform_get_file_owner(const char *path, char *out_owner, size_t out_owner_size) { PSID sid = NULL; PSECURITY_DESCRIPTOR sd = NULL; if (GetNamedSecurityInfoA(path, SE_FILE_OBJECT, OWNER_SECURITY_INFORMATION, &sid, NULL, NULL, NULL, &sd) == ERROR_SUCCESS) { char name[64], domain[64]; DWORD name_len = sizeof(name); DWORD domain_len = sizeof(domain); SID_NAME_USE use; if (LookupAccountSidA(NULL, sid, name, &name_len, domain, &domain_len, &use)) { snprintf(out_owner, out_owner_size, "%s\\%s", domain, name); } else { snprintf(out_owner, out_owner_size, "UNKNOWN"); } } else { snprintf(out_owner, out_owner_size, "UNKNOWN"); } if (sd) LocalFree(sd); } #endif #elif defined(__linux__) #if FILE_OWNER void platform_get_file_owner(const char *path, char *out_owner, size_t out_owner_size) { struct stat st; const char *owner = "UNKNOWN"; if (stat(path, &st) == 0) { struct passwd *pw = getpwuid(st.st_uid); if (pw) { owner = pw->pw_name; } } snprintf(out_owner, out_owner_size, "%s", owner); } static int platform_get_file_owner_fd(int dir_fd, const char *name, char *owner, size_t owner_size) { struct stat st; if (fstatat(dir_fd, name, &st, 0) == 0) { struct passwd pw; struct passwd *result; char buffer[4096]; // Sufficiently large buffer for passwd data // Reentrant version (thread-safe) if (getpwuid_r(st.st_uid, &pw, buffer, sizeof(buffer), &result) == 0 && result != NULL && result->pw_name != NULL) { strncpy(owner, result->pw_name, owner_size - 1); owner[owner_size - 1] = '\0'; } else { // Fallback to uid snprintf(owner, owner_size, "uid:%d", st.st_uid); } return 0; } return -1; } #endif // ----------------------------- File system ----------------------------- #if CHECK_FILE_SYSTEM typedef enum FileSystemType { FS_UNKNOWN = 0, FS_EXT4, FS_XFS, FS_BTRFS, FS_TMPFS, FS_NFS, FS_CIFS, FS_FAT, FS_EXFAT, FS_NTFS, FS_ZFS, FS_F2FS, FS_EROFS, FS_VIRTIOFS, FS_OVERLAY, FS_HUGETLBFS, FS_SQUASHFS, FS_PROC, FS_SYSFS, } FileSystemType; static inline FileSystemType fs_from_magic(long type) { switch (type) { case 0xEF53: return FS_EXT4; case 0x58465342: return FS_XFS; case 0x9123683E: return FS_BTRFS; case 0x01021994: return FS_TMPFS; case 0x6969: return FS_NFS; case 0xFF534D42: return FS_CIFS; case 0x4d44: return FS_FAT; case 0x2011BAB0: return FS_EXFAT; case 0x5346544E: return FS_NTFS; case 0x2FC1211: return FS_ZFS; case 0xF2F52010: return FS_F2FS; case 0xE0F5E1E2: return FS_EROFS; case 0x56495254: return FS_VIRTIOFS; case 0x794C764F: return FS_OVERLAY; case 0x958458f6: return FS_HUGETLBFS; case 0x73717368: return FS_SQUASHFS; case 0x9fa0: return FS_PROC; case 0x62656572: return FS_SYSFS; default: return FS_UNKNOWN; } } // Yes it is officially called "magic number" or "signature" in the // documentation typedef enum { FS_POLICY_BUFFERED, FS_POLICY_DIRECT_OK, } FsPolicy; static inline FsPolicy fs_policy(FileSystemType fs) { switch (fs) { case FS_EXT4: case FS_XFS: case FS_BTRFS: case FS_ZFS: case FS_F2FS: case FS_NFS: case FS_CIFS: case FS_VIRTIOFS: return FS_POLICY_DIRECT_OK; case FS_TMPFS: // Resides in Page Cache; O_DIRECT generally returns EINVAL case FS_EROFS: // Read-only filesystem; O_DIRECT support is // uncommon/restricted case FS_FAT: // Generally does not implement direct_IO address space ops case FS_EXFAT: // Limited support depending on driver implementation case FS_NTFS: // Depends on driver (ntfs3 supports it, older ntfs-3g does not) default: return FS_POLICY_BUFFERED; } } #endif #endif // ----------------------------- Scan helpers ----------------------------- typedef struct FileEntry { char *path; uint64_t size_bytes; #if FILE_TIMES uint64_t created_time; // epoch uint64_t modified_time; // epoch seconds #endif #if FILE_OWNER char owner[128]; // resolved owner name #endif #if CHECK_FILE_SYSTEM // Linux only FileSystemType fs_type; #endif } FileEntry; typedef struct { char buffer[MAX_PATHLEN]; char *base_end; // Points to end of base path char *filename_pos; // Points to where filename should be written size_t base_len; } PathBuilder; static void path_builder_init(PathBuilder *pb, const char *base) { pb->base_len = strlen(base); memcpy(pb->buffer, base, pb->base_len); pb->base_end = pb->buffer + pb->base_len; #if defined(_WIN32) || defined(_WIN64) *pb->base_end = '\\'; #elif defined(__linux__) *pb->base_end = '/'; #endif // Ensure null termination *(pb->base_end + 1) = '\0'; pb->filename_pos = pb->base_end + 1; } static void path_builder_set_filename(PathBuilder *pb, const char *filename, size_t name_len) { memcpy(pb->filename_pos, filename, name_len); pb->filename_pos[name_len] = '\0'; // Ensure null termination } static char *path_builder_dup_arena(PathBuilder *pb, mem_arena *arena, bool zero) { // Calculate total length including base + separator + filename + null // terminator size_t total_len = (pb->filename_pos - pb->buffer) + strlen(pb->filename_pos) + 1; char *dup = arena_push(&arena, total_len, zero); memcpy(dup, pb->buffer, total_len); return dup; } #if defined(_WIN32) || defined(_WIN64) void scan_folder(const char *base, ScannerContext *ctx) { PathBuilder pb; path_builder_init(&pb, base); char search[MAX_PATHLEN]; memcpy(search, pb.buffer, pb.base_len + 1); // Copy base + separator memcpy(search + pb.base_len + 1, "*", 2); // Add "*" and null WIN32_FIND_DATAA fd; HANDLE h = FindFirstFileA(search, &fd); if (h == INVALID_HANDLE_VALUE) return; do { // Skip . and .. if (fd.cFileName[0] == '.' && (fd.cFileName[1] == 0 || (fd.cFileName[1] == '.' && fd.cFileName[2] == 0))) continue; if (fd.dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) continue; size_t name_len = strlen(fd.cFileName); path_builder_set_filename(&pb, fd.cFileName, name_len); // If it's a directory: if (fd.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { char *dir = path_builder_dup_arena(&pb, ctx->path_arena, false); mpmc_push_work(ctx->dir_queue, dir); } else { // else a file: atomic_fetch_add(&g_files_found, 1); FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true); // Create a temporary copy for normalization to avoid corrupting pb.buffer char temp_path[MAX_PATHLEN]; memcpy(temp_path, pb.buffer, (pb.filename_pos - pb.buffer) + name_len + 1); normalize_path(temp_path); fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, false); strcpy(fe->path, temp_path); #if FILE_TIMES platform_get_file_times(pb.buffer, &fe->created_time, &fe->modified_time); #endif #if FILE_OWNER platform_get_file_owner(pb.buffer, fe->owner, sizeof(fe->owner)); #endif fe->size_bytes = ((uint64_t)fd.nFileSizeHigh << 32) | fd.nFileSizeLow; mpmc_push(ctx->file_queue, fe); } } while (FindNextFileA(h, &fd)); FindClose(h); } #elif defined(__linux__) void scan_folder(const char *base, ScannerContext *ctx) { PathBuilder pb; path_builder_init(&pb, base); int dir_fd = open(base, O_RDONLY | O_DIRECTORY | O_NOFOLLOW); if (dir_fd == -1) return; #if CHECK_FILE_SYSTEM struct statfs fs; FileSystemType fs_type = FS_UNKNOWN; if (fstatfs(dir_fd, &fs) == 0) { fs_type = fs_from_magic(fs.f_type); } #endif DIR *dir = fdopendir(dir_fd); if (!dir) { close(dir_fd); return; } struct dirent *entry; while ((entry = readdir(dir)) != NULL) { if (entry->d_name[0] == '.' && (entry->d_name[1] == 0 || (entry->d_name[1] == '.' && entry->d_name[2] == 0))) continue; size_t name_len = strlen(entry->d_name); path_builder_set_filename(&pb, entry->d_name, name_len); int file_type = DT_UNKNOWN; #ifdef _DIRENT_HAVE_D_TYPE file_type = entry->d_type; #endif // Fast path using d_type if (file_type != DT_UNKNOWN) { if (file_type == DT_LNK) continue; // Skip symlinks if (file_type == DT_DIR) { char *dir_path = path_builder_dup_arena(&pb, ctx->path_arena, false); mpmc_push_work(ctx->dir_queue, dir_path); continue; } if (file_type == DT_REG) { atomic_fetch_add(&g_files_found, 1); FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true); // Use fstatat for file info struct stat st; if (fstatat(dir_fd, entry->d_name, &st, 0) == 0) { #if FILE_TIMES platform_get_file_times_fd(dir_fd, entry->d_name, &fe->created_time, &fe->modified_time); #endif #if FILE_OWNER platform_get_file_owner_fd(dir_fd, entry->d_name, fe->owner, sizeof(fe->owner)); #endif fe->size_bytes = (uint64_t)st.st_size; // Normalize path char temp_path[MAX_PATHLEN]; memcpy(temp_path, pb.buffer, (pb.filename_pos - pb.buffer) + name_len + 1); normalize_path(temp_path); fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, false); strcpy(fe->path, temp_path); #if CHECK_FILE_SYSTEM fe->fs_type = fs_type; #endif mpmc_push(ctx->file_queue, fe); } continue; } } // Fallback for unknown types struct stat st; if (fstatat(dir_fd, entry->d_name, &st, AT_SYMLINK_NOFOLLOW) == 0) { if (S_ISLNK(st.st_mode)) continue; if (S_ISDIR(st.st_mode)) { char *dir_path = path_builder_dup_arena(&pb, ctx->path_arena, false); mpmc_push_work(ctx->dir_queue, dir_path); } else if (S_ISREG(st.st_mode)) { atomic_fetch_add(&g_files_found, 1); FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true); #if FILE_TIMES platform_get_file_times(pb.buffer, &fe->created_time, &fe->modified_time); #endif #if FILE_OWNER platform_get_file_owner(pb.buffer, fe->owner, sizeof(fe->owner)); #endif fe->size_bytes = (uint64_t)st.st_size; char temp_path[MAX_PATHLEN]; memcpy(temp_path, pb.buffer, (pb.filename_pos - pb.buffer) + name_len + 1); normalize_path(temp_path); fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, false); strcpy(fe->path, temp_path); #if CHECK_FILE_SYSTEM fe->fs_type = fs_type; #endif mpmc_push(ctx->file_queue, fe); } } } closedir(dir); } #endif // ------------------------- Scan worker -------------------------------- static THREAD_RETURN scan_worker(void *arg) { ScannerContext *ctx = (ScannerContext *)arg; for (;;) { char *dir = mpmc_pop(ctx->dir_queue); if (!dir) break; scan_folder(dir, ctx); mpmc_task_done(ctx->dir_queue, ctx->num_threads); } return THREAD_RETURN_VALUE; } // ----------------------------- Hashing helpers ----------------------------- static void xxh3_hash_file_stream(const char *path, char *out_hex, unsigned char *buf) { XXH128_hash_t h; XXH3_state_t state; XXH3_128bits_reset(&state); FileHandle handle = os_file_open(path, FLAG_SEQUENTIAL_READ); if (handle == INVALID_FILE_HANDLE) { strcpy(out_hex, "ERROR"); return; } uint64_t bytes_read; while (os_file_read(handle, buf, READ_BLOCK, &bytes_read) == 0 && bytes_read > 0) { XXH3_128bits_update(&state, buf, (size_t)bytes_read); atomic_fetch_add(&g_bytes_processed, bytes_read); } os_file_close(handle); h = XXH3_128bits_digest(&state); snprintf(out_hex, HASH_STRLEN, "%016llx%016llx", (unsigned long long)h.high64, (unsigned long long)h.low64); } // ------------------------- Hash worker -------------------------------- static THREAD_RETURN hash_worker(void *arg) { WorkerContext *ctx = (WorkerContext *)arg; void *buf = malloc(READ_BLOCK); for (;;) { FileEntry *fe = mpmc_pop(ctx->file_queue); if (!fe) break; char hash[HASH_STRLEN]; xxh3_hash_file_stream(fe->path, hash, buf); double size_kib = (double)fe->size_bytes / 1024.0; char stack_buf[KiB(4)]; int len; #if FILE_TIMES char created[32], modified[32]; format_time(fe->created_time, created, sizeof(created)); format_time(fe->modified_time, modified, sizeof(modified)); #endif #if FILE_TIMES && FILE_OWNER len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n", hash, fe->path, size_kib, created, modified, fe->owner); #elif FILE_TIMES len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\n", hash, fe->path, size_kib, created, modified); #elif FILE_OWNER len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\n", hash, fe->path, size_kib, fe->owner); #else len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\n", hash, fe->path, size_kib); #endif char *dst = arena_push(&ctx->arena, len, false); memcpy(dst, stack_buf, len); atomic_fetch_add(&g_files_hashed, 1); } free(buf); return THREAD_RETURN_VALUE; } // ------------------------- Progress display --------------------------- static THREAD_RETURN progress_thread(void *arg) { (void)arg; HiResTimer progress_timer; timer_start(&progress_timer); uint64_t last_bytes = 0; double last_time = 0.0; double displayed_speed = 0.0; const double sample_interval = 0.5; // Hide cursor to prevent flickering printf("\033[?25l"); for (;;) { uint64_t found = atomic_load(&g_files_found); uint64_t hashed = atomic_load(&g_files_hashed); uint64_t bytes = atomic_load(&g_bytes_processed); int scan_done = atomic_load(&g_scan_done); double t = timer_elapsed(&progress_timer); double dt = t - last_time; if (dt >= sample_interval) { uint64_t db = (bytes > last_bytes) ? bytes - last_bytes : 0; displayed_speed = (double)db / (1024.0 * 1024.0) / dt; last_bytes = bytes; last_time = t; } printf("\r"); if (!scan_done) { printf("\033[1mScanning:\033[0m %llu files | Hashed: %llu | \033[32m%.2f " "MB/s\033[0m ", (unsigned long long)found, (unsigned long long)hashed, displayed_speed); } else { double pct = found ? (double)hashed / (double)found : 0.0; int barw = 40; int filled = (int)(pct * barw); printf("["); // Print filled part in Green (\033[32m) printf("\033[32m"); for (int i = 0; i < filled; i++) putchar('#'); // Reset color for empty part printf("\033[0m"); for (int i = filled; i < barw; i++) putchar('.'); printf("] %6.2f%% (%llu/%llu) \033[32m%.2f MB/s\033[0m ", pct * 100.0, (unsigned long long)hashed, (unsigned long long)found, displayed_speed); } fflush(stdout); if (scan_done && hashed == found) break; sleep_ms(100); } // Restore cursor (\033[?25h) and move to next line printf("\033[?25h\n"); return THREAD_RETURN_VALUE; } // ======================== IO Ring implementation ======================== #if USE_IORING // -------------------------- Data structures --------------------------- // Globals u64 g_ioring_buffer_size = 4096 * 64; static atomic_uint_fast64_t g_io_ring_fallbacks = 0; #define IO_PENDING INT_MIN typedef struct IoBuffer IoBuffer; #if defined(_WIN32) || defined(_WIN64) // Windows I/O Ring types typedef HIORING IoRingHandle; #define BUILD_READ_RETURN_VALUE HRESULT #elif defined(__linux__) // Linux io_uring types typedef struct { struct io_uring ring; struct io_uring_cqe *cqe_cache; int cqe_cache_index; int cqe_cache_count; } IoUring; typedef IoUring *IoRingHandle; typedef struct iovec IORING_BUFFER_INFO; #define BUILD_READ_RETURN_VALUE int #endif typedef struct FileReadContext { FileEntry *fe; size_t file_size; // For in-order hashing size_t next_read_offset; IoBuffer *head; IoBuffer *tail; // Completion tracking size_t bytes_hashed; uint32_t reads_hashed; uint32_t reads_submitted; uint32_t reads_completed; uint32_t active_reads; union { XXH3_state_t hash_state; // For incremental hash (large files) XXH128_hash_t single_hash; // For single-shot hash (small files) }; FileHandle file_handle; #if USE_REGISTERED_FILES uint32_t slot_id; #endif bool use_incremental_hash; bool completed; } FileReadContext; // -------------------------- Buffer structure --------------------------- typedef struct IoBuffer { FileReadContext *file; void *data; size_t size; size_t offset; size_t bytes_read; BUILD_READ_RETURN_VALUE result; int buffer_id; struct IoBuffer *next; } IoBuffer; // Thread-local I/O Ring context #if defined(_WIN32) || defined(_WIN64) typedef struct ThreadIoContext { IoRingHandle ring; void *completion_event; void *fallback_buffer; IoBuffer buffers[NUM_BUFFERS_PER_THREAD]; int buffer_pool[NUM_BUFFERS_PER_THREAD]; int free_count; int num_submissions; int active_files; bool submitting; #if USE_REGISTERED_FILES bool use_registered_files; FileHandle registered_handles[MAX_ACTIVE_FILES]; #endif } ThreadIoContext; #elif defined(__linux__) typedef struct ThreadIoContext { IoRingHandle ring; void *fallback_buffer; IoBuffer buffers[NUM_BUFFERS_PER_THREAD]; int buffer_pool[NUM_BUFFERS_PER_THREAD]; int free_count; int num_submissions; int active_files; bool submitting; bool use_registered_buffers; bool use_registered_files; } ThreadIoContext; #endif typedef struct { uint32_t MaxSubmissionQueueSize; uint32_t MaxCompletionQueueSize; uint32_t MaxVersion; } IoRingCapabilities; // ------------------------ IO Ring Abstraction ------------------------- #if defined(_WIN32) || defined(_WIN64) // Windows I/O Ring functions static void ioring_query_capabilities(IoRingCapabilities *caps) { IORING_CAPABILITIES win_caps; QueryIoRingCapabilities(&win_caps); caps->MaxSubmissionQueueSize = win_caps.MaxSubmissionQueueSize; caps->MaxCompletionQueueSize = win_caps.MaxCompletionQueueSize; caps->MaxVersion = win_caps.MaxVersion; } static int create_ioring(ThreadIoContext *thread_ctx, uint32_t queue_size) { IORING_CREATE_FLAGS flags = {0}; HRESULT hr = CreateIoRing(IORING_VERSION_3, flags, queue_size, queue_size * 2, &thread_ctx->ring); // Create completion event thread_ctx->completion_event = CreateEvent(NULL, FALSE, FALSE, NULL); if (thread_ctx->completion_event) SetIoRingCompletionEvent(thread_ctx->ring, thread_ctx->completion_event); return SUCCEEDED(hr) ? 0 : -1; } static int close_ioring(ThreadIoContext *thread_ctx) { if (thread_ctx->completion_event) CloseHandle(thread_ctx->completion_event); CloseIoRing(thread_ctx->ring); return 0; } #define USERDATA_REGISTER 1 #define MAKE_BUF_INFO(a, l) \ (IORING_BUFFER_INFO) { .Address = (a), .Length = (uint32_t)(l) } static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t *submitted) { // uint32_t wait_count = MIN(thread_ctx->num_submissions, MAX_WAIT_COUNT); // The wait_count in windows is not implemented yet, so we wait in // ioring_pop_completion() HRESULT hr; // if (wait_count > 0) { // hr = SubmitIoRing(ring, wait_count, SUBMIT_TIMEOUT_MS, submitted); // } else { hr = SubmitIoRing(thread_ctx->ring, 0, SUBMIT_TIMEOUT_MS, submitted); // } if (thread_ctx->num_submissions > 0) { WaitForSingleObject(thread_ctx->completion_event, SUBMIT_TIMEOUT_MS); } return SUCCEEDED(hr) ? 0 : -1; } static void ioring_register_buffers(ThreadIoContext *thread_ctx, IORING_BUFFER_INFO *buf_info) { HRESULT hr = BuildIoRingRegisterBuffers( thread_ctx->ring, NUM_BUFFERS_PER_THREAD, buf_info, USERDATA_REGISTER); if (FAILED(hr)) { char error_msg[256]; FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, hr, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), error_msg, sizeof(error_msg), NULL); fprintf(stderr, "WARNING: Error registering buffers: %s (0x%08X)\n", error_msg, (unsigned int)hr); } // Submit registration ioring_submit(thread_ctx, NULL); } #if USE_REGISTERED_FILES static void ioring_register_files(ThreadIoContext *thread_ctx) { HRESULT hr = BuildIoRingRegisterFileHandles( thread_ctx->ring, MAX_ACTIVE_FILES, thread_ctx->registered_handles, USERDATA_REGISTER); if (FAILED(hr)) { char error_msg[256]; FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, hr, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), error_msg, sizeof(error_msg), NULL); fprintf(stderr, "WARNING: File registration failed: %s (0x%08X)\n", error_msg, (unsigned int)hr); } thread_ctx->use_registered_files = (hr == 0); } static void ioring_register_files_update(ThreadIoContext *thread_ctx, FileReadContext *file) { thread_ctx->registered_handles[file->slot_id] = file->file_handle; ioring_register_files(thread_ctx); } #endif static BUILD_READ_RETURN_VALUE ioring_build_read(ThreadIoContext *thread_ctx, FileReadContext *file_ctx, uint32_t buffer_id, size_t size, uint64_t offset, uintptr_t user_data) { #if USE_REGISTERED_FILES IORING_HANDLE_REF file_ref; if (thread_ctx->use_registered_files) { file_ref = (IORING_HANDLE_REF)IoRingHandleRefFromIndex(file_ctx->slot_id); } else { file_ref = (IORING_HANDLE_REF)IoRingHandleRefFromHandle(file_ctx->file_handle); } #else IORING_HANDLE_REF file_ref = IoRingHandleRefFromHandle(file_ctx->file_handle); #endif IORING_BUFFER_REF buffer_ref = IoRingBufferRefFromIndexAndOffset(buffer_id, 0); HRESULT hr = BuildIoRingReadFile(thread_ctx->ring, file_ref, buffer_ref, (uint32_t)size, offset, user_data, IOSQE_FLAGS_NONE); if (FAILED(hr)) { char error_msg[256]; FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, hr, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), error_msg, sizeof(error_msg), NULL); fprintf(stderr, "ERROR: Building read error for file '%s' - Error: %s (Code: " "0x%08X)\n", file_ctx->fe->path, error_msg, (unsigned int)hr); } return hr; } static void ioring_process_completions(ThreadIoContext *restrict thread_ctx) { uint32_t cqe_count = 0; uint32_t wait_count = MIN(thread_ctx->num_submissions, MAX_WAIT_COUNT); while (cqe_count < wait_count) { // ---- Drain all available CQEs (non-blocking) ---- while (1) { IORING_CQE win_cqe; HRESULT hr = PopIoRingCompletion(thread_ctx->ring, &win_cqe); if (hr != S_OK) { // No more CQEs available right now break; } if (FAILED(hr)) { fprintf(stderr, "WARNING: PopIoRingCompletion failed (0x%lx)\n", hr); return; } // Skip internal registration completions if (win_cqe.UserData == USERDATA_REGISTER) { continue; } IoBuffer *restrict buf = (IoBuffer *)win_cqe.UserData; FileReadContext *restrict file = buf->file; if (SUCCEEDED(win_cqe.ResultCode)) { buf->result = 0; buf->bytes_read = win_cqe.Information; } else { buf->result = win_cqe.ResultCode; buf->bytes_read = 0; char error_msg[256]; FormatMessageA( FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, win_cqe.ResultCode, 0, error_msg, sizeof(error_msg), NULL); fprintf(stderr, "WARNING: I/O completion error for file '%s' - Error: %s " "(Code: 0x%lx)\n", buf->file->fe->path, error_msg, win_cqe.ResultCode); } file->active_reads--; file->reads_completed++; thread_ctx->num_submissions--; cqe_count++; } // ---- If we already waited enough, exit ---- if (cqe_count >= wait_count) { break; } // ---- Otherwise wait for more completions ---- WaitForSingleObject(thread_ctx->completion_event, SUBMIT_TIMEOUT_MS); } } FileHandle ioring_open_file(FileEntry *fe) { FileHandle handle = os_file_open(fe->path, FLAG_ASYNC_DIRECT_READ); if (handle == INVALID_FILE_HANDLE) { return os_file_open(fe->path, FLAG_SEQUENTIAL_READ); } return handle; } #elif defined(__linux__) // Linux io_uring functions implementation static void ioring_query_capabilities(IoRingCapabilities *caps) { // Get system limits for io_uring long max_entries = sysconf(_SC_IOV_MAX); if (max_entries <= 0) max_entries = 4096; caps->MaxSubmissionQueueSize = (uint32_t)(max_entries < 4096 ? max_entries : 4096); caps->MaxCompletionQueueSize = caps->MaxSubmissionQueueSize * 2; caps->MaxVersion = 1; } // static int async_io_create_ring(uint32_t queue_size, AsyncIoRing *ring) { static int create_ioring(ThreadIoContext *thread_ctx, uint32_t queue_size) { IoUring *ring_impl = (IoUring *)calloc(1, sizeof(IoUring)); if (!ring_impl) return -1; // Initialize io_uring struct io_uring_params params = {0}; params.flags = IORING_SETUP_CQSIZE | IORING_SETUP_SINGLE_ISSUER | // Thread local io_uring IORING_SETUP_DEFER_TASKRUN; // Do not send interupts when a CQE is ready, // send them when a wait function is called, // and groupe them according to the nbre or // entries to wait (reduces syscall overhead) params.cq_entries = queue_size * 2; int ret = io_uring_queue_init_params(queue_size, &ring_impl->ring, ¶ms); if (ret < 0) { // Fallback to without params printf("WARNING: Creating io_uring with default params\n"); ret = io_uring_queue_init(queue_size, &ring_impl->ring, 0); if (ret < 0) { free(ring_impl); return -1; } } ring_impl->cqe_cache = NULL; ring_impl->cqe_cache_index = 0; ring_impl->cqe_cache_count = 0; thread_ctx->ring = ring_impl; return 0; } static int close_ioring(ThreadIoContext *thread_ctx) { IoUring *rimg_impl = (IoUring *)thread_ctx->ring; if (!rimg_impl) return -1; if (thread_ctx->use_registered_buffers) { io_uring_unregister_buffers(&rimg_impl->ring); } io_uring_queue_exit(&rimg_impl->ring); free(rimg_impl); return 0; } #define MAKE_BUF_INFO(a, l) \ (IORING_BUFFER_INFO) { .iov_base = (a), .iov_len = (size_t)(l) } static void ioring_register_buffers(ThreadIoContext *thread_ctx, IORING_BUFFER_INFO *buf_info) { int ret = io_uring_register_buffers(&((IoUring *)thread_ctx->ring)->ring, buf_info, NUM_BUFFERS_PER_THREAD); if (ret < 0) { if (ret == -ENOMEM) { struct rlimit limit; getrlimit(RLIMIT_MEMLOCK, &limit); fprintf( stderr, "WARNING: Buffer registration failed due to Memlock limit, Error: " "Cannot allocate memory (code: -12, ENOMEM).\n" "See README for more informations.\n"); } else { // For any other error (e.g., EFAULT, EBUSY, EINVAL) fprintf(stderr, "WARNING: Error registering buffers: %s (code: %d)\n", strerror(-ret), ret); } fprintf(stderr, "Falling back to unregistered buffers (performance may " "be reduced).\n"); } thread_ctx->use_registered_buffers = (ret == 0); } #if USE_REGISTERED_FILES static void ioring_register_files(ThreadIoContext *thread_ctx) { int ret = io_uring_register_files_sparse(&((IoUring *)thread_ctx->ring)->ring, MAX_ACTIVE_FILES); if (ret < 0) { fprintf(stderr, "WARNING: File registeration failed, fallback to unregistered " "files - Error: %s (code: %d)\n", strerror(-ret), ret); } thread_ctx->use_registered_files = (ret == 0); } static void ioring_register_files_update(ThreadIoContext *thread_ctx, FileReadContext *file) { // Update the kernel's file table at the specific slot int ret = io_uring_register_files_update( &((IoUring *)thread_ctx->ring)->ring, file->slot_id, // offset - which slot to update &file->file_handle, // pointer to the fd 1 // number of files to update ); if (ret < 0) { fprintf(stderr, "WARNING: File registration update failed for slot %u updating " "file '%s' - Error: %s " "(code: %d)\n" "Fallback to unregistered files\n", file->slot_id, file->fe->path, strerror(-ret), ret); thread_ctx->use_registered_files = false; } } #endif static int ioring_build_read(ThreadIoContext *thread_ctx, FileReadContext *file_ctx, uint32_t buffer_id, size_t size, uint64_t offset, uintptr_t user_data) { struct io_uring_sqe *sqe = io_uring_get_sqe(&((IoUring *)thread_ctx->ring)->ring); if (!sqe) { printf("SQE FULL\n"); return -1; } void *buf = thread_ctx->buffers[buffer_id].data; #if USE_REGISTERED_FILES if (thread_ctx->use_registered_files) { sqe->flags |= IOSQE_FIXED_FILE; if (thread_ctx->use_registered_buffers) { io_uring_prep_read_fixed(sqe, file_ctx->slot_id, buf, size, offset, buffer_id); } else { io_uring_prep_read(sqe, file_ctx->slot_id, buf, size, offset); } io_uring_sqe_set_data64(sqe, user_data); return 0; } #endif // Fallback: use regular file descriptor if (thread_ctx->use_registered_buffers) { io_uring_prep_read_fixed(sqe, file_ctx->file_handle, buf, size, offset, buffer_id); } else { io_uring_prep_read(sqe, file_ctx->file_handle, buf, size, offset); } io_uring_sqe_set_data64(sqe, user_data); return 0; } static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t *submitted) { int ret; uint32_t wait_count = MIN(thread_ctx->num_submissions, MAX_WAIT_COUNT); if (wait_count > 0) { ret = io_uring_submit_and_wait(&((IoUring *)thread_ctx->ring)->ring, wait_count); } else { ret = io_uring_submit(&((IoUring *)thread_ctx->ring)->ring); } if (ret < 0) { fprintf(stderr, "ERROR: Submit error: %s (Code: %d)\n", strerror(-ret), ret); return -1; } if (submitted) *submitted = (uint32_t)ret; return 0; } static void ioring_process_completions(ThreadIoContext *thread_ctx) { struct io_uring_cqe *cqes[NUM_BUFFERS_PER_THREAD]; unsigned cqe_count = io_uring_peek_batch_cqe(&((IoUring *)thread_ctx->ring)->ring, cqes, NUM_BUFFERS_PER_THREAD); if (cqe_count == 0) { return; } for (unsigned i = 0; i < cqe_count; i++) { struct io_uring_cqe *cqe = cqes[i]; int res = cqe->res; IoBuffer *restrict buf = (IoBuffer *)cqe->user_data; FileReadContext *restrict file = buf->file; if (res >= 0) { buf->result = 0; buf->bytes_read = (uint32_t)res; } else { buf->result = res; buf->bytes_read = 0; fprintf(stderr, "WARNING: I/O completion error for file '%s' - Error: %s (Code: " "%d)\n", buf->file->fe->path, strerror(-res), res); } file->active_reads--; file->reads_completed++; thread_ctx->num_submissions--; } // Mark CQE as seen, equivalent to io_uring_cqe_seen() but marks multiple CQEs io_uring_cq_advance(&((IoUring *)thread_ctx->ring)->ring, cqe_count); } FileHandle ioring_open_file(FileEntry *fe) { #if CHECK_FILE_SYSTEM if (!fs_policy(fe->fs_type)) { return os_file_open(fe->path, FLAG_SEQUENTIAL_READ); } #endif FileHandle handle = os_file_open(fe->path, FLAG_ASYNC_DIRECT_READ); if (handle == INVALID_FILE_HANDLE) { return os_file_open(fe->path, FLAG_SEQUENTIAL_READ); } return handle; } #endif // OS-agnostic helper macros #define IORING_SUCCEEDED(result) ((result) >= 0) #define IORING_FAILED(result) ((result) < 0) // ---------------------- FIFO queue operations --------------------------- typedef struct FileQueue { FileReadContext files[MAX_ACTIVE_FILES]; int head; int tail; int count; } FileQueue; static FileReadContext *fq_push(FileQueue *restrict fq) { if (fq->count == MAX_ACTIVE_FILES) return NULL; FileReadContext *restrict file = &fq->files[fq->tail]; #if USE_REGISTERED_FILES file->slot_id = fq->tail; #endif fq->tail = (fq->tail + 1) % MAX_ACTIVE_FILES; fq->count++; return file; } static FileReadContext *fq_peek_tail(FileQueue *fq) { if (fq->count == 0) return NULL; int idx = (fq->tail - 1 + MAX_ACTIVE_FILES) % MAX_ACTIVE_FILES; return &fq->files[idx]; // return the newest file } static FileReadContext *fq_peek_at(FileQueue *fq, int index) { if (index < 0 || index >= fq->count) return NULL; int idx = (fq->head + index) % MAX_ACTIVE_FILES; return &fq->files[idx]; } static void fq_trim(FileQueue *restrict fq) { while (fq->count > 0) { FileReadContext *restrict file = &fq->files[fq->head]; if (!file->completed) break; fq->head = (fq->head + 1) % MAX_ACTIVE_FILES; fq->count--; } } // ----------------- Initialize thread context ----------------------- static ThreadIoContext *ioring_init_thread(void) { ThreadIoContext *restrict thread_ctx = (ThreadIoContext *)calloc(1, sizeof(ThreadIoContext)); if (!thread_ctx) return NULL; // Query I/O Ring capabilities IoRingCapabilities caps; ioring_query_capabilities(&caps); uint32_t queue_size = caps.MaxSubmissionQueueSize; if (queue_size > 4096) queue_size = 4096; // Cap at 4096 for reasonable memory usage // Create I/O Ring if (create_ioring(thread_ctx, queue_size) != 0) { free(thread_ctx); thread_ctx = NULL; return NULL; } // Initialize buffer pool thread_ctx->fallback_buffer = malloc(READ_BLOCK); IORING_BUFFER_INFO buf_info[NUM_BUFFERS_PER_THREAD]; u64 buf_pool_size = g_ioring_buffer_size * NUM_BUFFERS_PER_THREAD; // Reserve and Commit memory for buffers void *base_ptr = plat_mem_reserve(buf_pool_size); if (base_ptr) { if (!plat_mem_commit(base_ptr, buf_pool_size)) { plat_mem_release(base_ptr, 0); close_ioring(thread_ctx); free(thread_ctx); thread_ctx = NULL; return NULL; } } else { close_ioring(thread_ctx); free(thread_ctx); thread_ctx = NULL; return NULL; } for (int i = 0; i < NUM_BUFFERS_PER_THREAD; i++) { thread_ctx->buffers[i].data = (u8 *)base_ptr + (i * g_ioring_buffer_size); thread_ctx->buffer_pool[i] = i; thread_ctx->buffers[i].buffer_id = i; buf_info[i] = MAKE_BUF_INFO(thread_ctx->buffers[i].data, g_ioring_buffer_size); } thread_ctx->free_count = NUM_BUFFERS_PER_THREAD; // Register buffers ioring_register_buffers(thread_ctx, buf_info); #if USE_REGISTERED_FILES ioring_register_files(thread_ctx); #endif thread_ctx->submitting = true; thread_ctx->num_submissions = 0; thread_ctx->active_files = 0; return thread_ctx; } static void ioring_cleanup_thread(ThreadIoContext *thread_ctx) { if (!thread_ctx) return; if (thread_ctx->ring) close_ioring(thread_ctx); // Free the buffer pool memory if (thread_ctx->buffers[0].data) { u64 buf_pool_size = g_ioring_buffer_size * NUM_BUFFERS_PER_THREAD; plat_mem_release(thread_ctx->buffers[0].data, buf_pool_size); } free(thread_ctx); thread_ctx = NULL; } // -------------------------- Buffer get and return ------------------------ static IoBuffer *get_free_buffer(ThreadIoContext *restrict thread_ctx) { if (thread_ctx->free_count == 0) { return NULL; } int idx = thread_ctx->buffer_pool[--thread_ctx->free_count]; IoBuffer *restrict buf = &thread_ctx->buffers[idx]; buf->bytes_read = 0; buf->result = IO_PENDING; buf->next = NULL; return buf; } static void return_buffer(ThreadIoContext *restrict thread_ctx, IoBuffer *restrict buf) { if (!buf) return; thread_ctx->buffer_pool[thread_ctx->free_count++] = buf->buffer_id; } // -------------------- File operations ----------------------- static int init_file(ThreadIoContext *restrict thread_ctx, FileReadContext *restrict file, FileEntry *restrict fe) { #if USE_REGISTERED_FILES uint32_t saved_slot_id = file->slot_id; #endif memset(file, 0, sizeof(*file)); file->fe = fe; file->file_size = fe->size_bytes; file->file_handle = ioring_open_file(fe); if (file->file_handle == INVALID_FILE_HANDLE) { #if IORING_DEBUG_PRINTS printf("ERROR: Could not open file '%s'\n", fe->path); #endif return 0; } #if USE_REGISTERED_FILES file->slot_id = saved_slot_id; if (thread_ctx->use_registered_files) { ioring_register_files_update(thread_ctx, file); } #endif // Determine hash method based on file size if (file->file_size > g_ioring_buffer_size) { file->use_incremental_hash = true; XXH3_128bits_reset(&file->hash_state); } return 1; } static void finalize_file(ThreadIoContext *restrict thread_ctx, WorkerContext *worker_ctx, FileReadContext *restrict file) { FileEntry *restrict fe = file->fe; os_file_close(file->file_handle); char hash[HASH_STRLEN]; if (file->bytes_hashed == file->file_size) { if (file->use_incremental_hash) { // Large file: digest the accumulated hash state XXH128_hash_t h = XXH3_128bits_digest(&file->hash_state); snprintf(hash, HASH_STRLEN, "%016llx%016llx", (unsigned long long)h.high64, (unsigned long long)h.low64); } else { // Small file: hash already computed, stored directly in single_hash snprintf(hash, HASH_STRLEN, "%016llx%016llx", (unsigned long long)file->single_hash.high64, (unsigned long long)file->single_hash.low64); } } else { #if IORING_DEBUG_PRINTS printf("WARNING: Fallback for path: %s\n", fe->path); #endif atomic_fetch_add(&g_io_ring_fallbacks, 1); xxh3_hash_file_stream(fe->path, hash, thread_ctx->fallback_buffer); } double size_kib = (double)fe->size_bytes / 1024.0; char stack_buf[KiB(4)]; int len; #if FILE_TIMES char created[32], modified[32]; format_time(fe->created_time, created, sizeof(created)); format_time(fe->modified_time, modified, sizeof(modified)); #endif #if FILE_TIMES && FILE_OWNER len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n", hash, fe->path, size_kib, created, modified, fe->owner); #elif FILE_TIMES len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\n", hash, fe->path, size_kib, created, modified); #elif FILE_OWNER len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\n", hash, fe->path, size_kib, fe->owner); #else len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\n", hash, fe->path, size_kib); #endif char *restrict dst = arena_push(&worker_ctx->arena, len, false); memcpy(dst, stack_buf, len); atomic_fetch_add(&g_files_hashed, 1); } // -------------------- Hash files ----------------------- static void hash_ready_files(ThreadIoContext *restrict thread_ctx, FileQueue *restrict fq, WorkerContext *worker_ctx) { for (int i = 0; i < fq->count; i++) { FileReadContext *restrict file = fq_peek_at(fq, i); if (!file || file->completed) continue; // ---- HASH READY BUFFERS IN ORDER ---- while (file->head) { IoBuffer *restrict buf = file->head; // CQE not received yet if (buf->result == IO_PENDING) break; // Consume buffer file->head = buf->next; if (IORING_SUCCEEDED(buf->result) && buf->bytes_read > 0) { size_t bytes_to_hash = buf->bytes_read; if (buf->offset + buf->bytes_read > file->file_size) { bytes_to_hash = file->file_size - buf->offset; } if (bytes_to_hash > 0) { if (file->use_incremental_hash) { XXH3_128bits_update(&file->hash_state, buf->data, bytes_to_hash); } else { file->single_hash = XXH3_128bits(buf->data, bytes_to_hash); } file->bytes_hashed += bytes_to_hash; atomic_fetch_add(&g_bytes_processed, bytes_to_hash); } file->reads_hashed++; } else if (buf->bytes_read == 0 && IORING_SUCCEEDED(buf->result)) { file->reads_hashed++; // EOF } else { finalize_file(thread_ctx, worker_ctx, file); file->completed = true; } return_buffer(thread_ctx, buf); } // ---- FINALIZE ---- if (!file->completed && file->active_reads == 0 && file->bytes_hashed >= file->file_size) { finalize_file(thread_ctx, worker_ctx, file); file->completed = true; thread_ctx->active_files--; } } // Clean up completed files from the head fq_trim(fq); } // ------------------ Build pending reads ---------------------- static void build_pending_reads(ThreadIoContext *restrict thread_ctx, FileQueue *restrict fq, WorkerContext *worker_ctx) { MPMCQueue *file_queue = worker_ctx->file_queue; FileReadContext *restrict file = fq_peek_tail(fq); for (;;) { // BUILD READS FOR CURRENT FILE if (file) { while (file->next_read_offset < file->file_size) { IoBuffer *restrict buf = get_free_buffer(thread_ctx); if (!buf) return; size_t remaining = file->file_size - file->next_read_offset; size_t bytes_to_read; if (remaining >= g_ioring_buffer_size) { bytes_to_read = g_ioring_buffer_size; } else { bytes_to_read = ALIGN_UP_POW2(remaining, g_pagesize); } // Initialize buffer buf->file = file; buf->offset = file->next_read_offset; buf->size = bytes_to_read; // Chain buffer if (!file->head) { file->head = buf; } else { file->tail->next = buf; } file->tail = buf; BUILD_READ_RETURN_VALUE hr = ioring_build_read(thread_ctx, file, buf->buffer_id, bytes_to_read, buf->offset, (uintptr_t)buf); if (IORING_FAILED(hr)) { // mark failure and stop this file return_buffer(thread_ctx, buf); finalize_file(thread_ctx, worker_ctx, file); file->completed = true; break; } file->active_reads++; file->reads_submitted++; thread_ctx->num_submissions++; file->next_read_offset += bytes_to_read; } } // ADD NEW FILE if (!thread_ctx->submitting) return; if (fq->count >= MAX_ACTIVE_FILES) return; FileEntry *fe = mpmc_pop(file_queue); if (!fe) { thread_ctx->submitting = false; return; } FileReadContext *newfile = fq_push(fq); if (!init_file(thread_ctx, newfile, fe)) { finalize_file(thread_ctx, worker_ctx, newfile); newfile->completed = true; continue; } file = newfile; thread_ctx->active_files++; } } // -------------------------- Hash worker I/O Ring --------------------------- static THREAD_RETURN hash_worker_ioring(void *arg) { WorkerContext *worker_ctx = (WorkerContext *)arg; // Init IO ring ThreadIoContext *thread_ctx = ioring_init_thread(); if (!thread_ctx || !thread_ctx->ring) { printf("I/O Ring unavailable, using buffered I/O\n"); return hash_worker(arg); } // Initialize pipeline state FileQueue fq; memset(&fq, 0, sizeof(fq)); uint32_t submitted; // Main pipeline loop for (;;) { // Submit new reads build_pending_reads(thread_ctx, &fq, worker_ctx); submitted = 0; ioring_submit(thread_ctx, &submitted); // Process completions ioring_process_completions(thread_ctx); // Hash files hash_ready_files(thread_ctx, &fq, worker_ctx); #if IORING_DEBUG_STATS printf( "Free buffers: %d, Submissions: %d, Active files: %d, fq count: %d\n", thread_ctx->free_count, thread_ctx->num_submissions, thread_ctx->active_files, fq.count); #endif // Exit condition if (!thread_ctx->submitting && thread_ctx->active_files == 0 && thread_ctx->num_submissions == 0) { break; } } ioring_cleanup_thread(thread_ctx); return THREAD_RETURN_VALUE; } #endif