diff --git a/.gitignore b/.gitignore index 96c65c4..19b273a 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ Binaries/file_hashes.txt file_list.txt temp_code.c /.cache/clangd/index +/file_hasher +/io_uring_test diff --git a/README.md b/README.md index 697a40d..719ef96 100644 --- a/README.md +++ b/README.md @@ -7,20 +7,20 @@ Collects some metadata and hashes files. #### Release: clang-cl /O3 file_hasher.c xxh_x86dispatch.c -Note: MinGW does not provide IO Ring headers yet, to fix that include ioringapi.c, this will dynamically load all the functions and define all the symbols necessary to replace the official header. +Note: MinGW does not provide IO Ring headers yet, to fix that include ioringapi.c, this will dynamically load all the functions and define all the symbols necessary to replace the official header. clang -O3 file_hasher.c xxh_x86dispatch.c -o file_hasher gcc -O3 file_hasher.c xxh_x86dispatch.c -o file_hasher #### Debug: -clang-cl /Zi /Od file_hasher.c xxh_x86dispatch.c +clang-cl /Zi /Od file_hasher.c xxh_x86dispatch.c clang -g -O0 file_hasher.c xxh_x86dispatch.c -o file_hasher gcc -g -O0 file_hasher.c xxh_x86dispatch.c -o file_hasher ### Linux: #### Release: -clang -O3 -pthread file_hasher.c xxh_x86dispatch.c -o file_hasher -gcc -O3 -pthread file_hasher.c xxh_x86dispatch.c -o file_hasher +clang -O3 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher +gcc -O3 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher #### Debug: -clang -g -O0 -pthread file_hasher.c xxh_x86dispatch.c -o file_hasher -gcc -g -O0 -pthread file_hasher.c xxh_x86dispatch.c -o file_hasher +clang -g -O0 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher +gcc -g -O0 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher diff --git a/base.h b/base.h index 4bd4ca2..4439968 100644 --- a/base.h +++ b/base.h @@ -1,23 +1,11 @@ #pragma once #define _CRT_SECURE_NO_WARNINGS -#include -#include -#include 
-#include -#include -#include -#include -#include -#include -#include - #if defined(_WIN32) || defined(_WIN64) -// #define PLATFORM_WINDOWS 1 -// #define WIN32_LEAN_AND_MEAN -// #define NTDDI_VERSION NTDDI_WIN11 -// -// #pragma comment(lib, "kernel32.Lib") + +#if defined(_MSC_VER) +#pragma comment(lib, "advapi32.lib") +#endif #include #include @@ -29,20 +17,36 @@ #include #include -#if defined(_MSC_VER) -#pragma comment(lib, "advapi32.lib") +#elif defined(__linux__) + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE #endif -#define strdup _strdup -#else #include #include +#include #include #include +#include #include #include +#include +#include #endif +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + /* ------------------------------------------------------------ Base types ------------------------------------------------------------ */ diff --git a/binaries/changelog.txt b/binaries/changelog.txt index 0db7f6c..e2b14d8 100644 --- a/binaries/changelog.txt +++ b/binaries/changelog.txt @@ -50,7 +50,7 @@ Fixing user prompt parsing Reorganising the code Improving the scan function -5.0: Implementing the IO Ring instead of buffered hashing, huge performance gains. The IO Ring is thread local, uses DMA and direct disk I/O, bypassing the OS cash completely, it supports bashing multiple submissions and can handle multiple files at the same time. +5.0: Implementing the IO Ring for Windows and io_uring for Linux instead of buffered hashing, huge performance gains. The IO Ring is event driven, thread local, uses DMA and direct disk I/O, bypassing the OS cache completely, uses registered buffers, supports batching multiple submissions and can handle multiple files at the same time.
Hashing small files using XXH3_128bits() instead of the streaming pipeline(XXH3_128bits_reset(), XXH3_128bits_update(), XXH3_128bits_digest()), this reduses the overhead of creating a state and digest, coupled with the IO Ring it improves the hashing of small files whose size is inferior to the size of IO Ring buffers fixing the xxh_x86dispatch warnings Updating the progress printing function diff --git a/compile_commands.json b/compile_commands.json index 34a424d..0b29205 100644 --- a/compile_commands.json +++ b/compile_commands.json @@ -4,4 +4,4 @@ "command": "clang-cl /O2 file_hasher.c xxh_x86dispatch.c", "file": "file_hasher.c" } -] \ No newline at end of file +] diff --git a/file_hasher b/file_hasher new file mode 100644 index 0000000..8c12d07 Binary files /dev/null and b/file_hasher differ diff --git a/file_hasher.c b/file_hasher.c index 9bbe3d2..7948f12 100644 --- a/file_hasher.c +++ b/file_hasher.c @@ -87,7 +87,7 @@ int main(int argc, char **argv) { printf(" Selected instruction set: %s\n", get_xxhash_instruction_set()); // Align IO Ring block size to the system page size - g_ioring_buffer_size = ALIGN_UP_POW2(IORING_BUFFER_SIZE, g_pagesize); + g_ioring_buffer_size = ALIGN_UP_POW2(g_ioring_buffer_size, g_pagesize); // ------------------------------- // Scanning and hashing // ------------------------------- @@ -253,7 +253,7 @@ int main(int argc, char **argv) { FILE *f = fopen(FILE_HASHES_TXT, "wb"); - for (int i = 0; i < num_threads; i++) { + for (int i = 0; i < num_hash_threads; i++) { mem_arena *arena = workers[i].arena; u8 *arena_base = (u8 *)arena + ALIGN_UP_POW2(sizeof(mem_arena), arena->align); @@ -265,14 +265,13 @@ int main(int argc, char **argv) { // ------------------------------- // Print summary // ------------------------------- - // DEBUG uint64_t incomplete = atomic_load(&g_io_ring_fallbacks); if (incomplete > 0) { - printf( - "\nI/O Ring incomplete files: %llu (fallback to buffered I/O used)\n", - (unsigned long long)incomplete); + 
printf("\nWARNING: I/O Ring incomplete files: %llu (fallback to buffered " + "I/O used)\n", + (unsigned long long)incomplete); } - // + double total_seconds = timer_elapsed(&total_timer); printf("Completed hashing %zu files\n", total_found); diff --git a/io_uring_test b/io_uring_test new file mode 100644 index 0000000..6315aaa Binary files /dev/null and b/io_uring_test differ diff --git a/io_uring_test.c b/io_uring_test.c new file mode 100644 index 0000000..735a44d --- /dev/null +++ b/io_uring_test.c @@ -0,0 +1,454 @@ +/* +# Compile +gcc -o io_uring_test io_uring_test.c -luring + +# Run +./io_uring_test + */ +#include "base.h" +#include +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include + +#define TEST_FILE "test_io_uring.txt" +#define BUFFER_SIZE 4096 +#define NUM_BUFFERS 4 + +// Colors for output +#define COLOR_GREEN "\033[0;32m" +#define COLOR_RED "\033[0;31m" +#define COLOR_YELLOW "\033[0;33m" +#define COLOR_BLUE "\033[0;34m" +#define COLOR_RESET "\033[0m" + +// Test result tracking +typedef struct { + int passed; + int failed; +} TestResults; + +static void print_success(const char *step) { + printf(COLOR_GREEN "[✓] SUCCESS: %s" COLOR_RESET "\n", step); +} + +static void print_failure(const char *step, const char *error) { + printf(COLOR_RED "[✗] FAILED: %s - %s" COLOR_RESET "\n", step, error); +} + +static void print_info(const char *msg) { + printf(COLOR_BLUE "[i] INFO: %s" COLOR_RESET "\n", msg); +} + +static void print_step(const char *step) { + printf(COLOR_YELLOW "\n>>> Testing: %s" COLOR_RESET "\n", step); +} + +// Create a test file with known content +static int create_test_file(void) { + const char *test_content = + "Hello, io_uring! 
This is a test file for async I/O operations.\n" + "Line 2: Testing reads with registered buffers.\n" + "Line 3: The quick brown fox jumps over the lazy dog.\n" + "Line 4: ABCDEFGHIJKLMNOPQRSTUVWXYZ\n" + "Line 5: 0123456789\n"; + + FILE *f = fopen(TEST_FILE, "w"); + if (!f) { + perror("Failed to create test file"); + return -1; + } + + fprintf(f, "%s", test_content); + fclose(f); + + print_info("Test file created successfully"); + return 0; +} + +// Test 1: Create io_uring instance +static int test_io_uring_create(struct io_uring *ring, TestResults *results) { + print_step("io_uring creation"); + + int ret = io_uring_queue_init(256, ring, 0); + if (ret < 0) { + print_failure("io_uring_queue_init", strerror(-ret)); + results->failed++; + return -1; + } + + print_success("io_uring instance created"); + results->passed++; + return 0; +} + +// Test 2: Register buffers +static int test_register_buffers(struct io_uring *ring, void **buffers, + struct iovec *iovs, TestResults *results) { + print_step("Buffer registration"); + + // Allocate and prepare buffers + size_t total_size = BUFFER_SIZE * NUM_BUFFERS; + *buffers = aligned_alloc(4096, total_size); // Page-aligned for O_DIRECT + if (!*buffers) { + print_failure("Buffer allocation", strerror(errno)); + results->failed++; + return -1; + } + + // Initialize iovecs + for (int i = 0; i < NUM_BUFFERS; i++) { + iovs[i].iov_base = (char *)*buffers + (i * BUFFER_SIZE); + iovs[i].iov_len = BUFFER_SIZE; + memset(iovs[i].iov_base, 0, BUFFER_SIZE); + } + + int ret = io_uring_register_buffers(ring, iovs, NUM_BUFFERS); + if (ret < 0) { + print_failure("io_uring_register_buffers", strerror(-ret)); + results->failed++; + return -1; + } + + print_success("Buffers registered successfully"); + results->passed++; + return 0; +} + +// Test 3: Open file +// Modified test_open_file function +static int test_open_file(int *fd, TestResults *results) { + print_step("File opening"); + + // Get file size + struct stat st; + if (stat(TEST_FILE, 
&st) != 0) { + print_failure("stat", strerror(errno)); + results->failed++; + return -1; + } + + // Check if file size is page-aligned + int page_size = plat_get_pagesize(); + size_t file_size = st.st_size; + + printf(" File size: %zu bytes\n", file_size); + printf(" Page size: %d bytes\n", page_size); + + if (file_size % page_size != 0) { + printf(" Extending read size from %zu to %zu bytes\n", file_size, + ALIGN_UP_POW2(file_size, page_size)); + } + + // Try O_DIRECT first + *fd = open(TEST_FILE, O_RDONLY | O_DIRECT); + if (*fd < 0) { + print_info("O_DIRECT failed, trying without it"); + *fd = open(TEST_FILE, O_RDONLY); + if (*fd < 0) { + print_failure("open", strerror(errno)); + results->failed++; + return -1; + } + print_info("Using buffered I/O (O_DIRECT not available)"); + } else { + print_success("File opened with O_DIRECT"); + } + + results->passed++; + return 0; +} + +// Test 4: Build and submit read operation +static int test_submit_read(struct io_uring *ring, int fd, struct iovec *iovs, + int buffer_id, uint64_t user_data, + TestResults *results) { + print_step("Building and submitting read operation"); + + // Get file size for proper alignment + struct stat st; + if (fstat(fd, &st) != 0) { + print_failure("fstat", strerror(errno)); + results->failed++; + return -1; + } + + u32 page_size = plat_get_pagesize(); + size_t file_size = st.st_size; + size_t read_size = BUFFER_SIZE; + + // For O_DIRECT, ensure read size is sector-aligned + if (read_size > file_size) { + read_size = ALIGN_UP_POW2(file_size, page_size); + printf(" Adjusted read size to %zu bytes for O_DIRECT alignment\n", + read_size); + } + + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + if (!sqe) { + print_failure("io_uring_get_sqe", "No available SQE"); + results->failed++; + return -1; + } + + // Prepare read operation using registered buffer + io_uring_prep_read_fixed(sqe, fd, iovs[buffer_id].iov_base, read_size, 0, + buffer_id); + io_uring_sqe_set_data64(sqe, user_data); + + int ret 
= io_uring_submit(ring); + if (ret < 0) { + print_failure("io_uring_submit", strerror(-ret)); + results->failed++; + return -1; + } + + print_success("Read operation submitted successfully"); + results->passed++; + return 0; +} + +// Test 5: Wait for completion +static int test_wait_completion(struct io_uring *ring, + struct io_uring_cqe **cqe, + TestResults *results) { + print_step("Waiting for completion"); + + int ret = io_uring_wait_cqe(ring, cqe); + if (ret < 0) { + print_failure("io_uring_wait_cqe", strerror(-ret)); + results->failed++; + return -1; + } + + print_success("Completion received"); + results->passed++; + return 0; +} + +// Test 6: Process completion +static int test_process_completion(struct io_uring_cqe *cqe, + uint64_t expected_user_data, + TestResults *results) { + print_step("Processing completion"); + + uint64_t user_data = io_uring_cqe_get_data64(cqe); + int res = cqe->res; + + printf(" Completion data:\n"); + printf(" User data: %lu (expected: %lu)\n", user_data, expected_user_data); + printf(" Result: %d bytes read\n", res); + + if (user_data != expected_user_data) { + print_failure("User data mismatch", + "User data doesn't match expected value"); + results->failed++; + return -1; + } + + if (res < 0) { + print_failure("Read operation", strerror(-res)); + results->failed++; + return -1; + } + + print_success("Completion processed successfully"); + results->passed++; + return res; // Return number of bytes read +} + +// Test 7: Verify read data +static int test_verify_data(struct iovec *iovs, int buffer_id, int bytes_read, + TestResults *results) { + print_step("Data verification"); + + char *data = (char *)iovs[buffer_id].iov_base; + + printf(" Read data (first 200 chars):\n"); + printf(" ---\n"); + for (int i = 0; i < bytes_read && i < 200; i++) { + putchar(data[i]); + } + if (bytes_read > 200) + printf("..."); + printf("\n ---\n"); + + // Check if data is not empty + if (bytes_read == 0) { + print_failure("Data verification", "No data 
read"); + results->failed++; + return -1; + } + + // Check if data contains expected content + if (strstr(data, "io_uring") == NULL) { + print_failure("Data verification", "Expected content not found"); + results->failed++; + return -1; + } + + print_success("Data verified successfully"); + results->passed++; + return 0; +} + +// Test 8: Test multiple concurrent reads +static int test_concurrent_reads(struct io_uring *ring, int fd, + struct iovec *iovs, TestResults *results) { + print_step("Concurrent reads test"); + + int num_reads = 3; + int submitted = 0; + + // Submit multiple reads + for (int i = 0; i < num_reads; i++) { + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + if (!sqe) { + print_failure("Getting SQE for concurrent read", "No available SQE"); + results->failed++; + return -1; + } + + off_t offset = i * 100; // Read from different offsets + io_uring_prep_read_fixed(sqe, fd, iovs[i].iov_base, BUFFER_SIZE, offset, i); + io_uring_sqe_set_data64(sqe, i); + submitted++; + } + + int ret = io_uring_submit(ring); + if (ret != submitted) { + char msg[64]; + snprintf(msg, sizeof(msg), "Expected %d, got %d", submitted, ret); + + print_failure("Submitting concurrent reads", msg); + results->failed++; + return -1; + } + + print_success("Concurrent reads submitted"); + + // Wait for and process completions + for (int i = 0; i < submitted; i++) { + struct io_uring_cqe *cqe; + ret = io_uring_wait_cqe(ring, &cqe); + if (ret < 0) { + print_failure("Waiting for concurrent read completion", strerror(-ret)); + results->failed++; + return -1; + } + + uint64_t user_data = io_uring_cqe_get_data64(cqe); + int res = cqe->res; + + printf(" Concurrent read %lu completed: %d bytes read\n", user_data, res); + io_uring_cqe_seen(ring, cqe); + } + + print_success("Concurrent reads completed successfully"); + results->passed++; + return 0; +} + +// Cleanup function +static void cleanup(struct io_uring *ring, int fd, void *buffers) { + if (fd >= 0) + close(fd); + if (buffers) { + 
io_uring_unregister_buffers(ring); + free(buffers); + } + io_uring_queue_exit(ring); + remove(TEST_FILE); +} + +int main() { + TestResults results = {0, 0}; + struct io_uring ring; + int fd = -1; + void *buffers = NULL; + struct iovec iovs[NUM_BUFFERS]; + + printf(COLOR_BLUE "\n========================================\n"); + printf(" io_uring Test Suite\n"); + printf("========================================\n" COLOR_RESET); + + // Create test file + if (create_test_file() != 0) { + return 1; + } + + // Test 1: Create io_uring + if (test_io_uring_create(&ring, &results) != 0) { + cleanup(&ring, fd, buffers); + return 1; + } + + // Test 2: Register buffers + if (test_register_buffers(&ring, &buffers, iovs, &results) != 0) { + cleanup(&ring, fd, buffers); + return 1; + } + + // Test 3: Open file + if (test_open_file(&fd, &results) != 0) { + cleanup(&ring, fd, buffers); + return 1; + } + + // Test 4: Submit read + uint64_t test_user_data = 12345; + if (test_submit_read(&ring, fd, iovs, 0, test_user_data, &results) != 0) { + cleanup(&ring, fd, buffers); + return 1; + } + + // Test 5: Wait for completion + struct io_uring_cqe *cqe; + if (test_wait_completion(&ring, &cqe, &results) != 0) { + cleanup(&ring, fd, buffers); + return 1; + } + + // Test 6: Process completion + int bytes_read = test_process_completion(cqe, test_user_data, &results); + if (bytes_read < 0) { + cleanup(&ring, fd, buffers); + return 1; + } + io_uring_cqe_seen(&ring, cqe); + + // Test 7: Verify data + if (test_verify_data(iovs, 0, bytes_read, &results) != 0) { + cleanup(&ring, fd, buffers); + return 1; + } + + // Test 8: Concurrent reads + if (test_concurrent_reads(&ring, fd, iovs, &results) != 0) { + cleanup(&ring, fd, buffers); + return 1; + } + + // Print summary + printf(COLOR_BLUE "\n========================================\n"); + printf(" TEST SUMMARY\n"); + printf("========================================\n" COLOR_RESET); + printf(" Total tests: %d\n", results.passed + results.failed); + 
printf(COLOR_GREEN " Passed: %d\n" COLOR_RESET, results.passed); + if (results.failed > 0) { + printf(COLOR_RED " Failed: %d\n" COLOR_RESET, results.failed); + } else { + printf(COLOR_GREEN " ✓ ALL TESTS PASSED!\n" COLOR_RESET); + } + + // Cleanup + cleanup(&ring, fd, buffers); + + return results.failed > 0 ? 1 : 0; +} diff --git a/platform.c b/platform.c index c9cce03..50e27a4 100644 --- a/platform.c +++ b/platform.c @@ -5,6 +5,7 @@ #include "lf_mpmc.h" #include "arena.c" +#include // xxhash include #define XXH_STATIC_LINKING_ONLY @@ -119,12 +120,14 @@ const char *get_xxhash_instruction_set(void) { #if defined(_WIN32) || defined(_WIN64) typedef HANDLE FileHandle; +#define FLAG_SEQUENTIAL_READ FILE_FLAG_SEQUENTIAL_SCAN +#define FLAG_ASYNC_DIRECT_READ (FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING) #define INVALID_FILE_HANDLE INVALID_HANDLE_VALUE // File open function -static FileHandle os_file_open(const char *path) { +static FileHandle os_file_open(const char *path, DWORD flags) { return CreateFileA(path, GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, - NULL, OPEN_EXISTING, FILE_FLAG_SEQUENTIAL_SCAN, NULL); + NULL, OPEN_EXISTING, flags, NULL); } // File read function @@ -141,11 +144,21 @@ static void os_file_close(FileHandle handle) { CloseHandle(handle); } #elif defined(__linux__) typedef int FileHandle; +#define FLAG_SEQUENTIAL_READ (0) +#define FLAG_ASYNC_DIRECT_READ (O_DIRECT) #define INVALID_FILE_HANDLE (-1) // File open function -static FileHandle os_file_open(const char *path) { - return open(path, O_RDONLY | O_NOFOLLOW); +static FileHandle os_file_open(const char *path, int flags) { + // Combine your mandatory flags with the user-provided flag + int fd = open(path, O_RDONLY | O_NOFOLLOW | flags); + + // If sequential was requested, advise the kernel + if (fd != -1 && (flags == FLAG_SEQUENTIAL_READ)) { + posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL); + } + + return fd; } // File read function @@ -583,7 +596,7 @@ void scan_folder(const char *base, 
ScannerContext *ctx) { #elif defined(__linux__) static int platform_get_file_times_fd(int dir_fd, const char *name, - time_t *created, time_t *modified) { + uint64_t *created, uint64_t *modified) { struct stat st; if (fstatat(dir_fd, name, &st, 0) == 0) { *created = st.st_ctime; // or st.st_birthtime on systems that support it @@ -613,96 +626,64 @@ static int platform_get_file_owner_fd(int dir_fd, const char *name, char *owner, return 0; } return -1; +} - void scan_folder(const char *base, ScannerContext *ctx) { - PathBuilder pb; - path_builder_init(&pb, base); +void scan_folder(const char *base, ScannerContext *ctx) { + PathBuilder pb; + path_builder_init(&pb, base); - int dir_fd = open(base, O_RDONLY | O_DIRECTORY | O_NOFOLLOW); - if (dir_fd == -1) - return; + int dir_fd = open(base, O_RDONLY | O_DIRECTORY | O_NOFOLLOW); + if (dir_fd == -1) + return; - DIR *dir = fdopendir(dir_fd); - if (!dir) { - close(dir_fd); - return; - } + DIR *dir = fdopendir(dir_fd); + if (!dir) { + close(dir_fd); + return; + } - struct dirent *entry; + struct dirent *entry; - while ((entry = readdir(dir)) != NULL) { - if (entry->d_name[0] == '.' && - (entry->d_name[1] == 0 || - (entry->d_name[1] == '.' && entry->d_name[2] == 0))) - continue; + while ((entry = readdir(dir)) != NULL) { + if (entry->d_name[0] == '.' && + (entry->d_name[1] == 0 || + (entry->d_name[1] == '.' 
&& entry->d_name[2] == 0))) + continue; - size_t name_len = strlen(entry->d_name); - path_builder_set_filename(&pb, entry->d_name, name_len); + size_t name_len = strlen(entry->d_name); + path_builder_set_filename(&pb, entry->d_name, name_len); - int file_type = DT_UNKNOWN; + int file_type = DT_UNKNOWN; #ifdef _DIRENT_HAVE_D_TYPE - file_type = entry->d_type; + file_type = entry->d_type; #endif - // Fast path using d_type - if (file_type != DT_UNKNOWN) { - if (file_type == DT_LNK) - continue; // Skip symlinks + // Fast path using d_type + if (file_type != DT_UNKNOWN) { + if (file_type == DT_LNK) + continue; // Skip symlinks - if (file_type == DT_DIR) { - char *dir_path = path_builder_dup_arena(&pb, ctx->path_arena, false); - mpmc_push_work(ctx->dir_queue, dir_path); - continue; - } - - if (file_type == DT_REG) { - atomic_fetch_add(&g_files_found, 1); - FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true); - - // Use fstatat for file info - struct stat st; - if (fstatat(dir_fd, entry->d_name, &st, 0) == 0) { - // Convert times using fd variant - platform_get_file_times_fd(dir_fd, entry->d_name, &fe->created_time, - &fe->modified_time); - platform_get_file_owner_fd(dir_fd, entry->d_name, fe->owner, - sizeof(fe->owner)); - fe->size_bytes = (uint64_t)st.st_size; - - // Normalize path - char temp_path[MAX_PATHLEN]; - memcpy(temp_path, pb.buffer, - (pb.filename_pos - pb.buffer) + name_len + 1); - normalize_path(temp_path); - - fe->path = - arena_push(&ctx->path_arena, strlen(temp_path) + 1, false); - strcpy(fe->path, temp_path); - - mpmc_push(ctx->file_queue, fe); - } - continue; - } + if (file_type == DT_DIR) { + char *dir_path = path_builder_dup_arena(&pb, ctx->path_arena, false); + mpmc_push_work(ctx->dir_queue, dir_path); + continue; } - // Fallback for unknown types - struct stat st; - if (fstatat(dir_fd, entry->d_name, &st, AT_SYMLINK_NOFOLLOW) == 0) { - if (S_ISLNK(st.st_mode)) - continue; + if (file_type == DT_REG) { + 
atomic_fetch_add(&g_files_found, 1); + FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true); - if (S_ISDIR(st.st_mode)) { - char *dir_path = path_builder_dup_arena(&pb, ctx->path_arena, false); - mpmc_push_work(ctx->dir_queue, dir_path); - } else if (S_ISREG(st.st_mode)) { - atomic_fetch_add(&g_files_found, 1); - FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true); - - platform_get_file_times(pb.buffer, &fe->created_time, - &fe->modified_time); - platform_get_file_owner(pb.buffer, fe->owner, sizeof(fe->owner)); + // Use fstatat for file info + struct stat st; + if (fstatat(dir_fd, entry->d_name, &st, 0) == 0) { + // Convert times using fd variant + platform_get_file_times_fd(dir_fd, entry->d_name, &fe->created_time, + &fe->modified_time); + platform_get_file_owner_fd(dir_fd, entry->d_name, fe->owner, + sizeof(fe->owner)); fe->size_bytes = (uint64_t)st.st_size; + // Normalize path char temp_path[MAX_PATHLEN]; memcpy(temp_path, pb.buffer, (pb.filename_pos - pb.buffer) + name_len + 1); @@ -713,12 +694,44 @@ static int platform_get_file_owner_fd(int dir_fd, const char *name, char *owner, mpmc_push(ctx->file_queue, fe); } + continue; } } - closedir(dir); // Closes dir_fd automatically + // Fallback for unknown types + struct stat st; + if (fstatat(dir_fd, entry->d_name, &st, AT_SYMLINK_NOFOLLOW) == 0) { + if (S_ISLNK(st.st_mode)) + continue; + + if (S_ISDIR(st.st_mode)) { + char *dir_path = path_builder_dup_arena(&pb, ctx->path_arena, false); + mpmc_push_work(ctx->dir_queue, dir_path); + } else if (S_ISREG(st.st_mode)) { + atomic_fetch_add(&g_files_found, 1); + FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true); + + platform_get_file_times(pb.buffer, &fe->created_time, + &fe->modified_time); + platform_get_file_owner(pb.buffer, fe->owner, sizeof(fe->owner)); + fe->size_bytes = (uint64_t)st.st_size; + + char temp_path[MAX_PATHLEN]; + memcpy(temp_path, pb.buffer, + (pb.filename_pos - pb.buffer) + name_len + 1); + 
normalize_path(temp_path); + + fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, false); + strcpy(fe->path, temp_path); + + mpmc_push(ctx->file_queue, fe); + } + } } + closedir(dir); // Closes dir_fd automatically +} + #endif // ------------------------- Scan worker -------------------------------- @@ -745,7 +758,7 @@ static void xxh3_hash_file_stream(const char *path, char *out_hex, XXH3_state_t state; XXH3_128bits_reset(&state); - FileHandle handle = os_file_open(path); + FileHandle handle = os_file_open(path, FLAG_SEQUENTIAL_READ); if (handle == INVALID_FILE_HANDLE) { strcpy(out_hex, "ERROR"); return; @@ -874,23 +887,55 @@ static THREAD_RETURN progress_thread(void *arg) { // ======================== Hash worker IO Ring ======================== // -------------------------- Configuration --------------------------- -#define IORING_BUFFER_SIZE (KiB(512)) -#define NUM_BUFFERS_PER_THREAD 16 -#define MAX_ACTIVE_FILES 8 +// #define IORING_BUFFER_SIZE (KiB(32)) +#define IORING_BUFFER_SIZE (KiB(256)) +#define NUM_BUFFERS_PER_THREAD 32 +#define MAX_ACTIVE_FILES 1 #define SUBMIT_TIMEOUT_MS 30000 -#define USERDATA_REGISTER 1 // Globals u64 g_ioring_buffer_size = 4096 * 64; static atomic_uint_fast64_t g_io_ring_fallbacks = 0; -// -------------------------- File context --------------------------- +// -------------------------- Data structures --------------------------- + +#if defined(_WIN32) || defined(_WIN64) +// Windows I/O Ring types +typedef HIORING AsyncIoRing; +typedef HIORING AsyncIoHandle; +#define INVALID_ASYNC_IO_HANDLE NULL +#define SUBMIT_READ_RETURN_VALUE HRESULT + +#elif defined(__linux__) +// Linux io_uring types +typedef struct { + struct io_uring ring; + int event_fd; + struct io_uring_cqe *cqe_cache; + int cqe_cache_index; + int cqe_cache_count; +} AsyncIoRingImpl; + +typedef AsyncIoRingImpl *AsyncIoRing; +typedef int AsyncIoHandle; +typedef struct iovec IORING_BUFFER_INFO; +#define INVALID_ASYNC_IO_HANDLE (-1) +#define 
SUBMIT_READ_RETURN_VALUE int + +typedef struct { + int ResultCode; + uint32_t Information; + uintptr_t UserData; +} AsyncIoCompletion; + +#endif + typedef struct IoBuffer IoBuffer; typedef struct FileReadContext { - HANDLE hFile; + FileHandle hFile; uint64_t file_size; - bool use_incremental_hash; + int use_incremental_hash; union { XXH3_state_t hash_state; // For incremental hash (large files) XXH128_hash_t single_hash; // For single-shot hash (small files) @@ -922,7 +967,8 @@ typedef struct IoBuffer { uint64_t offset; size_t size; size_t bytes_read; - HRESULT result; + SUBMIT_READ_RETURN_VALUE result; + int buffer_id; int completed; @@ -933,18 +979,376 @@ typedef struct IoBuffer { // Thread-local I/O Ring context typedef struct ThreadIoContext { - HIORING ring; - HANDLE completion_event; + AsyncIoRing ring; + void *completion_event; + unsigned char *fallback_buffer; IoBuffer buffers[NUM_BUFFERS_PER_THREAD]; int buffer_pool[NUM_BUFFERS_PER_THREAD]; int free_count; + int submitting; int num_submissions; int active_files; - int initialized; + +#if defined(__linux__) + int use_registered_buffers; +#endif } ThreadIoContext; -static _Thread_local ThreadIoContext *g_thread_ctx = NULL; +typedef struct { + uint32_t MaxSubmissionQueueSize; + uint32_t MaxCompletionQueueSize; + uint32_t MaxVersion; +} AsyncIoCapabilities; +// ----------------------------- Async I/O Abstraction ------------------------- +#if defined(_WIN32) || defined(_WIN64) + +// Windows I/O Ring functions +static void async_io_query_capabilities(AsyncIoCapabilities *caps) { + IORING_CAPABILITIES win_caps; + QueryIoRingCapabilities(&win_caps); + caps->MaxSubmissionQueueSize = win_caps.MaxSubmissionQueueSize; + caps->MaxCompletionQueueSize = win_caps.MaxCompletionQueueSize; + caps->MaxVersion = win_caps.MaxVersion; +} + +static void *async_io_create_completion_event(void) { + return CreateEvent(NULL, FALSE, FALSE, NULL); +} + +static void async_io_set_completion_event(AsyncIoRing ring, void *event) { + 
SetIoRingCompletionEvent(ring, event); +} + +static void async_io_wait_for_completion(ThreadIoContext *ctx) { + if (ctx->num_submissions > 0) { + WaitForSingleObject(ctx->completion_event, SUBMIT_TIMEOUT_MS); + return; + } +} + +static int async_io_create_ring(ThreadIoContext *thread_ctx, + uint32_t queue_size) { + IORING_CREATE_FLAGS flags = {0}; + HRESULT hr = CreateIoRing(IORING_VERSION_3, flags, queue_size, queue_size * 2, + &thread_ctx->ring); + + // Create completion event + thread_ctx->completion_event = async_io_create_completion_event(); + if (thread_ctx->completion_event) { + async_io_set_completion_event(thread_ctx->ring, + thread_ctx->completion_event); + } + return SUCCEEDED(hr) ? 0 : -1; +} + +#define USERDATA_REGISTER 1 + +#define MAKE_BUF_INFO(a, l) \ + (IORING_BUFFER_INFO) { .Address = (a), .Length = (uint32_t)(l) } + +static int async_io_submit(ThreadIoContext *thread_ctx, uint32_t wait_count, + uint32_t timeout_ms, uint32_t *submitted) { + HRESULT hr = SubmitIoRing(thread_ctx->ring, 0, timeout_ms, submitted); + // HRESULT hr = SubmitIoRing(ring, wait_count, timeout_ms, submitted); + + // The wait_count in windows is not implemented yet, so we wait with a + // completion event for a single completion + async_io_wait_for_completion(thread_ctx); + + return SUCCEEDED(hr) ? 
0 : -1; +} + +static int async_io_register_buffers(ThreadIoContext *thread_ctx, + uint32_t num_buffers, + IORING_BUFFER_INFO *buf_info) { + + HRESULT hr = BuildIoRingRegisterBuffers( + thread_ctx->ring, NUM_BUFFERS_PER_THREAD, buf_info, USERDATA_REGISTER); + if (FAILED(hr)) { + LPSTR messageBuffer = NULL; + + size_t size = FormatMessageA( + FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, hr, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPSTR)&messageBuffer, 0, NULL); + + if (size > 0) { + fprintf(stderr, "Error registering buffers: %s (0x%08X)\n", messageBuffer, + (unsigned int)hr); + LocalFree(messageBuffer); // Free the memory allocated by FormatMessage + } else { + fprintf(stderr, "Error registering buffers: Unknown HRESULT (0x%08X)\n", + (unsigned int)hr); + } + } + // Submit registration + async_io_submit(thread_ctx, 0, 0, NULL); + + return hr; +} + +static void async_io_close_event(void *event) { CloseHandle(event); } + +static int async_io_close_ring(ThreadIoContext *thread_ctx) { + + if (thread_ctx->completion_event) + async_io_close_event(thread_ctx->completion_event); + CloseIoRing(thread_ctx->ring); + return 0; +} + +static SUBMIT_READ_RETURN_VALUE +async_io_build_read(ThreadIoContext *thread_ctx, AsyncIoHandle file_handle, + uint32_t buffer_id, size_t size, uint64_t offset, + uintptr_t user_data) { + IORING_HANDLE_REF file_ref = IoRingHandleRefFromHandle(file_handle); + IORING_BUFFER_REF buffer_ref = + IoRingBufferRefFromIndexAndOffset(buffer_id, 0); + HRESULT hr = + BuildIoRingReadFile(thread_ctx->ring, file_ref, buffer_ref, + (uint32_t)size, offset, user_data, IOSQE_FLAGS_NONE); + return hr; +} + +typedef struct { + HRESULT ResultCode; + uint32_t Information; + uintptr_t UserData; +} AsyncIoCompletion; + +static int async_io_pop_completion(AsyncIoRing ring, AsyncIoCompletion *cqe) { + IORING_CQE win_cqe; + + while (1) { + HRESULT hr = PopIoRingCompletion(ring, &win_cqe); + + if (hr == S_FALSE) + 
return 0; + + if (FAILED(hr)) + return -1; + + // Unlike linux, in addition of IO operations, Windows IO Ring produces CQEs + // (completion queue entries) when doing operations like register buffer or + // submiting, we filter them here cqe.UserData == USERDATA_REGISTER + // cqe.ResultCode == S_OK (or error) + if (win_cqe.UserData == USERDATA_REGISTER) + continue; + + cqe->ResultCode = win_cqe.ResultCode; + cqe->Information = win_cqe.Information; + cqe->UserData = win_cqe.UserData; + + // Check for error and print warning + if (FAILED(win_cqe.ResultCode)) { + char error_msg[256]; + FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, win_cqe.ResultCode, 0, error_msg, sizeof(error_msg), + NULL); + fprintf(stderr, + "WARNING: I/O completion error - Code: 0x%lx, Error: %s\n", + win_cqe.ResultCode, error_msg); + } + + return 1; + } +} + +#elif defined(__linux__) +// Linux io_uring functions implementation +static void async_io_query_capabilities(AsyncIoCapabilities *caps) { + // Get system limits for io_uring + long max_entries = sysconf(_SC_IOV_MAX); + if (max_entries <= 0) + max_entries = 4096; + + caps->MaxSubmissionQueueSize = + (uint32_t)(max_entries < 4096 ? 
max_entries : 4096); + caps->MaxCompletionQueueSize = caps->MaxSubmissionQueueSize * 2; + caps->MaxVersion = 1; +} + +// static int async_io_create_ring(uint32_t queue_size, AsyncIoRing *ring) { +static int async_io_create_ring(ThreadIoContext *thread_ctx, + uint32_t queue_size) { + AsyncIoRingImpl *impl = (AsyncIoRingImpl *)calloc(1, sizeof(AsyncIoRingImpl)); + if (!impl) + return -1; + + // Initialize io_uring + struct io_uring_params params = {0}; + + params.flags = + IORING_SETUP_CQSIZE | + IORING_SETUP_SINGLE_ISSUER | // Thread local io_uring + IORING_SETUP_DEFER_TASKRUN; // Do not send interupts when a CQE is ready, + // send them when a wait function is called, + // and groupe them acording to the nbre or + // entries to wait (reduces syscall overhead) + + params.cq_entries = queue_size * 2; + + int ret = io_uring_queue_init_params(queue_size, &impl->ring, ¶ms); + if (ret < 0) { + // Fallback to without params + printf("WARNING: Creating io_uring with default params\n"); + ret = io_uring_queue_init(queue_size, &impl->ring, 0); + if (ret < 0) { + free(impl); + return -1; + } + } + + impl->cqe_cache = NULL; + impl->cqe_cache_index = 0; + impl->cqe_cache_count = 0; + + thread_ctx->ring = impl; + return 0; +} + +#define MAKE_BUF_INFO(a, l) \ + (IORING_BUFFER_INFO) { .iov_base = (a), .iov_len = (size_t)(l) } + +static int async_io_register_buffers(ThreadIoContext *thread_ctx, + uint32_t num_buffers, + IORING_BUFFER_INFO *buf_info) { + AsyncIoRingImpl *impl = (AsyncIoRingImpl *)thread_ctx->ring; + + int hr = io_uring_register_buffers(&impl->ring, buf_info, num_buffers); + + if (hr < 0) { + fprintf(stderr, "Error registering buffers: %s (code: %d)\n", strerror(-hr), + hr); + fprintf(stderr, "WARNING: Memlock limit too low buffer size! Fallback to " + "unregistred buffers\n"); + } + return hr == 0 ? 
0 : -1; +} + +static int async_io_submit(ThreadIoContext *thread_ctx, uint32_t wait_count, + uint32_t timeout_ms, uint32_t *submitted) { + AsyncIoRingImpl *impl = (AsyncIoRingImpl *)thread_ctx->ring; + if (!impl) + return -1; + + int ret; + + if (wait_count > 0) { + ret = io_uring_submit_and_wait(&impl->ring, wait_count); + } else { + ret = io_uring_submit(&impl->ring); + } + + if (ret < 0) { + fprintf(stderr, "submit error: %s\n", strerror(-ret)); + return -1; + } + + if (submitted) + *submitted = (uint32_t)ret; + + return 0; +} + +static int async_io_close_ring(ThreadIoContext *thread_ctx) { + AsyncIoRingImpl *impl = (AsyncIoRingImpl *)thread_ctx->ring; + if (!impl) + return -1; + + if (thread_ctx->use_registered_buffers) { + io_uring_unregister_buffers(&impl->ring); + } + close(impl->event_fd); + io_uring_queue_exit(&impl->ring); + free(impl); + + return 0; +} + +static int async_io_build_read(ThreadIoContext *thread_ctx, + AsyncIoHandle file_handle, uint32_t buffer_id, + size_t size, uint64_t offset, + uintptr_t user_data) { + AsyncIoRing ring = thread_ctx->ring; + AsyncIoRingImpl *impl = (AsyncIoRingImpl *)ring; + if (!impl) + return -1; + + struct io_uring_sqe *sqe = io_uring_get_sqe(&impl->ring); + if (!sqe) { + printf("SQE FULL\n"); + return -1; + } + + ThreadIoContext *ctx = thread_ctx; // or pass it properly TODO : look here + + void *buf = ctx->buffers[buffer_id].data; + + if (thread_ctx->use_registered_buffers) { + io_uring_prep_read_fixed(sqe, file_handle, buf, size, offset, buffer_id); + } else { + io_uring_prep_read(sqe, file_handle, buf, size, offset); + } + + io_uring_sqe_set_data64(sqe, user_data); + return 0; +} + +static int async_io_pop_completion(AsyncIoRing ring, AsyncIoCompletion *cqe) { + AsyncIoRingImpl *impl = (AsyncIoRingImpl *)ring; + + struct io_uring_cqe *cqe_ptr = NULL; + + int ret = io_uring_peek_cqe(&impl->ring, &cqe_ptr); + + if (ret == -EAGAIN) { + // No CQE available + return 0; + } + + if (ret < 0) { + // Error + 
fprintf(stderr, "io_uring_peek_cqe error: %d (%s)\n", ret, strerror(-ret)); + return -1; + } + + if (!cqe_ptr) { + return 0; + } + + int res = cqe_ptr->res; + + if (res >= 0) { + cqe->ResultCode = 0; + cqe->Information = (uint32_t)res; + } else { + cqe->ResultCode = res; + cqe->Information = 0; + } + + cqe->UserData = (uintptr_t)cqe_ptr->user_data; + + io_uring_cqe_seen(&impl->ring, cqe_ptr); + + // Check for error and print warning + if (res < 0) { + fprintf(stderr, "WARNING: I/O completion error - Code: %d, Error: %s\n", + res, strerror(-res)); + } + + return 1; +} + +#endif + +// OS-agnostic helper macros +#define ASYNC_IO_SUCCEEDED(result) ((result) >= 0) +#define ASYNC_IO_FAILED(result) ((result) < 0) + +// ---------------------- FIFO queue operations --------------------------- typedef struct FileQueue { FileReadContext files[MAX_ACTIVE_FILES]; int head; @@ -952,7 +1356,6 @@ typedef struct FileQueue { int count; } FileQueue; -// ---------------------- FIFO queue operations --------------------------- static FileReadContext *fq_push(FileQueue *fq) { if (fq->count == MAX_ACTIVE_FILES) return NULL; @@ -974,96 +1377,115 @@ static void fq_pop(FileQueue *fq) { fq->count--; } +static void fq_remove_at(FileQueue *fq, int index) { + if (fq->count == 0) + return; + + int remove_idx = (fq->head + index) % MAX_ACTIVE_FILES; + + int last_logical = fq->count - 1; + int last_idx = (fq->head + last_logical) % MAX_ACTIVE_FILES; + + // Swap with last logical element if needed + if (index != last_logical) { + fq->files[remove_idx] = fq->files[last_idx]; + } + + // Just decrease count + fq->count--; + + // Recompute tail properly + fq->tail = (fq->head + fq->count) % MAX_ACTIVE_FILES; +} + // ---------------------- Initialize thread context --------------------------- static ThreadIoContext *io_ring_init_thread(void) { - if (g_thread_ctx && g_thread_ctx->initialized) { - return g_thread_ctx; - } + ThreadIoContext *thread_ctx = + (ThreadIoContext *)calloc(1, 
sizeof(ThreadIoContext)); + if (!thread_ctx) + return NULL; - if (!g_thread_ctx) { - g_thread_ctx = (ThreadIoContext *)calloc(1, sizeof(ThreadIoContext)); - if (!g_thread_ctx) - return NULL; - } + // Query I/O Ring capabilities + AsyncIoCapabilities caps; + async_io_query_capabilities(&caps); + + uint32_t queue_size = caps.MaxSubmissionQueueSize; + if (queue_size > 4096) + queue_size = 4096; // Cap at 4096 for reasonable memory usage // Create I/O Ring - IORING_CAPABILITIES caps; - QueryIoRingCapabilities(&caps); - - UINT32 queue_size = min(4096, caps.MaxSubmissionQueueSize); - IORING_CREATE_FLAGS flags = {0}; - HRESULT hr = CreateIoRing(caps.MaxVersion, flags, queue_size, queue_size * 2, - &g_thread_ctx->ring); - - // Create completion event - g_thread_ctx->completion_event = CreateEvent(NULL, FALSE, FALSE, NULL); - if (g_thread_ctx->completion_event) { - SetIoRingCompletionEvent(g_thread_ctx->ring, - g_thread_ctx->completion_event); + if (async_io_create_ring(thread_ctx, queue_size) != 0) { + free(thread_ctx); + thread_ctx = NULL; + return NULL; } // Initialize buffer pool + thread_ctx->fallback_buffer = (unsigned char *)malloc(READ_BLOCK); + IORING_BUFFER_INFO buf_info[NUM_BUFFERS_PER_THREAD]; u64 buf_pool_size = g_ioring_buffer_size * NUM_BUFFERS_PER_THREAD; - // Reserve and Commit the entire memory chunk + // Reserve and Commit memory for buffers void *base_ptr = plat_mem_reserve(buf_pool_size); if (base_ptr) { if (!plat_mem_commit(base_ptr, buf_pool_size)) { plat_mem_release(base_ptr, 0); + async_io_close_ring(thread_ctx); + free(thread_ctx); + thread_ctx = NULL; return NULL; } } else { + + async_io_close_ring(thread_ctx); + free(thread_ctx); + thread_ctx = NULL; return NULL; } for (int i = 0; i < NUM_BUFFERS_PER_THREAD; i++) { + thread_ctx->buffers[i].data = (u8 *)base_ptr + (i * g_ioring_buffer_size); + thread_ctx->buffer_pool[i] = i; + thread_ctx->buffers[i].buffer_id = i; - g_thread_ctx->buffers[i].data = (u8 *)base_ptr + (i * g_ioring_buffer_size); - - 
g_thread_ctx->buffer_pool[i] = i; - g_thread_ctx->buffers[i].buffer_id = i; - - buf_info[i].Address = g_thread_ctx->buffers[i].data; - buf_info[i].Length = (ULONG)g_ioring_buffer_size; + buf_info[i] = + MAKE_BUF_INFO(thread_ctx->buffers[i].data, g_ioring_buffer_size); } - g_thread_ctx->free_count = NUM_BUFFERS_PER_THREAD; + thread_ctx->free_count = NUM_BUFFERS_PER_THREAD; - HRESULT hb = BuildIoRingRegisterBuffers( - g_thread_ctx->ring, NUM_BUFFERS_PER_THREAD, buf_info, USERDATA_REGISTER); + // Register buffers + int hr = + async_io_register_buffers(thread_ctx, NUM_BUFFERS_PER_THREAD, buf_info); - if (FAILED(hb)) { - printf("Buffer registration failed: 0x%lx\n", hb); - return NULL; - } + thread_ctx->use_registered_buffers = (hr == 0); + thread_ctx->submitting = 1; + thread_ctx->num_submissions = 0; + thread_ctx->active_files = 0; - // Submit registration - SubmitIoRing(g_thread_ctx->ring, 0, 0, NULL); - - g_thread_ctx->num_submissions = 0; - g_thread_ctx->active_files = 0; - - g_thread_ctx->initialized = 1; - - return g_thread_ctx; + return thread_ctx; } -static void io_ring_cleanup_thread(void) { - if (!g_thread_ctx) +static void io_ring_cleanup_thread(ThreadIoContext *thread_ctx) { + if (!thread_ctx) return; - if (g_thread_ctx->completion_event) - CloseHandle(g_thread_ctx->completion_event); - if (g_thread_ctx->ring) - CloseIoRing(g_thread_ctx->ring); - plat_mem_release(g_thread_ctx->buffers[0].data, 0); - free(g_thread_ctx); - g_thread_ctx = NULL; + if (thread_ctx->ring) + async_io_close_ring(thread_ctx); + + // Free the buffer pool memory + if (thread_ctx->buffers[0].data) { + u64 buf_pool_size = g_ioring_buffer_size * NUM_BUFFERS_PER_THREAD; + plat_mem_release(thread_ctx->buffers[0].data, buf_pool_size); + } + + free(thread_ctx); + thread_ctx = NULL; } -// ---------------------- Buffer get and return ------------------------ +// -------------------------- Buffer get and return ------------------------ static IoBuffer *get_free_buffer(ThreadIoContext *ctx) { if 
(ctx->free_count == 0) { @@ -1074,7 +1496,7 @@ static IoBuffer *get_free_buffer(ThreadIoContext *ctx) { IoBuffer *buf = &ctx->buffers[idx]; buf->completed = 0; buf->bytes_read = 0; - buf->result = E_PENDING; + buf->result = 0; return buf; } @@ -1087,29 +1509,26 @@ static void return_buffer(ThreadIoContext *ctx, IoBuffer *buf) { } // -------------------------- Submit async read --------------------------- -static HRESULT submit_read(ThreadIoContext *ctx, FileReadContext *file_ctx, - IoBuffer *buf, uint64_t offset, size_t size) { +static int build_read(ThreadIoContext *thread_ctx, FileReadContext *file_ctx, + IoBuffer *buf, uint64_t offset, size_t size) { buf->offset = offset; buf->size = size; buf->file = file_ctx; - IORING_HANDLE_REF file_ref = IoRingHandleRefFromHandle(file_ctx->hFile); - IORING_BUFFER_REF buffer_ref = - IoRingBufferRefFromIndexAndOffset(buf->buffer_id, 0); + SUBMIT_READ_RETURN_VALUE result = + async_io_build_read(thread_ctx, file_ctx->hFile, buf->buffer_id, size, + offset, (uintptr_t)buf); - HRESULT hr = - BuildIoRingReadFile(ctx->ring, file_ref, buffer_ref, (UINT32)size, offset, - (UINT_PTR)buf, IOSQE_FLAGS_NONE); - - if (SUCCEEDED(hr)) { + if (ASYNC_IO_SUCCEEDED(result)) { file_ctx->active_reads++; file_ctx->reads_submitted++; - ctx->num_submissions++; + thread_ctx->num_submissions++; } else { buf->completed = 1; - return_buffer(ctx, buf); + buf->result = result; // Store the error code + return_buffer(thread_ctx, buf); } - return hr; + return result; } // ------------ Link completed buffers in an ordered list ------------- @@ -1144,26 +1563,24 @@ static void insert_buffer_ordered(FileReadContext *file, IoBuffer *buf) { } // -------------------------- Process completions --------------------------- -static void process_completions(ThreadIoContext *ctx, FileQueue *fq) { - IORING_CQE cqe; +static void process_completions(ThreadIoContext *thread_ctx, FileQueue *fq) { + AsyncIoCompletion cqe; - while (PopIoRingCompletion(ctx->ring, &cqe) == S_OK) { 
- - if (cqe.UserData == USERDATA_REGISTER || cqe.UserData == 0) - continue; + // Keep processing as long as there are completions available + while (async_io_pop_completion(thread_ctx->ring, &cqe) == 1) { IoBuffer *buf = (IoBuffer *)cqe.UserData; FileReadContext *file = buf->file; buf->result = cqe.ResultCode; - buf->bytes_read = (DWORD)cqe.Information; + buf->bytes_read = cqe.Information; buf->completed = 1; file->active_reads--; file->reads_completed++; - ctx->num_submissions--; + thread_ctx->num_submissions--; - if (SUCCEEDED(cqe.ResultCode) && cqe.Information > 0) { + if (ASYNC_IO_SUCCEEDED(cqe.ResultCode) && cqe.Information > 0) { buf->next = NULL; @@ -1171,7 +1588,7 @@ static void process_completions(ThreadIoContext *ctx, FileQueue *fq) { } else { file->failed_reads++; - return_buffer(ctx, buf); + return_buffer(thread_ctx, buf); } } } @@ -1183,11 +1600,10 @@ static int init_file(FileReadContext *f, FileEntry *fe) { f->fe = fe; f->file_size = fe->size_bytes; - f->hFile = CreateFileA( - fe->path, GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, - OPEN_EXISTING, FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING, NULL); + // Use the abstracted os_file_open_async for async I/O with no buffering + f->hFile = os_file_open(fe->path, FLAG_ASYNC_DIRECT_READ); - if (f->hFile == INVALID_HANDLE_VALUE) { + if (f->hFile == INVALID_ASYNC_IO_HANDLE) { return 0; } @@ -1201,7 +1617,8 @@ static int init_file(FileReadContext *f, FileEntry *fe) { return 1; } -static void finalize_file(FileReadContext *file, WorkerContext *ctx) { +static void finalize_file(FileReadContext *file, ThreadIoContext *thread_ctx, + WorkerContext *worker_ctx) { FileEntry *fe = file->fe; @@ -1221,8 +1638,9 @@ static void finalize_file(FileReadContext *file, WorkerContext *ctx) { } } else { atomic_fetch_add(&g_io_ring_fallbacks, 1); - xxh3_hash_file_stream(fe->path, hash, NULL); - printf("Fallback for path: %s\n", fe->path); + xxh3_hash_file_stream(fe->path, hash, thread_ctx->fallback_buffer); + // DEBUG 
+ // printf("Fallback for path: %s\n", fe->path); } char created[32], modified[32]; @@ -1231,73 +1649,144 @@ static void finalize_file(FileReadContext *file, WorkerContext *ctx) { double size_kib = (double)fe->size_bytes / 1024.0; - char stack_buf[1024]; + char stack_buf[KiB(4)]; int len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n", hash, fe->path, size_kib, created, modified, fe->owner); - char *dst = arena_push(&ctx->arena, len, false); + char *dst = arena_push(&worker_ctx->arena, len, false); memcpy(dst, stack_buf, len); atomic_fetch_add(&g_files_hashed, 1); } // -------------------- Hash head file ----------------------- -static void hash_head_file(ThreadIoContext *ctx, FileQueue *fq, - WorkerContext *wctx) { +static void hash_head_file(ThreadIoContext *thread_ctx, FileQueue *fq, + WorkerContext *worker_ctx) { FileReadContext *file = fq_peek(fq); if (!file) return; - while (file->head) { + // Keep hashing while the next buffer in sequence is ready at head + while (file->head && file->head->offset == file->next_hash_offset) { IoBuffer *buf = file->head; - // Check ordering - if (buf->offset != file->bytes_hashed) - return; - - // Consume + // Consume from head file->head = buf->next; if (!file->head) file->tail = NULL; - // Calculate actual bytes to hash (handle last partial sector) - size_t bytes_to_hash = buf->bytes_read; + // Process the buffer + if (ASYNC_IO_SUCCEEDED(buf->result) && buf->bytes_read > 0) { + // Calculate actual bytes to hash (handle last partial sector) + size_t bytes_to_hash = buf->bytes_read; - // If this is the last buffer and we read beyond file size, trim it - if (buf->offset + buf->bytes_read > file->file_size) { - bytes_to_hash = file->file_size - buf->offset; - } - - if (bytes_to_hash > 0) { - if (file->use_incremental_hash) { - // Large file: update incremental hash state - XXH3_128bits_update(&file->hash_state, buf->data, bytes_to_hash); - } else { - // Small file: single-shot hash - file->single_hash = 
XXH3_128bits(buf->data, bytes_to_hash); + // If this is the last buffer and we read beyond file size, trim it + if (buf->offset + buf->bytes_read > file->file_size) { + bytes_to_hash = file->file_size - buf->offset; } - file->bytes_hashed += bytes_to_hash; - atomic_fetch_add(&g_bytes_processed, bytes_to_hash); + if (bytes_to_hash > 0) { + if (file->use_incremental_hash) { + // Large file: update incremental hash state + XXH3_128bits_update(&file->hash_state, buf->data, bytes_to_hash); + } else { + // Small file: single-shot hash + file->single_hash = XXH3_128bits(buf->data, bytes_to_hash); + } + + file->bytes_hashed += bytes_to_hash; + atomic_fetch_add(&g_bytes_processed, bytes_to_hash); + } + + file->reads_hashed++; + } else if (buf->bytes_read == 0 && ASYNC_IO_SUCCEEDED(buf->result)) { + // Read operation completed with 0 bytes (EOF) + file->reads_hashed++; + } else { + // Read failed + file->failed_reads++; + file->reads_hashed++; } - return_buffer(ctx, buf); + // Move to next offset + file->next_hash_offset += buf->size; + + // Return buffer to pool + return_buffer(thread_ctx, buf); } - // Finalize + // Finalize file when all reads are complete if (file->active_reads == 0 && file->bytes_hashed >= file->file_size) { - finalize_file(file, wctx); - CloseHandle(file->hFile); + finalize_file(file, thread_ctx, worker_ctx); + os_file_close(file->hFile); fq_pop(fq); - ctx->active_files--; + thread_ctx->active_files--; + } +} + +static void hash_ready_files(ThreadIoContext *thread_ctx, FileQueue *fq, + WorkerContext *worker_ctx) { + for (int i = 0; i < fq->count;) { + + int idx = (fq->head + i) % MAX_ACTIVE_FILES; + FileReadContext *file = &fq->files[idx]; + + bool progressed = false; + + // ---- HASH READY BUFFERS ---- + while (file->head) { + + IoBuffer *buf = file->head; + + if (buf->offset != file->bytes_hashed) + break; + + progressed = true; + + file->head = buf->next; + if (!file->head) + file->tail = NULL; + + size_t bytes_to_hash = buf->bytes_read; + + if 
(buf->offset + buf->bytes_read > file->file_size) { + bytes_to_hash = file->file_size - buf->offset; + } + + if (bytes_to_hash > 0) { + if (file->use_incremental_hash) { + XXH3_128bits_update(&file->hash_state, buf->data, bytes_to_hash); + } else { + file->single_hash = XXH3_128bits(buf->data, bytes_to_hash); + } + + file->bytes_hashed += bytes_to_hash; + atomic_fetch_add(&g_bytes_processed, bytes_to_hash); + } + + return_buffer(thread_ctx, buf); + } + + // ---- FINALIZE ---- + if (file->active_reads == 0 && file->bytes_hashed >= file->file_size) { + + finalize_file(file, thread_ctx, worker_ctx); + os_file_close(file->hFile); + + fq_remove_at(fq, i); + thread_ctx->active_files--; + + continue; + } + i++; } } // ------------- Submit pending reads - fill all free buffers ----------------- -static void submit_pending_reads(ThreadIoContext *ctx, FileQueue *fq, - WorkerContext *worker_ctx, int *submitting) { +static void build_pending_reads(ThreadIoContext *thread_ctx, FileQueue *fq, + WorkerContext *worker_ctx) { MPMCQueue *file_queue = worker_ctx->file_queue; @@ -1309,7 +1798,7 @@ static void submit_pending_reads(ThreadIoContext *ctx, FileQueue *fq, if (f) { while (f->next_read_offset < f->file_size) { - IoBuffer *buf = get_free_buffer(ctx); + IoBuffer *buf = get_free_buffer(thread_ctx); if (!buf) return; @@ -1318,7 +1807,7 @@ static void submit_pending_reads(ThreadIoContext *ctx, FileQueue *fq, // Check if this is the last read and the file size is not // sector-aligned - BOOL is_last_read = (remaining <= g_ioring_buffer_size); + int is_last_read = (remaining <= g_ioring_buffer_size); if (remaining >= g_ioring_buffer_size) { // Normal full read @@ -1333,10 +1822,11 @@ static void submit_pending_reads(ThreadIoContext *ctx, FileQueue *fq, } } - HRESULT hr = submit_read(ctx, f, buf, f->next_read_offset, size); + SUBMIT_READ_RETURN_VALUE hr = + build_read(thread_ctx, f, buf, f->next_read_offset, size); - if (FAILED(hr)) { - return_buffer(ctx, buf); + if 
(ASYNC_IO_FAILED(hr)) { + return_buffer(thread_ctx, buf); f->failed_reads++; f->active_reads = 0; f->reads_submitted = 0; @@ -1345,22 +1835,19 @@ static void submit_pending_reads(ThreadIoContext *ctx, FileQueue *fq, } f->next_read_offset += size; - - if (ctx->num_submissions >= NUM_BUFFERS_PER_THREAD) - return; } } // Add new file if possible - if (!*submitting) + if (!thread_ctx->submitting) return; - if (ctx->active_files >= MAX_ACTIVE_FILES) + if (thread_ctx->active_files >= MAX_ACTIVE_FILES) return; FileEntry *fe = mpmc_pop(file_queue); if (!fe) { - *submitting = 0; + thread_ctx->submitting = 0; return; } @@ -1369,37 +1856,24 @@ static void submit_pending_reads(ThreadIoContext *ctx, FileQueue *fq, if (!init_file(newf, fe)) { // File can't be opened with NO_BUFFERING, process with fallback char hash[HASH_STRLEN]; - finalize_file(newf, worker_ctx); + finalize_file(newf, thread_ctx, worker_ctx); fq_pop(fq); continue; } f = newf; - ctx->active_files++; + thread_ctx->active_files++; } } -// -------------------------- Wait for completions --------------------------- -static void wait_for_completions(ThreadIoContext *ctx) { - - // If there are in-flight I/O requests → wait for completion - if (ctx->num_submissions > 0) { - WaitForSingleObject(ctx->completion_event, SUBMIT_TIMEOUT_MS); - return; - } - - // Sleep(1); -} - // -------------------------- Hash worker I/O Ring --------------------------- static THREAD_RETURN hash_worker_io_ring(void *arg) { WorkerContext *ctx = (WorkerContext *)arg; // Init IO ring - ThreadIoContext *ring_ctx = io_ring_init_thread(); - if (!ring_ctx || !ring_ctx->ring) { - printf("Thread %lu: I/O Ring unavailable, using buffered I/O\n", - GetCurrentThreadId()); + ThreadIoContext *thread_ctx = io_ring_init_thread(); + if (!thread_ctx || !thread_ctx->ring) { + printf("I/O Ring unavailable, using buffered I/O\n"); return hash_worker(arg); } @@ -1407,40 +1881,43 @@ static THREAD_RETURN hash_worker_io_ring(void *arg) { FileQueue fq; memset(&fq, 
0, sizeof(fq)); - int submitting = 1; + uint32_t submitted = 0; + uint32_t wait_count; // Main pipeline loop for (;;) { - // 1. Submit new reads - submit_pending_reads(ring_ctx, &fq, ctx, &submitting); + // Submit new reads + build_pending_reads(thread_ctx, &fq, ctx); - UINT32 submitted = 0; - SubmitIoRing(ring_ctx->ring, 0, 0, &submitted); + wait_count = MIN(thread_ctx->num_submissions, NUM_BUFFERS_PER_THREAD - 2); - // 5. Avoid busy witing - wait_for_completions(ring_ctx); + submitted = 0; + // async_io_submit(ring_ctx->ring, 0, 0, &submitted); + async_io_submit(thread_ctx, wait_count, 0, &submitted); - // 2. Process completions - process_completions(ring_ctx, &fq); - - // 3. Hash files - for (int i = 0; i < fq.count; i++) { - hash_head_file(ring_ctx, &fq, ctx); - } + // Process completions + process_completions(thread_ctx, &fq); // debug // printf("Free buffers: %d, Submissions: %d, Active files: %d\n", // ring_ctx->free_count, ring_ctx->num_submissions, // ring_ctx->active_files); + // debug end - // 4. Exit condition - if (!submitting && ring_ctx->active_files == 0 && - ring_ctx->num_submissions == 0) { + // Hash files + for (int i = 0; i < fq.count; i++) { + hash_head_file(thread_ctx, &fq, ctx); + } + // hash_ready_files(ring_ctx, &fq, ctx); + + // Exit condition + if (!thread_ctx->submitting && thread_ctx->active_files == 0 && + thread_ctx->num_submissions == 0) { break; } } - io_ring_cleanup_thread(); + io_ring_cleanup_thread(thread_ctx); return THREAD_RETURN_VALUE; }