1 Commits
main ... v5.0

Author SHA1 Message Date
d9098a69a9 Minor fixes after the merge
Deleting some duplicate functions and header
2026-04-28 18:48:30 +01:00
29 changed files with 312 additions and 2346 deletions

6
.gitignore vendored
View File

@@ -3,13 +3,11 @@ file_hasher.ilk
file_hasher.rdi
file_hasher.exe
file_hashes.txt
/Binaries
Binaries/file_hashes.txt
file_list.txt
temp_code.c
/.cache
/.cache/clangd/index
/file_hasher
/io_uring_test
/file_hasher
/io_uring_test
/compile_commands.json
/build

View File

@@ -1,284 +0,0 @@
cmake_minimum_required(VERSION 3.20)
project(filehasher
VERSION 1.0.0
DESCRIPTION "High-performance file hasher with I/O Ring/io_uring support"
LANGUAGES C
)
# ---------------------------------------------------------------------------
# Force compiler search order
# ---------------------------------------------------------------------------
# On Windows, prefer clang-cl, then GCC, then Clang
if(WIN32)
# Try to force compiler order if not already set
if(NOT CMAKE_C_COMPILER)
# Search in preferred order
find_program(CLANG_CL_COMPILER NAMES clang-cl)
find_program(GCC_COMPILER NAMES gcc)
find_program(CLANG_COMPILER NAMES clang)
if(CLANG_CL_COMPILER)
message(STATUS "Found clang-cl as preferred compiler: ${CLANG_CL_COMPILER}")
set(CMAKE_C_COMPILER "${CLANG_CL_COMPILER}" CACHE STRING "" FORCE)
elseif(GCC_COMPILER)
message(STATUS "Found GCC as fallback compiler: ${GCC_COMPILER}")
set(CMAKE_C_COMPILER "${GCC_COMPILER}" CACHE STRING "" FORCE)
elseif(CLANG_COMPILER)
message(STATUS "Found Clang as last-resort compiler: ${CLANG_COMPILER}")
set(CMAKE_C_COMPILER "${CLANG_COMPILER}" CACHE STRING "" FORCE)
endif()
endif()
else()
# On Linux, prefer GCC, then Clang
if(NOT CMAKE_C_COMPILER)
find_program(GCC_COMPILER NAMES gcc)
find_program(CLANG_COMPILER NAMES clang)
if(GCC_COMPILER)
message(STATUS "Found GCC as preferred compiler: ${GCC_COMPILER}")
set(CMAKE_C_COMPILER "${GCC_COMPILER}" CACHE STRING "" FORCE)
elseif(CLANG_COMPILER)
message(STATUS "Found Clang as fallback compiler: ${CLANG_COMPILER}")
set(CMAKE_C_COMPILER "${CLANG_COMPILER}" CACHE STRING "" FORCE)
endif()
endif()
endif()
# Now project() will use the compiler we found
# However, since we needed project() first to get C support,
# we check what we actually got
message(STATUS "Using compiler: ${CMAKE_C_COMPILER} (${CMAKE_C_COMPILER_ID})")
# ---------------------------------------------------------------------------
# Platform and Compiler Detection
# ---------------------------------------------------------------------------
if(WIN32)
set(PLATFORM_WINDOWS TRUE)
set(PLATFORM_NAME "Windows")
else()
set(PLATFORM_LINUX TRUE)
set(PLATFORM_NAME "Linux")
endif()
# Compiler type
if(CMAKE_C_COMPILER_ID STREQUAL "Clang")
# Check if it's clang-cl
if(CMAKE_C_COMPILER_FRONTEND_VARIANT STREQUAL "MSVC")
set(COMPILER_CLANG_CL TRUE)
message(STATUS "Detected clang-cl (MSVC-compatible frontend)")
else()
set(COMPILER_CLANG_GNU TRUE)
message(STATUS "Detected Clang (GNU-compatible frontend)")
endif()
elseif(CMAKE_C_COMPILER_ID STREQUAL "GNU")
set(COMPILER_GCC TRUE)
message(STATUS "Detected GCC")
elseif(CMAKE_C_COMPILER_ID STREQUAL "MSVC")
# We don't want MSVC, but if it's found, warn user
message(FATAL_ERROR
"MSVC (cl.exe) detected!\n"
"This project requires clang-cl, GCC, or Clang.\n"
"Please install one of these compilers or specify manually:\n"
" cmake .. -DCMAKE_C_COMPILER=clang-cl\n"
" cmake .. -DCMAKE_C_COMPILER=gcc\n"
" cmake .. -DCMAKE_C_COMPILER=clang\n"
)
endif()
# ---------------------------------------------------------------------------
# Build System Selection
# ---------------------------------------------------------------------------
if(NOT CMAKE_GENERATOR OR CMAKE_GENERATOR STREQUAL "")
find_program(NINJA_EXECUTABLE NAMES ninja)
if(NINJA_EXECUTABLE)
message(STATUS "Using Ninja build system")
set(CMAKE_GENERATOR "Ninja")
else()
message(STATUS "Ninja not found, using default generator: ${CMAKE_GENERATOR}")
endif()
endif()
# ---------------------------------------------------------------------------
# Source Files
# ---------------------------------------------------------------------------
set(SOURCES
file_hasher.c
xxhash.c
xxh_x86dispatch.c
)
# Headers for dependency tracking and IDE
set(HEADERS
arena.h
base.h
xxhash.h
lf_mpmc.h
)
# ---------------------------------------------------------------------------
# Create Executable
# ---------------------------------------------------------------------------
add_executable(${PROJECT_NAME}
${SOURCES}
${HEADERS}
)
# Include directories
target_include_directories(${PROJECT_NAME} PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
)
# ---------------------------------------------------------------------------
# Compiler Flags - Exact match to your commands
# ---------------------------------------------------------------------------
if(PLATFORM_WINDOWS)
if(COMPILER_CLANG_CL)
# === clang-cl flags ===
# Release: /O2
target_compile_options(${PROJECT_NAME} PRIVATE
$<$<CONFIG:Release>:/O2>
)
# Debug: /Zi /Od
target_compile_options(${PROJECT_NAME} PRIVATE
$<$<CONFIG:Debug>:/Zi /Od>
)
# Common warnings
target_compile_options(${PROJECT_NAME} PRIVATE /W4)
elseif(COMPILER_GCC)
# === GCC flags (Windows/MinGW) ===
# Release: -O3
target_compile_options(${PROJECT_NAME} PRIVATE
$<$<CONFIG:Release>:-O3>
)
# Debug: -g -O0
target_compile_options(${PROJECT_NAME} PRIVATE
$<$<CONFIG:Debug>:-g -O0>
)
# Common warnings
target_compile_options(${PROJECT_NAME} PRIVATE -Wall -Wextra)
elseif(COMPILER_CLANG_GNU)
# === Clang flags (Windows, GNU frontend) ===
# Release: -O3
target_compile_options(${PROJECT_NAME} PRIVATE
$<$<CONFIG:Release>:-O3>
)
# Debug: -g -O0
target_compile_options(${PROJECT_NAME} PRIVATE
$<$<CONFIG:Debug>:-g -O0>
)
# Common warnings
target_compile_options(${PROJECT_NAME} PRIVATE -Wall -Wextra)
endif()
# Windows-specific libraries
target_link_libraries(${PROJECT_NAME} PRIVATE
kernel32
)
# Windows-specific defines
target_compile_definitions(${PROJECT_NAME} PRIVATE
WIN32_LEAN_AND_MEAN
_WIN32_WINNT=0x0A00 # Windows 10+
)
# Set output name with .exe
set_target_properties(${PROJECT_NAME} PROPERTIES
SUFFIX ".exe"
)
elseif(PLATFORM_LINUX)
# === Linux GCC/Clang flags ===
if(COMPILER_GCC OR COMPILER_CLANG_GNU)
# Release: -O3
target_compile_options(${PROJECT_NAME} PRIVATE
$<$<CONFIG:Release>:-O3>
)
# Debug: -g -O0
target_compile_options(${PROJECT_NAME} PRIVATE
$<$<CONFIG:Debug>:-g -O0>
)
# Common warnings
target_compile_options(${PROJECT_NAME} PRIVATE -Wall -Wextra)
endif()
# Linux-specific libraries
find_package(Threads REQUIRED)
find_library(LIBURING_LIBRARY NAMES uring)
if(LIBURING_LIBRARY)
message(STATUS "Found liburing: ${LIBURING_LIBRARY}")
target_link_libraries(${PROJECT_NAME} PRIVATE
Threads::Threads
${LIBURING_LIBRARY}
)
else()
message(FATAL_ERROR "liburing not found! Install liburing-dev or equivalent")
endif()
# Linux-specific defines
target_compile_definitions(${PROJECT_NAME} PRIVATE
_GNU_SOURCE
)
endif()
# ---------------------------------------------------------------------------
# C Standard
# ---------------------------------------------------------------------------
set_target_properties(${PROJECT_NAME} PROPERTIES
C_STANDARD 11
C_STANDARD_REQUIRED ON
C_EXTENSIONS OFF
)
# ---------------------------------------------------------------------------
# Build Configurations
# ---------------------------------------------------------------------------
# Set default build type if not specified (matching your Release command)
if(NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE "Release" CACHE STRING
"Choose the type of build: Release or Debug" FORCE)
message(STATUS "No build type specified, defaulting to Release")
endif()
# ---------------------------------------------------------------------------
# IDE Support
# ---------------------------------------------------------------------------
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
# ---------------------------------------------------------------------------
# Info Target
# ---------------------------------------------------------------------------
add_custom_target(info
COMMAND ${CMAKE_COMMAND} -E echo "=== Build Configuration ==="
COMMAND ${CMAKE_COMMAND} -E echo "Project: ${PROJECT_NAME}"
COMMAND ${CMAKE_COMMAND} -E echo "Compiler: ${CMAKE_C_COMPILER} (${CMAKE_C_COMPILER_ID})"
COMMAND ${CMAKE_COMMAND} -E echo "Frontend: $<IF:$<BOOL:${COMPILER_CLANG_CL}>,clang-cl,GNU>"
COMMAND ${CMAKE_COMMAND} -E echo "Generator: ${CMAKE_GENERATOR}"
COMMAND ${CMAKE_COMMAND} -E echo "Build Type: ${CMAKE_BUILD_TYPE}"
COMMAND ${CMAKE_COMMAND} -E echo "Platform: ${PLATFORM_NAME}"
COMMAND ${CMAKE_COMMAND} -E echo "============================"
)
# ---------------------------------------------------------------------------
# Print final configuration
# ---------------------------------------------------------------------------
message(STATUS "----------------------------------------")
message(STATUS "Configuration Summary:")
message(STATUS " Compiler: ${CMAKE_C_COMPILER}")
message(STATUS " Build Type: ${CMAKE_BUILD_TYPE}")
message(STATUS " Generator: ${CMAKE_GENERATOR}")
message(STATUS " Platform: ${PLATFORM_NAME}")
message(STATUS "----------------------------------------")

View File

@@ -1,7 +1,7 @@
# Duplicate Finder
# filehasher
# Presentation
Collects some metadata and hashes files then detects duplicate files and directories. It outputs the path, hash, size, creation and
Collects some metadata and hashes files. It outputs the path, hash, size, creation and
last modification dates and the author in file_hasher.txt.
Creation and modification dates and author can be disabled in the config file.
@@ -20,80 +20,36 @@ It is a high performance cross platform Windows and Linux compatible program, it
* Fallback to buffered I/O if there is errors in the IO Ring path.
# Building
## Windows
**Requirements**: Make sur to use UCRT64 environment from MSYS2 instead of the standard MinGW environment.
### Release
**Note**: Make sur to use UCRT64 environment from MSYS2 instead of the standard MinGW environment.
UCRT64 uses the modern Universal C Runtime (ucrtbase.dll), which supports the newest APIs,
the standard MSYS2 uses the legacy msvcrt.dll and does not support IO Ring.
To install:
pacman -S mingw-w64-ucrt-x86_64-clang
or:
pacman -S mingw-w64-ucrt-x86_64-gcc
pacman -S mingw-w64-ucrt-x86_64-clang
pacman -Syu
And add to path:
C:\msys64\ucrt64\bin
Additionally, to use clang-cl install the latest version of Windows SDK and MSVC, or at least select these in Visual Studio Installer:
* MSVC Build tools fo x64/86.
* C++ Build tools core features.
* MSBuild support for LLVM (clang-cl) toolset.
* Windows Universal C runtime.
* Windows Universal CRT SDK.
* Windows 11 SDK.
And use the MSVC command prompt or run a script to add MSVC environment variables to current session.
Ex: for PowerShell Terminal save as .ps1 (not persistent):
```ps1
# Add MS visual studio environment variables
cmd /c '"C:\Program Files (x86)\Microsoft Visual Studio\18\BuildTools\VC\Auxiliary\Build\vcvarsall.bat" x64 && set' |
ForEach-Object {
if ($_ -match "^(.*?)=(.*)$") {
Set-Item -Path "Env:$($matches[1])" -Value $matches[2]
}
}
```
Optional: to use the build system
pacman -S mingw-w64-ucrt-x86_64-cmake
The build system uses Ninja and fallsback to make, in Windows it prefers clang-cl > gcc > clang, and in Linux gcc > clang.
### Using a build system
| Command | Description|
| :--- | :--- |
| ./build.bat | Build Release with best available compiler |
| ./build.bat Debug | Build Debug |
| ./build.bat clean | Clean and build Release |
| ./build.bat Debug clean | Clean and build Debug |
### Release
gcc -O3 file_hasher.c xxhash.c xxh_x86dispatch.c -o filehasher
clang -O3 file_hasher.c xxhash.c xxh_x86dispatch.c -o filehasher
gcc -O3 file_hasher.c xxhash.c xxh_x86dispatch.c -o file_hasher
clang -O3 file_hasher.c xxhash.c xxh_x86dispatch.c -o file_hasher
clang-cl /O2 file_hasher.c xxhash.c xxh_x86dispatch.c
### Debug
gcc -g -O0 file_hasher.c xxhash.c xxh_x86dispatch.c -o filehasher
clang -g -O0 file_hasher.c xxhash.c xxh_x86dispatch.c -o filehasher
gcc -g -O0 file_hasher.c xxhash.c xxh_x86dispatch.c -o file_hasher
clang -g -O0 file_hasher.c xxhash.c xxh_x86dispatch.c -o file_hasher
clang-cl /Zi /Od file_hasher.c xxhash.c xxh_x86dispatch.c
## Linux
**Requirements**: GCC or clang, optional CMake, Ninja or make.
### Using a build system
| Command | Description|
| :--- | :--- |
| ./build.sh | Build Release with best available compiler |
| ./build.sh Debug | Build Debug |
| ./build.sh clean | Clean and build Release |
| ./build.sh Debug clean | Clean and build Debug |
### Release
gcc -O3 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o filehasher
clang -O3 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o filehasher
gcc -O3 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher
clang -O3 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher
### Debug
gcc -g -O0 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o filehasher
clang -g -O0 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o filehasher
gcc -g -O0 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher
clang -g -O0 file_hasher.c xxhash.c xxh_x86dispatch.c -pthread -luring -o file_hasher
# Notes about the IO Ring implementations
## IO Ring
@@ -132,18 +88,16 @@ While both systems share the same core concept, their APIs and management styles
| **API Call** | `io_uring_register` | `BuildIoRingRegisterFileHandles` |
| **Registration Method** | Synchronous system call; blocks until the table is set up. | Asynchronous request submitted to the ring like a read/write operation. |
| **Partial Updates** | Supports `IORING_REGISTER_FILES_UPDATE` to swap specific indices. | No partial updates; a new registration replaces the entire table. |
| **Memory Mapping** | User must manually `mmap()` queues into their address space. | Kernel handles memory mapping automatically when the ring is created. |
| **Scope of Operations** | Extremely broad (files, sockets, timers, signals, etc.). | Primarily focused on file storage (read, write, flush). |
### Completion Wait count and peek
### Completion Wait count
To avoid busy waiting when receiving CQEs, we can use io_uring_submit_and_wait() in Linux by entering a wait count,
the threads sleep until the count of CQEs are received, in windows the wait_count is present in SubmitIoRing()
the threads sleeps until the count of CQEs are received, in windows the wait_count is present in SubmitIoRing()
but is not implemented yet, so we wait with a completion event for a single completion. Another limitation on the completion
event is that the kernel will waik up the thread only when receiving the first CQE, after that we need to drain the completion
queue completely before sleeping again, or we enter an eternal slumber.
In the other hand, in Linux we can batch pop completions with io_uring_peek_batch_cqe() + io_uring_cq_advance(),
in Windows we can only pop one completion at a time with PopIoRingCompletion() (equivalent to io_uring_peek_cqe() + io_uring_cqe_seen()).
To simulate the same behavior as the Linux functions we use a double loop, an outer loop to control how much we wait
and in inner loop to drain all the available completions.
queue completely before sleeping again, or we enter an eternal slumber. And my config, each time the thread wakes up
it receives rarely more than 3 to 5 CQEs and most of the time only one CQE.
### Filtering CQEs
@@ -177,15 +131,12 @@ IO Ring implementation.
"Increase the limit to solve this warning.\n");
```
The Memlock limit in Linux restricts the amount of memory that can be
"locked" into physical RAM using the mlock() family of system calls. This
The Memlock limit in Linux restricts the amount of memory a process can
"lock" into physical RAM using the mlock() family of system calls. This
prevents the operating system from swapping that memory out to disk.
And registering buffers will lock the buffers memory so the hardware
can access it directly without kernel intervention and prevents the kernel from
swapping it to the SSD or HDD.
This limit does not apply to a single process, but it applies to what all the runnig processes can lock, so in order
to be able to register the buffers, we need to set it to unlimited or increase it to at least:
num_hash_threads * NUM_BUFFERS_PER_THREAD * IORING_BUFFER_SIZE + extra memory reserved for other processes.
swapping it to the SSD or HDD. Increase the limit to be able to register the buffers.
#### *Modifying the Limit*
The method for changing the memlock limit depends on whether you are
@@ -197,7 +148,7 @@ the /etc/security/limits.conf file. Add the following lines:
```conf
# Example for a specific user (replace 'username'), unlimited or a custom value in KB
username soft memlock unlimited
username hard memlock unlimitedhttps://wiki.postgresql.org/wiki/AIO
username hard memlock unlimited
```
```conf
# Example for all users

View File

@@ -83,7 +83,8 @@ u64 arena_pos_from_ptr(mem_arena *arena, void *ptr) {
void *arena_ptr_from_pos(mem_arena *arena, u64 global_pos) {
ASSERT(arena);
if (!arena) {
ASSERT(global_pos >= 0);
if (!arena || global_pos < 0) {
return NULL;
}
@@ -487,6 +488,7 @@ void *arena_free(mem_arena **arena_ptr, u8 **ptr, u64 size) { // mk free
Find owning block
------------------------------------------------------------ */
mem_arena *selected = arena;
mem_arena *owner = arena_block_from_ptr(arena, *ptr);
ASSERT(owner);
if (!owner) {
@@ -498,7 +500,7 @@ void *arena_free(mem_arena **arena_ptr, u8 **ptr, u64 size) { // mk free
------------------------------------------------------------ */
u64 global_offset = arena_pos_from_ptr(arena, *ptr);
if (global_offset == UINT64_MAX) {
if (global_offset == -1) {
return NULL;
}
@@ -605,6 +607,9 @@ void *arena_swapback_pop(mem_arena **arena_ptr, u64 index) { // mk swapback
fprintf(stderr, "ERROR: Swapback pop failed, index out of range");
return NULL;
}
u8 *owner_base = (u8 *)owner + ALIGN_UP_POW2(sizeof(mem_arena), owner->align);
u8 *arena_base = (u8 *)arena + ALIGN_UP_POW2(sizeof(mem_arena), arena->align);
u8 *dst = arena_ptr_from_index(arena, index);
u8 *src = arena_ptr_from_index(arena, count);

85
base.h
View File

@@ -104,16 +104,16 @@ typedef double f64;
#define ASSERT(x) assert(x)
#endif
#ifndef NDEBUG
#define NDEBUG 1 // 0 to enable asserts
#endif
#define NDEBUG // Comment to enable asserts
/* ------------------------------------------------------------
Some helper functions
------------------------------------------------------------ */
#if defined(_WIN32) || defined(_WIN64)
// Memory allocation
static u32 plat_get_pagesize(void) {
SYSTEM_INFO sysinfo = {0};
GetSystemInfo(&sysinfo);
@@ -130,19 +130,51 @@ static b32 plat_mem_commit(void *ptr, u64 size) {
return ret != NULL;
}
// static b32 plat_mem_decommit(void *ptr, u64 size) { // Comment to prevent warning: unused function
// return VirtualFree(ptr, size, MEM_DECOMMIT);
// }
static b32 plat_mem_decommit(void *ptr, u64 size) {
return VirtualFree(ptr, size, MEM_DECOMMIT);
}
static b32 plat_mem_release(void *ptr, u64 size) {
return VirtualFree(ptr, size, MEM_RELEASE);
}
// Semaphores
typedef struct plat_sem {
HANDLE handle;
} plat_sem;
static b32 plat_sem_init(plat_sem *s, u32 initial) {
s->handle = CreateSemaphore(NULL, initial, LONG_MAX, NULL);
return s->handle != NULL;
}
static void plat_sem_wait(plat_sem *s) {
WaitForSingleObject(s->handle, INFINITE);
}
static b32 plat_sem_trywait(HANDLE sem) {
DWORD r = WaitForSingleObject(sem, 0);
return r == WAIT_OBJECT_0;
}
static void plat_sem_post(plat_sem *s, u32 count) {
ReleaseSemaphore(s->handle, count, NULL);
}
static void plat_sem_destroy(plat_sem *s) {
if (s->handle) {
CloseHandle(s->handle);
s->handle = NULL;
}
}
// Sleep
static void sleep_ms(int ms) { Sleep(ms); }
#elif defined(__linux__)
// Memory allocation
#ifndef _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#endif
@@ -165,19 +197,46 @@ static b32 plat_mem_commit(void *ptr, u64 size) {
return ret == 0;
}
// static b32 plat_mem_decommit(void *ptr, u64 size) { // Comment to prevent warning: unused function
// i32 ret = mprotect(ptr, size, PROT_NONE);
// if (ret != 0)
// return false;
// ret = madvise(ptr, size, MADV_DONTNEED);
// return ret == 0;
// }
static b32 plat_mem_decommit(void *ptr, u64 size) {
i32 ret = mprotect(ptr, size, PROT_NONE);
if (ret != 0)
return false;
ret = madvise(ptr, size, MADV_DONTNEED);
return ret == 0;
}
static b32 plat_mem_release(void *ptr, u64 size) {
i32 ret = munmap(ptr, size);
return ret == 0;
}
// Semaphores
#include <semaphore.h>
typedef struct plat_sem {
sem_t sem;
} plat_sem;
static b32 plat_sem_init(plat_sem *s, u32 initial) {
return sem_init(&s->sem, 0, initial) == 0;
}
static void plat_sem_wait(plat_sem *s) {
while (sem_wait(&s->sem) == -1 && errno == EINTR) {
}
}
static b32 plat_sem_trywait(sem_t *sem) { return sem_trywait(sem) == 0; }
static void plat_sem_post(plat_sem *s, u32 count) {
for (u32 i = 0; i < count; i++) {
sem_post(&s->sem);
}
}
static void plat_sem_destroy(plat_sem *s) { sem_destroy(&s->sem); }
// Sleep
static void sleep_ms(int ms) { usleep(ms * 1000); }
#endif

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

170
build.bat
View File

@@ -1,170 +0,0 @@
@echo off
setlocal enabledelayedexpansion
:: ============================================================================
:: build.bat
:: ============================================================================
:: Get script directory (project root)
set SCRIPT_DIR=%~dp0
set SCRIPT_DIR=%SCRIPT_DIR:~0,-1%
:: ---------------------------------------------------------------------------
:: Default values
:: ---------------------------------------------------------------------------
set BUILD_TYPE=Release
set CLEAN_BUILD=0
:: --------------------------------------------------------------------------
:: Parse arguments
:: --------------------------------------------------------------------------
:parse_args
if "%~1"=="" goto :main
if /i "%~1"=="Release" (
set BUILD_TYPE=Release
shift
goto :parse_args
)
if /i "%~1"=="Debug" (
set BUILD_TYPE=Debug
shift
goto :parse_args
)
if /i "%~1"=="clean" (
set CLEAN_BUILD=1
shift
goto :parse_args
)
echo Unknown argument: %~1
echo Usage: build [Release^|Debug] [clean]
exit /b 1
:main
set BUILD_DIR=%SCRIPT_DIR%\build\windows\%BUILD_TYPE%
echo === Building filehasher (%BUILD_TYPE%) ===
:: --------------------------------------------------------------------------
:: Clean if requested
:: --------------------------------------------------------------------------
if %CLEAN_BUILD%==1 (
echo Cleaning...
if exist "%BUILD_DIR%" rmdir /s /q "%BUILD_DIR%" 2>nul
)
:: --------------------------------------------------------------------------
:: Create build directory
:: --------------------------------------------------------------------------
if not exist "%BUILD_DIR%" mkdir "%BUILD_DIR%"
pushd "%BUILD_DIR%"
:: --------------------------------------------------------------------------
:: Compiler selection
:: --------------------------------------------------------------------------
set CC=
where clang-cl >nul 2>&1
if !ERRORLEVEL! equ 0 (
echo Compiler: clang-cl ^(preferred^)
set "CC=-DCMAKE_C_COMPILER=clang-cl"
goto :find_generator
)
where gcc >nul 2>&1
if !ERRORLEVEL! equ 0 (
echo Compiler: gcc ^(fallback^)
set "CC=-DCMAKE_C_COMPILER=gcc"
goto :find_generator
)
where clang >nul 2>&1
if !ERRORLEVEL! equ 0 (
echo Compiler: clang ^(last resort^)
set "CC=-DCMAKE_C_COMPILER=clang"
goto :find_generator
)
echo ERROR: No suitable compiler found!
popd
exit /b 1
:: --------------------------------------------------------------------------
:: Generator selection (prefer ninja)
:: --------------------------------------------------------------------------
:find_generator
set GEN=
where ninja >nul 2>&1
if !ERRORLEVEL! equ 0 (
echo Generator: Ninja
set "GEN=-G Ninja"
) else (
echo Generator: Default
)
:: --------------------------------------------------------------------------
:: Configure
:: --------------------------------------------------------------------------
echo.
echo Configuring CMake...
:: --------------------------------------------------------------------------
:: compile_commands.json logic
:: --------------------------------------------------------------------------
set EXPORT_COMPILE_COMMANDS=OFF
if /i "%BUILD_TYPE%"=="Release" (
if exist "%SCRIPT_DIR%\compile_commands.json" (
echo compile_commands.json already exists - skipping generation
) else (
echo compile_commands.json will be generated
set EXPORT_COMPILE_COMMANDS=ON
)
)
set CMD=cmake "%SCRIPT_DIR%" %GEN% %CC% -DCMAKE_BUILD_TYPE=%BUILD_TYPE% -DCMAKE_EXPORT_COMPILE_COMMANDS=%EXPORT_COMPILE_COMMANDS%
echo !CMD!
!CMD!
if !ERRORLEVEL! neq 0 (
echo ERROR: Configuration failed
popd
exit /b 1
)
:: --------------------------------------------------------------------------
:: Build
:: --------------------------------------------------------------------------
echo.
echo Building...
cmake --build . --config %BUILD_TYPE%
if !ERRORLEVEL! neq 0 (
echo ERROR: Build failed
popd
exit /b 1
)
:: --------------------------------------------------------------------------
:: Copy compile_commands.json (only if generated)
:: --------------------------------------------------------------------------
if /i "%EXPORT_COMPILE_COMMANDS%"=="ON" (
if exist "compile_commands.json" (
echo.
echo clangd: compile_commands.json generated
copy /Y "compile_commands.json" "%SCRIPT_DIR%\compile_commands.json" >nul 2>&1
if !ERRORLEVEL! equ 0 (
echo clangd: Copied to project root
) else (
echo clangd: Copy failed
)
)
)
popd
echo.
echo === Build Complete ===
echo Executable: %BUILD_DIR%\filehasher.exe

272
build.sh
View File

@@ -1,272 +0,0 @@
#!/usr/bin/env bash
# ============================================================================
# build.sh - Build script for filehasher (Linux)
# Usage: ./build.sh [Release|Debug] [clean]
#
# Compiler preference: gcc > clang
# Build system: Ninja (fallback to Make)
# ============================================================================
set -euo pipefail
# ---------------------------------------------------------------------------
# Colors
# ---------------------------------------------------------------------------
readonly RED='\033[0;31m'
readonly GREEN='\033[0;32m'
readonly YELLOW='\033[1;33m'
readonly CYAN='\033[0;36m'
readonly NC='\033[0m'
# ---------------------------------------------------------------------------
# Default values
# ---------------------------------------------------------------------------
BUILD_TYPE="Release"
CLEAN_BUILD=0
# ---------------------------------------------------------------------------
# Parse arguments
# ---------------------------------------------------------------------------
while [[ $# -gt 0 ]]; do
case "$1" in
Release|release)
BUILD_TYPE="Release"
shift
;;
Debug|debug)
BUILD_TYPE="Debug"
shift
;;
clean|-clean|--clean)
CLEAN_BUILD=1
shift
;;
*)
echo -e "${RED}Unknown argument: $1${NC}"
echo "Usage: $0 [Release|Debug] [clean]"
exit 1
;;
esac
done
# ---------------------------------------------------------------------------
# Setup
# ---------------------------------------------------------------------------
readonly BUILD_DIR="build/linux/${BUILD_TYPE}"
readonly SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
echo -e "${GREEN}=== Building filehasher (${BUILD_TYPE}) ===${NC}"
echo "Project: ${SCRIPT_DIR}"
# ---------------------------------------------------------------------------
# Clean if requested
# ---------------------------------------------------------------------------
if [[ $CLEAN_BUILD -eq 1 ]]; then
echo -e "${YELLOW}Cleaning build directory...${NC}"
rm -rf "${BUILD_DIR}"
echo
fi
# ---------------------------------------------------------------------------
# Create build directory
# ---------------------------------------------------------------------------
mkdir -p "${BUILD_DIR}"
cd "${BUILD_DIR}"
# ---------------------------------------------------------------------------
# Compiler selection (prefer gcc, fallback to clang)
# ---------------------------------------------------------------------------
echo -e "${YELLOW}Detecting compiler...${NC}"
CC_BINARY=""
CC_NAME=""
if command -v gcc &> /dev/null; then
CC_BINARY="gcc"
CC_VERSION=$(gcc --version | head -n1)
CC_NAME="GCC (${CC_VERSION})"
echo -e " ${GREEN}[OK]${NC} Found GCC (preferred): ${CC_VERSION}"
elif command -v clang &> /dev/null; then
CC_BINARY="clang"
CC_VERSION=$(clang --version | head -n1)
CC_NAME="Clang (${CC_VERSION})"
echo -e " ${YELLOW}[OK]${NC} Found Clang (fallback): ${CC_VERSION}"
else
echo -e "${RED}[FAIL] No suitable compiler found!${NC}"
echo "Please install gcc or clang:"
echo " Ubuntu/Debian: sudo apt install build-essential"
echo " Fedora/RHEL: sudo dnf install gcc"
echo " Arch: sudo pacman -S gcc"
exit 1
fi
# ---------------------------------------------------------------------------
# Check dependencies
# ---------------------------------------------------------------------------
echo -e "${YELLOW}Checking dependencies...${NC}"
# Check for liburing
HAVE_LIBURING=0
if ldconfig -p | grep -q liburing 2>/dev/null; then
HAVE_LIBURING=1
echo -e " ${GREEN}[OK]${NC} Found liburing"
elif pkg-config --exists liburing 2>/dev/null; then
HAVE_LIBURING=1
echo -e " ${GREEN}[OK]${NC} Found liburing (pkg-config)"
elif [[ -f /usr/lib/liburing.so ]] || [[ -f /usr/lib64/liburing.so ]] || [[ -f /usr/local/lib/liburing.so ]]; then
HAVE_LIBURING=1
echo -e " ${GREEN}[OK]${NC} Found liburing (manual detection)"
else
echo -e "${RED}[FAIL] liburing not found!${NC}"
echo "Please install liburing-dev:"
echo " Ubuntu/Debian: sudo apt install liburing-dev"
echo " Fedora/RHEL: sudo dnf install liburing-devel"
echo " Arch: sudo pacman -S liburing"
exit 1
fi
# Check for pthreads
# Check if pthreads is available (either in ldconfig or merged into libc)
if ldconfig -p | grep -q libpthread 2>/dev/null || ldd --version | grep -qP '2\.(3[4-9]|[4-9][0-9])'; then
echo -e " ${GREEN}[OK]${NC} Found pthreads (merged or standalone)"
else
echo -e " ${YELLOW}[NOTE]${NC} pthreads not found, attempting link"
fi
echo
# ---------------------------------------------------------------------------
# Generator selection (prefer ninja)
# ---------------------------------------------------------------------------
echo -e "${YELLOW}Selecting build system...${NC}"
GENERATOR=""
GENERATOR_NAME=""
if command -v ninja &> /dev/null; then
GENERATOR="Ninja"
GENERATOR_NAME="Ninja"
echo -e " ${GREEN}[OK]${NC} Using Ninja"
elif command -v make &> /dev/null; then
GENERATOR="Unix Makefiles"
GENERATOR_NAME="Make"
echo -e " ${YELLOW}[OK]${NC} Ninja not found, using Make"
else
echo -e "${RED}[FAIL] No build system found!${NC}"
echo "Please install ninja or make:"
echo " Ubuntu/Debian: sudo apt install ninja-build"
echo " Fedora/RHEL: sudo dnf install ninja-build"
echo " Arch: sudo pacman -S ninja"
exit 1
fi
echo
# ---------------------------------------------------------------------------
# Configure
# ---------------------------------------------------------------------------
echo -e "${YELLOW}Configuring CMake...${NC}"
# --------------------------------------------------------------------------
# compile_commands.json logic
# --------------------------------------------------------------------------
EXPORT_COMPILE_COMMANDS=OFF
if [[ "$BUILD_TYPE" == "Release" ]]; then
if [[ -f "${SCRIPT_DIR}/compile_commands.json" ]]; then
echo -e " compile_commands.json already exists - skipping generation"
else
echo -e " compile_commands.json will be generated"
EXPORT_COMPILE_COMMANDS=ON
fi
fi
echo -e " Build type: ${BUILD_TYPE}"
echo -e " Compiler: ${CC_NAME}"
echo -e " Generator: ${GENERATOR_NAME}"
cmake "${SCRIPT_DIR}" \
-G "${GENERATOR}" \
-DCMAKE_BUILD_TYPE="${BUILD_TYPE}" \
-DCMAKE_C_COMPILER="${CC_BINARY}" \
-DCMAKE_EXPORT_COMPILE_COMMANDS=${EXPORT_COMPILE_COMMANDS}
if [[ $? -ne 0 ]]; then
echo -e "${RED}CMake configuration failed!${NC}"
exit 1
fi
echo -e "${GREEN}Configuration successful!${NC}"
echo
# ---------------------------------------------------------------------------
# Build
# ---------------------------------------------------------------------------
echo -e "${YELLOW}Building...${NC}"
# Get number of CPU cores
if command -v nproc &> /dev/null; then
CORES=$(nproc)
else
CORES=4
fi
cmake --build . --config "${BUILD_TYPE}" --parallel "${CORES}"
if [[ $? -ne 0 ]]; then
echo -e "${RED}Build failed!${NC}"
exit 1
fi
echo -e "${GREEN}Build successful!${NC}"
echo
# ---------------------------------------------------------------------------
# Verify output
# ---------------------------------------------------------------------------
cd "${SCRIPT_DIR}"
if [[ -f "${BUILD_DIR}/filehasher" ]]; then
echo -e "${GREEN}Executable: ${BUILD_DIR}/filehasher${NC}"
if command -v file &> /dev/null; then
echo -e " Type: $(file -b ${BUILD_DIR}/filehasher)"
fi
if command -v du &> /dev/null; then
echo -e " Size: $(du -h ${BUILD_DIR}/filehasher | cut -f1)"
fi
elif [[ -f "${BUILD_DIR}/filehasher.exe" ]]; then
echo -e "${GREEN}Executable: ${BUILD_DIR}/filehasher.exe${NC}"
else
echo -e "${YELLOW}Note: Could not locate executable${NC}"
echo "Checking build directory:"
find "${BUILD_DIR}" -type f -executable 2>/dev/null || echo " No executables found"
fi
# ---------------------------------------------------------------------------
# Summary
# ---------------------------------------------------------------------------
echo
echo -e "${CYAN}=== Build Complete ===${NC}"
echo
echo -e "${YELLOW}Build Information:${NC}"
echo -e " Configuration: ${BUILD_TYPE}"
echo -e " Compiler: ${CC_NAME}"
echo -e " Generator: ${GENERATOR_NAME}"
echo -e " Output: ${BUILD_DIR}/"
# ---------------------------------------------------------------------------
# Copy compile_commands.json for clangd
# ---------------------------------------------------------------------------
if [[ "${EXPORT_COMPILE_COMMANDS}" == "ON" ]]; then
if [[ -f "${BUILD_DIR}/compile_commands.json" ]]; then
echo -e " clangd: compile_commands.json generated"
cp "${BUILD_DIR}/compile_commands.json" "${SCRIPT_DIR}/compile_commands.json"
echo -e " clangd: Copied to project root"
fi
fi
echo
echo -e "${GREEN}Ready to run: ./${BUILD_DIR}/filehasher${NC}"

View File

@@ -15,7 +15,6 @@
#define IORING_BUFFER_SIZE KiB(256)
#define NUM_BUFFERS_PER_THREAD 32
#define MAX_ACTIVE_FILES 16
#define MAX_WAIT_COUNT (NUM_BUFFERS_PER_THREAD / 2)
#define SUBMIT_TIMEOUT_MS 10000
#define IORING_DEBUG_PRINTS 0

View File

@@ -1,721 +0,0 @@
/*
# Compile
gcc -o io_uring_test io_uring_test.c -luring
# Run
./io_uring_test
*/
#include "base.h"
#include <stdint.h>
#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <unistd.h>
#define TEST_FILE "test_io_uring.txt"
#define BUFFER_SIZE 4096
#define NUM_BUFFERS 4
#define NUM_REGISTERED_FILES 8 // Maximum number of files to register
// Colors for output
#define COLOR_GREEN "\033[0;32m"
#define COLOR_RED "\033[0;31m"
#define COLOR_YELLOW "\033[0;33m"
#define COLOR_BLUE "\033[0;34m"
#define COLOR_RESET "\033[0m"
// Test result tracking
typedef struct {
int passed;
int failed;
} TestResults;
static void print_success(const char *step) {
printf(COLOR_GREEN "[✓] SUCCESS: %s" COLOR_RESET "\n", step);
}
static void print_failure(const char *step, const char *error) {
printf(COLOR_RED "[✗] FAILED: %s - %s" COLOR_RESET "\n", step, error);
}
static void print_info(const char *msg) {
printf(COLOR_BLUE "[i] INFO: %s" COLOR_RESET "\n", msg);
}
static void print_step(const char *step) {
printf(COLOR_YELLOW "\n>>> Testing: %s" COLOR_RESET "\n", step);
}
// Create a test file with known content
static int create_test_file(const char *filename, const char *content) {
FILE *f = fopen(filename, "w");
if (!f) {
perror("Failed to create test file");
return -1;
}
fprintf(f, "%s", content);
fclose(f);
printf(" Created test file: %s\n", filename);
return 0;
}
// Test 1: Create io_uring instance
static int test_io_uring_create(struct io_uring *ring, TestResults *results) {
print_step("io_uring creation");
int ret = io_uring_queue_init(256, ring, 0);
if (ret < 0) {
print_failure("io_uring_queue_init", strerror(-ret));
results->failed++;
return -1;
}
print_success("io_uring instance created");
results->passed++;
return 0;
}
// Test 2: Register buffers
static int test_register_buffers(struct io_uring *ring, void **buffers,
struct iovec *iovs, TestResults *results) {
print_step("Buffer registration");
// Allocate and prepare buffers
size_t total_size = BUFFER_SIZE * NUM_BUFFERS;
*buffers = aligned_alloc(4096, total_size); // Page-aligned for O_DIRECT
if (!*buffers) {
print_failure("Buffer allocation", strerror(errno));
results->failed++;
return -1;
}
// Initialize iovecs
for (int i = 0; i < NUM_BUFFERS; i++) {
iovs[i].iov_base = (char *)*buffers + (i * BUFFER_SIZE);
iovs[i].iov_len = BUFFER_SIZE;
memset(iovs[i].iov_base, 0, BUFFER_SIZE);
}
int ret = io_uring_register_buffers(ring, iovs, NUM_BUFFERS);
if (ret < 0) {
print_failure("io_uring_register_buffers", strerror(-ret));
results->failed++;
return -1;
}
print_success("Buffers registered successfully");
results->passed++;
return 0;
}
// Test 2b: Register files
static int test_register_files(struct io_uring *ring, int *fds, int num_fds,
TestResults *results) {
print_step("File registration");
if (num_fds == 0) {
print_info("No files to register");
results->passed++;
return 0;
}
int ret = io_uring_register_files(ring, fds, num_fds);
if (ret < 0) {
// File registration might not be supported on all kernels
if (ret == -EOPNOTSUPP || ret == -EINVAL) {
print_info("File registration not supported on this kernel, skipping");
results->passed++;
return 0;
}
print_failure("io_uring_register_files", strerror(-ret));
results->failed++;
return -1;
}
printf(" Registered %d files\n", num_fds);
print_success("Files registered successfully");
results->passed++;
return 0;
}
// Test 3: Open file
static int test_open_file(const char *filename, int *fd, bool use_direct,
TestResults *results) {
print_step("File opening");
// Get file size
struct stat st;
if (stat(filename, &st) != 0) {
print_failure("stat", strerror(errno));
results->failed++;
return -1;
}
int page_size = plat_get_pagesize();
size_t file_size = st.st_size;
printf(" File: %s\n", filename);
printf(" File size: %zu bytes\n", file_size);
printf(" Page size: %d bytes\n", page_size);
if (file_size % page_size != 0) {
printf(" Extending read size from %zu to %zu bytes\n", file_size,
ALIGN_UP_POW2(file_size, page_size));
}
// Try to open with specified flags
int flags = O_RDONLY;
if (use_direct) {
flags |= O_DIRECT;
}
*fd = open(filename, flags);
if (*fd < 0) {
if (use_direct) {
print_info("O_DIRECT failed, trying without it");
*fd = open(filename, O_RDONLY);
if (*fd < 0) {
print_failure("open", strerror(errno));
results->failed++;
return -1;
}
print_info("Using buffered I/O (O_DIRECT not available)");
} else {
print_failure("open", strerror(errno));
results->failed++;
return -1;
}
} else {
const char *io_type = use_direct ? "O_DIRECT" : "buffered I/O";
printf(" File opened with %s\n", io_type);
print_success("File opened successfully");
}
results->passed++;
return 0;
}
// Test 4: Build and submit read operation (using registered file)
static int test_submit_read_registered(struct io_uring *ring, int file_index,
struct iovec *iovs, int buffer_id,
uint64_t user_data, size_t file_size,
TestResults *results) {
print_step("Building and submitting read operation (registered file)");
u32 page_size = plat_get_pagesize();
size_t read_size = BUFFER_SIZE;
// For O_DIRECT, ensure read size is sector-aligned
if (read_size > file_size) {
read_size = ALIGN_UP_POW2(file_size, page_size);
printf(" Adjusted read size to %zu bytes for O_DIRECT alignment\n",
read_size);
}
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (!sqe) {
print_failure("io_uring_get_sqe", "No available SQE");
results->failed++;
return -1;
}
// Use fixed file descriptor
io_uring_prep_read_fixed(sqe, file_index, iovs[buffer_id].iov_base, read_size,
0, buffer_id);
io_uring_sqe_set_data64(sqe, user_data);
int ret = io_uring_submit(ring);
if (ret < 0) {
print_failure("io_uring_submit", strerror(-ret));
results->failed++;
return -1;
}
printf(" Using registered file index: %d\n", file_index);
print_success("Read operation submitted successfully (registered file)");
results->passed++;
return 0;
}
// Test 4b: Build and submit read operation (using fd directly)
static int test_submit_read(struct io_uring *ring, int fd, struct iovec *iovs,
int buffer_id, uint64_t user_data,
TestResults *results) {
print_step("Building and submitting read operation");
// Get file size for proper alignment
struct stat st;
if (fstat(fd, &st) != 0) {
print_failure("fstat", strerror(errno));
results->failed++;
return -1;
}
u32 page_size = plat_get_pagesize();
size_t file_size = st.st_size;
size_t read_size = BUFFER_SIZE;
// For O_DIRECT, ensure read size is sector-aligned
if (read_size > file_size) {
read_size = ALIGN_UP_POW2(file_size, page_size);
printf(" Adjusted read size to %zu bytes for O_DIRECT alignment\n",
read_size);
}
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (!sqe) {
print_failure("io_uring_get_sqe", "No available SQE");
results->failed++;
return -1;
}
// Prepare read operation using registered buffer
io_uring_prep_read_fixed(sqe, fd, iovs[buffer_id].iov_base, read_size, 0,
buffer_id);
io_uring_sqe_set_data64(sqe, user_data);
int ret = io_uring_submit(ring);
if (ret < 0) {
print_failure("io_uring_submit", strerror(-ret));
results->failed++;
return -1;
}
print_success("Read operation submitted successfully");
results->passed++;
return 0;
}
// Test 5: Wait for completion
static int test_wait_completion(struct io_uring *ring,
struct io_uring_cqe **cqe,
TestResults *results) {
print_step("Waiting for completion");
int ret = io_uring_wait_cqe(ring, cqe);
if (ret < 0) {
print_failure("io_uring_wait_cqe", strerror(-ret));
results->failed++;
return -1;
}
print_success("Completion received");
results->passed++;
return 0;
}
// Test 6: Process completion
static int test_process_completion(struct io_uring_cqe *cqe,
uint64_t expected_user_data,
TestResults *results) {
print_step("Processing completion");
uint64_t user_data = io_uring_cqe_get_data64(cqe);
int res = cqe->res;
printf(" Completion data:\n");
printf(" User data: %lu (expected: %lu)\n", user_data, expected_user_data);
printf(" Result: %d bytes read\n", res);
if (user_data != expected_user_data) {
print_failure("User data mismatch",
"User data doesn't match expected value");
results->failed++;
return -1;
}
if (res < 0) {
print_failure("Read operation", strerror(-res));
results->failed++;
return -1;
}
print_success("Completion processed successfully");
results->passed++;
return res; // Return number of bytes read
}
// Test 7: Verify read data
static int test_verify_data(struct iovec *iovs, int buffer_id, int bytes_read,
const char *expected_content,
TestResults *results) {
print_step("Data verification");
char *data = (char *)iovs[buffer_id].iov_base;
printf(" Read data (first 200 chars):\n");
printf(" ---\n");
for (int i = 0; i < bytes_read && i < 200; i++) {
putchar(data[i]);
}
if (bytes_read > 200)
printf("...");
printf("\n ---\n");
// Check if data is not empty
if (bytes_read == 0) {
print_failure("Data verification", "No data read");
results->failed++;
return -1;
}
// Check if data contains expected content
if (expected_content && strstr(data, expected_content) == NULL) {
print_failure("Data verification", "Expected content not found");
results->failed++;
return -1;
}
print_success("Data verified successfully");
results->passed++;
return 0;
}
// Test 8: Test multiple concurrent reads
static int test_concurrent_reads(struct io_uring *ring, int fd,
struct iovec *iovs, TestResults *results) {
print_step("Concurrent reads test");
int num_reads = 3;
int submitted = 0;
// Submit multiple reads
for (int i = 0; i < num_reads; i++) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (!sqe) {
print_failure("Getting SQE for concurrent read", "No available SQE");
results->failed++;
return -1;
}
off_t offset = i * 100; // Read from different offsets
io_uring_prep_read_fixed(sqe, fd, iovs[i].iov_base, BUFFER_SIZE, offset, i);
io_uring_sqe_set_data64(sqe, i);
submitted++;
}
int ret = io_uring_submit(ring);
if (ret != submitted) {
char msg[64];
snprintf(msg, sizeof(msg), "Expected %d, got %d", submitted, ret);
print_failure("Submitting concurrent reads", msg);
results->failed++;
return -1;
}
print_success("Concurrent reads submitted");
// Wait for and process completions
for (int i = 0; i < submitted; i++) {
struct io_uring_cqe *cqe;
ret = io_uring_wait_cqe(ring, &cqe);
if (ret < 0) {
print_failure("Waiting for concurrent read completion", strerror(-ret));
results->failed++;
return -1;
}
uint64_t user_data = io_uring_cqe_get_data64(cqe);
int res = cqe->res;
printf(" Concurrent read %lu completed: %d bytes read\n", user_data, res);
io_uring_cqe_seen(ring, cqe);
}
print_success("Concurrent reads completed successfully");
results->passed++;
return 0;
}
// Test 9: Test file registration with multiple files
static int test_file_registration(struct io_uring *ring, TestResults *results) {
print_step("File registration with multiple files");
// Create multiple test files
const char *filenames[] = {"test_file1.txt", "test_file2.txt",
"test_file3.txt"};
const char *contents[] = {"Content of file 1: Hello World!",
"Content of file 2: io_uring is fast!",
"Content of file 3: Registered files test."};
int fds[3];
int num_files = 3;
// Create and open files
for (int i = 0; i < num_files; i++) {
if (create_test_file(filenames[i], contents[i]) != 0) {
results->failed++;
return -1;
}
fds[i] = open(filenames[i], O_RDONLY);
if (fds[i] < 0) {
print_failure("Opening file for registration", strerror(errno));
// Close previously opened files
for (int j = 0; j < i; j++)
close(fds[j]);
results->failed++;
return -1;
}
}
// Register files
int ret = io_uring_register_files(ring, fds, num_files);
if (ret < 0) {
if (ret == -EOPNOTSUPP || ret == -EINVAL) {
print_info("File registration not supported, skipping test");
results->passed++;
} else {
print_failure("io_uring_register_files", strerror(-ret));
results->failed++;
}
// Cleanup
for (int i = 0; i < num_files; i++) {
close(fds[i]);
remove(filenames[i]);
}
return (ret == -EOPNOTSUPP || ret == -EINVAL) ? 0 : -1;
}
print_success("Multiple files registered successfully");
// Read from each registered file using fixed operations
for (int i = 0; i < num_files; i++) {
struct iovec iov;
char buf[256] = {0};
iov.iov_base = buf;
iov.iov_len = sizeof(buf);
// Register a single buffer for this test
ret = io_uring_register_buffers(ring, &iov, 1);
if (ret < 0) {
print_failure("Registering buffer for file test", strerror(-ret));
break;
}
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (!sqe) {
print_failure("Getting SQE for registered file", "No available SQE");
break;
}
// Use fixed file and fixed buffer
io_uring_prep_read_fixed(sqe, i, iov.iov_base, strlen(contents[i]), 0, 0);
io_uring_sqe_set_data64(sqe, i);
ret = io_uring_submit(ring);
if (ret < 0) {
print_failure("Submitting read for registered file", strerror(-ret));
break;
}
struct io_uring_cqe *cqe;
ret = io_uring_wait_cqe(ring, &cqe);
if (ret < 0) {
print_failure("Waiting for registered file read", strerror(-ret));
break;
}
if (cqe->res < 0) {
print_failure("Reading registered file", strerror(-cqe->res));
io_uring_cqe_seen(ring, cqe);
break;
}
printf(" File %d: Read %d bytes: %.*s\n", i, cqe->res, cqe->res, buf);
io_uring_cqe_seen(ring, cqe);
// Unregister buffer for next iteration
io_uring_unregister_buffers(ring);
}
// Cleanup files
io_uring_unregister_files(ring);
for (int i = 0; i < num_files; i++) {
close(fds[i]);
remove(filenames[i]);
}
print_success("File registration test completed");
results->passed++;
return 0;
}
// Cleanup function
static void cleanup(struct io_uring *ring, int *fds, int num_fds,
void *buffers) {
if (fds) {
io_uring_unregister_files(ring);
for (int i = 0; i < num_fds; i++) {
if (fds[i] >= 0)
close(fds[i]);
}
}
if (buffers) {
io_uring_unregister_buffers(ring);
free(buffers);
}
io_uring_queue_exit(ring);
remove(TEST_FILE);
}
int main() {
TestResults results = {0, 0};
struct io_uring ring;
int fd = -1;
int registered_fds[1] = {-1}; // For registered file test
void *buffers = NULL;
struct iovec iovs[NUM_BUFFERS];
printf(COLOR_BLUE "\n========================================\n");
printf(" io_uring Test Suite with File Registration\n");
printf("========================================\n" COLOR_RESET);
// Create main test file
const char *test_content =
"Hello, io_uring! This is a test file for async I/O operations.\n"
"Line 2: Testing reads with registered buffers.\n"
"Line 3: The quick brown fox jumps over the lazy dog.\n"
"Line 4: ABCDEFGHIJKLMNOPQRSTUVWXYZ\n"
"Line 5: 0123456789\n";
if (create_test_file(TEST_FILE, test_content) != 0) {
return 1;
}
// Test 1: Create io_uring
if (test_io_uring_create(&ring, &results) != 0) {
cleanup(&ring, NULL, 0, buffers);
return 1;
}
// Test 2: Register buffers
if (test_register_buffers(&ring, &buffers, iovs, &results) != 0) {
cleanup(&ring, NULL, 0, buffers);
return 1;
}
// Test 3: Open file
if (test_open_file(TEST_FILE, &fd, true, &results) != 0) {
cleanup(&ring, NULL, 0, buffers);
return 1;
}
// Test 4: Submit read with direct fd
uint64_t test_user_data = 12345;
if (test_submit_read(&ring, fd, iovs, 0, test_user_data, &results) != 0) {
cleanup(&ring, NULL, 0, buffers);
return 1;
}
// Test 5: Wait for completion
struct io_uring_cqe *cqe;
if (test_wait_completion(&ring, &cqe, &results) != 0) {
cleanup(&ring, NULL, 0, buffers);
return 1;
}
// Test 6: Process completion
int bytes_read = test_process_completion(cqe, test_user_data, &results);
if (bytes_read < 0) {
cleanup(&ring, NULL, 0, buffers);
return 1;
}
io_uring_cqe_seen(&ring, cqe);
// Test 7: Verify data
if (test_verify_data(iovs, 0, bytes_read, "io_uring", &results) != 0) {
cleanup(&ring, NULL, 0, buffers);
return 1;
}
// Close the file for file registration test
close(fd);
// Reopen and register the file
registered_fds[0] = open(TEST_FILE, O_RDONLY);
if (registered_fds[0] < 0) {
print_failure("Reopening file for registration", strerror(errno));
cleanup(&ring, NULL, 0, buffers);
return 1;
}
// Test 2b: Register files
if (test_register_files(&ring, registered_fds, 1, &results) != 0) {
cleanup(&ring, registered_fds, 1, buffers);
return 1;
}
// Get file size for the registered read test
struct stat st;
stat(TEST_FILE, &st);
// Test 4b: Submit read using registered file
test_user_data = 67890;
if (test_submit_read_registered(&ring, 0, iovs, 0, test_user_data, st.st_size,
&results) != 0) {
cleanup(&ring, registered_fds, 1, buffers);
return 1;
}
// Wait for and process completion
if (test_wait_completion(&ring, &cqe, &results) != 0) {
cleanup(&ring, registered_fds, 1, buffers);
return 1;
}
bytes_read = test_process_completion(cqe, test_user_data, &results);
if (bytes_read < 0) {
cleanup(&ring, registered_fds, 1, buffers);
return 1;
}
io_uring_cqe_seen(&ring, cqe);
// Verify data from registered file read
if (test_verify_data(iovs, 0, bytes_read, "io_uring", &results) != 0) {
cleanup(&ring, registered_fds, 1, buffers);
return 1;
}
// Test 8: Concurrent reads
if (test_concurrent_reads(&ring, registered_fds[0], iovs, &results) != 0) {
cleanup(&ring, registered_fds, 1, buffers);
return 1;
}
// Test 9: File registration with multiple files (requires new ring)
cleanup(&ring, registered_fds, 1, buffers);
buffers = NULL;
registered_fds[0] = -1;
if (test_io_uring_create(&ring, &results) != 0) {
return 1;
}
test_file_registration(&ring, &results);
// Cleanup the second ring
io_uring_queue_exit(&ring);
// Print summary
printf(COLOR_BLUE "\n========================================\n");
printf(" TEST SUMMARY\n");
printf("========================================\n" COLOR_RESET);
printf(" Total tests: %d\n", results.passed + results.failed);
printf(COLOR_GREEN " Passed: %d\n" COLOR_RESET, results.passed);
if (results.failed > 0) {
printf(COLOR_RED " Failed: %d\n" COLOR_RESET, results.failed);
} else {
printf(COLOR_GREEN " ✓ ALL TESTS PASSED!\n" COLOR_RESET);
}
return results.failed > 0 ? 1 : 0;
}

View File

@@ -77,14 +77,14 @@ int main(int argc, char **argv) {
// Detect hardware
// -------------------------------
// --- Windows: detect PHYSICAL cores (not logical threads) ---
uint8_t cpu_cores = platform_physical_cores();
uint32_t cpu_cores = platform_physical_cores();
// Logical threads = CPU cores * 2
uint8_t cpu_threads = cpu_cores * 2;
uint32_t cpu_threads = cpu_cores * 2;
#if MULTI_THREADING
uint8_t num_scan_threads = cpu_threads;
uint8_t num_hash_threads = cpu_threads;
uint32_t num_scan_threads = cpu_threads;
uint32_t num_hash_threads = cpu_threads;
printf("%d cores %d threads CPU detected with %s instruction set\n"
"Starting thread pool: %d scanning and %d hashing threads\n",
@@ -123,7 +123,7 @@ int main(int argc, char **argv) {
Thread *hash_threads =
arena_push(&gp_arena, sizeof(Thread) * num_hash_threads, true);
for (uint8_t i = 0; i < num_hash_threads; ++i) {
for (size_t i = 0; i < num_hash_threads; ++i) {
workers[i].arena = arena_create(&params);
workers[i].file_queue = &file_queue;
@@ -135,7 +135,7 @@ int main(int argc, char **argv) {
0)
#endif
{
fprintf(stderr, "Failed to create hash thread %d\n", i);
fprintf(stderr, "Failed to create hash thread %zu\n", i);
exit(1);
}
}
@@ -153,7 +153,7 @@ int main(int argc, char **argv) {
Thread *scan_threads =
arena_push(&gp_arena, sizeof(Thread) * num_scan_threads, true);
for (uint8_t i = 0; i < num_scan_threads; i++) {
for (size_t i = 0; i < num_scan_threads; i++) {
scanners[i].num_threads = num_scan_threads;
scanners[i].path_arena = arena_create(&params);
scanners[i].meta_arena = arena_create(&params);
@@ -162,7 +162,7 @@ int main(int argc, char **argv) {
if (thread_create(&scan_threads[i], (ThreadFunc)scan_worker,
&scanners[i]) != 0) {
fprintf(stderr, "Failed to create scan thread %d\n", i);
fprintf(stderr, "Failed to create scan thread %zu\n", i);
exit(1);
}
}
@@ -178,7 +178,7 @@ int main(int argc, char **argv) {
// Stop scan threads
thread_wait_multiple(scan_threads, num_scan_threads);
for (uint8_t i = 0; i < num_scan_threads; ++i) {
for (size_t i = 0; i < num_scan_threads; ++i) {
thread_close(&scan_threads[i]);
}
@@ -205,7 +205,7 @@ int main(int argc, char **argv) {
// Stop hashing threads
thread_wait_multiple(hash_threads, num_hash_threads);
for (uint8_t i = 0; i < num_hash_threads; ++i) {
for (size_t i = 0; i < num_hash_threads; ++i) {
thread_close(&hash_threads[i]);
}
@@ -222,7 +222,7 @@ int main(int argc, char **argv) {
FILE *f = fopen(FILE_HASHES_TXT, "wb");
for (uint8_t i = 0; i < num_hash_threads; i++) {
for (int i = 0; i < num_hash_threads; i++) {
mem_arena *arena = workers[i].arena;
u8 *arena_base =
(u8 *)arena + ALIGN_UP_POW2(sizeof(mem_arena), arena->align);

View File

@@ -1,6 +1,6 @@
/*
# Compile
gcc -o io_uring_test io_uring_test2.c -luring
gcc -o io_uring_test io_uring_test.c -luring
# Run
./io_uring_test

View File

@@ -23,6 +23,9 @@ static void cpu_pause(void) {
_mm_pause();
#endif
}
typedef struct plat_sem plat_sem;
typedef struct CACHE_ALIGN {
atomic_size_t seq;
void *data;
@@ -42,6 +45,8 @@ typedef struct {
size_t commit_step;
atomic_flag commit_lock;
plat_sem items_sem;
MPMCSlot *slots;
} MPMCQueue;
@@ -89,6 +94,8 @@ static void mpmc_init(MPMCQueue *q, size_t max_capacity) {
atomic_init(&q->head, 0);
atomic_init(&q->tail, 0);
atomic_init(&q->work_count, 0);
plat_sem_init(&q->items_sem, 0);
}
/* ----------------------------------------------------------- */
@@ -134,6 +141,7 @@ static void mpmc_commit_more(MPMCQueue *q) {
/* ----------------------------------------------------------- */
/* PUSH */
/* ----------------------------------------------------------- */
// Does not increment work
static void mpmc_push(MPMCQueue *q, void *item) {
MPMCSlot *slot;
size_t pos;
@@ -176,6 +184,8 @@ static void mpmc_push(MPMCQueue *q, void *item) {
slot->data = item;
atomic_store_explicit(&slot->seq, pos + 1, memory_order_release);
plat_sem_post(&q->items_sem, 1);
}
// Increment work
@@ -223,11 +233,15 @@ static void mpmc_push_work(MPMCQueue *q, void *item) {
atomic_store_explicit(&slot->seq, pos + 1, memory_order_release);
atomic_fetch_add(&q->work_count, 1);
plat_sem_post(&q->items_sem, 1);
}
/* ----------------------------------------------------------- */
/* POP */
/* ----------------------------------------------------------- */
static void *mpmc_pop(MPMCQueue *q) {
plat_sem_wait(&q->items_sem);
MPMCSlot *slot;
size_t pos;
@@ -248,14 +262,9 @@ static void *mpmc_pop(MPMCQueue *q) {
memory_order_relaxed))
break;
} else if (diff < 0) { // queue is empty
Sleep(500);
} else { // slot is still transitioning (written by another thread)
if (++spins > 10) {
SwitchToThread(); // yield CPU
sleep_ms(0); // yield CPU
spins = 0;
} else {
cpu_pause();
@@ -297,19 +306,21 @@ static void mpmc_task_done(MPMCQueue *q, u8 consumer_count) {
/* ----------------------------------------------------------- */
/* MPMC Cleanup */
/* ----------------------------------------------------------- */
// static void mpmc_finish(MPMCQueue *q) { // Comment to prevent warning: unused function
// if (!q)
// return;
//
// if (q->slots) {
// plat_mem_release(q->slots, 0);
// q->slots = NULL;
// }
//
// q->capacity = 0;
// q->mask = 0;
//
// atomic_store_explicit(&q->head, 0, memory_order_relaxed);
// atomic_store_explicit(&q->tail, 0, memory_order_relaxed);
// atomic_store_explicit(&q->committed, 0, memory_order_relaxed);
// }
static void mpmc_finish(MPMCQueue *q) {
if (!q)
return;
if (q->slots) {
plat_mem_release(q->slots, 0);
q->slots = NULL;
}
plat_sem_destroy(&q->items_sem);
q->capacity = 0;
q->mask = 0;
atomic_store_explicit(&q->head, 0, memory_order_relaxed);
atomic_store_explicit(&q->tail, 0, memory_order_relaxed);
atomic_store_explicit(&q->committed, 0, memory_order_relaxed);
}

246
mt_mpmc.h
View File

@@ -1,246 +0,0 @@
#pragma once
#include "base.h"
// Cache align abstraction
#define CACHELINE 64
#if defined(_MSC_VER)
#define CACHE_ALIGN __declspec(align(CACHELINE))
#else
#define CACHE_ALIGN __attribute__((aligned(CACHELINE)))
#endif
// Mutex/Critical section abstraction
#if defined(_WIN32)
#include <windows.h>
typedef CRITICAL_SECTION mtx_t;
typedef CONDITION_VARIABLE cond_t;
#define mtx_init(m) InitializeCriticalSection(m)
#define mtx_lock(m) EnterCriticalSection(m)
#define mtx_unlock(m) LeaveCriticalSection(m)
#define mtx_destroy(m) DeleteCriticalSection(m)
#define cond_init(c) InitializeConditionVariable(c)
#define cond_wait(c, m) SleepConditionVariableCS(c, m, INFINITE)
#define cond_signal(c) WakeConditionVariable(c)
#define cond_broadcast(c) WakeAllConditionVariable(c)
#else
#include <pthread.h>
typedef pthread_mutex_t mtx_t;
typedef pthread_cond_t cond_t;
#define mtx_init(m) pthread_mutex_init(m, NULL)
#define mtx_lock(m) pthread_mutex_lock(m)
#define mtx_unlock(m) pthread_mutex_unlock(m)
#define mtx_destroy(m) pthread_mutex_destroy(m)
#define cond_init(c) pthread_cond_init(c, NULL)
#define cond_wait(c, m) pthread_cond_wait(c, m)
#define cond_signal(c) pthread_cond_signal(c)
#define cond_broadcast(c) pthread_cond_broadcast(c)
#endif
typedef struct CACHE_ALIGN {
void *data;
char pad[64 - sizeof(void *)];
} MPMCSlot;
typedef struct {
CACHE_ALIGN size_t head;
CACHE_ALIGN size_t tail;
CACHE_ALIGN size_t count;
CACHE_ALIGN size_t work_count;
size_t capacity;
size_t mask;
size_t committed;
size_t commit_step;
mtx_t lock;
cond_t not_empty;
cond_t not_full;
MPMCSlot *slots;
} MPMCQueue;
/* ----------------------------------------------------------- */
/* INIT */
/* ----------------------------------------------------------- */
static void mpmc_init(MPMCQueue *q, size_t max_capacity) {
q->capacity = max_capacity;
q->mask = max_capacity - 1;
size_t pagesize = plat_get_pagesize();
size_t bytes = ALIGN_UP_POW2(sizeof(MPMCSlot) * max_capacity, pagesize);
q->slots = (MPMCSlot *)plat_mem_reserve(bytes);
if (!q->slots) {
fprintf(stderr, "plat_mem_reserve failed\n");
exit(1);
}
size_t commit_bytes = ALIGN_UP_POW2(pagesize, pagesize);
q->commit_step = commit_bytes / sizeof(MPMCSlot);
q->committed = q->commit_step;
plat_mem_commit(q->slots, commit_bytes);
for (size_t i = 0; i < q->committed; i++) {
q->slots[i].data = NULL;
}
q->head = 0;
q->tail = 0;
q->count = 0;
q->work_count = 0;
mtx_init(&q->lock);
cond_init(&q->not_empty);
cond_init(&q->not_full);
}
/* ----------------------------------------------------------- */
/* COMMIT MORE MEMORY */
/* ----------------------------------------------------------- */
static void mpmc_commit_more(MPMCQueue *q) {
size_t start = q->committed;
if (start >= q->capacity)
return;
size_t new_commit = start + q->commit_step;
if (new_commit > q->capacity)
new_commit = q->capacity;
size_t count = new_commit - start;
plat_mem_commit(&q->slots[start], count * sizeof(MPMCSlot));
for (size_t i = start; i < new_commit; i++) {
q->slots[i].data = NULL;
}
q->committed = new_commit;
}
/* ----------------------------------------------------------- */
/* PUSH */
/* ----------------------------------------------------------- */
// Does not increment work
static void mpmc_push(MPMCQueue *q, void *item) {
mtx_lock(&q->lock);
while (q->count == q->capacity) {
cond_wait(&q->not_full, &q->lock);
}
// Ensure committed
if (q->tail >= q->committed) {
mpmc_commit_more(q);
}
size_t pos = q->tail & q->mask;
q->slots[pos].data = item;
q->tail++;
q->count++;
cond_signal(&q->not_empty);
mtx_unlock(&q->lock);
}
// Increment work
static void mpmc_push_work(MPMCQueue *q, void *item) {
mtx_lock(&q->lock);
while (q->count == q->capacity) {
cond_wait(&q->not_full, &q->lock);
}
if (q->tail >= q->committed) {
mpmc_commit_more(q);
}
size_t pos = q->tail & q->mask;
q->slots[pos].data = item;
q->tail++;
q->count++;
q->work_count++;
cond_signal(&q->not_empty);
mtx_unlock(&q->lock);
}
/* ----------------------------------------------------------- */
/* POP */
/* ----------------------------------------------------------- */
static void *mpmc_pop(MPMCQueue *q) {
mtx_lock(&q->lock);
while (q->count == 0) {
cond_wait(&q->not_empty, &q->lock);
}
size_t pos = q->head & q->mask;
void *data = q->slots[pos].data;
q->head++;
q->count--;
cond_signal(&q->not_full);
mtx_unlock(&q->lock);
return data;
}
/* ----------------------------------------------------------- */
/* PUSH POISON */
/* ----------------------------------------------------------- */
static void mpmc_producers_finished(MPMCQueue *q, u8 consumer_count) {
for (u8 i = 0; i < consumer_count; i++) {
mpmc_push(q, NULL);
}
}
/* ----------------------------------------------------------- */
/* Done */
/* ----------------------------------------------------------- */
static void mpmc_task_done(MPMCQueue *q, u8 consumer_count) {
mtx_lock(&q->lock);
if (--q->work_count == 0) {
mpmc_producers_finished(q, consumer_count);
}
mtx_unlock(&q->lock);
}
/* ----------------------------------------------------------- */
/* MPMC Cleanup */
/* ----------------------------------------------------------- */
// static void mpmc_finish(MPMCQueue *q) { // Comment to prevent warning: unused function
// if (!q) return;
//
// if (q->slots) {
// plat_mem_release(q->slots, 0);
// q->slots = NULL;
// }
//
// mtx_destroy(&q->lock);
//
// #if !defined(_WIN32)
// pthread_cond_destroy(&q->not_empty);
// pthread_cond_destroy(&q->not_full);
// #endif
//
// q->capacity = 0;
// q->mask = 0;
// }

View File

@@ -1,11 +1,10 @@
#pragma once
#pragma once // ensure that a given header file is included only once in a
// single compilation unit
#include "arena.h"
#include "base.h"
#include "sm_mpmc.h"
#include "lf_mpmc.h"
#include "arena.c"
#include <stdint.h>
#include <stdio.h>
// xxhash include
@@ -248,6 +247,13 @@ typedef struct {
void *arg;
} ThreadWrapper;
static void *thread_start_routine(void *arg) {
ThreadWrapper *wrapper = (ThreadWrapper *)arg;
void *result = wrapper->func(wrapper->arg);
free(wrapper);
return result;
}
// Thread creation function
static int thread_create(Thread *thread, ThreadFunc func, void *arg) {
int ret = pthread_create(&thread->handle, NULL, func, arg);
@@ -1150,6 +1156,12 @@ typedef struct {
uint32_t MaxVersion;
} IoRingCapabilities;
typedef struct {
BUILD_READ_RETURN_VALUE ResultCode;
uint32_t Information;
uintptr_t UserData;
} IoRingCQE;
// ------------------------ IO Ring Abstraction -------------------------
#if defined(_WIN32) || defined(_WIN64)
@@ -1187,20 +1199,13 @@ static int close_ioring(ThreadIoContext *thread_ctx) {
#define MAKE_BUF_INFO(a, l) \
(IORING_BUFFER_INFO) { .Address = (a), .Length = (uint32_t)(l) }
static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t *submitted) {
static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t wait_count,
uint32_t timeout_ms, uint32_t *submitted) {
HRESULT hr = SubmitIoRing(thread_ctx->ring, 0, timeout_ms, submitted);
// HRESULT hr = SubmitIoRing(ring, wait_count, timeout_ms, submitted);
// uint32_t wait_count = MIN(thread_ctx->num_submissions, MAX_WAIT_COUNT);
// The wait_count in windows is not implemented yet, so we wait in
// ioring_pop_completion()
HRESULT hr;
// if (wait_count > 0) {
// hr = SubmitIoRing(ring, wait_count, SUBMIT_TIMEOUT_MS, submitted);
// } else {
hr = SubmitIoRing(thread_ctx->ring, 0, SUBMIT_TIMEOUT_MS, submitted);
// }
// The wait_count in windows is not implemented yet, so we wait with a
// completion event for a single completion
if (thread_ctx->num_submissions > 0) {
WaitForSingleObject(thread_ctx->completion_event, SUBMIT_TIMEOUT_MS);
}
@@ -1209,6 +1214,7 @@ static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t *submitted) {
}
static void ioring_register_buffers(ThreadIoContext *thread_ctx,
uint32_t num_buffers,
IORING_BUFFER_INFO *buf_info) {
HRESULT hr = BuildIoRingRegisterBuffers(
@@ -1224,7 +1230,7 @@ static void ioring_register_buffers(ThreadIoContext *thread_ctx,
error_msg, (unsigned int)hr);
}
// Submit registration
ioring_submit(thread_ctx, NULL);
ioring_submit(thread_ctx, 0, 0, NULL);
}
#if USE_REGISTERED_FILES
@@ -1295,68 +1301,52 @@ static BUILD_READ_RETURN_VALUE ioring_build_read(ThreadIoContext *thread_ctx,
return hr;
}
static void ioring_process_completions(ThreadIoContext *restrict thread_ctx) {
uint32_t cqe_count = 0;
uint32_t wait_count = MIN(thread_ctx->num_submissions, MAX_WAIT_COUNT);
static int ioring_pop_completion(IoRingHandle ring, IoRingCQE *cqe) {
IORING_CQE win_cqe;
while (cqe_count < wait_count) {
while (1) {
HRESULT hr = PopIoRingCompletion(ring, &win_cqe);
// ---- Drain all available CQEs (non-blocking) ----
while (1) {
IORING_CQE win_cqe;
if (hr == S_FALSE)
// No CQE available
return 0;
HRESULT hr = PopIoRingCompletion(thread_ctx->ring, &win_cqe);
if (FAILED(hr))
return -1;
if (hr != S_OK) {
// No more CQEs available right now
break;
// Unlike linux, The Windows implementation treats buffer and file
// registration as an asynchronous operation that we submit to the ring,
// similar to a read or write. Those operations produce CQEs (completion
// queue entries) that we filter here using
// cqe.UserData == USERDATA_REGISTER
if (win_cqe.UserData == USERDATA_REGISTER)
continue;
cqe->ResultCode = win_cqe.ResultCode;
cqe->Information = win_cqe.Information;
cqe->UserData = win_cqe.UserData;
// Check for error and print warning
if (FAILED(win_cqe.ResultCode)) {
char error_msg[256];
FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
NULL, win_cqe.ResultCode, 0, error_msg, sizeof(error_msg),
NULL);
// Try to get the file path from the buffer
IoBuffer *buf = (IoBuffer *)win_cqe.UserData;
const char *file_path = "unknown";
if (buf && buf->file && buf->file->fe) {
file_path = buf->file->fe->path;
}
if (FAILED(hr)) {
fprintf(stderr, "WARNING: PopIoRingCompletion failed (0x%lx)\n", hr);
return;
}
// Skip internal registration completions
if (win_cqe.UserData == USERDATA_REGISTER) {
continue;
}
IoBuffer *restrict buf = (IoBuffer *)win_cqe.UserData;
FileReadContext *restrict file = buf->file;
if (SUCCEEDED(win_cqe.ResultCode)) {
buf->result = 0;
buf->bytes_read = win_cqe.Information;
} else {
buf->result = win_cqe.ResultCode;
buf->bytes_read = 0;
char error_msg[256];
FormatMessageA(
FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL,
win_cqe.ResultCode, 0, error_msg, sizeof(error_msg), NULL);
fprintf(stderr,
"WARNING: I/O completion error for file '%s' - Error: %s "
"(Code: 0x%lx)\n",
buf->file->fe->path, error_msg, win_cqe.ResultCode);
}
file->active_reads--;
file->reads_completed++;
thread_ctx->num_submissions--;
cqe_count++;
fprintf(stderr,
"WARNING: I/O completion error for file '%s' - Error: %s (Code: "
"0x%lx)\n",
file_path, error_msg, win_cqe.ResultCode);
}
// ---- If we already waited enough, exit ----
if (cqe_count >= wait_count) {
break;
}
// ---- Otherwise wait for more completions ----
WaitForSingleObject(thread_ctx->completion_event, SUBMIT_TIMEOUT_MS);
return 1;
}
}
@@ -1439,21 +1429,21 @@ static int close_ioring(ThreadIoContext *thread_ctx) {
(IORING_BUFFER_INFO) { .iov_base = (a), .iov_len = (size_t)(l) }
static void ioring_register_buffers(ThreadIoContext *thread_ctx,
uint32_t num_buffers,
IORING_BUFFER_INFO *buf_info) {
int ret = io_uring_register_buffers(&((IoUring *)thread_ctx->ring)->ring,
buf_info, NUM_BUFFERS_PER_THREAD);
buf_info, num_buffers);
if (ret < 0) {
if (ret == -ENOMEM) {
struct rlimit limit;
getrlimit(RLIMIT_MEMLOCK, &limit);
fprintf(
stderr,
"WARNING: Buffer registration failed due to Memlock limit, Error: "
"Cannot allocate memory (code: -12, ENOMEM).\n"
"See README for more informations.\n");
fprintf(stderr,
"WARNING: Buffer registration failed due to memlock limits "
"(ENOMEM).\n"
"See README for more informations.\n");
} else {
// For any other error (e.g., EFAULT, EBUSY, EINVAL)
@@ -1549,11 +1539,10 @@ static int ioring_build_read(ThreadIoContext *thread_ctx,
return 0;
}
static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t *submitted) {
static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t wait_count,
uint32_t timeout_ms, uint32_t *submitted) {
int ret;
uint32_t wait_count = MIN(thread_ctx->num_submissions, MAX_WAIT_COUNT);
if (wait_count > 0) {
ret = io_uring_submit_and_wait(&((IoUring *)thread_ctx->ring)->ring,
wait_count);
@@ -1573,44 +1562,58 @@ static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t *submitted) {
return 0;
}
static void ioring_process_completions(ThreadIoContext *thread_ctx) {
struct io_uring_cqe *cqes[NUM_BUFFERS_PER_THREAD];
static int ioring_pop_completion(IoRingHandle ring, IoRingCQE *cqe) {
unsigned cqe_count = io_uring_peek_batch_cqe(&((IoUring *)thread_ctx->ring)->ring,
cqes, NUM_BUFFERS_PER_THREAD);
struct io_uring_cqe *cqe_ptr = NULL;
if (cqe_count == 0) {
return;
int ret = io_uring_peek_cqe(&((IoUring *)ring)->ring, &cqe_ptr);
if (ret == -EAGAIN) {
// No CQE available
return 0;
}
for (unsigned i = 0; i < cqe_count; i++) {
struct io_uring_cqe *cqe = cqes[i];
if (ret < 0) {
// Error
fprintf(stderr, "WARNING: io_uring_peek_cqe error - Error: %s (Code: %d)\n",
strerror(-ret), ret);
return -1;
}
int res = cqe->res;
if (!cqe_ptr) {
return 0;
}
IoBuffer *restrict buf = (IoBuffer *)cqe->user_data;
FileReadContext *restrict file = buf->file;
int res = cqe_ptr->res;
if (res >= 0) {
buf->result = 0;
buf->bytes_read = (uint32_t)res;
} else {
buf->result = res;
buf->bytes_read = 0;
if (res >= 0) {
cqe->ResultCode = 0;
cqe->Information = (uint32_t)res;
} else {
cqe->ResultCode = res;
cqe->Information = 0;
}
fprintf(stderr,
"WARNING: I/O completion error for file '%s' - Error: %s (Code: "
"%d)\n",
buf->file->fe->path, strerror(-res), res);
cqe->UserData = (uintptr_t)cqe_ptr->user_data;
io_uring_cqe_seen(&((IoUring *)ring)->ring, cqe_ptr);
// Check for error and print warning
if (res < 0) {
// Try to get the file path from the buffer
IoBuffer *buf = (IoBuffer *)cqe->UserData;
const char *file_path = "unknown";
if (buf && buf->file && buf->file->fe) {
file_path = buf->file->fe->path;
}
file->active_reads--;
file->reads_completed++;
thread_ctx->num_submissions--;
fprintf(
stderr,
"WARNING: I/O completion error for file '%s' - Error: %s (Code: %d)\n",
file_path, strerror(-res), ret);
}
// Mark CQE as seen, equivalent to io_uring_cqe_seen() but marks multiple CQEs
io_uring_cq_advance(&((IoUring *)thread_ctx->ring)->ring, cqe_count);
return 1;
}
FileHandle ioring_open_file(FileEntry *fe) {
@@ -1641,11 +1644,11 @@ typedef struct FileQueue {
int count;
} FileQueue;
static FileReadContext *fq_push(FileQueue *restrict fq) {
static FileReadContext *fq_push(FileQueue *fq) {
if (fq->count == MAX_ACTIVE_FILES)
return NULL;
FileReadContext *restrict file = &fq->files[fq->tail];
FileReadContext *file = &fq->files[fq->tail];
#if USE_REGISTERED_FILES
file->slot_id = fq->tail;
#endif
@@ -1671,11 +1674,11 @@ static FileReadContext *fq_peek_at(FileQueue *fq, int index) {
return &fq->files[idx];
}
static void fq_trim(FileQueue *restrict fq) {
static void fq_trim(FileQueue *fq) {
while (fq->count > 0) {
FileReadContext *restrict file = &fq->files[fq->head];
FileReadContext *f = &fq->files[fq->head];
if (!file->completed)
if (!f->completed)
break;
fq->head = (fq->head + 1) % MAX_ACTIVE_FILES;
@@ -1685,7 +1688,7 @@ static void fq_trim(FileQueue *restrict fq) {
// ----------------- Initialize thread context -----------------------
static ThreadIoContext *ioring_init_thread(void) {
ThreadIoContext *restrict thread_ctx =
ThreadIoContext *thread_ctx =
(ThreadIoContext *)calloc(1, sizeof(ThreadIoContext));
if (!thread_ctx)
return NULL;
@@ -1742,7 +1745,7 @@ static ThreadIoContext *ioring_init_thread(void) {
thread_ctx->free_count = NUM_BUFFERS_PER_THREAD;
// Register buffers
ioring_register_buffers(thread_ctx, buf_info);
ioring_register_buffers(thread_ctx, NUM_BUFFERS_PER_THREAD, buf_info);
#if USE_REGISTERED_FILES
ioring_register_files(thread_ctx);
@@ -1773,14 +1776,14 @@ static void ioring_cleanup_thread(ThreadIoContext *thread_ctx) {
}
// -------------------------- Buffer get and return ------------------------
static IoBuffer *get_free_buffer(ThreadIoContext *restrict thread_ctx) {
static IoBuffer *get_free_buffer(ThreadIoContext *ctx) {
if (thread_ctx->free_count == 0) {
if (ctx->free_count == 0) {
return NULL;
}
int idx = thread_ctx->buffer_pool[--thread_ctx->free_count];
IoBuffer *restrict buf = &thread_ctx->buffers[idx];
int idx = ctx->buffer_pool[--ctx->free_count];
IoBuffer *buf = &ctx->buffers[idx];
buf->bytes_read = 0;
buf->result = IO_PENDING;
buf->next = NULL;
@@ -1788,16 +1791,34 @@ static IoBuffer *get_free_buffer(ThreadIoContext *restrict thread_ctx) {
return buf;
}
static void return_buffer(ThreadIoContext *restrict thread_ctx, IoBuffer *restrict buf) {
static void return_buffer(ThreadIoContext *ctx, IoBuffer *buf) {
if (!buf)
return;
thread_ctx->buffer_pool[thread_ctx->free_count++] = buf->buffer_id;
ctx->buffer_pool[ctx->free_count++] = buf->buffer_id;
}
// -------------------------- Process completions ---------------------------
static void process_completions(ThreadIoContext *thread_ctx, FileQueue *fq) {
IoRingCQE cqe;
while (ioring_pop_completion(thread_ctx->ring, &cqe) == 1) {
IoBuffer *buf = (IoBuffer *)cqe.UserData;
FileReadContext *file = buf->file;
buf->result = cqe.ResultCode;
buf->bytes_read = cqe.Information;
file->active_reads--;
file->reads_completed++;
thread_ctx->num_submissions--;
}
}
// -------------------- File operations -----------------------
static int init_file(ThreadIoContext *restrict thread_ctx, FileReadContext *restrict file,
FileEntry *restrict fe) {
static int init_file(ThreadIoContext *thread_ctx, FileReadContext *file,
FileEntry *fe) {
#if USE_REGISTERED_FILES
uint32_t saved_slot_id = file->slot_id;
@@ -1834,10 +1855,10 @@ static int init_file(ThreadIoContext *restrict thread_ctx, FileReadContext *rest
return 1;
}
static void finalize_file(ThreadIoContext *restrict thread_ctx,
WorkerContext *worker_ctx, FileReadContext *restrict file) {
static void finalize_file(ThreadIoContext *thread_ctx,
WorkerContext *worker_ctx, FileReadContext *file) {
FileEntry *restrict fe = file->fe;
FileEntry *fe = file->fe;
os_file_close(file->file_handle);
@@ -1888,26 +1909,26 @@ static void finalize_file(ThreadIoContext *restrict thread_ctx,
size_kib);
#endif
char *restrict dst = arena_push(&worker_ctx->arena, len, false);
char *dst = arena_push(&worker_ctx->arena, len, false);
memcpy(dst, stack_buf, len);
atomic_fetch_add(&g_files_hashed, 1);
}
// -------------------- Hash files -----------------------
static void hash_ready_files(ThreadIoContext *restrict thread_ctx, FileQueue *restrict fq,
static void hash_ready_files(ThreadIoContext *thread_ctx, FileQueue *fq,
WorkerContext *worker_ctx) {
for (int i = 0; i < fq->count; i++) {
FileReadContext *restrict file = fq_peek_at(fq, i);
FileReadContext *file = fq_peek_at(fq, i);
if (!file || file->completed)
continue;
// ---- HASH READY BUFFERS IN ORDER ----
while (file->head) {
IoBuffer *restrict buf = file->head;
IoBuffer *buf = file->head;
// CQE not received yet
if (buf->result == IO_PENDING)
@@ -1962,12 +1983,12 @@ static void hash_ready_files(ThreadIoContext *restrict thread_ctx, FileQueue *re
}
// ------------------ Build pending reads ----------------------
static void build_pending_reads(ThreadIoContext *restrict thread_ctx, FileQueue *restrict fq,
static void build_pending_reads(ThreadIoContext *thread_ctx, FileQueue *fq,
WorkerContext *worker_ctx) {
MPMCQueue *file_queue = worker_ctx->file_queue;
FileReadContext *restrict file = fq_peek_tail(fq);
FileReadContext *file = fq_peek_tail(fq);
for (;;) {
@@ -1975,7 +1996,7 @@ static void build_pending_reads(ThreadIoContext *restrict thread_ctx, FileQueue
if (file) {
while (file->next_read_offset < file->file_size) {
IoBuffer *restrict buf = get_free_buffer(thread_ctx);
IoBuffer *buf = get_free_buffer(thread_ctx);
if (!buf)
return;
@@ -2062,7 +2083,8 @@ static THREAD_RETURN hash_worker_ioring(void *arg) {
FileQueue fq;
memset(&fq, 0, sizeof(fq));
uint32_t submitted;
uint32_t submitted = 0;
uint32_t wait_count;
// Main pipeline loop
for (;;) {
@@ -2070,14 +2092,13 @@ static THREAD_RETURN hash_worker_ioring(void *arg) {
// Submit new reads
build_pending_reads(thread_ctx, &fq, worker_ctx);
wait_count = MIN(thread_ctx->num_submissions, NUM_BUFFERS_PER_THREAD - 6);
submitted = 0;
ioring_submit(thread_ctx, &submitted);
ioring_submit(thread_ctx, wait_count, 0, &submitted);
// Process completions
ioring_process_completions(thread_ctx);
// Hash files
hash_ready_files(thread_ctx, &fq, worker_ctx);
process_completions(thread_ctx, &fq);
#if IORING_DEBUG_STATS
printf(
@@ -2086,6 +2107,9 @@ static THREAD_RETURN hash_worker_ioring(void *arg) {
thread_ctx->active_files, fq.count);
#endif
// Hash files
hash_ready_files(thread_ctx, &fq, worker_ctx);
// Exit condition
if (!thread_ctx->submitting && thread_ctx->active_files == 0 &&
thread_ctx->num_submissions == 0) {

388
sm_mpmc.h
View File

@@ -1,388 +0,0 @@
#pragma once
#include "base.h"
// Cache align abstraction
#define CACHELINE 64
#if defined(_MSC_VER)
#define CACHE_ALIGN __declspec(align(CACHELINE))
#else
#define CACHE_ALIGN __attribute__((aligned(CACHELINE)))
#endif
// Compiler hints
#if defined(__GNUC__) || defined(__clang__)
#define likely(x) __builtin_expect((x), 1)
#define unlikely(x) __builtin_expect((x), 0)
#else
#define likely(x) (x)
#define unlikely(x) (x)
#endif
static void cpu_pause(void) {
#if defined(_MSC_VER) || defined(__x86_64__) || defined(__i386__)
_mm_pause();
#endif
}
// Semaphores
#if defined(_WIN32) || defined(_WIN64)
typedef struct plat_sem {
HANDLE handle;
} plat_sem;
static b32 plat_sem_init(plat_sem *s, u32 initial) {
s->handle = CreateSemaphore(NULL, initial, LONG_MAX, NULL);
return s->handle != NULL;
}
static void plat_sem_wait(plat_sem *s) {
WaitForSingleObject(s->handle, INFINITE);
}
// static b32 plat_sem_trywait(HANDLE sem) { // Comment to prevent warning: unused function
// DWORD r = WaitForSingleObject(sem, 0);
// return r == WAIT_OBJECT_0;
// }
static void plat_sem_post(plat_sem *s, u32 count) {
ReleaseSemaphore(s->handle, count, NULL);
}
// static void plat_sem_destroy(plat_sem *s) { // Comment to prevent warning: unused function
// if (s->handle) {
// CloseHandle(s->handle);
// s->handle = NULL;
// }
// }
#elif defined(__linux__)
#include <semaphore.h>
typedef struct plat_sem {
sem_t sem;
} plat_sem;
static b32 plat_sem_init(plat_sem *s, u32 initial) {
return sem_init(&s->sem, 0, initial) == 0;
}
static void plat_sem_wait(plat_sem *s) {
while (sem_wait(&s->sem) == -1 && errno == EINTR) {
}
}
// static b32 plat_sem_trywait(sem_t *sem) { return sem_trywait(sem) == 0; } // Comment to prevent warning: unused function
static void plat_sem_post(plat_sem *s, u32 count) {
for (u32 i = 0; i < count; i++) {
sem_post(&s->sem);
}
}
// static void plat_sem_destroy(plat_sem *s) { sem_destroy(&s->sem); } // Comment to prevent warning: unused function
#endif
typedef struct plat_sem plat_sem;
typedef struct CACHE_ALIGN {
atomic_size_t seq;
void *data;
char pad[64 - sizeof(atomic_size_t) - sizeof(void *)];
} MPMCSlot;
typedef struct {
CACHE_ALIGN atomic_size_t head;
CACHE_ALIGN atomic_size_t tail;
CACHE_ALIGN atomic_size_t work_count;
size_t capacity;
size_t mask;
atomic_size_t committed;
size_t commit_step;
atomic_flag commit_lock;
plat_sem items_sem;
MPMCSlot *slots;
} MPMCQueue;
// --------------- functions ----------------
// static: each translation unit gets its own private copy this will solve the
// error: Function defined in a header file; function definitions in header
// files can lead to ODR violations (multiple definition errors if included in
// more than one file)
/* ----------------------------------------------------------- */
/* INIT */
/* ----------------------------------------------------------- */
static void mpmc_init(MPMCQueue *q, size_t max_capacity) {
q->capacity = max_capacity;
q->mask = max_capacity - 1;
u32 pagesize = plat_get_pagesize();
size_t bytes = ALIGN_UP_POW2(sizeof(MPMCSlot) * max_capacity, pagesize);
q->slots = (MPMCSlot *)plat_mem_reserve(bytes);
if (!q->slots) {
fprintf(stderr, "VirtualAlloc reserve failed\n");
exit(1);
}
u64 commit_bytes = pagesize;
commit_bytes = ALIGN_UP_POW2(commit_bytes, pagesize);
q->commit_step = commit_bytes / sizeof(MPMCSlot);
atomic_flag_clear(&q->commit_lock);
q->committed = q->commit_step;
plat_mem_commit(q->slots, commit_bytes);
for (size_t i = 0; i < q->committed; i++) {
atomic_init(&q->slots[i].seq, i);
q->slots[i].data = NULL;
}
atomic_init(&q->head, 0);
atomic_init(&q->tail, 0);
atomic_init(&q->work_count, 0);
plat_sem_init(&q->items_sem, 0);
}
/* ----------------------------------------------------------- */
/* COMMIT MORE MEMORY */
/* ----------------------------------------------------------- */
static void mpmc_commit_more(MPMCQueue *q) {
if (atomic_flag_test_and_set(&q->commit_lock))
return;
size_t start = atomic_load_explicit(&q->committed, memory_order_acquire);
size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed);
// another thread already committed enough
if (tail < start) {
atomic_flag_clear(&q->commit_lock);
return;
}
if (start >= q->capacity) {
atomic_flag_clear(&q->commit_lock);
return;
}
size_t new_commit = start + q->commit_step;
if (new_commit > q->capacity)
new_commit = q->capacity;
size_t count = new_commit - start;
plat_mem_commit(&q->slots[start], count * sizeof(MPMCSlot));
for (size_t i = start; i < new_commit; i++) {
atomic_init(&q->slots[i].seq, i);
q->slots[i].data = NULL;
}
atomic_store_explicit(&q->committed, new_commit, memory_order_release);
atomic_flag_clear(&q->commit_lock);
}
/* ----------------------------------------------------------- */
/* PUSH */
/* ----------------------------------------------------------- */
// Does not increment work
static void mpmc_push(MPMCQueue *q, void *item) {
MPMCSlot *slot;
size_t pos;
for (;;) {
pos = atomic_load_explicit(&q->tail, memory_order_relaxed);
// ensure the slot is committed BEFORE accessing it
size_t committed =
atomic_load_explicit(&q->committed, memory_order_relaxed);
if (unlikely(pos >= committed)) {
mpmc_commit_more(q);
continue;
}
slot = &q->slots[pos & q->mask];
size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
intptr_t diff = (intptr_t)seq - (intptr_t)pos;
if (likely(diff == 0)) {
if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1,
memory_order_relaxed,
memory_order_relaxed))
break;
} else if (diff < 0) { // queue actually full
sleep_ms(1000);
} else { // waiting to grow
sleep_ms(0);
}
}
slot->data = item;
atomic_store_explicit(&slot->seq, pos + 1, memory_order_release);
plat_sem_post(&q->items_sem, 1);
}
// Increment work
static void mpmc_push_work(MPMCQueue *q, void *item) {
MPMCSlot *slot;
size_t pos;
for (;;) {
pos = atomic_load_explicit(&q->tail, memory_order_relaxed);
// ensure the slot is committed BEFORE accessing it
size_t committed =
atomic_load_explicit(&q->committed, memory_order_relaxed);
if (unlikely(pos >= committed)) {
mpmc_commit_more(q);
continue;
}
slot = &q->slots[pos & q->mask];
size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
intptr_t diff = (intptr_t)seq - (intptr_t)pos;
if (likely(diff == 0)) {
if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1,
memory_order_relaxed,
memory_order_relaxed))
break;
} else if (diff < 0) { // queue actually full
sleep_ms(1000);
} else { // waiting to grow
sleep_ms(0);
}
}
slot->data = item;
atomic_store_explicit(&slot->seq, pos + 1, memory_order_release);
atomic_fetch_add(&q->work_count, 1);
plat_sem_post(&q->items_sem, 1);
}
/* ----------------------------------------------------------- */
/* POP */
/* ----------------------------------------------------------- */
static void *mpmc_pop(MPMCQueue *q) {
plat_sem_wait(&q->items_sem);
MPMCSlot *slot;
size_t pos;
int spins = 0;
for (;;) {
pos = atomic_load_explicit(&q->head, memory_order_relaxed);
slot = &q->slots[pos & q->mask];
size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
if (likely(diff == 0)) {
if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1,
memory_order_relaxed,
memory_order_relaxed))
break;
} else { // slot is still transitioning (written by another thread)
if (++spins > 10) {
sleep_ms(0); // yield CPU
spins = 0;
} else {
cpu_pause();
}
}
}
void *data = slot->data;
atomic_store_explicit(&slot->seq, pos + q->capacity, memory_order_release);
return data;
}
/* ----------------------------------------------------------- */
/* PUSH POISON */
/* ----------------------------------------------------------- */
/*note:
After producers finishes, push N poison pills where N = number of consumer
threads, this is necessary to stop the consumers.
*/
static void mpmc_producers_finished(MPMCQueue *q, u8 consumer_count) {
for (u8 i = 0; i < consumer_count; i++) {
mpmc_push(q, NULL);
}
}
/* ----------------------------------------------------------- */
/* Done */
/* ----------------------------------------------------------- */
static void mpmc_task_done(MPMCQueue *q, u8 consumer_count) {
size_t prev = atomic_fetch_sub(&q->work_count, 1);
if (prev == 1) {
mpmc_producers_finished(q, consumer_count);
}
}
/* ----------------------------------------------------------- */
/* MPMC Cleanup */
/* ----------------------------------------------------------- */
// static void mpmc_finish(MPMCQueue *q) { // Comment to prevent warning: unused function
// if (!q)
// return;
//
// if (q->slots) {
// plat_mem_release(q->slots, 0);
// q->slots = NULL;
// }
//
// plat_sem_destroy(&q->items_sem);
//
// q->capacity = 0;
// q->mask = 0;
//
// atomic_store_explicit(&q->head, 0, memory_order_relaxed);
// atomic_store_explicit(&q->tail, 0, memory_order_relaxed);
// atomic_store_explicit(&q->committed, 0, memory_order_relaxed);
// }