diff --git a/.gitignore b/.gitignore index 91ce0c9..670e490 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ temp_code.c /file_hasher /io_uring_test /file_hasher +/io_uring_test diff --git a/README.md b/README.md index 719ef96..1b6353e 100644 --- a/README.md +++ b/README.md @@ -1,26 +1,221 @@ # filehasher -Collects some metadata and hashes files. +## Presentation +Collects some metadata and hashes files. It outputs the path, hash, size, creation and +last modification dates and the author in file_hashes.txt. +Creation and modification dates and author can be disabled in the config file. -## Building: -### Windows: -#### Release: -clang-cl /O3 file_hasher.c xxh_x86dispatch.c +It is a high performance cross platform Windows and Linux compatible program, it uses: + * Multiple threads for scanning and hashing (multi-threading can be disabled in the config file). + * Stores the generated data in thread local configurable arenas that support growing + by committing more memory and chaining blocks. + * Two Multi Producer Multi Consumer queues, one for the scanners and one between the scanners and hashers. + * xxh3_128bits algorithm from xxhash, that supports SIMD instruction sets (SSE2, AVX2, AVX512) + and uses a runtime dispatcher to select the best available instruction set. + * IO Ring for asynchronous I/O in Windows and the equivalent io_uring in Linux. + The implementation is event driven, thread local, uses DMA and direct disk I/O, + bypassing the OS cache completely, registered buffers (and registered files in io_uring), + it supports batching multiple submissions and can handle multiple files at the same time. + It can be disabled in the config file. + * Fallback to buffered I/O if there are errors in the IO Ring path. -Note: MinGW does not provide IO Ring headers yet, to fix that include ioringapi.c, this will dynamically load all the functions and define all the symbols necessary to replace the official header. 
-clang -O3 file_hasher.c xxh_x86dispatch.c -o file_hasher -gcc -O3 file_hasher.c xxh_x86dispatch.c -o file_hasher +## Building +### Windows +#### Release -#### Debug: -clang-cl /Zi /Od file_hasher.c xxh_x86dispatch.c -clang -g -O0 file_hasher.c xxh_x86dispatch.c -o file_hasher -gcc -g -O0 file_hasher.c xxh_x86dispatch.c -o file_hasher +**Note**: Make sure to use the UCRT64 environment from MSYS2 instead of the standard MinGW environment. +UCRT64 uses the modern Universal C Runtime (ucrtbase.dll), which supports the newest APIs, +the standard MSYS2 uses the legacy msvcrt.dll and does not support IO Ring. +To install: +pacman -S mingw-w64-ucrt-x86_64-gcc +pacman -S mingw-w64-ucrt-x86_64-clang +pacman -Syu +And add to path: +C:\msys64\ucrt64\bin -### Linux: -#### Release: -clang -O3 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher +gcc -O3 file_hasher.c xxhash.c xxh_x86dispatch.c -o file_hasher +clang -O3 file_hasher.c xxhash.c xxh_x86dispatch.c -o file_hasher +clang-cl /O2 file_hasher.c xxhash.c xxh_x86dispatch.c + +#### Debug +gcc -g -O0 file_hasher.c xxhash.c xxh_x86dispatch.c -o file_hasher +clang -g -O0 file_hasher.c xxhash.c xxh_x86dispatch.c -o file_hasher +clang-cl /Zi /Od file_hasher.c xxhash.c xxh_x86dispatch.c + +### Linux +#### Release gcc -O3 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher +clang -O3 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher -#### Debug: -clang -g -O0 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher +#### Debug gcc -g -O0 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher +clang -g -O0 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher + +## Notes about the IO Ring implementations +### IO Ring + +#### File registration +Registering files is a performance optimization that allows the kernel to allocate an array +of descriptors/handles to pre-validate and maintain long-term references to file 
handles. +Instead of passing a standard file descriptor/handle with every I/O request, you pass a simple integer +index into a pre-registered table. + +The Linux implementation has io_uring_register_files_sparse() to create an empty array of descriptors +(initialized with -1) without having to create and initialize it in the user space, and we can +use io_uring_register_files_update() to update one or more entries. Windows on the other hand +is limited to BuildIoRingRegisterFileHandles() only, so we need to re-register the entire array of handles +each time. This is why there is a provided macro in config.h to disable or enable it. + +##### Why Register Files? (The Benefits) +When you use a standard file descriptor in a high-frequency I/O loop, +the kernel must perform several "hidden" tasks for every single operation: + * Permission Checks: Validating that the process still has the right to read/write + that specific file. + * Reference Counting: Incrementing the file's internal reference count at the start of + the I/O and decrementing it at the end to ensure the file isn't closed while in use. + * Object Lookup: Traversing the internal "file descriptor table" to find the actual + kernel object associated with your integer ID. + +Registering the files performs these checks once at registration time. Subsequent +I/O operations skip these steps, significantly reducing CPU overhead and latency, +especially when handling thousands of small I/O operations per second. + +##### Comparison: Linux vs. Windows Implementation +While both systems share the same core concept, their APIs and management styles differ significantly. +Feature Linux (io_uring) Windows (IoRing) +API Call io_uring_register BuildIoRingRegisterFileHandles +Registration Method Synchronous system call that blocks until the table is set up. Asynchronous request submitted to the ring just like a read/write operation. 
+Partial Updates Supports IORING_REGISTER_FILES_UPDATE to swap specific indices without a full reset. Does not support partial updates; a new registration call replaces the entire existing table. +Memory Mapping User must manually mmap() the queues into their address space. The kernel handles memory mapping automatically when the ring is created. +Scope of Operations Extremely broad (files, sockets, timers, signals, even other rings). Primarily focused on file storage (read, write, flush). + +#### Completion Wait count +To avoid busy waiting when receiving CQEs, we can use io_uring_submit_and_wait() in Linux by entering a wait count, +the thread sleeps until that many CQEs are received; in Windows the wait_count is present in SubmitIoRing() +but is not implemented yet, so we wait with a completion event for a single completion. Another limitation on the completion +event is that the kernel will wake up the thread only when receiving the first CQE, after that we need to drain the completion +queue completely before sleeping again, or we enter an eternal slumber. On my config, each time the thread wakes up +it rarely receives more than 3 to 5 CQEs and most of the time only one CQE. + +#### Filtering CQEs + +Unlike Linux, the Windows implementation treats buffer and file registration +as an asynchronous operation that we submit to the ring, similar to a read or write. +Those operations produce CQEs (completion queue entries) that we filter here using +cqe.UserData == USERDATA_REGISTER +```c + if (win_cqe.UserData == USERDATA_REGISTER) + continue; +``` + +### io_uring + +#### Creation flags +io_uring provides a lot of configuration flags compared to IO Ring, some +of them are at the creation and others during the operations, here is what +we use in this implementation at creation time and is lacking in the +IO Ring implementation. + + * IORING_SETUP_SINGLE_ISSUER: Since we are using a thread local io_uring, we can + set this flag to remove the atomic operations. 
+ * IORING_SETUP_DEFER_TASKRUN: By default, the kernel sends an interrupt when a CQE + is ready, we use this flag to disable this syscall and wait for a specific number of + CQEs to be ready to group them, this reduces the number of syscalls. + +#### Memlock limit warning + +```c + "WARNING: Buffer registration failed due to memlock limits (ENOMEM).\n" + "Increase the limit to solve this warning.\n"); +``` + +The Memlock limit in Linux restricts the amount of memory a process can +"lock" into physical RAM using the mlock() family of system calls. This +prevents the operating system from swapping that memory out to disk. +And registering buffers will lock the buffers memory so the hardware +can access it directly without kernel intervention and prevents the kernel from +swapping it to the SSD or HDD. Increase the limit to be able to register the buffers. + +##### Modifying the Limit: +The method for changing the memlock limit depends on whether you are +managing a user session or a system service. +1. For Users and Interactive Sessions +To permanently increase the limit for a specific user or group, modify +the /etc/security/limits.conf file. Add the following lines: + +```conf + # Example for a specific user (replace 'username'), unlimited or a custom value in KB + username soft memlock unlimited + username hard memlock unlimited + + # Example for all users + * soft memlock unlimited + * hard memlock unlimited +``` + +Soft Limit: The value the user starts with; can be raised up to the +hard limit. + +Hard Limit: The absolute maximum; only a privileged user +(root) can increase this. Values: Can be set in Kilobytes (KB) or as +unlimited. + +2. For Systemd Services +Settings in limits.conf do not affect background services managed by +systemd. To increase the limit for a service, edit its service file +(e.g., /etc/systemd/system/myservice.service) and add: +```conf + [Service] + LimitMEMLOCK=infinity +``` + +##### Why Register Buffers? 
+In a standard "unregistered" I/O operation, the kernel must perform several +expensive steps for every single read or write: + * Virtual-to-Physical Mapping: The kernel has to translate your application's + virtual memory addresses into physical RAM addresses. + * Page Pinning: The kernel must "pin" the memory pages (using get_user_pages) + to prevent them from being swapped to disk or moved while the hardware + (like your SSD) is writing to them. + * TLB Overhead: Constant mapping and unmapping put pressure on the Translation + Lookaside Buffer (TLB), which can slow down the CPU. + +Registering the buffers performs all of this "pinning" and "mapping" once. + +#### Direct I/O: O_DIRECT (Linux) and FILE_FLAG_NO_BUFFERING (Windows) + +Modern operating systems normally use a page cache when reading files. This means file +data is first loaded into kernel memory and then copied to user space. While this improves +performance for many workloads, it introduces extra memory usage and copy overhead. + +Both Linux and Windows provide a way to bypass this cache and perform direct I/O: + +Linux: O_DIRECT +Windows: FILE_FLAG_NO_BUFFERING + +These flags instruct the OS to transfer data directly between disk and user-provided buffers, avoiding the page cache. + +##### Benefits +1. Reduced memory overhead +Avoids polluting the OS page cache +Especially useful for large sequential reads (e.g. hashing, backups) +2. Lower CPU usage +Eliminates extra memory copies between kernel and user space +3. Predictable performance +No interference from cache eviction or readahead heuristics +More consistent throughput for streaming workloads +4. Better scalability +Ideal for high-throughput, multi-threaded I/O pipelines +Prevents cache contention between threads +5. 
Avoids double caching +Important when the application already manages its own buffering + +##### File system compatibility +Not all file systems are compatible with O_DIRECT, if we try to open files residing in an NTFS partition, +most of the time it will fail, and some times it opens but the CQEs return with an error code bad +descriptor, and it causes some lags. + +To address this issue the program falls back to sequential read when the open fails, and falls back to +buffered sequential hashing if we receive an error in the CQEs. There is also a file system detection +that we can enable in the config file, it will enable the collection of the file system in scan_folder() +and the file will be opened accordingly, but it costs one additional syscall / directory. diff --git a/base.h b/base.h index 3d8e91a..0b8e4b0 100644 --- a/base.h +++ b/base.h @@ -33,6 +33,7 @@ #include #include #include +#include #include #endif diff --git a/binaries/changelog.txt b/binaries/changelog.txt index e619f7e..ea0ee1c 100644 --- a/binaries/changelog.txt +++ b/binaries/changelog.txt @@ -14,7 +14,7 @@ v3.2: Making the lock free MPMC queue growable Add padding to avoir false sharing Add sleep() and SwitchToThread() to limit spinning -v3.3: Fix bug slots used before initialization,compare and swap is protecting updating committed, but it is not protecting the memory initialization. Adding atomic_flag commit_lock to protect against that +v3.3: Fix bug slots used before initialization, compare and swap is protecting updating committed, but it is not protecting the memory initialization. 
Adding atomic_flag commit_lock to protect against that Fix bug multiple threads committing at the same time, fixed by using atomic_flag commit_lock and re-checking committed after acquiring the lock Reorder helper functions @@ -55,4 +55,4 @@ Hashing small files using XXH3_128bits() instead of the streaming pipeline(XXH3_ fixing the xxh_x86dispatch warnings Updating the progress printing function Implementing a config file - +Writing the README file diff --git a/compile_commands.json b/compile_commands.json deleted file mode 100644 index 0b29205..0000000 --- a/compile_commands.json +++ /dev/null @@ -1,7 +0,0 @@ -[ - { - "directory": "D:/Code/c/filehasher", - "command": "clang-cl /O2 file_hasher.c xxh_x86dispatch.c", - "file": "file_hasher.c" - } -] diff --git a/config.h b/config.h index 3aa7208..5cd6f04 100644 --- a/config.h +++ b/config.h @@ -1,27 +1,31 @@ #define FILE_HASHES_TXT "file_hashes.txt" -#define HASH_STRLEN 33 // 128-bit hex (32 chars) + null -#define MAX_PATHLEN 4096 -#define READ_BLOCK (KiB(64)) -#define MULTI_THREADED true +// Metadata selection +#define FILE_TIMES 1 // created and modified time +#define FILE_OWNER 1 + +#define MULTI_THREADING 1 +#define READ_BLOCK KiB(64) // -------------------- IO Ring Configuration ---------------------- #define USE_IORING 1 #if USE_IORING -#define IORING_BUFFER_SIZE (KiB(256)) +#define IORING_BUFFER_SIZE KiB(256) #define NUM_BUFFERS_PER_THREAD 32 -#define MAX_ACTIVE_FILES 32 -#define SUBMIT_TIMEOUT_MS 30000 +#define MAX_ACTIVE_FILES 16 -#define IORING_DEBUG_PRINTS false -#define IORING_DEBUG_STATS false +#define SUBMIT_TIMEOUT_MS 10000 +#define IORING_DEBUG_PRINTS 0 +#define IORING_DEBUG_STATS 0 #if defined(_WIN32) || defined(_WIN64) -#define USE_REGISTERED_FILES false +#define USE_REGISTERED_FILES 1 + #elif defined(__linux__) -#define USE_REGISTERED_FILES true -#endif +#define USE_REGISTERED_FILES 1 +#define CHECK_FILE_SYSTEM 0 #endif +#endif diff --git a/file_hasher.c b/file_hasher.c index 3120723..67bf97d 
100644 --- a/file_hasher.c +++ b/file_hasher.c @@ -82,7 +82,7 @@ int main(int argc, char **argv) { // Logical threads = CPU cores * 2 uint32_t cpu_threads = cpu_cores * 2; -#if MULTI_THREADED +#if MULTI_THREADING uint32_t num_scan_threads = cpu_threads; uint32_t num_hash_threads = cpu_threads; diff --git a/io_uring_test b/io_uring_test deleted file mode 100644 index 6315aaa..0000000 Binary files a/io_uring_test and /dev/null differ diff --git a/io_uring_test.c b/io_uring_test.c index 735a44d..82b7d54 100644 --- a/io_uring_test.c +++ b/io_uring_test.c @@ -17,9 +17,9 @@ gcc -o io_uring_test io_uring_test.c -luring #include #include -#define TEST_FILE "test_io_uring.txt" #define BUFFER_SIZE 4096 #define NUM_BUFFERS 4 +#define NUM_REGISTERED_FILES 3 // Test with 3 files // Colors for output #define COLOR_GREEN "\033[0;32m" @@ -50,25 +50,17 @@ static void print_step(const char *step) { printf(COLOR_YELLOW "\n>>> Testing: %s" COLOR_RESET "\n", step); } -// Create a test file with known content -static int create_test_file(void) { - const char *test_content = - "Hello, io_uring! 
This is a test file for async I/O operations.\n" - "Line 2: Testing reads with registered buffers.\n" - "Line 3: The quick brown fox jumps over the lazy dog.\n" - "Line 4: ABCDEFGHIJKLMNOPQRSTUVWXYZ\n" - "Line 5: 0123456789\n"; - - FILE *f = fopen(TEST_FILE, "w"); +static int create_test_file(const char *filename, const char *content) { + FILE *f = fopen(filename, "w"); if (!f) { perror("Failed to create test file"); return -1; } - fprintf(f, "%s", test_content); + fprintf(f, "%s", content); fclose(f); - print_info("Test file created successfully"); + printf(" Created test file: %s\n", filename); return 0; } @@ -93,16 +85,14 @@ static int test_register_buffers(struct io_uring *ring, void **buffers, struct iovec *iovs, TestResults *results) { print_step("Buffer registration"); - // Allocate and prepare buffers size_t total_size = BUFFER_SIZE * NUM_BUFFERS; - *buffers = aligned_alloc(4096, total_size); // Page-aligned for O_DIRECT + *buffers = aligned_alloc(4096, total_size); if (!*buffers) { print_failure("Buffer allocation", strerror(errno)); results->failed++; return -1; } - // Initialize iovecs for (int i = 0; i < NUM_BUFFERS; i++) { iovs[i].iov_base = (char *)*buffers + (i * BUFFER_SIZE); iovs[i].iov_len = BUFFER_SIZE; @@ -121,334 +111,287 @@ static int test_register_buffers(struct io_uring *ring, void **buffers, return 0; } -// Test 3: Open file -// Modified test_open_file function -static int test_open_file(int *fd, TestResults *results) { - print_step("File opening"); +// Test 3: Register files sparse (empty table) +static int test_register_files_sparse(struct io_uring *ring, unsigned nr_files, + TestResults *results) { + print_step("Sparse file registration (empty table)"); - // Get file size - struct stat st; - if (stat(TEST_FILE, &st) != 0) { - print_failure("stat", strerror(errno)); - results->failed++; - return -1; - } + int ret = io_uring_register_files_sparse(ring, nr_files); + if (ret < 0) { + if (ret == -EINVAL) { + print_info( + 
"io_uring_register_files_sparse not supported (kernel < 5.19)"); + print_info("Trying regular file registration with invalid fds..."); - // Check if file size is page-aligned - int page_size = plat_get_pagesize(); - size_t file_size = st.st_size; + // Fallback: register with invalid fds + int *invalid_fds = calloc(nr_files, sizeof(int)); + if (!invalid_fds) { + print_failure("Allocating invalid fds array", "Out of memory"); + results->failed++; + return -1; + } - printf(" File size: %zu bytes\n", file_size); - printf(" Page size: %d bytes\n", page_size); + for (int i = 0; i < nr_files; i++) { + invalid_fds[i] = -1; // Mark all as invalid + } - if (file_size % page_size != 0) { - printf(" Extending read size from %zu to %zu bytes\n", file_size, - ALIGN_UP_POW2(file_size, page_size)); - } + ret = io_uring_register_files(ring, invalid_fds, nr_files); + free(invalid_fds); - // Try O_DIRECT first - *fd = open(TEST_FILE, O_RDONLY | O_DIRECT); - if (*fd < 0) { - print_info("O_DIRECT failed, trying without it"); - *fd = open(TEST_FILE, O_RDONLY); - if (*fd < 0) { - print_failure("open", strerror(errno)); + if (ret < 0) { + print_failure("Regular file registration also failed", strerror(-ret)); + results->failed++; + return -1; + } + print_success("File table registered (regular, with invalid fds)"); + } else { + print_failure("io_uring_register_files_sparse", strerror(-ret)); results->failed++; return -1; } - print_info("Using buffered I/O (O_DIRECT not available)"); } else { - print_success("File opened with O_DIRECT"); + printf(" Registered empty file table with %u slots\n", nr_files); + print_success("Sparse file table created"); } results->passed++; return 0; } -// Test 4: Build and submit read operation -static int test_submit_read(struct io_uring *ring, int fd, struct iovec *iovs, - int buffer_id, uint64_t user_data, - TestResults *results) { - print_step("Building and submitting read operation"); +// Test 4: Update file slot and read from it +static int 
test_file_read_loop(struct io_uring *ring, struct iovec *iovs, + const char **filenames, + const char **expected_contents, int num_files, + TestResults *results) { + print_step("File slot update and read loop"); - // Get file size for proper alignment - struct stat st; - if (fstat(fd, &st) != 0) { - print_failure("fstat", strerror(errno)); + int *fds = calloc(num_files, sizeof(int)); + if (!fds) { + print_failure("Allocating fd array", "Out of memory"); results->failed++; return -1; } - u32 page_size = plat_get_pagesize(); - size_t file_size = st.st_size; - size_t read_size = BUFFER_SIZE; - - // For O_DIRECT, ensure read size is sector-aligned - if (read_size > file_size) { - read_size = ALIGN_UP_POW2(file_size, page_size); - printf(" Adjusted read size to %zu bytes for O_DIRECT alignment\n", - read_size); + // Open all files first + for (int i = 0; i < num_files; i++) { + fds[i] = open(filenames[i], O_RDONLY); + if (fds[i] < 0) { + print_failure("Opening file", filenames[i]); + results->failed++; + // Close already opened files + for (int j = 0; j < i; j++) + close(fds[j]); + free(fds); + return -1; + } + printf(" Opened %s (fd=%d)\n", filenames[i], fds[i]); } - struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (!sqe) { - print_failure("io_uring_get_sqe", "No available SQE"); - results->failed++; - return -1; - } + // Test loop: update slot, submit read, verify + for (int slot = 0; slot < num_files; slot++) { + printf("\n --- Testing slot %d with file '%s' ---\n", slot, + filenames[slot]); - // Prepare read operation using registered buffer - io_uring_prep_read_fixed(sqe, fd, iovs[buffer_id].iov_base, read_size, 0, - buffer_id); - io_uring_sqe_set_data64(sqe, user_data); + // Update the file registration for this slot + printf(" Updating slot %d with fd %d...\n", slot, fds[slot]); + int ret = io_uring_register_files_update(ring, slot, &fds[slot], 1); - int ret = io_uring_submit(ring); - if (ret < 0) { - print_failure("io_uring_submit", strerror(-ret)); - 
results->failed++; - return -1; - } + if (ret < 0) { + print_failure("File registration update", strerror(-ret)); + results->failed++; + continue; + } + printf(" Slot update result: %d (expected 1)\n", ret); - print_success("Read operation submitted successfully"); - results->passed++; - return 0; -} + // Get file size for read size calculation + struct stat st; + if (fstat(fds[slot], &st) != 0) { + print_failure("fstat", strerror(errno)); + results->failed++; + continue; + } -// Test 5: Wait for completion -static int test_wait_completion(struct io_uring *ring, - struct io_uring_cqe **cqe, - TestResults *results) { - print_step("Waiting for completion"); + size_t file_size = st.st_size; + size_t read_size = BUFFER_SIZE; - int ret = io_uring_wait_cqe(ring, cqe); - if (ret < 0) { - print_failure("io_uring_wait_cqe", strerror(-ret)); - results->failed++; - return -1; - } + // Adjust read size for O_DIRECT if needed + int page_size = plat_get_pagesize(); + if (read_size > file_size) { + read_size = ALIGN_UP_POW2(file_size, page_size); + } - print_success("Completion received"); - results->passed++; - return 0; -} + printf(" File size: %zu, read size: %zu\n", file_size, read_size); -// Test 6: Process completion -static int test_process_completion(struct io_uring_cqe *cqe, - uint64_t expected_user_data, - TestResults *results) { - print_step("Processing completion"); + // Clear buffer for this test + memset(iovs[0].iov_base, 0, BUFFER_SIZE); - uint64_t user_data = io_uring_cqe_get_data64(cqe); - int res = cqe->res; - - printf(" Completion data:\n"); - printf(" User data: %lu (expected: %lu)\n", user_data, expected_user_data); - printf(" Result: %d bytes read\n", res); - - if (user_data != expected_user_data) { - print_failure("User data mismatch", - "User data doesn't match expected value"); - results->failed++; - return -1; - } - - if (res < 0) { - print_failure("Read operation", strerror(-res)); - results->failed++; - return -1; - } - - print_success("Completion 
processed successfully"); - results->passed++; - return res; // Return number of bytes read -} - -// Test 7: Verify read data -static int test_verify_data(struct iovec *iovs, int buffer_id, int bytes_read, - TestResults *results) { - print_step("Data verification"); - - char *data = (char *)iovs[buffer_id].iov_base; - - printf(" Read data (first 200 chars):\n"); - printf(" ---\n"); - for (int i = 0; i < bytes_read && i < 200; i++) { - putchar(data[i]); - } - if (bytes_read > 200) - printf("..."); - printf("\n ---\n"); - - // Check if data is not empty - if (bytes_read == 0) { - print_failure("Data verification", "No data read"); - results->failed++; - return -1; - } - - // Check if data contains expected content - if (strstr(data, "io_uring") == NULL) { - print_failure("Data verification", "Expected content not found"); - results->failed++; - return -1; - } - - print_success("Data verified successfully"); - results->passed++; - return 0; -} - -// Test 8: Test multiple concurrent reads -static int test_concurrent_reads(struct io_uring *ring, int fd, - struct iovec *iovs, TestResults *results) { - print_step("Concurrent reads test"); - - int num_reads = 3; - int submitted = 0; - - // Submit multiple reads - for (int i = 0; i < num_reads; i++) { + // Submit read using registered file struct io_uring_sqe *sqe = io_uring_get_sqe(ring); if (!sqe) { - print_failure("Getting SQE for concurrent read", "No available SQE"); + print_failure("Getting SQE", "No available SQE"); results->failed++; - return -1; + continue; } - off_t offset = i * 100; // Read from different offsets - io_uring_prep_read_fixed(sqe, fd, iovs[i].iov_base, BUFFER_SIZE, offset, i); - io_uring_sqe_set_data64(sqe, i); - submitted++; - } + // Use slot index with fixed file flag + io_uring_prep_read_fixed(sqe, slot, iovs[0].iov_base, read_size, 0, 0); + sqe->flags |= IOSQE_FIXED_FILE; + io_uring_sqe_set_data64(sqe, 100 + slot); // Unique user_data per slot - int ret = io_uring_submit(ring); - if (ret != 
submitted) { - char msg[64]; - snprintf(msg, sizeof(msg), "Expected %d, got %d", submitted, ret); + ret = io_uring_submit(ring); + if (ret < 0) { + print_failure("Submitting read", strerror(-ret)); + results->failed++; + continue; + } + printf(" Submitted read (1 SQE)\n"); - print_failure("Submitting concurrent reads", msg); - results->failed++; - return -1; - } - - print_success("Concurrent reads submitted"); - - // Wait for and process completions - for (int i = 0; i < submitted; i++) { + // Wait for completion struct io_uring_cqe *cqe; ret = io_uring_wait_cqe(ring, &cqe); if (ret < 0) { - print_failure("Waiting for concurrent read completion", strerror(-ret)); + print_failure("Waiting for completion", strerror(-ret)); results->failed++; - return -1; + continue; } + // Process completion uint64_t user_data = io_uring_cqe_get_data64(cqe); - int res = cqe->res; + int bytes_read = cqe->res; + + printf(" Completion: user_data=%lu, result=%d\n", (unsigned long)user_data, + bytes_read); + + if (bytes_read < 0) { + print_failure("Read operation", strerror(-bytes_read)); + results->failed++; + io_uring_cqe_seen(ring, cqe); + continue; + } + + if (user_data != 100 + slot) { + print_failure("User data mismatch", "Wrong user_data value"); + results->failed++; + io_uring_cqe_seen(ring, cqe); + continue; + } + + // Verify the data + char *data = (char *)iovs[0].iov_base; + printf(" Data read (%d bytes): %.*s\n", bytes_read, + bytes_read < 100 ? 
bytes_read : 100, data); + + if (strstr(data, expected_contents[slot]) == NULL) { + print_failure("Data verification", + "Expected content not found in read data"); + results->failed++; + } else { + print_success("Data verified successfully"); + results->passed++; + } - printf(" Concurrent read %lu completed: %d bytes read\n", user_data, res); io_uring_cqe_seen(ring, cqe); + + // Invalidate the slot after use (mark as -1) + int invalid_fd = -1; + ret = io_uring_register_files_update(ring, slot, &invalid_fd, 1); + if (ret < 0) { + printf(" Warning: Could not invalidate slot %d: %s\n", slot, + strerror(-ret)); + } } - print_success("Concurrent reads completed successfully"); - results->passed++; + // Close all files + for (int i = 0; i < num_files; i++) { + if (fds[i] >= 0) + close(fds[i]); + } + free(fds); + return 0; } -// Cleanup function -static void cleanup(struct io_uring *ring, int fd, void *buffers) { - if (fd >= 0) - close(fd); - if (buffers) { - io_uring_unregister_buffers(ring); - free(buffers); - } - io_uring_queue_exit(ring); - remove(TEST_FILE); -} - int main() { TestResults results = {0, 0}; struct io_uring ring; - int fd = -1; void *buffers = NULL; struct iovec iovs[NUM_BUFFERS]; printf(COLOR_BLUE "\n========================================\n"); - printf(" io_uring Test Suite\n"); + printf(" io_uring Sparse File Registration Test\n"); printf("========================================\n" COLOR_RESET); - // Create test file - if (create_test_file() != 0) { - return 1; + // Define test files and their content + const char *filenames[] = {"test_file_0.txt", "test_file_1.txt", + "test_file_2.txt"}; + + const char *contents[] = { + "This is file 0: Hello World! 
The quick brown fox jumps over the lazy " + "dog.", + "This is file 1: io_uring is awesome for async I/O operations!", + "This is file 2: Testing sparse file registration with multiple files."}; + + const char *expected_substrings[] = {"Hello World", "io_uring is awesome", + "sparse file registration"}; + + int num_files = 3; + + // Create all test files + print_info("Creating test files..."); + for (int i = 0; i < num_files; i++) { + if (create_test_file(filenames[i], contents[i]) != 0) { + return 1; + } } // Test 1: Create io_uring if (test_io_uring_create(&ring, &results) != 0) { - cleanup(&ring, fd, buffers); - return 1; + goto cleanup_files; } // Test 2: Register buffers if (test_register_buffers(&ring, &buffers, iovs, &results) != 0) { - cleanup(&ring, fd, buffers); - return 1; + io_uring_queue_exit(&ring); + goto cleanup_files; } - // Test 3: Open file - if (test_open_file(&fd, &results) != 0) { - cleanup(&ring, fd, buffers); - return 1; + // Test 3: Register empty file table (sparse) + if (test_register_files_sparse(&ring, num_files, &results) != 0) { + io_uring_unregister_buffers(&ring); + free(buffers); + io_uring_queue_exit(&ring); + goto cleanup_files; } - // Test 4: Submit read - uint64_t test_user_data = 12345; - if (test_submit_read(&ring, fd, iovs, 0, test_user_data, &results) != 0) { - cleanup(&ring, fd, buffers); - return 1; - } + // Test 4: Loop through files, update slots, read and verify + test_file_read_loop(&ring, iovs, filenames, expected_substrings, num_files, + &results); - // Test 5: Wait for completion - struct io_uring_cqe *cqe; - if (test_wait_completion(&ring, &cqe, &results) != 0) { - cleanup(&ring, fd, buffers); - return 1; - } + // Cleanup + io_uring_unregister_files(&ring); + io_uring_unregister_buffers(&ring); + free(buffers); + io_uring_queue_exit(&ring); - // Test 6: Process completion - int bytes_read = test_process_completion(cqe, test_user_data, &results); - if (bytes_read < 0) { - cleanup(&ring, fd, buffers); - return 1; - 
} - io_uring_cqe_seen(&ring, cqe); - - // Test 7: Verify data - if (test_verify_data(iovs, 0, bytes_read, &results) != 0) { - cleanup(&ring, fd, buffers); - return 1; - } - - // Test 8: Concurrent reads - if (test_concurrent_reads(&ring, fd, iovs, &results) != 0) { - cleanup(&ring, fd, buffers); - return 1; +cleanup_files: + // Remove test files + for (int i = 0; i < num_files; i++) { + remove(filenames[i]); } // Print summary + int total = results.passed + results.failed; printf(COLOR_BLUE "\n========================================\n"); printf(" TEST SUMMARY\n"); printf("========================================\n" COLOR_RESET); - printf(" Total tests: %d\n", results.passed + results.failed); + printf(" Total tests: %d\n", total); printf(COLOR_GREEN " Passed: %d\n" COLOR_RESET, results.passed); if (results.failed > 0) { printf(COLOR_RED " Failed: %d\n" COLOR_RESET, results.failed); + printf(COLOR_RED "\n ✗ SOME TESTS FAILED!\n" COLOR_RESET); } else { - printf(COLOR_GREEN " ✓ ALL TESTS PASSED!\n" COLOR_RESET); + printf(COLOR_GREEN "\n ✓ ALL TESTS PASSED!\n" COLOR_RESET); } - // Cleanup - cleanup(&ring, fd, buffers); - return results.failed > 0 ? 
1 : 0; } diff --git a/platform.c b/platform.c index 809cdee..7c5b802 100644 --- a/platform.c +++ b/platform.c @@ -18,6 +18,8 @@ static atomic_uint_fast64_t g_files_hashed = 0; static atomic_uint_fast64_t g_bytes_processed = 0; static atomic_int g_scan_done = 0; +#define HASH_STRLEN 33 // 128-bit hex (32 chars) + null terminator +#define MAX_PATHLEN KiB(4) // ================== OS-agnostic functions abstraction ===================== // --------------------- Timer functions --------------------- typedef struct { @@ -176,13 +178,13 @@ static void os_file_close(FileHandle handle) { close(handle); } // -------------------- Thread abstraction ------------------- // Threads context typedef struct { - u8 num_threads; - mem_arena *path_arena; mem_arena *meta_arena; MPMCQueue *dir_queue; MPMCQueue *file_queue; + + u8 num_threads; } ScannerContext; typedef struct { @@ -363,6 +365,7 @@ static int parse_paths(char *line, char folders[][MAX_PATHLEN], } // ------------------------- File time ------------------------- +#if FILE_TIMES #if defined(_WIN32) || defined(_WIN64) static void format_time(uint64_t t, char *out, size_t out_sz) { if (t == 0) { @@ -426,11 +429,24 @@ void platform_get_file_times(const char *path, uint64_t *out_created, } } +static int platform_get_file_times_fd(int dir_fd, const char *name, + uint64_t *created, uint64_t *modified) { + struct stat st; + if (fstatat(dir_fd, name, &st, 0) == 0) { + *created = st.st_ctime; // or st.st_birthtime on systems that support it + *modified = st.st_mtime; + return 0; + } + return -1; +} +#endif #endif // -------------------- File owner --------------------- #if defined(_WIN32) || defined(_WIN64) -static void get_file_owner(const char *path, char *out, size_t out_sz) { +#if FILE_OWNER +void platform_get_file_owner(const char *path, char *out_owner, + size_t out_owner_size) { PSID sid = NULL; PSECURITY_DESCRIPTOR sd = NULL; @@ -444,43 +460,159 @@ static void get_file_owner(const char *path, char *out, size_t out_sz) { if 
(LookupAccountSidA(NULL, sid, name, &name_len, domain, &domain_len, &use)) { - snprintf(out, out_sz, "%s\\%s", domain, name); + snprintf(out_owner, out_owner_size, "%s\\%s", domain, name); } else { - snprintf(out, out_sz, "UNKNOWN"); + snprintf(out_owner, out_owner_size, "UNKNOWN"); } } else { - snprintf(out, out_sz, "UNKNOWN"); + snprintf(out_owner, out_owner_size, "UNKNOWN"); } if (sd) LocalFree(sd); } - -void platform_get_file_owner(const char *path, char *out_owner, - size_t out_owner_size) { - get_file_owner(path, out_owner, out_owner_size); -} +#endif #elif defined(__linux__) -static void get_file_owner(uid_t uid, char *out, size_t out_sz) { - struct passwd *pw = getpwuid(uid); - if (pw) { - snprintf(out, out_sz, "%s", pw->pw_name); - } else { - snprintf(out, out_sz, "UNKNOWN"); - } -} - +#if FILE_OWNER void platform_get_file_owner(const char *path, char *out_owner, size_t out_owner_size) { struct stat st; + const char *owner = "UNKNOWN"; + if (stat(path, &st) == 0) { - get_file_owner(st.st_uid, out_owner, out_owner_size); - } else { - snprintf(out_owner, out_owner_size, "UNKNOWN"); + struct passwd *pw = getpwuid(st.st_uid); + if (pw) { + owner = pw->pw_name; + } + } + + snprintf(out_owner, out_owner_size, "%s", owner); +} + +static int platform_get_file_owner_fd(int dir_fd, const char *name, char *owner, + size_t owner_size) { + struct stat st; + if (fstatat(dir_fd, name, &st, 0) == 0) { + struct passwd pw; + struct passwd *result; + char buffer[4096]; // Sufficiently large buffer for passwd data + + // Reentrant version (thread-safe) + if (getpwuid_r(st.st_uid, &pw, buffer, sizeof(buffer), &result) == 0 && + result != NULL && result->pw_name != NULL) { + strncpy(owner, result->pw_name, owner_size - 1); + owner[owner_size - 1] = '\0'; + } else { + // Fallback to uid + snprintf(owner, owner_size, "uid:%d", st.st_uid); + } + return 0; + } + return -1; +} +#endif + +// ----------------------------- File system ----------------------------- + +#if 
CHECK_FILE_SYSTEM +typedef enum FileSystemType { + FS_UNKNOWN = 0, + FS_EXT4, + FS_XFS, + FS_BTRFS, + FS_TMPFS, + FS_NFS, + FS_CIFS, + FS_FAT, + FS_EXFAT, + FS_NTFS, + FS_ZFS, + FS_F2FS, + FS_EROFS, + FS_VIRTIOFS, + FS_OVERLAY, + FS_HUGETLBFS, + FS_SQUASHFS, + FS_PROC, + FS_SYSFS, +} FileSystemType; + +static inline FileSystemType fs_from_magic(long type) { + switch (type) { + case 0xEF53: + return FS_EXT4; + case 0x58465342: + return FS_XFS; + case 0x9123683E: + return FS_BTRFS; + case 0x01021994: + return FS_TMPFS; + case 0x6969: + return FS_NFS; + case 0xFF534D42: + return FS_CIFS; + case 0x4d44: + return FS_FAT; + case 0x2011BAB0: + return FS_EXFAT; + case 0x5346544E: + return FS_NTFS; + case 0x2FC1211: + return FS_ZFS; + case 0xF2F52010: + return FS_F2FS; + case 0xE0F5E1E2: + return FS_EROFS; + case 0x56495254: + return FS_VIRTIOFS; + case 0x794C764F: + return FS_OVERLAY; + case 0x958458f6: + return FS_HUGETLBFS; + case 0x73717368: + return FS_SQUASHFS; + case 0x9fa0: + return FS_PROC; + case 0x62656572: + return FS_SYSFS; + + default: + return FS_UNKNOWN; } } +// Yes it is officially called "magic number" or "signature" in the +// documentation + +typedef enum { + FS_POLICY_BUFFERED, + FS_POLICY_DIRECT_OK, +} FsPolicy; + +static inline FsPolicy fs_policy(FileSystemType fs) { + switch (fs) { + case FS_EXT4: + case FS_XFS: + case FS_BTRFS: + case FS_ZFS: + case FS_F2FS: + case FS_NFS: + case FS_CIFS: + case FS_VIRTIOFS: + return FS_POLICY_DIRECT_OK; + + case FS_TMPFS: // Resides in Page Cache; O_DIRECT generally returns EINVAL + case FS_EROFS: // Read-only filesystem; O_DIRECT support is + // uncommon/restricted + case FS_FAT: // Generally does not implement direct_IO address space ops + case FS_EXFAT: // Limited support depending on driver implementation + case FS_NTFS: // Depends on driver (ntfs3 supports it, older ntfs-3g does not) + default: + return FS_POLICY_BUFFERED; + } +} +#endif #endif // ----------------------------- Scan helpers 
----------------------------- @@ -488,9 +620,17 @@ typedef struct FileEntry { char *path; uint64_t size_bytes; +#if FILE_TIMES uint64_t created_time; // epoch uint64_t modified_time; // epoch seconds - char owner[128]; // resolved owner name +#endif +#if FILE_OWNER + char owner[128]; // resolved owner name +#endif + +#if CHECK_FILE_SYSTEM // Linux only + FileSystemType fs_type; +#endif } FileEntry; typedef struct { @@ -560,10 +700,12 @@ void scan_folder(const char *base, ScannerContext *ctx) { size_t name_len = strlen(fd.cFileName); path_builder_set_filename(&pb, fd.cFileName, name_len); + // If it's a directory: if (fd.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { char *dir = path_builder_dup_arena(&pb, ctx->path_arena, false); mpmc_push_work(ctx->dir_queue, dir); } else { + // else a file: atomic_fetch_add(&g_files_found, 1); FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true); @@ -577,8 +719,14 @@ void scan_folder(const char *base, ScannerContext *ctx) { fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, false); strcpy(fe->path, temp_path); +#if FILE_TIMES platform_get_file_times(pb.buffer, &fe->created_time, &fe->modified_time); +#endif + +#if FILE_OWNER platform_get_file_owner(pb.buffer, fe->owner, sizeof(fe->owner)); +#endif + fe->size_bytes = ((uint64_t)fd.nFileSizeHigh << 32) | fd.nFileSizeLow; mpmc_push(ctx->file_queue, fe); @@ -590,39 +738,6 @@ void scan_folder(const char *base, ScannerContext *ctx) { } #elif defined(__linux__) -static int platform_get_file_times_fd(int dir_fd, const char *name, - uint64_t *created, uint64_t *modified) { - struct stat st; - if (fstatat(dir_fd, name, &st, 0) == 0) { - *created = st.st_ctime; // or st.st_birthtime on systems that support it - *modified = st.st_mtime; - return 0; - } - return -1; -} - -static int platform_get_file_owner_fd(int dir_fd, const char *name, char *owner, - size_t owner_size) { - struct stat st; - if (fstatat(dir_fd, name, &st, 0) == 0) { - struct passwd pw; - struct 
passwd *result; - char buffer[4096]; // Sufficiently large buffer for passwd data - - // Reentrant version (thread-safe) - if (getpwuid_r(st.st_uid, &pw, buffer, sizeof(buffer), &result) == 0 && - result != NULL && result->pw_name != NULL) { - strncpy(owner, result->pw_name, owner_size - 1); - owner[owner_size - 1] = '\0'; - } else { - // Fallback to uid - snprintf(owner, owner_size, "uid:%d", st.st_uid); - } - return 0; - } - return -1; -} - void scan_folder(const char *base, ScannerContext *ctx) { PathBuilder pb; path_builder_init(&pb, base); @@ -631,6 +746,14 @@ void scan_folder(const char *base, ScannerContext *ctx) { if (dir_fd == -1) return; +#if CHECK_FILE_SYSTEM + struct statfs fs; + FileSystemType fs_type = FS_UNKNOWN; + if (fstatfs(dir_fd, &fs) == 0) { + fs_type = fs_from_magic(fs.f_type); + } +#endif + DIR *dir = fdopendir(dir_fd); if (!dir) { close(dir_fd); @@ -649,6 +772,7 @@ void scan_folder(const char *base, ScannerContext *ctx) { path_builder_set_filename(&pb, entry->d_name, name_len); int file_type = DT_UNKNOWN; + #ifdef _DIRENT_HAVE_D_TYPE file_type = entry->d_type; #endif @@ -671,11 +795,16 @@ void scan_folder(const char *base, ScannerContext *ctx) { // Use fstatat for file info struct stat st; if (fstatat(dir_fd, entry->d_name, &st, 0) == 0) { - // Convert times using fd variant +#if FILE_TIMES platform_get_file_times_fd(dir_fd, entry->d_name, &fe->created_time, &fe->modified_time); +#endif + +#if FILE_OWNER platform_get_file_owner_fd(dir_fd, entry->d_name, fe->owner, sizeof(fe->owner)); +#endif + fe->size_bytes = (uint64_t)st.st_size; // Normalize path @@ -687,6 +816,10 @@ void scan_folder(const char *base, ScannerContext *ctx) { fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, false); strcpy(fe->path, temp_path); +#if CHECK_FILE_SYSTEM + fe->fs_type = fs_type; +#endif + mpmc_push(ctx->file_queue, fe); } continue; @@ -706,9 +839,15 @@ void scan_folder(const char *base, ScannerContext *ctx) { atomic_fetch_add(&g_files_found, 1); 
FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true); +#if FILE_TIMES platform_get_file_times(pb.buffer, &fe->created_time, &fe->modified_time); +#endif + +#if FILE_OWNER platform_get_file_owner(pb.buffer, fe->owner, sizeof(fe->owner)); +#endif + fe->size_bytes = (uint64_t)st.st_size; char temp_path[MAX_PATHLEN]; @@ -719,12 +858,16 @@ void scan_folder(const char *base, ScannerContext *ctx) { fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, false); strcpy(fe->path, temp_path); +#if CHECK_FILE_SYSTEM + fe->fs_type = fs_type; +#endif + mpmc_push(ctx->file_queue, fe); } } } - closedir(dir); // Closes dir_fd automatically + closedir(dir); } #endif @@ -786,17 +929,29 @@ static THREAD_RETURN hash_worker(void *arg) { char hash[HASH_STRLEN]; xxh3_hash_file_stream(fe->path, hash, buf); + double size_kib = (double)fe->size_bytes / 1024.0; + char stack_buf[KiB(4)]; + int len; + +#if FILE_TIMES char created[32], modified[32]; format_time(fe->created_time, created, sizeof(created)); format_time(fe->modified_time, modified, sizeof(modified)); +#endif - double size_kib = (double)fe->size_bytes / 1024.0; - - char stack_buf[1024]; - - int len = - snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n", - hash, fe->path, size_kib, created, modified, fe->owner); +#if FILE_TIMES && FILE_OWNER + len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n", + hash, fe->path, size_kib, created, modified, fe->owner); +#elif FILE_TIMES + len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\n", hash, + fe->path, size_kib, created, modified); +#elif FILE_OWNER + len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\n", hash, + fe->path, size_kib, fe->owner); +#else + len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\n", hash, + fe->path, size_kib); +#endif char *dst = arena_push(&ctx->arena, len, false); memcpy(dst, stack_buf, len); @@ -880,7 +1035,7 @@ static THREAD_RETURN progress_thread(void *arg) { 
return THREAD_RETURN_VALUE; } -// ======================== IO Ring implemented ======================== +// ======================== IO Ring implementation ======================== #if USE_IORING // -------------------------- Data structures --------------------------- @@ -914,16 +1069,16 @@ typedef struct iovec IORING_BUFFER_INFO; typedef struct FileReadContext { FileEntry *fe; - uint64_t file_size; + size_t file_size; // For in-order hashing - uint64_t next_read_offset; + size_t next_read_offset; IoBuffer *head; IoBuffer *tail; // Completion tracking - uint64_t bytes_hashed; + size_t bytes_hashed; uint32_t reads_hashed; uint32_t reads_submitted; @@ -953,22 +1108,21 @@ typedef struct IoBuffer { FileReadContext *file; void *data; size_t size; - uint64_t offset; + size_t offset; size_t bytes_read; BUILD_READ_RETURN_VALUE result; - int completed; - struct IoBuffer *next; int buffer_id; + struct IoBuffer *next; } IoBuffer; // Thread-local I/O Ring context +#if defined(_WIN32) || defined(_WIN64) + typedef struct ThreadIoContext { IoRingHandle ring; -#if defined(_WIN32) || defined(_WIN64) void *completion_event; -#endif void *fallback_buffer; IoBuffer buffers[NUM_BUFFERS_PER_THREAD]; @@ -978,10 +1132,6 @@ typedef struct ThreadIoContext { int active_files; bool submitting; -#if defined(__linux__) - bool use_registered_buffers; -#endif - #if USE_REGISTERED_FILES bool use_registered_files; FileHandle registered_handles[MAX_ACTIVE_FILES]; @@ -989,6 +1139,24 @@ typedef struct ThreadIoContext { } ThreadIoContext; +#elif defined(__linux__) +typedef struct ThreadIoContext { + IoRingHandle ring; + + void *fallback_buffer; + IoBuffer buffers[NUM_BUFFERS_PER_THREAD]; + int buffer_pool[NUM_BUFFERS_PER_THREAD]; + int free_count; + int num_submissions; + int active_files; + bool submitting; + + bool use_registered_buffers; + bool use_registered_files; + +} ThreadIoContext; + +#endif typedef struct { uint32_t MaxSubmissionQueueSize; uint32_t MaxCompletionQueueSize; @@ -1013,34 
+1181,26 @@ static void ioring_query_capabilities(IoRingCapabilities *caps) { caps->MaxVersion = win_caps.MaxVersion; } -static void *ioring_create_completion_event(void) { - return CreateEvent(NULL, FALSE, FALSE, NULL); -} - -static void ioring_set_completion_event(IoRingHandle ring, void *event) { - SetIoRingCompletionEvent(ring, event); -} - -static void ioring_wait_for_completion(ThreadIoContext *ctx) { - if (ctx->num_submissions > 0) { - WaitForSingleObject(ctx->completion_event, SUBMIT_TIMEOUT_MS); - return; - } -} - static int create_ioring(ThreadIoContext *thread_ctx, uint32_t queue_size) { IORING_CREATE_FLAGS flags = {0}; HRESULT hr = CreateIoRing(IORING_VERSION_3, flags, queue_size, queue_size * 2, &thread_ctx->ring); // Create completion event - thread_ctx->completion_event = ioring_create_completion_event(); - if (thread_ctx->completion_event) { - ioring_set_completion_event(thread_ctx->ring, thread_ctx->completion_event); - } + thread_ctx->completion_event = CreateEvent(NULL, FALSE, FALSE, NULL); + if (thread_ctx->completion_event) + SetIoRingCompletionEvent(thread_ctx->ring, thread_ctx->completion_event); return SUCCEEDED(hr) ? 0 : -1; } +static int close_ioring(ThreadIoContext *thread_ctx) { + + if (thread_ctx->completion_event) + CloseHandle(thread_ctx->completion_event); + CloseIoRing(thread_ctx->ring); + return 0; +} + #define USERDATA_REGISTER 1 #define MAKE_BUF_INFO(a, l) \ @@ -1053,7 +1213,9 @@ static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t wait_count, // The wait_count in windows is not implemented yet, so we wait with a // completion event for a single completion - ioring_wait_for_completion(thread_ctx); + if (thread_ctx->num_submissions > 0) { + WaitForSingleObject(thread_ctx->completion_event, SUBMIT_TIMEOUT_MS); + } return SUCCEEDED(hr) ? 
0 : -1; } @@ -1071,8 +1233,8 @@ static void ioring_register_buffers(ThreadIoContext *thread_ctx, NULL, hr, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), error_msg, sizeof(error_msg), NULL); - fprintf(stderr, "Error registering buffers: %s (0x%08X)\n", error_msg, - (unsigned int)hr); + fprintf(stderr, "WARNING: Error registering buffers: %s (0x%08X)\n", + error_msg, (unsigned int)hr); } // Submit registration ioring_submit(thread_ctx, 0, 0, NULL); @@ -1098,17 +1260,13 @@ static void ioring_register_files(ThreadIoContext *thread_ctx) { thread_ctx->use_registered_files = (hr == 0); } -#endif -static void ioring_close_event(void *event) { CloseHandle(event); } - -static int close_ioring(ThreadIoContext *thread_ctx) { - - if (thread_ctx->completion_event) - ioring_close_event(thread_ctx->completion_event); - CloseIoRing(thread_ctx->ring); - return 0; +static void ioring_register_files_update(ThreadIoContext *thread_ctx, + FileReadContext *file) { + thread_ctx->registered_handles[file->slot_id] = file->file_handle; + ioring_register_files(thread_ctx); } +#endif static BUILD_READ_RETURN_VALUE ioring_build_read(ThreadIoContext *thread_ctx, FileReadContext *file_ctx, @@ -1143,7 +1301,8 @@ static BUILD_READ_RETURN_VALUE ioring_build_read(ThreadIoContext *thread_ctx, error_msg, sizeof(error_msg), NULL); fprintf(stderr, - "ERROR: Building read error for file: %s - Code: %s (0x%08X)\n", + "ERROR: Building read error for file '%s' - Error: %s (Code: " + "0x%08X)\n", file_ctx->fe->path, error_msg, (unsigned int)hr); } return hr; @@ -1156,15 +1315,17 @@ static int ioring_pop_completion(IoRingHandle ring, IoRingCQE *cqe) { HRESULT hr = PopIoRingCompletion(ring, &win_cqe); if (hr == S_FALSE) + // No CQE available return 0; if (FAILED(hr)) return -1; - // Unlike linux, in addition of IO operations, Windows IO Ring produces - // CQEs (completion queue entries) when doing operations like register - // buffer or submiting, we filter them here cqe.UserData == - // USERDATA_REGISTER 
cqe.ResultCode == S_OK (or error) + // Unlike linux, The Windows implementation treats buffer and file + // registration as an asynchronous operation that we submit to the ring, + // similar to a read or write. Those operations produce CQEs (completion + // queue entries) that we filter here using + // cqe.UserData == USERDATA_REGISTER if (win_cqe.UserData == USERDATA_REGISTER) continue; @@ -1187,15 +1348,24 @@ static int ioring_pop_completion(IoRingHandle ring, IoRingCQE *cqe) { } fprintf(stderr, - "WARNING: I/O completion error for file '%s' - Code: 0x%lx, " - "Error: %s\n", - file_path, win_cqe.ResultCode, error_msg); + "WARNING: I/O completion error for file '%s' - Error: %s (Code: " + "0x%lx)\n", + file_path, error_msg, win_cqe.ResultCode); } return 1; } } +FileHandle ioring_open_file(FileEntry *fe) { + + FileHandle handle = os_file_open(fe->path, FLAG_ASYNC_DIRECT_READ); + if (handle == INVALID_FILE_HANDLE) { + return os_file_open(fe->path, FLAG_SEQUENTIAL_READ); + } + return handle; +} + #elif defined(__linux__) // Linux io_uring functions implementation static void ioring_query_capabilities(IoRingCapabilities *caps) { @@ -1212,8 +1382,8 @@ static void ioring_query_capabilities(IoRingCapabilities *caps) { // static int async_io_create_ring(uint32_t queue_size, AsyncIoRing *ring) { static int create_ioring(ThreadIoContext *thread_ctx, uint32_t queue_size) { - IoUring *impl = (IoUring *)calloc(1, sizeof(IoUring)); - if (!impl) + IoUring *ring_impl = (IoUring *)calloc(1, sizeof(IoUring)); + if (!ring_impl) return -1; // Initialize io_uring @@ -1224,27 +1394,41 @@ static int create_ioring(ThreadIoContext *thread_ctx, uint32_t queue_size) { IORING_SETUP_SINGLE_ISSUER | // Thread local io_uring IORING_SETUP_DEFER_TASKRUN; // Do not send interupts when a CQE is ready, // send them when a wait function is called, - // and groupe them acording to the nbre or + // and groupe them according to the nbre or // entries to wait (reduces syscall overhead) params.cq_entries 
= queue_size * 2; - int ret = io_uring_queue_init_params(queue_size, &impl->ring, &params); + int ret = io_uring_queue_init_params(queue_size, &ring_impl->ring, &params); if (ret < 0) { // Fallback to without params printf("WARNING: Creating io_uring with default params\n"); - ret = io_uring_queue_init(queue_size, &impl->ring, 0); + ret = io_uring_queue_init(queue_size, &ring_impl->ring, 0); if (ret < 0) { - free(impl); + free(ring_impl); return -1; } } - impl->cqe_cache = NULL; - impl->cqe_cache_index = 0; - impl->cqe_cache_count = 0; + ring_impl->cqe_cache = NULL; + ring_impl->cqe_cache_index = 0; + ring_impl->cqe_cache_count = 0; + + thread_ctx->ring = ring_impl; + return 0; +} + +static int close_ioring(ThreadIoContext *thread_ctx) { + IoUring *rimg_impl = (IoUring *)thread_ctx->ring; + if (!rimg_impl) + return -1; + + if (thread_ctx->use_registered_buffers) { + io_uring_unregister_buffers(&rimg_impl->ring); + } + io_uring_queue_exit(&rimg_impl->ring); + free(rimg_impl); - thread_ctx->ring = impl; return 0; } @@ -1254,132 +1438,79 @@ static int create_ioring(ThreadIoContext *thread_ctx, uint32_t queue_size) { static void ioring_register_buffers(ThreadIoContext *thread_ctx, uint32_t num_buffers, IORING_BUFFER_INFO *buf_info) { - IoUring *impl = (IoUring *)thread_ctx->ring; - int hr = io_uring_register_buffers(&impl->ring, buf_info, num_buffers); + int ret = io_uring_register_buffers(&((IoUring *)thread_ctx->ring)->ring, + buf_info, num_buffers); - if (hr < 0) { - if (hr == -ENOMEM) { + if (ret < 0) { + if (ret == -ENOMEM) { struct rlimit limit; getrlimit(RLIMIT_MEMLOCK, &limit); fprintf(stderr, "WARNING: Buffer registration failed due to memlock limits " "(ENOMEM).\n" - "Increase the limit to solve this warning.\n"); - - // TODO: document this in read me - // The memlock limit in Linux restricts the amount of memory a process can - // "lock" into physical RAM using the mlock() family of system calls. 
This - // prevents the operating system from swapping that memory out to disk. - // And registering buffers will lock the buffers memory so the hardware - // can access it directly without kernel intervention. Increase the limit - // to be able to register the buffers. - // - // **Modifying the Limit: - // The method for changing the memlock limit depends on whether you are - // managing a user session or a system service. - // 1. For Users and Interactive Sessions - // To permanently increase the limit for a specific user or group, modify - // the /etc/security/limits.conf file. Add the following lines: - // # Example for a specific user (replace 'username') - // username soft memlock unlimited - // username hard memlock unlimited - // - // # Example for all users - // * soft memlock unlimited - // * hard memlock unlimited - // - // Soft Limit: The value the user starts with; can be raised up to the - // hard limit. - // - // Hard Limit: The absolute maximum; only a privileged user - // (root) can increase this. Values: Can be set in Kilobytes (KB) or as - // unlimited. - // - // 2. For Systemd Services - // Settings in limits.conf do not affect background services managed by - // systemd. 
To increase the limit for a service, edit its service file - // (e.g., /etc/systemd/system/myservice.service) and add: - // - // [Service] - // LimitMEMLOCK=infinity + "See README for more informations.\n"); } else { // For any other error (e.g., EFAULT, EBUSY, EINVAL) - fprintf(stderr, "Error registering buffers: %s (code: %d)\n", - strerror(-hr), hr); + fprintf(stderr, "WARNING: Error registering buffers: %s (code: %d)\n", + strerror(-ret), ret); } fprintf(stderr, "Falling back to unregistered buffers (performance may " "be reduced).\n"); } - thread_ctx->use_registered_buffers = (hr == 0); + thread_ctx->use_registered_buffers = (ret == 0); } +#if USE_REGISTERED_FILES static void ioring_register_files(ThreadIoContext *thread_ctx) { - IoUring *impl = (IoUring *)thread_ctx->ring; - int hr = io_uring_register_files(&impl->ring, thread_ctx->registered_handles, - MAX_ACTIVE_FILES); - if (hr < 0) { - fprintf(stderr, "file registeration failed: %s (code: %d)\n", strerror(-hr), - hr); + int ret = io_uring_register_files_sparse(&((IoUring *)thread_ctx->ring)->ring, + MAX_ACTIVE_FILES); + if (ret < 0) { + fprintf(stderr, + "WARNING: File registeration failed, fallback to unregistered " + "files - Error: %s (code: %d)\n", + strerror(-ret), ret); } - thread_ctx->use_registered_files = (hr == 0); + thread_ctx->use_registered_files = (ret == 0); } -static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t wait_count, - uint32_t timeout_ms, uint32_t *submitted) { - IoUring *impl = (IoUring *)thread_ctx->ring; - if (!impl) - return -1; +static void ioring_register_files_update(ThreadIoContext *thread_ctx, + FileReadContext *file) { - int ret; - - if (wait_count > 0) { - ret = io_uring_submit_and_wait(&impl->ring, wait_count); - } else { - ret = io_uring_submit(&impl->ring); - } + // Update the kernel's file table at the specific slot + int ret = io_uring_register_files_update( + &((IoUring *)thread_ctx->ring)->ring, + file->slot_id, // offset - which slot to update + 
&file->file_handle, // pointer to the fd + 1 // number of files to update + ); if (ret < 0) { - fprintf(stderr, "submit error: %s\n", strerror(-ret)); - return -1; + fprintf(stderr, + "WARNING: File registration update failed for slot %u updating " + "file '%s' - Error: %s " + "(code: %d)\n" + "Fallback to unregistered files\n", + file->slot_id, file->fe->path, strerror(-ret), ret); + + thread_ctx->use_registered_files = false; } - - if (submitted) - *submitted = (uint32_t)ret; - - return 0; -} - -static int close_ioring(ThreadIoContext *thread_ctx) { - IoUring *impl = (IoUring *)thread_ctx->ring; - if (!impl) - return -1; - - if (thread_ctx->use_registered_buffers) { - io_uring_unregister_buffers(&impl->ring); - } - io_uring_queue_exit(&impl->ring); - free(impl); - - return 0; } +#endif static int ioring_build_read(ThreadIoContext *thread_ctx, FileReadContext *file_ctx, uint32_t buffer_id, size_t size, uint64_t offset, uintptr_t user_data) { - IoRingHandle ring = thread_ctx->ring; - IoUring *impl = (IoUring *)ring; - if (!impl) - return -1; - struct io_uring_sqe *sqe = io_uring_get_sqe(&impl->ring); + struct io_uring_sqe *sqe = + io_uring_get_sqe(&((IoUring *)thread_ctx->ring)->ring); if (!sqe) { printf("SQE FULL\n"); return -1; @@ -1387,6 +1518,23 @@ static int ioring_build_read(ThreadIoContext *thread_ctx, void *buf = thread_ctx->buffers[buffer_id].data; +#if USE_REGISTERED_FILES + if (thread_ctx->use_registered_files) { + sqe->flags |= IOSQE_FIXED_FILE; + + if (thread_ctx->use_registered_buffers) { + io_uring_prep_read_fixed(sqe, file_ctx->slot_id, buf, size, offset, + buffer_id); + } else { + io_uring_prep_read(sqe, file_ctx->slot_id, buf, size, offset); + } + + io_uring_sqe_set_data64(sqe, user_data); + return 0; + } +#endif + + // Fallback: use regular file descriptor if (thread_ctx->use_registered_buffers) { io_uring_prep_read_fixed(sqe, file_ctx->file_handle, buf, size, offset, buffer_id); @@ -1398,12 +1546,34 @@ static int 
ioring_build_read(ThreadIoContext *thread_ctx, return 0; } +static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t wait_count, + uint32_t timeout_ms, uint32_t *submitted) { + int ret; + + if (wait_count > 0) { + ret = io_uring_submit_and_wait(&((IoUring *)thread_ctx->ring)->ring, + wait_count); + } else { + ret = io_uring_submit(&((IoUring *)thread_ctx->ring)->ring); + } + + if (ret < 0) { + fprintf(stderr, "ERROR: Submit error: %s (Code: %d)\n", strerror(-ret), + ret); + return -1; + } + + if (submitted) + *submitted = (uint32_t)ret; + + return 0; +} + static int ioring_pop_completion(IoRingHandle ring, IoRingCQE *cqe) { - IoUring *impl = (IoUring *)ring; struct io_uring_cqe *cqe_ptr = NULL; - int ret = io_uring_peek_cqe(&impl->ring, &cqe_ptr); + int ret = io_uring_peek_cqe(&((IoUring *)ring)->ring, &cqe_ptr); if (ret == -EAGAIN) { // No CQE available @@ -1412,7 +1582,8 @@ static int ioring_pop_completion(IoRingHandle ring, IoRingCQE *cqe) { if (ret < 0) { // Error - fprintf(stderr, "io_uring_peek_cqe error: %d (%s)\n", ret, strerror(-ret)); + fprintf(stderr, "WARNING: io_uring_peek_cqe error - Error: %s (Code: %d)\n", + strerror(-ret), ret); return -1; } @@ -1432,7 +1603,7 @@ static int ioring_pop_completion(IoRingHandle ring, IoRingCQE *cqe) { cqe->UserData = (uintptr_t)cqe_ptr->user_data; - io_uring_cqe_seen(&impl->ring, cqe_ptr); + io_uring_cqe_seen(&((IoUring *)ring)->ring, cqe_ptr); // Check for error and print warning if (res < 0) { @@ -1445,13 +1616,28 @@ static int ioring_pop_completion(IoRingHandle ring, IoRingCQE *cqe) { fprintf( stderr, - "WARNING: I/O completion error for file '%s' - Code: %d, Error: %s\n", - file_path, res, strerror(-res)); + "WARNING: I/O completion error for file '%s' - Error: %s (Code: %d)\n", + file_path, strerror(-res), ret); } return 1; } +FileHandle ioring_open_file(FileEntry *fe) { + +#if CHECK_FILE_SYSTEM + if (!fs_policy(fe->fs_type)) { + return os_file_open(fe->path, FLAG_SEQUENTIAL_READ); + } +#endif + + 
FileHandle handle = os_file_open(fe->path, FLAG_ASYNC_DIRECT_READ); + if (handle == INVALID_FILE_HANDLE) { + return os_file_open(fe->path, FLAG_SEQUENTIAL_READ); + } + return handle; +} + #endif // OS-agnostic helper macros @@ -1606,7 +1792,6 @@ static IoBuffer *get_free_buffer(ThreadIoContext *ctx) { int idx = ctx->buffer_pool[--ctx->free_count]; IoBuffer *buf = &ctx->buffers[idx]; - buf->completed = 0; buf->bytes_read = 0; buf->result = IO_PENDING; buf->next = NULL; @@ -1621,28 +1806,6 @@ static void return_buffer(ThreadIoContext *ctx, IoBuffer *buf) { ctx->buffer_pool[ctx->free_count++] = buf->buffer_id; } -// -------------------------- Build read --------------------------- -static int build_read(ThreadIoContext *thread_ctx, FileReadContext *file_ctx, - IoBuffer *buf, uint64_t offset, size_t size) { - buf->offset = offset; - buf->size = size; - buf->file = file_ctx; - - BUILD_READ_RETURN_VALUE result = ioring_build_read( - thread_ctx, file_ctx, buf->buffer_id, size, offset, (uintptr_t)buf); - - if (IORING_SUCCEEDED(result)) { - file_ctx->active_reads++; - file_ctx->reads_submitted++; - thread_ctx->num_submissions++; - } else { - buf->completed = true; - buf->result = result; // Store the error code - return_buffer(thread_ctx, buf); - } - return result; -} - // -------------------------- Process completions --------------------------- static void process_completions(ThreadIoContext *thread_ctx, FileQueue *fq) { IoRingCQE cqe; @@ -1664,26 +1827,31 @@ static void process_completions(ThreadIoContext *thread_ctx, FileQueue *fq) { // -------------------- File operations ----------------------- static int init_file(ThreadIoContext *thread_ctx, FileReadContext *file, FileEntry *fe) { + +#if USE_REGISTERED_FILES + uint32_t saved_slot_id = file->slot_id; +#endif + memset(file, 0, sizeof(*file)); file->fe = fe; file->file_size = fe->size_bytes; - file->head = file->tail = NULL; - file->file_handle = os_file_open(fe->path, FLAG_ASYNC_DIRECT_READ); + file->file_handle = 
ioring_open_file(fe); if (file->file_handle == INVALID_FILE_HANDLE) { #if IORING_DEBUG_PRINTS - printf("ERROR: Could not open file %s\n", fe->path); + printf("ERROR: Could not open file '%s'\n", fe->path); #endif return 0; } -#if (defined(_WIN32) || defined(_WIN64)) && USE_REGISTERED_FILES +#if USE_REGISTERED_FILES + file->slot_id = saved_slot_id; + if (thread_ctx->use_registered_files) { - thread_ctx->registered_handles[file->slot_id] = file->file_handle; - ioring_register_files(thread_ctx); + ioring_register_files_update(thread_ctx, file); } #endif @@ -1691,8 +1859,6 @@ static int init_file(ThreadIoContext *thread_ctx, FileReadContext *file, if (file->file_size > g_ioring_buffer_size) { file->use_incremental_hash = true; XXH3_128bits_reset(&file->hash_state); - } else { - file->use_incremental_hash = false; } return 1; } @@ -1727,15 +1893,29 @@ static void finalize_file(ThreadIoContext *thread_ctx, xxh3_hash_file_stream(fe->path, hash, thread_ctx->fallback_buffer); } + double size_kib = (double)fe->size_bytes / 1024.0; + char stack_buf[KiB(4)]; + int len; + +#if FILE_TIMES char created[32], modified[32]; format_time(fe->created_time, created, sizeof(created)); format_time(fe->modified_time, modified, sizeof(modified)); +#endif - double size_kib = (double)fe->size_bytes / 1024.0; - - char stack_buf[KiB(4)]; - int len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n", - hash, fe->path, size_kib, created, modified, fe->owner); +#if FILE_TIMES && FILE_OWNER + len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n", + hash, fe->path, size_kib, created, modified, fe->owner); +#elif FILE_TIMES + len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\n", hash, + fe->path, size_kib, created, modified); +#elif FILE_OWNER + len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\n", hash, + fe->path, size_kib, fe->owner); +#else + len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\n", hash, fe->path, + 
size_kib); +#endif char *dst = arena_push(&worker_ctx->arena, len, false); memcpy(dst, stack_buf, len); @@ -1929,10 +2109,10 @@ static THREAD_RETURN hash_worker_ioring(void *arg) { process_completions(thread_ctx, &fq); #if IORING_DEBUG_STATS - printf("Free buffers: %d, Submissions: %d, Active files: %d, fq count: - % d\n ", thread_ctx->free_count, thread_ctx->num_submissions, - thread_ctx->active_files, - fq.count); + printf( + "Free buffers: %d, Submissions: %d, Active files: %d, fq count: %d\n", + thread_ctx->free_count, thread_ctx->num_submissions, + thread_ctx->active_files, fq.count); #endif // Hash files