diff --git a/binaries/changlog.txt b/binaries/changlog.txt index 07bcec1..587df6a 100644 --- a/binaries/changlog.txt +++ b/binaries/changlog.txt @@ -8,8 +8,12 @@ v2.1: Uses AVX2 instead of SSE2 v3.0: Simple mutex/critical section based MPMC queue reusable hashing buffer -v3.1: Lock free MPMC queue +v3.1: Lock free MPMC queue Vyukov-style v3.2: Making the lock free MPMC queue growable Add padding to avoir false sharing Add sleep() and SwitchToThread() to limit spinning + +v3.3: Fix bug slots used before initialization,compare and swap is protecting updating committed, but it is not protecting the memory initialization. Adding atomic_flag commit_lock to protect against that +Fix bug multiple threads committing at the same time, fixed by using atomic_flag commit_lock and re-checking committed after acquiring the lock +Reorder helper functions diff --git a/binaries/file_hasher_v3.3.exe b/binaries/file_hasher_v3.3.exe new file mode 100644 index 0000000..1712169 Binary files /dev/null and b/binaries/file_hasher_v3.3.exe differ diff --git a/platform.h b/platform.h index e67fd7b..337ce54 100644 --- a/platform.h +++ b/platform.h @@ -117,6 +117,7 @@ typedef struct { atomic_size_t committed; size_t commit_step; + atomic_flag commit_lock; MPMCSlot *slots; } MPMCQueue; diff --git a/platform_windows.c b/platform_windows.c index b20876d..7107f7c 100644 --- a/platform_windows.c +++ b/platform_windows.c @@ -1,4 +1,5 @@ #include "platform.h" +#include // ----------------------------- Globals ------------------------------------ static atomic_uint_fast64_t g_files_found = 0; @@ -30,6 +31,25 @@ static uint64_t filetime_to_epoch(const FILETIME *ft) { return (ull.QuadPart - 116444736000000000ULL) / 10000000ULL; } +// ----------------------------- Format time helper ------------------------- +static void format_time(uint64_t t, char *out, size_t out_sz) { + if (t == 0) { + snprintf(out, out_sz, "N/A"); + return; + } + + time_t tt = (time_t)t; + struct tm tm; + +#if PLATFORM_WINDOWS + localtime_s(&tm, &tt); +#else + localtime_r(&tt, &tm); +#endif + + strftime(out, out_sz, "%Y-%m-%d %H:%M:%S", &tm); +} + // ----------------------------- Resolve file owner --------------------- static void get_file_owner(const char *path, char *out, size_t out_sz) { PSID sid = NULL; @@ -57,23 +77,22 @@ static void get_file_owner(const char *path, char *out, size_t out_sz) { LocalFree(sd); } -// ----------------------------- Format time helper ------------------------- -static void format_time(uint64_t t, char *out, size_t out_sz) { - if (t == 0) { - snprintf(out, out_sz, "N/A"); - return; +// ----------------------------- Get file metadata ------------------------- +void platform_get_file_times(const char *path, uint64_t *out_created, + uint64_t *out_modified) { + WIN32_FILE_ATTRIBUTE_DATA fad; + if (GetFileAttributesExA(path, GetFileExInfoStandard, &fad)) { + *out_created = filetime_to_epoch(&fad.ftCreationTime); + *out_modified = filetime_to_epoch(&fad.ftLastWriteTime); + } else { + *out_created = 0; + *out_modified = 0; } +} - time_t tt = (time_t)t; - struct tm tm; - -#if PLATFORM_WINDOWS - localtime_s(&tm, &tt); -#else - localtime_r(&tt, &tm); -#endif - - strftime(out, out_sz, "%Y-%m-%d %H:%M:%S", &tm); +void platform_get_file_owner(const char *path, char *out_owner, + size_t out_owner_size) { + get_file_owner(path, out_owner, out_owner_size); } // --------------- parallel directory scanning ---------------- @@ -96,6 +115,7 @@ void mpmc_init(MPMCQueue *q, size_t max_capacity) { } q->commit_step = (64ull * 1024 * 1024) / sizeof(MPMCSlot); + atomic_flag_clear(&q->commit_lock); q->committed = q->commit_step; @@ -112,15 +132,28 @@ void mpmc_init(MPMCQueue *q, size_t max_capacity) { } static void mpmc_commit_more(MPMCQueue *q) { - size_t start = q->committed; + + if (atomic_flag_test_and_set(&q->commit_lock)) + return; + + size_t start = atomic_load_explicit(&q->committed, memory_order_acquire); + size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed); + + // another thread already committed enough + if (tail < start) { + atomic_flag_clear(&q->commit_lock); + return; + } + + if (start >= q->capacity) { + atomic_flag_clear(&q->commit_lock); + return; + } size_t new_commit = start + q->commit_step; if (new_commit > q->capacity) new_commit = q->capacity; - if (!atomic_compare_exchange_strong(&q->committed, &start, new_commit)) - return; // another thread already committed - size_t count = new_commit - start; VirtualAlloc(&q->slots[start], count * sizeof(MPMCSlot), MEM_COMMIT, @@ -130,6 +163,10 @@ static void mpmc_commit_more(MPMCQueue *q) { atomic_init(&q->slots[i].seq, i); q->slots[i].data = NULL; } + + atomic_store_explicit(&q->committed, new_commit, memory_order_release); + + atomic_flag_clear(&q->commit_lock); } void mpmc_push(MPMCQueue *q, FileEntry *item) { @@ -139,6 +176,16 @@ void mpmc_push(MPMCQueue *q, FileEntry *item) { for (;;) { pos = atomic_load_explicit(&q->tail, memory_order_relaxed); + + // ensure the slot is committed BEFORE accessing it + size_t committed = + atomic_load_explicit(&q->committed, memory_order_relaxed); + + if (pos >= committed) { + mpmc_commit_more(q); + continue; + } + slot = &q->slots[pos & q->mask]; size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); @@ -151,21 +198,13 @@ void mpmc_push(MPMCQueue *q, FileEntry *item) { memory_order_relaxed)) break; - } else if (diff < 0) { - - size_t committed = - atomic_load_explicit(&q->committed, memory_order_relaxed); - - if (pos >= committed) { - mpmc_commit_more(q); - continue; - } - - Sleep(1000); // queue actually full - - } else { + } else if (diff < 0) { // queue actually full Sleep(1000); + + } else { // waiting to grow + + Sleep(0); } } @@ -195,11 +234,12 @@ FileEntry *mpmc_pop(MPMCQueue *q) { memory_order_relaxed)) break; - } else if (diff < 0) { + } else if (diff < 0) { // queue is empty - Sleep(1000); + Sleep(500); + + } else { // slot is still transitioning (written by another thread) - } else { if (++spins > 10) { SwitchToThread(); // yield CPU @@ -207,7 +247,7 @@ FileEntry *mpmc_pop(MPMCQueue *q) { } else { - _mm_pause(); + _mm_pause(); // busy waiting } } } @@ -478,24 +518,6 @@ DWORD WINAPI progress_thread(void *arg) { return 0; } -// ----------------------------- Get file metadata ------------------------- -void platform_get_file_times(const char *path, uint64_t *out_created, - uint64_t *out_modified) { - WIN32_FILE_ATTRIBUTE_DATA fad; - if (GetFileAttributesExA(path, GetFileExInfoStandard, &fad)) { - *out_created = filetime_to_epoch(&fad.ftCreationTime); - *out_modified = filetime_to_epoch(&fad.ftLastWriteTime); - } else { - *out_created = 0; - *out_modified = 0; - } -} - -void platform_get_file_owner(const char *path, char *out_owner, - size_t out_owner_size) { - get_file_owner(path, out_owner, out_owner_size); -} - // ----------------------------- Main --------------------------------------- int main(int argc, char **argv) { char folders[64][MAX_PATHLEN]; // up to 64 input folders