Implementation of IO Ring in Windows
Fixing the two compilation warnings.
This commit is contained in:
694
platform.c
694
platform.c
@@ -1,7 +1,5 @@
|
||||
#pragma once // ensure that a given header file is included only once in a
|
||||
// single compilation unit
|
||||
#define _CRT_SECURE_NO_WARNINGS
|
||||
|
||||
#include "arena.h"
|
||||
#include "base.h"
|
||||
#include "lf_mpmc.h"
|
||||
@@ -9,7 +7,7 @@
|
||||
#include "arena.c"
|
||||
|
||||
// xxhash include
|
||||
#define XXH_INLINE_ALL
|
||||
#define XXH_STATIC_LINKING_ONLY
|
||||
#include "xxh_x86dispatch.h"
|
||||
|
||||
// ----------------------------- Config -------------------------------------
|
||||
@@ -584,8 +582,6 @@ void scan_folder(const char *base, ScannerContext *ctx) {
|
||||
}
|
||||
|
||||
#elif defined(__linux__)
|
||||
To test
|
||||
Choice 1
|
||||
static int platform_get_file_times_fd(int dir_fd, const char *name,
|
||||
time_t *created, time_t *modified) {
|
||||
struct stat st;
|
||||
@@ -604,9 +600,9 @@ static int platform_get_file_owner_fd(int dir_fd, const char *name, char *owner,
|
||||
struct passwd pw;
|
||||
struct passwd *result;
|
||||
char buffer[4096]; // Sufficiently large buffer for passwd data
|
||||
|
||||
|
||||
// Reentrant version (thread-safe)
|
||||
if (getpwuid_r(st.st_uid, &pw, buffer, sizeof(buffer), &result) == 0 &&
|
||||
if (getpwuid_r(st.st_uid, &pw, buffer, sizeof(buffer), &result) == 0 &&
|
||||
result != NULL && result->pw_name != NULL) {
|
||||
strncpy(owner, result->pw_name, owner_size - 1);
|
||||
owner[owner_size - 1] = '\0';
|
||||
@@ -618,170 +614,111 @@ static int platform_get_file_owner_fd(int dir_fd, const char *name, char *owner,
|
||||
}
|
||||
return -1;
|
||||
|
||||
void scan_folder(const char *base, ScannerContext *ctx) {
|
||||
PathBuilder pb;
|
||||
path_builder_init(&pb, base);
|
||||
|
||||
void scan_folder(const char *base, ScannerContext *ctx) {
|
||||
PathBuilder pb;
|
||||
path_builder_init(&pb, base);
|
||||
int dir_fd = open(base, O_RDONLY | O_DIRECTORY | O_NOFOLLOW);
|
||||
if (dir_fd == -1)
|
||||
return;
|
||||
|
||||
int dir_fd = open(base, O_RDONLY | O_DIRECTORY | O_NOFOLLOW);
|
||||
if (dir_fd == -1)
|
||||
return;
|
||||
DIR *dir = fdopendir(dir_fd);
|
||||
if (!dir) {
|
||||
close(dir_fd);
|
||||
return;
|
||||
}
|
||||
|
||||
DIR *dir = fdopendir(dir_fd);
|
||||
if (!dir) {
|
||||
close(dir_fd);
|
||||
return;
|
||||
}
|
||||
struct dirent *entry;
|
||||
|
||||
struct dirent *entry;
|
||||
while ((entry = readdir(dir)) != NULL) {
|
||||
if (entry->d_name[0] == '.' &&
|
||||
(entry->d_name[1] == 0 ||
|
||||
(entry->d_name[1] == '.' && entry->d_name[2] == 0)))
|
||||
continue;
|
||||
|
||||
while ((entry = readdir(dir)) != NULL) {
|
||||
if (entry->d_name[0] == '.' &&
|
||||
(entry->d_name[1] == 0 ||
|
||||
(entry->d_name[1] == '.' && entry->d_name[2] == 0)))
|
||||
continue;
|
||||
size_t name_len = strlen(entry->d_name);
|
||||
path_builder_set_filename(&pb, entry->d_name, name_len);
|
||||
|
||||
size_t name_len = strlen(entry->d_name);
|
||||
path_builder_set_filename(&pb, entry->d_name, name_len);
|
||||
|
||||
int file_type = DT_UNKNOWN;
|
||||
int file_type = DT_UNKNOWN;
|
||||
#ifdef _DIRENT_HAVE_D_TYPE
|
||||
file_type = entry->d_type;
|
||||
file_type = entry->d_type;
|
||||
#endif
|
||||
|
||||
// Fast path using d_type
|
||||
if (file_type != DT_UNKNOWN) {
|
||||
if (file_type == DT_LNK)
|
||||
continue; // Skip symlinks
|
||||
// Fast path using d_type
|
||||
if (file_type != DT_UNKNOWN) {
|
||||
if (file_type == DT_LNK)
|
||||
continue; // Skip symlinks
|
||||
|
||||
if (file_type == DT_DIR) {
|
||||
char *dir_path = path_builder_dup_arena(&pb, ctx->path_arena, false);
|
||||
mpmc_push_work(ctx->dir_queue, dir_path);
|
||||
continue;
|
||||
if (file_type == DT_DIR) {
|
||||
char *dir_path = path_builder_dup_arena(&pb, ctx->path_arena, false);
|
||||
mpmc_push_work(ctx->dir_queue, dir_path);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (file_type == DT_REG) {
|
||||
atomic_fetch_add(&g_files_found, 1);
|
||||
FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true);
|
||||
|
||||
// Use fstatat for file info
|
||||
struct stat st;
|
||||
if (fstatat(dir_fd, entry->d_name, &st, 0) == 0) {
|
||||
// Convert times using fd variant
|
||||
platform_get_file_times_fd(dir_fd, entry->d_name, &fe->created_time,
|
||||
&fe->modified_time);
|
||||
platform_get_file_owner_fd(dir_fd, entry->d_name, fe->owner,
|
||||
sizeof(fe->owner));
|
||||
fe->size_bytes = (uint64_t)st.st_size;
|
||||
|
||||
// Normalize path
|
||||
char temp_path[MAX_PATHLEN];
|
||||
memcpy(temp_path, pb.buffer,
|
||||
(pb.filename_pos - pb.buffer) + name_len + 1);
|
||||
normalize_path(temp_path);
|
||||
|
||||
fe->path =
|
||||
arena_push(&ctx->path_arena, strlen(temp_path) + 1, false);
|
||||
strcpy(fe->path, temp_path);
|
||||
|
||||
mpmc_push(ctx->file_queue, fe);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (file_type == DT_REG) {
|
||||
atomic_fetch_add(&g_files_found, 1);
|
||||
FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry),
|
||||
true);
|
||||
// Fallback for unknown types
|
||||
struct stat st;
|
||||
if (fstatat(dir_fd, entry->d_name, &st, AT_SYMLINK_NOFOLLOW) == 0) {
|
||||
if (S_ISLNK(st.st_mode))
|
||||
continue;
|
||||
|
||||
// Use fstatat for file info
|
||||
struct stat st;
|
||||
if (fstatat(dir_fd, entry->d_name, &st, 0) == 0) {
|
||||
// Convert times using fd variant
|
||||
platform_get_file_times_fd(dir_fd, entry->d_name,
|
||||
&fe->created_time,
|
||||
&fe->modified_time);
|
||||
platform_get_file_owner_fd(dir_fd, entry->d_name, fe->owner,
|
||||
sizeof(fe->owner));
|
||||
if (S_ISDIR(st.st_mode)) {
|
||||
char *dir_path = path_builder_dup_arena(&pb, ctx->path_arena, false);
|
||||
mpmc_push_work(ctx->dir_queue, dir_path);
|
||||
} else if (S_ISREG(st.st_mode)) {
|
||||
atomic_fetch_add(&g_files_found, 1);
|
||||
FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true);
|
||||
|
||||
platform_get_file_times(pb.buffer, &fe->created_time,
|
||||
&fe->modified_time);
|
||||
platform_get_file_owner(pb.buffer, fe->owner, sizeof(fe->owner));
|
||||
fe->size_bytes = (uint64_t)st.st_size;
|
||||
|
||||
// Normalize path
|
||||
char temp_path[MAX_PATHLEN];
|
||||
memcpy(temp_path, pb.buffer,
|
||||
(pb.filename_pos - pb.buffer) + name_len + 1);
|
||||
normalize_path(temp_path);
|
||||
|
||||
fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1,
|
||||
false); strcpy(fe->path, temp_path);
|
||||
fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, false);
|
||||
strcpy(fe->path, temp_path);
|
||||
|
||||
mpmc_push(ctx->file_queue, fe);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback for unknown types
|
||||
struct stat st;
|
||||
if (fstatat(dir_fd, entry->d_name, &st, AT_SYMLINK_NOFOLLOW) == 0) {
|
||||
if (S_ISLNK(st.st_mode))
|
||||
continue;
|
||||
|
||||
if (S_ISDIR(st.st_mode)) {
|
||||
char *dir_path = path_builder_dup_arena(&pb, ctx->path_arena, false);
|
||||
mpmc_push_work(ctx->dir_queue, dir_path);
|
||||
} else if (S_ISREG(st.st_mode)) {
|
||||
atomic_fetch_add(&g_files_found, 1);
|
||||
FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry),
|
||||
true);
|
||||
|
||||
platform_get_file_times(pb.buffer, &fe->created_time,
|
||||
&fe->modified_time);
|
||||
platform_get_file_owner(pb.buffer, fe->owner, sizeof(fe->owner));
|
||||
fe->size_bytes = (uint64_t)st.st_size;
|
||||
|
||||
char temp_path[MAX_PATHLEN];
|
||||
memcpy(temp_path, pb.buffer,
|
||||
(pb.filename_pos - pb.buffer) + name_len + 1);
|
||||
normalize_path(temp_path);
|
||||
|
||||
fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1,
|
||||
false); strcpy(fe->path, temp_path);
|
||||
|
||||
mpmc_push(ctx->file_queue, fe);
|
||||
}
|
||||
}
|
||||
closedir(dir); // Closes dir_fd automatically
|
||||
}
|
||||
|
||||
closedir(dir); // Closes dir_fd automatically
|
||||
}
|
||||
|
||||
// Choice 2
|
||||
|
||||
// void scan_folder(const char *base, ScannerContext *ctx) {
|
||||
// PathBuilder pb;
|
||||
// path_builder_init(&pb, base);
|
||||
//
|
||||
// DIR *dir = opendir(base);
|
||||
// if (!dir)
|
||||
// return;
|
||||
//
|
||||
// struct dirent *entry;
|
||||
// struct stat st;
|
||||
//
|
||||
// while ((entry = readdir(dir)) != NULL) {
|
||||
// if (entry->d_name[0] == '.' &&
|
||||
// (entry->d_name[1] == 0 ||
|
||||
// (entry->d_name[1] == '.' && entry->d_name[2] == 0)))
|
||||
// continue;
|
||||
//
|
||||
// size_t name_len = strlen(entry->d_name);
|
||||
// path_builder_set_filename(&pb, entry->d_name, name_len);
|
||||
//
|
||||
// if (lstat(pb.buffer, &st) == 0 && S_ISLNK(st.st_mode))
|
||||
// continue;
|
||||
//
|
||||
// if (stat(pb.buffer, &st) == 0) {
|
||||
// if (S_ISDIR(st.st_mode)) {
|
||||
// char *dir_path = path_builder_dup_arena(&pb, ctx->path_arena, false);
|
||||
// mpmc_push_work(ctx->dir_queue, dir_path);
|
||||
// } else {
|
||||
// atomic_fetch_add(&g_files_found, 1);
|
||||
//
|
||||
// FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true);
|
||||
//
|
||||
// // Create a temporary copy for normalization
|
||||
// char temp_path[MAX_PATHLEN];
|
||||
// memcpy(temp_path, pb.buffer,
|
||||
// (pb.filename_pos - pb.buffer) + name_len + 1);
|
||||
// normalize_path(temp_path);
|
||||
//
|
||||
// fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, false);
|
||||
// strcpy(fe->path, temp_path);
|
||||
//
|
||||
// platform_get_file_times(pb.buffer, &fe->created_time,
|
||||
// &fe->modified_time);
|
||||
// platform_get_file_owner(pb.buffer, fe->owner, sizeof(fe->owner));
|
||||
// fe->size_bytes = (uint64_t)st.st_size;
|
||||
//
|
||||
// mpmc_push(ctx->file_queue, fe);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// closedir(dir);
|
||||
// }
|
||||
|
||||
#endif
|
||||
|
||||
// ------------------------- Scan worker --------------------------------
|
||||
@@ -864,7 +801,7 @@ static THREAD_RETURN hash_worker(void *arg) {
|
||||
return THREAD_RETURN_VALUE;
|
||||
}
|
||||
|
||||
// ----------------------------- Progress display ---------------------------
|
||||
// ------------------------- Progress display ---------------------------
|
||||
static THREAD_RETURN progress_thread(void *arg) {
|
||||
(void)arg; // Unused parameter
|
||||
|
||||
@@ -940,3 +877,472 @@ static THREAD_RETURN progress_thread(void *arg) {
|
||||
|
||||
return THREAD_RETURN_VALUE;
|
||||
}
|
||||
|
||||
// ======================== Hash worker IO Ring ========================
|
||||
// -------------------------- Configuration ---------------------------
|
||||
// Size in bytes of each registered read buffer and of every full-sized read
// request: 64 blocks of 4096 bytes.
#define IORING_READ_BLOCK (64 * 4096)
// Number of read buffers allocated and registered for each worker thread.
#define NUM_BUFFERS_PER_THREAD 16
// Timeout in milliseconds for a single wait on the ring's completion event.
#define SUBMIT_TIMEOUT_MS 30000
// UserData tag attached to the buffer-registration entry so its completion
// can be told apart from read completions (which carry an IoBuffer pointer).
#define USERDATA_REGISTER 1
|
||||
|
||||
// Global stats
|
||||
// Process-wide counter: incremented by xxh3_hash_file_parallel each time the
// I/O Ring path ends with fewer/more bytes hashed than the file size and the
// file is re-hashed through the streaming path instead.
static atomic_uint_fast64_t g_io_ring_fallbacks = 0;
|
||||
|
||||
// -------------------------- Buffer structure ---------------------------
|
||||
typedef struct IoBuffer {
|
||||
void *data;
|
||||
uint64_t offset;
|
||||
size_t size;
|
||||
size_t bytes_read;
|
||||
HRESULT result;
|
||||
int buffer_id;
|
||||
int completed;
|
||||
} IoBuffer;
|
||||
|
||||
// ============================================================================
|
||||
// Thread-local I/O Ring context
|
||||
// ============================================================================
|
||||
typedef struct ThreadIoContext {
|
||||
HIORING ring;
|
||||
HANDLE completion_event;
|
||||
IoBuffer buffers[NUM_BUFFERS_PER_THREAD];
|
||||
int buffer_pool[NUM_BUFFERS_PER_THREAD];
|
||||
int free_count;
|
||||
int initialized;
|
||||
} ThreadIoContext;
|
||||
|
||||
// -------------------------- File context ---------------------------
|
||||
typedef struct FileReadContext {
|
||||
HANDLE hFile;
|
||||
uint64_t file_size;
|
||||
XXH3_state_t hash_state;
|
||||
char path[MAX_PATH];
|
||||
|
||||
// Completion tracking
|
||||
int reads_submitted;
|
||||
int reads_completed;
|
||||
int reads_hashed;
|
||||
uint64_t bytes_hashed;
|
||||
int failed_reads;
|
||||
int active_reads;
|
||||
int buffers_ready; // Count of buffers ready to hash
|
||||
|
||||
// For in-order hashing
|
||||
uint64_t next_hash_offset; // Next offset that needs to be hashed
|
||||
uint64_t next_read_offset; // Next offset to submit for reading
|
||||
|
||||
} FileReadContext;
|
||||
|
||||
// Per-thread I/O Ring context; allocated by io_ring_init_thread and released
// (and reset to NULL) by io_ring_cleanup_thread.
static _Thread_local ThreadIoContext *g_thread_ctx = NULL;
|
||||
|
||||
// ---------------------- Initialize thread context ---------------------------
|
||||
static ThreadIoContext *io_ring_init_thread(void) {
|
||||
if (g_thread_ctx && g_thread_ctx->initialized) {
|
||||
return g_thread_ctx;
|
||||
}
|
||||
|
||||
if (!g_thread_ctx) {
|
||||
g_thread_ctx = (ThreadIoContext *)calloc(1, sizeof(ThreadIoContext));
|
||||
if (!g_thread_ctx)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// Create I/O Ring
|
||||
IORING_CAPABILITIES caps;
|
||||
QueryIoRingCapabilities(&caps);
|
||||
|
||||
UINT32 queue_size = min(4096, caps.MaxSubmissionQueueSize);
|
||||
IORING_CREATE_FLAGS flags = {0};
|
||||
HRESULT hr = CreateIoRing(caps.MaxVersion, flags, queue_size, queue_size * 2,
|
||||
&g_thread_ctx->ring);
|
||||
|
||||
// Create completion event
|
||||
g_thread_ctx->completion_event = CreateEvent(NULL, FALSE, FALSE, NULL);
|
||||
if (g_thread_ctx->completion_event) {
|
||||
SetIoRingCompletionEvent(g_thread_ctx->ring,
|
||||
g_thread_ctx->completion_event);
|
||||
}
|
||||
|
||||
// Initialize buffer pool
|
||||
for (int i = 0; i < NUM_BUFFERS_PER_THREAD; i++) {
|
||||
|
||||
// 4096 alignment
|
||||
void *ptr = _aligned_malloc(IORING_READ_BLOCK, 4096);
|
||||
if (!ptr) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
g_thread_ctx->buffers[i].data = ptr;
|
||||
|
||||
g_thread_ctx->buffer_pool[i] = i;
|
||||
g_thread_ctx->buffers[i].buffer_id = i;
|
||||
}
|
||||
g_thread_ctx->free_count = NUM_BUFFERS_PER_THREAD;
|
||||
|
||||
IORING_BUFFER_INFO buf_info[NUM_BUFFERS_PER_THREAD];
|
||||
|
||||
for (int i = 0; i < NUM_BUFFERS_PER_THREAD; i++) {
|
||||
buf_info[i].Address = g_thread_ctx->buffers[i].data;
|
||||
buf_info[i].Length = IORING_READ_BLOCK;
|
||||
}
|
||||
|
||||
HRESULT hb = BuildIoRingRegisterBuffers(
|
||||
g_thread_ctx->ring, NUM_BUFFERS_PER_THREAD, buf_info, USERDATA_REGISTER);
|
||||
|
||||
if (FAILED(hb)) {
|
||||
printf("Buffer registration failed: 0x%lx\n", hb);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// Submit registration
|
||||
SubmitIoRing(g_thread_ctx->ring, 0, 0, NULL);
|
||||
|
||||
g_thread_ctx->initialized = 1;
|
||||
|
||||
return g_thread_ctx;
|
||||
}
|
||||
|
||||
static void io_ring_cleanup_thread(void) {
|
||||
if (!g_thread_ctx)
|
||||
return;
|
||||
|
||||
if (g_thread_ctx->completion_event)
|
||||
CloseHandle(g_thread_ctx->completion_event);
|
||||
if (g_thread_ctx->ring)
|
||||
CloseIoRing(g_thread_ctx->ring);
|
||||
for (int i = 0; i < NUM_BUFFERS_PER_THREAD; i++) {
|
||||
_aligned_free(g_thread_ctx->buffers[i].data);
|
||||
}
|
||||
free(g_thread_ctx);
|
||||
g_thread_ctx = NULL;
|
||||
}
|
||||
|
||||
// ---------------------- Get a free buffer from pool ------------------------
|
||||
// Pop the most recently returned buffer id off the free stack and hand back
// the matching buffer with its per-read state reset (not completed, zero
// bytes read, result E_PENDING). Returns NULL when the pool is empty.
static IoBuffer *get_free_buffer(ThreadIoContext *ctx) {
  if (ctx->free_count == 0)
    return NULL;

  ctx->free_count -= 1;
  const int slot_id = ctx->buffer_pool[ctx->free_count];

  IoBuffer *slot = &ctx->buffers[slot_id];
  slot->completed = 0;
  slot->bytes_read = 0;
  slot->result = E_PENDING;
  return slot;
}
|
||||
|
||||
// Put a buffer back on the free stack.
//
// The call is idempotent: returning a buffer that is already in the pool (or
// returning into a full pool) is ignored. Without this guard a double return
// would either list the same buffer twice, letting two reads share it, or
// write past buffer_pool[] into the fields that follow it.
static void return_buffer(ThreadIoContext *ctx, IoBuffer *buf) {
  if (!buf)
    return;

  // The pool holds at most NUM_BUFFERS_PER_THREAD ids, so a linear scan is
  // cheap.
  for (int i = 0; i < ctx->free_count; i++) {
    if (ctx->buffer_pool[i] == buf->buffer_id)
      return;
  }

  if (ctx->free_count >= NUM_BUFFERS_PER_THREAD)
    return;

  ctx->buffer_pool[ctx->free_count++] = buf->buffer_id;
}
|
||||
|
||||
// -------------------------- Submit async read ---------------------------
|
||||
static HRESULT submit_read(ThreadIoContext *ctx, FileReadContext *file_ctx,
|
||||
IoBuffer *buf, uint64_t offset, size_t size) {
|
||||
buf->offset = offset;
|
||||
buf->size = size;
|
||||
|
||||
IORING_HANDLE_REF file_ref = IoRingHandleRefFromHandle(file_ctx->hFile);
|
||||
IORING_BUFFER_REF buffer_ref =
|
||||
IoRingBufferRefFromIndexAndOffset(buf->buffer_id, 0);
|
||||
|
||||
HRESULT hr =
|
||||
BuildIoRingReadFile(ctx->ring, file_ref, buffer_ref, (UINT32)size, offset,
|
||||
(UINT_PTR)buf, IOSQE_FLAGS_NONE);
|
||||
|
||||
if (SUCCEEDED(hr)) {
|
||||
file_ctx->active_reads++;
|
||||
file_ctx->reads_submitted++;
|
||||
} else {
|
||||
buf->completed = 1;
|
||||
return_buffer(ctx, buf);
|
||||
}
|
||||
return hr;
|
||||
}
|
||||
|
||||
// -------------------------- Process completions ---------------------------
|
||||
static int process_completions(ThreadIoContext *ctx,
|
||||
FileReadContext *file_ctx) {
|
||||
IORING_CQE cqe;
|
||||
int processed = 0;
|
||||
|
||||
while (PopIoRingCompletion(ctx->ring, &cqe) == S_OK) {
|
||||
|
||||
if (cqe.UserData == USERDATA_REGISTER || cqe.UserData == 0)
|
||||
continue;
|
||||
|
||||
IoBuffer *buf = (IoBuffer *)cqe.UserData;
|
||||
|
||||
if (buf && !buf->completed) {
|
||||
buf->result = cqe.ResultCode;
|
||||
buf->bytes_read = (DWORD)cqe.Information;
|
||||
buf->completed = 1;
|
||||
|
||||
if (SUCCEEDED(cqe.ResultCode) && cqe.Information > 0) {
|
||||
file_ctx->active_reads--;
|
||||
file_ctx->reads_completed++;
|
||||
file_ctx->buffers_ready++;
|
||||
} else {
|
||||
file_ctx->failed_reads++;
|
||||
file_ctx->active_reads--;
|
||||
file_ctx->reads_completed++;
|
||||
}
|
||||
|
||||
processed++;
|
||||
}
|
||||
}
|
||||
|
||||
return processed;
|
||||
}
|
||||
|
||||
// -------------------- Hash buffer in sequential order -----------------------
|
||||
static int hash_sequential_buffers(ThreadIoContext *ctx,
|
||||
FileReadContext *file_ctx) {
|
||||
int hashed = 0;
|
||||
|
||||
// Keep hashing while the next buffer in sequence is ready
|
||||
while (file_ctx->next_hash_offset < file_ctx->file_size) {
|
||||
// Find the buffer that contains next_hash_offset
|
||||
IoBuffer *found_buf = NULL;
|
||||
|
||||
for (int i = 0; i < NUM_BUFFERS_PER_THREAD; i++) {
|
||||
IoBuffer *buf = &ctx->buffers[i];
|
||||
if (buf->completed && buf->offset == file_ctx->next_hash_offset) {
|
||||
found_buf = buf;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!found_buf)
|
||||
break; // Buffer not ready yet
|
||||
|
||||
// Found the correct buffer in order - hash it
|
||||
if (SUCCEEDED(found_buf->result) && found_buf->bytes_read > 0) {
|
||||
XXH3_128bits_update(&file_ctx->hash_state, found_buf->data,
|
||||
found_buf->bytes_read);
|
||||
atomic_fetch_add(&g_bytes_processed, found_buf->bytes_read);
|
||||
|
||||
// Update bytes_hashed for this file!
|
||||
file_ctx->bytes_hashed += found_buf->bytes_read;
|
||||
file_ctx->reads_hashed++;
|
||||
file_ctx->buffers_ready--;
|
||||
|
||||
// Mark as processed and return buffer to pool
|
||||
found_buf->completed = 0;
|
||||
return_buffer(ctx, found_buf);
|
||||
|
||||
// Move to next offset
|
||||
file_ctx->next_hash_offset += found_buf->size;
|
||||
hashed++;
|
||||
} else if (found_buf->bytes_read == 0 && SUCCEEDED(found_buf->result)) {
|
||||
// Read operation failed with an error code
|
||||
file_ctx->reads_hashed++;
|
||||
file_ctx->buffers_ready--;
|
||||
found_buf->completed = 0;
|
||||
return_buffer(ctx, found_buf);
|
||||
file_ctx->next_hash_offset += found_buf->size;
|
||||
hashed++;
|
||||
} else {
|
||||
// Read failed
|
||||
file_ctx->failed_reads++;
|
||||
file_ctx->reads_hashed++;
|
||||
file_ctx->buffers_ready--;
|
||||
found_buf->completed = 0;
|
||||
return_buffer(ctx, found_buf);
|
||||
file_ctx->next_hash_offset += found_buf->size;
|
||||
hashed++;
|
||||
}
|
||||
}
|
||||
|
||||
return hashed;
|
||||
}
|
||||
|
||||
// ------------- Submit pending reads - fill all free buffers -----------------
|
||||
static int submit_pending_reads(ThreadIoContext *ctx,
|
||||
FileReadContext *file_ctx) {
|
||||
int submitted = 0;
|
||||
|
||||
while (1) {
|
||||
// Check if we have more data to read
|
||||
uint64_t current_offset = file_ctx->next_read_offset;
|
||||
int has_data = (current_offset < file_ctx->file_size);
|
||||
|
||||
if (!has_data)
|
||||
break;
|
||||
|
||||
// Get a free buffer
|
||||
IoBuffer *buf = get_free_buffer(ctx);
|
||||
if (!buf)
|
||||
break;
|
||||
|
||||
size_t remaining = file_ctx->file_size - current_offset;
|
||||
|
||||
size_t bytes_to_read;
|
||||
|
||||
if (remaining >= IORING_READ_BLOCK) {
|
||||
bytes_to_read = IORING_READ_BLOCK;
|
||||
} else {
|
||||
// Round UP to sector size (4096)
|
||||
bytes_to_read = (remaining + 4095) & ~4095;
|
||||
}
|
||||
|
||||
HRESULT hr = submit_read(ctx, file_ctx, buf, current_offset, bytes_to_read);
|
||||
|
||||
if (SUCCEEDED(hr)) {
|
||||
submitted++;
|
||||
|
||||
file_ctx->next_read_offset += bytes_to_read;
|
||||
} else {
|
||||
return_buffer(ctx, buf);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return submitted;
|
||||
}
|
||||
|
||||
// -------------------------- Wait for completions ---------------------------
|
||||
static void wait_for_completions(ThreadIoContext *ctx,
|
||||
FileReadContext *file_ctx) {
|
||||
int has_active = (file_ctx->active_reads > 0);
|
||||
|
||||
if (has_active && ctx->completion_event) {
|
||||
WaitForSingleObject(ctx->completion_event, SUBMIT_TIMEOUT_MS);
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------- Main parallel hashing function ----------------------
|
||||
// Hash one file with XXH3-128 using overlapped, unbuffered reads issued
// through the thread's I/O Ring, and write the 32-digit hex digest to out_hex.
//
// Several reads are kept in flight at once, but their buffers are fed to the
// hash strictly in file order. The function delegates to
// xxh3_hash_file_stream (using temp_buffer) when there is no usable ring,
// when the file cannot be opened or sized, when any read failed, or when the
// number of bytes hashed does not match the file size.
static void xxh3_hash_file_parallel(ThreadIoContext *ctx, const char *path,
                                    char *out_hex, unsigned char *temp_buffer) {
  // Without a ring there is nothing to do in parallel.
  if (!ctx || !ctx->ring) {
    xxh3_hash_file_stream(path, out_hex, temp_buffer);
    return;
  }

  HANDLE file =
      CreateFileA(path, GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL,
                  OPEN_EXISTING, FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING,
                  NULL);
  if (file == INVALID_HANDLE_VALUE) {
    xxh3_hash_file_stream(path, out_hex, temp_buffer);
    return;
  }

  LARGE_INTEGER size_on_disk;
  if (!GetFileSizeEx(file, &size_on_disk)) {
    xxh3_hash_file_stream(path, out_hex, temp_buffer);
    CloseHandle(file);
    return;
  }

  // Zero-fill first: all counters and both offset cursors start at 0.
  FileReadContext fc;
  memset(&fc, 0, sizeof(fc));
  fc.hFile = file;
  fc.file_size = size_on_disk.QuadPart;
  strncpy(fc.path, path, MAX_PATH - 1);
  fc.path[MAX_PATH - 1] = 0;
  XXH3_128bits_reset(&fc.hash_state);

  // Prime the ring with as many reads as there are free buffers.
  submit_pending_reads(ctx, &fc);
  if (fc.reads_submitted > 0) {
    UINT32 entries = 0;
    SubmitIoRing(ctx->ring, 0, 0, &entries);
  }

  // Pump until every queued read has been consumed by the hasher.
  while (fc.reads_hashed < fc.reads_submitted) {
    // 1. Record whatever has completed.
    process_completions(ctx, &fc);

    // 2. Hash in file order; this also frees buffers for step 3.
    hash_sequential_buffers(ctx, &fc);

    // 3. Refill the ring while the file still has unread data.
    if (fc.active_reads < NUM_BUFFERS_PER_THREAD &&
        fc.next_read_offset < fc.file_size) {
      if (submit_pending_reads(ctx, &fc) > 0) {
        UINT32 entries = 0;
        SubmitIoRing(ctx->ring, 0, 0, &entries);
      }
    }

    // 4. Sleep only when reads are in flight and nothing is ready to hash.
    if (fc.active_reads > 0 && fc.buffers_ready == 0) {
      wait_for_completions(ctx, &fc);
    }
  }

  // Accept the digest only if the whole file was hashed without a failure.
  const int whole_file_hashed = (fc.bytes_hashed == fc.file_size);
  if (whole_file_hashed && fc.failed_reads == 0) {
    XXH128_hash_t digest = XXH3_128bits_digest(&fc.hash_state);
    snprintf(out_hex, HASH_STRLEN, "%016llx%016llx",
             (unsigned long long)digest.high64,
             (unsigned long long)digest.low64);
  } else {
    if (!whole_file_hashed) {
      atomic_fetch_add(&g_io_ring_fallbacks, 1);
    }
    xxh3_hash_file_stream(path, out_hex, temp_buffer);
  }

  CloseHandle(file);
}
|
||||
|
||||
// -------------------------- Hash worker I/O Ring ---------------------------
|
||||
static THREAD_RETURN hash_worker_io_ring(void *arg) {
|
||||
WorkerContext *ctx = (WorkerContext *)arg;
|
||||
unsigned char *temp_buffer = (unsigned char *)malloc(IORING_READ_BLOCK);
|
||||
char hash[HASH_STRLEN];
|
||||
|
||||
if (!temp_buffer)
|
||||
return THREAD_RETURN_VALUE;
|
||||
|
||||
// Initialize I/O Ring for this thread
|
||||
ThreadIoContext *ring_ctx = io_ring_init_thread();
|
||||
if (!ring_ctx || !ring_ctx->ring) {
|
||||
printf("Thread %lu: I/O Ring unavailable, using buffered I/O\n",
|
||||
GetCurrentThreadId());
|
||||
free(temp_buffer);
|
||||
return hash_worker(arg);
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
FileEntry *fe = mpmc_pop(ctx->file_queue);
|
||||
if (!fe)
|
||||
break;
|
||||
|
||||
// Pass the I/O Ring context to the hashing function
|
||||
xxh3_hash_file_parallel(ring_ctx, fe->path, hash, temp_buffer);
|
||||
|
||||
char created[32], modified[32];
|
||||
format_time(fe->created_time, created, sizeof(created));
|
||||
format_time(fe->modified_time, modified, sizeof(modified));
|
||||
|
||||
double size_kib = (double)fe->size_bytes / 1024.0;
|
||||
|
||||
char stack_buf[1024];
|
||||
int len =
|
||||
snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n",
|
||||
hash, fe->path, size_kib, created, modified, fe->owner);
|
||||
|
||||
char *dst = arena_push(&ctx->arena, len, false);
|
||||
memcpy(dst, stack_buf, len);
|
||||
atomic_fetch_add(&g_files_hashed, 1);
|
||||
}
|
||||
|
||||
io_ring_cleanup_thread();
|
||||
free(temp_buffer);
|
||||
|
||||
return THREAD_RETURN_VALUE;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user