diff --git a/base.h b/base.h index 4f459b1..10dd18a 100644 --- a/base.h +++ b/base.h @@ -137,6 +137,11 @@ static void plat_sem_wait(plat_sem *s) { WaitForSingleObject(s->handle, INFINITE); } +static b32 plat_sem_trywait(plat_sem *s) { /* non-blocking acquire; takes plat_sem* like plat_sem_wait/plat_sem_post */ + DWORD r = WaitForSingleObject(s->handle, 0); /* zero timeout: poll, never block */ + return r == WAIT_OBJECT_0; /* true only when the wait was satisfied */ +} + static void plat_sem_post(plat_sem *s, u32 count) { ReleaseSemaphore(s->handle, count, NULL); } @@ -203,6 +208,8 @@ static void plat_sem_wait(plat_sem *s) { } } +static b32 plat_sem_trywait(plat_sem *s) { return sem_trywait(&s->sem) == 0; } /* non-blocking acquire; false when sem_trywait fails */ + static void plat_sem_post(plat_sem *s, u32 count) { for (u32 i = 0; i < count; i++) { sem_post(&s->sem); diff --git a/binaries/changelog.txt b/binaries/changelog.txt index ac2b8b4..e6058eb 100644 --- a/binaries/changelog.txt +++ b/binaries/changelog.txt @@ -33,3 +33,5 @@ Making the LF MPMC queue generic and in a seperate header file Making the MPMC queue platform agnostic Align the MPMC queue to pagesize + +Getting file size from FindFirstFileA() instead of CreateFileA(), since we already call FindFirstFileA() and it returns the size there is no need to open/close every file to get its size diff --git a/lf_mpmc.h b/lf_mpmc.h index c302f30..9c51abd 100644 --- a/lf_mpmc.h +++ b/lf_mpmc.h @@ -185,7 +185,7 @@ static void mpmc_push(MPMCQueue *q, void *item) { } /* ----------------------------------------------------------- */ -/* POP */ +/* POP (blocking with semaphore) */ /* ----------------------------------------------------------- */ static void *mpmc_pop(MPMCQueue *q) { @@ -228,6 +228,52 @@ static void *mpmc_pop(MPMCQueue *q) { return data; } +/* ----------------------------------------------------------- */ +/* TRY POP (non blocking) */ +/* ----------------------------------------------------------- */ +static b32 mpmc_try_pop(MPMCQueue *q, void **out) { + + if (!plat_sem_trywait(&q->items_sem)) + return false; + + MPMCSlot *slot; + size_t pos; + + int spins = 0; + + for (;;) { + + pos = atomic_load_explicit(&q->head, 
memory_order_relaxed); + slot = &q->slots[pos & q->mask]; + + size_t seq = atomic_load_explicit(&slot->seq, memory_order_acquire); + intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1); + + if (likely(diff == 0)) { + + if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1, + memory_order_relaxed, + memory_order_relaxed)) + break; + + } else { + + if (++spins > 10) { + SwitchToThread(); /* NOTE(review): Win32-only yield; the changelog calls this queue platform agnostic -- confirm a POSIX path exists */ + spins = 0; + } else { + cpu_pause(); + } + } + } + + *out = slot->data; + + atomic_store_explicit(&slot->seq, pos + q->capacity, memory_order_release); + + return true; +} + /* ----------------------------------------------------------- */ /* PUSH POISON */ /* ----------------------------------------------------------- */ diff --git a/platform.h b/platform.h index 8e77590..572793e 100644 --- a/platform.h +++ b/platform.h @@ -46,24 +46,26 @@ static double timer_stop(HiResTimer *t) { } // MPMC Queue +static MPMCQueue g_dir_queue; static MPMCQueue g_file_queue; +typedef struct { + mem_arena *path_arena; + mem_arena *meta_arena; + + MPMCQueue *dir_queue; + MPMCQueue *file_queue; +} ScannerContext; + typedef struct { MPMCQueue *queue; mem_arena *arena; } WorkerContext; /* Scan folders */ + typedef struct DirQueue DirQueue; -void scan_folder_windows_parallel(const char *base, DirQueue *q); -void scan_folder_posix_parallel(const char *base, DirQueue *q); - -typedef struct DirJob { - char *path; - struct DirJob *next; -} DirJob; - typedef struct DirQueue { char **items; size_t count; @@ -80,3 +82,7 @@ typedef struct DirQueue { pthread_cond_t cond; #endif } DirQueue; + +// void scan_folder_windows_parallel(const char *base, ScannerContext *ctx); +// void scan_folder_posix_parallel(const char *base, ScannerContext *ctx); +void scan_folder_windows_parallel(const char *base, DirQueue *q); diff --git a/platform_windows.c b/platform_windows.c index a596297..2392593 100644 --- a/platform_windows.c +++ b/platform_windows.c @@ -1,3 +1,4 @@ +#include "arena.h" #include "platform.h" // 
----------------------------- Globals ------------------------------------ @@ -201,16 +202,7 @@ void scan_folder_windows_parallel(const char *base, DirQueue *q) { platform_get_file_owner(full, fe->owner, sizeof(fe->owner)); - LARGE_INTEGER size; - HANDLE hf = - CreateFileA(full, GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, - NULL, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL); - - if (hf != INVALID_HANDLE_VALUE) { - if (GetFileSizeEx(hf, &size)) - fe->size_bytes = (uint64_t)size.QuadPart; - CloseHandle(hf); - } + fe->size_bytes = ((uint64_t)fd.nFileSizeHigh << 32) | fd.nFileSizeLow; mpmc_push(&g_file_queue, fe); } @@ -474,7 +466,7 @@ int main(int argc, char **argv) { arena_free(&gp_arena, (u8 **)&buf, len); // Add some extra threads to overlap I/O more aggressively - u8 num_threads = hw_threads * 2; + size_t num_threads = hw_threads * 2; if (num_threads < 2) num_threads = 2; @@ -490,6 +482,10 @@ int main(int argc, char **argv) { InitializeConditionVariable(&q.cv); q.active = 0; + for (int i = 0; i < folder_count; ++i) { + dirqueue_push(&q, folders[i]); + } + // starting hash threads WorkerContext workers[num_threads]; @@ -508,10 +504,6 @@ int main(int argc, char **argv) { // starting scan threads HANDLE progress = CreateThread(NULL, 0, progress_thread, NULL, 0, NULL); - for (int i = 0; i < folder_count; ++i) { - dirqueue_push(&q, folders[i]); - } - size_t scan_threads = hw_threads; if (scan_threads < 2) scan_threads = 2; @@ -526,7 +518,9 @@ int main(int argc, char **argv) { WaitForMultipleObjects((DWORD)scan_threads, scan_tids, TRUE, INFINITE); - mpmc_producers_finished(&g_file_queue, num_threads); + for (size_t i = 0; i < num_threads; i++) { + mpmc_push(&g_file_queue, NULL); + } atomic_store(&g_scan_done, 1); @@ -597,7 +591,7 @@ int main(int argc, char **argv) { double total_mb = (double)total_bytes / (1024.0 * 1024.0); double avg_mbps = total_mb / total_seconds; printf("Total: %.2f MB, Average: %.2f MB/s\n", total_mb, avg_mbps); - printf(" Total time : %.2f 
seconds\n", total_seconds); + printf(" Total time : %.2f seconds\n\n", total_seconds); return 0; }