diff --git a/.gitignore b/.gitignore
index fd00ec0..4ae4fd8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,4 @@ file_hasher.rdi
 file_hasher.exe
 file_hashes.txt
 file_list.txt
+temp.c
diff --git a/arena.c b/arena.c
index 8f552a2..8c03d99 100644
--- a/arena.c
+++ b/arena.c
@@ -1,6 +1,6 @@
-#include "arena.h"
-#include "base.h"
+#pragma once
+#include "arena.h"
 
 /* ============================================================
    Helper functions
    ============================================================ */
@@ -820,70 +820,3 @@ mem_arena_temp arena_scratch_get(mem_arena **conflicts, u32 num_conflicts) {
 }
 
 void arena_scratch_release(mem_arena_temp scratch) { arena_temp_end(scratch); }
-
-#if defined(_WIN32) || defined(_WIN64)
-
-#include <windows.h>
-
-u32 plat_get_pagesize(void) {
-  SYSTEM_INFO sysinfo = {0};
-  GetSystemInfo(&sysinfo);
-
-  return sysinfo.dwPageSize;
-}
-
-void *plat_mem_reserve(u64 size) {
-  return VirtualAlloc(NULL, size, MEM_RESERVE, PAGE_READWRITE);
-}
-
-b32 plat_mem_commit(void *ptr, u64 size) {
-  void *ret = VirtualAlloc(ptr, size, MEM_COMMIT, PAGE_READWRITE);
-  return ret != NULL;
-}
-
-b32 plat_mem_decommit(void *ptr, u64 size) {
-  return VirtualFree(ptr, size, MEM_DECOMMIT);
-}
-
-b32 plat_mem_release(void *ptr, u64 size) {
-  return VirtualFree(ptr, size, MEM_RELEASE);
-}
-
-#elif defined(__linux__)
-
-#ifndef _DEFAULT_SOURCE
-#define _DEFAULT_SOURCE
-#endif
-
-#include <sys/mman.h>
-#include <unistd.h>
-
-u32 plat_get_pagesize(void) { return (u32)sysconf(_SC_PAGESIZE); }
-
-void *plat_mem_reserve(u64 size) {
-  void *out = mmap(NULL, size, PROT_NONE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
-  if (out == MAP_FAILED) {
-    return NULL;
-  }
-  return out;
-}
-
-b32 plat_mem_commit(void *ptr, u64 size) {
-  i32 ret = mprotect(ptr, size, PROT_READ | PROT_WRITE);
-  return ret == 0;
-}
-
-b32 plat_mem_decommit(void *ptr, u64 size) {
-  i32 ret = mprotect(ptr, size, PROT_NONE);
-  if (ret != 0)
-    return false;
-  ret = madvise(ptr, size, MADV_DONTNEED);
-  return ret == 0;
-}
-
-b32 plat_mem_release(void *ptr, u64 size) {
-  i32 ret = munmap(ptr, size);
-  return ret == 0;
-}
-
-#endif
diff --git a/arena.h b/arena.h
index 282ab70..a0ddb4d 100644
--- a/arena.h
+++ b/arena.h
@@ -1,45 +1,7 @@
 #pragma once
+
 #include "base.h"
-// #define _CRT_SECURE_NO_WARNINGS
-//
-// #include
-// #include
-// #include
-// #include
-// #include
-//
-// /* ------------------------------------------------------------
-//    Base types
-//    ------------------------------------------------------------ */
-//
-// typedef uint8_t u8;
-// typedef uint32_t u32;
-// typedef uint64_t u64;
-// typedef int32_t i32;
-// typedef int b32;
-//
-// /* ------------------------------------------------------------
-//    Size helpers
-//    ------------------------------------------------------------ */
-//
-// #define KiB(x) ((u64)(x) * 1024ULL)
-// #define MiB(x) (KiB(x) * 1024ULL)
-//
-// /* ------------------------------------------------------------
-//    Alignment helpers
-//    ------------------------------------------------------------ */
-//
-// #define ALIGN_UP_POW2(x, a) (((x) + ((a) - 1)) & ~((a) - 1))
-//
-// /* ------------------------------------------------------------
-//    Assert
-//    ------------------------------------------------------------ */
-//
-// #ifndef ASSERT
-// #define ASSERT(x) assert(x)
-// #endif
-//
 
 /* ===============================================================================
    ARENA USAGE GUIDE
@@ -417,10 +379,3 @@ void arena_scratch_release(mem_arena_temp scratch);
 #define ARENA_PUSH_NZ(arena, size) arena_push((arena), (size), false)
 
 #define arena_pop(arena_ptr) arena_pop_to((arena_ptr), 1)
-
-u32 plat_get_pagesize(void);
-
-void *plat_mem_reserve(u64 size);
-b32 plat_mem_commit(void *ptr, u64 size);
-b32 plat_mem_decommit(void *ptr, u64 size);
-b32 plat_mem_release(void *ptr, u64 size);
diff --git a/arena_base.h b/arena_base.h
index ce3f6f9..692fcf6 100644
--- a/arena_base.h
+++ b/arena_base.h
@@ -1,5 +1,4 @@
-#ifndef BASE_H
-#define BASE_H
+#pragma once
 
 #include <stdint.h>
 #include <assert.h>
@@ -58,4 +57,73 @@ typedef double f64;
 #define ASSERT(x) assert(x)
 #endif
 
-#endif // Base.h
+/* ------------------------------------------------------------
+   Some helper functions
+   ------------------------------------------------------------ */
+
+#if defined(_WIN32) || defined(_WIN64)
+
+#include <windows.h>
+
+static u32 plat_get_pagesize(void) {
+  SYSTEM_INFO sysinfo = {0};
+  GetSystemInfo(&sysinfo);
+
+  return sysinfo.dwPageSize;
+}
+
+static void *plat_mem_reserve(u64 size) {
+  return VirtualAlloc(NULL, size, MEM_RESERVE, PAGE_READWRITE);
+}
+
+static b32 plat_mem_commit(void *ptr, u64 size) {
+  void *ret = VirtualAlloc(ptr, size, MEM_COMMIT, PAGE_READWRITE);
+  return ret != NULL;
+}
+
+static b32 plat_mem_decommit(void *ptr, u64 size) {
+  return VirtualFree(ptr, size, MEM_DECOMMIT);
+}
+
+static b32 plat_mem_release(void *ptr, u64 size) {
+  return VirtualFree(ptr, size, MEM_RELEASE);
+}
+
+#elif defined(__linux__)
+
+#ifndef _DEFAULT_SOURCE
+#define _DEFAULT_SOURCE
+#endif
+
+#include <sys/mman.h>
+#include <unistd.h>
+
+static u32 plat_get_pagesize(void) { return (u32)sysconf(_SC_PAGESIZE); }
+
+static void *plat_mem_reserve(u64 size) {
+  void *out = mmap(NULL, size, PROT_NONE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
+  if (out == MAP_FAILED) {
+    return NULL;
+  }
+  return out;
+}
+
+static b32 plat_mem_commit(void *ptr, u64 size) {
+  i32 ret = mprotect(ptr, size, PROT_READ | PROT_WRITE);
+  return ret == 0;
+}
+
+static b32 plat_mem_decommit(void *ptr, u64 size) {
+  i32 ret = mprotect(ptr, size, PROT_NONE);
+  if (ret != 0)
+    return false;
+  ret = madvise(ptr, size, MADV_DONTNEED);
+  return ret == 0;
+}
+
+static b32 plat_mem_release(void *ptr, u64 size) {
+  i32 ret = munmap(ptr, size);
+  return ret == 0;
+}
+
+#endif
diff --git a/base.h b/base.h
index 61557e0..5d4af68 100644
--- a/base.h
+++ b/base.h
@@ -90,3 +90,72 @@ typedef double f64;
 #endif
 
 #define NDEBUG // Comment to enable asserts
+
+/* ------------------------------------------------------------
+   Some helper functions
+   ------------------------------------------------------------ */
+
+#if defined(_WIN32) || defined(_WIN64)
+
+static u32 plat_get_pagesize(void) {
+  SYSTEM_INFO sysinfo = {0};
+  GetSystemInfo(&sysinfo);
+
+  return sysinfo.dwPageSize;
+}
+
+static void *plat_mem_reserve(u64 size) {
+  return VirtualAlloc(NULL, size, MEM_RESERVE, PAGE_READWRITE);
+}
+
+static b32 plat_mem_commit(void *ptr, u64 size) {
+  void *ret = VirtualAlloc(ptr, size, MEM_COMMIT, PAGE_READWRITE);
+  return ret != NULL;
+}
+
+static b32 plat_mem_decommit(void *ptr, u64 size) {
+  return VirtualFree(ptr, size, MEM_DECOMMIT);
+}
+
+static b32 plat_mem_release(void *ptr, u64 size) {
+  return VirtualFree(ptr, size, MEM_RELEASE);
+}
+
+#elif defined(__linux__)
+
+#ifndef _DEFAULT_SOURCE
+#define _DEFAULT_SOURCE
+#endif
+
+#include <sys/mman.h>
+#include <unistd.h>
+
+static u32 plat_get_pagesize(void) { return (u32)sysconf(_SC_PAGESIZE); }
+
+static void *plat_mem_reserve(u64 size) {
+  void *out = mmap(NULL, size, PROT_NONE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
+  if (out == MAP_FAILED) {
+    return NULL;
+  }
+  return out;
+}
+
+static b32 plat_mem_commit(void *ptr, u64 size) {
+  i32 ret = mprotect(ptr, size, PROT_READ | PROT_WRITE);
+  return ret == 0;
+}
+
+static b32 plat_mem_decommit(void *ptr, u64 size) {
+  i32 ret = mprotect(ptr, size, PROT_NONE);
+  if (ret != 0)
+    return false;
+  ret = madvise(ptr, size, MADV_DONTNEED);
+  return ret == 0;
+}
+
+static b32 plat_mem_release(void *ptr, u64 size) {
+  i32 ret = munmap(ptr, size);
+  return ret == 0;
+}
+
+#endif
diff --git a/binaries/changelog.txt b/binaries/changelog.txt
index ddceca4..d886bdb 100644
--- a/binaries/changelog.txt
+++ b/binaries/changelog.txt
@@ -20,8 +20,10 @@ Reorder helper functions
 
 v3.4: Rewriting hash_worker() to export file_hashes.txt
 
-v4.0: Instead of writing directly to file_hashes.txt, hash_workers now are using a local arena, writing everything once at the end
+v3.5: Instead of writing directly to file_hashes.txt, hash_workers now are using a local arena, writing everything once at the end
 Using #pragma once to ensure that a given header file is included only once in a single compilation unit
 Forcing xxhash to use the stack instead of the heap
 Making the hashing buffer reusable instead of malloc every file
 Implementing a general purpose arena to replace small allocations
+Small improvements of the LF MPMC queue
+Making the LF MPMC queue generic and in a separate header file
diff --git a/binaries/file_hasher_v3.5.exe b/binaries/file_hasher_v3.5.exe
new file mode 100644
index 0000000..4915feb
Binary files /dev/null and b/binaries/file_hasher_v3.5.exe differ
diff --git a/lf_mpmc.h b/lf_mpmc.h
index fb52c8a..0d69996 100644
--- a/lf_mpmc.h
+++ b/lf_mpmc.h
@@ -1,26 +1,37 @@
 #pragma once
-/*note:
-  After producers finishes, push N poison pills where N = number of consumer
-threads.
-
-for (size_t i = 0; i < num_threads; i++) {
-  mpmc_push(&g_file_queue, NULL);
-}
- */
 #include "base.h"
 
-typedef struct {
+#define CACHELINE 64
+
+#if defined(_MSC_VER)
+#define CACHE_ALIGN __declspec(align(CACHELINE))
+#else
+#define CACHE_ALIGN __attribute__((aligned(CACHELINE)))
+#endif
+
+#if defined(__GNUC__) || defined(__clang__)
+#define likely(x) __builtin_expect((x), 1)
+#define unlikely(x) __builtin_expect((x), 0)
+#else
+#define likely(x) (x)
+#define unlikely(x) (x)
+#endif
+
+static void cpu_pause(void) {
+#if defined(_MSC_VER) || defined(__x86_64__) || defined(__i386__)
+  _mm_pause();
+#endif
+}
+
+typedef struct CACHE_ALIGN {
   atomic_size_t seq;
   void *data;
   char pad[64 - sizeof(atomic_size_t) - sizeof(void *)];
 } MPMCSlot;
 
 typedef struct {
-  atomic_size_t head;
-  char pad1[64];
-  atomic_size_t tail;
-  char pad2[64];
+  CACHE_ALIGN atomic_size_t head;
+  CACHE_ALIGN atomic_size_t tail;
 
   size_t capacity;
   size_t mask;
@@ -38,12 +49,19 @@ typedef struct {
 // files can lead to ODR violations (multiple definition errors if included in
 // more than one file)
 
+/* ----------------------------------------------------------- */
+/* INIT */
+/* ----------------------------------------------------------- */
 static void mpmc_init(MPMCQueue *q, size_t max_capacity) {
-  if ((max_capacity & (max_capacity - 1)) != 0) {
-    fprintf(stderr, "capacity must be power of two\n");
+  if (!max_capacity) {
+    fprintf(stderr, "capacity must be positive\n");
     exit(1);
   }
 
+  u32 pagesize = plat_get_pagesize();
+
+  max_capacity = ALIGN_UP_POW2(max_capacity, pagesize);
+
   q->capacity = max_capacity;
   q->mask = max_capacity - 1;
@@ -56,11 +74,10 @@ static void mpmc_init(MPMCQueue *q, size_t max_capacity) {
     exit(1);
   }
 
-  q->commit_step = (64ull * 1024 * 1024) / sizeof(MPMCSlot);
+  q->commit_step = pagesize;
 
   atomic_flag_clear(&q->commit_lock);
   q->committed = q->commit_step;
-
   VirtualAlloc(q->slots, q->commit_step * sizeof(MPMCSlot), MEM_COMMIT,
                PAGE_READWRITE);
@@ -73,6 +90,9 @@ static void mpmc_init(MPMCQueue *q, size_t max_capacity) {
   atomic_init(&q->tail, 0);
 }
 
+/* ----------------------------------------------------------- */
+/* COMMIT MORE MEMORY */
+/* ----------------------------------------------------------- */
 static void mpmc_commit_more(MPMCQueue *q) {
 
   if (atomic_flag_test_and_set(&q->commit_lock))
@@ -111,6 +131,9 @@ static void mpmc_commit_more(MPMCQueue *q) {
   atomic_flag_clear(&q->commit_lock);
 }
 
+/* ----------------------------------------------------------- */
+/* PUSH */
+/* ----------------------------------------------------------- */
 static void mpmc_push(MPMCQueue *q, void *item) {
   MPMCSlot *slot;
   size_t pos;
@@ -123,7 +146,7 @@ static void mpmc_push(MPMCQueue *q, void *item) {
     size_t committed =
         atomic_load_explicit(&q->committed, memory_order_relaxed);
 
-    if (pos >= committed) {
+    if (unlikely(pos >= committed)) {
       mpmc_commit_more(q);
       continue;
     }
@@ -133,7 +156,7 @@ static void mpmc_push(MPMCQueue *q, void *item) {
     size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
     intptr_t diff = (intptr_t)seq - (intptr_t)pos;
 
-    if (diff == 0) {
+    if (likely(diff == 0)) {
       if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1,
                                                 memory_order_relaxed,
@@ -155,6 +178,9 @@ static void mpmc_push(MPMCQueue *q, void *item) {
   atomic_store_explicit(&slot->seq, pos + 1, memory_order_release);
 }
 
+/* ----------------------------------------------------------- */
+/* POP */
+/* ----------------------------------------------------------- */
 static void *mpmc_pop(MPMCQueue *q) {
   MPMCSlot *slot;
   size_t pos;
@@ -169,7 +195,7 @@ static void *mpmc_pop(MPMCQueue *q) {
     size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire);
     intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
 
-    if (diff == 0) {
+    if (likely(diff == 0)) {
       if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1,
                                                 memory_order_relaxed,
@@ -183,13 +209,10 @@ static void *mpmc_pop(MPMCQueue *q) {
     } else {
       // slot is still transitioning (written by another thread)
       if (++spins > 10) {
-        SwitchToThread(); // yield CPU
         spins = 0;
-      } else {
-
-        _mm_pause(); // busy waiting
+        cpu_pause();
       }
     }
   }
@@ -200,3 +223,37 @@ static void *mpmc_pop(MPMCQueue *q) {
 
   return data;
 }
+
+/* ----------------------------------------------------------- */
+/* PUSH POISON */
+/* ----------------------------------------------------------- */
+/*note:
+  After producers finish, push N poison pills where N = number of consumer
+threads, this is necessary to stop the consumers.
+*/
+
+static void mpmc_producers_finished(MPMCQueue *q, u8 consumer_count) {
+  for (u8 i = 0; i < consumer_count; i++) {
+    mpmc_push(q, NULL);
+  }
+}
+
+/* ----------------------------------------------------------- */
+/* MPMC Cleanup */
+/* ----------------------------------------------------------- */
+static void mpmc_finish(MPMCQueue *q) {
+  if (!q)
+    return;
+
+  if (q->slots) {
+    VirtualFree(q->slots, 0, MEM_RELEASE);
+    q->slots = NULL;
+  }
+
+  q->capacity = 0;
+  q->mask = 0;
+
+  atomic_store_explicit(&q->head, 0, memory_order_relaxed);
+  atomic_store_explicit(&q->tail, 0, memory_order_relaxed);
+  atomic_store_explicit(&q->committed, 0, memory_order_relaxed);
+}
diff --git a/platform_windows.c b/platform_windows.c
index 550f731..d55968a 100644
--- a/platform_windows.c
+++ b/platform_windows.c
@@ -1,4 +1,3 @@
-#include "arena.h"
 #include "platform.h"
 
 // ----------------------------- Globals ------------------------------------
@@ -475,7 +474,7 @@ int main(int argc, char **argv) {
   arena_free(&gp_arena, (u8 **)&buf, len);
 
   // Add some extra threads to overlap I/O more aggressively
-  size_t num_threads = hw_threads * 2;
+  u8 num_threads = hw_threads * 2;
   if (num_threads < 2)
     num_threads = 2;
 
@@ -527,9 +526,7 @@ int main(int argc, char **argv) {
 
   WaitForMultipleObjects((DWORD)scan_threads, scan_tids, TRUE, INFINITE);
 
-  for (size_t i = 0; i < num_threads; i++) {
-    mpmc_push(&g_file_queue, NULL);
-  }
+  mpmc_producers_finished(&g_file_queue, num_threads);
 
   atomic_store(&g_scan_done, 1);