Reworking process_completion() function

This commit is contained in:
2026-05-02 16:17:17 +01:00
parent fb83c3114f
commit 73aa4808f2
5 changed files with 269 additions and 164 deletions

View File

@@ -5,6 +5,7 @@
#include "lf_mpmc.h"
#include "arena.c"
#include <stdint.h>
#include <stdio.h>
// xxhash include
@@ -1149,12 +1150,6 @@ typedef struct {
uint32_t MaxVersion;
} IoRingCapabilities;
typedef struct {
BUILD_READ_RETURN_VALUE ResultCode;
uint32_t Information;
uintptr_t UserData;
} IoRingCQE;
// ------------------------ IO Ring Abstraction -------------------------
#if defined(_WIN32) || defined(_WIN64)
@@ -1192,13 +1187,20 @@ static int close_ioring(ThreadIoContext *thread_ctx) {
#define MAKE_BUF_INFO(a, l) \
(IORING_BUFFER_INFO) { .Address = (a), .Length = (uint32_t)(l) }
static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t wait_count,
uint32_t *submitted) {
HRESULT hr = SubmitIoRing(thread_ctx->ring, 0, SUBMIT_TIMEOUT_MS, submitted);
// HRESULT hr = SubmitIoRing(ring, wait_count, SUBMIT_TIMEOUT_MS, submitted);
static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t *submitted) {
// The wait_count in windows is not implemented yet, so we wait with a
// completion event for a single completion
// uint32_t wait_count = MIN(thread_ctx->num_submissions, MAX_WAIT_COUNT);
// The wait_count in windows is not implemented yet, so we wait in
// ioring_pop_completion()
HRESULT hr;
// if (wait_count > 0) {
// hr = SubmitIoRing(ring, wait_count, SUBMIT_TIMEOUT_MS, submitted);
// } else {
hr = SubmitIoRing(thread_ctx->ring, 0, SUBMIT_TIMEOUT_MS, submitted);
// }
if (thread_ctx->num_submissions > 0) {
WaitForSingleObject(thread_ctx->completion_event, SUBMIT_TIMEOUT_MS);
}
@@ -1222,7 +1224,7 @@ static void ioring_register_buffers(ThreadIoContext *thread_ctx,
error_msg, (unsigned int)hr);
}
// Submit registration
ioring_submit(thread_ctx, 0, NULL);
ioring_submit(thread_ctx, NULL);
}
#if USE_REGISTERED_FILES
@@ -1293,52 +1295,69 @@ static BUILD_READ_RETURN_VALUE ioring_build_read(ThreadIoContext *thread_ctx,
return hr;
}
static int ioring_pop_completion(IoRingHandle ring, IoRingCQE *cqe) {
IORING_CQE win_cqe;
static void ioring_process_completions(ThreadIoContext *thread_ctx) {
uint32_t waited = 0;
uint32_t target = MIN(thread_ctx->num_submissions, MAX_WAIT_COUNT);
while (1) {
HRESULT hr = PopIoRingCompletion(ring, &win_cqe);
while (waited < target) {
if (hr == S_FALSE)
// No CQE available
return 0;
// ---- Drain all available CQEs (non-blocking) ----
while (1) {
IORING_CQE win_cqe;
if (FAILED(hr))
return -1;
HRESULT hr = PopIoRingCompletion(thread_ctx->ring, &win_cqe);
// Unlike linux, The Windows implementation treats buffer and file
// registration as an asynchronous operation that we submit to the ring,
// similar to a read or write. Those operations produce CQEs (completion
// queue entries) that we filter here using
// cqe.UserData == USERDATA_REGISTER
if (win_cqe.UserData == USERDATA_REGISTER)
continue;
cqe->ResultCode = win_cqe.ResultCode;
cqe->Information = win_cqe.Information;
cqe->UserData = win_cqe.UserData;
// Check for error and print warning
if (FAILED(win_cqe.ResultCode)) {
char error_msg[256];
FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
NULL, win_cqe.ResultCode, 0, error_msg, sizeof(error_msg),
NULL);
// Try to get the file path from the buffer
IoBuffer *buf = (IoBuffer *)win_cqe.UserData;
const char *file_path = "unknown";
if (buf && buf->file && buf->file->fe) {
file_path = buf->file->fe->path;
if (hr == S_FALSE) {
// No more CQEs available right now
break;
}
fprintf(stderr,
"WARNING: I/O completion error for file '%s' - Error: %s (Code: "
"0x%lx)\n",
file_path, error_msg, win_cqe.ResultCode);
if (FAILED(hr)) {
fprintf(stderr, "WARNING: PopIoRingCompletion failed (0x%lx)\n", hr);
return;
}
// Skip internal registration completions
if (win_cqe.UserData == USERDATA_REGISTER) {
continue;
}
IoBuffer *buf = (IoBuffer *)win_cqe.UserData;
FileReadContext *file = buf->file;
if (SUCCEEDED(win_cqe.ResultCode)) {
buf->result = 0;
buf->bytes_read = win_cqe.Information;
} else {
buf->result = win_cqe.ResultCode;
buf->bytes_read = 0;
char error_msg[256];
FormatMessageA(
FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL,
win_cqe.ResultCode, 0, error_msg, sizeof(error_msg), NULL);
fprintf(stderr,
"WARNING: I/O completion error for file '%s' - Error: %s "
"(Code: 0x%lx)\n",
buf->file->fe->path, error_msg, win_cqe.ResultCode);
}
file->active_reads--;
file->reads_completed++;
thread_ctx->num_submissions--;
// Count only "real" completions toward wait budget
waited++;
}
return 1;
// ---- If we already waited enough, exit ----
if (waited >= target) {
break;
}
// ---- Otherwise wait for more completions ----
WaitForSingleObject(thread_ctx->completion_event, SUBMIT_TIMEOUT_MS);
}
}
@@ -1431,10 +1450,11 @@ static void ioring_register_buffers(ThreadIoContext *thread_ctx,
struct rlimit limit;
getrlimit(RLIMIT_MEMLOCK, &limit);
fprintf(stderr,
"WARNING: Buffer registration failed due to memlock limits "
"(ENOMEM).\n"
"See README for more informations.\n");
fprintf(
stderr,
"WARNING: Buffer registration failed due to Memlock limit, Error: "
"Cannot allocate memory (code: -12, ENOMEM).\n"
"See README for more informations.\n");
} else {
// For any other error (e.g., EFAULT, EBUSY, EINVAL)
@@ -1530,10 +1550,11 @@ static int ioring_build_read(ThreadIoContext *thread_ctx,
return 0;
}
static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t wait_count,
uint32_t *submitted) {
static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t *submitted) {
int ret;
uint32_t wait_count = MIN(thread_ctx->num_submissions, MAX_WAIT_COUNT);
if (wait_count > 0) {
ret = io_uring_submit_and_wait(&((IoUring *)thread_ctx->ring)->ring,
wait_count);
@@ -1553,58 +1574,46 @@ static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t wait_count,
return 0;
}
static int ioring_pop_completion(IoRingHandle ring, IoRingCQE *cqe) {
static void ioring_process_completions(ThreadIoContext *thread_ctx) {
IoUring *impl = (IoUring *)thread_ctx->ring;
struct io_uring_cqe *cqe_ptr = NULL;
struct io_uring_cqe *cqes[NUM_BUFFERS_PER_THREAD];
int ret = io_uring_peek_cqe(&((IoUring *)ring)->ring, &cqe_ptr);
unsigned count =
io_uring_peek_batch_cqe(&impl->ring, cqes, NUM_BUFFERS_PER_THREAD);
if (ret == -EAGAIN) {
// No CQE available
return 0;
if (count == 0) {
return;
}
if (ret < 0) {
// Error
fprintf(stderr, "WARNING: io_uring_peek_cqe error - Error: %s (Code: %d)\n",
strerror(-ret), ret);
return -1;
}
for (unsigned i = 0; i < count; i++) {
struct io_uring_cqe *cqe = cqes[i];
if (!cqe_ptr) {
return 0;
}
int res = cqe->res;
int res = cqe_ptr->res;
IoBuffer *buf = (IoBuffer *)cqe->user_data;
FileReadContext *file = buf->file;
if (res >= 0) {
cqe->ResultCode = 0;
cqe->Information = (uint32_t)res;
} else {
cqe->ResultCode = res;
cqe->Information = 0;
}
if (res >= 0) {
buf->result = 0;
buf->bytes_read = (uint32_t)res;
} else {
buf->result = res;
buf->bytes_read = 0;
cqe->UserData = (uintptr_t)cqe_ptr->user_data;
io_uring_cqe_seen(&((IoUring *)ring)->ring, cqe_ptr);
// Check for error and print warning
if (res < 0) {
// Try to get the file path from the buffer
IoBuffer *buf = (IoBuffer *)cqe->UserData;
const char *file_path = "unknown";
if (buf && buf->file && buf->file->fe) {
file_path = buf->file->fe->path;
fprintf(stderr,
"WARNING: I/O completion error for file '%s' - Error: %s (Code: "
"%d)\n",
buf->file->fe->path, strerror(-res), res);
}
fprintf(
stderr,
"WARNING: I/O completion error for file '%s' - Error: %s (Code: %d)\n",
file_path, strerror(-res), res);
file->active_reads--;
file->reads_completed++;
thread_ctx->num_submissions--;
}
return 1;
// Mark CQE as seen, equivalent to io_uring_cqe_seen() but marks multiple CQEs
io_uring_cq_advance(&impl->ring, count);
}
FileHandle ioring_open_file(FileEntry *fe) {
@@ -1790,22 +1799,22 @@ static void return_buffer(ThreadIoContext *ctx, IoBuffer *buf) {
}
// -------------------------- Process completions ---------------------------
static void process_completions(ThreadIoContext *thread_ctx) {
IoRingCQE cqe;
while (ioring_pop_completion(thread_ctx->ring, &cqe) == 1) {
IoBuffer *buf = (IoBuffer *)cqe.UserData;
FileReadContext *file = buf->file;
buf->result = cqe.ResultCode;
buf->bytes_read = cqe.Information;
file->active_reads--;
file->reads_completed++;
thread_ctx->num_submissions--;
}
}
// static void process_completions(ThreadIoContext *thread_ctx) {
// IoRingCQE cqe;
//
// while (ioring_pop_completion(thread_ctx->ring, &cqe) == 1) {
//
// IoBuffer *buf = (IoBuffer *)cqe.UserData;
// FileReadContext *file = buf->file;
//
// buf->result = cqe.ResultCode;
// buf->bytes_read = cqe.Information;
//
// file->active_reads--;
// file->reads_completed++;
// thread_ctx->num_submissions--;
// }
// }
// -------------------- File operations -----------------------
static int init_file(ThreadIoContext *thread_ctx, FileReadContext *file,
@@ -2074,8 +2083,7 @@ static THREAD_RETURN hash_worker_ioring(void *arg) {
FileQueue fq;
memset(&fq, 0, sizeof(fq));
uint32_t submitted = 0;
uint32_t wait_count;
uint32_t submitted;
// Main pipeline loop
for (;;) {
@@ -2083,13 +2091,14 @@ static THREAD_RETURN hash_worker_ioring(void *arg) {
// Submit new reads
build_pending_reads(thread_ctx, &fq, worker_ctx);
wait_count = MIN(thread_ctx->num_submissions, NUM_BUFFERS_PER_THREAD - 6);
submitted = 0;
ioring_submit(thread_ctx, wait_count, &submitted);
ioring_submit(thread_ctx, &submitted);
// Process completions
process_completions(thread_ctx);
ioring_process_completions(thread_ctx);
// Hash files
hash_ready_files(thread_ctx, &fq, worker_ctx);
#if IORING_DEBUG_STATS
printf(
@@ -2098,9 +2107,6 @@ static THREAD_RETURN hash_worker_ioring(void *arg) {
thread_ctx->active_files, fq.count);
#endif
// Hash files
hash_ready_files(thread_ctx, &fq, worker_ctx);
// Exit condition
if (!thread_ctx->submitting && thread_ctx->active_files == 0 &&
thread_ctx->num_submissions == 0) {