diff --git a/binaries/changlog.txt b/binaries/changlog.txt index 86dc9f8..07bcec1 100644 --- a/binaries/changlog.txt +++ b/binaries/changlog.txt @@ -9,3 +9,7 @@ v3.0: Simple mutex/critical section based MPMC queue reusable hashing buffer v3.1: Lock free MPMC queue + +v3.2: Making the lock free MPMC queue growable +Add padding to avoid false sharing +Add sleep() and SwitchToThread() to limit spinning diff --git a/binaries/file_hasher_v3.2.exe b/binaries/file_hasher_v3.2.exe new file mode 100644 index 0000000..8364396 Binary files /dev/null and b/binaries/file_hasher_v3.2.exe differ diff --git a/platform.h b/platform.h index 30ce896..e67fd7b 100644 --- a/platform.h +++ b/platform.h @@ -103,16 +103,22 @@ static double timer_stop(HiResTimer *t) { typedef struct { atomic_size_t seq; FileEntry *data; + char pad[64 - sizeof(atomic_size_t) - sizeof(FileEntry *)]; } MPMCSlot; typedef struct { + atomic_size_t head; + char pad1[64]; + atomic_size_t tail; + char pad2[64]; + size_t capacity; size_t mask; - MPMCSlot *slots; + atomic_size_t committed; + size_t commit_step; - atomic_size_t head; - atomic_size_t tail; + MPMCSlot *slots; } MPMCQueue; static MPMCQueue g_file_queue; diff --git a/platform_windows.c b/platform_windows.c index e552849..b20876d 100644 --- a/platform_windows.c +++ b/platform_windows.c @@ -77,18 +77,32 @@ static void format_time(uint64_t t, char *out, size_t out_sz) { } // --------------- parallel directory scanning ---------------- -void mpmc_init(MPMCQueue *q, size_t capacity) { - if ((capacity & (capacity - 1)) != 0) { +void mpmc_init(MPMCQueue *q, size_t max_capacity) { + if ((max_capacity & (max_capacity - 1)) != 0) { fprintf(stderr, "capacity must be power of two\n"); exit(1); } - q->capacity = capacity; - q->mask = capacity - 1; + q->capacity = max_capacity; + q->mask = max_capacity - 1; - q->slots = malloc(sizeof(MPMCSlot) * capacity); + size_t bytes = sizeof(MPMCSlot) * max_capacity; - for (size_t i = 0; i < capacity; i++) { + q->slots = 
VirtualAlloc(NULL, bytes, MEM_RESERVE, PAGE_READWRITE); + + if (!q->slots) { + fprintf(stderr, "VirtualAlloc reserve failed\n"); + exit(1); + } + + q->commit_step = (64ull * 1024 * 1024) / sizeof(MPMCSlot); + + q->committed = q->commit_step; + + VirtualAlloc(q->slots, q->commit_step * sizeof(MPMCSlot), MEM_COMMIT, + PAGE_READWRITE); + + for (size_t i = 0; i < q->committed; i++) { atomic_init(&q->slots[i].seq, i); q->slots[i].data = NULL; } @@ -97,6 +111,27 @@ void mpmc_init(MPMCQueue *q, size_t capacity) { atomic_init(&q->tail, 0); } +static void mpmc_commit_more(MPMCQueue *q) { + size_t start = q->committed; + + size_t new_commit = start + q->commit_step; + if (new_commit > q->capacity) + new_commit = q->capacity; + + if (!atomic_compare_exchange_strong(&q->committed, &start, new_commit)) + return; // another thread already committed + + size_t count = new_commit - start; + + VirtualAlloc(&q->slots[start], count * sizeof(MPMCSlot), MEM_COMMIT, + PAGE_READWRITE); + + for (size_t i = start; i < new_commit; i++) { + atomic_init(&q->slots[i].seq, i); + q->slots[i].data = NULL; + } +} + void mpmc_push(MPMCQueue *q, FileEntry *item) { MPMCSlot *slot; size_t pos; @@ -110,15 +145,27 @@ void mpmc_push(MPMCQueue *q, FileEntry *item) { intptr_t diff = (intptr_t)seq - (intptr_t)pos; if (diff == 0) { + if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1, memory_order_relaxed, memory_order_relaxed)) break; + } else if (diff < 0) { - // queue full - _mm_pause(); + + size_t committed = + atomic_load_explicit(&q->committed, memory_order_relaxed); + + if (pos >= committed) { + mpmc_commit_more(q); + continue; + } + + Sleep(1000); // queue actually full + } else { - _mm_pause(); + + Sleep(1000); } } @@ -131,6 +178,8 @@ FileEntry *mpmc_pop(MPMCQueue *q) { MPMCSlot *slot; size_t pos; + int spins = 0; + for (;;) { pos = atomic_load_explicit(&q->head, memory_order_relaxed); @@ -146,13 +195,20 @@ FileEntry *mpmc_pop(MPMCQueue *q) { memory_order_relaxed)) break; - // } else 
if (diff < 0) { - // - // _mm_pause(); // wait for producers + } else if (diff < 0) { + + Sleep(1000); } else { + if (++spins > 10) { - _mm_pause(); + SwitchToThread(); // yield CPU + spins = 0; + + } else { + + _mm_pause(); + } } } @@ -527,7 +583,7 @@ int main(int argc, char **argv) { // Step 1: Scan all folders // ------------------------------- - mpmc_init(&g_file_queue, 65536); + mpmc_init(&g_file_queue, 1024 * 1024 * 1024); DirQueue q; memset(&q, 0, sizeof(q));