diff --git a/.gitignore b/.gitignore index 19b273a..91ce0c9 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ temp_code.c /.cache/clangd/index /file_hasher /io_uring_test +/file_hasher diff --git a/arena.c b/arena.c index 8c03d99..61767d5 100644 --- a/arena.c +++ b/arena.c @@ -437,12 +437,14 @@ void *arena_push(mem_arena **arena_ptr, u64 size, bool zero) { // mk push Commit memory if needed ------------------------------------------------------------ */ - if (local_post > selected->commit_pos) { - u64 new_commit = ALIGN_UP_POW2(local_post, arena_pagesize()); + if (local_post > selected->commit_pos - + ALIGN_UP_POW2(sizeof(mem_arena), selected->align)) { + u64 new_commit = ALIGN_UP_POW2(local_post + ALIGN_UP_POW2(sizeof(mem_arena), selected->align), arena_pagesize()); new_commit = MIN(new_commit, selected->reserve_size); if (!plat_mem_commit((u8 *)selected + selected->commit_pos, new_commit - selected->commit_pos)) { + printf("ERROR: Could not commit memory!\n"); return NULL; } diff --git a/base.h b/base.h index 4439968..24b9e18 100644 --- a/base.h +++ b/base.h @@ -46,6 +46,7 @@ #include #include #include +#include /* ------------------------------------------------------------ Base types diff --git a/binaries/changelog.txt b/binaries/changelog.txt index e2b14d8..9062932 100644 --- a/binaries/changelog.txt +++ b/binaries/changelog.txt @@ -50,7 +50,7 @@ Fixing user prompt parsing Reorganising the code Improving the scan function -5.0: Implementing the IO Ring for windows and ui_uring for linux instead of buffered hashing, huge performance gains. The IO Ring is event driven, thread local, uses DMA and direct disk I/O, bypassing the OS cash completely, registred buffers, it supports bashing multiple submissions and can handle multiple files at the same time. +5.0: Implementing the IO Ring for windows and ui_uring for linux instead of buffered hashing, huge performance gains. 
The IO Ring is event driven, thread local, uses DMA and direct disk I/O, bypassing the OS cache completely, registered buffers, it supports batching multiple submissions and can handle multiple files at the same time. Hashing small files using XXH3_128bits() instead of the streaming pipeline(XXH3_128bits_reset(), XXH3_128bits_update(), XXH3_128bits_digest()), this reduses the overhead of creating a state and digest, coupled with the IO Ring it improves the hashing of small files whose size is inferior to the size of IO Ring buffers fixing the xxh_x86dispatch warnings Updating the progress printing function diff --git a/file_hasher b/file_hasher deleted file mode 100644 index 8c12d07..0000000 Binary files a/file_hasher and /dev/null differ diff --git a/file_hasher.c b/file_hasher.c index 7948f12..52e2c9b 100644 --- a/file_hasher.c +++ b/file_hasher.c @@ -77,14 +77,19 @@ int main(int argc, char **argv) { // Detect hardware // ------------------------------- // --- Windows: detect PHYSICAL cores (not logical threads) --- - size_t hw_threads = platform_physical_cores(); + uint32_t cpu_cores = platform_physical_cores(); // Logical threads = CPU cores * 2 - size_t num_threads = hw_threads * 2; + uint32_t cpu_threads = cpu_cores * 2; - printf("Starting thread pool: %zu threads (CPU cores: %zu)\n", num_threads, - hw_threads); - printf(" Selected instruction set: %s\n", get_xxhash_instruction_set()); + uint32_t num_scan_threads = cpu_threads; + uint32_t num_hash_threads = cpu_threads; + // uint32_t num_hash_threads = 1; + + printf("%d cores %d threads CPU detected with %s instruction set\n" + "Starting thread pool: %d scanning and %d hashing threads\n", + cpu_cores, cpu_threads, get_xxhash_instruction_set(), num_scan_threads, + num_hash_threads); // Align IO Ring block size to the system page size g_ioring_buffer_size = ALIGN_UP_POW2(g_ioring_buffer_size, g_pagesize); @@ -119,9 +124,6 @@ int main(int argc, char **argv) { // } // Starting hash threads - size_t num_hash_threads 
= num_threads; - // size_t num_hash_threads = 1; - WorkerContext workers[num_hash_threads]; Thread *hash_threads = arena_push(&gp_arena, sizeof(Thread) * num_hash_threads, true); @@ -130,7 +132,7 @@ int main(int argc, char **argv) { workers[i].arena = arena_create(&params); workers[i].file_queue = &file_queue; - if (thread_create(&hash_threads[i], (ThreadFunc)hash_worker_io_ring, + if (thread_create(&hash_threads[i], (ThreadFunc)hash_worker_ioring, &workers[i]) != 0) { fprintf(stderr, "Failed to create hash thread %zu\n", i); exit(1); @@ -178,8 +180,6 @@ int main(int argc, char **argv) { } // Starting scan threads - size_t num_scan_threads = num_threads; - ScannerContext scanners[num_scan_threads]; Thread *scan_threads = arena_push(&gp_arena, sizeof(Thread) * num_scan_threads, true); diff --git a/platform.c b/platform.c index b277ea6..deb0e63 100644 --- a/platform.c +++ b/platform.c @@ -887,11 +887,11 @@ static THREAD_RETURN progress_thread(void *arg) { // ======================== Hash worker IO Ring ======================== // -------------------------- Configuration --------------------------- -// #define IORING_BUFFER_SIZE (KiB(32)) #define IORING_BUFFER_SIZE (KiB(256)) #define NUM_BUFFERS_PER_THREAD 32 -#define MAX_ACTIVE_FILES 1 +#define MAX_ACTIVE_FILES 32 #define SUBMIT_TIMEOUT_MS 30000 +// #define IORING_DEBUG // Uncomment to print some errors // Globals u64 g_ioring_buffer_size = 4096 * 64; @@ -901,108 +901,116 @@ static atomic_uint_fast64_t g_io_ring_fallbacks = 0; #if defined(_WIN32) || defined(_WIN64) // Windows I/O Ring types -typedef HIORING AsyncIoRing; -typedef HIORING AsyncIoHandle; -#define INVALID_ASYNC_IO_HANDLE NULL -#define SUBMIT_READ_RETURN_VALUE HRESULT +typedef HIORING IoRingHandle; +#define BUILD_READ_RETURN_VALUE HRESULT + +typedef struct { + HRESULT ResultCode; + uint32_t Information; + uintptr_t UserData; +} IoRingCQE; #elif defined(__linux__) // Linux io_uring types typedef struct { struct io_uring ring; - int event_fd; struct 
io_uring_cqe *cqe_cache; int cqe_cache_index; int cqe_cache_count; -} AsyncIoRingImpl; +} IoUring; -typedef AsyncIoRingImpl *AsyncIoRing; -typedef int AsyncIoHandle; +typedef IoUring *IoRingHandle; typedef struct iovec IORING_BUFFER_INFO; -#define INVALID_ASYNC_IO_HANDLE (-1) -#define SUBMIT_READ_RETURN_VALUE int +#define BUILD_READ_RETURN_VALUE int typedef struct { int ResultCode; uint32_t Information; uintptr_t UserData; -} AsyncIoCompletion; +} IoRingCQE; #endif typedef struct IoBuffer IoBuffer; typedef struct FileReadContext { - FileHandle hFile; - uint64_t file_size; - int use_incremental_hash; - union { - XXH3_state_t hash_state; // For incremental hash (large files) - XXH128_hash_t single_hash; // For single-shot hash (small files) - }; FileEntry *fe; - - // Completion tracking - int reads_submitted; - int reads_completed; - - int active_reads; - int failed_reads; - - int reads_hashed; - uint64_t bytes_hashed; + uint64_t file_size; // For in-order hashing - uint64_t next_hash_offset; uint64_t next_read_offset; IoBuffer *head; IoBuffer *tail; + // Completion tracking + uint64_t bytes_hashed; + uint32_t reads_hashed; + + uint32_t reads_submitted; + uint32_t reads_completed; + + uint32_t active_reads; + + union { + XXH3_state_t hash_state; // For incremental hash (large files) + XXH128_hash_t single_hash; // For single-shot hash (small files) + }; + + FileHandle file_handle; + + bool use_incremental_hash; + + bool completed; + } FileReadContext; // -------------------------- Buffer structure --------------------------- -typedef struct IoBuffer { - void *data; - uint64_t offset; - size_t size; - size_t bytes_read; - SUBMIT_READ_RETURN_VALUE result; +#define IO_PENDING INT_MIN - int buffer_id; +typedef struct IoBuffer { + FileReadContext *file; + void *data; + size_t size; + uint64_t offset; + size_t bytes_read; + + BUILD_READ_RETURN_VALUE result; int completed; - FileReadContext *file; - struct IoBuffer *next; + int buffer_id; } IoBuffer; // Thread-local I/O 
Ring context typedef struct ThreadIoContext { - AsyncIoRing ring; + IoRingHandle ring; void *completion_event; unsigned char *fallback_buffer; IoBuffer buffers[NUM_BUFFERS_PER_THREAD]; int buffer_pool[NUM_BUFFERS_PER_THREAD]; int free_count; - int submitting; int num_submissions; int active_files; + bool submitting; + +#if defined(__linux__) + bool use_registered_buffers; +#endif - int use_registered_buffers; } ThreadIoContext; typedef struct { uint32_t MaxSubmissionQueueSize; uint32_t MaxCompletionQueueSize; uint32_t MaxVersion; -} AsyncIoCapabilities; +} IoRingCapabilities; // ----------------------------- Async I/O Abstraction ------------------------- #if defined(_WIN32) || defined(_WIN64) // Windows I/O Ring functions -static void async_io_query_capabilities(AsyncIoCapabilities *caps) { +static void ioring_query_capabilities(IoRingCapabilities *caps) { IORING_CAPABILITIES win_caps; QueryIoRingCapabilities(&win_caps); caps->MaxSubmissionQueueSize = win_caps.MaxSubmissionQueueSize; @@ -1010,32 +1018,30 @@ static void async_io_query_capabilities(AsyncIoCapabilities *caps) { caps->MaxVersion = win_caps.MaxVersion; } -static void *async_io_create_completion_event(void) { +static void *ioring_create_completion_event(void) { return CreateEvent(NULL, FALSE, FALSE, NULL); } -static void async_io_set_completion_event(AsyncIoRing ring, void *event) { +static void ioring_set_completion_event(IoRingHandle ring, void *event) { SetIoRingCompletionEvent(ring, event); } -static void async_io_wait_for_completion(ThreadIoContext *ctx) { +static void ioring_wait_for_completion(ThreadIoContext *ctx) { if (ctx->num_submissions > 0) { WaitForSingleObject(ctx->completion_event, SUBMIT_TIMEOUT_MS); return; } } -static int async_io_create_ring(ThreadIoContext *thread_ctx, - uint32_t queue_size) { +static int create_ioring(ThreadIoContext *thread_ctx, uint32_t queue_size) { IORING_CREATE_FLAGS flags = {0}; HRESULT hr = CreateIoRing(IORING_VERSION_3, flags, queue_size, queue_size * 2, 
&thread_ctx->ring); // Create completion event - thread_ctx->completion_event = async_io_create_completion_event(); + thread_ctx->completion_event = ioring_create_completion_event(); if (thread_ctx->completion_event) { - async_io_set_completion_event(thread_ctx->ring, - thread_ctx->completion_event); + ioring_set_completion_event(thread_ctx->ring, thread_ctx->completion_event); } return SUCCEEDED(hr) ? 0 : -1; } @@ -1045,78 +1051,83 @@ static int async_io_create_ring(ThreadIoContext *thread_ctx, #define MAKE_BUF_INFO(a, l) \ (IORING_BUFFER_INFO) { .Address = (a), .Length = (uint32_t)(l) } -static int async_io_submit(ThreadIoContext *thread_ctx, uint32_t wait_count, - uint32_t timeout_ms, uint32_t *submitted) { +static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t wait_count, + uint32_t timeout_ms, uint32_t *submitted) { HRESULT hr = SubmitIoRing(thread_ctx->ring, 0, timeout_ms, submitted); // HRESULT hr = SubmitIoRing(ring, wait_count, timeout_ms, submitted); // The wait_count in windows is not implemented yet, so we wait with a // completion event for a single completion - async_io_wait_for_completion(thread_ctx); + ioring_wait_for_completion(thread_ctx); return SUCCEEDED(hr) ? 
0 : -1; } -static int async_io_register_buffers(ThreadIoContext *thread_ctx, - uint32_t num_buffers, - IORING_BUFFER_INFO *buf_info) { +static int ioring_register_buffers(ThreadIoContext *thread_ctx, + uint32_t num_buffers, + IORING_BUFFER_INFO *buf_info) { HRESULT hr = BuildIoRingRegisterBuffers( thread_ctx->ring, NUM_BUFFERS_PER_THREAD, buf_info, USERDATA_REGISTER); if (FAILED(hr)) { - LPSTR messageBuffer = NULL; + char error_msg[256]; size_t size = FormatMessageA( FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, - NULL, hr, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), - (LPSTR)&messageBuffer, 0, NULL); + NULL, hr, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), error_msg, + sizeof(error_msg), NULL); if (size > 0) { - fprintf(stderr, "Error registering buffers: %s (0x%08X)\n", messageBuffer, + fprintf(stderr, "Error registering buffers: %s (0x%08X)\n", error_msg, (unsigned int)hr); - LocalFree(messageBuffer); // Free the memory allocated by FormatMessage } else { fprintf(stderr, "Error registering buffers: Unknown HRESULT (0x%08X)\n", (unsigned int)hr); } } // Submit registration - async_io_submit(thread_ctx, 0, 0, NULL); + ioring_submit(thread_ctx, 0, 0, NULL); return hr; } -static void async_io_close_event(void *event) { CloseHandle(event); } +static void ioring_close_event(void *event) { CloseHandle(event); } -static int async_io_close_ring(ThreadIoContext *thread_ctx) { +static int close_ioring(ThreadIoContext *thread_ctx) { if (thread_ctx->completion_event) - async_io_close_event(thread_ctx->completion_event); + ioring_close_event(thread_ctx->completion_event); CloseIoRing(thread_ctx->ring); return 0; } -static SUBMIT_READ_RETURN_VALUE -async_io_build_read(ThreadIoContext *thread_ctx, AsyncIoHandle file_handle, - uint32_t buffer_id, size_t size, uint64_t offset, - uintptr_t user_data) { - IORING_HANDLE_REF file_ref = IoRingHandleRefFromHandle(file_handle); +static BUILD_READ_RETURN_VALUE ioring_build_read(ThreadIoContext 
*thread_ctx, + FileReadContext *file_ctx, + uint32_t buffer_id, + size_t size, uint64_t offset, + uintptr_t user_data) { + IORING_HANDLE_REF file_ref = IoRingHandleRefFromHandle(file_ctx->file_handle); IORING_BUFFER_REF buffer_ref = IoRingBufferRefFromIndexAndOffset(buffer_id, 0); HRESULT hr = BuildIoRingReadFile(thread_ctx->ring, file_ref, buffer_ref, (uint32_t)size, offset, user_data, IOSQE_FLAGS_NONE); + if (FAILED(hr)) { + char error_msg[256]; + FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, hr, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + error_msg, sizeof(error_msg), NULL); + + fprintf(stderr, + "ERROR: Building read error for file: %s - Code: %s (0x%08X)\n", + file_ctx->fe->path, error_msg, (unsigned int)hr); + } return hr; } -typedef struct { - HRESULT ResultCode; - uint32_t Information; - uintptr_t UserData; -} AsyncIoCompletion; - -static int async_io_pop_completion(AsyncIoRing ring, AsyncIoCompletion *cqe) { +static int ioring_pop_completion(IoRingHandle ring, IoRingCQE *cqe) { IORING_CQE win_cqe; while (1) { @@ -1128,10 +1139,10 @@ static int async_io_pop_completion(AsyncIoRing ring, AsyncIoCompletion *cqe) { if (FAILED(hr)) return -1; - // Unlike linux, in addition of IO operations, Windows IO Ring produces CQEs - // (completion queue entries) when doing operations like register buffer or - // submiting, we filter them here cqe.UserData == USERDATA_REGISTER - // cqe.ResultCode == S_OK (or error) + // Unlike Linux, in addition to IO operations, Windows IO Ring produces + // CQEs (completion queue entries) for operations like registering + // buffers or submitting; we filter them out here by checking + // cqe.UserData == USERDATA_REGISTER (cqe.ResultCode is S_OK or an error) if (win_cqe.UserData == USERDATA_REGISTER) continue; @@ -1145,9 +1156,18 @@ static int async_io_pop_completion(AsyncIoRing ring, AsyncIoCompletion *cqe) { FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, 
NULL, win_cqe.ResultCode, 0, error_msg, sizeof(error_msg), NULL); + + // Try to get the file path from the buffer + IoBuffer *buf = (IoBuffer *)win_cqe.UserData; + const char *file_path = "unknown"; + if (buf && buf->file && buf->file->fe) { + file_path = buf->file->fe->path; + } + fprintf(stderr, - "WARNING: I/O completion error - Code: 0x%lx, Error: %s\n", - win_cqe.ResultCode, error_msg); + "WARNING: I/O completion error for file '%s' - Code: 0x%lx, " + "Error: %s\n", + file_path, win_cqe.ResultCode, error_msg); } return 1; @@ -1156,7 +1176,7 @@ static int async_io_pop_completion(AsyncIoRing ring, AsyncIoCompletion *cqe) { #elif defined(__linux__) // Linux io_uring functions implementation -static void async_io_query_capabilities(AsyncIoCapabilities *caps) { +static void ioring_query_capabilities(IoRingCapabilities *caps) { // Get system limits for io_uring long max_entries = sysconf(_SC_IOV_MAX); if (max_entries <= 0) @@ -1169,9 +1189,8 @@ static void async_io_query_capabilities(AsyncIoCapabilities *caps) { } // static int async_io_create_ring(uint32_t queue_size, AsyncIoRing *ring) { -static int async_io_create_ring(ThreadIoContext *thread_ctx, - uint32_t queue_size) { - AsyncIoRingImpl *impl = (AsyncIoRingImpl *)calloc(1, sizeof(AsyncIoRingImpl)); +static int create_ioring(ThreadIoContext *thread_ctx, uint32_t queue_size) { + IoUring *impl = (IoUring *)calloc(1, sizeof(IoUring)); if (!impl) return -1; @@ -1210,25 +1229,76 @@ static int async_io_create_ring(ThreadIoContext *thread_ctx, #define MAKE_BUF_INFO(a, l) \ (IORING_BUFFER_INFO) { .iov_base = (a), .iov_len = (size_t)(l) } -static int async_io_register_buffers(ThreadIoContext *thread_ctx, - uint32_t num_buffers, - IORING_BUFFER_INFO *buf_info) { - AsyncIoRingImpl *impl = (AsyncIoRingImpl *)thread_ctx->ring; +static int ioring_register_buffers(ThreadIoContext *thread_ctx, + uint32_t num_buffers, + IORING_BUFFER_INFO *buf_info) { + IoUring *impl = (IoUring *)thread_ctx->ring; int hr = 
io_uring_register_buffers(&impl->ring, buf_info, num_buffers); if (hr < 0) { - fprintf(stderr, "Error registering buffers: %s (code: %d)\n", strerror(-hr), - hr); - fprintf(stderr, "WARNING: Memlock limit too low buffer size! Fallback to " - "unregistred buffers\n"); + if (hr == -ENOMEM) { + struct rlimit limit; + getrlimit(RLIMIT_MEMLOCK, &limit); + + fprintf(stderr, + "WARNING: Buffer registration failed due to memlock limits " + "(ENOMEM).\n" + "Increase the limit to solve this warning.\n"); + + // The memlock limit in Linux restricts the amount of memory a process can + // "lock" into physical RAM using the mlock() family of system calls. This + // prevents the operating system from swapping that memory out to disk. + // And registering buffers will lock the buffers memory so the hardware + // can access it directly without kernel intervention. Increase the limit + // to be able to register the buffers. + // + // **Modifying the Limit: + // The method for changing the memlock limit depends on whether you are + // managing a user session or a system service. + // 1. For Users and Interactive Sessions + // To permanently increase the limit for a specific user or group, modify + // the /etc/security/limits.conf file. Add the following lines: + // # Example for a specific user (replace 'username') + // username soft memlock unlimited + // username hard memlock unlimited + // + // # Example for all users + // * soft memlock unlimited + // * hard memlock unlimited + // + // Soft Limit: The value the user starts with; can be raised up to the + // hard limit. + // + // Hard Limit: The absolute maximum; only a privileged user + // (root) can increase this. Values: Can be set in Kilobytes (KB) or as + // unlimited. + // + // 2. For Systemd Services + // Settings in limits.conf do not affect background services managed by + // systemd. 
To increase the limit for a service, edit its service file + // (e.g., /etc/systemd/system/myservice.service) and add: + // + // [Service] + // LimitMEMLOCK=infinity + + } else { + // For any other error (e.g., EFAULT, EBUSY, EINVAL) + fprintf(stderr, "Error registering buffers: %s (code: %d)\n", + strerror(-hr), hr); + } + + fprintf(stderr, "Falling back to unregistered buffers (performance may " + "be reduced).\n"); } + + thread_ctx->use_registered_buffers = (hr == 0); return hr == 0 ? 0 : -1; } -static int async_io_submit(ThreadIoContext *thread_ctx, uint32_t wait_count, - uint32_t timeout_ms, uint32_t *submitted) { - AsyncIoRingImpl *impl = (AsyncIoRingImpl *)thread_ctx->ring; +static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t wait_count, + uint32_t timeout_ms, uint32_t *submitted) { + IoUring *impl = (IoUring *)thread_ctx->ring; if (!impl) return -1; @@ -1251,27 +1321,26 @@ static int async_io_submit(ThreadIoContext *thread_ctx, uint32_t wait_count, return 0; } -static int async_io_close_ring(ThreadIoContext *thread_ctx) { - AsyncIoRingImpl *impl = (AsyncIoRingImpl *)thread_ctx->ring; +static int close_ioring(ThreadIoContext *thread_ctx) { + IoUring *impl = (IoUring *)thread_ctx->ring; if (!impl) return -1; if (thread_ctx->use_registered_buffers) { io_uring_unregister_buffers(&impl->ring); } - close(impl->event_fd); io_uring_queue_exit(&impl->ring); free(impl); return 0; } -static int async_io_build_read(ThreadIoContext *thread_ctx, - AsyncIoHandle file_handle, uint32_t buffer_id, - size_t size, uint64_t offset, - uintptr_t user_data) { - AsyncIoRing ring = thread_ctx->ring; - AsyncIoRingImpl *impl = (AsyncIoRingImpl *)ring; +static int ioring_build_read(ThreadIoContext *thread_ctx, + FileReadContext *file_ctx, uint32_t buffer_id, + size_t size, uint64_t offset, + uintptr_t user_data) { + IoRingHandle ring = thread_ctx->ring; + IoUring *impl = (IoUring *)ring; if (!impl) return -1; @@ -1281,22 +1350,21 @@ static int 
async_io_build_read(ThreadIoContext *thread_ctx, return -1; } - ThreadIoContext *ctx = thread_ctx; // or pass it properly TODO : look here - - void *buf = ctx->buffers[buffer_id].data; + void *buf = thread_ctx->buffers[buffer_id].data; if (thread_ctx->use_registered_buffers) { - io_uring_prep_read_fixed(sqe, file_handle, buf, size, offset, buffer_id); + io_uring_prep_read_fixed(sqe, file_ctx->file_handle, buf, size, offset, + buffer_id); } else { - io_uring_prep_read(sqe, file_handle, buf, size, offset); + io_uring_prep_read(sqe, file_ctx->file_handle, buf, size, offset); } io_uring_sqe_set_data64(sqe, user_data); return 0; } -static int async_io_pop_completion(AsyncIoRing ring, AsyncIoCompletion *cqe) { - AsyncIoRingImpl *impl = (AsyncIoRingImpl *)ring; +static int ioring_pop_completion(IoRingHandle ring, IoRingCQE *cqe) { + IoUring *impl = (IoUring *)ring; struct io_uring_cqe *cqe_ptr = NULL; @@ -1333,8 +1401,17 @@ static int async_io_pop_completion(AsyncIoRing ring, AsyncIoCompletion *cqe) { // Check for error and print warning if (res < 0) { - fprintf(stderr, "WARNING: I/O completion error - Code: %d, Error: %s\n", - res, strerror(-res)); + // Try to get the file path from the buffer + IoBuffer *buf = (IoBuffer *)cqe->UserData; + const char *file_path = "unknown"; + if (buf && buf->file && buf->file->fe) { + file_path = buf->file->fe->path; + } + + fprintf( + stderr, + "WARNING: I/O completion error for file '%s' - Code: %d, Error: %s\n", + file_path, res, strerror(-res)); } return 1; @@ -1343,8 +1420,8 @@ static int async_io_pop_completion(AsyncIoRing ring, AsyncIoCompletion *cqe) { #endif // OS-agnostic helper macros -#define ASYNC_IO_SUCCEEDED(result) ((result) >= 0) -#define ASYNC_IO_FAILED(result) ((result) < 0) +#define IORING_SUCCEEDED(result) ((result) >= 0) +#define IORING_FAILED(result) ((result) < 0) // ---------------------- FIFO queue operations --------------------------- typedef struct FileQueue { @@ -1364,55 +1441,51 @@ static FileReadContext 
*fq_push(FileQueue *fq) { return f; } -static FileReadContext *fq_peek(FileQueue *fq) { +static FileReadContext *fq_peek_tail(FileQueue *fq) { if (fq->count == 0) return NULL; - return &fq->files[fq->head]; + + int idx = (fq->tail - 1 + MAX_ACTIVE_FILES) % MAX_ACTIVE_FILES; + return &fq->files[idx]; // return the newest file } -static void fq_pop(FileQueue *fq) { - fq->head = (fq->head + 1) % MAX_ACTIVE_FILES; - fq->count--; +static FileReadContext *fq_peek_at(FileQueue *fq, int index) { + if (index < 0 || index >= fq->count) + return NULL; + + int idx = (fq->head + index) % MAX_ACTIVE_FILES; + return &fq->files[idx]; } -static void fq_remove_at(FileQueue *fq, int index) { - if (fq->count == 0) - return; +static void fq_trim(FileQueue *fq) { + while (fq->count > 0) { + FileReadContext *f = &fq->files[fq->head]; - int remove_idx = (fq->head + index) % MAX_ACTIVE_FILES; + if (!f->completed) + break; - int last_logical = fq->count - 1; - int last_idx = (fq->head + last_logical) % MAX_ACTIVE_FILES; - - // Swap with last logical element if needed - if (index != last_logical) { - fq->files[remove_idx] = fq->files[last_idx]; + fq->head = (fq->head + 1) % MAX_ACTIVE_FILES; + fq->count--; } - - // Just decrease count - fq->count--; - - // Recompute tail properly - fq->tail = (fq->head + fq->count) % MAX_ACTIVE_FILES; } -// ---------------------- Initialize thread context --------------------------- -static ThreadIoContext *io_ring_init_thread(void) { +// ----------------- Initialize thread context ----------------------- +static ThreadIoContext *ioring_init_thread(void) { ThreadIoContext *thread_ctx = (ThreadIoContext *)calloc(1, sizeof(ThreadIoContext)); if (!thread_ctx) return NULL; // Query I/O Ring capabilities - AsyncIoCapabilities caps; - async_io_query_capabilities(&caps); + IoRingCapabilities caps; + ioring_query_capabilities(&caps); uint32_t queue_size = caps.MaxSubmissionQueueSize; if (queue_size > 4096) queue_size = 4096; // Cap at 4096 for reasonable memory 
usage // Create I/O Ring - if (async_io_create_ring(thread_ctx, queue_size) != 0) { + if (create_ioring(thread_ctx, queue_size) != 0) { free(thread_ctx); thread_ctx = NULL; return NULL; @@ -1430,14 +1503,14 @@ static ThreadIoContext *io_ring_init_thread(void) { if (base_ptr) { if (!plat_mem_commit(base_ptr, buf_pool_size)) { plat_mem_release(base_ptr, 0); - async_io_close_ring(thread_ctx); + close_ioring(thread_ctx); free(thread_ctx); thread_ctx = NULL; return NULL; } } else { - async_io_close_ring(thread_ctx); + close_ioring(thread_ctx); free(thread_ctx); thread_ctx = NULL; return NULL; @@ -1456,22 +1529,21 @@ static ThreadIoContext *io_ring_init_thread(void) { // Register buffers int hr = - async_io_register_buffers(thread_ctx, NUM_BUFFERS_PER_THREAD, buf_info); + ioring_register_buffers(thread_ctx, NUM_BUFFERS_PER_THREAD, buf_info); - thread_ctx->use_registered_buffers = (hr == 0); - thread_ctx->submitting = 1; + thread_ctx->submitting = true; thread_ctx->num_submissions = 0; thread_ctx->active_files = 0; return thread_ctx; } -static void io_ring_cleanup_thread(ThreadIoContext *thread_ctx) { +static void ioring_cleanup_thread(ThreadIoContext *thread_ctx) { if (!thread_ctx) return; if (thread_ctx->ring) - async_io_close_ring(thread_ctx); + close_ioring(thread_ctx); // Free the buffer pool memory if (thread_ctx->buffers[0].data) { @@ -1494,7 +1566,8 @@ static IoBuffer *get_free_buffer(ThreadIoContext *ctx) { IoBuffer *buf = &ctx->buffers[idx]; buf->completed = 0; buf->bytes_read = 0; - buf->result = 0; + buf->result = IO_PENDING; + buf->next = NULL; return buf; } @@ -1513,104 +1586,63 @@ static int build_read(ThreadIoContext *thread_ctx, FileReadContext *file_ctx, buf->size = size; buf->file = file_ctx; - SUBMIT_READ_RETURN_VALUE result = - async_io_build_read(thread_ctx, file_ctx->hFile, buf->buffer_id, size, - offset, (uintptr_t)buf); + BUILD_READ_RETURN_VALUE result = ioring_build_read( + thread_ctx, file_ctx, buf->buffer_id, size, offset, (uintptr_t)buf); - if 
(ASYNC_IO_SUCCEEDED(result)) { + if (IORING_SUCCEEDED(result)) { file_ctx->active_reads++; file_ctx->reads_submitted++; thread_ctx->num_submissions++; } else { - buf->completed = 1; + buf->completed = true; buf->result = result; // Store the error code return_buffer(thread_ctx, buf); } return result; } -// ------------ Link completed buffers in an ordered list ------------- -static void insert_buffer_ordered(FileReadContext *file, IoBuffer *buf) { - buf->next = NULL; - - // empty list - if (!file->head) { - file->head = file->tail = buf; - return; - } - - // insert at head - if (buf->offset < file->head->offset) { - buf->next = file->head; - file->head = buf; - return; - } - - // find position - IoBuffer *cur = file->head; - - while (cur->next && cur->next->offset < buf->offset) { - cur = cur->next; - } - - buf->next = cur->next; - cur->next = buf; - - if (!buf->next) - file->tail = buf; -} - // -------------------------- Process completions --------------------------- static void process_completions(ThreadIoContext *thread_ctx, FileQueue *fq) { - AsyncIoCompletion cqe; + IoRingCQE cqe; - // Keep processing as long as there are completions available - while (async_io_pop_completion(thread_ctx->ring, &cqe) == 1) { + while (ioring_pop_completion(thread_ctx->ring, &cqe) == 1) { IoBuffer *buf = (IoBuffer *)cqe.UserData; FileReadContext *file = buf->file; buf->result = cqe.ResultCode; buf->bytes_read = cqe.Information; - buf->completed = 1; file->active_reads--; file->reads_completed++; thread_ctx->num_submissions--; - - if (ASYNC_IO_SUCCEEDED(cqe.ResultCode) && cqe.Information > 0) { - - buf->next = NULL; - - insert_buffer_ordered(file, buf); - - } else { - file->failed_reads++; - return_buffer(thread_ctx, buf); - } } } // -------------------- File operations ----------------------- -static int init_file(FileReadContext *f, FileEntry *fe) { - memset(f, 0, sizeof(*f)); +static int init_file(FileReadContext *file, FileEntry *fe) { + memset(file, 0, sizeof(*file)); - 
f->fe = fe; - f->file_size = fe->size_bytes; + file->fe = fe; + file->file_size = fe->size_bytes; + file->head = file->tail = NULL; - // Use the abstracted os_file_open_async for async I/O with no buffering - f->hFile = os_file_open(fe->path, FLAG_ASYNC_DIRECT_READ); + file->file_handle = os_file_open(fe->path, FLAG_ASYNC_DIRECT_READ); - if (f->hFile == INVALID_ASYNC_IO_HANDLE) { + if (file->file_handle == INVALID_FILE_HANDLE) { + +#ifdef IORING_DEBUG + printf("ERROR: Could not open file %s\n", fe->path); +#endif return 0; } // Determine hash method based on file size - if (f->file_size > g_ioring_buffer_size) { - f->use_incremental_hash = true; - XXH3_128bits_reset(&f->hash_state); + if (file->file_size > g_ioring_buffer_size) { + file->use_incremental_hash = true; + XXH3_128bits_reset(&file->hash_state); } else { - f->use_incremental_hash = false; + file->use_incremental_hash = false; } return 1; } @@ -1620,9 +1652,11 @@ static void finalize_file(FileReadContext *file, ThreadIoContext *thread_ctx, FileEntry *fe = file->fe; + os_file_close(file->file_handle); + char hash[HASH_STRLEN]; - if (file->failed_reads == 0 && file->bytes_hashed == file->file_size) { + if (file->bytes_hashed == file->file_size) { if (file->use_incremental_hash) { // Large file: digest the accumulated hash state XXH128_hash_t h = XXH3_128bits_digest(&file->hash_state); @@ -1635,10 +1669,12 @@ static void finalize_file(FileReadContext *file, ThreadIoContext *thread_ctx, (unsigned long long)file->single_hash.low64); } } else { +#ifdef IORING_DEBUG + printf("WARNING: Fallback for path: %s\n", fe->path); +#endif + atomic_fetch_add(&g_io_ring_fallbacks, 1); xxh3_hash_file_stream(fe->path, hash, thread_ctx->fallback_buffer); - // DEBUG - // printf("Fallback for path: %s\n", fe->path); } char created[32], modified[32]; @@ -1657,219 +1693,164 @@ static void finalize_file(FileReadContext *file, ThreadIoContext *thread_ctx, atomic_fetch_add(&g_files_hashed, 1); } -// -------------------- Hash head 
file ----------------------- -static void hash_head_file(ThreadIoContext *thread_ctx, FileQueue *fq, - WorkerContext *worker_ctx) { - - FileReadContext *file = fq_peek(fq); - if (!file) - return; - - // Keep hashing while the next buffer in sequence is ready at head - while (file->head && file->head->offset == file->next_hash_offset) { - - IoBuffer *buf = file->head; - - // Consume from head - file->head = buf->next; - if (!file->head) - file->tail = NULL; - - // Process the buffer - if (ASYNC_IO_SUCCEEDED(buf->result) && buf->bytes_read > 0) { - // Calculate actual bytes to hash (handle last partial sector) - size_t bytes_to_hash = buf->bytes_read; - - // If this is the last buffer and we read beyond file size, trim it - if (buf->offset + buf->bytes_read > file->file_size) { - bytes_to_hash = file->file_size - buf->offset; - } - - if (bytes_to_hash > 0) { - if (file->use_incremental_hash) { - // Large file: update incremental hash state - XXH3_128bits_update(&file->hash_state, buf->data, bytes_to_hash); - } else { - // Small file: single-shot hash - file->single_hash = XXH3_128bits(buf->data, bytes_to_hash); - } - - file->bytes_hashed += bytes_to_hash; - atomic_fetch_add(&g_bytes_processed, bytes_to_hash); - } - - file->reads_hashed++; - } else if (buf->bytes_read == 0 && ASYNC_IO_SUCCEEDED(buf->result)) { - // Read operation completed with 0 bytes (EOF) - file->reads_hashed++; - } else { - // Read failed - file->failed_reads++; - file->reads_hashed++; - } - - // Move to next offset - file->next_hash_offset += buf->size; - - // Return buffer to pool - return_buffer(thread_ctx, buf); - } - - // Finalize file when all reads are complete - if (file->active_reads == 0 && file->bytes_hashed >= file->file_size) { - finalize_file(file, thread_ctx, worker_ctx); - os_file_close(file->hFile); - fq_pop(fq); - thread_ctx->active_files--; - } -} - +// -------------------- Hash files ----------------------- static void hash_ready_files(ThreadIoContext *thread_ctx, FileQueue 
*fq, WorkerContext *worker_ctx) { - for (int i = 0; i < fq->count;) { - int idx = (fq->head + i) % MAX_ACTIVE_FILES; - FileReadContext *file = &fq->files[idx]; + for (int i = 0; i < fq->count; i++) { - bool progressed = false; + FileReadContext *file = fq_peek_at(fq, i); + if (!file || file->completed) + continue; - // ---- HASH READY BUFFERS ---- + // ---- HASH READY BUFFERS IN ORDER ---- while (file->head) { IoBuffer *buf = file->head; - if (buf->offset != file->bytes_hashed) + // CQE not received yet + if (buf->result == IO_PENDING) break; - progressed = true; - + // Consume buffer file->head = buf->next; - if (!file->head) - file->tail = NULL; - size_t bytes_to_hash = buf->bytes_read; + if (IORING_SUCCEEDED(buf->result) && buf->bytes_read > 0) { - if (buf->offset + buf->bytes_read > file->file_size) { - bytes_to_hash = file->file_size - buf->offset; - } + size_t bytes_to_hash = buf->bytes_read; - if (bytes_to_hash > 0) { - if (file->use_incremental_hash) { - XXH3_128bits_update(&file->hash_state, buf->data, bytes_to_hash); - } else { - file->single_hash = XXH3_128bits(buf->data, bytes_to_hash); + if (buf->offset + buf->bytes_read > file->file_size) { + bytes_to_hash = file->file_size - buf->offset; } - file->bytes_hashed += bytes_to_hash; - atomic_fetch_add(&g_bytes_processed, bytes_to_hash); + if (bytes_to_hash > 0) { + if (file->use_incremental_hash) { + XXH3_128bits_update(&file->hash_state, buf->data, bytes_to_hash); + } else { + file->single_hash = XXH3_128bits(buf->data, bytes_to_hash); + } + + file->bytes_hashed += bytes_to_hash; + atomic_fetch_add(&g_bytes_processed, bytes_to_hash); + } + + file->reads_hashed++; + + } else if (buf->bytes_read == 0 && IORING_SUCCEEDED(buf->result)) { + file->reads_hashed++; // EOF + } else { + finalize_file(file, thread_ctx, worker_ctx); + file->completed = true; } return_buffer(thread_ctx, buf); } // ---- FINALIZE ---- - if (file->active_reads == 0 && file->bytes_hashed >= file->file_size) { + if (!file->completed && 
file->active_reads == 0 && + file->bytes_hashed >= file->file_size) { finalize_file(file, thread_ctx, worker_ctx); - os_file_close(file->hFile); - - fq_remove_at(fq, i); + file->completed = true; thread_ctx->active_files--; - - continue; } - i++; } + + // Clean up completed files from the head + fq_trim(fq); } -// ------------- Submit pending reads - fill all free buffers ----------------- +// ------------------ Build pending reads ---------------------- static void build_pending_reads(ThreadIoContext *thread_ctx, FileQueue *fq, WorkerContext *worker_ctx) { MPMCQueue *file_queue = worker_ctx->file_queue; - // Try to submit reads for the current head file - FileReadContext *f = fq_peek(fq); + FileReadContext *file = fq_peek_tail(fq); for (;;) { - if (f) { - while (f->next_read_offset < f->file_size) { + // ---------------- BUILD READS FOR CURRENT FILE ---------------- + if (file) { + while (file->next_read_offset < file->file_size) { IoBuffer *buf = get_free_buffer(thread_ctx); if (!buf) return; - size_t remaining = f->file_size - f->next_read_offset; - size_t size; - - // Check if this is the last read and the file size is not - // sector-aligned - int is_last_read = (remaining <= g_ioring_buffer_size); + size_t remaining = file->file_size - file->next_read_offset; + size_t bytes_to_read; if (remaining >= g_ioring_buffer_size) { - // Normal full read - size = g_ioring_buffer_size; + bytes_to_read = g_ioring_buffer_size; } else { - // Last read - handle partial sector - if (remaining % g_pagesize != 0) { - size = ALIGN_UP_POW2(remaining, g_pagesize); - - } else { - size = remaining; - } + bytes_to_read = ALIGN_UP_POW2(remaining, g_pagesize); } - SUBMIT_READ_RETURN_VALUE hr = - build_read(thread_ctx, f, buf, f->next_read_offset, size); + // Initialize buffer + buf->file = file; + buf->offset = file->next_read_offset; + buf->size = bytes_to_read; - if (ASYNC_IO_FAILED(hr)) { + // Chain buffer + if (!file->head) { + file->head = buf; + } else { + file->tail->next = 
buf; + } + file->tail = buf; + + BUILD_READ_RETURN_VALUE hr = + ioring_build_read(thread_ctx, file, buf->buffer_id, bytes_to_read, + buf->offset, (uintptr_t)buf); + + if (IORING_FAILED(hr)) { + // mark failure and stop this file return_buffer(thread_ctx, buf); - f->failed_reads++; - f->active_reads = 0; - f->reads_submitted = 0; - f->next_read_offset = f->file_size; + finalize_file(file, thread_ctx, worker_ctx); + file->completed = true; break; } - f->next_read_offset += size; + file->active_reads++; + file->reads_submitted++; + thread_ctx->num_submissions++; + + file->next_read_offset += bytes_to_read; } } - // Add new file if possible + // ---------------- ADD NEW FILE ---------------- if (!thread_ctx->submitting) return; - if (thread_ctx->active_files >= MAX_ACTIVE_FILES) + if (fq->count >= MAX_ACTIVE_FILES) return; FileEntry *fe = mpmc_pop(file_queue); if (!fe) { - thread_ctx->submitting = 0; + thread_ctx->submitting = false; return; } - FileReadContext *newf = fq_push(fq); + FileReadContext *newfile = fq_push(fq); - if (!init_file(newf, fe)) { - // File can't be opened with NO_BUFFERING, process with fallback - char hash[HASH_STRLEN]; - finalize_file(newf, thread_ctx, worker_ctx); - fq_pop(fq); + if (!init_file(newfile, fe)) { + finalize_file(newfile, thread_ctx, worker_ctx); + newfile->completed = true; continue; } - f = newf; + file = newfile; thread_ctx->active_files++; } } - // -------------------------- Hash worker I/O Ring --------------------------- -static THREAD_RETURN hash_worker_io_ring(void *arg) { - WorkerContext *ctx = (WorkerContext *)arg; +static THREAD_RETURN hash_worker_ioring(void *arg) { + WorkerContext *worker_ctx = (WorkerContext *)arg; // Init IO ring - ThreadIoContext *thread_ctx = io_ring_init_thread(); + ThreadIoContext *thread_ctx = ioring_init_thread(); if (!thread_ctx || !thread_ctx->ring) { printf("I/O Ring unavailable, using buffered I/O\n"); return hash_worker(arg); @@ -1886,28 +1867,25 @@ static THREAD_RETURN 
hash_worker_io_ring(void *arg) { for (;;) { // Submit new reads - build_pending_reads(thread_ctx, &fq, ctx); + build_pending_reads(thread_ctx, &fq, worker_ctx); - wait_count = MIN(thread_ctx->num_submissions, NUM_BUFFERS_PER_THREAD - 2); + wait_count = MIN(thread_ctx->num_submissions, NUM_BUFFERS_PER_THREAD - 6); submitted = 0; - // async_io_submit(ring_ctx->ring, 0, 0, &submitted); - async_io_submit(thread_ctx, wait_count, 0, &submitted); + ioring_submit(thread_ctx, wait_count, 0, &submitted); // Process completions process_completions(thread_ctx, &fq); // debug - // printf("Free buffers: %d, Submissions: %d, Active files: %d\n", - // ring_ctx->free_count, ring_ctx->num_submissions, - // ring_ctx->active_files); + // printf( + // "Free buffers: %d, Submissions: %d, Active files: %d, fq count: + // %d\n", thread_ctx->free_count, thread_ctx->num_submissions, + // thread_ctx->active_files, fq.count); // debug end // Hash files - for (int i = 0; i < fq.count; i++) { - hash_head_file(thread_ctx, &fq, ctx); - } - // hash_ready_files(ring_ctx, &fq, ctx); + hash_ready_files(thread_ctx, &fq, worker_ctx); // Exit condition if (!thread_ctx->submitting && thread_ctx->active_files == 0 && @@ -1916,6 +1894,6 @@ static THREAD_RETURN hash_worker_io_ring(void *arg) { } } - io_ring_cleanup_thread(thread_ctx); + ioring_cleanup_thread(thread_ctx); return THREAD_RETURN_VALUE; }