Add support for multiple in-flight files and one-shot hashing of small files
The IO Ring now supports batching multiple submissions and can handle multiple files at the same time. Small files are hashed with XXH3_128bits() instead of the streaming pipeline (XXH3_128bits_reset(), XXH3_128bits_update(), XXH3_128bits_digest()); this reduces the overhead of creating a state and digesting. Coupled with the IO Ring, this improves the hashing of small files whose size is smaller than the IO Ring buffer size.
This commit is contained in:
@@ -50,6 +50,8 @@ Fixing user prompt parsing
|
||||
Reorganising the code
|
||||
Improving the scan function
|
||||
|
||||
5.0: Implementing the IO Ring instead of buffered hashing, huge performance gains.
|
||||
5.0: Implementing the IO Ring instead of buffered hashing, huge performance gains. The IO Ring is thread-local and uses DMA and direct disk I/O, bypassing the OS cache completely; it supports batching multiple submissions and can handle multiple files at the same time.
|
||||
Hashing small files with XXH3_128bits() instead of the streaming pipeline (XXH3_128bits_reset(), XXH3_128bits_update(), XXH3_128bits_digest()); this reduces the overhead of creating a state and digesting. Coupled with the IO Ring, this improves the hashing of small files whose size is smaller than the IO Ring buffer size.
|
||||
fixing the xxh_x86dispatch warnings
|
||||
Updating the progress printing function
|
||||
|
||||
|
||||
@@ -87,7 +87,7 @@ int main(int argc, char **argv) {
|
||||
printf(" Selected instruction set: %s\n", get_xxhash_instruction_set());
|
||||
|
||||
// Align IO Ring block size to the system page size
|
||||
u64 g_ioring_read_block = ALIGN_UP_POW2(IORING_READ_BLOCK, g_pagesize);
|
||||
g_ioring_buffer_size = ALIGN_UP_POW2(IORING_BUFFER_SIZE, g_pagesize);
|
||||
// -------------------------------
|
||||
// Scanning and hashing
|
||||
// -------------------------------
|
||||
@@ -120,6 +120,7 @@ int main(int argc, char **argv) {
|
||||
|
||||
// Starting hash threads
|
||||
size_t num_hash_threads = num_threads;
|
||||
// size_t num_hash_threads = 1;
|
||||
|
||||
WorkerContext workers[num_hash_threads];
|
||||
Thread *hash_threads =
|
||||
|
||||
598
platform.c
598
platform.c
@@ -874,14 +874,48 @@ static THREAD_RETURN progress_thread(void *arg) {
|
||||
|
||||
// ======================== Hash worker IO Ring ========================
|
||||
// -------------------------- Configuration ---------------------------
|
||||
#define IORING_BUFFER_SIZE (KiB(512))
|
||||
#define NUM_BUFFERS_PER_THREAD 16
|
||||
#define MAX_ACTIVE_FILES 8
|
||||
#define SUBMIT_TIMEOUT_MS 30000
|
||||
#define USERDATA_REGISTER 1
|
||||
#define IORING_READ_BLOCK (KiB(1024))
|
||||
|
||||
// Globals
|
||||
u64 g_ioring_read_block = 4096 * 64;
|
||||
u64 g_ioring_buffer_size = 4096 * 64;
|
||||
static atomic_uint_fast64_t g_io_ring_fallbacks = 0;
|
||||
|
||||
// -------------------------- File context ---------------------------
|
||||
typedef struct IoBuffer IoBuffer;
|
||||
|
||||
typedef struct FileReadContext {
|
||||
HANDLE hFile;
|
||||
uint64_t file_size;
|
||||
bool use_incremental_hash;
|
||||
union {
|
||||
XXH3_state_t hash_state; // For incremental hash (large files)
|
||||
XXH128_hash_t single_hash; // For single-shot hash (small files)
|
||||
};
|
||||
FileEntry *fe;
|
||||
|
||||
// Completion tracking
|
||||
int reads_submitted;
|
||||
int reads_completed;
|
||||
|
||||
int active_reads;
|
||||
int failed_reads;
|
||||
|
||||
int reads_hashed;
|
||||
uint64_t bytes_hashed;
|
||||
|
||||
// For in-order hashing
|
||||
uint64_t next_hash_offset;
|
||||
uint64_t next_read_offset;
|
||||
|
||||
IoBuffer *head;
|
||||
IoBuffer *tail;
|
||||
|
||||
} FileReadContext;
|
||||
|
||||
// -------------------------- Buffer structure ---------------------------
|
||||
typedef struct IoBuffer {
|
||||
void *data;
|
||||
@@ -891,44 +925,55 @@ typedef struct IoBuffer {
|
||||
HRESULT result;
|
||||
int buffer_id;
|
||||
int completed;
|
||||
|
||||
FileReadContext *file;
|
||||
|
||||
struct IoBuffer *next;
|
||||
} IoBuffer;
|
||||
|
||||
// ============================================================================
|
||||
// Thread-local I/O Ring context
|
||||
// ============================================================================
|
||||
typedef struct ThreadIoContext {
|
||||
HIORING ring;
|
||||
HANDLE completion_event;
|
||||
IoBuffer buffers[NUM_BUFFERS_PER_THREAD];
|
||||
int buffer_pool[NUM_BUFFERS_PER_THREAD];
|
||||
int free_count;
|
||||
int num_submissions;
|
||||
int active_files;
|
||||
int initialized;
|
||||
} ThreadIoContext;
|
||||
|
||||
// -------------------------- File context ---------------------------
|
||||
typedef struct FileReadContext {
|
||||
HANDLE hFile;
|
||||
uint64_t file_size;
|
||||
XXH3_state_t hash_state;
|
||||
char path[MAX_PATH];
|
||||
|
||||
// Completion tracking
|
||||
int reads_submitted;
|
||||
int reads_completed;
|
||||
int reads_hashed;
|
||||
uint64_t bytes_hashed;
|
||||
int failed_reads;
|
||||
int active_reads;
|
||||
int buffers_ready; // Count of buffers ready to hash
|
||||
|
||||
// For in-order hashing
|
||||
uint64_t next_hash_offset; // Next offset that needs to be hashed
|
||||
uint64_t next_read_offset; // Next offset to submit for reading
|
||||
|
||||
} FileReadContext;
|
||||
|
||||
static _Thread_local ThreadIoContext *g_thread_ctx = NULL;
|
||||
|
||||
typedef struct FileQueue {
|
||||
FileReadContext files[MAX_ACTIVE_FILES];
|
||||
int head;
|
||||
int tail;
|
||||
int count;
|
||||
} FileQueue;
|
||||
|
||||
// ---------------------- FIFO queue operations ---------------------------
|
||||
static FileReadContext *fq_push(FileQueue *fq) {
|
||||
if (fq->count == MAX_ACTIVE_FILES)
|
||||
return NULL;
|
||||
|
||||
FileReadContext *f = &fq->files[fq->tail];
|
||||
fq->tail = (fq->tail + 1) % MAX_ACTIVE_FILES;
|
||||
fq->count++;
|
||||
return f;
|
||||
}
|
||||
|
||||
static FileReadContext *fq_peek(FileQueue *fq) {
|
||||
if (fq->count == 0)
|
||||
return NULL;
|
||||
return &fq->files[fq->head];
|
||||
}
|
||||
|
||||
static void fq_pop(FileQueue *fq) {
|
||||
fq->head = (fq->head + 1) % MAX_ACTIVE_FILES;
|
||||
fq->count--;
|
||||
}
|
||||
|
||||
// ---------------------- Initialize thread context ---------------------------
|
||||
static ThreadIoContext *io_ring_init_thread(void) {
|
||||
if (g_thread_ctx && g_thread_ctx->initialized) {
|
||||
@@ -960,7 +1005,7 @@ static ThreadIoContext *io_ring_init_thread(void) {
|
||||
// Initialize buffer pool
|
||||
IORING_BUFFER_INFO buf_info[NUM_BUFFERS_PER_THREAD];
|
||||
|
||||
u64 buf_pool_size = g_ioring_read_block * NUM_BUFFERS_PER_THREAD;
|
||||
u64 buf_pool_size = g_ioring_buffer_size * NUM_BUFFERS_PER_THREAD;
|
||||
|
||||
// Reserve and Commit the entire memory chunk
|
||||
void *base_ptr = plat_mem_reserve(buf_pool_size);
|
||||
@@ -975,13 +1020,13 @@ static ThreadIoContext *io_ring_init_thread(void) {
|
||||
|
||||
for (int i = 0; i < NUM_BUFFERS_PER_THREAD; i++) {
|
||||
|
||||
g_thread_ctx->buffers[i].data = (u8 *)base_ptr + (i * g_ioring_read_block);
|
||||
g_thread_ctx->buffers[i].data = (u8 *)base_ptr + (i * g_ioring_buffer_size);
|
||||
|
||||
g_thread_ctx->buffer_pool[i] = i;
|
||||
g_thread_ctx->buffers[i].buffer_id = i;
|
||||
|
||||
buf_info[i].Address = g_thread_ctx->buffers[i].data;
|
||||
buf_info[i].Length = (ULONG)g_ioring_read_block;
|
||||
buf_info[i].Length = (ULONG)g_ioring_buffer_size;
|
||||
}
|
||||
|
||||
g_thread_ctx->free_count = NUM_BUFFERS_PER_THREAD;
|
||||
@@ -997,6 +1042,9 @@ static ThreadIoContext *io_ring_init_thread(void) {
|
||||
// Submit registration
|
||||
SubmitIoRing(g_thread_ctx->ring, 0, 0, NULL);
|
||||
|
||||
g_thread_ctx->num_submissions = 0;
|
||||
g_thread_ctx->active_files = 0;
|
||||
|
||||
g_thread_ctx->initialized = 1;
|
||||
|
||||
return g_thread_ctx;
|
||||
@@ -1015,7 +1063,7 @@ static void io_ring_cleanup_thread(void) {
|
||||
g_thread_ctx = NULL;
|
||||
}
|
||||
|
||||
// ---------------------- Get a free buffer from pool ------------------------
|
||||
// ---------------------- Buffer get and return ------------------------
|
||||
static IoBuffer *get_free_buffer(ThreadIoContext *ctx) {
|
||||
|
||||
if (ctx->free_count == 0) {
|
||||
@@ -1043,6 +1091,7 @@ static HRESULT submit_read(ThreadIoContext *ctx, FileReadContext *file_ctx,
|
||||
IoBuffer *buf, uint64_t offset, size_t size) {
|
||||
buf->offset = offset;
|
||||
buf->size = size;
|
||||
buf->file = file_ctx;
|
||||
|
||||
IORING_HANDLE_REF file_ref = IoRingHandleRefFromHandle(file_ctx->hFile);
|
||||
IORING_BUFFER_REF buffer_ref =
|
||||
@@ -1055,6 +1104,7 @@ static HRESULT submit_read(ThreadIoContext *ctx, FileReadContext *file_ctx,
|
||||
if (SUCCEEDED(hr)) {
|
||||
file_ctx->active_reads++;
|
||||
file_ctx->reads_submitted++;
|
||||
ctx->num_submissions++;
|
||||
} else {
|
||||
buf->completed = 1;
|
||||
return_buffer(ctx, buf);
|
||||
@@ -1062,11 +1112,40 @@ static HRESULT submit_read(ThreadIoContext *ctx, FileReadContext *file_ctx,
|
||||
return hr;
|
||||
}
|
||||
|
||||
// ------------ Link completed buffers in an ordered list -------------
|
||||
static void insert_buffer_ordered(FileReadContext *file, IoBuffer *buf) {
|
||||
buf->next = NULL;
|
||||
|
||||
// empty list
|
||||
if (!file->head) {
|
||||
file->head = file->tail = buf;
|
||||
return;
|
||||
}
|
||||
|
||||
// insert at head
|
||||
if (buf->offset < file->head->offset) {
|
||||
buf->next = file->head;
|
||||
file->head = buf;
|
||||
return;
|
||||
}
|
||||
|
||||
// find position
|
||||
IoBuffer *cur = file->head;
|
||||
|
||||
while (cur->next && cur->next->offset < buf->offset) {
|
||||
cur = cur->next;
|
||||
}
|
||||
|
||||
buf->next = cur->next;
|
||||
cur->next = buf;
|
||||
|
||||
if (!buf->next)
|
||||
file->tail = buf;
|
||||
}
|
||||
|
||||
// -------------------------- Process completions ---------------------------
|
||||
static int process_completions(ThreadIoContext *ctx,
|
||||
FileReadContext *file_ctx) {
|
||||
static void process_completions(ThreadIoContext *ctx, FileQueue *fq) {
|
||||
IORING_CQE cqe;
|
||||
int processed = 0;
|
||||
|
||||
while (PopIoRingCompletion(ctx->ring, &cqe) == S_OK) {
|
||||
|
||||
@@ -1074,273 +1153,294 @@ static int process_completions(ThreadIoContext *ctx,
|
||||
continue;
|
||||
|
||||
IoBuffer *buf = (IoBuffer *)cqe.UserData;
|
||||
FileReadContext *file = buf->file;
|
||||
|
||||
if (buf && !buf->completed) {
|
||||
buf->result = cqe.ResultCode;
|
||||
buf->bytes_read = (DWORD)cqe.Information;
|
||||
buf->completed = 1;
|
||||
buf->result = cqe.ResultCode;
|
||||
buf->bytes_read = (DWORD)cqe.Information;
|
||||
buf->completed = 1;
|
||||
|
||||
if (SUCCEEDED(cqe.ResultCode) && cqe.Information > 0) {
|
||||
file_ctx->active_reads--;
|
||||
file_ctx->reads_completed++;
|
||||
file_ctx->buffers_ready++;
|
||||
} else {
|
||||
file_ctx->failed_reads++;
|
||||
file_ctx->active_reads--;
|
||||
file_ctx->reads_completed++;
|
||||
}
|
||||
file->active_reads--;
|
||||
file->reads_completed++;
|
||||
ctx->num_submissions--;
|
||||
|
||||
processed++;
|
||||
if (SUCCEEDED(cqe.ResultCode) && cqe.Information > 0) {
|
||||
|
||||
buf->next = NULL;
|
||||
|
||||
insert_buffer_ordered(file, buf);
|
||||
|
||||
} else {
|
||||
file->failed_reads++;
|
||||
return_buffer(ctx, buf);
|
||||
}
|
||||
}
|
||||
|
||||
return processed;
|
||||
}
|
||||
|
||||
// -------------------- Hash buffer in sequential order -----------------------
|
||||
static int hash_sequential_buffers(ThreadIoContext *ctx,
|
||||
FileReadContext *file_ctx) {
|
||||
int hashed = 0;
|
||||
// -------------------- File operations -----------------------
|
||||
static int init_file(FileReadContext *f, FileEntry *fe) {
|
||||
memset(f, 0, sizeof(*f));
|
||||
|
||||
// Keep hashing while the next buffer in sequence is ready
|
||||
while (file_ctx->next_hash_offset < file_ctx->file_size) {
|
||||
// Find the buffer that contains next_hash_offset
|
||||
IoBuffer *found_buf = NULL;
|
||||
f->fe = fe;
|
||||
f->file_size = fe->size_bytes;
|
||||
|
||||
for (int i = 0; i < NUM_BUFFERS_PER_THREAD; i++) {
|
||||
IoBuffer *buf = &ctx->buffers[i];
|
||||
if (buf->completed && buf->offset == file_ctx->next_hash_offset) {
|
||||
found_buf = buf;
|
||||
break;
|
||||
}
|
||||
}
|
||||
f->hFile = CreateFileA(
|
||||
fe->path, GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL,
|
||||
OPEN_EXISTING, FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING, NULL);
|
||||
|
||||
if (!found_buf)
|
||||
break; // Buffer not ready yet
|
||||
|
||||
// Found the correct buffer in order - hash it
|
||||
if (SUCCEEDED(found_buf->result) && found_buf->bytes_read > 0) {
|
||||
XXH3_128bits_update(&file_ctx->hash_state, found_buf->data,
|
||||
found_buf->bytes_read);
|
||||
atomic_fetch_add(&g_bytes_processed, found_buf->bytes_read);
|
||||
|
||||
// Update bytes_hashed for this file!
|
||||
file_ctx->bytes_hashed += found_buf->bytes_read;
|
||||
file_ctx->reads_hashed++;
|
||||
file_ctx->buffers_ready--;
|
||||
|
||||
// Mark as processed and return buffer to pool
|
||||
found_buf->completed = 0;
|
||||
return_buffer(ctx, found_buf);
|
||||
|
||||
// Move to next offset
|
||||
file_ctx->next_hash_offset += found_buf->size;
|
||||
hashed++;
|
||||
} else if (found_buf->bytes_read == 0 && SUCCEEDED(found_buf->result)) {
|
||||
// Read operation failed with an error code
|
||||
file_ctx->reads_hashed++;
|
||||
file_ctx->buffers_ready--;
|
||||
found_buf->completed = 0;
|
||||
return_buffer(ctx, found_buf);
|
||||
file_ctx->next_hash_offset += found_buf->size;
|
||||
hashed++;
|
||||
} else {
|
||||
// Read failed
|
||||
file_ctx->failed_reads++;
|
||||
file_ctx->reads_hashed++;
|
||||
file_ctx->buffers_ready--;
|
||||
found_buf->completed = 0;
|
||||
return_buffer(ctx, found_buf);
|
||||
file_ctx->next_hash_offset += found_buf->size;
|
||||
hashed++;
|
||||
}
|
||||
if (f->hFile == INVALID_HANDLE_VALUE) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return hashed;
|
||||
// Determine hash method based on file size
|
||||
if (f->file_size > g_ioring_buffer_size) {
|
||||
f->use_incremental_hash = true;
|
||||
XXH3_128bits_reset(&f->hash_state);
|
||||
} else {
|
||||
f->use_incremental_hash = false;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
static void finalize_file(FileReadContext *file, WorkerContext *ctx) {
|
||||
|
||||
FileEntry *fe = file->fe;
|
||||
|
||||
char hash[HASH_STRLEN];
|
||||
|
||||
if (file->failed_reads == 0 && file->bytes_hashed == file->file_size) {
|
||||
if (file->use_incremental_hash) {
|
||||
// Large file: digest the accumulated hash state
|
||||
XXH128_hash_t h = XXH3_128bits_digest(&file->hash_state);
|
||||
snprintf(hash, HASH_STRLEN, "%016llx%016llx",
|
||||
(unsigned long long)h.high64, (unsigned long long)h.low64);
|
||||
} else {
|
||||
// Small file: hash already computed, stored directly in single_hash
|
||||
snprintf(hash, HASH_STRLEN, "%016llx%016llx",
|
||||
(unsigned long long)file->single_hash.high64,
|
||||
(unsigned long long)file->single_hash.low64);
|
||||
}
|
||||
} else {
|
||||
atomic_fetch_add(&g_io_ring_fallbacks, 1);
|
||||
xxh3_hash_file_stream(fe->path, hash, NULL);
|
||||
printf("Fallback for path: %s\n", fe->path);
|
||||
}
|
||||
|
||||
char created[32], modified[32];
|
||||
format_time(fe->created_time, created, sizeof(created));
|
||||
format_time(fe->modified_time, modified, sizeof(modified));
|
||||
|
||||
double size_kib = (double)fe->size_bytes / 1024.0;
|
||||
|
||||
char stack_buf[1024];
|
||||
int len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n",
|
||||
hash, fe->path, size_kib, created, modified, fe->owner);
|
||||
|
||||
char *dst = arena_push(&ctx->arena, len, false);
|
||||
memcpy(dst, stack_buf, len);
|
||||
|
||||
atomic_fetch_add(&g_files_hashed, 1);
|
||||
}
|
||||
|
||||
// -------------------- Hash head file -----------------------
|
||||
static void hash_head_file(ThreadIoContext *ctx, FileQueue *fq,
|
||||
WorkerContext *wctx) {
|
||||
|
||||
FileReadContext *file = fq_peek(fq);
|
||||
if (!file)
|
||||
return;
|
||||
|
||||
while (file->head) {
|
||||
|
||||
IoBuffer *buf = file->head;
|
||||
|
||||
// Check ordering
|
||||
if (buf->offset != file->bytes_hashed)
|
||||
return;
|
||||
|
||||
// Consume
|
||||
file->head = buf->next;
|
||||
if (!file->head)
|
||||
file->tail = NULL;
|
||||
|
||||
// Calculate actual bytes to hash (handle last partial sector)
|
||||
size_t bytes_to_hash = buf->bytes_read;
|
||||
|
||||
// If this is the last buffer and we read beyond file size, trim it
|
||||
if (buf->offset + buf->bytes_read > file->file_size) {
|
||||
bytes_to_hash = file->file_size - buf->offset;
|
||||
}
|
||||
|
||||
if (bytes_to_hash > 0) {
|
||||
if (file->use_incremental_hash) {
|
||||
// Large file: update incremental hash state
|
||||
XXH3_128bits_update(&file->hash_state, buf->data, bytes_to_hash);
|
||||
} else {
|
||||
// Small file: single-shot hash
|
||||
file->single_hash = XXH3_128bits(buf->data, bytes_to_hash);
|
||||
}
|
||||
|
||||
file->bytes_hashed += bytes_to_hash;
|
||||
atomic_fetch_add(&g_bytes_processed, bytes_to_hash);
|
||||
}
|
||||
|
||||
return_buffer(ctx, buf);
|
||||
}
|
||||
|
||||
// Finalize
|
||||
if (file->active_reads == 0 && file->bytes_hashed >= file->file_size) {
|
||||
finalize_file(file, wctx);
|
||||
CloseHandle(file->hFile);
|
||||
fq_pop(fq);
|
||||
ctx->active_files--;
|
||||
}
|
||||
}
|
||||
|
||||
// ------------- Submit pending reads - fill all free buffers -----------------
|
||||
static int submit_pending_reads(ThreadIoContext *ctx,
|
||||
FileReadContext *file_ctx) {
|
||||
int submitted = 0;
|
||||
static void submit_pending_reads(ThreadIoContext *ctx, FileQueue *fq,
|
||||
WorkerContext *worker_ctx, int *submitting) {
|
||||
|
||||
while (1) {
|
||||
// Check if we have more data to read
|
||||
uint64_t current_offset = file_ctx->next_read_offset;
|
||||
int has_data = (current_offset < file_ctx->file_size);
|
||||
MPMCQueue *file_queue = worker_ctx->file_queue;
|
||||
|
||||
if (!has_data)
|
||||
break;
|
||||
// Try to submit reads for the current head file
|
||||
FileReadContext *f = fq_peek(fq);
|
||||
|
||||
// Get a free buffer
|
||||
IoBuffer *buf = get_free_buffer(ctx);
|
||||
if (!buf)
|
||||
break;
|
||||
for (;;) {
|
||||
|
||||
size_t remaining = file_ctx->file_size - current_offset;
|
||||
if (f) {
|
||||
while (f->next_read_offset < f->file_size) {
|
||||
|
||||
size_t bytes_to_read;
|
||||
IoBuffer *buf = get_free_buffer(ctx);
|
||||
if (!buf)
|
||||
return;
|
||||
|
||||
if (remaining >= g_ioring_read_block) {
|
||||
bytes_to_read = g_ioring_read_block;
|
||||
} else {
|
||||
// Round UP to sector size (4096)
|
||||
bytes_to_read = ALIGN_UP_POW2(remaining, g_pagesize);
|
||||
}
|
||||
size_t remaining = f->file_size - f->next_read_offset;
|
||||
size_t size;
|
||||
|
||||
HRESULT hr = submit_read(ctx, file_ctx, buf, current_offset, bytes_to_read);
|
||||
// Check if this is the last read and the file size is not
|
||||
// sector-aligned
|
||||
BOOL is_last_read = (remaining <= g_ioring_buffer_size);
|
||||
|
||||
if (SUCCEEDED(hr)) {
|
||||
submitted++;
|
||||
if (remaining >= g_ioring_buffer_size) {
|
||||
// Normal full read
|
||||
size = g_ioring_buffer_size;
|
||||
} else {
|
||||
// Last read - handle partial sector
|
||||
if (remaining % g_pagesize != 0) {
|
||||
size = ALIGN_UP_POW2(remaining, g_pagesize);
|
||||
|
||||
file_ctx->next_read_offset += bytes_to_read;
|
||||
} else {
|
||||
return_buffer(ctx, buf);
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
size = remaining;
|
||||
}
|
||||
}
|
||||
|
||||
return submitted;
|
||||
}
|
||||
HRESULT hr = submit_read(ctx, f, buf, f->next_read_offset, size);
|
||||
|
||||
// -------------------------- Wait for completions ---------------------------
|
||||
static void wait_for_completions(ThreadIoContext *ctx,
|
||||
FileReadContext *file_ctx) {
|
||||
int has_active = (file_ctx->active_reads > 0);
|
||||
if (FAILED(hr)) {
|
||||
return_buffer(ctx, buf);
|
||||
f->failed_reads++;
|
||||
f->active_reads = 0;
|
||||
f->reads_submitted = 0;
|
||||
f->next_read_offset = f->file_size;
|
||||
break;
|
||||
}
|
||||
|
||||
if (has_active && ctx->completion_event) {
|
||||
WaitForSingleObject(ctx->completion_event, SUBMIT_TIMEOUT_MS);
|
||||
}
|
||||
}
|
||||
f->next_read_offset += size;
|
||||
|
||||
// ---------------------- Main parallel hashing function ----------------------
|
||||
static void xxh3_hash_file_parallel(ThreadIoContext *ctx, const char *path,
|
||||
char *out_hex, unsigned char *temp_buffer) {
|
||||
|
||||
// Validate I/O Ring
|
||||
if (!ctx || !ctx->ring) {
|
||||
xxh3_hash_file_stream(path, out_hex, temp_buffer);
|
||||
return;
|
||||
}
|
||||
|
||||
HANDLE hFile = CreateFileA(
|
||||
path, GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL,
|
||||
OPEN_EXISTING, FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING, NULL);
|
||||
|
||||
if (hFile == INVALID_HANDLE_VALUE) {
|
||||
xxh3_hash_file_stream(path, out_hex, temp_buffer);
|
||||
return;
|
||||
}
|
||||
|
||||
LARGE_INTEGER file_size;
|
||||
if (!GetFileSizeEx(hFile, &file_size)) {
|
||||
xxh3_hash_file_stream(path, out_hex, temp_buffer);
|
||||
CloseHandle(hFile);
|
||||
return;
|
||||
}
|
||||
|
||||
FileReadContext file_ctx;
|
||||
memset(&file_ctx, 0, sizeof(file_ctx));
|
||||
file_ctx.hFile = hFile;
|
||||
file_ctx.file_size = file_size.QuadPart;
|
||||
file_ctx.next_hash_offset = 0;
|
||||
file_ctx.next_read_offset = 0;
|
||||
strncpy(file_ctx.path, path, MAX_PATH - 1);
|
||||
file_ctx.path[MAX_PATH - 1] = 0;
|
||||
XXH3_128bits_reset(&file_ctx.hash_state);
|
||||
|
||||
// Submit initial reads
|
||||
submit_pending_reads(ctx, &file_ctx);
|
||||
|
||||
if (file_ctx.reads_submitted > 0) {
|
||||
UINT32 submitted = 0;
|
||||
SubmitIoRing(ctx->ring, 0, 0, &submitted);
|
||||
}
|
||||
|
||||
// Main loop
|
||||
while (file_ctx.reads_hashed < file_ctx.reads_submitted) {
|
||||
// Process completions
|
||||
process_completions(ctx, &file_ctx);
|
||||
|
||||
// Hash buffers in sequential order (critical!)
|
||||
hash_sequential_buffers(ctx, &file_ctx);
|
||||
|
||||
// Submit more reads if needed
|
||||
if (file_ctx.active_reads < NUM_BUFFERS_PER_THREAD &&
|
||||
file_ctx.next_read_offset < file_ctx.file_size) {
|
||||
int new_reads = submit_pending_reads(ctx, &file_ctx);
|
||||
if (new_reads > 0) {
|
||||
UINT32 submitted = 0;
|
||||
SubmitIoRing(ctx->ring, 0, 0, &submitted);
|
||||
if (ctx->num_submissions >= NUM_BUFFERS_PER_THREAD)
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Wait if nothing to hash and active reads exist
|
||||
if (file_ctx.active_reads > 0 && file_ctx.buffers_ready == 0) {
|
||||
wait_for_completions(ctx, &file_ctx);
|
||||
// Add new file if possible
|
||||
if (!*submitting)
|
||||
return;
|
||||
|
||||
if (ctx->active_files >= MAX_ACTIVE_FILES)
|
||||
return;
|
||||
|
||||
FileEntry *fe = mpmc_pop(file_queue);
|
||||
if (!fe) {
|
||||
*submitting = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
FileReadContext *newf = fq_push(fq);
|
||||
|
||||
if (!init_file(newf, fe)) {
|
||||
// File can't be opened with NO_BUFFERING, process with fallback
|
||||
char hash[HASH_STRLEN];
|
||||
finalize_file(newf, worker_ctx);
|
||||
fq_pop(fq);
|
||||
continue;
|
||||
}
|
||||
|
||||
f = newf;
|
||||
ctx->active_files++;
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------------- Wait for completions ---------------------------
|
||||
static void wait_for_completions(ThreadIoContext *ctx) {
|
||||
|
||||
// If there are in-flight I/O requests → wait for completion
|
||||
if (ctx->num_submissions > 0) {
|
||||
WaitForSingleObject(ctx->completion_event, SUBMIT_TIMEOUT_MS);
|
||||
return;
|
||||
}
|
||||
|
||||
// Final verification
|
||||
if (file_ctx.bytes_hashed == file_ctx.file_size &&
|
||||
file_ctx.failed_reads == 0) {
|
||||
XXH128_hash_t h = XXH3_128bits_digest(&file_ctx.hash_state);
|
||||
snprintf(out_hex, HASH_STRLEN, "%016llx%016llx",
|
||||
(unsigned long long)h.high64, (unsigned long long)h.low64);
|
||||
} else {
|
||||
if (file_ctx.bytes_hashed != file_ctx.file_size) {
|
||||
atomic_fetch_add(&g_io_ring_fallbacks, 1);
|
||||
}
|
||||
xxh3_hash_file_stream(path, out_hex, temp_buffer);
|
||||
}
|
||||
|
||||
CloseHandle(hFile);
|
||||
// Sleep(1);
|
||||
}
|
||||
|
||||
// -------------------------- Hash worker I/O Ring ---------------------------
|
||||
static THREAD_RETURN hash_worker_io_ring(void *arg) {
|
||||
WorkerContext *ctx = (WorkerContext *)arg;
|
||||
unsigned char *temp_buffer = (unsigned char *)malloc(READ_BLOCK);
|
||||
char hash[HASH_STRLEN];
|
||||
|
||||
if (!temp_buffer)
|
||||
return THREAD_RETURN_VALUE;
|
||||
|
||||
// Initialize I/O Ring for this thread
|
||||
// Init IO ring
|
||||
ThreadIoContext *ring_ctx = io_ring_init_thread();
|
||||
if (!ring_ctx || !ring_ctx->ring) {
|
||||
printf("Thread %lu: I/O Ring unavailable, using buffered I/O\n",
|
||||
GetCurrentThreadId());
|
||||
free(temp_buffer);
|
||||
return hash_worker(arg);
|
||||
}
|
||||
|
||||
// Initialize pipeline state
|
||||
FileQueue fq;
|
||||
memset(&fq, 0, sizeof(fq));
|
||||
|
||||
int submitting = 1;
|
||||
|
||||
// Main pipeline loop
|
||||
for (;;) {
|
||||
FileEntry *fe = mpmc_pop(ctx->file_queue);
|
||||
if (!fe)
|
||||
|
||||
// 1. Submit new reads
|
||||
submit_pending_reads(ring_ctx, &fq, ctx, &submitting);
|
||||
|
||||
UINT32 submitted = 0;
|
||||
SubmitIoRing(ring_ctx->ring, 0, 0, &submitted);
|
||||
|
||||
// 5. Avoid busy witing
|
||||
wait_for_completions(ring_ctx);
|
||||
|
||||
// 2. Process completions
|
||||
process_completions(ring_ctx, &fq);
|
||||
|
||||
// 3. Hash files
|
||||
for (int i = 0; i < fq.count; i++) {
|
||||
hash_head_file(ring_ctx, &fq, ctx);
|
||||
}
|
||||
|
||||
// debug
|
||||
// printf("Free buffers: %d, Submissions: %d, Active files: %d\n",
|
||||
// ring_ctx->free_count, ring_ctx->num_submissions,
|
||||
// ring_ctx->active_files);
|
||||
|
||||
// 4. Exit condition
|
||||
if (!submitting && ring_ctx->active_files == 0 &&
|
||||
ring_ctx->num_submissions == 0) {
|
||||
break;
|
||||
|
||||
// Pass the I/O Ring context to the hashing function
|
||||
xxh3_hash_file_parallel(ring_ctx, fe->path, hash, temp_buffer);
|
||||
|
||||
char created[32], modified[32];
|
||||
format_time(fe->created_time, created, sizeof(created));
|
||||
format_time(fe->modified_time, modified, sizeof(modified));
|
||||
|
||||
double size_kib = (double)fe->size_bytes / 1024.0;
|
||||
|
||||
char stack_buf[1024];
|
||||
int len =
|
||||
snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n",
|
||||
hash, fe->path, size_kib, created, modified, fe->owner);
|
||||
|
||||
char *dst = arena_push(&ctx->arena, len, false);
|
||||
memcpy(dst, stack_buf, len);
|
||||
atomic_fetch_add(&g_files_hashed, 1);
|
||||
}
|
||||
}
|
||||
|
||||
io_ring_cleanup_thread();
|
||||
free(temp_buffer);
|
||||
|
||||
return THREAD_RETURN_VALUE;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user