2100 lines
55 KiB
C
2100 lines
55 KiB
C
#pragma once
|
|
|
|
#include "arena.h"
|
|
#include "base.h"
|
|
#include "sm_mpmc.h"
|
|
|
|
#include "arena.c"
|
|
#include <stdint.h>
|
|
#include <stdio.h>
|
|
|
|
// xxhash include
|
|
#define XXH_STATIC_LINKING_ONLY
|
|
#include "xxh_x86dispatch.h"
|
|
|
|
#include "config.h"
|
|
// ----------------------------- Globals ------------------------------------
|
|
// Progress counters shared between the scanner, hasher and progress threads.
static atomic_uint_fast64_t g_files_found = 0;     // regular files discovered by scan_folder()
static atomic_uint_fast64_t g_files_hashed = 0;    // files finished by hash_worker()
static atomic_uint_fast64_t g_bytes_processed = 0; // bytes fed to the hasher so far
static atomic_int g_scan_done = 0; // non-zero once scanning is over (read by progress_thread)
|
|
|
|
#define HASH_STRLEN 33 // 128-bit hex (32 chars) + null terminator
|
|
#define MAX_PATHLEN KiB(4)
|
|
// ================== OS-agnostic functions abstraction =====================
|
|
// --------------------- Timer functions ---------------------
|
|
// Pair of timestamps for a high-resolution interval timer. Units are
// platform specific: performance-counter ticks on Windows, nanoseconds on
// Linux.
typedef struct {
  u64 start; // written by timer_start()
  u64 now;   // latest sample; written by the Windows timer_elapsed()
} HiResTimer;
|
|
|
|
#if defined(_WIN32) || defined(_WIN64)
|
|
|
|
// Performance-counter frequency (ticks per second), filled in by timer_init().
static LARGE_INTEGER g_freq;

// Query the performance-counter frequency once; must run before
// timer_elapsed() so the tick count can be converted to seconds.
static void timer_init(void) {
  QueryPerformanceFrequency(&g_freq);
}
|
|
|
|
// Record the current performance-counter value as the timer's start point.
static void timer_start(HiResTimer *t) {
  LARGE_INTEGER counter;

  QueryPerformanceCounter(&counter);
  t->start = counter.QuadPart;
}
|
|
|
|
// Sample the performance counter into t->now and return the seconds elapsed
// since timer_start(), using the frequency captured by timer_init().
static double timer_elapsed(HiResTimer *t) {
  LARGE_INTEGER counter;

  QueryPerformanceCounter(&counter);
  t->now = counter.QuadPart;

  const u64 ticks = t->now - t->start;
  return (double)ticks / (double)g_freq.QuadPart;
}
|
|
|
|
#elif defined(__linux__)
|
|
|
|
// Nothing to set up on Linux: clock_gettime() already reports nanoseconds.
void timer_init(void) {
  // intentionally empty
}
|
|
|
|
// Record the current CLOCK_MONOTONIC time, in nanoseconds, as the start point.
void timer_start(HiResTimer *t) {
  struct timespec sample;

  clock_gettime(CLOCK_MONOTONIC, &sample);
  t->start = sample.tv_sec * 1000000000ULL + sample.tv_nsec;
}
|
|
|
|
// Return the seconds elapsed since timer_start() on the monotonic clock.
// The sample is also stored in t->now, as the Windows implementation does,
// so code reading that field sees a defined value on both platforms.
double timer_elapsed(HiResTimer *t) {
  struct timespec ts;
  clock_gettime(CLOCK_MONOTONIC, &ts);

  uint64_t now = ts.tv_sec * 1000000000ULL + ts.tv_nsec;
  t->now = now;

  return (double)(now - t->start) / 1e9;
}
|
|
|
|
#endif
|
|
|
|
// ------------------- Get HW info --------------------
|
|
#if defined(_WIN32) || defined(_WIN64)
|
|
size_t platform_physical_cores(void) {
|
|
DWORD len = 0;
|
|
GetLogicalProcessorInformation(NULL, &len);
|
|
|
|
SYSTEM_LOGICAL_PROCESSOR_INFORMATION buf[len];
|
|
|
|
GetLogicalProcessorInformation(buf, &len);
|
|
DWORD count = 0;
|
|
DWORD n = len / sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION);
|
|
for (DWORD i = 0; i < n; i++) {
|
|
if (buf[i].Relationship == RelationProcessorCore)
|
|
count++;
|
|
}
|
|
return count ? count : 1;
|
|
}
|
|
|
|
#elif defined(__linux__)
|
|
// Return the number of processors currently online, never less than 1.
// NOTE(review): _SC_NPROCESSORS_ONLN counts logical CPUs, so with SMT this
// can exceed the physical core count the name suggests.
size_t platform_physical_cores(void) {
  const long online = sysconf(_SC_NPROCESSORS_ONLN);

  if (online <= 0)
    return 1;
  return (size_t)online;
}
|
|
#endif
|
|
|
|
const char *get_xxhash_instruction_set(void) {
|
|
int vecID = XXH_featureTest();
|
|
|
|
switch (vecID) {
|
|
case XXH_SCALAR:
|
|
return "Scalar (portable C)";
|
|
case XXH_SSE2:
|
|
return "SSE2";
|
|
case XXH_AVX2:
|
|
return "AVX2";
|
|
case XXH_AVX512:
|
|
return "AVX-512";
|
|
default:
|
|
return "Unknown";
|
|
}
|
|
}
|
|
|
|
// -------------------- File IO -------------------
|
|
|
|
#if defined(_WIN32) || defined(_WIN64)
|
|
typedef HANDLE FileHandle;
|
|
#define FLAG_SEQUENTIAL_READ FILE_FLAG_SEQUENTIAL_SCAN
|
|
#define FLAG_ASYNC_DIRECT_READ (FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING)
|
|
#define INVALID_FILE_HANDLE INVALID_HANDLE_VALUE
|
|
|
|
// File open function
|
|
static FileHandle os_file_open(const char *path, DWORD flags) {
|
|
return CreateFileA(path, GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE,
|
|
NULL, OPEN_EXISTING, flags, NULL);
|
|
}
|
|
|
|
// File read function
|
|
static int os_file_read(FileHandle handle, void *buf, size_t count,
|
|
uint64_t *bytes_read) {
|
|
DWORD read = 0;
|
|
BOOL result = ReadFile(handle, buf, (DWORD)count, &read, NULL);
|
|
*bytes_read = read;
|
|
return (result && read > 0) ? 0 : -1;
|
|
}
|
|
|
|
// File close function
|
|
static void os_file_close(FileHandle handle) { CloseHandle(handle); }
|
|
|
|
#elif defined(__linux__)
|
|
typedef int FileHandle;
|
|
#define FLAG_SEQUENTIAL_READ (0)
|
|
#define FLAG_ASYNC_DIRECT_READ (O_DIRECT)
|
|
#define INVALID_FILE_HANDLE (-1)
|
|
|
|
// File open function
|
|
static FileHandle os_file_open(const char *path, int flags) {
|
|
// Combine your mandatory flags with the user-provided flag
|
|
int fd = open(path, O_RDONLY | O_NOFOLLOW | flags);
|
|
|
|
// If sequential was requested, advise the kernel
|
|
if (fd != -1 && (flags == FLAG_SEQUENTIAL_READ)) {
|
|
posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
|
|
}
|
|
|
|
return fd;
|
|
}
|
|
|
|
// File read function
|
|
static int os_file_read(FileHandle handle, void *buf, size_t count,
|
|
uint64_t *bytes_read) {
|
|
ssize_t result = read(handle, buf, count);
|
|
if (result >= 0) {
|
|
*bytes_read = (uint64_t)result;
|
|
return 0;
|
|
}
|
|
*bytes_read = 0;
|
|
return -1;
|
|
}
|
|
|
|
// File close function
|
|
static void os_file_close(FileHandle handle) { close(handle); }
|
|
#endif
|
|
|
|
// -------------------- Thread abstraction -------------------
|
|
// Threads context
|
|
typedef struct {
|
|
mem_arena *path_arena;
|
|
mem_arena *meta_arena;
|
|
|
|
MPMCQueue *dir_queue;
|
|
MPMCQueue *file_queue;
|
|
|
|
u8 num_threads;
|
|
} ScannerContext;
|
|
|
|
typedef struct {
|
|
mem_arena *arena;
|
|
MPMCQueue *file_queue;
|
|
} WorkerContext;
|
|
|
|
#if defined(_WIN32) || defined(_WIN64)
|
|
typedef HANDLE ThreadHandle;
|
|
typedef DWORD(WINAPI *ThreadFunc)(void *);
|
|
#define THREAD_RETURN DWORD WINAPI
|
|
#define THREAD_RETURN_VALUE 0;
|
|
|
|
typedef struct {
|
|
ThreadHandle handle;
|
|
int valid; // Track if thread was successfully created
|
|
} Thread;
|
|
|
|
// Thread function wrapper to handle different return types
|
|
#define THREAD_FUNCTION(name) DWORD WINAPI name(LPVOID arg)
|
|
|
|
// Thread creation function
|
|
static int thread_create(Thread *thread, ThreadFunc func, void *arg) {
|
|
thread->handle =
|
|
CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)func, arg, 0, NULL);
|
|
return (thread->handle != NULL) ? 0 : -1;
|
|
}
|
|
|
|
// Thread join function
|
|
static int thread_join(Thread *thread) {
|
|
return (WaitForSingleObject(thread->handle, INFINITE) == WAIT_OBJECT_0) ? 0
|
|
: -1;
|
|
}
|
|
|
|
// Thread close/detach function
|
|
static void thread_close(Thread *thread) { CloseHandle(thread->handle); }
|
|
|
|
// Wait for multiple threads
|
|
static int thread_wait_multiple(Thread *threads, size_t count) {
|
|
HANDLE handles[64]; // Max 64 threads for Windows
|
|
for (size_t i = 0; i < count; i++) {
|
|
handles[i] = threads[i].handle;
|
|
}
|
|
return (WaitForMultipleObjects((DWORD)count, handles, TRUE, INFINITE) ==
|
|
WAIT_OBJECT_0)
|
|
? 0
|
|
: -1;
|
|
}
|
|
|
|
#elif defined(__linux__)
|
|
typedef pthread_t ThreadHandle;
|
|
typedef void *(*ThreadFunc)(void *);
|
|
#define THREAD_RETURN void *
|
|
#define THREAD_RETURN_VALUE NULL;
|
|
|
|
typedef struct {
|
|
ThreadHandle handle;
|
|
int valid; // Track if thread was successfully created
|
|
} Thread;
|
|
|
|
// Entry point plus its argument, bundled for a thread trampoline.
// NOTE(review): nothing in the visible part of this file uses this type.
typedef struct {
  void *(*func)(void *); // thread entry point
  void *arg;             // argument forwarded to func
} ThreadWrapper;
|
|
|
|
// Thread creation function
|
|
static int thread_create(Thread *thread, ThreadFunc func, void *arg) {
|
|
int ret = pthread_create(&thread->handle, NULL, func, arg);
|
|
if (ret == 0) {
|
|
thread->valid = 1;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
// Thread join function
|
|
static int thread_join(Thread *thread) {
|
|
int ret = pthread_join(thread->handle, NULL);
|
|
thread->valid = 0;
|
|
return ret;
|
|
}
|
|
|
|
// Thread close/detach function
|
|
static void thread_close(Thread *thread) {
|
|
if (thread->valid) {
|
|
pthread_detach(thread->handle);
|
|
thread->valid = 0;
|
|
}
|
|
}
|
|
|
|
// Wait for multiple threads
|
|
static int thread_wait_multiple(Thread *threads, size_t count) {
|
|
for (size_t i = 0; i < count; i++) {
|
|
if (thread_join(&threads[i]) != 0) {
|
|
return -1;
|
|
}
|
|
}
|
|
return 0;
|
|
}
|
|
#endif
|
|
|
|
// ======================== Get file metadata ========================
|
|
// -------------------- Path parsing -------------------
|
|
// Normalize a path in place: every '\\' becomes '/', and each run of
// consecutive separators collapses into a single '/'.
static void normalize_path(char *p) {
  char *out = p;
  int last_was_sep = 0;

  // `out` never runs ahead of `in`, so rewriting in place is safe.
  for (const char *in = p; *in != '\0'; in++) {
    const int is_sep = (*in == '\\' || *in == '/');

    if (is_sep && last_was_sep)
      continue; // drop the repeated separator

    *out++ = is_sep ? '/' : *in;
    last_was_sep = is_sep;
  }

  *out = '\0';
}
|
|
|
|
static int parse_paths(char *line, char folders[][MAX_PATHLEN],
|
|
int max_folders) {
|
|
int count = 0;
|
|
char *p = line;
|
|
|
|
while (*p && count < max_folders) {
|
|
|
|
while (*p && isspace((unsigned char)*p))
|
|
p++;
|
|
|
|
if (!*p)
|
|
break;
|
|
|
|
char *start;
|
|
char quote = 0;
|
|
|
|
if (*p == '"' || *p == '\'') {
|
|
quote = *p++;
|
|
start = p;
|
|
|
|
while (*p && *p != quote)
|
|
p++;
|
|
} else {
|
|
start = p;
|
|
|
|
while (*p && !isspace((unsigned char)*p))
|
|
p++;
|
|
}
|
|
|
|
size_t len = p - start;
|
|
if (len >= MAX_PATHLEN)
|
|
len = MAX_PATHLEN - 1;
|
|
|
|
memcpy(folders[count], start, len);
|
|
folders[count][len] = 0;
|
|
|
|
normalize_path(folders[count]);
|
|
|
|
count++;
|
|
|
|
if (quote && *p == quote)
|
|
p++;
|
|
}
|
|
return count;
|
|
}
|
|
|
|
// ------------------------- File time -------------------------
|
|
#if FILE_TIMES
|
|
#if defined(_WIN32) || defined(_WIN64)
|
|
// Format epoch seconds as local time, "YYYY-MM-DD HH:MM:SS".
// Writes "N/A" when t is 0, when the time cannot be converted, or when the
// formatted text does not fit in out_sz bytes, so `out` is always a
// well-defined string.
static void format_time(uint64_t t, char *out, size_t out_sz) {
  if (t != 0) {
    time_t tt = (time_t)t;
    struct tm tm;

    // localtime_s returns non-zero on failure; strftime returns 0 when the
    // result does not fit and leaves the buffer contents indeterminate.
    if (localtime_s(&tm, &tt) == 0 &&
        strftime(out, out_sz, "%Y-%m-%d %H:%M:%S", &tm) > 0)
      return;
  }

  snprintf(out, out_sz, "N/A");
}
|
|
// ------------------ Convert filetime to epoch -------------------
|
|
static uint64_t filetime_to_epoch(const FILETIME *ft) {
|
|
ULARGE_INTEGER ull;
|
|
ull.LowPart = ft->dwLowDateTime;
|
|
ull.HighPart = ft->dwHighDateTime;
|
|
|
|
// Windows epoch (1601) ¬ニメ Unix epoch (1970)
|
|
return (ull.QuadPart - 116444736000000000ULL) / 10000000ULL;
|
|
}
|
|
|
|
void platform_get_file_times(const char *path, uint64_t *out_created,
|
|
uint64_t *out_modified) {
|
|
WIN32_FILE_ATTRIBUTE_DATA fad;
|
|
if (GetFileAttributesExA(path, GetFileExInfoStandard, &fad)) {
|
|
*out_created = filetime_to_epoch(&fad.ftCreationTime);
|
|
*out_modified = filetime_to_epoch(&fad.ftLastWriteTime);
|
|
} else {
|
|
*out_created = 0;
|
|
*out_modified = 0;
|
|
}
|
|
}
|
|
|
|
#elif defined(__linux__)
|
|
// Format epoch seconds as local time, "YYYY-MM-DD HH:MM:SS".
// Writes "N/A" when t is 0, when the time cannot be converted, or when the
// formatted text does not fit in out_sz bytes, so `out` is always a
// well-defined string.
static void format_time(uint64_t t, char *out, size_t out_sz) {
  if (t != 0) {
    time_t tt = (time_t)t;
    struct tm tm;

    // localtime_r returns NULL on failure; strftime returns 0 when the
    // result does not fit and leaves the buffer contents indeterminate.
    if (localtime_r(&tt, &tm) &&
        strftime(out, out_sz, "%Y-%m-%d %H:%M:%S", &tm) > 0)
      return;
  }

  snprintf(out, out_sz, "N/A");
}
|
|
|
|
// Fetch timestamps of `path` via stat(2), as epoch seconds.
// *out_created receives st_ctime, which is the inode status-change time
// rather than a true creation time. Both outputs are 0 when stat() fails.
void platform_get_file_times(const char *path, uint64_t *out_created,
                             uint64_t *out_modified) {
  struct stat info;

  if (stat(path, &info) != 0) {
    *out_created = 0;
    *out_modified = 0;
    return;
  }

  *out_created = (uint64_t)info.st_ctime;
  *out_modified = (uint64_t)info.st_mtime;
}
|
|
|
|
// Fetch timestamps of `name` relative to the open directory `dir_fd`.
// Returns 0 on success. On failure returns -1 and zeroes both outputs, so
// callers that ignore the return value see the same "unknown" value that
// platform_get_file_times() reports.
static int platform_get_file_times_fd(int dir_fd, const char *name,
                                      uint64_t *created, uint64_t *modified) {
  struct stat st;

  if (fstatat(dir_fd, name, &st, 0) != 0) {
    *created = 0;
    *modified = 0;
    return -1;
  }

  *created = st.st_ctime; // or st.st_birthtime on systems that support it
  *modified = st.st_mtime;
  return 0;
}
|
|
#endif
|
|
#endif
|
|
|
|
// -------------------- File owner ---------------------
|
|
#if defined(_WIN32) || defined(_WIN64)
|
|
#if FILE_OWNER
|
|
void platform_get_file_owner(const char *path, char *out_owner,
|
|
size_t out_owner_size) {
|
|
PSID sid = NULL;
|
|
PSECURITY_DESCRIPTOR sd = NULL;
|
|
|
|
if (GetNamedSecurityInfoA(path, SE_FILE_OBJECT, OWNER_SECURITY_INFORMATION,
|
|
&sid, NULL, NULL, NULL, &sd) == ERROR_SUCCESS) {
|
|
|
|
char name[64], domain[64];
|
|
DWORD name_len = sizeof(name);
|
|
DWORD domain_len = sizeof(domain);
|
|
SID_NAME_USE use;
|
|
|
|
if (LookupAccountSidA(NULL, sid, name, &name_len, domain, &domain_len,
|
|
&use)) {
|
|
snprintf(out_owner, out_owner_size, "%s\\%s", domain, name);
|
|
} else {
|
|
snprintf(out_owner, out_owner_size, "UNKNOWN");
|
|
}
|
|
} else {
|
|
snprintf(out_owner, out_owner_size, "UNKNOWN");
|
|
}
|
|
|
|
if (sd)
|
|
LocalFree(sd);
|
|
}
|
|
#endif
|
|
|
|
#elif defined(__linux__)
|
|
#if FILE_OWNER
|
|
// Resolve the owning user name of `path` into out_owner.
// Writes "UNKNOWN" when the file cannot be stat'ed or the uid has no passwd
// entry. Uses the reentrant getpwuid_r() because this is called from
// scan_folder(), which runs on scan worker threads; getpwuid() returns a
// pointer to shared static storage.
void platform_get_file_owner(const char *path, char *out_owner,
                             size_t out_owner_size) {
  struct stat st;

  if (stat(path, &st) == 0) {
    struct passwd pw;
    struct passwd *result = NULL;
    char scratch[4096]; // string storage for the passwd entry

    if (getpwuid_r(st.st_uid, &pw, scratch, sizeof(scratch), &result) == 0 &&
        result != NULL && result->pw_name != NULL) {
      snprintf(out_owner, out_owner_size, "%s", result->pw_name);
      return;
    }
  }

  snprintf(out_owner, out_owner_size, "UNKNOWN");
}
|
|
|
|
// Resolve the owner of `name` (relative to the open directory `dir_fd`)
// into `owner`. Falls back to "uid:<number>" when the uid has no passwd
// entry. Returns 0 on success, -1 when the file cannot be stat'ed (in which
// case `owner` is left untouched).
static int platform_get_file_owner_fd(int dir_fd, const char *name, char *owner,
                                      size_t owner_size) {
  struct stat st;
  if (fstatat(dir_fd, name, &st, 0) != 0)
    return -1;

  struct passwd pw;
  struct passwd *result = NULL;
  char buffer[4096]; // Sufficiently large buffer for passwd data

  // Reentrant version (thread-safe)
  if (getpwuid_r(st.st_uid, &pw, buffer, sizeof(buffer), &result) == 0 &&
      result != NULL && result->pw_name != NULL) {
    // snprintf always terminates and tolerates owner_size == 0, unlike
    // strncpy(owner, ..., owner_size - 1).
    snprintf(owner, owner_size, "%s", result->pw_name);
  } else {
    // Fallback to uid; uid_t is unsigned, so print it as unsigned.
    snprintf(owner, owner_size, "uid:%lu", (unsigned long)st.st_uid);
  }

  return 0;
}
|
|
#endif
|
|
|
|
// ----------------------------- File system -----------------------------
|
|
|
|
#if CHECK_FILE_SYSTEM
|
|
// Filesystems recognised from the statfs(2) f_type magic number.
typedef enum FileSystemType {
  FS_UNKNOWN = 0,
  FS_EXT4,
  FS_XFS,
  FS_BTRFS,
  FS_TMPFS,
  FS_NFS,
  FS_CIFS,
  FS_FAT,
  FS_EXFAT,
  FS_NTFS,
  FS_ZFS,
  FS_F2FS,
  FS_EROFS,
  FS_VIRTIOFS,
  FS_OVERLAY,
  FS_HUGETLBFS,
  FS_SQUASHFS,
  FS_PROC,
  FS_SYSFS,
} FileSystemType;

// Translate a statfs(2) f_type magic number into a FileSystemType.
// Values that are not listed map to FS_UNKNOWN.
// The ZFS and overlayfs entries were corrected to the values the kernel
// actually reports (0x2FC12FC1 and 0x794C7630); the previous constants
// matched no filesystem.
static inline FileSystemType fs_from_magic(long type) {
  switch (type) {
  case 0xEF53: // shared by ext2, ext3 and ext4
    return FS_EXT4;
  case 0x58465342:
    return FS_XFS;
  case 0x9123683E:
    return FS_BTRFS;
  case 0x01021994:
    return FS_TMPFS;
  case 0x6969:
    return FS_NFS;
  case 0xFF534D42:
    return FS_CIFS;
  case 0x4d44:
    return FS_FAT;
  case 0x2011BAB0:
    return FS_EXFAT;
  case 0x5346544E:
    return FS_NTFS;
  case 0x2FC12FC1: // ZFS_SUPER_MAGIC
    return FS_ZFS;
  case 0xF2F52010:
    return FS_F2FS;
  case 0xE0F5E1E2:
    return FS_EROFS;
  case 0x56495254: // NOTE(review): confirm against the running kernel
    return FS_VIRTIOFS;
  case 0x794C7630: // OVERLAYFS_SUPER_MAGIC
    return FS_OVERLAY;
  case 0x958458f6:
    return FS_HUGETLBFS;
  case 0x73717368:
    return FS_SQUASHFS;
  case 0x9fa0:
    return FS_PROC;
  case 0x62656572:
    return FS_SYSFS;

  default:
    return FS_UNKNOWN;
  }
}
|
|
|
|
// Yes it is officially called "magic number" or "signature" in the
|
|
// documentation
|
|
|
|
// Read strategy chosen for a filesystem by fs_policy().
typedef enum {
  FS_POLICY_BUFFERED,  // use ordinary buffered reads
  FS_POLICY_DIRECT_OK, // direct (O_DIRECT) reads are considered usable
} FsPolicy;
|
|
|
|
// Decide whether direct I/O may be used on the given filesystem.
// Only the filesystems listed below return FS_POLICY_DIRECT_OK; everything
// else, including FS_UNKNOWN, gets FS_POLICY_BUFFERED. Reasons recorded for
// some of the buffered ones:
//   tmpfs  - resides in page cache; O_DIRECT generally returns EINVAL
//   erofs  - read-only filesystem; O_DIRECT support is uncommon/restricted
//   fat    - generally does not implement direct_IO address space ops
//   exfat  - limited support depending on driver implementation
//   ntfs   - depends on driver (ntfs3 supports it, older ntfs-3g does not)
static inline FsPolicy fs_policy(FileSystemType fs) {
  switch (fs) {
  case FS_EXT4:
  case FS_XFS:
  case FS_BTRFS:
  case FS_ZFS:
  case FS_F2FS:
  case FS_NFS:
  case FS_CIFS:
  case FS_VIRTIOFS:
    return FS_POLICY_DIRECT_OK;

  default:
    break;
  }

  return FS_POLICY_BUFFERED;
}
|
|
#endif
|
|
#endif
|
|
|
|
// ----------------------------- Scan helpers -----------------------------
|
|
// One discovered file, produced by scan_folder() and consumed by the hash
// workers through the file queue.
typedef struct FileEntry {
  char *path; // normalized path string ('/' separators)

  uint64_t size_bytes; // file size reported by the directory scan
#if FILE_TIMES
  uint64_t created_time;  // epoch
  uint64_t modified_time; // epoch seconds
#endif
#if FILE_OWNER
  char owner[128]; // resolved owner name
#endif

#if CHECK_FILE_SYSTEM // Linux only
  FileSystemType fs_type; // filesystem of the directory the file was found in
#endif
} FileEntry;
|
|
|
|
typedef struct {
|
|
char buffer[MAX_PATHLEN];
|
|
char *base_end; // Points to end of base path
|
|
char *filename_pos; // Points to where filename should be written
|
|
size_t base_len;
|
|
} PathBuilder;
|
|
|
|
// Initialise `pb` with "<base><separator>" and point filename_pos just after
// the separator.
// A base too long for the buffer is truncated so that the separator and the
// terminating NUL always fit; previously such a base overflowed pb->buffer.
static void path_builder_init(PathBuilder *pb, const char *base) {
  const size_t max_base = sizeof(pb->buffer) - 2; // separator + NUL
  size_t len = strlen(base);
  if (len > max_base)
    len = max_base;

  pb->base_len = len;
  memcpy(pb->buffer, base, len);
  pb->base_end = pb->buffer + len;

#if defined(_WIN32) || defined(_WIN64)
  *pb->base_end = '\\';
#elif defined(__linux__)
  *pb->base_end = '/';
#endif

  // Ensure null termination
  *(pb->base_end + 1) = '\0';
  pb->filename_pos = pb->base_end + 1;
}
|
|
|
|
// Write `filename` (name_len bytes) after the base path and NUL-terminate.
// The copy is clamped to the space left in pb->buffer, so an over-long name
// is truncated instead of overflowing the buffer.
static void path_builder_set_filename(PathBuilder *pb, const char *filename,
                                      size_t name_len) {
  const size_t used = (size_t)(pb->filename_pos - pb->buffer);
  if (used >= sizeof(pb->buffer))
    return; // no room at all, not even for the terminator

  const size_t room = sizeof(pb->buffer) - used - 1; // keep 1 byte for NUL
  if (name_len > room)
    name_len = room;

  memcpy(pb->filename_pos, filename, name_len);
  pb->filename_pos[name_len] = '\0'; // Ensure null termination
}
|
|
|
|
// Copy the path currently held in `pb` (base + separator + filename + NUL)
// into memory obtained from `arena` and return the copy.
// NOTE(review): arena_push() receives the address of the local `arena`
// parameter, whereas other call sites pass the address of the context's
// arena pointer. If arena_push() can update the pointer it is given, that
// update is lost here - confirm against arena.h.
static char *path_builder_dup_arena(PathBuilder *pb, mem_arena *arena,
                                    bool zero) {
  const size_t prefix_len = pb->filename_pos - pb->buffer;
  const size_t name_len = strlen(pb->filename_pos);
  const size_t total_len = prefix_len + name_len + 1; // +1 for the NUL

  char *copy = arena_push(&arena, total_len, zero);
  memcpy(copy, pb->buffer, total_len);
  return copy;
}
|
|
|
|
#if defined(_WIN32) || defined(_WIN64)
|
|
// Enumerate one directory (non-recursively).
// Sub-directories are pushed to ctx->dir_queue for a later scan; every file
// gets a FileEntry that is pushed to ctx->file_queue. "." and "..", and any
// reparse point (symlink, junction), are skipped.
// Paths that would not fit in MAX_PATHLEN are skipped rather than
// overflowing the on-stack path buffers.
void scan_folder(const char *base, ScannerContext *ctx) {
  char search[MAX_PATHLEN];

  // "<base>\*" plus NUL must fit in `search` (and in the PathBuilder).
  if (strlen(base) + 3 > sizeof(search))
    return;

  PathBuilder pb;
  path_builder_init(&pb, base);

  memcpy(search, pb.buffer, pb.base_len + 1); // Copy base + separator
  memcpy(search + pb.base_len + 1, "*", 2);   // Add "*" and null

  WIN32_FIND_DATAA fd;
  HANDLE h = FindFirstFileA(search, &fd);
  if (h == INVALID_HANDLE_VALUE)
    return;

  do {
    // Skip . and ..
    if (fd.cFileName[0] == '.' &&
        (fd.cFileName[1] == 0 ||
         (fd.cFileName[1] == '.' && fd.cFileName[2] == 0)))
      continue;

    if (fd.dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT)
      continue;

    size_t name_len = strlen(fd.cFileName);

    // base + separator + name + NUL must fit in the path buffers.
    if (pb.base_len + name_len + 2 > sizeof(pb.buffer))
      continue;

    path_builder_set_filename(&pb, fd.cFileName, name_len);

    // If it's a directory:
    if (fd.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) {
      char *dir = path_builder_dup_arena(&pb, ctx->path_arena, false);
      mpmc_push_work(ctx->dir_queue, dir);
    } else {
      // else a file:
      atomic_fetch_add(&g_files_found, 1);

      FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true);

      // Create a temporary copy for normalization to avoid corrupting pb.buffer
      char temp_path[MAX_PATHLEN];
      memcpy(temp_path, pb.buffer,
             (pb.filename_pos - pb.buffer) + name_len + 1);
      normalize_path(temp_path);

      fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, false);
      strcpy(fe->path, temp_path);

#if FILE_TIMES
      platform_get_file_times(pb.buffer, &fe->created_time, &fe->modified_time);
#endif

#if FILE_OWNER
      platform_get_file_owner(pb.buffer, fe->owner, sizeof(fe->owner));
#endif

      fe->size_bytes = ((uint64_t)fd.nFileSizeHigh << 32) | fd.nFileSizeLow;

      mpmc_push(ctx->file_queue, fe);
    }

  } while (FindNextFileA(h, &fd));

  FindClose(h);
}
|
|
|
|
#elif defined(__linux__)
|
|
// Enumerate one directory (non-recursively).
// Sub-directories are pushed to ctx->dir_queue for a later scan; every
// regular file gets a FileEntry that is pushed to ctx->file_queue. Symlinks
// and other special files are skipped.
//
// Two defects of the earlier version are addressed:
//  - g_files_found was incremented before fstatat() succeeded on the d_type
//    fast path; a failed stat then left a counted file that was never
//    queued, so the hashed count could never reach the found count.
//  - paths that do not fit in MAX_PATHLEN overflowed the path buffers; they
//    are now skipped.
void scan_folder(const char *base, ScannerContext *ctx) {
  PathBuilder pb;

  // base + separator + NUL must fit in the builder.
  if (strlen(base) + 2 > sizeof(pb.buffer))
    return;
  path_builder_init(&pb, base);

  int dir_fd = open(base, O_RDONLY | O_DIRECTORY | O_NOFOLLOW);
  if (dir_fd == -1)
    return;

#if CHECK_FILE_SYSTEM
  struct statfs fs;
  FileSystemType fs_type = FS_UNKNOWN;
  if (fstatfs(dir_fd, &fs) == 0) {
    fs_type = fs_from_magic(fs.f_type);
  }
#endif

  DIR *dir = fdopendir(dir_fd);
  if (!dir) {
    close(dir_fd);
    return;
  }

  struct dirent *entry;

  while ((entry = readdir(dir)) != NULL) {
    // Skip . and ..
    if (entry->d_name[0] == '.' &&
        (entry->d_name[1] == 0 ||
         (entry->d_name[1] == '.' && entry->d_name[2] == 0)))
      continue;

    size_t name_len = strlen(entry->d_name);

    // base + separator + name + NUL must fit in the path buffers.
    if (pb.base_len + name_len + 2 > sizeof(pb.buffer))
      continue;

    path_builder_set_filename(&pb, entry->d_name, name_len);

    int file_type = DT_UNKNOWN;

#ifdef _DIRENT_HAVE_D_TYPE
    file_type = entry->d_type;
#endif

    // Fast path using d_type
    if (file_type != DT_UNKNOWN) {
      if (file_type == DT_LNK)
        continue; // Skip symlinks

      if (file_type == DT_DIR) {
        char *dir_path = path_builder_dup_arena(&pb, ctx->path_arena, false);
        mpmc_push_work(ctx->dir_queue, dir_path);
        continue;
      }

      if (file_type == DT_REG) {
        // Stat first: only a file that is actually queued may be counted.
        struct stat st;
        if (fstatat(dir_fd, entry->d_name, &st, 0) != 0)
          continue;

        atomic_fetch_add(&g_files_found, 1);
        FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true);

#if FILE_TIMES
        platform_get_file_times_fd(dir_fd, entry->d_name, &fe->created_time,
                                   &fe->modified_time);
#endif

#if FILE_OWNER
        platform_get_file_owner_fd(dir_fd, entry->d_name, fe->owner,
                                   sizeof(fe->owner));
#endif

        fe->size_bytes = (uint64_t)st.st_size;

        // Normalize path
        char temp_path[MAX_PATHLEN];
        memcpy(temp_path, pb.buffer,
               (pb.filename_pos - pb.buffer) + name_len + 1);
        normalize_path(temp_path);

        fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, false);
        strcpy(fe->path, temp_path);

#if CHECK_FILE_SYSTEM
        fe->fs_type = fs_type;
#endif

        mpmc_push(ctx->file_queue, fe);
        continue;
      }
    }

    // Fallback for unknown types (also reached, harmlessly, for known types
    // that are neither link, directory nor regular file).
    struct stat st;
    if (fstatat(dir_fd, entry->d_name, &st, AT_SYMLINK_NOFOLLOW) == 0) {
      if (S_ISLNK(st.st_mode))
        continue;

      if (S_ISDIR(st.st_mode)) {
        char *dir_path = path_builder_dup_arena(&pb, ctx->path_arena, false);
        mpmc_push_work(ctx->dir_queue, dir_path);
      } else if (S_ISREG(st.st_mode)) {
        atomic_fetch_add(&g_files_found, 1);
        FileEntry *fe = arena_push(&ctx->meta_arena, sizeof(FileEntry), true);

#if FILE_TIMES
        platform_get_file_times(pb.buffer, &fe->created_time,
                                &fe->modified_time);
#endif

#if FILE_OWNER
        platform_get_file_owner(pb.buffer, fe->owner, sizeof(fe->owner));
#endif

        fe->size_bytes = (uint64_t)st.st_size;

        char temp_path[MAX_PATHLEN];
        memcpy(temp_path, pb.buffer,
               (pb.filename_pos - pb.buffer) + name_len + 1);
        normalize_path(temp_path);

        fe->path = arena_push(&ctx->path_arena, strlen(temp_path) + 1, false);
        strcpy(fe->path, temp_path);

#if CHECK_FILE_SYSTEM
        fe->fs_type = fs_type;
#endif

        mpmc_push(ctx->file_queue, fe);
      }
    }
  }

  // closedir() also closes dir_fd, which fdopendir() took over.
  closedir(dir);
}
|
|
#endif
|
|
|
|
// ------------------------- Scan worker --------------------------------
|
|
static THREAD_RETURN scan_worker(void *arg) {
|
|
ScannerContext *ctx = (ScannerContext *)arg;
|
|
|
|
for (;;) {
|
|
char *dir = mpmc_pop(ctx->dir_queue);
|
|
if (!dir)
|
|
break;
|
|
|
|
scan_folder(dir, ctx);
|
|
|
|
mpmc_task_done(ctx->dir_queue, ctx->num_threads);
|
|
}
|
|
|
|
return THREAD_RETURN_VALUE;
|
|
}
|
|
|
|
// ----------------------------- Hashing helpers -----------------------------
|
|
static void xxh3_hash_file_stream(const char *path, char *out_hex,
|
|
unsigned char *buf) {
|
|
XXH128_hash_t h;
|
|
XXH3_state_t state;
|
|
XXH3_128bits_reset(&state);
|
|
|
|
FileHandle handle = os_file_open(path, FLAG_SEQUENTIAL_READ);
|
|
if (handle == INVALID_FILE_HANDLE) {
|
|
strcpy(out_hex, "ERROR");
|
|
return;
|
|
}
|
|
|
|
uint64_t bytes_read;
|
|
while (os_file_read(handle, buf, READ_BLOCK, &bytes_read) == 0 &&
|
|
bytes_read > 0) {
|
|
XXH3_128bits_update(&state, buf, (size_t)bytes_read);
|
|
atomic_fetch_add(&g_bytes_processed, bytes_read);
|
|
}
|
|
|
|
os_file_close(handle);
|
|
|
|
h = XXH3_128bits_digest(&state);
|
|
snprintf(out_hex, HASH_STRLEN, "%016llx%016llx", (unsigned long long)h.high64,
|
|
(unsigned long long)h.low64);
|
|
}
|
|
|
|
// ------------------------- Hash worker --------------------------------
|
|
static THREAD_RETURN hash_worker(void *arg) {
|
|
WorkerContext *ctx = (WorkerContext *)arg;
|
|
void *buf = malloc(READ_BLOCK);
|
|
|
|
for (;;) {
|
|
FileEntry *fe = mpmc_pop(ctx->file_queue);
|
|
if (!fe)
|
|
break;
|
|
|
|
char hash[HASH_STRLEN];
|
|
xxh3_hash_file_stream(fe->path, hash, buf);
|
|
|
|
double size_kib = (double)fe->size_bytes / 1024.0;
|
|
char stack_buf[KiB(4)];
|
|
int len;
|
|
|
|
#if FILE_TIMES
|
|
char created[32], modified[32];
|
|
format_time(fe->created_time, created, sizeof(created));
|
|
format_time(fe->modified_time, modified, sizeof(modified));
|
|
#endif
|
|
|
|
#if FILE_TIMES && FILE_OWNER
|
|
len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n",
|
|
hash, fe->path, size_kib, created, modified, fe->owner);
|
|
#elif FILE_TIMES
|
|
len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\n", hash,
|
|
fe->path, size_kib, created, modified);
|
|
#elif FILE_OWNER
|
|
len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\n", hash,
|
|
fe->path, size_kib, fe->owner);
|
|
#else
|
|
len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\n", hash,
|
|
fe->path, size_kib);
|
|
#endif
|
|
|
|
char *dst = arena_push(&ctx->arena, len, false);
|
|
memcpy(dst, stack_buf, len);
|
|
|
|
atomic_fetch_add(&g_files_hashed, 1);
|
|
}
|
|
|
|
free(buf);
|
|
|
|
return THREAD_RETURN_VALUE;
|
|
}
|
|
|
|
// ------------------------- Progress display ---------------------------
|
|
static THREAD_RETURN progress_thread(void *arg) {
|
|
(void)arg;
|
|
|
|
HiResTimer progress_timer;
|
|
timer_start(&progress_timer);
|
|
|
|
uint64_t last_bytes = 0;
|
|
double last_time = 0.0;
|
|
double displayed_speed = 0.0;
|
|
const double sample_interval = 0.5;
|
|
|
|
// Hide cursor to prevent flickering
|
|
printf("\033[?25l");
|
|
|
|
for (;;) {
|
|
uint64_t found = atomic_load(&g_files_found);
|
|
uint64_t hashed = atomic_load(&g_files_hashed);
|
|
uint64_t bytes = atomic_load(&g_bytes_processed);
|
|
int scan_done = atomic_load(&g_scan_done);
|
|
|
|
double t = timer_elapsed(&progress_timer);
|
|
double dt = t - last_time;
|
|
|
|
if (dt >= sample_interval) {
|
|
uint64_t db = (bytes > last_bytes) ? bytes - last_bytes : 0;
|
|
displayed_speed = (double)db / (1024.0 * 1024.0) / dt;
|
|
last_bytes = bytes;
|
|
last_time = t;
|
|
}
|
|
|
|
printf("\r");
|
|
|
|
if (!scan_done) {
|
|
printf("\033[1mScanning:\033[0m %llu files | Hashed: %llu | \033[32m%.2f "
|
|
"MB/s\033[0m ",
|
|
(unsigned long long)found, (unsigned long long)hashed,
|
|
displayed_speed);
|
|
} else {
|
|
double pct = found ? (double)hashed / (double)found : 0.0;
|
|
int barw = 40;
|
|
int filled = (int)(pct * barw);
|
|
|
|
printf("[");
|
|
// Print filled part in Green (\033[32m)
|
|
printf("\033[32m");
|
|
for (int i = 0; i < filled; i++)
|
|
putchar('#');
|
|
// Reset color for empty part
|
|
printf("\033[0m");
|
|
for (int i = filled; i < barw; i++)
|
|
putchar('.');
|
|
|
|
printf("] %6.2f%% (%llu/%llu) \033[32m%.2f MB/s\033[0m ", pct * 100.0,
|
|
(unsigned long long)hashed, (unsigned long long)found,
|
|
displayed_speed);
|
|
}
|
|
|
|
fflush(stdout);
|
|
|
|
if (scan_done && hashed == found)
|
|
break;
|
|
sleep_ms(100);
|
|
}
|
|
|
|
// Restore cursor (\033[?25h) and move to next line
|
|
printf("\033[?25h\n");
|
|
|
|
return THREAD_RETURN_VALUE;
|
|
}
|
|
|
|
// ======================== IO Ring implementation ========================
|
|
#if USE_IORING
|
|
// -------------------------- Data structures ---------------------------
|
|
|
|
// Globals
|
|
u64 g_ioring_buffer_size = 4096 * 64;
|
|
static atomic_uint_fast64_t g_io_ring_fallbacks = 0;
|
|
|
|
#define IO_PENDING INT_MIN
|
|
|
|
typedef struct IoBuffer IoBuffer;
|
|
|
|
#if defined(_WIN32) || defined(_WIN64)
|
|
// Windows I/O Ring types
|
|
typedef HIORING IoRingHandle;
|
|
#define BUILD_READ_RETURN_VALUE HRESULT
|
|
|
|
#elif defined(__linux__)
|
|
// Linux io_uring types
|
|
typedef struct {
|
|
struct io_uring ring;
|
|
struct io_uring_cqe *cqe_cache;
|
|
int cqe_cache_index;
|
|
int cqe_cache_count;
|
|
} IoUring;
|
|
|
|
typedef IoUring *IoRingHandle;
|
|
typedef struct iovec IORING_BUFFER_INFO;
|
|
#define BUILD_READ_RETURN_VALUE int
|
|
#endif
|
|
|
|
typedef struct FileReadContext {
|
|
FileEntry *fe;
|
|
size_t file_size;
|
|
|
|
// For in-order hashing
|
|
size_t next_read_offset;
|
|
|
|
IoBuffer *head;
|
|
IoBuffer *tail;
|
|
|
|
// Completion tracking
|
|
size_t bytes_hashed;
|
|
uint32_t reads_hashed;
|
|
|
|
uint32_t reads_submitted;
|
|
uint32_t reads_completed;
|
|
|
|
uint32_t active_reads;
|
|
|
|
union {
|
|
XXH3_state_t hash_state; // For incremental hash (large files)
|
|
XXH128_hash_t single_hash; // For single-shot hash (small files)
|
|
};
|
|
|
|
FileHandle file_handle;
|
|
|
|
#if USE_REGISTERED_FILES
|
|
uint32_t slot_id;
|
|
#endif
|
|
|
|
bool use_incremental_hash;
|
|
|
|
bool completed;
|
|
|
|
} FileReadContext;
|
|
|
|
// -------------------------- Buffer structure ---------------------------
|
|
typedef struct IoBuffer {
|
|
FileReadContext *file;
|
|
void *data;
|
|
size_t size;
|
|
size_t offset;
|
|
size_t bytes_read;
|
|
|
|
BUILD_READ_RETURN_VALUE result;
|
|
|
|
int buffer_id;
|
|
struct IoBuffer *next;
|
|
} IoBuffer;
|
|
|
|
// Thread-local I/O Ring context
|
|
#if defined(_WIN32) || defined(_WIN64)
|
|
|
|
typedef struct ThreadIoContext {
|
|
IoRingHandle ring;
|
|
void *completion_event;
|
|
|
|
void *fallback_buffer;
|
|
IoBuffer buffers[NUM_BUFFERS_PER_THREAD];
|
|
int buffer_pool[NUM_BUFFERS_PER_THREAD];
|
|
int free_count;
|
|
int num_submissions;
|
|
int active_files;
|
|
bool submitting;
|
|
|
|
#if USE_REGISTERED_FILES
|
|
bool use_registered_files;
|
|
FileHandle registered_handles[MAX_ACTIVE_FILES];
|
|
#endif
|
|
|
|
} ThreadIoContext;
|
|
|
|
#elif defined(__linux__)
|
|
typedef struct ThreadIoContext {
|
|
IoRingHandle ring;
|
|
|
|
void *fallback_buffer;
|
|
IoBuffer buffers[NUM_BUFFERS_PER_THREAD];
|
|
int buffer_pool[NUM_BUFFERS_PER_THREAD];
|
|
int free_count;
|
|
int num_submissions;
|
|
int active_files;
|
|
bool submitting;
|
|
|
|
bool use_registered_buffers;
|
|
bool use_registered_files;
|
|
|
|
} ThreadIoContext;
|
|
|
|
#endif
|
|
// Platform-neutral copy of the I/O ring limits reported by the OS
// (filled by ioring_query_capabilities()).
typedef struct {
  uint32_t MaxSubmissionQueueSize;
  uint32_t MaxCompletionQueueSize;
  uint32_t MaxVersion;
} IoRingCapabilities;
|
|
|
|
// ------------------------ IO Ring Abstraction -------------------------
|
|
#if defined(_WIN32) || defined(_WIN64)
|
|
|
|
// Windows I/O Ring functions
|
|
// Copy the OS-reported I/O ring limits into `caps`.
// If the query fails, every field is set to 0 instead of being copied from
// an uninitialized structure.
static void ioring_query_capabilities(IoRingCapabilities *caps) {
  IORING_CAPABILITIES win_caps;
  memset(&win_caps, 0, sizeof(win_caps));

  if (FAILED(QueryIoRingCapabilities(&win_caps)))
    memset(&win_caps, 0, sizeof(win_caps));

  caps->MaxSubmissionQueueSize = win_caps.MaxSubmissionQueueSize;
  caps->MaxCompletionQueueSize = win_caps.MaxCompletionQueueSize;
  caps->MaxVersion = win_caps.MaxVersion;
}
|
|
|
|
// Create the thread's I/O ring (submission queue of queue_size entries,
// completion queue twice that) and attach an auto-reset completion event.
// Returns 0 on success, -1 if the ring cannot be created.
// When ring creation fails no event is created any more: previously the
// event was still allocated and SetIoRingCompletionEvent() was called on an
// invalid ring.
static int create_ioring(ThreadIoContext *thread_ctx, uint32_t queue_size) {
  IORING_CREATE_FLAGS flags = {0};

  thread_ctx->completion_event = NULL;

  HRESULT hr = CreateIoRing(IORING_VERSION_3, flags, queue_size, queue_size * 2,
                            &thread_ctx->ring);
  if (FAILED(hr))
    return -1;

  // Create completion event
  thread_ctx->completion_event = CreateEvent(NULL, FALSE, FALSE, NULL);
  if (thread_ctx->completion_event)
    SetIoRingCompletionEvent(thread_ctx->ring, thread_ctx->completion_event);

  return 0;
}
|
|
|
|
// Release the completion event (if one was created) and close the ring.
// Always returns 0.
static int close_ioring(ThreadIoContext *thread_ctx) {
  void *event = thread_ctx->completion_event;

  if (event != NULL)
    CloseHandle(event);

  CloseIoRing(thread_ctx->ring);
  return 0;
}
|
|
|
|
#define USERDATA_REGISTER 1
|
|
|
|
#define MAKE_BUF_INFO(a, l) \
|
|
(IORING_BUFFER_INFO) { .Address = (a), .Length = (uint32_t)(l) }
|
|
|
|
// Submit all queued ring entries without asking SubmitIoRing() to wait
// (wait count 0); the number submitted is reported through `submitted`.
// The wait_count in windows is not implemented yet, so when submissions are
// outstanding this waits on the completion event instead, for at most
// SUBMIT_TIMEOUT_MS; completions themselves are handled in
// ioring_pop_completion().
// Returns 0 if the submit call succeeded, -1 otherwise.
static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t *submitted) {
  const HRESULT hr =
      SubmitIoRing(thread_ctx->ring, 0, SUBMIT_TIMEOUT_MS, submitted);

  if (thread_ctx->num_submissions > 0) {
    WaitForSingleObject(thread_ctx->completion_event, SUBMIT_TIMEOUT_MS);
  }

  if (FAILED(hr))
    return -1;
  return 0;
}
|
|
|
|
static void ioring_register_buffers(ThreadIoContext *thread_ctx,
|
|
IORING_BUFFER_INFO *buf_info) {
|
|
|
|
HRESULT hr = BuildIoRingRegisterBuffers(
|
|
thread_ctx->ring, NUM_BUFFERS_PER_THREAD, buf_info, USERDATA_REGISTER);
|
|
if (FAILED(hr)) {
|
|
char error_msg[256];
|
|
FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM |
|
|
FORMAT_MESSAGE_IGNORE_INSERTS,
|
|
NULL, hr, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
|
|
error_msg, sizeof(error_msg), NULL);
|
|
|
|
fprintf(stderr, "WARNING: Error registering buffers: %s (0x%08X)\n",
|
|
error_msg, (unsigned int)hr);
|
|
}
|
|
// Submit registration
|
|
ioring_submit(thread_ctx, NULL);
|
|
}
|
|
|
|
#if USE_REGISTERED_FILES
|
|
// Queue the registration of the thread's file-handle table with the ring and
// record in use_registered_files whether registered handles can be used.
//
// thread_ctx: ring owner; registered_handles holds MAX_ACTIVE_FILES slots.
static void ioring_register_files(ThreadIoContext *thread_ctx) {

  HRESULT hr = BuildIoRingRegisterFileHandles(
      thread_ctx->ring, MAX_ACTIVE_FILES, thread_ctx->registered_handles,
      USERDATA_REGISTER);

  if (FAILED(hr)) {
    char error_msg[256];
    // No FORMAT_MESSAGE_ALLOCATE_BUFFER: the message must be written into
    // our own buffer, not returned as an allocated pointer.
    DWORD msg_len = FormatMessageA(
        FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL,
        (DWORD)hr, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), error_msg,
        (DWORD)sizeof(error_msg), NULL);
    if (msg_len == 0) {
      snprintf(error_msg, sizeof(error_msg), "unknown error");
    }

    fprintf(stderr, "WARNING: File registration failed: %s (0x%08X)\n",
            error_msg, (unsigned int)hr);
  }

  // Any success code counts, not only S_OK.
  thread_ctx->use_registered_files = SUCCEEDED(hr);
}
|
|
|
|
static void ioring_register_files_update(ThreadIoContext *thread_ctx,
|
|
FileReadContext *file) {
|
|
thread_ctx->registered_handles[file->slot_id] = file->file_handle;
|
|
ioring_register_files(thread_ctx);
|
|
}
|
|
#endif
|
|
|
|
static BUILD_READ_RETURN_VALUE ioring_build_read(ThreadIoContext *thread_ctx,
|
|
FileReadContext *file_ctx,
|
|
uint32_t buffer_id,
|
|
size_t size, uint64_t offset,
|
|
uintptr_t user_data) {
|
|
|
|
#if USE_REGISTERED_FILES
|
|
IORING_HANDLE_REF file_ref;
|
|
|
|
if (thread_ctx->use_registered_files) {
|
|
file_ref = (IORING_HANDLE_REF)IoRingHandleRefFromIndex(file_ctx->slot_id);
|
|
} else {
|
|
file_ref =
|
|
(IORING_HANDLE_REF)IoRingHandleRefFromHandle(file_ctx->file_handle);
|
|
}
|
|
#else
|
|
IORING_HANDLE_REF file_ref = IoRingHandleRefFromHandle(file_ctx->file_handle);
|
|
#endif
|
|
|
|
IORING_BUFFER_REF buffer_ref =
|
|
IoRingBufferRefFromIndexAndOffset(buffer_id, 0);
|
|
|
|
HRESULT hr =
|
|
BuildIoRingReadFile(thread_ctx->ring, file_ref, buffer_ref,
|
|
(uint32_t)size, offset, user_data, IOSQE_FLAGS_NONE);
|
|
if (FAILED(hr)) {
|
|
char error_msg[256];
|
|
FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM |
|
|
FORMAT_MESSAGE_IGNORE_INSERTS,
|
|
NULL, hr, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
|
|
error_msg, sizeof(error_msg), NULL);
|
|
|
|
fprintf(stderr,
|
|
"ERROR: Building read error for file '%s' - Error: %s (Code: "
|
|
"0x%08X)\n",
|
|
file_ctx->fe->path, error_msg, (unsigned int)hr);
|
|
}
|
|
return hr;
|
|
}
|
|
|
|
// Pop completion entries from the Windows I/O ring and record their results
// in the IoBuffer each entry refers to.
//
// The function keeps draining (and waiting on the completion event between
// drains) until MIN(num_submissions, MAX_WAIT_COUNT) read completions have
// been processed, where num_submissions is sampled on entry. It returns
// early if PopIoRingCompletion reports a failure.
static void ioring_process_completions(ThreadIoContext *restrict thread_ctx) {
  uint32_t cqe_count = 0;
  uint32_t wait_count = MIN(thread_ctx->num_submissions, MAX_WAIT_COUNT);

  while (cqe_count < wait_count) {

    // ---- Drain all available CQEs (non-blocking) ----
    while (1) {
      IORING_CQE win_cqe;

      HRESULT hr = PopIoRingCompletion(thread_ctx->ring, &win_cqe);

      // Check for hard failures first; testing `hr != S_OK` first would
      // swallow them as "queue empty".
      if (FAILED(hr)) {
        fprintf(stderr, "WARNING: PopIoRingCompletion failed (0x%lx)\n",
                (unsigned long)hr);
        return;
      }

      if (hr != S_OK) {
        // No more CQEs available right now
        break;
      }

      // Skip internal registration completions
      if (win_cqe.UserData == USERDATA_REGISTER) {
        continue;
      }

      // UserData carries the IoBuffer pointer passed to ioring_build_read().
      IoBuffer *restrict buf = (IoBuffer *)win_cqe.UserData;
      FileReadContext *restrict file = buf->file;

      if (SUCCEEDED(win_cqe.ResultCode)) {
        buf->result = 0;
        buf->bytes_read = win_cqe.Information;
      } else {
        buf->result = win_cqe.ResultCode;
        buf->bytes_read = 0;

        char error_msg[256];
        DWORD msg_len = FormatMessageA(
            FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL,
            win_cqe.ResultCode, 0, error_msg, sizeof(error_msg), NULL);
        if (msg_len == 0) {
          snprintf(error_msg, sizeof(error_msg), "unknown error");
        }

        fprintf(stderr,
                "WARNING: I/O completion error for file '%s' - Error: %s "
                "(Code: 0x%lx)\n",
                buf->file->fe->path, error_msg,
                (unsigned long)win_cqe.ResultCode);
      }

      file->active_reads--;
      file->reads_completed++;
      thread_ctx->num_submissions--;

      cqe_count++;
    }

    // ---- If we already waited enough, exit ----
    if (cqe_count >= wait_count) {
      break;
    }

    // ---- Otherwise wait for more completions ----
    WaitForSingleObject(thread_ctx->completion_event, SUBMIT_TIMEOUT_MS);
  }
}
|
|
|
|
// Open fe->path for ring reads: first with FLAG_ASYNC_DIRECT_READ and, if
// that fails, once more with FLAG_SEQUENTIAL_READ. Returns whatever the last
// attempt produced (possibly INVALID_FILE_HANDLE).
FileHandle ioring_open_file(FileEntry *fe) {
  FileHandle direct = os_file_open(fe->path, FLAG_ASYNC_DIRECT_READ);
  if (direct != INVALID_FILE_HANDLE) {
    return direct;
  }

  return os_file_open(fe->path, FLAG_SEQUENTIAL_READ);
}
|
|
|
|
#elif defined(__linux__)
|
|
// Linux io_uring functions implementation
|
|
// Fill `caps` with the queue limits used for io_uring on Linux.
//
// The submission limit is sysconf(_SC_IOV_MAX) clamped to at most 4096 (4096
// is also used when sysconf reports no usable value); the completion limit
// is twice that, and the version is fixed at 1.
// NOTE(review): _SC_IOV_MAX is the iovec limit, not an io_uring property;
// confirm it is the intended source for the queue size.
static void ioring_query_capabilities(IoRingCapabilities *caps) {
  enum { SQ_ENTRIES_CAP = 4096 };

  long limit = sysconf(_SC_IOV_MAX);
  if (limit <= 0 || limit > SQ_ENTRIES_CAP) {
    limit = SQ_ENTRIES_CAP;
  }

  const uint32_t sq_entries = (uint32_t)limit;
  caps->MaxSubmissionQueueSize = sq_entries;
  caps->MaxCompletionQueueSize = sq_entries * 2;
  caps->MaxVersion = 1;
}
|
|
|
|
// static int async_io_create_ring(uint32_t queue_size, AsyncIoRing *ring) {
|
|
static int create_ioring(ThreadIoContext *thread_ctx, uint32_t queue_size) {
|
|
IoUring *ring_impl = (IoUring *)calloc(1, sizeof(IoUring));
|
|
if (!ring_impl)
|
|
return -1;
|
|
|
|
// Initialize io_uring
|
|
struct io_uring_params params = {0};
|
|
|
|
params.flags =
|
|
IORING_SETUP_CQSIZE |
|
|
IORING_SETUP_SINGLE_ISSUER | // Thread local io_uring
|
|
IORING_SETUP_DEFER_TASKRUN; // Do not send interupts when a CQE is ready,
|
|
// send them when a wait function is called,
|
|
// and groupe them according to the nbre or
|
|
// entries to wait (reduces syscall overhead)
|
|
|
|
params.cq_entries = queue_size * 2;
|
|
|
|
int ret = io_uring_queue_init_params(queue_size, &ring_impl->ring, ¶ms);
|
|
if (ret < 0) {
|
|
// Fallback to without params
|
|
printf("WARNING: Creating io_uring with default params\n");
|
|
ret = io_uring_queue_init(queue_size, &ring_impl->ring, 0);
|
|
if (ret < 0) {
|
|
free(ring_impl);
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
ring_impl->cqe_cache = NULL;
|
|
ring_impl->cqe_cache_index = 0;
|
|
ring_impl->cqe_cache_count = 0;
|
|
|
|
thread_ctx->ring = ring_impl;
|
|
return 0;
|
|
}
|
|
|
|
// Shut down and free the io_uring instance owned by thread_ctx.
// Registered buffers are unregistered first when registration succeeded.
// Returns 0 on success, -1 when the context has no ring.
static int close_ioring(ThreadIoContext *thread_ctx) {
  IoUring *ring_impl = (IoUring *)thread_ctx->ring;
  if (ring_impl == NULL) {
    return -1;
  }

  struct io_uring *ring = &ring_impl->ring;

  if (thread_ctx->use_registered_buffers) {
    io_uring_unregister_buffers(ring);
  }

  io_uring_queue_exit(ring);
  free(ring_impl);
  return 0;
}
|
|
|
|
// Build an IORING_BUFFER_INFO value describing `l` bytes starting at `a`.
#define MAKE_BUF_INFO(a, l)                                                    \
  ((IORING_BUFFER_INFO){.iov_base = (a), .iov_len = (size_t)(l)})
|
|
|
|
static void ioring_register_buffers(ThreadIoContext *thread_ctx,
|
|
IORING_BUFFER_INFO *buf_info) {
|
|
|
|
int ret = io_uring_register_buffers(&((IoUring *)thread_ctx->ring)->ring,
|
|
buf_info, NUM_BUFFERS_PER_THREAD);
|
|
|
|
if (ret < 0) {
|
|
if (ret == -ENOMEM) {
|
|
struct rlimit limit;
|
|
getrlimit(RLIMIT_MEMLOCK, &limit);
|
|
|
|
fprintf(
|
|
stderr,
|
|
"WARNING: Buffer registration failed due to Memlock limit, Error: "
|
|
"Cannot allocate memory (code: -12, ENOMEM).\n"
|
|
"See README for more informations.\n");
|
|
|
|
} else {
|
|
// For any other error (e.g., EFAULT, EBUSY, EINVAL)
|
|
fprintf(stderr, "WARNING: Error registering buffers: %s (code: %d)\n",
|
|
strerror(-ret), ret);
|
|
}
|
|
|
|
fprintf(stderr, "Falling back to unregistered buffers (performance may "
|
|
"be reduced).\n");
|
|
}
|
|
|
|
thread_ctx->use_registered_buffers = (ret == 0);
|
|
}
|
|
|
|
#if USE_REGISTERED_FILES
|
|
// Register an empty (sparse) table of MAX_ACTIVE_FILES file slots with the
// ring. use_registered_files records whether that succeeded; on failure a
// warning is printed and plain file descriptors are used instead.
static void ioring_register_files(ThreadIoContext *thread_ctx) {
  struct io_uring *ring = &((IoUring *)thread_ctx->ring)->ring;

  const int ret = io_uring_register_files_sparse(ring, MAX_ACTIVE_FILES);
  thread_ctx->use_registered_files = (ret == 0);

  if (ret >= 0) {
    return;
  }

  fprintf(stderr,
          "WARNING: File registeration failed, fallback to unregistered "
          "files - Error: %s (code: %d)\n",
          strerror(-ret), ret);
}
|
|
|
|
static void ioring_register_files_update(ThreadIoContext *thread_ctx,
|
|
FileReadContext *file) {
|
|
|
|
// Update the kernel's file table at the specific slot
|
|
int ret = io_uring_register_files_update(
|
|
&((IoUring *)thread_ctx->ring)->ring,
|
|
file->slot_id, // offset - which slot to update
|
|
&file->file_handle, // pointer to the fd
|
|
1 // number of files to update
|
|
);
|
|
|
|
if (ret < 0) {
|
|
fprintf(stderr,
|
|
"WARNING: File registration update failed for slot %u updating "
|
|
"file '%s' - Error: %s "
|
|
"(code: %d)\n"
|
|
"Fallback to unregistered files\n",
|
|
file->slot_id, file->fe->path, strerror(-ret), ret);
|
|
|
|
thread_ctx->use_registered_files = false;
|
|
}
|
|
}
|
|
#endif
|
|
|
|
static int ioring_build_read(ThreadIoContext *thread_ctx,
|
|
FileReadContext *file_ctx, uint32_t buffer_id,
|
|
size_t size, uint64_t offset,
|
|
uintptr_t user_data) {
|
|
|
|
struct io_uring_sqe *sqe =
|
|
io_uring_get_sqe(&((IoUring *)thread_ctx->ring)->ring);
|
|
if (!sqe) {
|
|
printf("SQE FULL\n");
|
|
return -1;
|
|
}
|
|
|
|
void *buf = thread_ctx->buffers[buffer_id].data;
|
|
|
|
#if USE_REGISTERED_FILES
|
|
if (thread_ctx->use_registered_files) {
|
|
sqe->flags |= IOSQE_FIXED_FILE;
|
|
|
|
if (thread_ctx->use_registered_buffers) {
|
|
io_uring_prep_read_fixed(sqe, file_ctx->slot_id, buf, size, offset,
|
|
buffer_id);
|
|
} else {
|
|
io_uring_prep_read(sqe, file_ctx->slot_id, buf, size, offset);
|
|
}
|
|
|
|
io_uring_sqe_set_data64(sqe, user_data);
|
|
return 0;
|
|
}
|
|
#endif
|
|
|
|
// Fallback: use regular file descriptor
|
|
if (thread_ctx->use_registered_buffers) {
|
|
io_uring_prep_read_fixed(sqe, file_ctx->file_handle, buf, size, offset,
|
|
buffer_id);
|
|
} else {
|
|
io_uring_prep_read(sqe, file_ctx->file_handle, buf, size, offset);
|
|
}
|
|
|
|
io_uring_sqe_set_data64(sqe, user_data);
|
|
return 0;
|
|
}
|
|
|
|
// Submit queued entries to the kernel.
//
// thread_ctx: ring owner; when reads are outstanding the call also waits for
//             MIN(num_submissions, MAX_WAIT_COUNT) completions.
// submitted:  optional; receives the value returned by the submit call.
//
// Returns 0 on success, -1 (after logging) if the submit call failed.
static int ioring_submit(ThreadIoContext *thread_ctx, uint32_t *submitted) {
  struct io_uring *ring = &((IoUring *)thread_ctx->ring)->ring;
  const uint32_t wait_count = MIN(thread_ctx->num_submissions, MAX_WAIT_COUNT);

  const int ret = (wait_count > 0) ? io_uring_submit_and_wait(ring, wait_count)
                                   : io_uring_submit(ring);
  if (ret < 0) {
    fprintf(stderr, "ERROR: Submit error: %s (Code: %d)\n", strerror(-ret),
            ret);
    return -1;
  }

  if (submitted != NULL) {
    *submitted = (uint32_t)ret;
  }
  return 0;
}
|
|
|
|
// Collect every completion currently available on the ring (at most
// NUM_BUFFERS_PER_THREAD per call, without blocking) and store each result
// in the IoBuffer that the entry's user_data points to.
static void ioring_process_completions(ThreadIoContext *thread_ctx) {
  struct io_uring *ring = &((IoUring *)thread_ctx->ring)->ring;
  struct io_uring_cqe *cqes[NUM_BUFFERS_PER_THREAD];

  const unsigned ready =
      io_uring_peek_batch_cqe(ring, cqes, NUM_BUFFERS_PER_THREAD);
  if (ready == 0) {
    return;
  }

  for (unsigned idx = 0; idx < ready; idx++) {
    const int res = cqes[idx]->res;

    // user_data carries the IoBuffer pointer passed to ioring_build_read().
    IoBuffer *buf = (IoBuffer *)cqes[idx]->user_data;
    FileReadContext *file = buf->file;

    if (res < 0) {
      // Negative res is a negated errno value.
      buf->result = res;
      buf->bytes_read = 0;

      fprintf(stderr,
              "WARNING: I/O completion error for file '%s' - Error: %s (Code: "
              "%d)\n",
              buf->file->fe->path, strerror(-res), res);
    } else {
      buf->result = 0;
      buf->bytes_read = (uint32_t)res;
    }

    file->active_reads--;
    file->reads_completed++;
    thread_ctx->num_submissions--;
  }

  // Mark all processed CQEs as seen in one step (batched form of
  // io_uring_cqe_seen()).
  io_uring_cq_advance(ring, ready);
}
|
|
|
|
// Open fe->path for ring reads.
//
// With CHECK_FILE_SYSTEM enabled, files whose fs_type is rejected by
// fs_policy() are opened with FLAG_SEQUENTIAL_READ straight away. Otherwise
// FLAG_ASYNC_DIRECT_READ is tried first, with FLAG_SEQUENTIAL_READ as the
// fallback. Returns whatever the last attempt produced (possibly
// INVALID_FILE_HANDLE).
FileHandle ioring_open_file(FileEntry *fe) {

#if CHECK_FILE_SYSTEM
  if (!fs_policy(fe->fs_type)) {
    return os_file_open(fe->path, FLAG_SEQUENTIAL_READ);
  }
#endif

  FileHandle direct = os_file_open(fe->path, FLAG_ASYNC_DIRECT_READ);
  if (direct != INVALID_FILE_HANDLE) {
    return direct;
  }

  return os_file_open(fe->path, FLAG_SEQUENTIAL_READ);
}
|
|
#endif
|
|
|
|
// OS-agnostic helper macros
|
|
// True when a ring result code is non-negative.
#define IORING_SUCCEEDED(result) ((result) >= 0)
|
|
// True when a ring result code is negative.
#define IORING_FAILED(result) ((result) < 0)
|
|
|
|
// ---------------------- FIFO queue operations ---------------------------
|
|
typedef struct FileQueue {
|
|
FileReadContext files[MAX_ACTIVE_FILES];
|
|
int head;
|
|
int tail;
|
|
int count;
|
|
} FileQueue;
|
|
|
|
// Claim the slot at the tail of the queue.
// Returns the slot (with slot_id set to its index when USE_REGISTERED_FILES
// is enabled), or NULL when the queue already holds MAX_ACTIVE_FILES entries.
// The slot contents are not cleared here.
static FileReadContext *fq_push(FileQueue *restrict fq) {
  if (fq->count == MAX_ACTIVE_FILES) {
    return NULL;
  }

  const int slot = fq->tail;
  FileReadContext *restrict entry = &fq->files[slot];

#if USE_REGISTERED_FILES
  entry->slot_id = slot;
#endif

  fq->tail = (slot + 1) % MAX_ACTIVE_FILES;
  fq->count++;
  return entry;
}
|
|
|
|
// Return the most recently pushed entry, or NULL when the queue is empty.
static FileReadContext *fq_peek_tail(FileQueue *fq) {
  if (fq->count == 0) {
    return NULL;
  }

  // tail points one past the newest entry; step back with wrap-around.
  int newest = fq->tail - 1;
  if (newest < 0) {
    newest += MAX_ACTIVE_FILES;
  }
  return &fq->files[newest];
}
|
|
|
|
// Return the entry `index` positions after the head (0 = oldest), or NULL
// when `index` is outside [0, count).
static FileReadContext *fq_peek_at(FileQueue *fq, int index) {
  const bool in_range = (index >= 0) && (index < fq->count);
  if (!in_range) {
    return NULL;
  }

  return &fq->files[(fq->head + index) % MAX_ACTIVE_FILES];
}
|
|
|
|
// Drop completed entries from the head of the queue, stopping at the first
// entry that is not yet completed (or when the queue becomes empty).
static void fq_trim(FileQueue *restrict fq) {
  while (fq->count > 0 && fq->files[fq->head].completed) {
    fq->head = (fq->head + 1) % MAX_ACTIVE_FILES;
    fq->count--;
  }
}
|
|
|
|
// ----------------- Initialize thread context -----------------------
|
|
static ThreadIoContext *ioring_init_thread(void) {
|
|
ThreadIoContext *restrict thread_ctx =
|
|
(ThreadIoContext *)calloc(1, sizeof(ThreadIoContext));
|
|
if (!thread_ctx)
|
|
return NULL;
|
|
|
|
// Query I/O Ring capabilities
|
|
IoRingCapabilities caps;
|
|
ioring_query_capabilities(&caps);
|
|
|
|
uint32_t queue_size = caps.MaxSubmissionQueueSize;
|
|
if (queue_size > 4096)
|
|
queue_size = 4096; // Cap at 4096 for reasonable memory usage
|
|
|
|
// Create I/O Ring
|
|
if (create_ioring(thread_ctx, queue_size) != 0) {
|
|
free(thread_ctx);
|
|
thread_ctx = NULL;
|
|
return NULL;
|
|
}
|
|
|
|
// Initialize buffer pool
|
|
thread_ctx->fallback_buffer = malloc(READ_BLOCK);
|
|
|
|
IORING_BUFFER_INFO buf_info[NUM_BUFFERS_PER_THREAD];
|
|
|
|
u64 buf_pool_size = g_ioring_buffer_size * NUM_BUFFERS_PER_THREAD;
|
|
|
|
// Reserve and Commit memory for buffers
|
|
void *base_ptr = plat_mem_reserve(buf_pool_size);
|
|
if (base_ptr) {
|
|
if (!plat_mem_commit(base_ptr, buf_pool_size)) {
|
|
plat_mem_release(base_ptr, 0);
|
|
close_ioring(thread_ctx);
|
|
free(thread_ctx);
|
|
thread_ctx = NULL;
|
|
return NULL;
|
|
}
|
|
} else {
|
|
|
|
close_ioring(thread_ctx);
|
|
free(thread_ctx);
|
|
thread_ctx = NULL;
|
|
return NULL;
|
|
}
|
|
|
|
for (int i = 0; i < NUM_BUFFERS_PER_THREAD; i++) {
|
|
thread_ctx->buffers[i].data = (u8 *)base_ptr + (i * g_ioring_buffer_size);
|
|
thread_ctx->buffer_pool[i] = i;
|
|
thread_ctx->buffers[i].buffer_id = i;
|
|
|
|
buf_info[i] =
|
|
MAKE_BUF_INFO(thread_ctx->buffers[i].data, g_ioring_buffer_size);
|
|
}
|
|
|
|
thread_ctx->free_count = NUM_BUFFERS_PER_THREAD;
|
|
|
|
// Register buffers
|
|
ioring_register_buffers(thread_ctx, buf_info);
|
|
|
|
#if USE_REGISTERED_FILES
|
|
ioring_register_files(thread_ctx);
|
|
#endif
|
|
|
|
thread_ctx->submitting = true;
|
|
thread_ctx->num_submissions = 0;
|
|
thread_ctx->active_files = 0;
|
|
|
|
return thread_ctx;
|
|
}
|
|
|
|
// Release everything owned by a per-thread I/O ring context: the ring, the
// read-buffer pool, the fallback buffer and the context itself.
// A NULL context is ignored.
static void ioring_cleanup_thread(ThreadIoContext *thread_ctx) {
  if (!thread_ctx)
    return;

  if (thread_ctx->ring)
    close_ioring(thread_ctx);

  // Free the buffer pool memory; buffers[0].data is the base of the single
  // region that backs every buffer.
  if (thread_ctx->buffers[0].data) {
    u64 buf_pool_size = g_ioring_buffer_size * NUM_BUFFERS_PER_THREAD;
    plat_mem_release(thread_ctx->buffers[0].data, buf_pool_size);
  }

  // Allocated with malloc() in ioring_init_thread(); free(NULL) is a no-op.
  free(thread_ctx->fallback_buffer);

  free(thread_ctx);
}
|
|
|
|
// -------------------------- Buffer get and return ------------------------
|
|
// Take one buffer from the thread's free pool.
// Returns NULL when the pool is empty; otherwise the buffer is reset to
// "read pending" state (no bytes read, result IO_PENDING, no successor).
static IoBuffer *get_free_buffer(ThreadIoContext *restrict thread_ctx) {
  if (thread_ctx->free_count == 0) {
    return NULL;
  }

  // The pool is used as a stack of buffer indices.
  thread_ctx->free_count--;
  const int buffer_index = thread_ctx->buffer_pool[thread_ctx->free_count];

  IoBuffer *restrict taken = &thread_ctx->buffers[buffer_index];
  taken->next = NULL;
  taken->result = IO_PENDING;
  taken->bytes_read = 0;
  return taken;
}
|
|
|
|
// Put a buffer back on the thread's free pool. A NULL buffer is ignored.
static void return_buffer(ThreadIoContext *restrict thread_ctx, IoBuffer *restrict buf) {
  if (buf != NULL) {
    thread_ctx->buffer_pool[thread_ctx->free_count] = buf->buffer_id;
    thread_ctx->free_count++;
  }
}
|
|
|
|
// -------------------- File operations -----------------------
|
|
static int init_file(ThreadIoContext *restrict thread_ctx, FileReadContext *restrict file,
|
|
FileEntry *restrict fe) {
|
|
|
|
#if USE_REGISTERED_FILES
|
|
uint32_t saved_slot_id = file->slot_id;
|
|
#endif
|
|
|
|
memset(file, 0, sizeof(*file));
|
|
|
|
file->fe = fe;
|
|
file->file_size = fe->size_bytes;
|
|
|
|
file->file_handle = ioring_open_file(fe);
|
|
|
|
if (file->file_handle == INVALID_FILE_HANDLE) {
|
|
|
|
#if IORING_DEBUG_PRINTS
|
|
printf("ERROR: Could not open file '%s'\n", fe->path);
|
|
#endif
|
|
return 0;
|
|
}
|
|
|
|
#if USE_REGISTERED_FILES
|
|
file->slot_id = saved_slot_id;
|
|
|
|
if (thread_ctx->use_registered_files) {
|
|
ioring_register_files_update(thread_ctx, file);
|
|
}
|
|
#endif
|
|
|
|
// Determine hash method based on file size
|
|
if (file->file_size > g_ioring_buffer_size) {
|
|
file->use_incremental_hash = true;
|
|
XXH3_128bits_reset(&file->hash_state);
|
|
}
|
|
return 1;
|
|
}
|
|
|
|
static void finalize_file(ThreadIoContext *restrict thread_ctx,
|
|
WorkerContext *worker_ctx, FileReadContext *restrict file) {
|
|
|
|
FileEntry *restrict fe = file->fe;
|
|
|
|
os_file_close(file->file_handle);
|
|
|
|
char hash[HASH_STRLEN];
|
|
|
|
if (file->bytes_hashed == file->file_size) {
|
|
if (file->use_incremental_hash) {
|
|
// Large file: digest the accumulated hash state
|
|
XXH128_hash_t h = XXH3_128bits_digest(&file->hash_state);
|
|
snprintf(hash, HASH_STRLEN, "%016llx%016llx",
|
|
(unsigned long long)h.high64, (unsigned long long)h.low64);
|
|
} else {
|
|
// Small file: hash already computed, stored directly in single_hash
|
|
snprintf(hash, HASH_STRLEN, "%016llx%016llx",
|
|
(unsigned long long)file->single_hash.high64,
|
|
(unsigned long long)file->single_hash.low64);
|
|
}
|
|
} else {
|
|
#if IORING_DEBUG_PRINTS
|
|
printf("WARNING: Fallback for path: %s\n", fe->path);
|
|
#endif
|
|
|
|
atomic_fetch_add(&g_io_ring_fallbacks, 1);
|
|
xxh3_hash_file_stream(fe->path, hash, thread_ctx->fallback_buffer);
|
|
}
|
|
|
|
double size_kib = (double)fe->size_bytes / 1024.0;
|
|
char stack_buf[KiB(4)];
|
|
int len;
|
|
|
|
#if FILE_TIMES
|
|
char created[32], modified[32];
|
|
format_time(fe->created_time, created, sizeof(created));
|
|
format_time(fe->modified_time, modified, sizeof(modified));
|
|
#endif
|
|
|
|
#if FILE_TIMES && FILE_OWNER
|
|
len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\t%s\n",
|
|
hash, fe->path, size_kib, created, modified, fe->owner);
|
|
#elif FILE_TIMES
|
|
len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\t%s\n", hash,
|
|
fe->path, size_kib, created, modified);
|
|
#elif FILE_OWNER
|
|
len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\t%s\n", hash,
|
|
fe->path, size_kib, fe->owner);
|
|
#else
|
|
len = snprintf(stack_buf, sizeof(stack_buf), "%s\t%s\t%.2f\n", hash, fe->path,
|
|
size_kib);
|
|
#endif
|
|
|
|
char *restrict dst = arena_push(&worker_ctx->arena, len, false);
|
|
memcpy(dst, stack_buf, len);
|
|
|
|
atomic_fetch_add(&g_files_hashed, 1);
|
|
}
|
|
|
|
// -------------------- Hash files -----------------------
|
|
static void hash_ready_files(ThreadIoContext *restrict thread_ctx, FileQueue *restrict fq,
|
|
WorkerContext *worker_ctx) {
|
|
|
|
for (int i = 0; i < fq->count; i++) {
|
|
|
|
FileReadContext *restrict file = fq_peek_at(fq, i);
|
|
if (!file || file->completed)
|
|
continue;
|
|
|
|
// ---- HASH READY BUFFERS IN ORDER ----
|
|
while (file->head) {
|
|
|
|
IoBuffer *restrict buf = file->head;
|
|
|
|
// CQE not received yet
|
|
if (buf->result == IO_PENDING)
|
|
break;
|
|
|
|
// Consume buffer
|
|
file->head = buf->next;
|
|
|
|
if (IORING_SUCCEEDED(buf->result) && buf->bytes_read > 0) {
|
|
|
|
size_t bytes_to_hash = buf->bytes_read;
|
|
|
|
if (buf->offset + buf->bytes_read > file->file_size) {
|
|
bytes_to_hash = file->file_size - buf->offset;
|
|
}
|
|
|
|
if (bytes_to_hash > 0) {
|
|
if (file->use_incremental_hash) {
|
|
XXH3_128bits_update(&file->hash_state, buf->data, bytes_to_hash);
|
|
} else {
|
|
file->single_hash = XXH3_128bits(buf->data, bytes_to_hash);
|
|
}
|
|
|
|
file->bytes_hashed += bytes_to_hash;
|
|
atomic_fetch_add(&g_bytes_processed, bytes_to_hash);
|
|
}
|
|
|
|
file->reads_hashed++;
|
|
|
|
} else if (buf->bytes_read == 0 && IORING_SUCCEEDED(buf->result)) {
|
|
file->reads_hashed++; // EOF
|
|
} else {
|
|
finalize_file(thread_ctx, worker_ctx, file);
|
|
file->completed = true;
|
|
}
|
|
|
|
return_buffer(thread_ctx, buf);
|
|
}
|
|
|
|
// ---- FINALIZE ----
|
|
if (!file->completed && file->active_reads == 0 &&
|
|
file->bytes_hashed >= file->file_size) {
|
|
|
|
finalize_file(thread_ctx, worker_ctx, file);
|
|
file->completed = true;
|
|
thread_ctx->active_files--;
|
|
}
|
|
}
|
|
|
|
// Clean up completed files from the head
|
|
fq_trim(fq);
|
|
}
|
|
|
|
// ------------------ Build pending reads ----------------------
|
|
static void build_pending_reads(ThreadIoContext *restrict thread_ctx, FileQueue *restrict fq,
|
|
WorkerContext *worker_ctx) {
|
|
|
|
MPMCQueue *file_queue = worker_ctx->file_queue;
|
|
|
|
FileReadContext *restrict file = fq_peek_tail(fq);
|
|
|
|
for (;;) {
|
|
|
|
// BUILD READS FOR CURRENT FILE
|
|
if (file) {
|
|
while (file->next_read_offset < file->file_size) {
|
|
|
|
IoBuffer *restrict buf = get_free_buffer(thread_ctx);
|
|
if (!buf)
|
|
return;
|
|
|
|
size_t remaining = file->file_size - file->next_read_offset;
|
|
size_t bytes_to_read;
|
|
|
|
if (remaining >= g_ioring_buffer_size) {
|
|
bytes_to_read = g_ioring_buffer_size;
|
|
} else {
|
|
bytes_to_read = ALIGN_UP_POW2(remaining, g_pagesize);
|
|
}
|
|
|
|
// Initialize buffer
|
|
buf->file = file;
|
|
buf->offset = file->next_read_offset;
|
|
buf->size = bytes_to_read;
|
|
|
|
// Chain buffer
|
|
if (!file->head) {
|
|
file->head = buf;
|
|
} else {
|
|
file->tail->next = buf;
|
|
}
|
|
file->tail = buf;
|
|
|
|
BUILD_READ_RETURN_VALUE hr =
|
|
ioring_build_read(thread_ctx, file, buf->buffer_id, bytes_to_read,
|
|
buf->offset, (uintptr_t)buf);
|
|
|
|
if (IORING_FAILED(hr)) {
|
|
// mark failure and stop this file
|
|
return_buffer(thread_ctx, buf);
|
|
finalize_file(thread_ctx, worker_ctx, file);
|
|
file->completed = true;
|
|
break;
|
|
}
|
|
|
|
file->active_reads++;
|
|
file->reads_submitted++;
|
|
thread_ctx->num_submissions++;
|
|
|
|
file->next_read_offset += bytes_to_read;
|
|
}
|
|
}
|
|
|
|
// ADD NEW FILE
|
|
if (!thread_ctx->submitting)
|
|
return;
|
|
|
|
if (fq->count >= MAX_ACTIVE_FILES)
|
|
return;
|
|
|
|
FileEntry *fe = mpmc_pop(file_queue);
|
|
if (!fe) {
|
|
thread_ctx->submitting = false;
|
|
return;
|
|
}
|
|
|
|
FileReadContext *newfile = fq_push(fq);
|
|
|
|
if (!init_file(thread_ctx, newfile, fe)) {
|
|
finalize_file(thread_ctx, worker_ctx, newfile);
|
|
newfile->completed = true;
|
|
continue;
|
|
}
|
|
|
|
file = newfile;
|
|
thread_ctx->active_files++;
|
|
}
|
|
}
|
|
|
|
// -------------------------- Hash worker I/O Ring ---------------------------
|
|
static THREAD_RETURN hash_worker_ioring(void *arg) {
|
|
WorkerContext *worker_ctx = (WorkerContext *)arg;
|
|
|
|
// Init IO ring
|
|
ThreadIoContext *thread_ctx = ioring_init_thread();
|
|
if (!thread_ctx || !thread_ctx->ring) {
|
|
printf("I/O Ring unavailable, using buffered I/O\n");
|
|
return hash_worker(arg);
|
|
}
|
|
|
|
// Initialize pipeline state
|
|
FileQueue fq;
|
|
memset(&fq, 0, sizeof(fq));
|
|
|
|
uint32_t submitted;
|
|
|
|
// Main pipeline loop
|
|
for (;;) {
|
|
|
|
// Submit new reads
|
|
build_pending_reads(thread_ctx, &fq, worker_ctx);
|
|
|
|
submitted = 0;
|
|
ioring_submit(thread_ctx, &submitted);
|
|
|
|
// Process completions
|
|
ioring_process_completions(thread_ctx);
|
|
|
|
// Hash files
|
|
hash_ready_files(thread_ctx, &fq, worker_ctx);
|
|
|
|
#if IORING_DEBUG_STATS
|
|
printf(
|
|
"Free buffers: %d, Submissions: %d, Active files: %d, fq count: %d\n",
|
|
thread_ctx->free_count, thread_ctx->num_submissions,
|
|
thread_ctx->active_files, fq.count);
|
|
#endif
|
|
|
|
// Exit condition
|
|
if (!thread_ctx->submitting && thread_ctx->active_files == 0 &&
|
|
thread_ctx->num_submissions == 0) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
ioring_cleanup_thread(thread_ctx);
|
|
return THREAD_RETURN_VALUE;
|
|
}
|
|
#endif
|