From 759fdfda1ea475f198a38eb43675ebc33afabc86 Mon Sep 17 00:00:00 2001 From: amir Date: Mon, 4 May 2026 13:39:49 +0100 Subject: [PATCH] Project reordering and mpmc code --- base.h | 61 -- io_ring_test.c => experiments/io_ring_test.c | 0 experiments/io_uring_test.c | 721 ++++++++++++++++++ .../io_uring_test2.c | 2 +- ioringapi.c => experiments/ioringapi.c | 0 lf_mpmc.h | 62 ++ mt_mpmc.h | 246 ++++++ platform.c | 29 +- 8 files changed, 1043 insertions(+), 78 deletions(-) rename io_ring_test.c => experiments/io_ring_test.c (100%) create mode 100644 experiments/io_uring_test.c rename io_uring_test.c => experiments/io_uring_test2.c (99%) rename ioringapi.c => experiments/ioringapi.c (100%) create mode 100644 mt_mpmc.h diff --git a/base.h b/base.h index 3acc537..a7ebdec 100644 --- a/base.h +++ b/base.h @@ -113,9 +113,7 @@ typedef double f64; ------------------------------------------------------------ */ #if defined(_WIN32) || defined(_WIN64) - // Memory allocation - static u32 plat_get_pagesize(void) { SYSTEM_INFO sysinfo = {0}; GetSystemInfo(&sysinfo); @@ -140,43 +138,11 @@ static b32 plat_mem_release(void *ptr, u64 size) { return VirtualFree(ptr, size, MEM_RELEASE); } -// Semaphores -typedef struct plat_sem { - HANDLE handle; -} plat_sem; - -static b32 plat_sem_init(plat_sem *s, u32 initial) { - s->handle = CreateSemaphore(NULL, initial, LONG_MAX, NULL); - return s->handle != NULL; -} - -static void plat_sem_wait(plat_sem *s) { - WaitForSingleObject(s->handle, INFINITE); -} - -// static b32 plat_sem_trywait(HANDLE sem) { // Comment to prevent warning: unused function -// DWORD r = WaitForSingleObject(sem, 0); -// return r == WAIT_OBJECT_0; -// } - -static void plat_sem_post(plat_sem *s, u32 count) { - ReleaseSemaphore(s->handle, count, NULL); -} - -// static void plat_sem_destroy(plat_sem *s) { // Comment to prevent warning: unused function -// if (s->handle) { -// CloseHandle(s->handle); -// s->handle = NULL; -// } -// } - // Sleep static void sleep_ms(int ms) { Sleep(ms); } #elif defined(__linux__) - // Memory allocation - #ifndef _DEFAULT_SOURCE #define _DEFAULT_SOURCE #endif @@ -212,33 +178,6 @@ static b32 plat_mem_release(void *ptr, u64 size) { return ret == 0; } -// Semaphores -#include - -typedef struct plat_sem { - sem_t sem; -} plat_sem; - -static b32 plat_sem_init(plat_sem *s, u32 initial) { - return sem_init(&s->sem, 0, initial) == 0; -} - -static void plat_sem_wait(plat_sem *s) { - while (sem_wait(&s->sem) == -1 && errno == EINTR) { - } -} - -// static b32 plat_sem_trywait(sem_t *sem) { return sem_trywait(sem) == 0; } // Comment to prevent warning: unused function - -static void plat_sem_post(plat_sem *s, u32 count) { - for (u32 i = 0; i < count; i++) { - sem_post(&s->sem); - } -} - -// static void plat_sem_destroy(plat_sem *s) { sem_destroy(&s->sem); } // Comment to prevent warning: unused function - // Sleep static void sleep_ms(int ms) { usleep(ms * 1000); } - #endif diff --git a/io_ring_test.c b/experiments/io_ring_test.c similarity index 100% rename from io_ring_test.c rename to experiments/io_ring_test.c diff --git a/experiments/io_uring_test.c b/experiments/io_uring_test.c new file mode 100644 index 0000000..32b215a --- /dev/null +++ b/experiments/io_uring_test.c @@ -0,0 +1,721 @@ +/* +# Compile +gcc -o io_uring_test io_uring_test.c -luring + +# Run +./io_uring_test + */ +#include "base.h" +#include +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include + +#define TEST_FILE "test_io_uring.txt" +#define BUFFER_SIZE 4096 +#define NUM_BUFFERS 4 +#define NUM_REGISTERED_FILES 8 // Maximum number of files to register + +// Colors for output +#define COLOR_GREEN "\033[0;32m" +#define COLOR_RED "\033[0;31m" +#define COLOR_YELLOW "\033[0;33m" +#define COLOR_BLUE "\033[0;34m" +#define COLOR_RESET "\033[0m" + +// Test result tracking +typedef struct { + int passed; + int failed; +} TestResults; + +static void print_success(const char *step) { + printf(COLOR_GREEN "[✓] SUCCESS: %s" COLOR_RESET "\n", step); +} + +static void print_failure(const char *step, const char *error) { + printf(COLOR_RED "[✗] FAILED: %s - %s" COLOR_RESET "\n", step, error); +} + +static void print_info(const char *msg) { + printf(COLOR_BLUE "[i] INFO: %s" COLOR_RESET "\n", msg); +} + +static void print_step(const char *step) { + printf(COLOR_YELLOW "\n>>> Testing: %s" COLOR_RESET "\n", step); +} + +// Create a test file with known content +static int create_test_file(const char *filename, const char *content) { + FILE *f = fopen(filename, "w"); + if (!f) { + perror("Failed to create test file"); + return -1; + } + + fprintf(f, "%s", content); + fclose(f); + + printf(" Created test file: %s\n", filename); + return 0; +} + +// Test 1: Create io_uring instance +static int test_io_uring_create(struct io_uring *ring, TestResults *results) { + print_step("io_uring creation"); + + int ret = io_uring_queue_init(256, ring, 0); + if (ret < 0) { + print_failure("io_uring_queue_init", strerror(-ret)); + results->failed++; + return -1; + } + + print_success("io_uring instance created"); + results->passed++; + return 0; +} + +// Test 2: Register buffers +static int test_register_buffers(struct io_uring *ring, void **buffers, + struct iovec *iovs, TestResults *results) { + print_step("Buffer registration"); + + // Allocate and prepare buffers + size_t total_size = BUFFER_SIZE * NUM_BUFFERS; + *buffers = aligned_alloc(4096, total_size); // Page-aligned for O_DIRECT + if (!*buffers) { + print_failure("Buffer allocation", strerror(errno)); + results->failed++; + return -1; + } + + // Initialize iovecs + for (int i = 0; i < NUM_BUFFERS; i++) { + iovs[i].iov_base = (char *)*buffers + (i * BUFFER_SIZE); + iovs[i].iov_len = BUFFER_SIZE; + memset(iovs[i].iov_base, 0, BUFFER_SIZE); + } + + int ret = io_uring_register_buffers(ring, iovs, NUM_BUFFERS); + if (ret < 0) { + print_failure("io_uring_register_buffers", strerror(-ret)); + results->failed++; + return -1; + } + + print_success("Buffers registered successfully"); + results->passed++; + return 0; +} + +// Test 2b: Register files +static int test_register_files(struct io_uring *ring, int *fds, int num_fds, + TestResults *results) { + print_step("File registration"); + + if (num_fds == 0) { + print_info("No files to register"); + results->passed++; + return 0; + } + + int ret = io_uring_register_files(ring, fds, num_fds); + if (ret < 0) { + // File registration might not be supported on all kernels + if (ret == -EOPNOTSUPP || ret == -EINVAL) { + print_info("File registration not supported on this kernel, skipping"); + results->passed++; + return 0; + } + print_failure("io_uring_register_files", strerror(-ret)); + results->failed++; + return -1; + } + + printf(" Registered %d files\n", num_fds); + print_success("Files registered successfully"); + results->passed++; + return 0; +} + +// Test 3: Open file +static int test_open_file(const char *filename, int *fd, bool use_direct, + TestResults *results) { + print_step("File opening"); + + // Get file size + struct stat st; + if (stat(filename, &st) != 0) { + print_failure("stat", strerror(errno)); + results->failed++; + return -1; + } + + int page_size = plat_get_pagesize(); + size_t file_size = st.st_size; + + printf(" File: %s\n", filename); + printf(" File size: %zu bytes\n", file_size); + printf(" Page size: %d bytes\n", page_size); + + if (file_size % page_size != 0) { + printf(" Extending read size from %zu to %zu bytes\n", file_size, + ALIGN_UP_POW2(file_size, page_size)); + } + + // Try to open with specified flags + int flags = O_RDONLY; + if (use_direct) { + flags |= O_DIRECT; + } + + *fd = open(filename, flags); + if (*fd < 0) { + if (use_direct) { + print_info("O_DIRECT failed, trying without it"); + *fd = open(filename, O_RDONLY); + if (*fd < 0) { + print_failure("open", strerror(errno)); + results->failed++; + return -1; + } + print_info("Using buffered I/O (O_DIRECT not available)"); + } else { + print_failure("open", strerror(errno)); + results->failed++; + return -1; + } + } else { + const char *io_type = use_direct ? "O_DIRECT" : "buffered I/O"; + printf(" File opened with %s\n", io_type); + print_success("File opened successfully"); + } + + results->passed++; + return 0; +} + +// Test 4: Build and submit read operation (using registered file) +static int test_submit_read_registered(struct io_uring *ring, int file_index, + struct iovec *iovs, int buffer_id, + uint64_t user_data, size_t file_size, + TestResults *results) { + print_step("Building and submitting read operation (registered file)"); + + u32 page_size = plat_get_pagesize(); + size_t read_size = BUFFER_SIZE; + + // For O_DIRECT, ensure read size is sector-aligned + if (read_size > file_size) { + read_size = ALIGN_UP_POW2(file_size, page_size); + printf(" Adjusted read size to %zu bytes for O_DIRECT alignment\n", + read_size); + } + + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + if (!sqe) { + print_failure("io_uring_get_sqe", "No available SQE"); + results->failed++; + return -1; + } + + // Use fixed file descriptor + io_uring_prep_read_fixed(sqe, file_index, iovs[buffer_id].iov_base, read_size, + 0, buffer_id); + io_uring_sqe_set_data64(sqe, user_data); + + int ret = io_uring_submit(ring); + if (ret < 0) { + print_failure("io_uring_submit", strerror(-ret)); + results->failed++; + return -1; + } + + printf(" Using registered file index: %d\n", file_index); + print_success("Read operation submitted successfully (registered file)"); + results->passed++; + return 0; +} + +// Test 4b: Build and submit read operation (using fd directly) +static int test_submit_read(struct io_uring *ring, int fd, struct iovec *iovs, + int buffer_id, uint64_t user_data, + TestResults *results) { + print_step("Building and submitting read operation"); + + // Get file size for proper alignment + struct stat st; + if (fstat(fd, &st) != 0) { + print_failure("fstat", strerror(errno)); + results->failed++; + return -1; + } + + u32 page_size = plat_get_pagesize(); + size_t file_size = st.st_size; + size_t read_size = BUFFER_SIZE; + + // For O_DIRECT, ensure read size is sector-aligned + if (read_size > file_size) { + read_size = ALIGN_UP_POW2(file_size, page_size); + printf(" Adjusted read size to %zu bytes for O_DIRECT alignment\n", + read_size); + } + + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + if (!sqe) { + print_failure("io_uring_get_sqe", "No available SQE"); + results->failed++; + return -1; + } + + // Prepare read operation using registered buffer + io_uring_prep_read_fixed(sqe, fd, iovs[buffer_id].iov_base, read_size, 0, + buffer_id); + io_uring_sqe_set_data64(sqe, user_data); + + int ret = io_uring_submit(ring); + if (ret < 0) { + print_failure("io_uring_submit", strerror(-ret)); + results->failed++; + return -1; + } + + print_success("Read operation submitted successfully"); + results->passed++; + return 0; +} + +// Test 5: Wait for completion +static int test_wait_completion(struct io_uring *ring, + struct io_uring_cqe **cqe, + TestResults *results) { + print_step("Waiting for completion"); + + int ret = io_uring_wait_cqe(ring, cqe); + if (ret < 0) { + print_failure("io_uring_wait_cqe", strerror(-ret)); + results->failed++; + return -1; + } + + print_success("Completion received"); + results->passed++; + return 0; +} + +// Test 6: Process completion +static int test_process_completion(struct io_uring_cqe *cqe, + uint64_t expected_user_data, + TestResults *results) { + print_step("Processing completion"); + + uint64_t user_data = io_uring_cqe_get_data64(cqe); + int res = cqe->res; + + printf(" Completion data:\n"); + printf(" User data: %lu (expected: %lu)\n", user_data, expected_user_data); + printf(" Result: %d bytes read\n", res); + + if (user_data != expected_user_data) { + print_failure("User data mismatch", + "User data doesn't match expected value"); + results->failed++; + return -1; + } + + if (res < 0) { + print_failure("Read operation", strerror(-res)); + results->failed++; + return -1; + } + + print_success("Completion processed successfully"); + results->passed++; + return res; // Return number of bytes read +} + +// Test 7: Verify read data +static int test_verify_data(struct iovec *iovs, int buffer_id, int bytes_read, + const char *expected_content, + TestResults *results) { + print_step("Data verification"); + + char *data = (char *)iovs[buffer_id].iov_base; + + printf(" Read data (first 200 chars):\n"); + printf(" ---\n"); + for (int i = 0; i < bytes_read && i < 200; i++) { + putchar(data[i]); + } + if (bytes_read > 200) + printf("..."); + printf("\n ---\n"); + + // Check if data is not empty + if (bytes_read == 0) { + print_failure("Data verification", "No data read"); + results->failed++; + return -1; + } + + // Check if data contains expected content + if (expected_content && strstr(data, expected_content) == NULL) { + print_failure("Data verification", "Expected content not found"); + results->failed++; + return -1; + } + + print_success("Data verified successfully"); + results->passed++; + return 0; +} + +// Test 8: Test multiple concurrent reads +static int test_concurrent_reads(struct io_uring *ring, int fd, + struct iovec *iovs, TestResults *results) { + print_step("Concurrent reads test"); + + int num_reads = 3; + int submitted = 0; + + // Submit multiple reads + for (int i = 0; i < num_reads; i++) { + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + if (!sqe) { + print_failure("Getting SQE for concurrent read", "No available SQE"); + results->failed++; + return -1; + } + + off_t offset = i * 100; // Read from different offsets + io_uring_prep_read_fixed(sqe, fd, iovs[i].iov_base, BUFFER_SIZE, offset, i); + io_uring_sqe_set_data64(sqe, i); + submitted++; + } + + int ret = io_uring_submit(ring); + if (ret != submitted) { + char msg[64]; + snprintf(msg, sizeof(msg), "Expected %d, got %d", submitted, ret); + + print_failure("Submitting concurrent reads", msg); + results->failed++; + return -1; + } + + print_success("Concurrent reads submitted"); + + // Wait for and process completions + for (int i = 0; i < submitted; i++) { + struct io_uring_cqe *cqe; + ret = io_uring_wait_cqe(ring, &cqe); + if (ret < 0) { + print_failure("Waiting for concurrent read completion", strerror(-ret)); + results->failed++; + return -1; + } + + uint64_t user_data = io_uring_cqe_get_data64(cqe); + int res = cqe->res; + + printf(" Concurrent read %lu completed: %d bytes read\n", user_data, res); + io_uring_cqe_seen(ring, cqe); + } + + print_success("Concurrent reads completed successfully"); + results->passed++; + return 0; +} + +// Test 9: Test file registration with multiple files +static int test_file_registration(struct io_uring *ring, TestResults *results) { + print_step("File registration with multiple files"); + + // Create multiple test files + const char *filenames[] = {"test_file1.txt", "test_file2.txt", + "test_file3.txt"}; + const char *contents[] = {"Content of file 1: Hello World!", + "Content of file 2: io_uring is fast!", + "Content of file 3: Registered files test."}; + + int fds[3]; + int num_files = 3; + + // Create and open files + for (int i = 0; i < num_files; i++) { + if (create_test_file(filenames[i], contents[i]) != 0) { + results->failed++; + return -1; + } + + fds[i] = open(filenames[i], O_RDONLY); + if (fds[i] < 0) { + print_failure("Opening file for registration", strerror(errno)); + // Close previously opened files + for (int j = 0; j < i; j++) + close(fds[j]); + results->failed++; + return -1; + } + } + + // Register files + int ret = io_uring_register_files(ring, fds, num_files); + if (ret < 0) { + if (ret == -EOPNOTSUPP || ret == -EINVAL) { + print_info("File registration not supported, skipping test"); + results->passed++; + } else { + print_failure("io_uring_register_files", strerror(-ret)); + results->failed++; + } + // Cleanup + for (int i = 0; i < num_files; i++) { + close(fds[i]); + remove(filenames[i]); + } + return (ret == -EOPNOTSUPP || ret == -EINVAL) ? 0 : -1; + } + + print_success("Multiple files registered successfully"); + + // Read from each registered file using fixed operations + for (int i = 0; i < num_files; i++) { + struct iovec iov; + char buf[256] = {0}; + iov.iov_base = buf; + iov.iov_len = sizeof(buf); + + // Register a single buffer for this test + ret = io_uring_register_buffers(ring, &iov, 1); + if (ret < 0) { + print_failure("Registering buffer for file test", strerror(-ret)); + break; + } + + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + if (!sqe) { + print_failure("Getting SQE for registered file", "No available SQE"); + break; + } + + // Use fixed file and fixed buffer + io_uring_prep_read_fixed(sqe, i, iov.iov_base, strlen(contents[i]), 0, 0); + io_uring_sqe_set_data64(sqe, i); + + ret = io_uring_submit(ring); + if (ret < 0) { + print_failure("Submitting read for registered file", strerror(-ret)); + break; + } + + struct io_uring_cqe *cqe; + ret = io_uring_wait_cqe(ring, &cqe); + if (ret < 0) { + print_failure("Waiting for registered file read", strerror(-ret)); + break; + } + + if (cqe->res < 0) { + print_failure("Reading registered file", strerror(-cqe->res)); + io_uring_cqe_seen(ring, cqe); + break; + } + + printf(" File %d: Read %d bytes: %.*s\n", i, cqe->res, cqe->res, buf); + io_uring_cqe_seen(ring, cqe); + + // Unregister buffer for next iteration + io_uring_unregister_buffers(ring); + } + + // Cleanup files + io_uring_unregister_files(ring); + for (int i = 0; i < num_files; i++) { + close(fds[i]); + remove(filenames[i]); + } + + print_success("File registration test completed"); + results->passed++; + return 0; +} + +// Cleanup function +static void cleanup(struct io_uring *ring, int *fds, int num_fds, + void *buffers) { + if (fds) { + io_uring_unregister_files(ring); + for (int i = 0; i < num_fds; i++) { + if (fds[i] >= 0) + close(fds[i]); + } + } + if (buffers) { + io_uring_unregister_buffers(ring); + free(buffers); + } + io_uring_queue_exit(ring); + remove(TEST_FILE); +} + +int main() { + TestResults results = {0, 0}; + struct io_uring ring; + int fd = -1; + int registered_fds[1] = {-1}; // For registered file test + void *buffers = NULL; + struct iovec iovs[NUM_BUFFERS]; + + printf(COLOR_BLUE "\n========================================\n"); + printf(" io_uring Test Suite with File Registration\n"); + printf("========================================\n" COLOR_RESET); + + // Create main test file + const char *test_content = + "Hello, io_uring! This is a test file for async I/O operations.\n" + "Line 2: Testing reads with registered buffers.\n" + "Line 3: The quick brown fox jumps over the lazy dog.\n" + "Line 4: ABCDEFGHIJKLMNOPQRSTUVWXYZ\n" + "Line 5: 0123456789\n"; + + if (create_test_file(TEST_FILE, test_content) != 0) { + return 1; + } + + // Test 1: Create io_uring + if (test_io_uring_create(&ring, &results) != 0) { + cleanup(&ring, NULL, 0, buffers); + return 1; + } + + // Test 2: Register buffers + if (test_register_buffers(&ring, &buffers, iovs, &results) != 0) { + cleanup(&ring, NULL, 0, buffers); + return 1; + } + + // Test 3: Open file + if (test_open_file(TEST_FILE, &fd, true, &results) != 0) { + cleanup(&ring, NULL, 0, buffers); + return 1; + } + + // Test 4: Submit read with direct fd + uint64_t test_user_data = 12345; + if (test_submit_read(&ring, fd, iovs, 0, test_user_data, &results) != 0) { + cleanup(&ring, NULL, 0, buffers); + return 1; + } + + // Test 5: Wait for completion + struct io_uring_cqe *cqe; + if (test_wait_completion(&ring, &cqe, &results) != 0) { + cleanup(&ring, NULL, 0, buffers); + return 1; + } + + // Test 6: Process completion + int bytes_read = test_process_completion(cqe, test_user_data, &results); + if (bytes_read < 0) { + cleanup(&ring, NULL, 0, buffers); + return 1; + } + io_uring_cqe_seen(&ring, cqe); + + // Test 7: Verify data + if (test_verify_data(iovs, 0, bytes_read, "io_uring", &results) != 0) { + cleanup(&ring, NULL, 0, buffers); + return 1; + } + + // Close the file for file registration test + close(fd); + + // Reopen and register the file + registered_fds[0] = open(TEST_FILE, O_RDONLY); + if (registered_fds[0] < 0) { + print_failure("Reopening file for registration", strerror(errno)); + cleanup(&ring, NULL, 0, buffers); + return 1; + } + + // Test 2b: Register files + if (test_register_files(&ring, registered_fds, 1, &results) != 0) { + cleanup(&ring, registered_fds, 1, buffers); + return 1; + } + + // Get file size for the registered read test + struct stat st; + stat(TEST_FILE, &st); + + // Test 4b: Submit read using registered file + test_user_data = 67890; + if (test_submit_read_registered(&ring, 0, iovs, 0, test_user_data, st.st_size, + &results) != 0) { + cleanup(&ring, registered_fds, 1, buffers); + return 1; + } + + // Wait for and process completion + if (test_wait_completion(&ring, &cqe, &results) != 0) { + cleanup(&ring, registered_fds, 1, buffers); + return 1; + } + + bytes_read = test_process_completion(cqe, test_user_data, &results); + if (bytes_read < 0) { + cleanup(&ring, registered_fds, 1, buffers); + return 1; + } + io_uring_cqe_seen(&ring, cqe); + + // Verify data from registered file read + if (test_verify_data(iovs, 0, bytes_read, "io_uring", &results) != 0) { + cleanup(&ring, registered_fds, 1, buffers); + return 1; + } + + // Test 8: Concurrent reads + if (test_concurrent_reads(&ring, registered_fds[0], iovs, &results) != 0) { + cleanup(&ring, registered_fds, 1, buffers); + return 1; + } + + // Test 9: File registration with multiple files (requires new ring) + cleanup(&ring, registered_fds, 1, buffers); + buffers = NULL; + registered_fds[0] = -1; + + if (test_io_uring_create(&ring, &results) != 0) { + return 1; + } + + test_file_registration(&ring, &results); + + // Cleanup the second ring + io_uring_queue_exit(&ring); + + // Print summary + printf(COLOR_BLUE "\n========================================\n"); + printf(" TEST SUMMARY\n"); + printf("========================================\n" COLOR_RESET); + printf(" Total tests: %d\n", results.passed + results.failed); + printf(COLOR_GREEN " Passed: %d\n" COLOR_RESET, results.passed); + if (results.failed > 0) { + printf(COLOR_RED " Failed: %d\n" COLOR_RESET, results.failed); + } else { + printf(COLOR_GREEN " ✓ ALL TESTS PASSED!\n" COLOR_RESET); + } + + return results.failed > 0 ? 1 : 0; +} diff --git a/io_uring_test.c b/experiments/io_uring_test2.c similarity index 99% rename from io_uring_test.c rename to experiments/io_uring_test2.c index 82b7d54..d3eb7d2 100644 --- a/io_uring_test.c +++ b/experiments/io_uring_test2.c @@ -1,6 +1,6 @@ /* # Compile -gcc -o io_uring_test io_uring_test.c -luring +gcc -o io_uring_test io_uring_test2.c -luring # Run ./io_uring_test diff --git a/ioringapi.c b/experiments/ioringapi.c similarity index 100% rename from ioringapi.c rename to experiments/ioringapi.c diff --git a/lf_mpmc.h b/lf_mpmc.h index 05bbd5f..2f52f3b 100644 --- a/lf_mpmc.h +++ b/lf_mpmc.h @@ -2,6 +2,7 @@ #include "base.h" +// Cache align abstraction #define CACHELINE 64 #if defined(_MSC_VER) @@ -10,6 +11,7 @@ #define CACHE_ALIGN __attribute__((aligned(CACHELINE))) #endif +// Compiler hints #if defined(__GNUC__) || defined(__clang__) #define likely(x) __builtin_expect((x), 1) #define unlikely(x) __builtin_expect((x), 0) @@ -24,6 +26,65 @@ static void cpu_pause(void) { #endif } +// Semaphores +#if defined(_WIN32) || defined(_WIN64) +typedef struct plat_sem { + HANDLE handle; +} plat_sem; + +static b32 plat_sem_init(plat_sem *s, u32 initial) { + s->handle = CreateSemaphore(NULL, initial, LONG_MAX, NULL); + return s->handle != NULL; +} + +static void plat_sem_wait(plat_sem *s) { + WaitForSingleObject(s->handle, INFINITE); +} + +// static b32 plat_sem_trywait(HANDLE sem) { // Comment to prevent warning: unused function +// DWORD r = WaitForSingleObject(sem, 0); +// return r == WAIT_OBJECT_0; +// } + +static void plat_sem_post(plat_sem *s, u32 count) { + ReleaseSemaphore(s->handle, count, NULL); +} + +// static void plat_sem_destroy(plat_sem *s) { // Comment to prevent warning: unused function +// if (s->handle) { +// CloseHandle(s->handle); +// s->handle = NULL; +// } +// } + +#elif defined(__linux__) +#include + +typedef struct plat_sem { + sem_t sem; +} plat_sem; + +static b32 plat_sem_init(plat_sem *s, u32 initial) { + return sem_init(&s->sem, 0, initial) == 0; +} + +static void plat_sem_wait(plat_sem *s) { + while (sem_wait(&s->sem) == -1 && errno == EINTR) { + } +} + +// static b32 plat_sem_trywait(sem_t *sem) { return sem_trywait(sem) == 0; } // Comment to prevent warning: unused function + +static void plat_sem_post(plat_sem *s, u32 count) { + for (u32 i = 0; i < count; i++) { + sem_post(&s->sem); + } +} + +// static void plat_sem_destroy(plat_sem *s) { sem_destroy(&s->sem); } // Comment to prevent warning: unused function + +#endif + typedef struct plat_sem plat_sem; typedef struct CACHE_ALIGN { @@ -235,6 +296,7 @@ static void mpmc_push_work(MPMCQueue *q, void *item) { atomic_fetch_add(&q->work_count, 1); plat_sem_post(&q->items_sem, 1); } + /* ----------------------------------------------------------- */ /* POP */ /* ----------------------------------------------------------- */ diff --git a/mt_mpmc.h b/mt_mpmc.h new file mode 100644 index 0000000..8d1fc1e --- /dev/null +++ b/mt_mpmc.h @@ -0,0 +1,246 @@ +#pragma once + +#include "base.h" + +// Cache align abstraction +#define CACHELINE 64 + +#if defined(_MSC_VER) +#define CACHE_ALIGN __declspec(align(CACHELINE)) +#else +#define CACHE_ALIGN __attribute__((aligned(CACHELINE))) +#endif + +// Mutex/Critical section abstraction +#if defined(_WIN32) +#include +typedef CRITICAL_SECTION mtx_t; +typedef CONDITION_VARIABLE cond_t; + +#define mtx_init(m) InitializeCriticalSection(m) +#define mtx_lock(m) EnterCriticalSection(m) +#define mtx_unlock(m) LeaveCriticalSection(m) +#define mtx_destroy(m) DeleteCriticalSection(m) + +#define cond_init(c) InitializeConditionVariable(c) +#define cond_wait(c, m) SleepConditionVariableCS(c, m, INFINITE) +#define cond_signal(c) WakeConditionVariable(c) +#define cond_broadcast(c) WakeAllConditionVariable(c) + +#else +#include +typedef pthread_mutex_t mtx_t; +typedef pthread_cond_t cond_t; + +#define mtx_init(m) pthread_mutex_init(m, NULL) +#define mtx_lock(m) pthread_mutex_lock(m) +#define mtx_unlock(m) pthread_mutex_unlock(m) +#define mtx_destroy(m) pthread_mutex_destroy(m) + +#define cond_init(c) pthread_cond_init(c, NULL) +#define cond_wait(c, m) pthread_cond_wait(c, m) +#define cond_signal(c) pthread_cond_signal(c) +#define cond_broadcast(c) pthread_cond_broadcast(c) + +#endif + +typedef struct CACHE_ALIGN { + void *data; + char pad[64 - sizeof(void *)]; +} MPMCSlot; + +typedef struct { + CACHE_ALIGN size_t head; + CACHE_ALIGN size_t tail; + + CACHE_ALIGN size_t count; + CACHE_ALIGN size_t work_count; + + size_t capacity; + size_t mask; + + size_t committed; + size_t commit_step; + + mtx_t lock; + cond_t not_empty; + cond_t not_full; + + MPMCSlot *slots; +} MPMCQueue; + +/* ----------------------------------------------------------- */ +/* INIT */ +/* ----------------------------------------------------------- */ +static void mpmc_init(MPMCQueue *q, size_t max_capacity) { + q->capacity = max_capacity; + q->mask = max_capacity - 1; + + size_t pagesize = plat_get_pagesize(); + size_t bytes = ALIGN_UP_POW2(sizeof(MPMCSlot) * max_capacity, pagesize); + + q->slots = (MPMCSlot *)plat_mem_reserve(bytes); + if (!q->slots) { + fprintf(stderr, "plat_mem_reserve failed\n"); + exit(1); + } + + size_t commit_bytes = ALIGN_UP_POW2(pagesize, pagesize); + q->commit_step = commit_bytes / sizeof(MPMCSlot); + + q->committed = q->commit_step; + plat_mem_commit(q->slots, commit_bytes); + + for (size_t i = 0; i < q->committed; i++) { + q->slots[i].data = NULL; + } + + q->head = 0; + q->tail = 0; + q->count = 0; + q->work_count = 0; + + mtx_init(&q->lock); + cond_init(&q->not_empty); + cond_init(&q->not_full); +} + +/* ----------------------------------------------------------- */ +/* COMMIT MORE MEMORY */ +/* ----------------------------------------------------------- */ +static void mpmc_commit_more(MPMCQueue *q) { + size_t start = q->committed; + + if (start >= q->capacity) + return; + + size_t new_commit = start + q->commit_step; + if (new_commit > q->capacity) + new_commit = q->capacity; + + size_t count = new_commit - start; + + plat_mem_commit(&q->slots[start], count * sizeof(MPMCSlot)); + + for (size_t i = start; i < new_commit; i++) { + q->slots[i].data = NULL; + } + + q->committed = new_commit; +} + +/* ----------------------------------------------------------- */ +/* PUSH */ +/* ----------------------------------------------------------- */ +// Does not increment work +static void mpmc_push(MPMCQueue *q, void *item) { + mtx_lock(&q->lock); + + while (q->count == q->capacity) { + cond_wait(&q->not_full, &q->lock); + } + + // Ensure committed + if (q->tail >= q->committed) { + mpmc_commit_more(q); + } + + size_t pos = q->tail & q->mask; + + q->slots[pos].data = item; + q->tail++; + q->count++; + + cond_signal(&q->not_empty); + mtx_unlock(&q->lock); +} + +// Increment work +static void mpmc_push_work(MPMCQueue *q, void *item) { + mtx_lock(&q->lock); + + while (q->count == q->capacity) { + cond_wait(&q->not_full, &q->lock); + } + + if (q->tail >= q->committed) { + mpmc_commit_more(q); + } + + size_t pos = q->tail & q->mask; + + q->slots[pos].data = item; + q->tail++; + q->count++; + q->work_count++; + + cond_signal(&q->not_empty); + mtx_unlock(&q->lock); +} + +/* ----------------------------------------------------------- */ +/* POP */ +/* ----------------------------------------------------------- */ +static void *mpmc_pop(MPMCQueue *q) { + mtx_lock(&q->lock); + + while (q->count == 0) { + cond_wait(&q->not_empty, &q->lock); + } + + size_t pos = q->head & q->mask; + + void *data = q->slots[pos].data; + + q->head++; + q->count--; + + cond_signal(&q->not_full); + mtx_unlock(&q->lock); + + return data; +} + +/* ----------------------------------------------------------- */ +/* PUSH POISON */ +/* ----------------------------------------------------------- */ +static void mpmc_producers_finished(MPMCQueue *q, u8 consumer_count) { + for (u8 i = 0; i < consumer_count; i++) { + mpmc_push(q, NULL); + } +} + +/* ----------------------------------------------------------- */ +/* Done */ +/* ----------------------------------------------------------- */ +static void mpmc_task_done(MPMCQueue *q, u8 consumer_count) { + mtx_lock(&q->lock); + + if (--q->work_count == 0) { + mpmc_producers_finished(q, consumer_count); + } + + mtx_unlock(&q->lock); +} + +/* ----------------------------------------------------------- */ +/* MPMC Cleanup */ +/* ----------------------------------------------------------- */ +// static void mpmc_finish(MPMCQueue *q) { // Comment to prevent warning: unused function +// if (!q) return; +// +// if (q->slots) { +// plat_mem_release(q->slots, 0); +// q->slots = NULL; +// } +// +// mtx_destroy(&q->lock); +// +// #if !defined(_WIN32) +// pthread_cond_destroy(&q->not_empty); +// pthread_cond_destroy(&q->not_full); +// #endif +// +// q->capacity = 0; +// q->mask = 0; +// } diff --git a/platform.c b/platform.c index c313235..3b9f112 100644 --- a/platform.c +++ b/platform.c @@ -1,5 +1,5 @@ -#pragma once // ensure that a given header file is included only once in a - // single compilation unit +#pragma once + #include "arena.h" #include "base.h" #include "lf_mpmc.h" @@ -1296,10 +1296,10 @@ static BUILD_READ_RETURN_VALUE ioring_build_read(ThreadIoContext *thread_ctx, } static void ioring_process_completions(ThreadIoContext *thread_ctx) { - uint32_t waited = 0; - uint32_t target = MIN(thread_ctx->num_submissions, MAX_WAIT_COUNT); + uint32_t cqe_count = 0; + uint32_t wait_count = MIN(thread_ctx->num_submissions, MAX_WAIT_COUNT); - while (waited < target) { + while (cqe_count < wait_count) { // ---- Drain all available CQEs (non-blocking) ---- while (1) { @@ -1307,7 +1307,7 @@ static void ioring_process_completions(ThreadIoContext *thread_ctx) { HRESULT hr = PopIoRingCompletion(thread_ctx->ring, &win_cqe); - if (hr == S_FALSE) { + if (hr != S_OK) { // No more CQEs available right now break; } @@ -1347,12 +1347,11 @@ static void ioring_process_completions(ThreadIoContext *thread_ctx) { file->reads_completed++; thread_ctx->num_submissions--; - // Count only "real" completions toward wait budget - waited++; + cqe_count++; } // ---- If we already waited enough, exit ---- - if (waited >= target) { + if (cqe_count >= wait_count) { break; } @@ -1575,18 +1574,16 @@ static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t *submitted) { } static void ioring_process_completions(ThreadIoContext *thread_ctx) { - IoUring *impl = (IoUring *)thread_ctx->ring; - struct io_uring_cqe *cqes[NUM_BUFFERS_PER_THREAD]; - unsigned count = - io_uring_peek_batch_cqe(&impl->ring, cqes, NUM_BUFFERS_PER_THREAD); + unsigned cqe_count = io_uring_peek_batch_cqe(&((IoUring *)thread_ctx->ring)->ring, + cqes, NUM_BUFFERS_PER_THREAD); - if (count == 0) { + if (cqe_count == 0) { return; } - for (unsigned i = 0; i < count; i++) { + for (unsigned i = 0; i < cqe_count; i++) { struct io_uring_cqe *cqe = cqes[i]; int res = cqe->res; @@ -1613,7 +1610,7 @@ static void ioring_process_completions(ThreadIoContext *thread_ctx) { } // Mark CQE as seen, equivalent to io_uring_cqe_seen() but marks multiple CQEs - io_uring_cq_advance(&impl->ring, count); + io_uring_cq_advance(&((IoUring *)thread_ctx->ring)->ring, cqe_count); } FileHandle ioring_open_file(FileEntry *fe) {