Implementing registered files in io_uring

The windows implementation is disabled, currently registering files in
IO Ring when there is inflight IO operations causes corruptions.

Implementing a config file.

Some code cleanup
This commit is contained in:
2026-04-24 15:30:04 +01:00
parent ab31776658
commit 3393129c5f
5 changed files with 194 additions and 142 deletions

9
base.h
View File

@@ -1,4 +1,5 @@
#pragma once #pragma once
#define _CRT_SECURE_NO_WARNINGS #define _CRT_SECURE_NO_WARNINGS
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
@@ -10,7 +11,7 @@
#include <aclapi.h> #include <aclapi.h>
#include <fcntl.h> #include <fcntl.h>
#include <io.h> #include <io.h>
#include <ioringapi.h> #include <ioringapi.h> // Needs to be included before stdatomic to avoid errors
#include <ntioring_x.h> #include <ntioring_x.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
@@ -26,18 +27,19 @@
#include <dirent.h> #include <dirent.h>
#include <fcntl.h> #include <fcntl.h>
#include <liburing.h> #include <liburing.h>
#include <poll.h>
#include <pthread.h> #include <pthread.h>
#include <pwd.h> #include <pwd.h>
#include <sys/eventfd.h>
#include <sys/resource.h> #include <sys/resource.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <unistd.h> #include <unistd.h>
#include <poll.h>
#include <sys/eventfd.h>
#endif #endif
#include <assert.h> #include <assert.h>
#include <ctype.h> #include <ctype.h>
#include <immintrin.h> #include <immintrin.h>
#include <limits.h>
#include <stdatomic.h> #include <stdatomic.h>
#include <stdbool.h> #include <stdbool.h>
#include <stddef.h> #include <stddef.h>
@@ -46,7 +48,6 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <time.h> #include <time.h>
#include <limits.h>
/* ------------------------------------------------------------ /* ------------------------------------------------------------
Base types Base types

View File

@@ -50,8 +50,9 @@ Fixing user prompt parsing
Reorganising the code Reorganising the code
Improving the scan function Improving the scan function
5.0: Implementing the IO Ring for windows and ui_uring for linux instead of buffered hashing, huge performance gains. The IO Ring is event driven, thread local, uses DMA and direct disk I/O, bypassing the OS cache completely, registred buffers, it supports bashing multiple submissions and can handle multiple files at the same time. 5.0: Implementing the IO Ring for windows and ui_uring for linux instead of buffered hashing, huge performance gains. The IO Ring is event driven, thread local, uses DMA and direct disk I/O, bypassing the OS cache completely, registered buffers (and registered files in io_uring), it supports bashing multiple submissions and can handle multiple files at the same time.
Hashing small files using XXH3_128bits() instead of the streaming pipeline(XXH3_128bits_reset(), XXH3_128bits_update(), XXH3_128bits_digest()), this reduses the overhead of creating a state and digest, coupled with the IO Ring it improves the hashing of small files whose size is inferior to the size of IO Ring buffers Hashing small files using XXH3_128bits() instead of the streaming pipeline(XXH3_128bits_reset(), XXH3_128bits_update(), XXH3_128bits_digest()), this reduses the overhead of creating a state and digest, coupled with the IO Ring it improves the hashing of small files whose size is inferior to the size of IO Ring buffers
fixing the xxh_x86dispatch warnings fixing the xxh_x86dispatch warnings
Updating the progress printing function Updating the progress printing function
Implementing a config file

27
config.h Normal file
View File

@@ -0,0 +1,27 @@
#define FILE_HASHES_TXT "file_hashes.txt"
#define HASH_STRLEN 33 // 128-bit hex (32 chars) + null
#define MAX_PATHLEN 4096
#define READ_BLOCK (KiB(64))
#define MULTI_THREADED true
// -------------------- IO Ring Configuration ----------------------
#define USE_IORING 1
#if USE_IORING
#define IORING_BUFFER_SIZE (KiB(256))
#define NUM_BUFFERS_PER_THREAD 32
#define MAX_ACTIVE_FILES 32
#define SUBMIT_TIMEOUT_MS 30000
#define IORING_DEBUG_PRINTS false
#define IORING_DEBUG_STATS false
#if defined(_WIN32) || defined(_WIN64)
#define USE_REGISTERED_FILES false
#elif defined(__linux__)
#define USE_REGISTERED_FILES true
#endif
#endif

View File

@@ -82,17 +82,31 @@ int main(int argc, char **argv) {
// Logical threads = CPU cores * 2 // Logical threads = CPU cores * 2
uint32_t cpu_threads = cpu_cores * 2; uint32_t cpu_threads = cpu_cores * 2;
#if MULTI_THREADED
uint32_t num_scan_threads = cpu_threads; uint32_t num_scan_threads = cpu_threads;
uint32_t num_hash_threads = cpu_threads; uint32_t num_hash_threads = cpu_threads;
// uint32_t num_hash_threads = 1;
printf("%d cores %d threads CPU detected with %s instruction set\n" printf("%d cores %d threads CPU detected with %s instruction set\n"
"Starting thread pool: %d scanning and %d hashing threads\n", "Starting thread pool: %d scanning and %d hashing threads\n",
cpu_cores, cpu_threads, get_xxhash_instruction_set(), num_scan_threads, cpu_cores, cpu_threads, get_xxhash_instruction_set(), num_scan_threads,
num_hash_threads); num_hash_threads);
#else
uint32_t num_scan_threads = 1;
uint32_t num_hash_threads = 1;
printf(
"%d cores %d threads CPU detected with %s instruction set\n"
"Starting thread pool: %d scanning and %d hashing threads(Debug mode)\n",
cpu_cores, cpu_threads, get_xxhash_instruction_set(), num_scan_threads,
num_hash_threads);
#endif
// Align IO Ring block size to the system page size // Align IO Ring block size to the system page size
#if USE_IORING
g_ioring_buffer_size = ALIGN_UP_POW2(g_ioring_buffer_size, g_pagesize); g_ioring_buffer_size = ALIGN_UP_POW2(g_ioring_buffer_size, g_pagesize);
#endif
// ------------------------------- // -------------------------------
// Scanning and hashing // Scanning and hashing
// ------------------------------- // -------------------------------
@@ -104,25 +118,6 @@ int main(int argc, char **argv) {
MPMCQueue file_queue; MPMCQueue file_queue;
mpmc_init(&file_queue, MiB(1)); mpmc_init(&file_queue, MiB(1));
// Starting hash threads
// size_t num_hash_threads = num_threads;
//
// WorkerContext workers[num_hash_threads];
// Thread *hash_threads =
// arena_push(&gp_arena, sizeof(Thread) * num_hash_threads, true);
//
// for (size_t i = 0; i < num_hash_threads; ++i) {
// workers[i].arena = arena_create(&params);
// workers[i].file_queue = &file_queue;
//
// if (thread_create(&hash_threads[i], (ThreadFunc)hash_worker, &workers[i])
// !=
// 0) {
// fprintf(stderr, "Failed to create hash thread %zu\n", i);
// exit(1);
// }
// }
// Starting hash threads // Starting hash threads
WorkerContext workers[num_hash_threads]; WorkerContext workers[num_hash_threads];
Thread *hash_threads = Thread *hash_threads =
@@ -132,45 +127,19 @@ int main(int argc, char **argv) {
workers[i].arena = arena_create(&params); workers[i].arena = arena_create(&params);
workers[i].file_queue = &file_queue; workers[i].file_queue = &file_queue;
#if USE_IORING
if (thread_create(&hash_threads[i], (ThreadFunc)hash_worker_ioring, if (thread_create(&hash_threads[i], (ThreadFunc)hash_worker_ioring,
&workers[i]) != 0) { &workers[i]) != 0)
#else
if (thread_create(&hash_threads[i], (ThreadFunc)hash_worker, &workers[i]) !=
0)
#endif
{
fprintf(stderr, "Failed to create hash thread %zu\n", i); fprintf(stderr, "Failed to create hash thread %zu\n", i);
exit(1); exit(1);
} }
} }
// Starting hash threads
// size_t num_hash_threads = num_threads;
//
// WorkerContext workers[num_hash_threads];
// Thread *hash_threads =
// arena_push(&gp_arena, sizeof(Thread) * num_hash_threads, true);
//
// // Check if I/O Ring is available
// bool io_ring_available = false;
// HIORING test_ring = io_ring_init();
// if (test_ring) {
// io_ring_available = true;
// io_ring_cleanup(test_ring);
// // printf("I/O Ring is available, using high-performance async I/O\n");
// } else {
// printf("I/O Ring not available, using buffered I/O\n");
// }
//
// for (size_t i = 0; i < num_hash_threads; ++i) {
// workers[i].arena = arena_create(&params);
// workers[i].file_queue = &file_queue;
//
// // Select the appropriate worker function
// ThreadFunc fn = io_ring_available ? (ThreadFunc)hash_worker_io_ring
// : (ThreadFunc)hash_worker;
//
// if (thread_create(&hash_threads[i], fn, &workers[i]) != 0) {
// fprintf(stderr, "Failed to create hash thread %zu\n", i);
// exit(1);
// }
// }
// Starting progress printing thread // Starting progress printing thread
Thread progress_thread_handle; Thread progress_thread_handle;
if (thread_create(&progress_thread_handle, (ThreadFunc)progress_thread, if (thread_create(&progress_thread_handle, (ThreadFunc)progress_thread,
@@ -265,12 +234,14 @@ int main(int argc, char **argv) {
// ------------------------------- // -------------------------------
// Print summary // Print summary
// ------------------------------- // -------------------------------
#if USE_IORING
uint64_t incomplete = atomic_load(&g_io_ring_fallbacks); uint64_t incomplete = atomic_load(&g_io_ring_fallbacks);
if (incomplete > 0) { if (incomplete > 0) {
printf("\nWARNING: I/O Ring incomplete files: %llu (fallback to buffered " printf("\nWARNING: I/O Ring incomplete files: %llu (fallback to buffered "
"I/O used)\n", "I/O used)\n",
(unsigned long long)incomplete); (unsigned long long)incomplete);
} }
#endif
double total_seconds = timer_elapsed(&total_timer); double total_seconds = timer_elapsed(&total_timer);

View File

@@ -11,12 +11,7 @@
#define XXH_STATIC_LINKING_ONLY #define XXH_STATIC_LINKING_ONLY
#include "xxh_x86dispatch.h" #include "xxh_x86dispatch.h"
// ----------------------------- Config ------------------------------------- #include "config.h"
#define FILE_HASHES_TXT "file_hashes.txt"
#define HASH_STRLEN 33 // 128-bit hex (32 chars) + null
#define MAX_PATHLEN 4096
#define READ_BLOCK (KiB(64))
// ----------------------------- Globals ------------------------------------ // ----------------------------- Globals ------------------------------------
static atomic_uint_fast64_t g_files_found = 0; static atomic_uint_fast64_t g_files_found = 0;
static atomic_uint_fast64_t g_files_hashed = 0; static atomic_uint_fast64_t g_files_hashed = 0;
@@ -24,7 +19,7 @@ static atomic_uint_fast64_t g_bytes_processed = 0;
static atomic_int g_scan_done = 0; static atomic_int g_scan_done = 0;
// ================== OS-agnostic functions abstraction ===================== // ================== OS-agnostic functions abstraction =====================
// ----------------------------- Timer functions -------------- // --------------------- Timer functions ---------------------
typedef struct { typedef struct {
u64 start; u64 start;
u64 now; u64 now;
@@ -71,7 +66,7 @@ double timer_elapsed(HiResTimer *t) {
#endif #endif
// ----------------------------- Get HW info -------------- // ------------------- Get HW info --------------------
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
size_t platform_physical_cores(void) { size_t platform_physical_cores(void) {
@@ -367,7 +362,7 @@ static int parse_paths(char *line, char folders[][MAX_PATHLEN],
return count; return count;
} }
// ----------------------------- File time ------------------------- // ------------------------- File time -------------------------
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
static void format_time(uint64_t t, char *out, size_t out_sz) { static void format_time(uint64_t t, char *out, size_t out_sz) {
if (t == 0) { if (t == 0) {
@@ -382,7 +377,7 @@ static void format_time(uint64_t t, char *out, size_t out_sz) {
strftime(out, out_sz, "%Y-%m-%d %H:%M:%S", &tm); strftime(out, out_sz, "%Y-%m-%d %H:%M:%S", &tm);
} }
// ----------------------------- Convert filetime to epoch -------------- // ------------------ Convert filetime to epoch -------------------
static uint64_t filetime_to_epoch(const FILETIME *ft) { static uint64_t filetime_to_epoch(const FILETIME *ft) {
ULARGE_INTEGER ull; ULARGE_INTEGER ull;
ull.LowPart = ft->dwLowDateTime; ull.LowPart = ft->dwLowDateTime;
@@ -433,7 +428,7 @@ void platform_get_file_times(const char *path, uint64_t *out_created,
#endif #endif
// ----------------------------- File owner --------------------- // -------------------- File owner ---------------------
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
static void get_file_owner(const char *path, char *out, size_t out_sz) { static void get_file_owner(const char *path, char *out, size_t out_sz) {
PSID sid = NULL; PSID sid = NULL;
@@ -781,7 +776,7 @@ static void xxh3_hash_file_stream(const char *path, char *out_hex,
// ------------------------- Hash worker -------------------------------- // ------------------------- Hash worker --------------------------------
static THREAD_RETURN hash_worker(void *arg) { static THREAD_RETURN hash_worker(void *arg) {
WorkerContext *ctx = (WorkerContext *)arg; WorkerContext *ctx = (WorkerContext *)arg;
unsigned char *buf = (unsigned char *)malloc(READ_BLOCK); void *buf = malloc(READ_BLOCK);
for (;;) { for (;;) {
FileEntry *fe = mpmc_pop(ctx->file_queue); FileEntry *fe = mpmc_pop(ctx->file_queue);
@@ -885,31 +880,23 @@ static THREAD_RETURN progress_thread(void *arg) {
return THREAD_RETURN_VALUE; return THREAD_RETURN_VALUE;
} }
// ======================== Hash worker IO Ring ======================== // ======================== IO Ring implemented ========================
// -------------------------- Configuration --------------------------- #if USE_IORING
#define IORING_BUFFER_SIZE (KiB(256)) // -------------------------- Data structures ---------------------------
#define NUM_BUFFERS_PER_THREAD 32
#define MAX_ACTIVE_FILES 32
#define SUBMIT_TIMEOUT_MS 30000
// #define IORING_DEBUG // Uncomment to print some errors
// Globals // Globals
u64 g_ioring_buffer_size = 4096 * 64; u64 g_ioring_buffer_size = 4096 * 64;
static atomic_uint_fast64_t g_io_ring_fallbacks = 0; static atomic_uint_fast64_t g_io_ring_fallbacks = 0;
// -------------------------- Data structures --------------------------- #define IO_PENDING INT_MIN
typedef struct IoBuffer IoBuffer;
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
// Windows I/O Ring types // Windows I/O Ring types
typedef HIORING IoRingHandle; typedef HIORING IoRingHandle;
#define BUILD_READ_RETURN_VALUE HRESULT #define BUILD_READ_RETURN_VALUE HRESULT
typedef struct {
HRESULT ResultCode;
uint32_t Information;
uintptr_t UserData;
} IoRingCQE;
#elif defined(__linux__) #elif defined(__linux__)
// Linux io_uring types // Linux io_uring types
typedef struct { typedef struct {
@@ -923,16 +910,8 @@ typedef IoUring *IoRingHandle;
typedef struct iovec IORING_BUFFER_INFO; typedef struct iovec IORING_BUFFER_INFO;
#define BUILD_READ_RETURN_VALUE int #define BUILD_READ_RETURN_VALUE int
typedef struct {
int ResultCode;
uint32_t Information;
uintptr_t UserData;
} IoRingCQE;
#endif #endif
typedef struct IoBuffer IoBuffer;
typedef struct FileReadContext { typedef struct FileReadContext {
FileEntry *fe; FileEntry *fe;
uint64_t file_size; uint64_t file_size;
@@ -959,6 +938,10 @@ typedef struct FileReadContext {
FileHandle file_handle; FileHandle file_handle;
#if USE_REGISTERED_FILES
uint32_t slot_id;
#endif
bool use_incremental_hash; bool use_incremental_hash;
bool completed; bool completed;
@@ -966,8 +949,6 @@ typedef struct FileReadContext {
} FileReadContext; } FileReadContext;
// -------------------------- Buffer structure --------------------------- // -------------------------- Buffer structure ---------------------------
#define IO_PENDING INT_MIN
typedef struct IoBuffer { typedef struct IoBuffer {
FileReadContext *file; FileReadContext *file;
void *data; void *data;
@@ -985,8 +966,11 @@ typedef struct IoBuffer {
// Thread-local I/O Ring context // Thread-local I/O Ring context
typedef struct ThreadIoContext { typedef struct ThreadIoContext {
IoRingHandle ring; IoRingHandle ring;
#if defined(_WIN32) || defined(_WIN64)
void *completion_event; void *completion_event;
unsigned char *fallback_buffer; #endif
void *fallback_buffer;
IoBuffer buffers[NUM_BUFFERS_PER_THREAD]; IoBuffer buffers[NUM_BUFFERS_PER_THREAD];
int buffer_pool[NUM_BUFFERS_PER_THREAD]; int buffer_pool[NUM_BUFFERS_PER_THREAD];
int free_count; int free_count;
@@ -998,6 +982,11 @@ typedef struct ThreadIoContext {
bool use_registered_buffers; bool use_registered_buffers;
#endif #endif
#if USE_REGISTERED_FILES
bool use_registered_files;
FileHandle registered_handles[MAX_ACTIVE_FILES];
#endif
} ThreadIoContext; } ThreadIoContext;
typedef struct { typedef struct {
@@ -1006,7 +995,13 @@ typedef struct {
uint32_t MaxVersion; uint32_t MaxVersion;
} IoRingCapabilities; } IoRingCapabilities;
// ----------------------------- Async I/O Abstraction ------------------------- typedef struct {
BUILD_READ_RETURN_VALUE ResultCode;
uint32_t Information;
uintptr_t UserData;
} IoRingCQE;
// ------------------------ IO Ring Abstraction -------------------------
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
// Windows I/O Ring functions // Windows I/O Ring functions
@@ -1063,35 +1058,48 @@ static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t wait_count,
return SUCCEEDED(hr) ? 0 : -1; return SUCCEEDED(hr) ? 0 : -1;
} }
static int ioring_register_buffers(ThreadIoContext *thread_ctx, static void ioring_register_buffers(ThreadIoContext *thread_ctx,
uint32_t num_buffers, uint32_t num_buffers,
IORING_BUFFER_INFO *buf_info) { IORING_BUFFER_INFO *buf_info) {
HRESULT hr = BuildIoRingRegisterBuffers( HRESULT hr = BuildIoRingRegisterBuffers(
thread_ctx->ring, NUM_BUFFERS_PER_THREAD, buf_info, USERDATA_REGISTER); thread_ctx->ring, NUM_BUFFERS_PER_THREAD, buf_info, USERDATA_REGISTER);
if (FAILED(hr)) { if (FAILED(hr)) {
char error_msg[256]; char error_msg[256];
FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS,
NULL, hr, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
error_msg, sizeof(error_msg), NULL);
size_t size = FormatMessageA( fprintf(stderr, "Error registering buffers: %s (0x%08X)\n", error_msg,
FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | (unsigned int)hr);
FORMAT_MESSAGE_IGNORE_INSERTS,
NULL, hr, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), error_msg,
sizeof(error_msg), NULL);
if (size > 0) {
fprintf(stderr, "Error registering buffers: %s (0x%08X)\n", error_msg,
(unsigned int)hr);
} else {
fprintf(stderr, "Error registering buffers: Unknown HRESULT (0x%08X)\n",
(unsigned int)hr);
}
} }
// Submit registration // Submit registration
ioring_submit(thread_ctx, 0, 0, NULL); ioring_submit(thread_ctx, 0, 0, NULL);
return hr;
} }
#if USE_REGISTERED_FILES
static void ioring_register_files(ThreadIoContext *thread_ctx) {
HRESULT hr = BuildIoRingRegisterFileHandles(
thread_ctx->ring, MAX_ACTIVE_FILES, thread_ctx->registered_handles,
USERDATA_REGISTER);
if (FAILED(hr)) {
char error_msg[256];
FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS,
NULL, hr, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
error_msg, sizeof(error_msg), NULL);
fprintf(stderr, "WARNING: File registration failed: %s (0x%08X)\n",
error_msg, (unsigned int)hr);
}
thread_ctx->use_registered_files = (hr == 0);
}
#endif
static void ioring_close_event(void *event) { CloseHandle(event); } static void ioring_close_event(void *event) { CloseHandle(event); }
static int close_ioring(ThreadIoContext *thread_ctx) { static int close_ioring(ThreadIoContext *thread_ctx) {
@@ -1107,9 +1115,23 @@ static BUILD_READ_RETURN_VALUE ioring_build_read(ThreadIoContext *thread_ctx,
uint32_t buffer_id, uint32_t buffer_id,
size_t size, uint64_t offset, size_t size, uint64_t offset,
uintptr_t user_data) { uintptr_t user_data) {
#if USE_REGISTERED_FILES
IORING_HANDLE_REF file_ref;
if (thread_ctx->use_registered_files) {
file_ref = (IORING_HANDLE_REF)IoRingHandleRefFromIndex(file_ctx->slot_id);
} else {
file_ref =
(IORING_HANDLE_REF)IoRingHandleRefFromHandle(file_ctx->file_handle);
}
#else
IORING_HANDLE_REF file_ref = IoRingHandleRefFromHandle(file_ctx->file_handle); IORING_HANDLE_REF file_ref = IoRingHandleRefFromHandle(file_ctx->file_handle);
#endif
IORING_BUFFER_REF buffer_ref = IORING_BUFFER_REF buffer_ref =
IoRingBufferRefFromIndexAndOffset(buffer_id, 0); IoRingBufferRefFromIndexAndOffset(buffer_id, 0);
HRESULT hr = HRESULT hr =
BuildIoRingReadFile(thread_ctx->ring, file_ref, buffer_ref, BuildIoRingReadFile(thread_ctx->ring, file_ref, buffer_ref,
(uint32_t)size, offset, user_data, IOSQE_FLAGS_NONE); (uint32_t)size, offset, user_data, IOSQE_FLAGS_NONE);
@@ -1229,9 +1251,9 @@ static int create_ioring(ThreadIoContext *thread_ctx, uint32_t queue_size) {
#define MAKE_BUF_INFO(a, l) \ #define MAKE_BUF_INFO(a, l) \
(IORING_BUFFER_INFO) { .iov_base = (a), .iov_len = (size_t)(l) } (IORING_BUFFER_INFO) { .iov_base = (a), .iov_len = (size_t)(l) }
static int ioring_register_buffers(ThreadIoContext *thread_ctx, static void ioring_register_buffers(ThreadIoContext *thread_ctx,
uint32_t num_buffers, uint32_t num_buffers,
IORING_BUFFER_INFO *buf_info) { IORING_BUFFER_INFO *buf_info) {
IoUring *impl = (IoUring *)thread_ctx->ring; IoUring *impl = (IoUring *)thread_ctx->ring;
int hr = io_uring_register_buffers(&impl->ring, buf_info, num_buffers); int hr = io_uring_register_buffers(&impl->ring, buf_info, num_buffers);
@@ -1246,6 +1268,7 @@ static int ioring_register_buffers(ThreadIoContext *thread_ctx,
"(ENOMEM).\n" "(ENOMEM).\n"
"Increase the limit to solve this warning.\n"); "Increase the limit to solve this warning.\n");
// TODO: document this in read me
// The memlock limit in Linux restricts the amount of memory a process can // The memlock limit in Linux restricts the amount of memory a process can
// "lock" into physical RAM using the mlock() family of system calls. This // "lock" into physical RAM using the mlock() family of system calls. This
// prevents the operating system from swapping that memory out to disk. // prevents the operating system from swapping that memory out to disk.
@@ -1293,7 +1316,19 @@ static int ioring_register_buffers(ThreadIoContext *thread_ctx,
} }
thread_ctx->use_registered_buffers = (hr == 0); thread_ctx->use_registered_buffers = (hr == 0);
return hr == 0 ? 0 : -1; }
static void ioring_register_files(ThreadIoContext *thread_ctx) {
IoUring *impl = (IoUring *)thread_ctx->ring;
int hr = io_uring_register_files(&impl->ring, thread_ctx->registered_handles,
MAX_ACTIVE_FILES);
if (hr < 0) {
fprintf(stderr, "file registeration failed: %s (code: %d)\n", strerror(-hr),
hr);
}
thread_ctx->use_registered_files = (hr == 0);
} }
static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t wait_count, static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t wait_count,
@@ -1435,10 +1470,14 @@ static FileReadContext *fq_push(FileQueue *fq) {
if (fq->count == MAX_ACTIVE_FILES) if (fq->count == MAX_ACTIVE_FILES)
return NULL; return NULL;
FileReadContext *f = &fq->files[fq->tail]; FileReadContext *file = &fq->files[fq->tail];
#if USE_REGISTERED_FILES
file->slot_id = fq->tail;
#endif
fq->tail = (fq->tail + 1) % MAX_ACTIVE_FILES; fq->tail = (fq->tail + 1) % MAX_ACTIVE_FILES;
fq->count++; fq->count++;
return f; return file;
} }
static FileReadContext *fq_peek_tail(FileQueue *fq) { static FileReadContext *fq_peek_tail(FileQueue *fq) {
@@ -1492,7 +1531,7 @@ static ThreadIoContext *ioring_init_thread(void) {
} }
// Initialize buffer pool // Initialize buffer pool
thread_ctx->fallback_buffer = (unsigned char *)malloc(READ_BLOCK); thread_ctx->fallback_buffer = malloc(READ_BLOCK);
IORING_BUFFER_INFO buf_info[NUM_BUFFERS_PER_THREAD]; IORING_BUFFER_INFO buf_info[NUM_BUFFERS_PER_THREAD];
@@ -1528,8 +1567,11 @@ static ThreadIoContext *ioring_init_thread(void) {
thread_ctx->free_count = NUM_BUFFERS_PER_THREAD; thread_ctx->free_count = NUM_BUFFERS_PER_THREAD;
// Register buffers // Register buffers
int hr = ioring_register_buffers(thread_ctx, NUM_BUFFERS_PER_THREAD, buf_info);
ioring_register_buffers(thread_ctx, NUM_BUFFERS_PER_THREAD, buf_info);
#if USE_REGISTERED_FILES
ioring_register_files(thread_ctx);
#endif
thread_ctx->submitting = true; thread_ctx->submitting = true;
thread_ctx->num_submissions = 0; thread_ctx->num_submissions = 0;
@@ -1579,7 +1621,7 @@ static void return_buffer(ThreadIoContext *ctx, IoBuffer *buf) {
ctx->buffer_pool[ctx->free_count++] = buf->buffer_id; ctx->buffer_pool[ctx->free_count++] = buf->buffer_id;
} }
// -------------------------- Submit async read --------------------------- // -------------------------- Build read ---------------------------
static int build_read(ThreadIoContext *thread_ctx, FileReadContext *file_ctx, static int build_read(ThreadIoContext *thread_ctx, FileReadContext *file_ctx,
IoBuffer *buf, uint64_t offset, size_t size) { IoBuffer *buf, uint64_t offset, size_t size) {
buf->offset = offset; buf->offset = offset;
@@ -1620,7 +1662,8 @@ static void process_completions(ThreadIoContext *thread_ctx, FileQueue *fq) {
} }
// -------------------- File operations ----------------------- // -------------------- File operations -----------------------
static int init_file(FileReadContext *file, FileEntry *fe) { static int init_file(ThreadIoContext *thread_ctx, FileReadContext *file,
FileEntry *fe) {
memset(file, 0, sizeof(*file)); memset(file, 0, sizeof(*file));
file->fe = fe; file->fe = fe;
@@ -1631,12 +1674,19 @@ static int init_file(FileReadContext *file, FileEntry *fe) {
if (file->file_handle == INVALID_FILE_HANDLE) { if (file->file_handle == INVALID_FILE_HANDLE) {
#ifdef IORING_DEBUG #if IORING_DEBUG_PRINTS
printf("ERROR: Could not open file %s\n", fe->path); printf("ERROR: Could not open file %s\n", fe->path);
#endif #endif
return 0; return 0;
} }
#if (defined(_WIN32) || defined(_WIN64)) && USE_REGISTERED_FILES
if (thread_ctx->use_registered_files) {
thread_ctx->registered_handles[file->slot_id] = file->file_handle;
ioring_register_files(thread_ctx);
}
#endif
// Determine hash method based on file size // Determine hash method based on file size
if (file->file_size > g_ioring_buffer_size) { if (file->file_size > g_ioring_buffer_size) {
file->use_incremental_hash = true; file->use_incremental_hash = true;
@@ -1647,8 +1697,8 @@ static int init_file(FileReadContext *file, FileEntry *fe) {
return 1; return 1;
} }
static void finalize_file(FileReadContext *file, ThreadIoContext *thread_ctx, static void finalize_file(ThreadIoContext *thread_ctx,
WorkerContext *worker_ctx) { WorkerContext *worker_ctx, FileReadContext *file) {
FileEntry *fe = file->fe; FileEntry *fe = file->fe;
@@ -1669,7 +1719,7 @@ static void finalize_file(FileReadContext *file, ThreadIoContext *thread_ctx,
(unsigned long long)file->single_hash.low64); (unsigned long long)file->single_hash.low64);
} }
} else { } else {
#ifdef IORING_DEBUG #if IORING_DEBUG_PRINTS
printf("WARNING: Fallback for path: %s\n", fe->path); printf("WARNING: Fallback for path: %s\n", fe->path);
#endif #endif
@@ -1739,7 +1789,7 @@ static void hash_ready_files(ThreadIoContext *thread_ctx, FileQueue *fq,
} else if (buf->bytes_read == 0 && IORING_SUCCEEDED(buf->result)) { } else if (buf->bytes_read == 0 && IORING_SUCCEEDED(buf->result)) {
file->reads_hashed++; // EOF file->reads_hashed++; // EOF
} else { } else {
finalize_file(file, thread_ctx, worker_ctx); finalize_file(thread_ctx, worker_ctx, file);
file->completed = true; file->completed = true;
} }
@@ -1750,7 +1800,7 @@ static void hash_ready_files(ThreadIoContext *thread_ctx, FileQueue *fq,
if (!file->completed && file->active_reads == 0 && if (!file->completed && file->active_reads == 0 &&
file->bytes_hashed >= file->file_size) { file->bytes_hashed >= file->file_size) {
finalize_file(file, thread_ctx, worker_ctx); finalize_file(thread_ctx, worker_ctx, file);
file->completed = true; file->completed = true;
thread_ctx->active_files--; thread_ctx->active_files--;
} }
@@ -1770,7 +1820,7 @@ static void build_pending_reads(ThreadIoContext *thread_ctx, FileQueue *fq,
for (;;) { for (;;) {
// ---------------- BUILD READS FOR CURRENT FILE ---------------- // BUILD READS FOR CURRENT FILE
if (file) { if (file) {
while (file->next_read_offset < file->file_size) { while (file->next_read_offset < file->file_size) {
@@ -1807,7 +1857,7 @@ static void build_pending_reads(ThreadIoContext *thread_ctx, FileQueue *fq,
if (IORING_FAILED(hr)) { if (IORING_FAILED(hr)) {
// mark failure and stop this file // mark failure and stop this file
return_buffer(thread_ctx, buf); return_buffer(thread_ctx, buf);
finalize_file(file, thread_ctx, worker_ctx); finalize_file(thread_ctx, worker_ctx, file);
file->completed = true; file->completed = true;
break; break;
} }
@@ -1820,7 +1870,7 @@ static void build_pending_reads(ThreadIoContext *thread_ctx, FileQueue *fq,
} }
} }
// ---------------- ADD NEW FILE ---------------- // ADD NEW FILE
if (!thread_ctx->submitting) if (!thread_ctx->submitting)
return; return;
@@ -1835,8 +1885,8 @@ static void build_pending_reads(ThreadIoContext *thread_ctx, FileQueue *fq,
FileReadContext *newfile = fq_push(fq); FileReadContext *newfile = fq_push(fq);
if (!init_file(newfile, fe)) { if (!init_file(thread_ctx, newfile, fe)) {
finalize_file(newfile, thread_ctx, worker_ctx); finalize_file(thread_ctx, worker_ctx, newfile);
newfile->completed = true; newfile->completed = true;
continue; continue;
} }
@@ -1845,6 +1895,7 @@ static void build_pending_reads(ThreadIoContext *thread_ctx, FileQueue *fq,
thread_ctx->active_files++; thread_ctx->active_files++;
} }
} }
// -------------------------- Hash worker I/O Ring --------------------------- // -------------------------- Hash worker I/O Ring ---------------------------
static THREAD_RETURN hash_worker_ioring(void *arg) { static THREAD_RETURN hash_worker_ioring(void *arg) {
WorkerContext *worker_ctx = (WorkerContext *)arg; WorkerContext *worker_ctx = (WorkerContext *)arg;
@@ -1877,12 +1928,12 @@ static THREAD_RETURN hash_worker_ioring(void *arg) {
// Process completions // Process completions
process_completions(thread_ctx, &fq); process_completions(thread_ctx, &fq);
// debug #if IORING_DEBUG_STATS
// printf( printf("Free buffers: %d, Submissions: %d, Active files: %d, fq count:
// "Free buffers: %d, Submissions: %d, Active files: %d, fq count: % d\n ", thread_ctx->free_count, thread_ctx->num_submissions,
// %d\n", thread_ctx->free_count, thread_ctx->num_submissions, thread_ctx->active_files,
// thread_ctx->active_files, fq.count); fq.count);
// debug end #endif
// Hash files // Hash files
hash_ready_files(thread_ctx, &fq, worker_ctx); hash_ready_files(thread_ctx, &fq, worker_ctx);
@@ -1897,3 +1948,4 @@ static THREAD_RETURN hash_worker_ioring(void *arg) {
ioring_cleanup_thread(thread_ctx); ioring_cleanup_thread(thread_ctx);
return THREAD_RETURN_VALUE; return THREAD_RETURN_VALUE;
} }
#endif