diff --git a/.gitignore b/.gitignore index 96c65c4..19b273a 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ Binaries/file_hashes.txt file_list.txt temp_code.c /.cache/clangd/index +/file_hasher +/io_uring_test diff --git a/README.md b/README.md index 697a40d..359fbb6 100644 --- a/README.md +++ b/README.md @@ -18,9 +18,9 @@ gcc -g -O0 file_hasher.c xxh_x86dispatch.c -o file_hasher ### Linux: #### Release: -clang -O3 -pthread file_hasher.c xxh_x86dispatch.c -o file_hasher -gcc -O3 -pthread file_hasher.c xxh_x86dispatch.c -o file_hasher +clang -O3 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher +gcc -O3 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher #### Debug: -clang -g -O0 -pthread file_hasher.c xxh_x86dispatch.c -o file_hasher -gcc -g -O0 -pthread file_hasher.c xxh_x86dispatch.c -o file_hasher +clang -g -O0 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher +gcc -g -O0 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher diff --git a/base.h b/base.h index 4bd4ca2..4439968 100644 --- a/base.h +++ b/base.h @@ -1,23 +1,11 @@ #pragma once #define _CRT_SECURE_NO_WARNINGS -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - #if defined(_WIN32) || defined(_WIN64) -// #define PLATFORM_WINDOWS 1 -// #define WIN32_LEAN_AND_MEAN -// #define NTDDI_VERSION NTDDI_WIN11 -// -// #pragma comment(lib, "kernel32.Lib") + +#if defined(_MSC_VER) +#pragma comment(lib, "advapi32.lib") +#endif #include #include @@ -29,20 +17,36 @@ #include #include -#if defined(_MSC_VER) -#pragma comment(lib, "advapi32.lib") +#elif defined(__linux__) + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE #endif -#define strdup _strdup -#else #include #include +#include #include #include +#include #include #include +#include +#include #endif +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + /* 
------------------------------------------------------------ Base types ------------------------------------------------------------ */ diff --git a/binaries/changelog.txt b/binaries/changelog.txt index 0db7f6c..e2b14d8 100644 --- a/binaries/changelog.txt +++ b/binaries/changelog.txt @@ -50,7 +50,7 @@ Fixing user prompt parsing Reorganising the code Improving the scan function -5.0: Implementing the IO Ring instead of buffered hashing, huge performance gains. The IO Ring is thread local, uses DMA and direct disk I/O, bypassing the OS cash completely, it supports bashing multiple submissions and can handle multiple files at the same time. +5.0: Implementing the IO Ring for Windows and io_uring for Linux instead of buffered hashing, huge performance gains. The IO Ring is event driven, thread local, uses DMA and direct disk I/O, bypassing the OS cache completely, uses registered buffers, supports batching multiple submissions and can handle multiple files at the same time. Hashing small files using XXH3_128bits() instead of the streaming pipeline(XXH3_128bits_reset(), XXH3_128bits_update(), XXH3_128bits_digest()), this reduses the overhead of creating a state and digest, coupled with the IO Ring it improves the hashing of small files whose size is inferior to the size of IO Ring buffers fixing the xxh_x86dispatch warnings Updating the progress printing function diff --git a/compile_commands.json b/compile_commands.json index 34a424d..0b29205 100644 --- a/compile_commands.json +++ b/compile_commands.json @@ -4,4 +4,4 @@ "command": "clang-cl /O2 file_hasher.c xxh_x86dispatch.c", "file": "file_hasher.c" } -] \ No newline at end of file +] diff --git a/file_hasher b/file_hasher new file mode 100644 index 0000000..8c12d07 Binary files /dev/null and b/file_hasher differ diff --git a/file_hasher.c b/file_hasher.c index 9bbe3d2..7948f12 100644 --- a/file_hasher.c +++ b/file_hasher.c @@ -87,7 +87,7 @@ int main(int argc, char **argv) { printf(" Selected instruction set: %s\n", 
get_xxhash_instruction_set()); // Align IO Ring block size to the system page size - g_ioring_buffer_size = ALIGN_UP_POW2(IORING_BUFFER_SIZE, g_pagesize); + g_ioring_buffer_size = ALIGN_UP_POW2(g_ioring_buffer_size, g_pagesize); // ------------------------------- // Scanning and hashing // ------------------------------- @@ -253,7 +253,7 @@ int main(int argc, char **argv) { FILE *f = fopen(FILE_HASHES_TXT, "wb"); - for (int i = 0; i < num_threads; i++) { + for (int i = 0; i < num_hash_threads; i++) { mem_arena *arena = workers[i].arena; u8 *arena_base = (u8 *)arena + ALIGN_UP_POW2(sizeof(mem_arena), arena->align); @@ -265,14 +265,13 @@ int main(int argc, char **argv) { // ------------------------------- // Print summary // ------------------------------- - // DEBUG uint64_t incomplete = atomic_load(&g_io_ring_fallbacks); if (incomplete > 0) { - printf( - "\nI/O Ring incomplete files: %llu (fallback to buffered I/O used)\n", - (unsigned long long)incomplete); + printf("\nWARNING: I/O Ring incomplete files: %llu (fallback to buffered " + "I/O used)\n", + (unsigned long long)incomplete); } - // + double total_seconds = timer_elapsed(&total_timer); printf("Completed hashing %zu files\n", total_found); diff --git a/io_uring_test b/io_uring_test new file mode 100644 index 0000000..6315aaa Binary files /dev/null and b/io_uring_test differ diff --git a/io_uring_test.c b/io_uring_test.c new file mode 100644 index 0000000..735a44d --- /dev/null +++ b/io_uring_test.c @@ -0,0 +1,454 @@ +/* +# Compile +gcc -o io_uring_test io_uring_test.c -luring + +# Run +./io_uring_test + */ +#include "base.h" +#include +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include + +#define TEST_FILE "test_io_uring.txt" +#define BUFFER_SIZE 4096 +#define NUM_BUFFERS 4 + +// Colors for output +#define COLOR_GREEN "\033[0;32m" +#define COLOR_RED "\033[0;31m" +#define COLOR_YELLOW "\033[0;33m" +#define COLOR_BLUE "\033[0;34m" +#define COLOR_RESET "\033[0m" 
+ +// Test result tracking +typedef struct { + int passed; + int failed; +} TestResults; + +static void print_success(const char *step) { + printf(COLOR_GREEN "[✓] SUCCESS: %s" COLOR_RESET "\n", step); +} + +static void print_failure(const char *step, const char *error) { + printf(COLOR_RED "[✗] FAILED: %s - %s" COLOR_RESET "\n", step, error); +} + +static void print_info(const char *msg) { + printf(COLOR_BLUE "[i] INFO: %s" COLOR_RESET "\n", msg); +} + +static void print_step(const char *step) { + printf(COLOR_YELLOW "\n>>> Testing: %s" COLOR_RESET "\n", step); +} + +// Create a test file with known content +static int create_test_file(void) { + const char *test_content = + "Hello, io_uring! This is a test file for async I/O operations.\n" + "Line 2: Testing reads with registered buffers.\n" + "Line 3: The quick brown fox jumps over the lazy dog.\n" + "Line 4: ABCDEFGHIJKLMNOPQRSTUVWXYZ\n" + "Line 5: 0123456789\n"; + + FILE *f = fopen(TEST_FILE, "w"); + if (!f) { + perror("Failed to create test file"); + return -1; + } + + fprintf(f, "%s", test_content); + fclose(f); + + print_info("Test file created successfully"); + return 0; +} + +// Test 1: Create io_uring instance +static int test_io_uring_create(struct io_uring *ring, TestResults *results) { + print_step("io_uring creation"); + + int ret = io_uring_queue_init(256, ring, 0); + if (ret < 0) { + print_failure("io_uring_queue_init", strerror(-ret)); + results->failed++; + return -1; + } + + print_success("io_uring instance created"); + results->passed++; + return 0; +} + +// Test 2: Register buffers +static int test_register_buffers(struct io_uring *ring, void **buffers, + struct iovec *iovs, TestResults *results) { + print_step("Buffer registration"); + + // Allocate and prepare buffers + size_t total_size = BUFFER_SIZE * NUM_BUFFERS; + *buffers = aligned_alloc(4096, total_size); // Page-aligned for O_DIRECT + if (!*buffers) { + print_failure("Buffer allocation", strerror(errno)); + results->failed++; + return 
-1; + } + + // Initialize iovecs + for (int i = 0; i < NUM_BUFFERS; i++) { + iovs[i].iov_base = (char *)*buffers + (i * BUFFER_SIZE); + iovs[i].iov_len = BUFFER_SIZE; + memset(iovs[i].iov_base, 0, BUFFER_SIZE); + } + + int ret = io_uring_register_buffers(ring, iovs, NUM_BUFFERS); + if (ret < 0) { + print_failure("io_uring_register_buffers", strerror(-ret)); + results->failed++; + return -1; + } + + print_success("Buffers registered successfully"); + results->passed++; + return 0; +} + +// Test 3: Open file +// Modified test_open_file function +static int test_open_file(int *fd, TestResults *results) { + print_step("File opening"); + + // Get file size + struct stat st; + if (stat(TEST_FILE, &st) != 0) { + print_failure("stat", strerror(errno)); + results->failed++; + return -1; + } + + // Check if file size is page-aligned + int page_size = plat_get_pagesize(); + size_t file_size = st.st_size; + + printf(" File size: %zu bytes\n", file_size); + printf(" Page size: %d bytes\n", page_size); + + if (file_size % page_size != 0) { + printf(" Extending read size from %zu to %zu bytes\n", file_size, + ALIGN_UP_POW2(file_size, page_size)); + } + + // Try O_DIRECT first + *fd = open(TEST_FILE, O_RDONLY | O_DIRECT); + if (*fd < 0) { + print_info("O_DIRECT failed, trying without it"); + *fd = open(TEST_FILE, O_RDONLY); + if (*fd < 0) { + print_failure("open", strerror(errno)); + results->failed++; + return -1; + } + print_info("Using buffered I/O (O_DIRECT not available)"); + } else { + print_success("File opened with O_DIRECT"); + } + + results->passed++; + return 0; +} + +// Test 4: Build and submit read operation +static int test_submit_read(struct io_uring *ring, int fd, struct iovec *iovs, + int buffer_id, uint64_t user_data, + TestResults *results) { + print_step("Building and submitting read operation"); + + // Get file size for proper alignment + struct stat st; + if (fstat(fd, &st) != 0) { + print_failure("fstat", strerror(errno)); + results->failed++; + return -1; + 
} + + u32 page_size = plat_get_pagesize(); + size_t file_size = st.st_size; + size_t read_size = BUFFER_SIZE; + + // For O_DIRECT, ensure read size is sector-aligned + if (read_size > file_size) { + read_size = ALIGN_UP_POW2(file_size, page_size); + printf(" Adjusted read size to %zu bytes for O_DIRECT alignment\n", + read_size); + } + + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + if (!sqe) { + print_failure("io_uring_get_sqe", "No available SQE"); + results->failed++; + return -1; + } + + // Prepare read operation using registered buffer + io_uring_prep_read_fixed(sqe, fd, iovs[buffer_id].iov_base, read_size, 0, + buffer_id); + io_uring_sqe_set_data64(sqe, user_data); + + int ret = io_uring_submit(ring); + if (ret < 0) { + print_failure("io_uring_submit", strerror(-ret)); + results->failed++; + return -1; + } + + print_success("Read operation submitted successfully"); + results->passed++; + return 0; +} + +// Test 5: Wait for completion +static int test_wait_completion(struct io_uring *ring, + struct io_uring_cqe **cqe, + TestResults *results) { + print_step("Waiting for completion"); + + int ret = io_uring_wait_cqe(ring, cqe); + if (ret < 0) { + print_failure("io_uring_wait_cqe", strerror(-ret)); + results->failed++; + return -1; + } + + print_success("Completion received"); + results->passed++; + return 0; +} + +// Test 6: Process completion +static int test_process_completion(struct io_uring_cqe *cqe, + uint64_t expected_user_data, + TestResults *results) { + print_step("Processing completion"); + + uint64_t user_data = io_uring_cqe_get_data64(cqe); + int res = cqe->res; + + printf(" Completion data:\n"); + printf(" User data: %lu (expected: %lu)\n", user_data, expected_user_data); + printf(" Result: %d bytes read\n", res); + + if (user_data != expected_user_data) { + print_failure("User data mismatch", + "User data doesn't match expected value"); + results->failed++; + return -1; + } + + if (res < 0) { + print_failure("Read operation", 
strerror(-res)); + results->failed++; + return -1; + } + + print_success("Completion processed successfully"); + results->passed++; + return res; // Return number of bytes read +} + +// Test 7: Verify read data +static int test_verify_data(struct iovec *iovs, int buffer_id, int bytes_read, + TestResults *results) { + print_step("Data verification"); + + char *data = (char *)iovs[buffer_id].iov_base; + + printf(" Read data (first 200 chars):\n"); + printf(" ---\n"); + for (int i = 0; i < bytes_read && i < 200; i++) { + putchar(data[i]); + } + if (bytes_read > 200) + printf("..."); + printf("\n ---\n"); + + // Check if data is not empty + if (bytes_read == 0) { + print_failure("Data verification", "No data read"); + results->failed++; + return -1; + } + + // Check if data contains expected content + if (strstr(data, "io_uring") == NULL) { + print_failure("Data verification", "Expected content not found"); + results->failed++; + return -1; + } + + print_success("Data verified successfully"); + results->passed++; + return 0; +} + +// Test 8: Test multiple concurrent reads +static int test_concurrent_reads(struct io_uring *ring, int fd, + struct iovec *iovs, TestResults *results) { + print_step("Concurrent reads test"); + + int num_reads = 3; + int submitted = 0; + + // Submit multiple reads + for (int i = 0; i < num_reads; i++) { + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + if (!sqe) { + print_failure("Getting SQE for concurrent read", "No available SQE"); + results->failed++; + return -1; + } + + off_t offset = i * 100; // Read from different offsets + io_uring_prep_read_fixed(sqe, fd, iovs[i].iov_base, BUFFER_SIZE, offset, i); + io_uring_sqe_set_data64(sqe, i); + submitted++; + } + + int ret = io_uring_submit(ring); + if (ret != submitted) { + char msg[64]; + snprintf(msg, sizeof(msg), "Expected %d, got %d", submitted, ret); + + print_failure("Submitting concurrent reads", msg); + results->failed++; + return -1; + } + + print_success("Concurrent reads 
submitted"); + + // Wait for and process completions + for (int i = 0; i < submitted; i++) { + struct io_uring_cqe *cqe; + ret = io_uring_wait_cqe(ring, &cqe); + if (ret < 0) { + print_failure("Waiting for concurrent read completion", strerror(-ret)); + results->failed++; + return -1; + } + + uint64_t user_data = io_uring_cqe_get_data64(cqe); + int res = cqe->res; + + printf(" Concurrent read %lu completed: %d bytes read\n", user_data, res); + io_uring_cqe_seen(ring, cqe); + } + + print_success("Concurrent reads completed successfully"); + results->passed++; + return 0; +} + +// Cleanup function +static void cleanup(struct io_uring *ring, int fd, void *buffers) { + if (fd >= 0) + close(fd); + if (buffers) { + io_uring_unregister_buffers(ring); + free(buffers); + } + io_uring_queue_exit(ring); + remove(TEST_FILE); +} + +int main() { + TestResults results = {0, 0}; + struct io_uring ring; + int fd = -1; + void *buffers = NULL; + struct iovec iovs[NUM_BUFFERS]; + + printf(COLOR_BLUE "\n========================================\n"); + printf(" io_uring Test Suite\n"); + printf("========================================\n" COLOR_RESET); + + // Create test file + if (create_test_file() != 0) { + return 1; + } + + // Test 1: Create io_uring + if (test_io_uring_create(&ring, &results) != 0) { + cleanup(&ring, fd, buffers); + return 1; + } + + // Test 2: Register buffers + if (test_register_buffers(&ring, &buffers, iovs, &results) != 0) { + cleanup(&ring, fd, buffers); + return 1; + } + + // Test 3: Open file + if (test_open_file(&fd, &results) != 0) { + cleanup(&ring, fd, buffers); + return 1; + } + + // Test 4: Submit read + uint64_t test_user_data = 12345; + if (test_submit_read(&ring, fd, iovs, 0, test_user_data, &results) != 0) { + cleanup(&ring, fd, buffers); + return 1; + } + + // Test 5: Wait for completion + struct io_uring_cqe *cqe; + if (test_wait_completion(&ring, &cqe, &results) != 0) { + cleanup(&ring, fd, buffers); + return 1; + } + + // Test 6: Process 
completion + int bytes_read = test_process_completion(cqe, test_user_data, &results); + if (bytes_read < 0) { + cleanup(&ring, fd, buffers); + return 1; + } + io_uring_cqe_seen(&ring, cqe); + + // Test 7: Verify data + if (test_verify_data(iovs, 0, bytes_read, &results) != 0) { + cleanup(&ring, fd, buffers); + return 1; + } + + // Test 8: Concurrent reads + if (test_concurrent_reads(&ring, fd, iovs, &results) != 0) { + cleanup(&ring, fd, buffers); + return 1; + } + + // Print summary + printf(COLOR_BLUE "\n========================================\n"); + printf(" TEST SUMMARY\n"); + printf("========================================\n" COLOR_RESET); + printf(" Total tests: %d\n", results.passed + results.failed); + printf(COLOR_GREEN " Passed: %d\n" COLOR_RESET, results.passed); + if (results.failed > 0) { + printf(COLOR_RED " Failed: %d\n" COLOR_RESET, results.failed); + } else { + printf(COLOR_GREEN " ✓ ALL TESTS PASSED!\n" COLOR_RESET); + } + + // Cleanup + cleanup(&ring, fd, buffers); + + return results.failed > 0 ? 
1 : 0; +} diff --git a/platform.c b/platform.c index c9cce03..50e27a4 100644 --- a/platform.c +++ b/platform.c @@ -5,6 +5,7 @@ #include "lf_mpmc.h" #include "arena.c" +#include // xxhash include #define XXH_STATIC_LINKING_ONLY @@ -119,12 +120,14 @@ const char *get_xxhash_instruction_set(void) { #if defined(_WIN32) || defined(_WIN64) typedef HANDLE FileHandle; +#define FLAG_SEQUENTIAL_READ FILE_FLAG_SEQUENTIAL_SCAN +#define FLAG_ASYNC_DIRECT_READ (FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING) #define INVALID_FILE_HANDLE INVALID_HANDLE_VALUE // File open function -static FileHandle os_file_open(const char *path) { +static FileHandle os_file_open(const char *path, DWORD flags) { return CreateFileA(path, GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, - NULL, OPEN_EXISTING, FILE_FLAG_SEQUENTIAL_SCAN, NULL); + NULL, OPEN_EXISTING, flags, NULL); } // File read function @@ -141,11 +144,21 @@ static void os_file_close(FileHandle handle) { CloseHandle(handle); } #elif defined(__linux__) typedef int FileHandle; +#define FLAG_SEQUENTIAL_READ (0) +#define FLAG_ASYNC_DIRECT_READ (O_DIRECT) #define INVALID_FILE_HANDLE (-1) // File open function -static FileHandle os_file_open(const char *path) { - return open(path, O_RDONLY | O_NOFOLLOW); +static FileHandle os_file_open(const char *path, int flags) { + // Combine your mandatory flags with the user-provided flag + int fd = open(path, O_RDONLY | O_NOFOLLOW | flags); + + // If sequential was requested, advise the kernel + if (fd != -1 && (flags == FLAG_SEQUENTIAL_READ)) { + posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL); + } + + return fd; } // File read function @@ -583,7 +596,7 @@ void scan_folder(const char *base, ScannerContext *ctx) { #elif defined(__linux__) static int platform_get_file_times_fd(int dir_fd, const char *name, - time_t *created, time_t *modified) { + uint64_t *created, uint64_t *modified) { struct stat st; if (fstatat(dir_fd, name, &st, 0) == 0) { *created = st.st_ctime; // or st.st_birthtime on systems that 
support it @@ -613,96 +626,64 @@ static int platform_get_file_owner_fd(int dir_fd, const char *name, char *owner, return 0; } return -1; +} - void scan_folder(const char *base, ScannerContext *ctx) { - PathBuilder pb; - path_builder_init(&pb, base); +void scan_folder(const char *base, ScannerContext *ctx) { + PathBuilder pb; + path_builder_init(&pb, base); - int dir_fd = open(base, O_RDONLY | O_DIRECTORY | O_NOFOLLOW); - if (dir_fd == -1) - return; + int dir_fd = open(base, O_RDONLY | O_DIRECTORY | O_NOFOLLOW); + if (dir_fd == -1) + return; - DIR *dir = fdopendir(dir_fd); - if (!dir) { - close(dir_fd); - return; - } + DIR *dir = fdopendir(dir_fd); + if (!dir) { + close(dir_fd); + return; + } - struct dirent *entry; + struct dirent *entry; - while ((entry = readdir(dir)) != NULL) { - if (entry->d_name[0] == '.' && - (entry->d_name[1] == 0 || - (entry->d_name[1] == '.' && entry->d_name[2] == 0))) - continue; + while ((entry = readdir(dir)) != NULL) { + if (entry->d_name[0] == '.' && + (entry->d_name[1] == 0 || + (entry->d_name[1] == '.' 
&& entry->d_name[2] == 0))) + continue; - size_t name_len = strlen(entry->d_name); - path_builder_set_filename(&pb, entry->d_name, name_len); + size_t name_len = strlen(entry->d_name); + path_builder_set_filename(&pb, entry->d_name, name_len); - int file_type = DT_UNKNOWN; + int file_type = DT_UNKNOWN; #ifdef _DIRENT_HAVE_D_TYPE - file_type = entry->d_type; + file_type = entry->d_type; #endif - // Fast path using d_type - if (file_type != DT_UNKNOWN) { - if (file_type == DT_LNK) - continue; // Skip symlinks + // Fast path using d_type + if (file_type != DT_UNKNOWN) { + if (file_type == DT_LNK) + continue; // Skip symlinks - if (file_type == DT_DIR) { - char *dir_path = path_builder_dup_arena(&pb, ctx->path_arena, false); - mpmc_push_work(ctx->dir_queue, dir_path); - continue; - } - - if (file_type == DT_REG) { - atomic_fetch_add(&g_files_found, 1); - FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true); - - // Use fstatat for file info - struct stat st; - if (fstatat(dir_fd, entry->d_name, &st, 0) == 0) { - // Convert times using fd variant - platform_get_file_times_fd(dir_fd, entry->d_name, &fe->created_time, - &fe->modified_time); - platform_get_file_owner_fd(dir_fd, entry->d_name, fe->owner, - sizeof(fe->owner)); - fe->size_bytes = (uint64_t)st.st_size; - - // Normalize path - char temp_path[MAX_PATHLEN]; - memcpy(temp_path, pb.buffer, - (pb.filename_pos - pb.buffer) + name_len + 1); - normalize_path(temp_path); - - fe->path = - arena_push(&ctx->path_arena, strlen(temp_path) + 1, false); - strcpy(fe->path, temp_path); - - mpmc_push(ctx->file_queue, fe); - } - continue; - } + if (file_type == DT_DIR) { + char *dir_path = path_builder_dup_arena(&pb, ctx->path_arena, false); + mpmc_push_work(ctx->dir_queue, dir_path); + continue; } - // Fallback for unknown types - struct stat st; - if (fstatat(dir_fd, entry->d_name, &st, AT_SYMLINK_NOFOLLOW) == 0) { - if (S_ISLNK(st.st_mode)) - continue; + if (file_type == DT_REG) { + 
atomic_fetch_add(&g_files_found, 1); + FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true); - if (S_ISDIR(st.st_mode)) { - char *dir_path = path_builder_dup_arena(&pb, ctx->path_arena, false); - mpmc_push_work(ctx->dir_queue, dir_path); - } else if (S_ISREG(st.st_mode)) { - atomic_fetch_add(&g_files_found, 1); - FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true); - - platform_get_file_times(pb.buffer, &fe->created_time, - &fe->modified_time); - platform_get_file_owner(pb.buffer, fe->owner, sizeof(fe->owner)); + // Use fstatat for file info + struct stat st; + if (fstatat(dir_fd, entry->d_name, &st, 0) == 0) { + // Convert times using fd variant + platform_get_file_times_fd(dir_fd, entry->d_name, &fe->created_time, + &fe->modified_time); + platform_get_file_owner_fd(dir_fd, entry->d_name, fe->owner, + sizeof(fe->owner)); fe->size_bytes = (uint64_t)st.st_size; + // Normalize path char temp_path[MAX_PATHLEN]; memcpy(temp_path, pb.buffer, (pb.filename_pos - pb.buffer) + name_len + 1); @@ -713,12 +694,44 @@ static int platform_get_file_owner_fd(int dir_fd, const char *name, char *owner, mpmc_push(ctx->file_queue, fe); } + continue; } } - closedir(dir); // Closes dir_fd automatically + // Fallback for unknown types + struct stat st; + if (fstatat(dir_fd, entry->d_name, &st, AT_SYMLINK_NOFOLLOW) == 0) { + if (S_ISLNK(st.st_mode)) + continue; + + if (S_ISDIR(st.st_mode)) { + char *dir_path = path_builder_dup_arena(&pb, ctx->path_arena, false); + mpmc_push_work(ctx->dir_queue, dir_path); + } else if (S_ISREG(st.st_mode)) { + atomic_fetch_add(&g_files_found, 1); + FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true); + + platform_get_file_times(pb.buffer, &fe->created_time, + &fe->modified_time); + platform_get_file_owner(pb.buffer, fe->owner, sizeof(fe->owner)); + fe->size_bytes = (uint64_t)st.st_size; + + char temp_path[MAX_PATHLEN]; + memcpy(temp_path, pb.buffer, + (pb.filename_pos - pb.buffer) + name_len + 1); + 
normalize_path(temp_path); + + fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, false); + strcpy(fe->path, temp_path); + + mpmc_push(ctx->file_queue, fe); + } + } } + closedir(dir); // Closes dir_fd automatically +} + #endif // ------------------------- Scan worker -------------------------------- @@ -745,7 +758,7 @@ static void xxh3_hash_file_stream(const char *path, char *out_hex, XXH3_state_t state; XXH3_128bits_reset(&state); - FileHandle handle = os_file_open(path); + FileHandle handle = os_file_open(path, FLAG_SEQUENTIAL_READ); if (handle == INVALID_FILE_HANDLE) { strcpy(out_hex, "ERROR"); return; @@ -874,23 +887,55 @@ static THREAD_RETURN progress_thread(void *arg) { // ======================== Hash worker IO Ring ======================== // -------------------------- Configuration --------------------------- -#define IORING_BUFFER_SIZE (KiB(512)) -#define NUM_BUFFERS_PER_THREAD 16 -#define MAX_ACTIVE_FILES 8 +// #define IORING_BUFFER_SIZE (KiB(32)) +#define IORING_BUFFER_SIZE (KiB(256)) +#define NUM_BUFFERS_PER_THREAD 32 +#define MAX_ACTIVE_FILES 1 #define SUBMIT_TIMEOUT_MS 30000 -#define USERDATA_REGISTER 1 // Globals u64 g_ioring_buffer_size = 4096 * 64; static atomic_uint_fast64_t g_io_ring_fallbacks = 0; -// -------------------------- File context --------------------------- +// -------------------------- Data structures --------------------------- + +#if defined(_WIN32) || defined(_WIN64) +// Windows I/O Ring types +typedef HIORING AsyncIoRing; +typedef HIORING AsyncIoHandle; +#define INVALID_ASYNC_IO_HANDLE NULL +#define SUBMIT_READ_RETURN_VALUE HRESULT + +#elif defined(__linux__) +// Linux io_uring types +typedef struct { + struct io_uring ring; + int event_fd; + struct io_uring_cqe *cqe_cache; + int cqe_cache_index; + int cqe_cache_count; +} AsyncIoRingImpl; + +typedef AsyncIoRingImpl *AsyncIoRing; +typedef int AsyncIoHandle; +typedef struct iovec IORING_BUFFER_INFO; +#define INVALID_ASYNC_IO_HANDLE (-1) +#define 
SUBMIT_READ_RETURN_VALUE int + +typedef struct { + int ResultCode; + uint32_t Information; + uintptr_t UserData; +} AsyncIoCompletion; + +#endif + typedef struct IoBuffer IoBuffer; typedef struct FileReadContext { - HANDLE hFile; + FileHandle hFile; uint64_t file_size; - bool use_incremental_hash; + int use_incremental_hash; union { XXH3_state_t hash_state; // For incremental hash (large files) XXH128_hash_t single_hash; // For single-shot hash (small files) @@ -922,7 +967,8 @@ typedef struct IoBuffer { uint64_t offset; size_t size; size_t bytes_read; - HRESULT result; + SUBMIT_READ_RETURN_VALUE result; + int buffer_id; int completed; @@ -933,18 +979,376 @@ typedef struct IoBuffer { // Thread-local I/O Ring context typedef struct ThreadIoContext { - HIORING ring; - HANDLE completion_event; + AsyncIoRing ring; + void *completion_event; + unsigned char *fallback_buffer; IoBuffer buffers[NUM_BUFFERS_PER_THREAD]; int buffer_pool[NUM_BUFFERS_PER_THREAD]; int free_count; + int submitting; int num_submissions; int active_files; - int initialized; + +#if defined(__linux__) + int use_registered_buffers; +#endif } ThreadIoContext; -static _Thread_local ThreadIoContext *g_thread_ctx = NULL; +typedef struct { + uint32_t MaxSubmissionQueueSize; + uint32_t MaxCompletionQueueSize; + uint32_t MaxVersion; +} AsyncIoCapabilities; +// ----------------------------- Async I/O Abstraction ------------------------- +#if defined(_WIN32) || defined(_WIN64) + +// Windows I/O Ring functions +static void async_io_query_capabilities(AsyncIoCapabilities *caps) { + IORING_CAPABILITIES win_caps; + QueryIoRingCapabilities(&win_caps); + caps->MaxSubmissionQueueSize = win_caps.MaxSubmissionQueueSize; + caps->MaxCompletionQueueSize = win_caps.MaxCompletionQueueSize; + caps->MaxVersion = win_caps.MaxVersion; +} + +static void *async_io_create_completion_event(void) { + return CreateEvent(NULL, FALSE, FALSE, NULL); +} + +static void async_io_set_completion_event(AsyncIoRing ring, void *event) { + 
SetIoRingCompletionEvent(ring, event); +} + +static void async_io_wait_for_completion(ThreadIoContext *ctx) { + if (ctx->num_submissions > 0) { + WaitForSingleObject(ctx->completion_event, SUBMIT_TIMEOUT_MS); + return; + } +} + +static int async_io_create_ring(ThreadIoContext *thread_ctx, + uint32_t queue_size) { + IORING_CREATE_FLAGS flags = {0}; + HRESULT hr = CreateIoRing(IORING_VERSION_3, flags, queue_size, queue_size * 2, + &thread_ctx->ring); + + // Create completion event + thread_ctx->completion_event = async_io_create_completion_event(); + if (thread_ctx->completion_event) { + async_io_set_completion_event(thread_ctx->ring, + thread_ctx->completion_event); + } + return SUCCEEDED(hr) ? 0 : -1; +} + +#define USERDATA_REGISTER 1 + +#define MAKE_BUF_INFO(a, l) \ + (IORING_BUFFER_INFO) { .Address = (a), .Length = (uint32_t)(l) } + +static int async_io_submit(ThreadIoContext *thread_ctx, uint32_t wait_count, + uint32_t timeout_ms, uint32_t *submitted) { + HRESULT hr = SubmitIoRing(thread_ctx->ring, 0, timeout_ms, submitted); + // HRESULT hr = SubmitIoRing(ring, wait_count, timeout_ms, submitted); + + // The wait_count in windows is not implemented yet, so we wait with a + // completion event for a single completion + async_io_wait_for_completion(thread_ctx); + + return SUCCEEDED(hr) ? 
0 : -1; +} + +static int async_io_register_buffers(ThreadIoContext *thread_ctx, + uint32_t num_buffers, + IORING_BUFFER_INFO *buf_info) { + + HRESULT hr = BuildIoRingRegisterBuffers( + thread_ctx->ring, NUM_BUFFERS_PER_THREAD, buf_info, USERDATA_REGISTER); + if (FAILED(hr)) { + LPSTR messageBuffer = NULL; + + size_t size = FormatMessageA( + FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, hr, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPSTR)&messageBuffer, 0, NULL); + + if (size > 0) { + fprintf(stderr, "Error registering buffers: %s (0x%08X)\n", messageBuffer, + (unsigned int)hr); + LocalFree(messageBuffer); // Free the memory allocated by FormatMessage + } else { + fprintf(stderr, "Error registering buffers: Unknown HRESULT (0x%08X)\n", + (unsigned int)hr); + } + } + // Submit registration + async_io_submit(thread_ctx, 0, 0, NULL); + + return hr; +} + +static void async_io_close_event(void *event) { CloseHandle(event); } + +static int async_io_close_ring(ThreadIoContext *thread_ctx) { + + if (thread_ctx->completion_event) + async_io_close_event(thread_ctx->completion_event); + CloseIoRing(thread_ctx->ring); + return 0; +} + +static SUBMIT_READ_RETURN_VALUE +async_io_build_read(ThreadIoContext *thread_ctx, AsyncIoHandle file_handle, + uint32_t buffer_id, size_t size, uint64_t offset, + uintptr_t user_data) { + IORING_HANDLE_REF file_ref = IoRingHandleRefFromHandle(file_handle); + IORING_BUFFER_REF buffer_ref = + IoRingBufferRefFromIndexAndOffset(buffer_id, 0); + HRESULT hr = + BuildIoRingReadFile(thread_ctx->ring, file_ref, buffer_ref, + (uint32_t)size, offset, user_data, IOSQE_FLAGS_NONE); + return hr; +} + +typedef struct { + HRESULT ResultCode; + uint32_t Information; + uintptr_t UserData; +} AsyncIoCompletion; + +static int async_io_pop_completion(AsyncIoRing ring, AsyncIoCompletion *cqe) { + IORING_CQE win_cqe; + + while (1) { + HRESULT hr = PopIoRingCompletion(ring, &win_cqe); + + if (hr == S_FALSE) + 
return 0; + + if (FAILED(hr)) + return -1; + + // Unlike linux, in addition of IO operations, Windows IO Ring produces CQEs + // (completion queue entries) when doing operations like register buffer or + // submiting, we filter them here cqe.UserData == USERDATA_REGISTER + // cqe.ResultCode == S_OK (or error) + if (win_cqe.UserData == USERDATA_REGISTER) + continue; + + cqe->ResultCode = win_cqe.ResultCode; + cqe->Information = win_cqe.Information; + cqe->UserData = win_cqe.UserData; + + // Check for error and print warning + if (FAILED(win_cqe.ResultCode)) { + char error_msg[256]; + FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, win_cqe.ResultCode, 0, error_msg, sizeof(error_msg), + NULL); + fprintf(stderr, + "WARNING: I/O completion error - Code: 0x%lx, Error: %s\n", + win_cqe.ResultCode, error_msg); + } + + return 1; + } +} + +#elif defined(__linux__) +// Linux io_uring functions implementation +static void async_io_query_capabilities(AsyncIoCapabilities *caps) { + // Get system limits for io_uring + long max_entries = sysconf(_SC_IOV_MAX); + if (max_entries <= 0) + max_entries = 4096; + + caps->MaxSubmissionQueueSize = + (uint32_t)(max_entries < 4096 ? 
max_entries : 4096); + caps->MaxCompletionQueueSize = caps->MaxSubmissionQueueSize * 2; + caps->MaxVersion = 1; +} + +// static int async_io_create_ring(uint32_t queue_size, AsyncIoRing *ring) { +static int async_io_create_ring(ThreadIoContext *thread_ctx, + uint32_t queue_size) { + AsyncIoRingImpl *impl = (AsyncIoRingImpl *)calloc(1, sizeof(AsyncIoRingImpl)); + if (!impl) + return -1; + + // Initialize io_uring + struct io_uring_params params = {0}; + + params.flags = + IORING_SETUP_CQSIZE | + IORING_SETUP_SINGLE_ISSUER | // Thread local io_uring + IORING_SETUP_DEFER_TASKRUN; // Do not send interupts when a CQE is ready, + // send them when a wait function is called, + // and groupe them acording to the nbre or + // entries to wait (reduces syscall overhead) + + params.cq_entries = queue_size * 2; + + int ret = io_uring_queue_init_params(queue_size, &impl->ring, ¶ms); + if (ret < 0) { + // Fallback to without params + printf("WARNING: Creating io_uring with default params\n"); + ret = io_uring_queue_init(queue_size, &impl->ring, 0); + if (ret < 0) { + free(impl); + return -1; + } + } + + impl->cqe_cache = NULL; + impl->cqe_cache_index = 0; + impl->cqe_cache_count = 0; + + thread_ctx->ring = impl; + return 0; +} + +#define MAKE_BUF_INFO(a, l) \ + (IORING_BUFFER_INFO) { .iov_base = (a), .iov_len = (size_t)(l) } + +static int async_io_register_buffers(ThreadIoContext *thread_ctx, + uint32_t num_buffers, + IORING_BUFFER_INFO *buf_info) { + AsyncIoRingImpl *impl = (AsyncIoRingImpl *)thread_ctx->ring; + + int hr = io_uring_register_buffers(&impl->ring, buf_info, num_buffers); + + if (hr < 0) { + fprintf(stderr, "Error registering buffers: %s (code: %d)\n", strerror(-hr), + hr); + fprintf(stderr, "WARNING: Memlock limit too low buffer size! Fallback to " + "unregistred buffers\n"); + } + return hr == 0 ? 
0 : -1; +} + +static int async_io_submit(ThreadIoContext *thread_ctx, uint32_t wait_count, + uint32_t timeout_ms, uint32_t *submitted) { + AsyncIoRingImpl *impl = (AsyncIoRingImpl *)thread_ctx->ring; + if (!impl) + return -1; + + int ret; + + if (wait_count > 0) { + ret = io_uring_submit_and_wait(&impl->ring, wait_count); + } else { + ret = io_uring_submit(&impl->ring); + } + + if (ret < 0) { + fprintf(stderr, "submit error: %s\n", strerror(-ret)); + return -1; + } + + if (submitted) + *submitted = (uint32_t)ret; + + return 0; +} + +static int async_io_close_ring(ThreadIoContext *thread_ctx) { + AsyncIoRingImpl *impl = (AsyncIoRingImpl *)thread_ctx->ring; + if (!impl) + return -1; + + if (thread_ctx->use_registered_buffers) { + io_uring_unregister_buffers(&impl->ring); + } + close(impl->event_fd); + io_uring_queue_exit(&impl->ring); + free(impl); + + return 0; +} + +static int async_io_build_read(ThreadIoContext *thread_ctx, + AsyncIoHandle file_handle, uint32_t buffer_id, + size_t size, uint64_t offset, + uintptr_t user_data) { + AsyncIoRing ring = thread_ctx->ring; + AsyncIoRingImpl *impl = (AsyncIoRingImpl *)ring; + if (!impl) + return -1; + + struct io_uring_sqe *sqe = io_uring_get_sqe(&impl->ring); + if (!sqe) { + printf("SQE FULL\n"); + return -1; + } + + ThreadIoContext *ctx = thread_ctx; // or pass it properly TODO : look here + + void *buf = ctx->buffers[buffer_id].data; + + if (thread_ctx->use_registered_buffers) { + io_uring_prep_read_fixed(sqe, file_handle, buf, size, offset, buffer_id); + } else { + io_uring_prep_read(sqe, file_handle, buf, size, offset); + } + + io_uring_sqe_set_data64(sqe, user_data); + return 0; +} + +static int async_io_pop_completion(AsyncIoRing ring, AsyncIoCompletion *cqe) { + AsyncIoRingImpl *impl = (AsyncIoRingImpl *)ring; + + struct io_uring_cqe *cqe_ptr = NULL; + + int ret = io_uring_peek_cqe(&impl->ring, &cqe_ptr); + + if (ret == -EAGAIN) { + // No CQE available + return 0; + } + + if (ret < 0) { + // Error + 
fprintf(stderr, "io_uring_peek_cqe error: %d (%s)\n", ret, strerror(-ret)); + return -1; + } + + if (!cqe_ptr) { + return 0; + } + + int res = cqe_ptr->res; + + if (res >= 0) { + cqe->ResultCode = 0; + cqe->Information = (uint32_t)res; + } else { + cqe->ResultCode = res; + cqe->Information = 0; + } + + cqe->UserData = (uintptr_t)cqe_ptr->user_data; + + io_uring_cqe_seen(&impl->ring, cqe_ptr); + + // Check for error and print warning + if (res < 0) { + fprintf(stderr, "WARNING: I/O completion error - Code: %d, Error: %s\n", + res, strerror(-res)); + } + + return 1; +} + +#endif + +// OS-agnostic helper macros +#define ASYNC_IO_SUCCEEDED(result) ((result) >= 0) +#define ASYNC_IO_FAILED(result) ((result) < 0) + +// ---------------------- FIFO queue operations --------------------------- typedef struct FileQueue { FileReadContext files[MAX_ACTIVE_FILES]; int head; @@ -952,7 +1356,6 @@ typedef struct FileQueue { int count; } FileQueue; -// ---------------------- FIFO queue operations --------------------------- static FileReadContext *fq_push(FileQueue *fq) { if (fq->count == MAX_ACTIVE_FILES) return NULL; @@ -974,96 +1377,115 @@ static void fq_pop(FileQueue *fq) { fq->count--; } +static void fq_remove_at(FileQueue *fq, int index) { + if (fq->count == 0) + return; + + int remove_idx = (fq->head + index) % MAX_ACTIVE_FILES; + + int last_logical = fq->count - 1; + int last_idx = (fq->head + last_logical) % MAX_ACTIVE_FILES; + + // Swap with last logical element if needed + if (index != last_logical) { + fq->files[remove_idx] = fq->files[last_idx]; + } + + // Just decrease count + fq->count--; + + // Recompute tail properly + fq->tail = (fq->head + fq->count) % MAX_ACTIVE_FILES; +} + // ---------------------- Initialize thread context --------------------------- static ThreadIoContext *io_ring_init_thread(void) { - if (g_thread_ctx && g_thread_ctx->initialized) { - return g_thread_ctx; - } + ThreadIoContext *thread_ctx = + (ThreadIoContext *)calloc(1, 
sizeof(ThreadIoContext)); + if (!thread_ctx) + return NULL; - if (!g_thread_ctx) { - g_thread_ctx = (ThreadIoContext *)calloc(1, sizeof(ThreadIoContext)); - if (!g_thread_ctx) - return NULL; - } + // Query I/O Ring capabilities + AsyncIoCapabilities caps; + async_io_query_capabilities(&caps); + + uint32_t queue_size = caps.MaxSubmissionQueueSize; + if (queue_size > 4096) + queue_size = 4096; // Cap at 4096 for reasonable memory usage // Create I/O Ring - IORING_CAPABILITIES caps; - QueryIoRingCapabilities(&caps); - - UINT32 queue_size = min(4096, caps.MaxSubmissionQueueSize); - IORING_CREATE_FLAGS flags = {0}; - HRESULT hr = CreateIoRing(caps.MaxVersion, flags, queue_size, queue_size * 2, - &g_thread_ctx->ring); - - // Create completion event - g_thread_ctx->completion_event = CreateEvent(NULL, FALSE, FALSE, NULL); - if (g_thread_ctx->completion_event) { - SetIoRingCompletionEvent(g_thread_ctx->ring, - g_thread_ctx->completion_event); + if (async_io_create_ring(thread_ctx, queue_size) != 0) { + free(thread_ctx); + thread_ctx = NULL; + return NULL; } // Initialize buffer pool + thread_ctx->fallback_buffer = (unsigned char *)malloc(READ_BLOCK); + IORING_BUFFER_INFO buf_info[NUM_BUFFERS_PER_THREAD]; u64 buf_pool_size = g_ioring_buffer_size * NUM_BUFFERS_PER_THREAD; - // Reserve and Commit the entire memory chunk + // Reserve and Commit memory for buffers void *base_ptr = plat_mem_reserve(buf_pool_size); if (base_ptr) { if (!plat_mem_commit(base_ptr, buf_pool_size)) { plat_mem_release(base_ptr, 0); + async_io_close_ring(thread_ctx); + free(thread_ctx); + thread_ctx = NULL; return NULL; } } else { + + async_io_close_ring(thread_ctx); + free(thread_ctx); + thread_ctx = NULL; return NULL; } for (int i = 0; i < NUM_BUFFERS_PER_THREAD; i++) { + thread_ctx->buffers[i].data = (u8 *)base_ptr + (i * g_ioring_buffer_size); + thread_ctx->buffer_pool[i] = i; + thread_ctx->buffers[i].buffer_id = i; - g_thread_ctx->buffers[i].data = (u8 *)base_ptr + (i * g_ioring_buffer_size); - - 
g_thread_ctx->buffer_pool[i] = i; - g_thread_ctx->buffers[i].buffer_id = i; - - buf_info[i].Address = g_thread_ctx->buffers[i].data; - buf_info[i].Length = (ULONG)g_ioring_buffer_size; + buf_info[i] = + MAKE_BUF_INFO(thread_ctx->buffers[i].data, g_ioring_buffer_size); } - g_thread_ctx->free_count = NUM_BUFFERS_PER_THREAD; + thread_ctx->free_count = NUM_BUFFERS_PER_THREAD; - HRESULT hb = BuildIoRingRegisterBuffers( - g_thread_ctx->ring, NUM_BUFFERS_PER_THREAD, buf_info, USERDATA_REGISTER); + // Register buffers + int hr = + async_io_register_buffers(thread_ctx, NUM_BUFFERS_PER_THREAD, buf_info); - if (FAILED(hb)) { - printf("Buffer registration failed: 0x%lx\n", hb); - return NULL; - } + thread_ctx->use_registered_buffers = (hr == 0); + thread_ctx->submitting = 1; + thread_ctx->num_submissions = 0; + thread_ctx->active_files = 0; - // Submit registration - SubmitIoRing(g_thread_ctx->ring, 0, 0, NULL); - - g_thread_ctx->num_submissions = 0; - g_thread_ctx->active_files = 0; - - g_thread_ctx->initialized = 1; - - return g_thread_ctx; + return thread_ctx; } -static void io_ring_cleanup_thread(void) { - if (!g_thread_ctx) +static void io_ring_cleanup_thread(ThreadIoContext *thread_ctx) { + if (!thread_ctx) return; - if (g_thread_ctx->completion_event) - CloseHandle(g_thread_ctx->completion_event); - if (g_thread_ctx->ring) - CloseIoRing(g_thread_ctx->ring); - plat_mem_release(g_thread_ctx->buffers[0].data, 0); - free(g_thread_ctx); - g_thread_ctx = NULL; + if (thread_ctx->ring) + async_io_close_ring(thread_ctx); + + // Free the buffer pool memory + if (thread_ctx->buffers[0].data) { + u64 buf_pool_size = g_ioring_buffer_size * NUM_BUFFERS_PER_THREAD; + plat_mem_release(thread_ctx->buffers[0].data, buf_pool_size); + } + + free(thread_ctx); + thread_ctx = NULL; } -// ---------------------- Buffer get and return ------------------------ +// -------------------------- Buffer get and return ------------------------ static IoBuffer *get_free_buffer(ThreadIoContext *ctx) { if 
(ctx->free_count == 0) { @@ -1074,7 +1496,7 @@ static IoBuffer *get_free_buffer(ThreadIoContext *ctx) { IoBuffer *buf = &ctx->buffers[idx]; buf->completed = 0; buf->bytes_read = 0; - buf->result = E_PENDING; + buf->result = 0; return buf; } @@ -1087,29 +1509,26 @@ static void return_buffer(ThreadIoContext *ctx, IoBuffer *buf) { } // -------------------------- Submit async read --------------------------- -static HRESULT submit_read(ThreadIoContext *ctx, FileReadContext *file_ctx, - IoBuffer *buf, uint64_t offset, size_t size) { +static int build_read(ThreadIoContext *thread_ctx, FileReadContext *file_ctx, + IoBuffer *buf, uint64_t offset, size_t size) { buf->offset = offset; buf->size = size; buf->file = file_ctx; - IORING_HANDLE_REF file_ref = IoRingHandleRefFromHandle(file_ctx->hFile); - IORING_BUFFER_REF buffer_ref = - IoRingBufferRefFromIndexAndOffset(buf->buffer_id, 0); + SUBMIT_READ_RETURN_VALUE result = + async_io_build_read(thread_ctx, file_ctx->hFile, buf->buffer_id, size, + offset, (uintptr_t)buf); - HRESULT hr = - BuildIoRingReadFile(ctx->ring, file_ref, buffer_ref, (UINT32)size, offset, - (UINT_PTR)buf, IOSQE_FLAGS_NONE); - - if (SUCCEEDED(hr)) { + if (ASYNC_IO_SUCCEEDED(result)) { file_ctx->active_reads++; file_ctx->reads_submitted++; - ctx->num_submissions++; + thread_ctx->num_submissions++; } else { buf->completed = 1; - return_buffer(ctx, buf); + buf->result = result; // Store the error code + return_buffer(thread_ctx, buf); } - return hr; + return result; } // ------------ Link completed buffers in an ordered list ------------- @@ -1144,26 +1563,24 @@ static void insert_buffer_ordered(FileReadContext *file, IoBuffer *buf) { } // -------------------------- Process completions --------------------------- -static void process_completions(ThreadIoContext *ctx, FileQueue *fq) { - IORING_CQE cqe; +static void process_completions(ThreadIoContext *thread_ctx, FileQueue *fq) { + AsyncIoCompletion cqe; - while (PopIoRingCompletion(ctx->ring, &cqe) == S_OK) { 
- - if (cqe.UserData == USERDATA_REGISTER || cqe.UserData == 0) - continue; + // Keep processing as long as there are completions available + while (async_io_pop_completion(thread_ctx->ring, &cqe) == 1) { IoBuffer *buf = (IoBuffer *)cqe.UserData; FileReadContext *file = buf->file; buf->result = cqe.ResultCode; - buf->bytes_read = (DWORD)cqe.Information; + buf->bytes_read = cqe.Information; buf->completed = 1; file->active_reads--; file->reads_completed++; - ctx->num_submissions--; + thread_ctx->num_submissions--; - if (SUCCEEDED(cqe.ResultCode) && cqe.Information > 0) { + if (ASYNC_IO_SUCCEEDED(cqe.ResultCode) && cqe.Information > 0) { buf->next = NULL; @@ -1171,7 +1588,7 @@ static void process_completions(ThreadIoContext *ctx, FileQueue *fq) { } else { file->failed_reads++; - return_buffer(ctx, buf); + return_buffer(thread_ctx, buf); } } } @@ -1183,11 +1600,10 @@ static int init_file(FileReadContext *f, FileEntry *fe) { f->fe = fe; f->file_size = fe->size_bytes; - f->hFile = CreateFileA( - fe->path, GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, - OPEN_EXISTING, FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING, NULL); + // Use the abstracted os_file_open_async for async I/O with no buffering + f->hFile = os_file_open(fe->path, FLAG_ASYNC_DIRECT_READ); - if (f->hFile == INVALID_HANDLE_VALUE) { + if (f->hFile == INVALID_ASYNC_IO_HANDLE) { return 0; } @@ -1201,7 +1617,8 @@ static int init_file(FileReadContext *f, FileEntry *fe) { return 1; } -static void finalize_file(FileReadContext *file, WorkerContext *ctx) { +static void finalize_file(FileReadContext *file, ThreadIoContext *thread_ctx, + WorkerContext *worker_ctx) { FileEntry *fe = file->fe; @@ -1221,8 +1638,9 @@ static void finalize_file(FileReadContext *file, WorkerContext *ctx) { } } else { atomic_fetch_add(&g_io_ring_fallbacks, 1); - xxh3_hash_file_stream(fe->path, hash, NULL); - printf("Fallback for path: %s\n", fe->path); + xxh3_hash_file_stream(fe->path, hash, thread_ctx->fallback_buffer); + // DEBUG 
+ // printf("Fallback for path: %s\n", fe->path); } char created[32], modified[32]; @@ -1231,73 +1649,144 @@ static void finalize_file(FileReadContext *file, WorkerContext *ctx) { double size_kib = (double)fe->size_bytes / 1024.0; - char stack_buf[1024]; + char stack_buf[KiB(4)]; int len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n", hash, fe->path, size_kib, created, modified, fe->owner); - char *dst = arena_push(&ctx->arena, len, false); + char *dst = arena_push(&worker_ctx->arena, len, false); memcpy(dst, stack_buf, len); atomic_fetch_add(&g_files_hashed, 1); } // -------------------- Hash head file ----------------------- -static void hash_head_file(ThreadIoContext *ctx, FileQueue *fq, - WorkerContext *wctx) { +static void hash_head_file(ThreadIoContext *thread_ctx, FileQueue *fq, + WorkerContext *worker_ctx) { FileReadContext *file = fq_peek(fq); if (!file) return; - while (file->head) { + // Keep hashing while the next buffer in sequence is ready at head + while (file->head && file->head->offset == file->next_hash_offset) { IoBuffer *buf = file->head; - // Check ordering - if (buf->offset != file->bytes_hashed) - return; - - // Consume + // Consume from head file->head = buf->next; if (!file->head) file->tail = NULL; - // Calculate actual bytes to hash (handle last partial sector) - size_t bytes_to_hash = buf->bytes_read; + // Process the buffer + if (ASYNC_IO_SUCCEEDED(buf->result) && buf->bytes_read > 0) { + // Calculate actual bytes to hash (handle last partial sector) + size_t bytes_to_hash = buf->bytes_read; - // If this is the last buffer and we read beyond file size, trim it - if (buf->offset + buf->bytes_read > file->file_size) { - bytes_to_hash = file->file_size - buf->offset; - } - - if (bytes_to_hash > 0) { - if (file->use_incremental_hash) { - // Large file: update incremental hash state - XXH3_128bits_update(&file->hash_state, buf->data, bytes_to_hash); - } else { - // Small file: single-shot hash - file->single_hash = 
XXH3_128bits(buf->data, bytes_to_hash); + // If this is the last buffer and we read beyond file size, trim it + if (buf->offset + buf->bytes_read > file->file_size) { + bytes_to_hash = file->file_size - buf->offset; } - file->bytes_hashed += bytes_to_hash; - atomic_fetch_add(&g_bytes_processed, bytes_to_hash); + if (bytes_to_hash > 0) { + if (file->use_incremental_hash) { + // Large file: update incremental hash state + XXH3_128bits_update(&file->hash_state, buf->data, bytes_to_hash); + } else { + // Small file: single-shot hash + file->single_hash = XXH3_128bits(buf->data, bytes_to_hash); + } + + file->bytes_hashed += bytes_to_hash; + atomic_fetch_add(&g_bytes_processed, bytes_to_hash); + } + + file->reads_hashed++; + } else if (buf->bytes_read == 0 && ASYNC_IO_SUCCEEDED(buf->result)) { + // Read operation completed with 0 bytes (EOF) + file->reads_hashed++; + } else { + // Read failed + file->failed_reads++; + file->reads_hashed++; } - return_buffer(ctx, buf); + // Move to next offset + file->next_hash_offset += buf->size; + + // Return buffer to pool + return_buffer(thread_ctx, buf); } - // Finalize + // Finalize file when all reads are complete if (file->active_reads == 0 && file->bytes_hashed >= file->file_size) { - finalize_file(file, wctx); - CloseHandle(file->hFile); + finalize_file(file, thread_ctx, worker_ctx); + os_file_close(file->hFile); fq_pop(fq); - ctx->active_files--; + thread_ctx->active_files--; + } +} + +static void hash_ready_files(ThreadIoContext *thread_ctx, FileQueue *fq, + WorkerContext *worker_ctx) { + for (int i = 0; i < fq->count;) { + + int idx = (fq->head + i) % MAX_ACTIVE_FILES; + FileReadContext *file = &fq->files[idx]; + + bool progressed = false; + + // ---- HASH READY BUFFERS ---- + while (file->head) { + + IoBuffer *buf = file->head; + + if (buf->offset != file->bytes_hashed) + break; + + progressed = true; + + file->head = buf->next; + if (!file->head) + file->tail = NULL; + + size_t bytes_to_hash = buf->bytes_read; + + if 
(buf->offset + buf->bytes_read > file->file_size) { + bytes_to_hash = file->file_size - buf->offset; + } + + if (bytes_to_hash > 0) { + if (file->use_incremental_hash) { + XXH3_128bits_update(&file->hash_state, buf->data, bytes_to_hash); + } else { + file->single_hash = XXH3_128bits(buf->data, bytes_to_hash); + } + + file->bytes_hashed += bytes_to_hash; + atomic_fetch_add(&g_bytes_processed, bytes_to_hash); + } + + return_buffer(thread_ctx, buf); + } + + // ---- FINALIZE ---- + if (file->active_reads == 0 && file->bytes_hashed >= file->file_size) { + + finalize_file(file, thread_ctx, worker_ctx); + os_file_close(file->hFile); + + fq_remove_at(fq, i); + thread_ctx->active_files--; + + continue; + } + i++; } } // ------------- Submit pending reads - fill all free buffers ----------------- -static void submit_pending_reads(ThreadIoContext *ctx, FileQueue *fq, - WorkerContext *worker_ctx, int *submitting) { +static void build_pending_reads(ThreadIoContext *thread_ctx, FileQueue *fq, + WorkerContext *worker_ctx) { MPMCQueue *file_queue = worker_ctx->file_queue; @@ -1309,7 +1798,7 @@ static void submit_pending_reads(ThreadIoContext *ctx, FileQueue *fq, if (f) { while (f->next_read_offset < f->file_size) { - IoBuffer *buf = get_free_buffer(ctx); + IoBuffer *buf = get_free_buffer(thread_ctx); if (!buf) return; @@ -1318,7 +1807,7 @@ static void submit_pending_reads(ThreadIoContext *ctx, FileQueue *fq, // Check if this is the last read and the file size is not // sector-aligned - BOOL is_last_read = (remaining <= g_ioring_buffer_size); + int is_last_read = (remaining <= g_ioring_buffer_size); if (remaining >= g_ioring_buffer_size) { // Normal full read @@ -1333,10 +1822,11 @@ static void submit_pending_reads(ThreadIoContext *ctx, FileQueue *fq, } } - HRESULT hr = submit_read(ctx, f, buf, f->next_read_offset, size); + SUBMIT_READ_RETURN_VALUE hr = + build_read(thread_ctx, f, buf, f->next_read_offset, size); - if (FAILED(hr)) { - return_buffer(ctx, buf); + if 
(ASYNC_IO_FAILED(hr)) { + return_buffer(thread_ctx, buf); f->failed_reads++; f->active_reads = 0; f->reads_submitted = 0; @@ -1345,22 +1835,19 @@ static void submit_pending_reads(ThreadIoContext *ctx, FileQueue *fq, } f->next_read_offset += size; - - if (ctx->num_submissions >= NUM_BUFFERS_PER_THREAD) - return; } } // Add new file if possible - if (!*submitting) + if (!thread_ctx->submitting) return; - if (ctx->active_files >= MAX_ACTIVE_FILES) + if (thread_ctx->active_files >= MAX_ACTIVE_FILES) return; FileEntry *fe = mpmc_pop(file_queue); if (!fe) { - *submitting = 0; + thread_ctx->submitting = 0; return; } @@ -1369,37 +1856,24 @@ static void submit_pending_reads(ThreadIoContext *ctx, FileQueue *fq, if (!init_file(newf, fe)) { // File can't be opened with NO_BUFFERING, process with fallback char hash[HASH_STRLEN]; - finalize_file(newf, worker_ctx); + finalize_file(newf, thread_ctx, worker_ctx); fq_pop(fq); continue; } f = newf; - ctx->active_files++; + thread_ctx->active_files++; } } -// -------------------------- Wait for completions --------------------------- -static void wait_for_completions(ThreadIoContext *ctx) { - - // If there are in-flight I/O requests → wait for completion - if (ctx->num_submissions > 0) { - WaitForSingleObject(ctx->completion_event, SUBMIT_TIMEOUT_MS); - return; - } - - // Sleep(1); -} - // -------------------------- Hash worker I/O Ring --------------------------- static THREAD_RETURN hash_worker_io_ring(void *arg) { WorkerContext *ctx = (WorkerContext *)arg; // Init IO ring - ThreadIoContext *ring_ctx = io_ring_init_thread(); - if (!ring_ctx || !ring_ctx->ring) { - printf("Thread %lu: I/O Ring unavailable, using buffered I/O\n", - GetCurrentThreadId()); + ThreadIoContext *thread_ctx = io_ring_init_thread(); + if (!thread_ctx || !thread_ctx->ring) { + printf("I/O Ring unavailable, using buffered I/O\n"); return hash_worker(arg); } @@ -1407,40 +1881,43 @@ static THREAD_RETURN hash_worker_io_ring(void *arg) { FileQueue fq; memset(&fq, 
0, sizeof(fq)); - int submitting = 1; + uint32_t submitted = 0; + uint32_t wait_count; // Main pipeline loop for (;;) { - // 1. Submit new reads - submit_pending_reads(ring_ctx, &fq, ctx, &submitting); + // Submit new reads + build_pending_reads(thread_ctx, &fq, ctx); - UINT32 submitted = 0; - SubmitIoRing(ring_ctx->ring, 0, 0, &submitted); + wait_count = MIN(thread_ctx->num_submissions, NUM_BUFFERS_PER_THREAD - 2); - // 5. Avoid busy witing - wait_for_completions(ring_ctx); + submitted = 0; + // async_io_submit(ring_ctx->ring, 0, 0, &submitted); + async_io_submit(thread_ctx, wait_count, 0, &submitted); - // 2. Process completions - process_completions(ring_ctx, &fq); - - // 3. Hash files - for (int i = 0; i < fq.count; i++) { - hash_head_file(ring_ctx, &fq, ctx); - } + // Process completions + process_completions(thread_ctx, &fq); // debug // printf("Free buffers: %d, Submissions: %d, Active files: %d\n", // ring_ctx->free_count, ring_ctx->num_submissions, // ring_ctx->active_files); + // debug end - // 4. Exit condition - if (!submitting && ring_ctx->active_files == 0 && - ring_ctx->num_submissions == 0) { + // Hash files + for (int i = 0; i < fq.count; i++) { + hash_head_file(thread_ctx, &fq, ctx); + } + // hash_ready_files(ring_ctx, &fq, ctx); + + // Exit condition + if (!thread_ctx->submitting && thread_ctx->active_files == 0 && + thread_ctx->num_submissions == 0) { break; } } - io_ring_cleanup_thread(); + io_ring_cleanup_thread(thread_ctx); return THREAD_RETURN_VALUE; }