diff --git a/.gitignore b/.gitignore index c15abe7..670e490 100644 --- a/.gitignore +++ b/.gitignore @@ -3,5 +3,11 @@ file_hasher.ilk file_hasher.rdi file_hasher.exe file_hashes.txt +Binaries/file_hashes.txt file_list.txt temp_code.c +/.cache/clangd/index +/file_hasher +/io_uring_test +/file_hasher +/io_uring_test diff --git a/README.md b/README.md index 1b7f080..1b6353e 100644 --- a/README.md +++ b/README.md @@ -1,24 +1,221 @@ # filehasher -Collects some metadata and hashes files. +## Presentation +Collects some metadata and hashes files. It outputs the path, hash, size, creation and +last modification dates and the author in file_hasher.txt. +Creation and modification dates and author can be disabled in the config file. -## Building: -### Windows: -#### Release: -clang-cl /O3 file_hasher.c xxh_x86dispatch.c advapi32.lib -clang -O3 file_hasher.c xxh_x86dispatch.c -ladvapi32 -o file_hasher -gcc -O3 file_hasher.c xxh_x86dispatch.c -ladvapi32 -o file_hasher +It is a high performance cross platform Windows and Linux compatible program, it uses: + * Multiple threads for scanning and hashing (multi-threading can be disabled in the config file). + * Stores the generated data in thread local configurable arenas that support growing + by committing more memory and chaining blocks. + * Two Multi Producer Multi Consumer queues, one for the scanners and one between the scanners and hashers. + * xxh3_128bits algorithm from xxhash, that supports SIMD instruction sets (SSE2, AVX2, AVX512) + and uses a runtime dispatcher to select the best available instruction set. + * IO Ring for asynchronous I/O in Windows and the equivalent io_uring in Linux. + The implementation is event driven, thread local, uses DMA and direct disk I/O, + bypassing the OS cache completely, registered buffers (and registered files in io_uring), + it supports bashing multiple submissions and can handle multiple files at the same time. + It can be disabled in the config file. + * Fallback to buffered I/O if there is errors in the IO Ring path. -#### Debug: -clang-cl /Zi /Od file_hasher.c xxh_x86dispatch.c advapi32.lib -clang -g -O0 file_hasher.c xxh_x86dispatch.c -ladvapi32 -o file_hasher -gcc -g -O0 file_hasher.c xxh_x86dispatch.c -ladvapi32 -o file_hasher +## Building +### Windows +#### Release -### Linux: -#### Release: -clang -O3 -pthread file_hasher.c xxh_x86dispatch.c -o file_hasher -gcc -O3 -pthread file_hasher.c xxh_x86dispatch.c -o file_hasher +**Note**: Make sur to use UCRT64 environment from MSYS2 instead of the standard MinGW environment. +UCRT64 uses the modern Universal C Runtime (ucrtbase.dll), which supports the newest APIs, +the standard MSYS2 uses the legacy msvcrt.dll and does not support IO Ring. +To install: +pacman -S mingw-w64-ucrt-x86_64-gcc +pacman -S mingw-w64-ucrt-x86_64-clang +pacman -Syu +And add to path: +C:\msys64\ucrt64\bin -#### Debug: -clang -g -O0 -pthread file_hasher.c xxh_x86dispatch.c -o file_hasher -gcc -g -O0 -pthread file_hasher.c xxh_x86dispatch.c -o file_hasher +gcc -O3 file_hasher.c xxhash.c xxh_x86dispatch.c -o file_hasher +clang -O3 file_hasher.c xxhash.c xxh_x86dispatch.c -o file_hasher +clang-cl /O2 file_hasher.c xxhash.c xxh_x86dispatch.c + +#### Debug +gcc -g -O0 file_hasher.c xxhash.c xxh_x86dispatch.c -o file_hasher +clang -g -O0 file_hasher.c xxhash.c xxh_x86dispatch.c -o file_hasher +clang-cl /Zi /Od file_hasher.c xxhash.c xxh_x86dispatch.c + +### Linux +#### Release +gcc -O3 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher +clang -O3 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher + +#### Debug +gcc -g -O0 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher +clang -g -O0 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher + +## Notes about the IO Ring implementations +### IO Ring + +#### File registration +Registering files is a performance optimization that allows the kernel to allocate an array +of descriptors/handles to pre-validate and maintain long-term references to file handles. +Instead of passing a standard file descriptor/handle with every I/O request, you pass a simple integer +index into a pre-registered table. + +The Linux implementation has io_uring_register_files_scarse() to create an empty array of descriptors +(initialized with -1) without having to create and initialize it in the user space, and we can +use io_uring_register_files_update() to update one or more entries. Windows on the other hand +is limited to BuildIoRingRegisterFileHandles() only, so we need to re register the entire array of handles +each time. This is why there is a provided macro in config.h to disable or enable it. + +##### Why Register Files? (The Benefits) +When you use a standard file descriptor in a high-frequency I/O loop, +the kernel must perform several "hidden" tasks for every single operation: + * Permission Checks: Validating that the process still has the right to read/write + that specific file. + * Reference Counting: Incrementing the file's internal reference count at the start of + the I/O and decrementing it at the end to ensure the file isn't closed while in use. + * Object Lookup: Traversing the internal "file descriptor table" to find the actual + kernel object associated with your integer ID. + +Registering the files performs these checks once at registration time. Subsequent +I/O operations skip these steps, significantly reducing CPU overhead and latency, +especially when handling thousands of small I/O operations per second. + +##### Comparison: Linux vs. Windows Implementation +While both systems share the same core concept, their APIs and management styles differ significantly. +Feature Linux (io_uring) Windows (IoRing) +API Call io_uring_register BuildIoRingRegisterFileHandles +Registration Method Synchronous system call that blocks until the table is set up. Asynchronous request submitted to the ring just like a read/write operation. +Partial Updates Supports IORING_REGISTER_FILES_UPDATE to swap specific indices without a full reset. Does not support partial updates; a new registration call replaces the entire existing table. +Memory Mapping User must manually mmap() the queues into their address space. The kernel handles memory mapping automatically when the ring is created. +Scope of Operations Extremely broad (files, sockets, timers, signals, even other rings). Primarily focused on file storage (read, write, flush). + +#### Completion Wait count +To avoid busy waiting when receiving CQEs, we can use io_uring_submit_and_wait() in Linux by entering a wait count, +the threads sleeps until the count of CQEs are received, in windows the wait_count is present in SubmitIoRing() +but is not implemented yet, so we wait with a completion event for a single completion. Another limitation on the completion +event is that the kernel will waik up the thread only when receiving the first CQE, after that we need to drain the completion +queue completely before sleeping again, or we enter an eternal slumber. And my config, each time the thread wakes up +it receives rarely more than 3 to 5 CQEs and most of the time only one CQE. + +#### Filtering CQEs + +Unlike Linux, The Windows implementation treats buffer and file registration +as an asynchronous operation that we submit to the ring, similar to a read or write. +Those operations produce CQEs (completion queue entries) that we filter here using +cqe.UserData == USERDATA_REGISTER +```c + if (win_cqe.UserData == USERDATA_REGISTER) + continue; +``` + +### io_uring + +#### Creation flags +io_uring provides a lot of configuration flags compared to IO Ring, some +of them are at the creation and others during the operations, here what +we use in this implementation at creation time and is lacking in the +IO Ring implementation. + + * IORING_SETUP_SINGLE_ISSUER: Since we are using a thread local io_uring, we can + set this flag to remove the atomic operations. + * IORING_SETUP_DEFER_TASKRUN: By default, the kernel sends an interrupts when a CQE + is ready, we use this flag to disable this syscall and wait for a specific number of + CQEs to be ready to group them, this reduces the number of syscall. + +#### Memlock limit warning + +```c + "WARNING: Buffer registration failed due to memlock limits (ENOMEM).\n" + "Increase the limit to solve this warning.\n"); +``` + +The Memlock limit in Linux restricts the amount of memory a process can +"lock" into physical RAM using the mlock() family of system calls. This +prevents the operating system from swapping that memory out to disk. +And registering buffers will lock the buffers memory so the hardware +can access it directly without kernel intervention and prevents the kernel from +swapping it to the SSD or HDD. Increase the limit to be able to register the buffers. + +##### Modifying the Limit: +The method for changing the memlock limit depends on whether you are +managing a user session or a system service. +1. For Users and Interactive Sessions +To permanently increase the limit for a specific user or group, modify +the /etc/security/limits.conf file. Add the following lines: + +```conf + # Example for a specific user (replace 'username'), unlimited or a custom value in KB + username soft memlock unlimited + username hard memlock unlimited + + # Example for all users + * soft memlock unlimited + * hard memlock unlimited +``` + +Soft Limit: The value the user starts with; can be raised up to the +hard limit. + +Hard Limit: The absolute maximum; only a privileged user +(root) can increase this. Values: Can be set in Kilobytes (KB) or as +unlimited. + +2. For Systemd Services +Settings in limits.conf do not affect background services managed by +systemd. To increase the limit for a service, edit its service file +(e.g., /etc/systemd/system/myservice.service) and add: +```conf + [Service] + LimitMEMLOCK=infinity +``` + +##### Why Register Buffers? +In a standard "unregistered" I/O operation, the kernel must perform several +expensive steps for every single read or write: + * Virtual-to-Physical Mapping: The kernel has to translate your application's + virtual memory addresses into physical RAM addresses. + * Page Pinning: The kernel must "pin" the memory pages (using get_user_pages) + to prevent them from being swapped to disk or moved while the hardware + (like your SSD) is writing to them. + * TLB Overhead: Constant mapping and unmapping put pressure on the Translation + Lookaside Buffer (TLB), which can slow down the CPU. + +Registering the buffers performs all of this "pinning" and "mapping" once. + +#### Direct I/O: O_DIRECT (Linux) and FILE_FLAG_NO_BUFFERING (Windows) + +Modern operating systems normally use a page cache when reading files. This means file +data is first loaded into kernel memory and then copied to user space. While this improves +performance for many workloads, it introduces extra memory usage and copy overhead. + +Both Linux and Windows provide a way to bypass this cache and perform direct I/O: + +Linux: O_DIRECT +Windows: FILE_FLAG_NO_BUFFERING + +These flags instruct the OS to transfer data directly between disk and user-provided buffers, avoiding the page cache. + +##### Benefits +1. Reduced memory overhead +Avoids polluting the OS page cache +Especially useful for large sequential reads (e.g. hashing, backups) +2. Lower CPU usage +Eliminates extra memory copies between kernel and user space +3. Predictable performance +No interference from cache eviction or readahead heuristics +More consistent throughput for streaming workloads +4. Better scalability +Ideal for high-throughput, multi-threaded I/O pipelines +Prevents cache contention between threads +5. Avoids double caching +Important when the application already manages its own buffering + +##### File system compatibility +Not all file systems are compatible with O_DIRECT, if we try to open files residing in an NTFS partition, +most of the time it will fail, and some times it opens but the CQEs return with an error code bad +descriptor, and it causes some lags. + +To address this issue the program falls back to sequential read when the open fails, and falls back to +buffered sequential hashing if we receive an error in the CQEs. There is also a file system detection +that we can enable in the config file, it will enable the collection of the file system in scan_folder() +and the file will be opened accordingly, but it costs one additional syscall / directory. diff --git a/arena.c b/arena.c index 8c03d99..61767d5 100644 --- a/arena.c +++ b/arena.c @@ -437,12 +437,14 @@ void *arena_push(mem_arena **arena_ptr, u64 size, bool zero) { // mk push Commit memory if needed ------------------------------------------------------------ */ - if (local_post > selected->commit_pos) { - u64 new_commit = ALIGN_UP_POW2(local_post, arena_pagesize()); + if (local_post > selected->commit_pos - + ALIGN_UP_POW2(sizeof(mem_arena), selected->align)) { + u64 new_commit = ALIGN_UP_POW2(local_post + ALIGN_UP_POW2(sizeof(mem_arena), selected->align), arena_pagesize()); new_commit = MIN(new_commit, selected->reserve_size); if (!plat_mem_commit((u8 *)selected + selected->commit_pos, new_commit - selected->commit_pos)) { + printf("ERROR: Could not commit memory!\n"); return NULL; } diff --git a/base.h b/base.h index cafaafb..e209c28 100644 --- a/base.h +++ b/base.h @@ -1,9 +1,49 @@ #pragma once +#define _CRT_SECURE_NO_WARNINGS + +#if defined(_WIN32) || defined(_WIN64) + +#if defined(_MSC_VER) +#pragma comment(lib, "advapi32.lib") +#endif + +#include +#include +#include +#include // Needs to be included before stdatomic to avoid errors +#include +#include +#include +#include +#include + +#elif defined(__linux__) + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#endif + #include +#include #include +#include #include #include +#include #include #include #include @@ -11,25 +51,6 @@ #include #include -#if defined(_WIN32) || defined(_WIN64) -#define PLATFORM_WINDOWS 1 -#include -#include -#include -#include -#include -#include - -#define strdup _strdup -#else -#include -#include -#include -#include -#include -#include -#endif - /* ------------------------------------------------------------ Base types ------------------------------------------------------------ */ diff --git a/binaries/changelog.txt b/binaries/changelog.txt index 049d2ff..ea0ee1c 100644 --- a/binaries/changelog.txt +++ b/binaries/changelog.txt @@ -14,7 +14,7 @@ v3.2: Making the lock free MPMC queue growable Add padding to avoir false sharing Add sleep() and SwitchToThread() to limit spinning -v3.3: Fix bug slots used before initialization,compare and swap is protecting updating committed, but it is not protecting the memory initialization. Adding atomic_flag commit_lock to protect against that +v3.3: Fix bug slots used before initialization, compare and swap is protecting updating committed, but it is not protecting the memory initialization. Adding atomic_flag commit_lock to protect against that Fix bug multiple threads committing at the same time, fixed by using atomic_flag commit_lock and re-checking committed after acquiring the lock Reorder helper functions @@ -49,3 +49,10 @@ Fixing user prompt parsing 4.5: Porting to linux Reorganising the code Improving the scan function + +5.0: Implementing the IO Ring for windows and ui_uring for linux instead of buffered hashing, huge performance gains. The IO Ring is event driven, thread local, uses DMA and direct disk I/O, bypassing the OS cache completely, registered buffers (and registered files in io_uring), it supports bashing multiple submissions and can handle multiple files at the same time. +Hashing small files using XXH3_128bits() instead of the streaming pipeline(XXH3_128bits_reset(), XXH3_128bits_update(), XXH3_128bits_digest()), this reduses the overhead of creating a state and digest, coupled with the IO Ring it improves the hashing of small files whose size is inferior to the size of IO Ring buffers +fixing the xxh_x86dispatch warnings +Updating the progress printing function +Implementing a config file +Writing the README file diff --git a/config.h b/config.h new file mode 100644 index 0000000..5cd6f04 --- /dev/null +++ b/config.h @@ -0,0 +1,31 @@ + +#define FILE_HASHES_TXT "file_hashes.txt" + +// Metadata selection +#define FILE_TIMES 1 // created and modified time +#define FILE_OWNER 1 + +#define MULTI_THREADING 1 +#define READ_BLOCK KiB(64) + +// -------------------- IO Ring Configuration ---------------------- +#define USE_IORING 1 + +#if USE_IORING +#define IORING_BUFFER_SIZE KiB(256) +#define NUM_BUFFERS_PER_THREAD 32 +#define MAX_ACTIVE_FILES 16 + +#define SUBMIT_TIMEOUT_MS 10000 +#define IORING_DEBUG_PRINTS 0 +#define IORING_DEBUG_STATS 0 + +#if defined(_WIN32) || defined(_WIN64) +#define USE_REGISTERED_FILES 1 + +#elif defined(__linux__) +#define USE_REGISTERED_FILES 1 +#define CHECK_FILE_SYSTEM 0 + +#endif +#endif diff --git a/file_hasher.c b/file_hasher.c index 8a2b57f..67bf97d 100644 --- a/file_hasher.c +++ b/file_hasher.c @@ -74,21 +74,44 @@ int main(int argc, char **argv) { mem_arena *gp_arena = arena_create(¶ms); // ------------------------------- - // Detect hardware threads + // Detect hardware // ------------------------------- // --- Windows: detect PHYSICAL cores (not logical threads) --- - size_t hw_threads = platform_physical_cores(); + uint32_t cpu_cores = platform_physical_cores(); // Logical threads = CPU cores * 2 - size_t num_threads = hw_threads * 2; + uint32_t cpu_threads = cpu_cores * 2; - printf("Starting thread pool: %zu threads (CPU cores: %zu)\n", num_threads, - hw_threads); - printf(" Selected instruction set: %s\n", get_xxhash_instruction_set()); +#if MULTI_THREADING + uint32_t num_scan_threads = cpu_threads; + uint32_t num_hash_threads = cpu_threads; + + printf("%d cores %d threads CPU detected with %s instruction set\n" + "Starting thread pool: %d scanning and %d hashing threads\n", + cpu_cores, cpu_threads, get_xxhash_instruction_set(), num_scan_threads, + num_hash_threads); +#else + uint32_t num_scan_threads = 1; + uint32_t num_hash_threads = 1; + + printf( + "%d cores %d threads CPU detected with %s instruction set\n" + "Starting thread pool: %d scanning and %d hashing threads(Debug mode)\n", + cpu_cores, cpu_threads, get_xxhash_instruction_set(), num_scan_threads, + num_hash_threads); + +#endif + + // Align IO Ring block size to the system page size +#if USE_IORING + g_ioring_buffer_size = ALIGN_UP_POW2(g_ioring_buffer_size, g_pagesize); +#endif // ------------------------------- // Scanning and hashing // ------------------------------- + + // test_io_ring(); MPMCQueue dir_queue; mpmc_init(&dir_queue, MiB(1)); @@ -96,8 +119,6 @@ int main(int argc, char **argv) { mpmc_init(&file_queue, MiB(1)); // Starting hash threads - size_t num_hash_threads = num_threads; - WorkerContext workers[num_hash_threads]; Thread *hash_threads = arena_push(&gp_arena, sizeof(Thread) * num_hash_threads, true); @@ -106,8 +127,14 @@ int main(int argc, char **argv) { workers[i].arena = arena_create(¶ms); workers[i].file_queue = &file_queue; +#if USE_IORING + if (thread_create(&hash_threads[i], (ThreadFunc)hash_worker_ioring, + &workers[i]) != 0) +#else if (thread_create(&hash_threads[i], (ThreadFunc)hash_worker, &workers[i]) != - 0) { + 0) +#endif + { fprintf(stderr, "Failed to create hash thread %zu\n", i); exit(1); } @@ -122,8 +149,6 @@ int main(int argc, char **argv) { } // Starting scan threads - size_t num_scan_threads = num_threads; - ScannerContext scanners[num_scan_threads]; Thread *scan_threads = arena_push(&gp_arena, sizeof(Thread) * num_scan_threads, true); @@ -197,7 +222,7 @@ int main(int argc, char **argv) { FILE *f = fopen(FILE_HASHES_TXT, "wb"); - for (int i = 0; i < num_threads; i++) { + for (int i = 0; i < num_hash_threads; i++) { mem_arena *arena = workers[i].arena; u8 *arena_base = (u8 *)arena + ALIGN_UP_POW2(sizeof(mem_arena), arena->align); @@ -209,6 +234,15 @@ int main(int argc, char **argv) { // ------------------------------- // Print summary // ------------------------------- +#if USE_IORING + uint64_t incomplete = atomic_load(&g_io_ring_fallbacks); + if (incomplete > 0) { + printf("\nWARNING: I/O Ring incomplete files: %llu (fallback to buffered " + "I/O used)\n", + (unsigned long long)incomplete); + } +#endif + double total_seconds = timer_elapsed(&total_timer); printf("Completed hashing %zu files\n", total_found); diff --git a/io_ring_test.c b/io_ring_test.c new file mode 100644 index 0000000..39fe81f --- /dev/null +++ b/io_ring_test.c @@ -0,0 +1,147 @@ +#pragma once + +#include +#include +// #include "ioringapi.c" +#include + +// Initialize I/O Ring +HIORING io_ring_init(void) { + + // if (!io_ring_load_functions()) { + // printf("[I/O Ring] Failed to load functions\n"); + // return NULL; + // } + + IORING_CAPABILITIES caps; + ZeroMemory(&caps, sizeof(caps)); + + HRESULT hr = QueryIoRingCapabilities(&caps); + if (FAILED(hr)) { + printf("[I/O Ring] QueryIoRingCapabilities failed: 0x%08lx\n", hr); + return NULL; + } + + // printf("[I/O Ring] MaxVersion=%d, MaxSubmission=%u, MaxCompletion=%u\n", + // (int)caps.MaxVersion, caps.MaxSubmissionQueueSize, + // caps.MaxCompletionQueueSize); + + if (caps.MaxVersion < IORING_VERSION_1) { + printf("[I/O Ring] Version too old\n"); + return NULL; + } + + IORING_CREATE_FLAGS flags = {0}; + HIORING ring = NULL; + + // hr = CreateIoRing(IORING_VERSION_1, flags, 256, 512, &ring); + hr = CreateIoRing(caps.MaxVersion, flags, 256, 512, &ring); + if (FAILED(hr)) { + printf("[I/O Ring] CreateIoRing failed: 0x%08lx\n", hr); + return NULL; + } + + // printf("[I/O Ring] Created successfully\n"); + + // Check if read operation is supported + + // HRESULT io_ring_support = IsIoRingOpSupported(ring, IORING_OP_READ); + // if (io_ring_support == S_FALSE) { + // printf("[I/O Ring] Not supported, %ld /n", io_ring_support); + // } + + // Get ring info + IORING_INFO info; + ZeroMemory(&info, sizeof(info)); + GetIoRingInfo(ring, &info); + // printf("[I/O Ring] Submission: %u, Completion: %u\n", + // info.SubmissionQueueSize, info.CompletionQueueSize); + + return ring; +} + +void io_ring_cleanup(HIORING ring) { + if (ring) { + CloseIoRing(ring); + // printf("[I/O Ring] Closed\n"); + } +} + +// Read file using I/O Ring +int io_ring_read_file(HIORING ring, HANDLE hFile, void *buffer, DWORD size, + UINT64 offset) { + + IORING_HANDLE_REF file_ref = IoRingHandleRefFromHandle(hFile); + IORING_BUFFER_REF buf_ref = IoRingBufferRefFromPointer(buffer); + + HRESULT hr = BuildIoRingReadFile(ring, file_ref, buf_ref, size, offset, + (UINT_PTR)buffer, IOSQE_FLAGS_NONE); + + if (FAILED(hr)) + return -1; + + UINT32 submitted = 0; + hr = SubmitIoRing(ring, 1, INFINITE, &submitted); + if (FAILED(hr) || submitted == 0) + return -1; + + for (;;) { + IORING_CQE cqe; + hr = PopIoRingCompletion(ring, &cqe); + + if (FAILED(hr)) + continue; + + if (cqe.UserData != (UINT_PTR)buffer) + continue; + + if (FAILED(cqe.ResultCode)) + return -1; + + return (int)cqe.Information; + } +} + +// Test function +void test_io_ring(void) { + printf("\n=== Testing I/O Ring ===\n"); + + HIORING ring = io_ring_init(); + if (!ring) { + printf("I/O Ring not available\n"); + return; + } + + // Create test file + HANDLE hFile = CreateFileA("test.txt", GENERIC_READ | GENERIC_WRITE, 0, NULL, + CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL); + if (hFile != INVALID_HANDLE_VALUE) { + char test_data[] = + "Hello, I/O Ring! This is a test of the Windows I/O Ring API."; + DWORD written; + WriteFile(hFile, test_data, sizeof(test_data), &written, NULL); + CloseHandle(hFile); + } + + // Read using I/O Ring + hFile = CreateFileA("test.txt", GENERIC_READ, FILE_SHARE_READ, NULL, + OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL); + if (hFile != INVALID_HANDLE_VALUE) { + char buffer[512] = {0}; + int bytes = io_ring_read_file(ring, hFile, buffer, sizeof(buffer), 0); + if (bytes > 0) { + printf("Read %d bytes: %s\n", bytes, buffer); + } else { + printf("Failed to read file\n"); + } + CloseHandle(hFile); + } else { + printf("Failed to open test file\n"); + } + + // Cleanup + DeleteFileA("test.txt"); + io_ring_cleanup(ring); + + printf("=== Test complete ===\n\n"); +} diff --git a/io_uring_test.c b/io_uring_test.c new file mode 100644 index 0000000..82b7d54 --- /dev/null +++ b/io_uring_test.c @@ -0,0 +1,397 @@ +/* +# Compile +gcc -o io_uring_test io_uring_test.c -luring + +# Run +./io_uring_test + */ +#include "base.h" +#include +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include + +#define BUFFER_SIZE 4096 +#define NUM_BUFFERS 4 +#define NUM_REGISTERED_FILES 3 // Test with 3 files + +// Colors for output +#define COLOR_GREEN "\033[0;32m" +#define COLOR_RED "\033[0;31m" +#define COLOR_YELLOW "\033[0;33m" +#define COLOR_BLUE "\033[0;34m" +#define COLOR_RESET "\033[0m" + +// Test result tracking +typedef struct { + int passed; + int failed; +} TestResults; + +static void print_success(const char *step) { + printf(COLOR_GREEN "[✓] SUCCESS: %s" COLOR_RESET "\n", step); +} + +static void print_failure(const char *step, const char *error) { + printf(COLOR_RED "[✗] FAILED: %s - %s" COLOR_RESET "\n", step, error); +} + +static void print_info(const char *msg) { + printf(COLOR_BLUE "[i] INFO: %s" COLOR_RESET "\n", msg); +} + +static void print_step(const char *step) { + printf(COLOR_YELLOW "\n>>> Testing: %s" COLOR_RESET "\n", step); +} + +static int create_test_file(const char *filename, const char *content) { + FILE *f = fopen(filename, "w"); + if (!f) { + perror("Failed to create test file"); + return -1; + } + + fprintf(f, "%s", content); + fclose(f); + + printf(" Created test file: %s\n", filename); + return 0; +} + +// Test 1: Create io_uring instance +static int test_io_uring_create(struct io_uring *ring, TestResults *results) { + print_step("io_uring creation"); + + int ret = io_uring_queue_init(256, ring, 0); + if (ret < 0) { + print_failure("io_uring_queue_init", strerror(-ret)); + results->failed++; + return -1; + } + + print_success("io_uring instance created"); + results->passed++; + return 0; +} + +// Test 2: Register buffers +static int test_register_buffers(struct io_uring *ring, void **buffers, + struct iovec *iovs, TestResults *results) { + print_step("Buffer registration"); + + size_t total_size = BUFFER_SIZE * NUM_BUFFERS; + *buffers = aligned_alloc(4096, total_size); + if (!*buffers) { + print_failure("Buffer allocation", strerror(errno)); + results->failed++; + return -1; + } + + for (int i = 0; i < NUM_BUFFERS; i++) { + iovs[i].iov_base = (char *)*buffers + (i * BUFFER_SIZE); + iovs[i].iov_len = BUFFER_SIZE; + memset(iovs[i].iov_base, 0, BUFFER_SIZE); + } + + int ret = io_uring_register_buffers(ring, iovs, NUM_BUFFERS); + if (ret < 0) { + print_failure("io_uring_register_buffers", strerror(-ret)); + results->failed++; + return -1; + } + + print_success("Buffers registered successfully"); + results->passed++; + return 0; +} + +// Test 3: Register files sparse (empty table) +static int test_register_files_sparse(struct io_uring *ring, unsigned nr_files, + TestResults *results) { + print_step("Sparse file registration (empty table)"); + + int ret = io_uring_register_files_sparse(ring, nr_files); + if (ret < 0) { + if (ret == -EINVAL) { + print_info( + "io_uring_register_files_sparse not supported (kernel < 5.19)"); + print_info("Trying regular file registration with invalid fds..."); + + // Fallback: register with invalid fds + int *invalid_fds = calloc(nr_files, sizeof(int)); + if (!invalid_fds) { + print_failure("Allocating invalid fds array", "Out of memory"); + results->failed++; + return -1; + } + + for (int i = 0; i < nr_files; i++) { + invalid_fds[i] = -1; // Mark all as invalid + } + + ret = io_uring_register_files(ring, invalid_fds, nr_files); + free(invalid_fds); + + if (ret < 0) { + print_failure("Regular file registration also failed", strerror(-ret)); + results->failed++; + return -1; + } + print_success("File table registered (regular, with invalid fds)"); + } else { + print_failure("io_uring_register_files_sparse", strerror(-ret)); + results->failed++; + return -1; + } + } else { + printf(" Registered empty file table with %u slots\n", nr_files); + print_success("Sparse file table created"); + } + + results->passed++; + return 0; +} + +// Test 4: Update file slot and read from it +static int test_file_read_loop(struct io_uring *ring, struct iovec *iovs, + const char **filenames, + const char **expected_contents, int num_files, + TestResults *results) { + print_step("File slot update and read loop"); + + int *fds = calloc(num_files, sizeof(int)); + if (!fds) { + print_failure("Allocating fd array", "Out of memory"); + results->failed++; + return -1; + } + + // Open all files first + for (int i = 0; i < num_files; i++) { + fds[i] = open(filenames[i], O_RDONLY); + if (fds[i] < 0) { + print_failure("Opening file", filenames[i]); + results->failed++; + // Close already opened files + for (int j = 0; j < i; j++) + close(fds[j]); + free(fds); + return -1; + } + printf(" Opened %s (fd=%d)\n", filenames[i], fds[i]); + } + + // Test loop: update slot, submit read, verify + for (int slot = 0; slot < num_files; slot++) { + printf("\n --- Testing slot %d with file '%s' ---\n", slot, + filenames[slot]); + + // Update the file registration for this slot + printf(" Updating slot %d with fd %d...\n", slot, fds[slot]); + int ret = io_uring_register_files_update(ring, slot, &fds[slot], 1); + + if (ret < 0) { + print_failure("File registration update", strerror(-ret)); + results->failed++; + continue; + } + printf(" Slot update result: %d (expected 1)\n", ret); + + // Get file size for read size calculation + struct stat st; + if (fstat(fds[slot], &st) != 0) { + print_failure("fstat", strerror(errno)); + results->failed++; + continue; + } + + size_t file_size = st.st_size; + size_t read_size = BUFFER_SIZE; + + // Adjust read size for O_DIRECT if needed + int page_size = plat_get_pagesize(); + if (read_size > file_size) { + read_size = ALIGN_UP_POW2(file_size, page_size); + } + + printf(" File size: %zu, read size: %zu\n", file_size, read_size); + + // Clear buffer for this test + memset(iovs[0].iov_base, 0, BUFFER_SIZE); + + // Submit read using registered file + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + if (!sqe) { + print_failure("Getting SQE", "No available SQE"); + results->failed++; + continue; + } + + // Use slot index with fixed file flag + io_uring_prep_read_fixed(sqe, slot, iovs[0].iov_base, read_size, 0, 0); + sqe->flags |= IOSQE_FIXED_FILE; + io_uring_sqe_set_data64(sqe, 100 + slot); // Unique user_data per slot + + ret = io_uring_submit(ring); + if (ret < 0) { + print_failure("Submitting read", strerror(-ret)); + results->failed++; + continue; + } + printf(" Submitted read (1 SQE)\n"); + + // Wait for completion + struct io_uring_cqe *cqe; + ret = io_uring_wait_cqe(ring, &cqe); + if (ret < 0) { + print_failure("Waiting for completion", strerror(-ret)); + results->failed++; + continue; + } + + // Process completion + uint64_t user_data = io_uring_cqe_get_data64(cqe); + int bytes_read = cqe->res; + + printf(" Completion: user_data=%lu, result=%d\n", (unsigned long)user_data, + bytes_read); + + if (bytes_read < 0) { + print_failure("Read operation", strerror(-bytes_read)); + results->failed++; + io_uring_cqe_seen(ring, cqe); + continue; + } + + if (user_data != 100 + slot) { + print_failure("User data mismatch", "Wrong user_data value"); + results->failed++; + io_uring_cqe_seen(ring, cqe); + continue; + } + + // Verify the data + char *data = (char *)iovs[0].iov_base; + printf(" Data read (%d bytes): %.*s\n", bytes_read, + bytes_read < 100 ? bytes_read : 100, data); + + if (strstr(data, expected_contents[slot]) == NULL) { + print_failure("Data verification", + "Expected content not found in read data"); + results->failed++; + } else { + print_success("Data verified successfully"); + results->passed++; + } + + io_uring_cqe_seen(ring, cqe); + + // Invalidate the slot after use (mark as -1) + int invalid_fd = -1; + ret = io_uring_register_files_update(ring, slot, &invalid_fd, 1); + if (ret < 0) { + printf(" Warning: Could not invalidate slot %d: %s\n", slot, + strerror(-ret)); + } + } + + // Close all files + for (int i = 0; i < num_files; i++) { + if (fds[i] >= 0) + close(fds[i]); + } + free(fds); + + return 0; +} + +int main() { + TestResults results = {0, 0}; + struct io_uring ring; + void *buffers = NULL; + struct iovec iovs[NUM_BUFFERS]; + + printf(COLOR_BLUE "\n========================================\n"); + printf(" io_uring Sparse File Registration Test\n"); + printf("========================================\n" COLOR_RESET); + + // Define test files and their content + const char *filenames[] = {"test_file_0.txt", "test_file_1.txt", + "test_file_2.txt"}; + + const char *contents[] = { + "This is file 0: Hello World! The quick brown fox jumps over the lazy " + "dog.", + "This is file 1: io_uring is awesome for async I/O operations!", + "This is file 2: Testing sparse file registration with multiple files."}; + + const char *expected_substrings[] = {"Hello World", "io_uring is awesome", + "sparse file registration"}; + + int num_files = 3; + + // Create all test files + print_info("Creating test files..."); + for (int i = 0; i < num_files; i++) { + if (create_test_file(filenames[i], contents[i]) != 0) { + return 1; + } + } + + // Test 1: Create io_uring + if (test_io_uring_create(&ring, &results) != 0) { + goto cleanup_files; + } + + // Test 2: Register buffers + if (test_register_buffers(&ring, &buffers, iovs, &results) != 0) { + io_uring_queue_exit(&ring); + goto cleanup_files; + } + + // Test 3: Register empty file table (sparse) + if (test_register_files_sparse(&ring, num_files, &results) != 0) { + io_uring_unregister_buffers(&ring); + free(buffers); + io_uring_queue_exit(&ring); + goto cleanup_files; + } + + // Test 4: Loop through files, update slots, read and verify + test_file_read_loop(&ring, iovs, filenames, expected_substrings, num_files, + &results); + + // Cleanup + io_uring_unregister_files(&ring); + io_uring_unregister_buffers(&ring); + free(buffers); + io_uring_queue_exit(&ring); + +cleanup_files: + // Remove test files + for (int i = 0; i < num_files; i++) { + remove(filenames[i]); + } + + // Print summary + int total = results.passed + results.failed; + printf(COLOR_BLUE "\n========================================\n"); + printf(" TEST SUMMARY\n"); + printf("========================================\n" COLOR_RESET); + printf(" Total tests: %d\n", total); + printf(COLOR_GREEN " Passed: %d\n" COLOR_RESET, results.passed); + if (results.failed > 0) { + printf(COLOR_RED " Failed: %d\n" COLOR_RESET, results.failed); + printf(COLOR_RED "\n ✗ SOME TESTS FAILED!\n" COLOR_RESET); + } else { + printf(COLOR_GREEN "\n ✓ ALL TESTS PASSED!\n" COLOR_RESET); + } + + return results.failed > 0 ? 1 : 0; +} diff --git a/ioringapi.c b/ioringapi.c new file mode 100644 index 0000000..202905c --- /dev/null +++ b/ioringapi.c @@ -0,0 +1,285 @@ +#pragma once +#include +#include +#include + +// Forward declarations +typedef struct IORING_HANDLE_REF IORING_HANDLE_REF; +typedef struct IORING_BUFFER_REF IORING_BUFFER_REF; +typedef void *HIORING; + +/* --------------------- Types declaration --------------------- */ +typedef enum IORING_CREATE_ADVISORY_FLAGS { + IORING_CREATE_ADVISORY_FLAGS_NONE, + IORING_CREATE_SKIP_BUILDER_PARAM_CHECKS +} IORING_CREATE_ADVISORY_FLAGS; +// Specifies advisory flags for creating an I/O ring with a call to +// CreateIoRing. + +typedef enum IORING_CREATE_REQUIRED_FLAGS { + IORING_CREATE_REQUIRED_FLAGS_NONE +} IORING_CREATE_REQUIRED_FLAGS; +// Specifies required flags for creating an I/O ring with a call to +// CreateIoRing. + +typedef enum IORING_REF_KIND { + IORING_REF_RAW = 0, + IORING_REF_REGISTERED = 1, +} IORING_REF_KIND; +// Specifies the type of an IORING_HANDLE_REF structure. + +typedef enum IORING_SQE_FLAGS { + IOSQE_FLAGS_NONE, + IOSQE_FLAGS_DRAIN_PRECEDING_OPS +} IORING_SQE_FLAGS; +// Specifies kernel behavior options for I/O ring submission queue entries + +// IORING_REGISTERED_BUFFER structure +typedef struct IORING_REGISTERED_BUFFER { + UINT32 Index; + UINT32 Offset; +} IORING_REGISTERED_BUFFER; + +// IORING_HANDLE_REF +struct IORING_HANDLE_REF { + IORING_REF_KIND Kind; + union { + HANDLE Handle; + UINT32 Index; + } HandleUnion; +}; +// Represents a reference to a file handle used in an I/O ring operation + +// IORING_BUFFER_REF +struct IORING_BUFFER_REF { + IORING_REF_KIND Kind; + union { + void *Address; + IORING_REGISTERED_BUFFER IndexAndOffset; + } BufferUnion; +}; + +typedef struct IORING_BUFFER_INFO { + void *Address; + UINT32 Length; +} IORING_BUFFER_INFO; + +// IORING_BUFFER_REF represents a reference to a buffer used in an I/O ring +// operation + +// IORING_VERSION enumeration +typedef enum IORING_VERSION { + IORING_VERSION_INVALID = 0, + IORING_VERSION_1 = 1, + IORING_VERSION_2 = 2, + IORING_VERSION_3 = 3, + IORING_VERSION_4 = 4, +} IORING_VERSION; + +typedef enum IORING_FEATURE_FLAGS { + IORING_FEATURE_FLAGS_NONE = 0, + IORING_FEATURE_UM_EMULATION = 1 +} IORING_FEATURE_FLAGS; + +// IORING_CAPABILITIES structure +typedef struct IORING_CAPABILITIES { + IORING_VERSION MaxVersion; + UINT32 MaxSubmissionQueueSize; + UINT32 MaxCompletionQueueSize; + IORING_FEATURE_FLAGS FeatureFlags; +} IORING_CAPABILITIES; +// Represents the IORING API capabilities. + +// IORING_CQE structure +typedef struct IORING_CQE { + UINT_PTR UserData; + HRESULT ResultCode; + ULONG_PTR Information; +} IORING_CQE; +// Represents a completed I/O ring queue entry. + +// IORING_CREATE_FLAGS structure +typedef struct IORING_CREATE_FLAGS { + IORING_CREATE_REQUIRED_FLAGS Required; + IORING_CREATE_ADVISORY_FLAGS Advisory; +} IORING_CREATE_FLAGS; +// Specifies flags for creating an I/O ring with a call to CreateIoRing. + +// IORING_INFO structure +typedef struct IORING_INFO { + IORING_VERSION IoRingVersion; + IORING_CREATE_FLAGS Flags; + UINT32 SubmissionQueueSize; + UINT32 CompletionQueueSize; +} IORING_INFO; +// Represents the shape and version information for the specified I/O ring + +// IORING_OP_CODE for IsIoRingOpSupported +typedef enum IORING_OP_CODE { + IORING_OP_NOP = 0, + IORING_OP_READ = 1, + IORING_OP_WRITE = 2, + IORING_OP_FLUSH = 3, + IORING_OP_REGISTER_BUFFERS = 4, + IORING_OP_REGISTER_FILES = 5, + IORING_OP_CANCEL = 6, +} IORING_OP_CODE; + +/* --------------------- Dynamic loader --------------------- */ +// Function pointer types +typedef BOOL(WINAPI *IsIoRingOpSupported_t)(HIORING, IORING_OP_CODE); +typedef HRESULT(WINAPI *QueryIoRingCapabilities_t)(IORING_CAPABILITIES *); +typedef HRESULT(WINAPI *GetIoRingInfo_t)(HIORING, IORING_INFO *); +typedef HRESULT(WINAPI *CreateIoRing_t)(IORING_VERSION, IORING_CREATE_FLAGS, + UINT32, UINT32, HIORING *); +typedef HRESULT(WINAPI *CloseIoRing_t)(HIORING); +typedef HRESULT(WINAPI *SubmitIoRing_t)(HIORING, UINT32, UINT32, UINT32 *); +typedef HRESULT(WINAPI *PopIoRingCompletion_t)(HIORING, IORING_CQE *); +typedef HRESULT(WINAPI *SetIoRingCompletionEvent_t)(HIORING, HANDLE); +typedef HRESULT(WINAPI *BuildIoRingCancelRequest_t)(HIORING, IORING_HANDLE_REF, + UINT_PTR, UINT_PTR); +typedef HRESULT(WINAPI *BuildIoRingReadFile_t)(HIORING, IORING_HANDLE_REF, + IORING_BUFFER_REF, UINT32, + UINT64, UINT_PTR, + IORING_SQE_FLAGS); +typedef HRESULT(WINAPI *BuildIoRingRegisterBuffers_t)( + HIORING, UINT32, IORING_BUFFER_INFO const[], UINT_PTR); + +typedef HRESULT(WINAPI *BuildIoRingRegisterFileHandles_t)(HIORING, UINT32, + HANDLE const[], + UINT_PTR); + +// Core: +// Queries the support of the specified operation for the specified I/O ring +static IsIoRingOpSupported_t IsIoRingOpSupported = NULL; + +// Queries the OS for the supported capabilities for IORINGs +static QueryIoRingCapabilities_t QueryIoRingCapabilities = NULL; + +// Gets information about the API version and queue sizes of an I/O ring +static GetIoRingInfo_t GetIoRingInfo = NULL; + +// Creates a new instance of an I/O ring submission/completion queue pair and +// returns a handle for referencing the I/O ring +static CreateIoRing_t CreateIoRing = NULL; + +// Closes an HIORING handle that was previously opened with a call to +// CreateIoRing +static CloseIoRing_t CloseIoRing = NULL; + +// Submission / completion: +// Submits all constructed but not yet submitted entries to the kernel’s queue +// and optionally waits for a set of operations to complete +static SubmitIoRing_t SubmitIoRing = NULL; + +// Pops a single entry from the completion queue, if one is available +static PopIoRingCompletion_t PopIoRingCompletion = NULL; + +// Registers a completion queue event with an IORING +static SetIoRingCompletionEvent_t SetIoRingCompletionEvent = NULL; + +// Operations: +// Performs an asynchronous read from a file using an I/O ring +static BuildIoRingReadFile_t BuildIoRingReadFile = NULL; + +// Attempts to cancel a previously submitted I/O ring operation +static BuildIoRingCancelRequest_t BuildIoRingCancelRequest = NULL; + +// Registers an array of buffers with the system for future I/O ring operations +static BuildIoRingRegisterBuffers_t BuildIoRingRegisterBuffers = NULL; + +// Registers an array of file handles with the system for future I/O ring +// operations +static BuildIoRingRegisterFileHandles_t BuildIoRingRegisterFileHandles = NULL; + +static int io_ring_loaded = 0; + +static int io_ring_load_functions(void) { + if (io_ring_loaded) + return 1; + + HMODULE hKernel = GetModuleHandleW(L"kernel32.dll"); + if (!hKernel) + return 0; + + IsIoRingOpSupported = + (IsIoRingOpSupported_t)GetProcAddress(hKernel, "IsIoRingOpSupported"); + QueryIoRingCapabilities = (QueryIoRingCapabilities_t)GetProcAddress( + hKernel, "QueryIoRingCapabilities"); + GetIoRingInfo = (GetIoRingInfo_t)GetProcAddress(hKernel, "GetIoRingInfo"); + CreateIoRing = (CreateIoRing_t)GetProcAddress(hKernel, "CreateIoRing"); + CloseIoRing = (CloseIoRing_t)GetProcAddress(hKernel, "CloseIoRing"); + SubmitIoRing = (SubmitIoRing_t)GetProcAddress(hKernel, "SubmitIoRing"); + PopIoRingCompletion = + (PopIoRingCompletion_t)GetProcAddress(hKernel, "PopIoRingCompletion"); + SetIoRingCompletionEvent = (SetIoRingCompletionEvent_t)GetProcAddress( + hKernel, "SetIoRingCompletionEvent"); + BuildIoRingReadFile = + (BuildIoRingReadFile_t)GetProcAddress(hKernel, "BuildIoRingReadFile"); + BuildIoRingCancelRequest = (BuildIoRingCancelRequest_t)GetProcAddress( + hKernel, "BuildIoRingCancelRequest"); + BuildIoRingRegisterBuffers = (BuildIoRingRegisterBuffers_t)GetProcAddress( + hKernel, "BuildIoRingRegisterBuffers"); + BuildIoRingRegisterFileHandles = + (BuildIoRingRegisterFileHandles_t)GetProcAddress( + hKernel, "BuildIoRingRegisterFileHandles"); + + io_ring_loaded = + (IsIoRingOpSupported && QueryIoRingCapabilities && CreateIoRing && + CloseIoRing && SubmitIoRing && PopIoRingCompletion && + SetIoRingCompletionEvent && BuildIoRingReadFile && + BuildIoRingCancelRequest && BuildIoRingRegisterBuffers && + BuildIoRingRegisterFileHandles); + + if (io_ring_loaded) + printf("[I/O Ring] Functions loaded\n"); + else + printf("[I/O Ring] Some functions not available\n"); + + return io_ring_loaded; +} + +/* ------------- Standard helper functions definition ------------- */ +// Creates an instance of the IORING_BUFFER_REF structure with the provided +// buffer index and offset +static inline IORING_BUFFER_REF +IoRingBufferRefFromIndexAndOffset(UINT32 index, UINT32 offset) { + IORING_BUFFER_REF ref; + ref.Kind = IORING_REF_REGISTERED; + ref.BufferUnion.IndexAndOffset.Index = index; + ref.BufferUnion.IndexAndOffset.Offset = offset; + return ref; +} + +// Creates an instance of the IORING_BUFFER_REF structure from the provided +// pointer +static IORING_BUFFER_REF IoRingBufferRefFromPointer(void *addr) { + IORING_BUFFER_REF ref; + ref.Kind = IORING_REF_RAW; + ref.BufferUnion.Address = addr; + return ref; +} + +// Creates an instance of the IORING_HANDLE_REF structure from the provided file +// handle +static IORING_HANDLE_REF IoRingHandleRefFromHandle(HANDLE h) { + IORING_HANDLE_REF ref; + ref.Kind = IORING_REF_RAW; + ref.HandleUnion.Handle = h; + return ref; +} + +// Creates an instance of the IORING_HANDLE_REF structure from the provided +// index +static inline IORING_HANDLE_REF IoRingHandleRefFromIndex(UINT32 index) { + IORING_HANDLE_REF ref; + ref.Kind = IORING_REF_REGISTERED; // MUST be registered + ref.HandleUnion.Index = index; + return ref; +} + +// NOTE: If you are using index-based buffers or handles, make sure you have +// successfully called BuildIoRingRegisterBuffers or +// BuildIoRingRegisterFileHandles first so the kernel has a valid table to look +// into, otherwise the kernel will treat the index as an invalid memory +// address/handle. diff --git a/platform.c b/platform.c index 669402c..6bea75c 100644 --- a/platform.c +++ b/platform.c @@ -1,32 +1,28 @@ #pragma once // ensure that a given header file is included only once in a // single compilation unit -#define _CRT_SECURE_NO_WARNINGS - #include "arena.h" #include "base.h" #include "lf_mpmc.h" #include "arena.c" +#include // xxhash include -#define XXH_INLINE_ALL +#define XXH_STATIC_LINKING_ONLY #include "xxh_x86dispatch.h" #include -// ----------------------------- Config ------------------------------------- -#define FILE_HASHES_TXT "file_hashes.txt" -#define HASH_STRLEN 33 // 128-bit hex (32 chars) + null -#define MAX_PATHLEN 4096 -#define READ_BLOCK (KiB(64)) - +#include "config.h" // ----------------------------- Globals ------------------------------------ static atomic_uint_fast64_t g_files_found = 0; static atomic_uint_fast64_t g_files_hashed = 0; static atomic_uint_fast64_t g_bytes_processed = 0; static atomic_int g_scan_done = 0; +#define HASH_STRLEN 33 // 128-bit hex (32 chars) + null terminator +#define MAX_PATHLEN KiB(4) // ================== OS-agnostic functions abstraction ===================== -// ----------------------------- Timer functions -------------- +// --------------------- Timer functions --------------------- typedef struct { u64 start; u64 now; @@ -73,7 +69,7 @@ double timer_elapsed(HiResTimer *t) { #endif -// ----------------------------- Get HW info -------------- +// ------------------- Get HW info -------------------- #if defined(_WIN32) || defined(_WIN64) size_t platform_physical_cores(void) { @@ -122,12 +118,14 @@ const char *get_xxhash_instruction_set(void) { #if defined(_WIN32) || defined(_WIN64) typedef HANDLE FileHandle; +#define FLAG_SEQUENTIAL_READ FILE_FLAG_SEQUENTIAL_SCAN +#define FLAG_ASYNC_DIRECT_READ (FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING) #define INVALID_FILE_HANDLE INVALID_HANDLE_VALUE // File open function -static FileHandle os_file_open(const char *path) { +static FileHandle os_file_open(const char *path, DWORD flags) { return CreateFileA(path, GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, - NULL, OPEN_EXISTING, FILE_FLAG_SEQUENTIAL_SCAN, NULL); + NULL, OPEN_EXISTING, flags, NULL); } // File read function @@ -144,11 +142,21 @@ static void os_file_close(FileHandle handle) { CloseHandle(handle); } #elif defined(__linux__) typedef int FileHandle; +#define FLAG_SEQUENTIAL_READ (0) +#define FLAG_ASYNC_DIRECT_READ (O_DIRECT) #define INVALID_FILE_HANDLE (-1) // File open function -static FileHandle os_file_open(const char *path) { - return open(path, O_RDONLY | O_NOFOLLOW); +static FileHandle os_file_open(const char *path, int flags) { + // Combine your mandatory flags with the user-provided flag + int fd = open(path, O_RDONLY | O_NOFOLLOW | flags); + + // If sequential was requested, advise the kernel + if (fd != -1 && (flags == FLAG_SEQUENTIAL_READ)) { + posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL); + } + + return fd; } // File read function @@ -171,13 +179,13 @@ static void os_file_close(FileHandle handle) { close(handle); } // -------------------- Thread abstraction ------------------- // Threads context typedef struct { - u8 num_threads; - mem_arena *path_arena; mem_arena *meta_arena; MPMCQueue *dir_queue; MPMCQueue *file_queue; + + u8 num_threads; } ScannerContext; typedef struct { @@ -357,7 +365,8 @@ static int parse_paths(char *line, char folders[][MAX_PATHLEN], return count; } -// ----------------------------- File time ------------------------- +// ------------------------- File time ------------------------- +#if FILE_TIMES #if defined(_WIN32) || defined(_WIN64) static void format_time(uint64_t t, char *out, size_t out_sz) { if (t == 0) { @@ -372,7 +381,7 @@ static void format_time(uint64_t t, char *out, size_t out_sz) { strftime(out, out_sz, "%Y-%m-%d %H:%M:%S", &tm); } -// ----------------------------- Convert filetime to epoch -------------- +// ------------------ Convert filetime to epoch ------------------- static uint64_t filetime_to_epoch(const FILETIME *ft) { ULARGE_INTEGER ull; ull.LowPart = ft->dwLowDateTime; @@ -421,11 +430,24 @@ void platform_get_file_times(const char *path, uint64_t *out_created, } } +static int platform_get_file_times_fd(int dir_fd, const char *name, + uint64_t *created, uint64_t *modified) { + struct stat st; + if (fstatat(dir_fd, name, &st, 0) == 0) { + *created = st.st_ctime; // or st.st_birthtime on systems that support it + *modified = st.st_mtime; + return 0; + } + return -1; +} +#endif #endif -// ----------------------------- File owner --------------------- +// -------------------- File owner --------------------- #if defined(_WIN32) || defined(_WIN64) -static void get_file_owner(const char *path, char *out, size_t out_sz) { +#if FILE_OWNER +void platform_get_file_owner(const char *path, char *out_owner, + size_t out_owner_size) { PSID sid = NULL; PSECURITY_DESCRIPTOR sd = NULL; @@ -439,43 +461,159 @@ static void get_file_owner(const char *path, char *out, size_t out_sz) { if (LookupAccountSidA(NULL, sid, name, &name_len, domain, &domain_len, &use)) { - snprintf(out, out_sz, "%s\\%s", domain, name); + snprintf(out_owner, out_owner_size, "%s\\%s", domain, name); } else { - snprintf(out, out_sz, "UNKNOWN"); + snprintf(out_owner, out_owner_size, "UNKNOWN"); } } else { - snprintf(out, out_sz, "UNKNOWN"); + snprintf(out_owner, out_owner_size, "UNKNOWN"); } if (sd) LocalFree(sd); } - -void platform_get_file_owner(const char *path, char *out_owner, - size_t out_owner_size) { - get_file_owner(path, out_owner, out_owner_size); -} +#endif #elif defined(__linux__) -static void get_file_owner(uid_t uid, char *out, size_t out_sz) { - struct passwd *pw = getpwuid(uid); - if (pw) { - snprintf(out, out_sz, "%s", pw->pw_name); - } else { - snprintf(out, out_sz, "UNKNOWN"); - } -} - +#if FILE_OWNER void platform_get_file_owner(const char *path, char *out_owner, size_t out_owner_size) { struct stat st; + const char *owner = "UNKNOWN"; + if (stat(path, &st) == 0) { - get_file_owner(st.st_uid, out_owner, out_owner_size); - } else { - snprintf(out_owner, out_owner_size, "UNKNOWN"); + struct passwd *pw = getpwuid(st.st_uid); + if (pw) { + owner = pw->pw_name; + } + } + + snprintf(out_owner, out_owner_size, "%s", owner); +} + +static int platform_get_file_owner_fd(int dir_fd, const char *name, char *owner, + size_t owner_size) { + struct stat st; + if (fstatat(dir_fd, name, &st, 0) == 0) { + struct passwd pw; + struct passwd *result; + char buffer[4096]; // Sufficiently large buffer for passwd data + + // Reentrant version (thread-safe) + if (getpwuid_r(st.st_uid, &pw, buffer, sizeof(buffer), &result) == 0 && + result != NULL && result->pw_name != NULL) { + strncpy(owner, result->pw_name, owner_size - 1); + owner[owner_size - 1] = '\0'; + } else { + // Fallback to uid + snprintf(owner, owner_size, "uid:%d", st.st_uid); + } + return 0; + } + return -1; +} +#endif + +// ----------------------------- File system ----------------------------- + +#if CHECK_FILE_SYSTEM +typedef enum FileSystemType { + FS_UNKNOWN = 0, + FS_EXT4, + FS_XFS, + FS_BTRFS, + FS_TMPFS, + FS_NFS, + FS_CIFS, + FS_FAT, + FS_EXFAT, + FS_NTFS, + FS_ZFS, + FS_F2FS, + FS_EROFS, + FS_VIRTIOFS, + FS_OVERLAY, + FS_HUGETLBFS, + FS_SQUASHFS, + FS_PROC, + FS_SYSFS, +} FileSystemType; + +static inline FileSystemType fs_from_magic(long type) { + switch (type) { + case 0xEF53: + return FS_EXT4; + case 0x58465342: + return FS_XFS; + case 0x9123683E: + return FS_BTRFS; + case 0x01021994: + return FS_TMPFS; + case 0x6969: + return FS_NFS; + case 0xFF534D42: + return FS_CIFS; + case 0x4d44: + return FS_FAT; + case 0x2011BAB0: + return FS_EXFAT; + case 0x5346544E: + return FS_NTFS; + case 0x2FC1211: + return FS_ZFS; + case 0xF2F52010: + return FS_F2FS; + case 0xE0F5E1E2: + return FS_EROFS; + case 0x56495254: + return FS_VIRTIOFS; + case 0x794C764F: + return FS_OVERLAY; + case 0x958458f6: + return FS_HUGETLBFS; + case 0x73717368: + return FS_SQUASHFS; + case 0x9fa0: + return FS_PROC; + case 0x62656572: + return FS_SYSFS; + + default: + return FS_UNKNOWN; } } +// Yes it is officially called "magic number" or "signature" in the +// documentation + +typedef enum { + FS_POLICY_BUFFERED, + FS_POLICY_DIRECT_OK, +} FsPolicy; + +static inline FsPolicy fs_policy(FileSystemType fs) { + switch (fs) { + case FS_EXT4: + case FS_XFS: + case FS_BTRFS: + case FS_ZFS: + case FS_F2FS: + case FS_NFS: + case FS_CIFS: + case FS_VIRTIOFS: + return FS_POLICY_DIRECT_OK; + + case FS_TMPFS: // Resides in Page Cache; O_DIRECT generally returns EINVAL + case FS_EROFS: // Read-only filesystem; O_DIRECT support is + // uncommon/restricted + case FS_FAT: // Generally does not implement direct_IO address space ops + case FS_EXFAT: // Limited support depending on driver implementation + case FS_NTFS: // Depends on driver (ntfs3 supports it, older ntfs-3g does not) + default: + return FS_POLICY_BUFFERED; + } +} +#endif #endif // ----------------------------- Scan helpers ----------------------------- @@ -483,9 +621,17 @@ typedef struct FileEntry { char *path; uint64_t size_bytes; +#if FILE_TIMES uint64_t created_time; // epoch uint64_t modified_time; // epoch seconds - char owner[128]; // resolved owner name +#endif +#if FILE_OWNER + char owner[128]; // resolved owner name +#endif + +#if CHECK_FILE_SYSTEM // Linux only + FileSystemType fs_type; +#endif } FileEntry; typedef struct { @@ -555,10 +701,12 @@ void scan_folder(const char *base, ScannerContext *ctx) { size_t name_len = strlen(fd.cFileName); path_builder_set_filename(&pb, fd.cFileName, name_len); + // If it's a directory: if (fd.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { char *dir = path_builder_dup_arena(&pb, ctx->path_arena, false); mpmc_push_work(ctx->dir_queue, dir); } else { + // else a file: atomic_fetch_add(&g_files_found, 1); FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true); @@ -572,8 +720,14 @@ void scan_folder(const char *base, ScannerContext *ctx) { fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, false); strcpy(fe->path, temp_path); +#if FILE_TIMES platform_get_file_times(pb.buffer, &fe->created_time, &fe->modified_time); +#endif + +#if FILE_OWNER platform_get_file_owner(pb.buffer, fe->owner, sizeof(fe->owner)); +#endif + fe->size_bytes = ((uint64_t)fd.nFileSizeHigh << 32) | fd.nFileSizeLow; mpmc_push(ctx->file_queue, fe); @@ -626,6 +780,14 @@ void scan_folder(const char *base, ScannerContext *ctx) { if (dir_fd == -1) return; +#if CHECK_FILE_SYSTEM + struct statfs fs; + FileSystemType fs_type = FS_UNKNOWN; + if (fstatfs(dir_fd, &fs) == 0) { + fs_type = fs_from_magic(fs.f_type); + } +#endif + DIR *dir = fdopendir(dir_fd); if (!dir) { close(dir_fd); @@ -644,6 +806,7 @@ void scan_folder(const char *base, ScannerContext *ctx) { path_builder_set_filename(&pb, entry->d_name, name_len); int file_type = DT_UNKNOWN; + #ifdef _DIRENT_HAVE_D_TYPE file_type = entry->d_type; #endif @@ -661,18 +824,21 @@ void scan_folder(const char *base, ScannerContext *ctx) { if (file_type == DT_REG) { atomic_fetch_add(&g_files_found, 1); - FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), - true); + FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true); // Use fstatat for file info struct stat st; if (fstatat(dir_fd, entry->d_name, &st, 0) == 0) { - // Convert times using fd variant - platform_get_file_times_fd(dir_fd, entry->d_name, - &fe->created_time, +#if FILE_TIMES + platform_get_file_times_fd(dir_fd, entry->d_name, &fe->created_time, &fe->modified_time); +#endif + +#if FILE_OWNER platform_get_file_owner_fd(dir_fd, entry->d_name, fe->owner, sizeof(fe->owner)); +#endif + fe->size_bytes = (uint64_t)st.st_size; // Normalize path @@ -681,8 +847,12 @@ void scan_folder(const char *base, ScannerContext *ctx) { (pb.filename_pos - pb.buffer) + name_len + 1); normalize_path(temp_path); - fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, - false); strcpy(fe->path, temp_path); + fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, false); + strcpy(fe->path, temp_path); + +#if CHECK_FILE_SYSTEM + fe->fs_type = fs_type; +#endif mpmc_push(ctx->file_queue, fe); } @@ -701,12 +871,17 @@ void scan_folder(const char *base, ScannerContext *ctx) { mpmc_push_work(ctx->dir_queue, dir_path); } else if (S_ISREG(st.st_mode)) { atomic_fetch_add(&g_files_found, 1); - FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), - true); + FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true); +#if FILE_TIMES platform_get_file_times(pb.buffer, &fe->created_time, &fe->modified_time); +#endif + +#if FILE_OWNER platform_get_file_owner(pb.buffer, fe->owner, sizeof(fe->owner)); +#endif + fe->size_bytes = (uint64_t)st.st_size; char temp_path[MAX_PATHLEN]; @@ -714,73 +889,21 @@ void scan_folder(const char *base, ScannerContext *ctx) { (pb.filename_pos - pb.buffer) + name_len + 1); normalize_path(temp_path); - fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, - false); strcpy(fe->path, temp_path); + fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, false); + strcpy(fe->path, temp_path); + +#if CHECK_FILE_SYSTEM + fe->fs_type = fs_type; +#endif mpmc_push(ctx->file_queue, fe); } } } - closedir(dir); // Closes dir_fd automatically + closedir(dir); } -// Choice 2 - -// void scan_folder(const char *base, ScannerContext *ctx) { -// PathBuilder pb; -// path_builder_init(&pb, base); -// -// DIR *dir = opendir(base); -// if (!dir) -// return; -// -// struct dirent *entry; -// struct stat st; -// -// while ((entry = readdir(dir)) != NULL) { -// if (entry->d_name[0] == '.' && -// (entry->d_name[1] == 0 || -// (entry->d_name[1] == '.' && entry->d_name[2] == 0))) -// continue; -// -// size_t name_len = strlen(entry->d_name); -// path_builder_set_filename(&pb, entry->d_name, name_len); -// -// if (lstat(pb.buffer, &st) == 0 && S_ISLNK(st.st_mode)) -// continue; -// -// if (stat(pb.buffer, &st) == 0) { -// if (S_ISDIR(st.st_mode)) { -// char *dir_path = path_builder_dup_arena(&pb, ctx->path_arena, false); -// mpmc_push_work(ctx->dir_queue, dir_path); -// } else { -// atomic_fetch_add(&g_files_found, 1); -// -// FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true); -// -// // Create a temporary copy for normalization -// char temp_path[MAX_PATHLEN]; -// memcpy(temp_path, pb.buffer, -// (pb.filename_pos - pb.buffer) + name_len + 1); -// normalize_path(temp_path); -// -// fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, false); -// strcpy(fe->path, temp_path); -// -// platform_get_file_times(pb.buffer, &fe->created_time, -// &fe->modified_time); -// platform_get_file_owner(pb.buffer, fe->owner, sizeof(fe->owner)); -// fe->size_bytes = (uint64_t)st.st_size; -// -// mpmc_push(ctx->file_queue, fe); -// } -// } -// } -// -// closedir(dir); -// } - #endif // ------------------------- Scan worker -------------------------------- @@ -807,7 +930,7 @@ static void xxh3_hash_file_stream(const char *path, char *out_hex, XXH3_state_t state; XXH3_128bits_reset(&state); - FileHandle handle = os_file_open(path); + FileHandle handle = os_file_open(path, FLAG_SEQUENTIAL_READ); if (handle == INVALID_FILE_HANDLE) { strcpy(out_hex, "ERROR"); return; @@ -830,7 +953,7 @@ static void xxh3_hash_file_stream(const char *path, char *out_hex, // ------------------------- Hash worker -------------------------------- static THREAD_RETURN hash_worker(void *arg) { WorkerContext *ctx = (WorkerContext *)arg; - unsigned char *buf = (unsigned char *)malloc(READ_BLOCK); + void *buf = malloc(READ_BLOCK); for (;;) { FileEntry *fe = mpmc_pop(ctx->file_queue); @@ -840,17 +963,29 @@ static THREAD_RETURN hash_worker(void *arg) { char hash[HASH_STRLEN]; xxh3_hash_file_stream(fe->path, hash, buf); + double size_kib = (double)fe->size_bytes / 1024.0; + char stack_buf[KiB(4)]; + int len; + +#if FILE_TIMES char created[32], modified[32]; format_time(fe->created_time, created, sizeof(created)); format_time(fe->modified_time, modified, sizeof(modified)); +#endif - double size_kib = (double)fe->size_bytes / 1024.0; - - char stack_buf[1024]; - - int len = - snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n", - hash, fe->path, size_kib, created, modified, fe->owner); +#if FILE_TIMES && FILE_OWNER + len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n", + hash, fe->path, size_kib, created, modified, fe->owner); +#elif FILE_TIMES + len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\n", hash, + fe->path, size_kib, created, modified); +#elif FILE_OWNER + len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\n", hash, + fe->path, size_kib, fe->owner); +#else + len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\n", hash, + fe->path, size_kib); +#endif char *dst = arena_push(&ctx->arena, len, false); memcpy(dst, stack_buf, len); @@ -863,19 +998,21 @@ static THREAD_RETURN hash_worker(void *arg) { return THREAD_RETURN_VALUE; } -// ----------------------------- Progress display --------------------------- +// ------------------------- Progress display --------------------------- static THREAD_RETURN progress_thread(void *arg) { - (void)arg; // Unused parameter + (void)arg; HiResTimer progress_timer; timer_start(&progress_timer); - uint64_t last_bytes = atomic_load(&g_bytes_processed); + uint64_t last_bytes = 0; double last_time = 0.0; - double displayed_speed = 0.0; const double sample_interval = 0.5; + // Hide cursor to prevent flickering + printf("\033[?25l"); + for (;;) { uint64_t found = atomic_load(&g_files_found); uint64_t hashed = atomic_load(&g_files_hashed); @@ -883,27 +1020,20 @@ static THREAD_RETURN progress_thread(void *arg) { int scan_done = atomic_load(&g_scan_done); double t = timer_elapsed(&progress_timer); - - if (last_time == 0.0) { - last_time = t; - last_bytes = bytes; - } - double dt = t - last_time; if (dt >= sample_interval) { - uint64_t db = bytes - last_bytes; - - if (db > 0 && dt > 0.0001) { - displayed_speed = (double)db / (1024.0 * 1024.0) / dt; - } - + uint64_t db = (bytes > last_bytes) ? bytes - last_bytes : 0; + displayed_speed = (double)db / (1024.0 * 1024.0) / dt; last_bytes = bytes; last_time = t; } + printf("\r"); + if (!scan_done) { - printf("\rScanning: %llu files | Hashed: %llu | %.2f MB/s ", + printf("\033[1mScanning:\033[0m %llu files | Hashed: %llu | \033[32m%.2f " + "MB/s\033[0m ", (unsigned long long)found, (unsigned long long)hashed, displayed_speed); } else { @@ -911,18 +1041,17 @@ static THREAD_RETURN progress_thread(void *arg) { int barw = 40; int filled = (int)(pct * barw); - char bar[64]; - int p = 0; - - bar[p++] = '['; + printf("["); + // Print filled part in Green (\033[32m) + printf("\033[32m"); for (int i = 0; i < filled; i++) - bar[p++] = '#'; + putchar('#'); + // Reset color for empty part + printf("\033[0m"); for (int i = filled; i < barw; i++) - bar[p++] = '.'; - bar[p++] = ']'; - bar[p] = 0; + putchar('.'); - printf("\r%s %6.2f%% (%llu / %llu) %.2f MB/s ", bar, pct * 100.0, + printf("] %6.2f%% (%llu/%llu) \033[32m%.2f MB/s\033[0m ", pct * 100.0, (unsigned long long)hashed, (unsigned long long)found, displayed_speed); } @@ -931,11 +1060,1106 @@ static THREAD_RETURN progress_thread(void *arg) { if (scan_done && hashed == found) break; - sleep_ms(100); } - printf("\n"); + // Restore cursor (\033[?25h) and move to next line + printf("\033[?25h\n"); return THREAD_RETURN_VALUE; } + +// ======================== IO Ring implementation ======================== +#if USE_IORING +// -------------------------- Data structures --------------------------- + +// Globals +u64 g_ioring_buffer_size = 4096 * 64; +static atomic_uint_fast64_t g_io_ring_fallbacks = 0; + +#define IO_PENDING INT_MIN + +typedef struct IoBuffer IoBuffer; + +#if defined(_WIN32) || defined(_WIN64) +// Windows I/O Ring types +typedef HIORING IoRingHandle; +#define BUILD_READ_RETURN_VALUE HRESULT + +#elif defined(__linux__) +// Linux io_uring types +typedef struct { + struct io_uring ring; + struct io_uring_cqe *cqe_cache; + int cqe_cache_index; + int cqe_cache_count; +} IoUring; + +typedef IoUring *IoRingHandle; +typedef struct iovec IORING_BUFFER_INFO; +#define BUILD_READ_RETURN_VALUE int + +#endif + +typedef struct FileReadContext { + FileEntry *fe; + size_t file_size; + + // For in-order hashing + size_t next_read_offset; + + IoBuffer *head; + IoBuffer *tail; + + // Completion tracking + size_t bytes_hashed; + uint32_t reads_hashed; + + uint32_t reads_submitted; + uint32_t reads_completed; + + uint32_t active_reads; + + union { + XXH3_state_t hash_state; // For incremental hash (large files) + XXH128_hash_t single_hash; // For single-shot hash (small files) + }; + + FileHandle file_handle; + +#if USE_REGISTERED_FILES + uint32_t slot_id; +#endif + + bool use_incremental_hash; + + bool completed; + +} FileReadContext; + +// -------------------------- Buffer structure --------------------------- +typedef struct IoBuffer { + FileReadContext *file; + void *data; + size_t size; + size_t offset; + size_t bytes_read; + + BUILD_READ_RETURN_VALUE result; + + int buffer_id; + struct IoBuffer *next; +} IoBuffer; + +// Thread-local I/O Ring context +#if defined(_WIN32) || defined(_WIN64) + +typedef struct ThreadIoContext { + IoRingHandle ring; + void *completion_event; + + void *fallback_buffer; + IoBuffer buffers[NUM_BUFFERS_PER_THREAD]; + int buffer_pool[NUM_BUFFERS_PER_THREAD]; + int free_count; + int num_submissions; + int active_files; + bool submitting; + +#if USE_REGISTERED_FILES + bool use_registered_files; + FileHandle registered_handles[MAX_ACTIVE_FILES]; +#endif + +} ThreadIoContext; + +#elif defined(__linux__) +typedef struct ThreadIoContext { + IoRingHandle ring; + + void *fallback_buffer; + IoBuffer buffers[NUM_BUFFERS_PER_THREAD]; + int buffer_pool[NUM_BUFFERS_PER_THREAD]; + int free_count; + int num_submissions; + int active_files; + bool submitting; + + bool use_registered_buffers; + bool use_registered_files; + +} ThreadIoContext; + +#endif +typedef struct { + uint32_t MaxSubmissionQueueSize; + uint32_t MaxCompletionQueueSize; + uint32_t MaxVersion; +} IoRingCapabilities; + +typedef struct { + BUILD_READ_RETURN_VALUE ResultCode; + uint32_t Information; + uintptr_t UserData; +} IoRingCQE; + +// ------------------------ IO Ring Abstraction ------------------------- +#if defined(_WIN32) || defined(_WIN64) + +// Windows I/O Ring functions +static void ioring_query_capabilities(IoRingCapabilities *caps) { + IORING_CAPABILITIES win_caps; + QueryIoRingCapabilities(&win_caps); + caps->MaxSubmissionQueueSize = win_caps.MaxSubmissionQueueSize; + caps->MaxCompletionQueueSize = win_caps.MaxCompletionQueueSize; + caps->MaxVersion = win_caps.MaxVersion; +} + +static int create_ioring(ThreadIoContext *thread_ctx, uint32_t queue_size) { + IORING_CREATE_FLAGS flags = {0}; + HRESULT hr = CreateIoRing(IORING_VERSION_3, flags, queue_size, queue_size * 2, + &thread_ctx->ring); + + // Create completion event + thread_ctx->completion_event = CreateEvent(NULL, FALSE, FALSE, NULL); + if (thread_ctx->completion_event) + SetIoRingCompletionEvent(thread_ctx->ring, thread_ctx->completion_event); + return SUCCEEDED(hr) ? 0 : -1; +} + +static int close_ioring(ThreadIoContext *thread_ctx) { + + if (thread_ctx->completion_event) + CloseHandle(thread_ctx->completion_event); + CloseIoRing(thread_ctx->ring); + return 0; +} + +#define USERDATA_REGISTER 1 + +#define MAKE_BUF_INFO(a, l) \ + (IORING_BUFFER_INFO) { .Address = (a), .Length = (uint32_t)(l) } + +static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t wait_count, + uint32_t timeout_ms, uint32_t *submitted) { + HRESULT hr = SubmitIoRing(thread_ctx->ring, 0, timeout_ms, submitted); + // HRESULT hr = SubmitIoRing(ring, wait_count, timeout_ms, submitted); + + // The wait_count in windows is not implemented yet, so we wait with a + // completion event for a single completion + if (thread_ctx->num_submissions > 0) { + WaitForSingleObject(thread_ctx->completion_event, SUBMIT_TIMEOUT_MS); + } + + return SUCCEEDED(hr) ? 0 : -1; +} + +static void ioring_register_buffers(ThreadIoContext *thread_ctx, + uint32_t num_buffers, + IORING_BUFFER_INFO *buf_info) { + + HRESULT hr = BuildIoRingRegisterBuffers( + thread_ctx->ring, NUM_BUFFERS_PER_THREAD, buf_info, USERDATA_REGISTER); + if (FAILED(hr)) { + char error_msg[256]; + FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, hr, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + error_msg, sizeof(error_msg), NULL); + + fprintf(stderr, "WARNING: Error registering buffers: %s (0x%08X)\n", + error_msg, (unsigned int)hr); + } + // Submit registration + ioring_submit(thread_ctx, 0, 0, NULL); +} + +#if USE_REGISTERED_FILES +static void ioring_register_files(ThreadIoContext *thread_ctx) { + + HRESULT hr = BuildIoRingRegisterFileHandles( + thread_ctx->ring, MAX_ACTIVE_FILES, thread_ctx->registered_handles, + USERDATA_REGISTER); + + if (FAILED(hr)) { + char error_msg[256]; + FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, hr, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + error_msg, sizeof(error_msg), NULL); + + fprintf(stderr, "WARNING: File registration failed: %s (0x%08X)\n", + error_msg, (unsigned int)hr); + } + + thread_ctx->use_registered_files = (hr == 0); +} + +static void ioring_register_files_update(ThreadIoContext *thread_ctx, + FileReadContext *file) { + thread_ctx->registered_handles[file->slot_id] = file->file_handle; + ioring_register_files(thread_ctx); +} +#endif + +static BUILD_READ_RETURN_VALUE ioring_build_read(ThreadIoContext *thread_ctx, + FileReadContext *file_ctx, + uint32_t buffer_id, + size_t size, uint64_t offset, + uintptr_t user_data) { + +#if USE_REGISTERED_FILES + IORING_HANDLE_REF file_ref; + + if (thread_ctx->use_registered_files) { + file_ref = (IORING_HANDLE_REF)IoRingHandleRefFromIndex(file_ctx->slot_id); + } else { + file_ref = + (IORING_HANDLE_REF)IoRingHandleRefFromHandle(file_ctx->file_handle); + } +#else + IORING_HANDLE_REF file_ref = IoRingHandleRefFromHandle(file_ctx->file_handle); +#endif + + IORING_BUFFER_REF buffer_ref = + IoRingBufferRefFromIndexAndOffset(buffer_id, 0); + + HRESULT hr = + BuildIoRingReadFile(thread_ctx->ring, file_ref, buffer_ref, + (uint32_t)size, offset, user_data, IOSQE_FLAGS_NONE); + if (FAILED(hr)) { + char error_msg[256]; + FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, hr, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + error_msg, sizeof(error_msg), NULL); + + fprintf(stderr, + "ERROR: Building read error for file '%s' - Error: %s (Code: " + "0x%08X)\n", + file_ctx->fe->path, error_msg, (unsigned int)hr); + } + return hr; +} + +static int ioring_pop_completion(IoRingHandle ring, IoRingCQE *cqe) { + IORING_CQE win_cqe; + + while (1) { + HRESULT hr = PopIoRingCompletion(ring, &win_cqe); + + if (hr == S_FALSE) + // No CQE available + return 0; + + if (FAILED(hr)) + return -1; + + // Unlike linux, The Windows implementation treats buffer and file + // registration as an asynchronous operation that we submit to the ring, + // similar to a read or write. Those operations produce CQEs (completion + // queue entries) that we filter here using + // cqe.UserData == USERDATA_REGISTER + if (win_cqe.UserData == USERDATA_REGISTER) + continue; + + cqe->ResultCode = win_cqe.ResultCode; + cqe->Information = win_cqe.Information; + cqe->UserData = win_cqe.UserData; + + // Check for error and print warning + if (FAILED(win_cqe.ResultCode)) { + char error_msg[256]; + FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, win_cqe.ResultCode, 0, error_msg, sizeof(error_msg), + NULL); + + // Try to get the file path from the buffer + IoBuffer *buf = (IoBuffer *)win_cqe.UserData; + const char *file_path = "unknown"; + if (buf && buf->file && buf->file->fe) { + file_path = buf->file->fe->path; + } + + fprintf(stderr, + "WARNING: I/O completion error for file '%s' - Error: %s (Code: " + "0x%lx)\n", + file_path, error_msg, win_cqe.ResultCode); + } + + return 1; + } +} + +FileHandle ioring_open_file(FileEntry *fe) { + + FileHandle handle = os_file_open(fe->path, FLAG_ASYNC_DIRECT_READ); + if (handle == INVALID_FILE_HANDLE) { + return os_file_open(fe->path, FLAG_SEQUENTIAL_READ); + } + return handle; +} + +#elif defined(__linux__) +// Linux io_uring functions implementation +static void ioring_query_capabilities(IoRingCapabilities *caps) { + // Get system limits for io_uring + long max_entries = sysconf(_SC_IOV_MAX); + if (max_entries <= 0) + max_entries = 4096; + + caps->MaxSubmissionQueueSize = + (uint32_t)(max_entries < 4096 ? max_entries : 4096); + caps->MaxCompletionQueueSize = caps->MaxSubmissionQueueSize * 2; + caps->MaxVersion = 1; +} + +// static int async_io_create_ring(uint32_t queue_size, AsyncIoRing *ring) { +static int create_ioring(ThreadIoContext *thread_ctx, uint32_t queue_size) { + IoUring *ring_impl = (IoUring *)calloc(1, sizeof(IoUring)); + if (!ring_impl) + return -1; + + // Initialize io_uring + struct io_uring_params params = {0}; + + params.flags = + IORING_SETUP_CQSIZE | + IORING_SETUP_SINGLE_ISSUER | // Thread local io_uring + IORING_SETUP_DEFER_TASKRUN; // Do not send interupts when a CQE is ready, + // send them when a wait function is called, + // and groupe them according to the nbre or + // entries to wait (reduces syscall overhead) + + params.cq_entries = queue_size * 2; + + int ret = io_uring_queue_init_params(queue_size, &ring_impl->ring, ¶ms); + if (ret < 0) { + // Fallback to without params + printf("WARNING: Creating io_uring with default params\n"); + ret = io_uring_queue_init(queue_size, &ring_impl->ring, 0); + if (ret < 0) { + free(ring_impl); + return -1; + } + } + + ring_impl->cqe_cache = NULL; + ring_impl->cqe_cache_index = 0; + ring_impl->cqe_cache_count = 0; + + thread_ctx->ring = ring_impl; + return 0; +} + +static int close_ioring(ThreadIoContext *thread_ctx) { + IoUring *rimg_impl = (IoUring *)thread_ctx->ring; + if (!rimg_impl) + return -1; + + if (thread_ctx->use_registered_buffers) { + io_uring_unregister_buffers(&rimg_impl->ring); + } + io_uring_queue_exit(&rimg_impl->ring); + free(rimg_impl); + + return 0; +} + +#define MAKE_BUF_INFO(a, l) \ + (IORING_BUFFER_INFO) { .iov_base = (a), .iov_len = (size_t)(l) } + +static void ioring_register_buffers(ThreadIoContext *thread_ctx, + uint32_t num_buffers, + IORING_BUFFER_INFO *buf_info) { + + int ret = io_uring_register_buffers(&((IoUring *)thread_ctx->ring)->ring, + buf_info, num_buffers); + + if (ret < 0) { + if (ret == -ENOMEM) { + struct rlimit limit; + getrlimit(RLIMIT_MEMLOCK, &limit); + + fprintf(stderr, + "WARNING: Buffer registration failed due to memlock limits " + "(ENOMEM).\n" + "See README for more informations.\n"); + + } else { + // For any other error (e.g., EFAULT, EBUSY, EINVAL) + fprintf(stderr, "WARNING: Error registering buffers: %s (code: %d)\n", + strerror(-ret), ret); + } + + fprintf(stderr, "Falling back to unregistered buffers (performance may " + "be reduced).\n"); + } + + thread_ctx->use_registered_buffers = (ret == 0); +} + +#if USE_REGISTERED_FILES +static void ioring_register_files(ThreadIoContext *thread_ctx) { + + int ret = io_uring_register_files_sparse(&((IoUring *)thread_ctx->ring)->ring, + MAX_ACTIVE_FILES); + if (ret < 0) { + fprintf(stderr, + "WARNING: File registeration failed, fallback to unregistered " + "files - Error: %s (code: %d)\n", + strerror(-ret), ret); + } + + thread_ctx->use_registered_files = (ret == 0); +} + +static void ioring_register_files_update(ThreadIoContext *thread_ctx, + FileReadContext *file) { + + // Update the kernel's file table at the specific slot + int ret = io_uring_register_files_update( + &((IoUring *)thread_ctx->ring)->ring, + file->slot_id, // offset - which slot to update + &file->file_handle, // pointer to the fd + 1 // number of files to update + ); + + if (ret < 0) { + fprintf(stderr, + "WARNING: File registration update failed for slot %u updating " + "file '%s' - Error: %s " + "(code: %d)\n" + "Fallback to unregistered files\n", + file->slot_id, file->fe->path, strerror(-ret), ret); + + thread_ctx->use_registered_files = false; + } +} +#endif + +static int ioring_build_read(ThreadIoContext *thread_ctx, + FileReadContext *file_ctx, uint32_t buffer_id, + size_t size, uint64_t offset, + uintptr_t user_data) { + + struct io_uring_sqe *sqe = + io_uring_get_sqe(&((IoUring *)thread_ctx->ring)->ring); + if (!sqe) { + printf("SQE FULL\n"); + return -1; + } + + void *buf = thread_ctx->buffers[buffer_id].data; + +#if USE_REGISTERED_FILES + if (thread_ctx->use_registered_files) { + sqe->flags |= IOSQE_FIXED_FILE; + + if (thread_ctx->use_registered_buffers) { + io_uring_prep_read_fixed(sqe, file_ctx->slot_id, buf, size, offset, + buffer_id); + } else { + io_uring_prep_read(sqe, file_ctx->slot_id, buf, size, offset); + } + + io_uring_sqe_set_data64(sqe, user_data); + return 0; + } +#endif + + // Fallback: use regular file descriptor + if (thread_ctx->use_registered_buffers) { + io_uring_prep_read_fixed(sqe, file_ctx->file_handle, buf, size, offset, + buffer_id); + } else { + io_uring_prep_read(sqe, file_ctx->file_handle, buf, size, offset); + } + + io_uring_sqe_set_data64(sqe, user_data); + return 0; +} + +static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t wait_count, + uint32_t timeout_ms, uint32_t *submitted) { + int ret; + + if (wait_count > 0) { + ret = io_uring_submit_and_wait(&((IoUring *)thread_ctx->ring)->ring, + wait_count); + } else { + ret = io_uring_submit(&((IoUring *)thread_ctx->ring)->ring); + } + + if (ret < 0) { + fprintf(stderr, "ERROR: Submit error: %s (Code: %d)\n", strerror(-ret), + ret); + return -1; + } + + if (submitted) + *submitted = (uint32_t)ret; + + return 0; +} + +static int ioring_pop_completion(IoRingHandle ring, IoRingCQE *cqe) { + + struct io_uring_cqe *cqe_ptr = NULL; + + int ret = io_uring_peek_cqe(&((IoUring *)ring)->ring, &cqe_ptr); + + if (ret == -EAGAIN) { + // No CQE available + return 0; + } + + if (ret < 0) { + // Error + fprintf(stderr, "WARNING: io_uring_peek_cqe error - Error: %s (Code: %d)\n", + strerror(-ret), ret); + return -1; + } + + if (!cqe_ptr) { + return 0; + } + + int res = cqe_ptr->res; + + if (res >= 0) { + cqe->ResultCode = 0; + cqe->Information = (uint32_t)res; + } else { + cqe->ResultCode = res; + cqe->Information = 0; + } + + cqe->UserData = (uintptr_t)cqe_ptr->user_data; + + io_uring_cqe_seen(&((IoUring *)ring)->ring, cqe_ptr); + + // Check for error and print warning + if (res < 0) { + // Try to get the file path from the buffer + IoBuffer *buf = (IoBuffer *)cqe->UserData; + const char *file_path = "unknown"; + if (buf && buf->file && buf->file->fe) { + file_path = buf->file->fe->path; + } + + fprintf( + stderr, + "WARNING: I/O completion error for file '%s' - Error: %s (Code: %d)\n", + file_path, strerror(-res), ret); + } + + return 1; +} + +FileHandle ioring_open_file(FileEntry *fe) { + +#if CHECK_FILE_SYSTEM + if (!fs_policy(fe->fs_type)) { + return os_file_open(fe->path, FLAG_SEQUENTIAL_READ); + } +#endif + + FileHandle handle = os_file_open(fe->path, FLAG_ASYNC_DIRECT_READ); + if (handle == INVALID_FILE_HANDLE) { + return os_file_open(fe->path, FLAG_SEQUENTIAL_READ); + } + return handle; +} + +#endif + +// OS-agnostic helper macros +#define IORING_SUCCEEDED(result) ((result) >= 0) +#define IORING_FAILED(result) ((result) < 0) + +// ---------------------- FIFO queue operations --------------------------- +typedef struct FileQueue { + FileReadContext files[MAX_ACTIVE_FILES]; + int head; + int tail; + int count; +} FileQueue; + +static FileReadContext *fq_push(FileQueue *fq) { + if (fq->count == MAX_ACTIVE_FILES) + return NULL; + + FileReadContext *file = &fq->files[fq->tail]; +#if USE_REGISTERED_FILES + file->slot_id = fq->tail; +#endif + + fq->tail = (fq->tail + 1) % MAX_ACTIVE_FILES; + fq->count++; + return file; +} + +static FileReadContext *fq_peek_tail(FileQueue *fq) { + if (fq->count == 0) + return NULL; + + int idx = (fq->tail - 1 + MAX_ACTIVE_FILES) % MAX_ACTIVE_FILES; + return &fq->files[idx]; // return the newest file +} + +static FileReadContext *fq_peek_at(FileQueue *fq, int index) { + if (index < 0 || index >= fq->count) + return NULL; + + int idx = (fq->head + index) % MAX_ACTIVE_FILES; + return &fq->files[idx]; +} + +static void fq_trim(FileQueue *fq) { + while (fq->count > 0) { + FileReadContext *f = &fq->files[fq->head]; + + if (!f->completed) + break; + + fq->head = (fq->head + 1) % MAX_ACTIVE_FILES; + fq->count--; + } +} + +// ----------------- Initialize thread context ----------------------- +static ThreadIoContext *ioring_init_thread(void) { + ThreadIoContext *thread_ctx = + (ThreadIoContext *)calloc(1, sizeof(ThreadIoContext)); + if (!thread_ctx) + return NULL; + + // Query I/O Ring capabilities + IoRingCapabilities caps; + ioring_query_capabilities(&caps); + + uint32_t queue_size = caps.MaxSubmissionQueueSize; + if (queue_size > 4096) + queue_size = 4096; // Cap at 4096 for reasonable memory usage + + // Create I/O Ring + if (create_ioring(thread_ctx, queue_size) != 0) { + free(thread_ctx); + thread_ctx = NULL; + return NULL; + } + + // Initialize buffer pool + thread_ctx->fallback_buffer = malloc(READ_BLOCK); + + IORING_BUFFER_INFO buf_info[NUM_BUFFERS_PER_THREAD]; + + u64 buf_pool_size = g_ioring_buffer_size * NUM_BUFFERS_PER_THREAD; + + // Reserve and Commit memory for buffers + void *base_ptr = plat_mem_reserve(buf_pool_size); + if (base_ptr) { + if (!plat_mem_commit(base_ptr, buf_pool_size)) { + plat_mem_release(base_ptr, 0); + close_ioring(thread_ctx); + free(thread_ctx); + thread_ctx = NULL; + return NULL; + } + } else { + + close_ioring(thread_ctx); + free(thread_ctx); + thread_ctx = NULL; + return NULL; + } + + for (int i = 0; i < NUM_BUFFERS_PER_THREAD; i++) { + thread_ctx->buffers[i].data = (u8 *)base_ptr + (i * g_ioring_buffer_size); + thread_ctx->buffer_pool[i] = i; + thread_ctx->buffers[i].buffer_id = i; + + buf_info[i] = + MAKE_BUF_INFO(thread_ctx->buffers[i].data, g_ioring_buffer_size); + } + + thread_ctx->free_count = NUM_BUFFERS_PER_THREAD; + + // Register buffers + ioring_register_buffers(thread_ctx, NUM_BUFFERS_PER_THREAD, buf_info); + +#if USE_REGISTERED_FILES + ioring_register_files(thread_ctx); +#endif + + thread_ctx->submitting = true; + thread_ctx->num_submissions = 0; + thread_ctx->active_files = 0; + + return thread_ctx; +} + +static void ioring_cleanup_thread(ThreadIoContext *thread_ctx) { + if (!thread_ctx) + return; + + if (thread_ctx->ring) + close_ioring(thread_ctx); + + // Free the buffer pool memory + if (thread_ctx->buffers[0].data) { + u64 buf_pool_size = g_ioring_buffer_size * NUM_BUFFERS_PER_THREAD; + plat_mem_release(thread_ctx->buffers[0].data, buf_pool_size); + } + + free(thread_ctx); + thread_ctx = NULL; +} + +// -------------------------- Buffer get and return ------------------------ +static IoBuffer *get_free_buffer(ThreadIoContext *ctx) { + + if (ctx->free_count == 0) { + return NULL; + } + + int idx = ctx->buffer_pool[--ctx->free_count]; + IoBuffer *buf = &ctx->buffers[idx]; + buf->bytes_read = 0; + buf->result = IO_PENDING; + buf->next = NULL; + + return buf; +} + +static void return_buffer(ThreadIoContext *ctx, IoBuffer *buf) { + if (!buf) + return; + + ctx->buffer_pool[ctx->free_count++] = buf->buffer_id; +} + +// -------------------------- Process completions --------------------------- +static void process_completions(ThreadIoContext *thread_ctx, FileQueue *fq) { + IoRingCQE cqe; + + while (ioring_pop_completion(thread_ctx->ring, &cqe) == 1) { + + IoBuffer *buf = (IoBuffer *)cqe.UserData; + FileReadContext *file = buf->file; + + buf->result = cqe.ResultCode; + buf->bytes_read = cqe.Information; + + file->active_reads--; + file->reads_completed++; + thread_ctx->num_submissions--; + } +} + +// -------------------- File operations ----------------------- +static int init_file(ThreadIoContext *thread_ctx, FileReadContext *file, + FileEntry *fe) { + +#if USE_REGISTERED_FILES + uint32_t saved_slot_id = file->slot_id; +#endif + + memset(file, 0, sizeof(*file)); + + file->fe = fe; + file->file_size = fe->size_bytes; + + file->file_handle = ioring_open_file(fe); + + if (file->file_handle == INVALID_FILE_HANDLE) { + +#if IORING_DEBUG_PRINTS + printf("ERROR: Could not open file '%s'\n", fe->path); +#endif + return 0; + } + +#if USE_REGISTERED_FILES + file->slot_id = saved_slot_id; + + if (thread_ctx->use_registered_files) { + ioring_register_files_update(thread_ctx, file); + } +#endif + + // Determine hash method based on file size + if (file->file_size > g_ioring_buffer_size) { + file->use_incremental_hash = true; + XXH3_128bits_reset(&file->hash_state); + } + return 1; +} + +static void finalize_file(ThreadIoContext *thread_ctx, + WorkerContext *worker_ctx, FileReadContext *file) { + + FileEntry *fe = file->fe; + + os_file_close(file->file_handle); + + char hash[HASH_STRLEN]; + + if (file->bytes_hashed == file->file_size) { + if (file->use_incremental_hash) { + // Large file: digest the accumulated hash state + XXH128_hash_t h = XXH3_128bits_digest(&file->hash_state); + snprintf(hash, HASH_STRLEN, "%016llx%016llx", + (unsigned long long)h.high64, (unsigned long long)h.low64); + } else { + // Small file: hash already computed, stored directly in single_hash + snprintf(hash, HASH_STRLEN, "%016llx%016llx", + (unsigned long long)file->single_hash.high64, + (unsigned long long)file->single_hash.low64); + } + } else { +#if IORING_DEBUG_PRINTS + printf("WARNING: Fallback for path: %s\n", fe->path); +#endif + + atomic_fetch_add(&g_io_ring_fallbacks, 1); + xxh3_hash_file_stream(fe->path, hash, thread_ctx->fallback_buffer); + } + + double size_kib = (double)fe->size_bytes / 1024.0; + char stack_buf[KiB(4)]; + int len; + +#if FILE_TIMES + char created[32], modified[32]; + format_time(fe->created_time, created, sizeof(created)); + format_time(fe->modified_time, modified, sizeof(modified)); +#endif + +#if FILE_TIMES && FILE_OWNER + len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n", + hash, fe->path, size_kib, created, modified, fe->owner); +#elif FILE_TIMES + len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\n", hash, + fe->path, size_kib, created, modified); +#elif FILE_OWNER + len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\n", hash, + fe->path, size_kib, fe->owner); +#else + len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\n", hash, fe->path, + size_kib); +#endif + + char *dst = arena_push(&worker_ctx->arena, len, false); + memcpy(dst, stack_buf, len); + + atomic_fetch_add(&g_files_hashed, 1); +} + +// -------------------- Hash files ----------------------- +static void hash_ready_files(ThreadIoContext *thread_ctx, FileQueue *fq, + WorkerContext *worker_ctx) { + + for (int i = 0; i < fq->count; i++) { + + FileReadContext *file = fq_peek_at(fq, i); + if (!file || file->completed) + continue; + + // ---- HASH READY BUFFERS IN ORDER ---- + while (file->head) { + + IoBuffer *buf = file->head; + + // CQE not received yet + if (buf->result == IO_PENDING) + break; + + // Consume buffer + file->head = buf->next; + + if (IORING_SUCCEEDED(buf->result) && buf->bytes_read > 0) { + + size_t bytes_to_hash = buf->bytes_read; + + if (buf->offset + buf->bytes_read > file->file_size) { + bytes_to_hash = file->file_size - buf->offset; + } + + if (bytes_to_hash > 0) { + if (file->use_incremental_hash) { + XXH3_128bits_update(&file->hash_state, buf->data, bytes_to_hash); + } else { + file->single_hash = XXH3_128bits(buf->data, bytes_to_hash); + } + + file->bytes_hashed += bytes_to_hash; + atomic_fetch_add(&g_bytes_processed, bytes_to_hash); + } + + file->reads_hashed++; + + } else if (buf->bytes_read == 0 && IORING_SUCCEEDED(buf->result)) { + file->reads_hashed++; // EOF + } else { + finalize_file(thread_ctx, worker_ctx, file); + file->completed = true; + } + + return_buffer(thread_ctx, buf); + } + + // ---- FINALIZE ---- + if (!file->completed && file->active_reads == 0 && + file->bytes_hashed >= file->file_size) { + + finalize_file(thread_ctx, worker_ctx, file); + file->completed = true; + thread_ctx->active_files--; + } + } + + // Clean up completed files from the head + fq_trim(fq); +} + +// ------------------ Build pending reads ---------------------- +static void build_pending_reads(ThreadIoContext *thread_ctx, FileQueue *fq, + WorkerContext *worker_ctx) { + + MPMCQueue *file_queue = worker_ctx->file_queue; + + FileReadContext *file = fq_peek_tail(fq); + + for (;;) { + + // BUILD READS FOR CURRENT FILE + if (file) { + while (file->next_read_offset < file->file_size) { + + IoBuffer *buf = get_free_buffer(thread_ctx); + if (!buf) + return; + + size_t remaining = file->file_size - file->next_read_offset; + size_t bytes_to_read; + + if (remaining >= g_ioring_buffer_size) { + bytes_to_read = g_ioring_buffer_size; + } else { + bytes_to_read = ALIGN_UP_POW2(remaining, g_pagesize); + } + + // Initialize buffer + buf->file = file; + buf->offset = file->next_read_offset; + buf->size = bytes_to_read; + + // Chain buffer + if (!file->head) { + file->head = buf; + } else { + file->tail->next = buf; + } + file->tail = buf; + + BUILD_READ_RETURN_VALUE hr = + ioring_build_read(thread_ctx, file, buf->buffer_id, bytes_to_read, + buf->offset, (uintptr_t)buf); + + if (IORING_FAILED(hr)) { + // mark failure and stop this file + return_buffer(thread_ctx, buf); + finalize_file(thread_ctx, worker_ctx, file); + file->completed = true; + break; + } + + file->active_reads++; + file->reads_submitted++; + thread_ctx->num_submissions++; + + file->next_read_offset += bytes_to_read; + } + } + + // ADD NEW FILE + if (!thread_ctx->submitting) + return; + + if (fq->count >= MAX_ACTIVE_FILES) + return; + + FileEntry *fe = mpmc_pop(file_queue); + if (!fe) { + thread_ctx->submitting = false; + return; + } + + FileReadContext *newfile = fq_push(fq); + + if (!init_file(thread_ctx, newfile, fe)) { + finalize_file(thread_ctx, worker_ctx, newfile); + newfile->completed = true; + continue; + } + + file = newfile; + thread_ctx->active_files++; + } +} + +// -------------------------- Hash worker I/O Ring --------------------------- +static THREAD_RETURN hash_worker_ioring(void *arg) { + WorkerContext *worker_ctx = (WorkerContext *)arg; + + // Init IO ring + ThreadIoContext *thread_ctx = ioring_init_thread(); + if (!thread_ctx || !thread_ctx->ring) { + printf("I/O Ring unavailable, using buffered I/O\n"); + return hash_worker(arg); + } + + // Initialize pipeline state + FileQueue fq; + memset(&fq, 0, sizeof(fq)); + + uint32_t submitted = 0; + uint32_t wait_count; + + // Main pipeline loop + for (;;) { + + // Submit new reads + build_pending_reads(thread_ctx, &fq, worker_ctx); + + wait_count = MIN(thread_ctx->num_submissions, NUM_BUFFERS_PER_THREAD - 6); + + submitted = 0; + ioring_submit(thread_ctx, wait_count, 0, &submitted); + + // Process completions + process_completions(thread_ctx, &fq); + +#if IORING_DEBUG_STATS + printf( + "Free buffers: %d, Submissions: %d, Active files: %d, fq count: %d\n", + thread_ctx->free_count, thread_ctx->num_submissions, + thread_ctx->active_files, fq.count); +#endif + + // Hash files + hash_ready_files(thread_ctx, &fq, worker_ctx); + + // Exit condition + if (!thread_ctx->submitting && thread_ctx->active_files == 0 && + thread_ctx->num_submissions == 0) { + break; + } + } + + ioring_cleanup_thread(thread_ctx); + return THREAD_RETURN_VALUE; +} +#endif