Updating the LF MPMC queue and replacing DirQueue with it
Make the MPMC queue support producers that are also consumers at the same time by adding a variable `work`: mpmc_push_work() increments `work`, and mpmc_task_done() decrements `work` and, if `work` reaches 0, calls mpmc_producers_finished(), which pushes poison values to wake up sleeping threads and make them return NULL. Replace DirQueue, a queue that grows with realloc, with the MPMC queue.
This commit is contained in:
@@ -1,4 +1,3 @@
|
||||
#include "arena.h"
|
||||
#include "platform.h"
|
||||
|
||||
// ----------------------------- Globals ------------------------------------
|
||||
@@ -105,47 +104,6 @@ void platform_get_file_owner(const char *path, char *out_owner,
|
||||
}
|
||||
|
||||
// --------------- parallel directory scanning ----------------
|
||||
// Add queue helper functions
|
||||
static void dirqueue_push(DirQueue *q, const char *path) {
|
||||
EnterCriticalSection(&q->cs);
|
||||
|
||||
if (q->count + 1 > q->cap) {
|
||||
q->cap = q->cap ? q->cap * 2 : 1024;
|
||||
q->items = realloc(q->items, q->cap * sizeof(char *));
|
||||
}
|
||||
|
||||
q->items[q->count++] = _strdup(path);
|
||||
|
||||
WakeConditionVariable(&q->cv);
|
||||
LeaveCriticalSection(&q->cs);
|
||||
}
|
||||
|
||||
static char *dirqueue_pop(DirQueue *q) {
|
||||
EnterCriticalSection(&q->cs);
|
||||
|
||||
while (q->count == 0 && q->active > 0) {
|
||||
SleepConditionVariableCS(&q->cv, &q->cs, INFINITE);
|
||||
}
|
||||
|
||||
if (q->count == 0 && q->active == 0) {
|
||||
LeaveCriticalSection(&q->cs);
|
||||
return NULL; // truly done
|
||||
}
|
||||
|
||||
char *dir = q->items[--q->count];
|
||||
q->active++;
|
||||
|
||||
LeaveCriticalSection(&q->cs);
|
||||
return dir;
|
||||
}
|
||||
|
||||
static void dirqueue_done(DirQueue *q) {
|
||||
EnterCriticalSection(&q->cs);
|
||||
q->active--;
|
||||
WakeAllConditionVariable(&q->cv);
|
||||
LeaveCriticalSection(&q->cs);
|
||||
}
|
||||
|
||||
// ----------------------------- Scan helpers -----------------------------
|
||||
void scan_folder_windows_parallel(const char *base, ScannerContext *ctx) {
|
||||
char search[MAX_PATHLEN];
|
||||
@@ -167,7 +125,12 @@ void scan_folder_windows_parallel(const char *base, ScannerContext *ctx) {
|
||||
continue;
|
||||
|
||||
if (fd.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) {
|
||||
dirqueue_push(ctx->dir_queue, full);
|
||||
size_t len = strlen(full) + 1;
|
||||
|
||||
char *dir = arena_push(&ctx->path_arena, len, false);
|
||||
memcpy(dir, full, len);
|
||||
|
||||
mpmc_push_work(&g_dir_queue, dir);
|
||||
} else {
|
||||
|
||||
atomic_fetch_add(&g_files_found, 1);
|
||||
@@ -203,19 +166,16 @@ void scan_folder_windows_parallel(const char *base, ScannerContext *ctx) {
|
||||
// ------------------------- Scan worker --------------------------------
|
||||
static DWORD WINAPI scan_worker(LPVOID arg) {
|
||||
ScannerContext *ctx = arg;
|
||||
DirQueue *q = ctx->dir_queue;
|
||||
|
||||
for (;;) {
|
||||
char *dir = dirqueue_pop(q);
|
||||
char *dir = mpmc_pop(&g_dir_queue);
|
||||
if (!dir)
|
||||
break;
|
||||
|
||||
scan_folder_windows_parallel(dir, ctx);
|
||||
|
||||
free(dir);
|
||||
dirqueue_done(q);
|
||||
mpmc_task_done(&g_dir_queue, 12);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -420,9 +380,7 @@ int main(int argc, char **argv) {
|
||||
folder_count = 1;
|
||||
}
|
||||
|
||||
// -------------------------------
|
||||
// Display selected folders
|
||||
// -------------------------------
|
||||
printf("Processing %d folder(s):\n", folder_count);
|
||||
for (int i = 0; i < folder_count; ++i) {
|
||||
printf(" - %s\n", folders[i]);
|
||||
@@ -472,21 +430,12 @@ int main(int argc, char **argv) {
|
||||
size_t num_threads = hw_threads * 2;
|
||||
|
||||
// -------------------------------
|
||||
// Step 1: Scan all folders
|
||||
// Scanning and hashing
|
||||
// -------------------------------
|
||||
|
||||
mpmc_init(&g_dir_queue, MiB(1));
|
||||
mpmc_init(&g_file_queue, MiB(1));
|
||||
|
||||
DirQueue q;
|
||||
memset(&q, 0, sizeof(q));
|
||||
InitializeCriticalSection(&q.cs);
|
||||
InitializeConditionVariable(&q.cv);
|
||||
q.active = 0;
|
||||
|
||||
for (int i = 0; i < folder_count; ++i) {
|
||||
dirqueue_push(&q, folders[i]);
|
||||
}
|
||||
|
||||
// starting hash threads
|
||||
size_t num_hash_threads = num_threads;
|
||||
|
||||
@@ -514,26 +463,35 @@ int main(int argc, char **argv) {
|
||||
arena_push(&gp_arena, sizeof(HANDLE) * num_scan_threads, true);
|
||||
|
||||
for (size_t i = 0; i < num_scan_threads; i++) {
|
||||
|
||||
scanners[i].dir_queue = &q;
|
||||
|
||||
scanners[i].path_arena = arena_create(¶ms);
|
||||
scanners[i].meta_arena = arena_create(¶ms);
|
||||
|
||||
scan_threads[i] = CreateThread(NULL, 0, scan_worker, &scanners[i], 0, NULL);
|
||||
}
|
||||
|
||||
// Initial folder push
|
||||
for (int i = 0; i < folder_count; i++) {
|
||||
|
||||
size_t len = strlen(folders[i]) + 1;
|
||||
|
||||
char *path = arena_push(&scanners[0].path_arena, len, false);
|
||||
memcpy(path, folders[i], len);
|
||||
|
||||
mpmc_push_work(&g_dir_queue, path);
|
||||
}
|
||||
|
||||
// Stop scan threads
|
||||
WaitForMultipleObjects((DWORD)num_scan_threads, scan_threads, TRUE, INFINITE);
|
||||
|
||||
for (size_t i = 0; i < num_scan_threads; ++i)
|
||||
CloseHandle(scan_threads[i]);
|
||||
|
||||
for (size_t i = 0; i < num_hash_threads; i++) {
|
||||
mpmc_push(&g_file_queue, NULL);
|
||||
}
|
||||
|
||||
atomic_store(&g_scan_done, 1);
|
||||
|
||||
for (size_t i = 0; i < num_scan_threads; ++i)
|
||||
CloseHandle(scan_threads[i]);
|
||||
|
||||
arena_free(&gp_arena, (u8 **)&scan_threads,
|
||||
sizeof(HANDLE) * num_scan_threads);
|
||||
|
||||
@@ -544,13 +502,13 @@ int main(int argc, char **argv) {
|
||||
printf("Completed scanning in %.2f seconds, found %zu files\n\n",
|
||||
scan_seconds, total_found);
|
||||
|
||||
// if no files found
|
||||
// If no files found
|
||||
if (total_found == 0) {
|
||||
printf("No files found.\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
// stop hashing threads
|
||||
// Stop hashing threads
|
||||
WaitForMultipleObjects((DWORD)num_hash_threads, hash_threads, TRUE, INFINITE);
|
||||
|
||||
for (size_t i = 0; i < num_hash_threads; ++i)
|
||||
@@ -559,10 +517,13 @@ int main(int argc, char **argv) {
|
||||
arena_free(&gp_arena, (u8 **)&hash_threads,
|
||||
sizeof(HANDLE) * num_hash_threads);
|
||||
|
||||
// Stop progress printing thread
|
||||
WaitForSingleObject(progress, INFINITE);
|
||||
CloseHandle(progress);
|
||||
|
||||
// write file_hashes.txt
|
||||
// -------------------------------
|
||||
// Export file_hashes.txt
|
||||
// -------------------------------
|
||||
|
||||
// FILE *f = fopen(FILE_HASHES_TXT, "wb");
|
||||
//
|
||||
@@ -591,7 +552,9 @@ int main(int argc, char **argv) {
|
||||
WriteFile(h, arena_base, (DWORD)local_hash_arena->pos, &written, NULL);
|
||||
}
|
||||
|
||||
// done time
|
||||
// -------------------------------
|
||||
// Print summary
|
||||
// -------------------------------
|
||||
double total_seconds = timer_stop(&total_timer);
|
||||
|
||||
printf("Completed hashing %zu files\n", total_found);
|
||||
|
||||
Reference in New Issue
Block a user