diff --git a/binaries/changlog.txt b/binaries/changlog.txt index 07bcec1..3dd2d9b 100644 --- a/binaries/changlog.txt +++ b/binaries/changlog.txt @@ -13,3 +13,6 @@ v3.1: Lock free MPMC queue v3.2: Making the lock free MPMC queue growable Add padding to avoir false sharing Add sleep() and SwitchToThread() to limit spinning + +v3.3: Fix bug slots used before initialization,compare and swap is protecting updating committed, but it is not protecting the memory initialization. Adding atomic_flag commit_lock to protect against that +Fix bug multiple threads committing at the same time, fixed by using atomic_flag commit_lock and re-checking committed after acquiring the lock diff --git a/binaries/file_hasher_v3.3.exe b/binaries/file_hasher_v3.3.exe new file mode 100644 index 0000000..de83b98 Binary files /dev/null and b/binaries/file_hasher_v3.3.exe differ diff --git a/platform.h b/platform.h index e67fd7b..337ce54 100644 --- a/platform.h +++ b/platform.h @@ -117,6 +117,7 @@ typedef struct { atomic_size_t committed; size_t commit_step; + atomic_flag commit_lock; MPMCSlot *slots; } MPMCQueue; diff --git a/platform_windows.c b/platform_windows.c index b20876d..c3f2d20 100644 --- a/platform_windows.c +++ b/platform_windows.c @@ -96,6 +96,7 @@ void mpmc_init(MPMCQueue *q, size_t max_capacity) { } q->commit_step = (64ull * 1024 * 1024) / sizeof(MPMCSlot); + atomic_flag_clear(&q->commit_lock); q->committed = q->commit_step; @@ -112,15 +113,28 @@ void mpmc_init(MPMCQueue *q, size_t max_capacity) { } static void mpmc_commit_more(MPMCQueue *q) { - size_t start = q->committed; + + if (atomic_flag_test_and_set(&q->commit_lock)) + return; + + size_t start = atomic_load_explicit(&q->committed, memory_order_acquire); + size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed); + + // another thread already committed enough + if (tail < start) { + atomic_flag_clear(&q->commit_lock); + return; + } + + if (start >= q->capacity) { + atomic_flag_clear(&q->commit_lock); + return; + } size_t new_commit = start + q->commit_step; if (new_commit > q->capacity) new_commit = q->capacity; - if (!atomic_compare_exchange_strong(&q->committed, &start, new_commit)) - return; // another thread already committed - size_t count = new_commit - start; VirtualAlloc(&q->slots[start], count * sizeof(MPMCSlot), MEM_COMMIT, @@ -130,6 +144,10 @@ static void mpmc_commit_more(MPMCQueue *q) { atomic_init(&q->slots[i].seq, i); q->slots[i].data = NULL; } + + atomic_store_explicit(&q->committed, new_commit, memory_order_release); + + atomic_flag_clear(&q->commit_lock); } void mpmc_push(MPMCQueue *q, FileEntry *item) { @@ -139,6 +157,16 @@ void mpmc_push(MPMCQueue *q, FileEntry *item) { for (;;) { pos = atomic_load_explicit(&q->tail, memory_order_relaxed); + + // ensure the slot is committed BEFORE accessing it + size_t committed = + atomic_load_explicit(&q->committed, memory_order_relaxed); + + if (pos >= committed) { + mpmc_commit_more(q); + continue; + } + slot = &q->slots[pos & q->mask]; size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); @@ -153,19 +181,11 @@ void mpmc_push(MPMCQueue *q, FileEntry *item) { } else if (diff < 0) { - size_t committed = - atomic_load_explicit(&q->committed, memory_order_relaxed); - - if (pos >= committed) { - mpmc_commit_more(q); - continue; - } - - Sleep(1000); // queue actually full + Sleep(100); // queue actually full } else { - Sleep(1000); + Sleep(100); } } @@ -197,7 +217,7 @@ FileEntry *mpmc_pop(MPMCQueue *q) { } else if (diff < 0) { - Sleep(1000); + Sleep(500); } else { if (++spins > 10) {