diff --git a/base.h b/base.h index 24b9e18..3d8e91a 100644 --- a/base.h +++ b/base.h @@ -1,4 +1,5 @@ #pragma once + #define _CRT_SECURE_NO_WARNINGS #if defined(_WIN32) || defined(_WIN64) @@ -10,7 +11,7 @@ #include #include #include -#include +#include // Needs to be included before stdatomic to avoid errors #include #include #include @@ -26,18 +27,19 @@ #include #include #include +#include #include #include +#include #include #include #include -#include -#include #endif #include #include #include +#include #include #include #include @@ -46,7 +48,6 @@ #include #include #include -#include /* ------------------------------------------------------------ Base types diff --git a/binaries/changelog.txt b/binaries/changelog.txt index 9062932..e619f7e 100644 --- a/binaries/changelog.txt +++ b/binaries/changelog.txt @@ -50,8 +50,9 @@ Fixing user prompt parsing Reorganising the code Improving the scan function -5.0: Implementing the IO Ring for windows and ui_uring for linux instead of buffered hashing, huge performance gains. The IO Ring is event driven, thread local, uses DMA and direct disk I/O, bypassing the OS cache completely, registred buffers, it supports bashing multiple submissions and can handle multiple files at the same time. +5.0: Implementing the IO Ring for windows and ui_uring for linux instead of buffered hashing, huge performance gains. The IO Ring is event driven, thread local, uses DMA and direct disk I/O, bypassing the OS cache completely, registered buffers (and registered files in io_uring), it supports bashing multiple submissions and can handle multiple files at the same time. Hashing small files using XXH3_128bits() instead of the streaming pipeline(XXH3_128bits_reset(), XXH3_128bits_update(), XXH3_128bits_digest()), this reduses the overhead of creating a state and digest, coupled with the IO Ring it improves the hashing of small files whose size is inferior to the size of IO Ring buffers fixing the xxh_x86dispatch warnings Updating the progress printing function +Implementing a config file diff --git a/config.h b/config.h new file mode 100644 index 0000000..3aa7208 --- /dev/null +++ b/config.h @@ -0,0 +1,27 @@ + +#define FILE_HASHES_TXT "file_hashes.txt" +#define HASH_STRLEN 33 // 128-bit hex (32 chars) + null +#define MAX_PATHLEN 4096 +#define READ_BLOCK (KiB(64)) + +#define MULTI_THREADED true + +// -------------------- IO Ring Configuration ---------------------- +#define USE_IORING 1 + +#if USE_IORING +#define IORING_BUFFER_SIZE (KiB(256)) +#define NUM_BUFFERS_PER_THREAD 32 +#define MAX_ACTIVE_FILES 32 +#define SUBMIT_TIMEOUT_MS 30000 + +#define IORING_DEBUG_PRINTS false +#define IORING_DEBUG_STATS false + +#if defined(_WIN32) || defined(_WIN64) +#define USE_REGISTERED_FILES false +#elif defined(__linux__) +#define USE_REGISTERED_FILES true +#endif + +#endif diff --git a/file_hasher.c b/file_hasher.c index 52e2c9b..3120723 100644 --- a/file_hasher.c +++ b/file_hasher.c @@ -82,17 +82,31 @@ int main(int argc, char **argv) { // Logical threads = CPU cores * 2 uint32_t cpu_threads = cpu_cores * 2; +#if MULTI_THREADED uint32_t num_scan_threads = cpu_threads; uint32_t num_hash_threads = cpu_threads; - // uint32_t num_hash_threads = 1; printf("%d cores %d threads CPU detected with %s instruction set\n" "Starting thread pool: %d scanning and %d hashing threads\n", cpu_cores, cpu_threads, get_xxhash_instruction_set(), num_scan_threads, num_hash_threads); +#else + uint32_t num_scan_threads = 1; + uint32_t num_hash_threads = 1; + + printf( + "%d cores %d threads CPU detected with %s instruction set\n" + "Starting thread pool: %d scanning and %d hashing threads(Debug mode)\n", + cpu_cores, cpu_threads, get_xxhash_instruction_set(), num_scan_threads, + num_hash_threads); + +#endif // Align IO Ring block size to the system page size +#if USE_IORING g_ioring_buffer_size = ALIGN_UP_POW2(g_ioring_buffer_size, g_pagesize); +#endif + // ------------------------------- // Scanning and hashing // ------------------------------- @@ -104,25 +118,6 @@ int main(int argc, char **argv) { MPMCQueue file_queue; mpmc_init(&file_queue, MiB(1)); - // Starting hash threads - // size_t num_hash_threads = num_threads; - // - // WorkerContext workers[num_hash_threads]; - // Thread *hash_threads = - // arena_push(&gp_arena, sizeof(Thread) * num_hash_threads, true); - // - // for (size_t i = 0; i < num_hash_threads; ++i) { - // workers[i].arena = arena_create(¶ms); - // workers[i].file_queue = &file_queue; - // - // if (thread_create(&hash_threads[i], (ThreadFunc)hash_worker, &workers[i]) - // != - // 0) { - // fprintf(stderr, "Failed to create hash thread %zu\n", i); - // exit(1); - // } - // } - // Starting hash threads WorkerContext workers[num_hash_threads]; Thread *hash_threads = @@ -132,45 +127,19 @@ int main(int argc, char **argv) { workers[i].arena = arena_create(¶ms); workers[i].file_queue = &file_queue; +#if USE_IORING if (thread_create(&hash_threads[i], (ThreadFunc)hash_worker_ioring, - &workers[i]) != 0) { + &workers[i]) != 0) +#else + if (thread_create(&hash_threads[i], (ThreadFunc)hash_worker, &workers[i]) != + 0) +#endif + { fprintf(stderr, "Failed to create hash thread %zu\n", i); exit(1); } } - // Starting hash threads - // size_t num_hash_threads = num_threads; - // - // WorkerContext workers[num_hash_threads]; - // Thread *hash_threads = - // arena_push(&gp_arena, sizeof(Thread) * num_hash_threads, true); - // - // // Check if I/O Ring is available - // bool io_ring_available = false; - // HIORING test_ring = io_ring_init(); - // if (test_ring) { - // io_ring_available = true; - // io_ring_cleanup(test_ring); - // // printf("I/O Ring is available, using high-performance async I/O\n"); - // } else { - // printf("I/O Ring not available, using buffered I/O\n"); - // } - // - // for (size_t i = 0; i < num_hash_threads; ++i) { - // workers[i].arena = arena_create(¶ms); - // workers[i].file_queue = &file_queue; - // - // // Select the appropriate worker function - // ThreadFunc fn = io_ring_available ? (ThreadFunc)hash_worker_io_ring - // : (ThreadFunc)hash_worker; - // - // if (thread_create(&hash_threads[i], fn, &workers[i]) != 0) { - // fprintf(stderr, "Failed to create hash thread %zu\n", i); - // exit(1); - // } - // } - // Starting progress printing thread Thread progress_thread_handle; if (thread_create(&progress_thread_handle, (ThreadFunc)progress_thread, @@ -265,12 +234,14 @@ int main(int argc, char **argv) { // ------------------------------- // Print summary // ------------------------------- +#if USE_IORING uint64_t incomplete = atomic_load(&g_io_ring_fallbacks); if (incomplete > 0) { printf("\nWARNING: I/O Ring incomplete files: %llu (fallback to buffered " "I/O used)\n", (unsigned long long)incomplete); } +#endif double total_seconds = timer_elapsed(&total_timer); diff --git a/platform.c b/platform.c index deb0e63..809cdee 100644 --- a/platform.c +++ b/platform.c @@ -11,12 +11,7 @@ #define XXH_STATIC_LINKING_ONLY #include "xxh_x86dispatch.h" -// ----------------------------- Config ------------------------------------- -#define FILE_HASHES_TXT "file_hashes.txt" -#define HASH_STRLEN 33 // 128-bit hex (32 chars) + null -#define MAX_PATHLEN 4096 -#define READ_BLOCK (KiB(64)) - +#include "config.h" // ----------------------------- Globals ------------------------------------ static atomic_uint_fast64_t g_files_found = 0; static atomic_uint_fast64_t g_files_hashed = 0; @@ -24,7 +19,7 @@ static atomic_uint_fast64_t g_bytes_processed = 0; static atomic_int g_scan_done = 0; // ================== OS-agnostic functions abstraction ===================== -// ----------------------------- Timer functions -------------- +// --------------------- Timer functions --------------------- typedef struct { u64 start; u64 now; @@ -71,7 +66,7 @@ double timer_elapsed(HiResTimer *t) { #endif -// ----------------------------- Get HW info -------------- +// ------------------- Get HW info -------------------- #if defined(_WIN32) || defined(_WIN64) size_t platform_physical_cores(void) { @@ -367,7 +362,7 @@ static int parse_paths(char *line, char folders[][MAX_PATHLEN], return count; } -// ----------------------------- File time ------------------------- +// ------------------------- File time ------------------------- #if defined(_WIN32) || defined(_WIN64) static void format_time(uint64_t t, char *out, size_t out_sz) { if (t == 0) { @@ -382,7 +377,7 @@ static void format_time(uint64_t t, char *out, size_t out_sz) { strftime(out, out_sz, "%Y-%m-%d %H:%M:%S", &tm); } -// ----------------------------- Convert filetime to epoch -------------- +// ------------------ Convert filetime to epoch ------------------- static uint64_t filetime_to_epoch(const FILETIME *ft) { ULARGE_INTEGER ull; ull.LowPart = ft->dwLowDateTime; @@ -433,7 +428,7 @@ void platform_get_file_times(const char *path, uint64_t *out_created, #endif -// ----------------------------- File owner --------------------- +// -------------------- File owner --------------------- #if defined(_WIN32) || defined(_WIN64) static void get_file_owner(const char *path, char *out, size_t out_sz) { PSID sid = NULL; @@ -781,7 +776,7 @@ static void xxh3_hash_file_stream(const char *path, char *out_hex, // ------------------------- Hash worker -------------------------------- static THREAD_RETURN hash_worker(void *arg) { WorkerContext *ctx = (WorkerContext *)arg; - unsigned char *buf = (unsigned char *)malloc(READ_BLOCK); + void *buf = malloc(READ_BLOCK); for (;;) { FileEntry *fe = mpmc_pop(ctx->file_queue); @@ -885,31 +880,23 @@ static THREAD_RETURN progress_thread(void *arg) { return THREAD_RETURN_VALUE; } -// ======================== Hash worker IO Ring ======================== -// -------------------------- Configuration --------------------------- -#define IORING_BUFFER_SIZE (KiB(256)) -#define NUM_BUFFERS_PER_THREAD 32 -#define MAX_ACTIVE_FILES 32 -#define SUBMIT_TIMEOUT_MS 30000 -// #define IORING_DEBUG // Uncomment to print some errors +// ======================== IO Ring implemented ======================== +#if USE_IORING +// -------------------------- Data structures --------------------------- // Globals u64 g_ioring_buffer_size = 4096 * 64; static atomic_uint_fast64_t g_io_ring_fallbacks = 0; -// -------------------------- Data structures --------------------------- +#define IO_PENDING INT_MIN + +typedef struct IoBuffer IoBuffer; #if defined(_WIN32) || defined(_WIN64) // Windows I/O Ring types typedef HIORING IoRingHandle; #define BUILD_READ_RETURN_VALUE HRESULT -typedef struct { - HRESULT ResultCode; - uint32_t Information; - uintptr_t UserData; -} IoRingCQE; - #elif defined(__linux__) // Linux io_uring types typedef struct { @@ -923,16 +910,8 @@ typedef IoUring *IoRingHandle; typedef struct iovec IORING_BUFFER_INFO; #define BUILD_READ_RETURN_VALUE int -typedef struct { - int ResultCode; - uint32_t Information; - uintptr_t UserData; -} IoRingCQE; - #endif -typedef struct IoBuffer IoBuffer; - typedef struct FileReadContext { FileEntry *fe; uint64_t file_size; @@ -959,6 +938,10 @@ typedef struct FileReadContext { FileHandle file_handle; +#if USE_REGISTERED_FILES + uint32_t slot_id; +#endif + bool use_incremental_hash; bool completed; @@ -966,8 +949,6 @@ typedef struct FileReadContext { } FileReadContext; // -------------------------- Buffer structure --------------------------- -#define IO_PENDING INT_MIN - typedef struct IoBuffer { FileReadContext *file; void *data; @@ -985,8 +966,11 @@ typedef struct IoBuffer { // Thread-local I/O Ring context typedef struct ThreadIoContext { IoRingHandle ring; +#if defined(_WIN32) || defined(_WIN64) void *completion_event; - unsigned char *fallback_buffer; +#endif + + void *fallback_buffer; IoBuffer buffers[NUM_BUFFERS_PER_THREAD]; int buffer_pool[NUM_BUFFERS_PER_THREAD]; int free_count; @@ -998,6 +982,11 @@ typedef struct ThreadIoContext { bool use_registered_buffers; #endif +#if USE_REGISTERED_FILES + bool use_registered_files; + FileHandle registered_handles[MAX_ACTIVE_FILES]; +#endif + } ThreadIoContext; typedef struct { @@ -1006,7 +995,13 @@ typedef struct { uint32_t MaxVersion; } IoRingCapabilities; -// ----------------------------- Async I/O Abstraction ------------------------- +typedef struct { + BUILD_READ_RETURN_VALUE ResultCode; + uint32_t Information; + uintptr_t UserData; +} IoRingCQE; + +// ------------------------ IO Ring Abstraction ------------------------- #if defined(_WIN32) || defined(_WIN64) // Windows I/O Ring functions @@ -1063,35 +1058,48 @@ static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t wait_count, return SUCCEEDED(hr) ? 0 : -1; } -static int ioring_register_buffers(ThreadIoContext *thread_ctx, - uint32_t num_buffers, - IORING_BUFFER_INFO *buf_info) { +static void ioring_register_buffers(ThreadIoContext *thread_ctx, + uint32_t num_buffers, + IORING_BUFFER_INFO *buf_info) { HRESULT hr = BuildIoRingRegisterBuffers( thread_ctx->ring, NUM_BUFFERS_PER_THREAD, buf_info, USERDATA_REGISTER); if (FAILED(hr)) { char error_msg[256]; + FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, hr, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + error_msg, sizeof(error_msg), NULL); - size_t size = FormatMessageA( - FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | - FORMAT_MESSAGE_IGNORE_INSERTS, - NULL, hr, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), error_msg, - sizeof(error_msg), NULL); - - if (size > 0) { - fprintf(stderr, "Error registering buffers: %s (0x%08X)\n", error_msg, - (unsigned int)hr); - } else { - fprintf(stderr, "Error registering buffers: Unknown HRESULT (0x%08X)\n", - (unsigned int)hr); - } + fprintf(stderr, "Error registering buffers: %s (0x%08X)\n", error_msg, + (unsigned int)hr); } // Submit registration ioring_submit(thread_ctx, 0, 0, NULL); - - return hr; } +#if USE_REGISTERED_FILES +static void ioring_register_files(ThreadIoContext *thread_ctx) { + + HRESULT hr = BuildIoRingRegisterFileHandles( + thread_ctx->ring, MAX_ACTIVE_FILES, thread_ctx->registered_handles, + USERDATA_REGISTER); + + if (FAILED(hr)) { + char error_msg[256]; + FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, hr, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + error_msg, sizeof(error_msg), NULL); + + fprintf(stderr, "WARNING: File registration failed: %s (0x%08X)\n", + error_msg, (unsigned int)hr); + } + + thread_ctx->use_registered_files = (hr == 0); +} +#endif + static void ioring_close_event(void *event) { CloseHandle(event); } static int close_ioring(ThreadIoContext *thread_ctx) { @@ -1107,9 +1115,23 @@ static BUILD_READ_RETURN_VALUE ioring_build_read(ThreadIoContext *thread_ctx, uint32_t buffer_id, size_t size, uint64_t offset, uintptr_t user_data) { + +#if USE_REGISTERED_FILES + IORING_HANDLE_REF file_ref; + + if (thread_ctx->use_registered_files) { + file_ref = (IORING_HANDLE_REF)IoRingHandleRefFromIndex(file_ctx->slot_id); + } else { + file_ref = + (IORING_HANDLE_REF)IoRingHandleRefFromHandle(file_ctx->file_handle); + } +#else IORING_HANDLE_REF file_ref = IoRingHandleRefFromHandle(file_ctx->file_handle); +#endif + IORING_BUFFER_REF buffer_ref = IoRingBufferRefFromIndexAndOffset(buffer_id, 0); + HRESULT hr = BuildIoRingReadFile(thread_ctx->ring, file_ref, buffer_ref, (uint32_t)size, offset, user_data, IOSQE_FLAGS_NONE); @@ -1229,9 +1251,9 @@ static int create_ioring(ThreadIoContext *thread_ctx, uint32_t queue_size) { #define MAKE_BUF_INFO(a, l) \ (IORING_BUFFER_INFO) { .iov_base = (a), .iov_len = (size_t)(l) } -static int ioring_register_buffers(ThreadIoContext *thread_ctx, - uint32_t num_buffers, - IORING_BUFFER_INFO *buf_info) { +static void ioring_register_buffers(ThreadIoContext *thread_ctx, + uint32_t num_buffers, + IORING_BUFFER_INFO *buf_info) { IoUring *impl = (IoUring *)thread_ctx->ring; int hr = io_uring_register_buffers(&impl->ring, buf_info, num_buffers); @@ -1246,6 +1268,7 @@ static int ioring_register_buffers(ThreadIoContext *thread_ctx, "(ENOMEM).\n" "Increase the limit to solve this warning.\n"); + // TODO: document this in read me // The memlock limit in Linux restricts the amount of memory a process can // "lock" into physical RAM using the mlock() family of system calls. This // prevents the operating system from swapping that memory out to disk. @@ -1293,7 +1316,19 @@ static int ioring_register_buffers(ThreadIoContext *thread_ctx, } thread_ctx->use_registered_buffers = (hr == 0); - return hr == 0 ? 0 : -1; +} + +static void ioring_register_files(ThreadIoContext *thread_ctx) { + IoUring *impl = (IoUring *)thread_ctx->ring; + + int hr = io_uring_register_files(&impl->ring, thread_ctx->registered_handles, + MAX_ACTIVE_FILES); + if (hr < 0) { + fprintf(stderr, "file registeration failed: %s (code: %d)\n", strerror(-hr), + hr); + } + + thread_ctx->use_registered_files = (hr == 0); } static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t wait_count, @@ -1435,10 +1470,14 @@ static FileReadContext *fq_push(FileQueue *fq) { if (fq->count == MAX_ACTIVE_FILES) return NULL; - FileReadContext *f = &fq->files[fq->tail]; + FileReadContext *file = &fq->files[fq->tail]; +#if USE_REGISTERED_FILES + file->slot_id = fq->tail; +#endif + fq->tail = (fq->tail + 1) % MAX_ACTIVE_FILES; fq->count++; - return f; + return file; } static FileReadContext *fq_peek_tail(FileQueue *fq) { @@ -1492,7 +1531,7 @@ static ThreadIoContext *ioring_init_thread(void) { } // Initialize buffer pool - thread_ctx->fallback_buffer = (unsigned char *)malloc(READ_BLOCK); + thread_ctx->fallback_buffer = malloc(READ_BLOCK); IORING_BUFFER_INFO buf_info[NUM_BUFFERS_PER_THREAD]; @@ -1528,8 +1567,11 @@ static ThreadIoContext *ioring_init_thread(void) { thread_ctx->free_count = NUM_BUFFERS_PER_THREAD; // Register buffers - int hr = - ioring_register_buffers(thread_ctx, NUM_BUFFERS_PER_THREAD, buf_info); + ioring_register_buffers(thread_ctx, NUM_BUFFERS_PER_THREAD, buf_info); + +#if USE_REGISTERED_FILES + ioring_register_files(thread_ctx); +#endif thread_ctx->submitting = true; thread_ctx->num_submissions = 0; @@ -1579,7 +1621,7 @@ static void return_buffer(ThreadIoContext *ctx, IoBuffer *buf) { ctx->buffer_pool[ctx->free_count++] = buf->buffer_id; } -// -------------------------- Submit async read --------------------------- +// -------------------------- Build read --------------------------- static int build_read(ThreadIoContext *thread_ctx, FileReadContext *file_ctx, IoBuffer *buf, uint64_t offset, size_t size) { buf->offset = offset; @@ -1620,7 +1662,8 @@ static void process_completions(ThreadIoContext *thread_ctx, FileQueue *fq) { } // -------------------- File operations ----------------------- -static int init_file(FileReadContext *file, FileEntry *fe) { +static int init_file(ThreadIoContext *thread_ctx, FileReadContext *file, + FileEntry *fe) { memset(file, 0, sizeof(*file)); file->fe = fe; @@ -1631,12 +1674,19 @@ static int init_file(FileReadContext *file, FileEntry *fe) { if (file->file_handle == INVALID_FILE_HANDLE) { -#ifdef IORING_DEBUG +#if IORING_DEBUG_PRINTS printf("ERROR: Could not open file %s\n", fe->path); #endif return 0; } +#if (defined(_WIN32) || defined(_WIN64)) && USE_REGISTERED_FILES + if (thread_ctx->use_registered_files) { + thread_ctx->registered_handles[file->slot_id] = file->file_handle; + ioring_register_files(thread_ctx); + } +#endif + // Determine hash method based on file size if (file->file_size > g_ioring_buffer_size) { file->use_incremental_hash = true; @@ -1647,8 +1697,8 @@ static int init_file(FileReadContext *file, FileEntry *fe) { return 1; } -static void finalize_file(FileReadContext *file, ThreadIoContext *thread_ctx, - WorkerContext *worker_ctx) { +static void finalize_file(ThreadIoContext *thread_ctx, + WorkerContext *worker_ctx, FileReadContext *file) { FileEntry *fe = file->fe; @@ -1669,7 +1719,7 @@ static void finalize_file(FileReadContext *file, ThreadIoContext *thread_ctx, (unsigned long long)file->single_hash.low64); } } else { -#ifdef IORING_DEBUG +#if IORING_DEBUG_PRINTS printf("WARNING: Fallback for path: %s\n", fe->path); #endif @@ -1739,7 +1789,7 @@ static void hash_ready_files(ThreadIoContext *thread_ctx, FileQueue *fq, } else if (buf->bytes_read == 0 && IORING_SUCCEEDED(buf->result)) { file->reads_hashed++; // EOF } else { - finalize_file(file, thread_ctx, worker_ctx); + finalize_file(thread_ctx, worker_ctx, file); file->completed = true; } @@ -1750,7 +1800,7 @@ static void hash_ready_files(ThreadIoContext *thread_ctx, FileQueue *fq, if (!file->completed && file->active_reads == 0 && file->bytes_hashed >= file->file_size) { - finalize_file(file, thread_ctx, worker_ctx); + finalize_file(thread_ctx, worker_ctx, file); file->completed = true; thread_ctx->active_files--; } @@ -1770,7 +1820,7 @@ static void build_pending_reads(ThreadIoContext *thread_ctx, FileQueue *fq, for (;;) { - // ---------------- BUILD READS FOR CURRENT FILE ---------------- + // BUILD READS FOR CURRENT FILE if (file) { while (file->next_read_offset < file->file_size) { @@ -1807,7 +1857,7 @@ static void build_pending_reads(ThreadIoContext *thread_ctx, FileQueue *fq, if (IORING_FAILED(hr)) { // mark failure and stop this file return_buffer(thread_ctx, buf); - finalize_file(file, thread_ctx, worker_ctx); + finalize_file(thread_ctx, worker_ctx, file); file->completed = true; break; } @@ -1820,7 +1870,7 @@ static void build_pending_reads(ThreadIoContext *thread_ctx, FileQueue *fq, } } - // ---------------- ADD NEW FILE ---------------- + // ADD NEW FILE if (!thread_ctx->submitting) return; @@ -1835,8 +1885,8 @@ static void build_pending_reads(ThreadIoContext *thread_ctx, FileQueue *fq, FileReadContext *newfile = fq_push(fq); - if (!init_file(newfile, fe)) { - finalize_file(newfile, thread_ctx, worker_ctx); + if (!init_file(thread_ctx, newfile, fe)) { + finalize_file(thread_ctx, worker_ctx, newfile); newfile->completed = true; continue; } @@ -1845,6 +1895,7 @@ static void build_pending_reads(ThreadIoContext *thread_ctx, FileQueue *fq, thread_ctx->active_files++; } } + // -------------------------- Hash worker I/O Ring --------------------------- static THREAD_RETURN hash_worker_ioring(void *arg) { WorkerContext *worker_ctx = (WorkerContext *)arg; @@ -1877,12 +1928,12 @@ static THREAD_RETURN hash_worker_ioring(void *arg) { // Process completions process_completions(thread_ctx, &fq); - // debug - // printf( - // "Free buffers: %d, Submissions: %d, Active files: %d, fq count: - // %d\n", thread_ctx->free_count, thread_ctx->num_submissions, - // thread_ctx->active_files, fq.count); - // debug end +#if IORING_DEBUG_STATS + printf("Free buffers: %d, Submissions: %d, Active files: %d, fq count: + % d\n ", thread_ctx->free_count, thread_ctx->num_submissions, + thread_ctx->active_files, + fq.count); +#endif // Hash files hash_ready_files(thread_ctx, &fq, worker_ctx); @@ -1897,3 +1948,4 @@ static THREAD_RETURN hash_worker_ioring(void *arg) { ioring_cleanup_thread(thread_ctx); return THREAD_RETURN_VALUE; } +#endif