Skip to content

WIP / Experiment: reference counting directory FDs to keep them alive longer #169

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions include/QueuePerThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ QPTPool_t *QPTPool_init_with_props(const size_t nthreads, void *args,
const uint64_t queue_limit, const char *swap_prefix,
const uint64_t steal_num, const uint64_t steal_denom);

/*
 * Set the function the pool uses to clear out work items that were
 * enqueued but never processed. The pool's default destructor is free().
 * destructor must not be NULL (this is asserted).
 */
void QPTPool_set_destructor(QPTPool_t *pool, void (*destructor)(void *));
/*
* QPTPool_init only allocates memory - call this to start threads
*
Expand Down
19 changes: 19 additions & 0 deletions include/bf.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ OF SUCH DAMAGE.
#ifndef BF_H
#define BF_H

#include <dirent.h>
#include <inttypes.h>
#include <sys/stat.h>

Expand Down Expand Up @@ -357,6 +358,9 @@ struct work {
long long int pinode;
size_t recursion_level;

/* an optional reference to the parent dir -- to keep it alive */
struct dir_rc *parent_dir;

/* probably shouldn't be here */
char * fullpath;
size_t fullpath_len;
Expand All @@ -367,6 +371,21 @@ struct work {
size_t struct_work_size(struct work *w);
struct work *new_work_with_name(const char *prefix, const size_t prefix_len,
const char *basename, const size_t basename_len);
/*
 * Destructor for a struct work: drops the item's parent_dir reference
 * with dir_dec() and then frees the item itself. Takes void * so it can
 * be used where a free()-style callback is expected.
 */
void free_work(void *p);

/*
 * A reference-counted directory handle.
 *
 * Created by open_dir_rc(), shared with dir_clone(), and released with
 * dir_dec(), which closes the DIR and frees the struct once the count
 * reaches zero.
 */
struct dir_rc {
/* the open directory stream; closed with closedir() by dir_dec() */
DIR *dir;
/* reference count; only modified through __atomic builtins */
uint64_t rc;
/*
 * when non-zero, dir_clone() refuses to hand out new references;
 * set by open_dir_rc() when the open-file budget has been reached
 */
int dont_clone;
};

/*
 * Open the directory described by w. Uses openat() relative to
 * w->parent_dir when that is set, otherwise opendir(w->name).
 * Returns a new handle holding one reference, or NULL on failure.
 */
struct dir_rc *open_dir_rc(struct work *w);
/* Return the file descriptor underlying the handle's DIR stream. */
int get_dir_fd(struct dir_rc *dir);
/*
 * Take an additional reference. Returns dir on success, or NULL if the
 * handle was marked non-clonable -- callers must handle NULL.
 */
struct dir_rc *dir_clone(struct dir_rc *dir);
/*
 * Drop one reference; the last drop closes the directory and frees the
 * handle. Passing NULL is a no-op.
 */
void dir_dec(struct dir_rc *dir);

/* extra data used by entries that does not depend on data from other directories */
struct entry_data {
Expand Down
2 changes: 1 addition & 1 deletion include/descend.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ struct descend_counters {
*/
int descend(QPTPool_t *ctx, const size_t id, void *args,
struct input *in, struct work *work, ino_t inode,
DIR *dir, const int skip_db,
struct dir_rc *d_rc, const int skip_db,
QPTPool_f processdir, process_nondir_f processnondir, void *nondir_args,
struct descend_counters *counters);

Expand Down
59 changes: 56 additions & 3 deletions src/QueuePerThreadPool.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,52 @@



#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/resource.h>

#include "QueuePerThreadPool.h"
#include "SinglyLinkedList.h"
#include "swap.h"
#include "utils.h"

extern rlim_t MAX_OPEN_FILES;
/*
 * Determine how many file descriptors may be held open by potentially
 * long-lived directory handles, and store that budget in MAX_OPEN_FILES.
 *
 * The number of directory handles alive at once could otherwise be
 * unbounded, which would risk file descriptor exhaustion.
 *
 * nthreads - number of worker threads; descriptors are set aside for each
 *
 * On any failure, or when the soft RLIMIT_NOFILE limit does not exceed the
 * per-thread reservation, MAX_OPEN_FILES is set to 0. With a budget of 0,
 * open_dir_rc() marks every handle it creates as non-clonable.
 */
void init_open_file_limit(size_t nthreads) {
    struct rlimit rl;

    if (getrlimit(RLIMIT_NOFILE, &rl) != 0) {
        /* capture errno before any other library call can overwrite it */
        const int saved_errno = errno;
        fprintf(stderr, "Warning: could not get open file limit: %s\n", strerror(saved_errno));
        MAX_OPEN_FILES = 0;
        return;
    }

    /*
     * Reserve 3 file descriptors per thread so that threads can always
     * open the files they are currently working on. (This factor was
     * determined experimentally.)
     */
    const size_t reserve_fds = nthreads * 3;

    /* not enough descriptors to spare any for long-lived handles */
    if (rl.rlim_cur <= reserve_fds) {
        MAX_OPEN_FILES = 0;
        return;
    }

    MAX_OPEN_FILES = rl.rlim_cur - reserve_fds;
}

typedef enum {
INITIALIZED,
RUNNING,
Expand Down Expand Up @@ -119,6 +156,12 @@
uint64_t swapped;

QPTPoolThreadData_t *data;

/*
* An optional pointer to a destructor for the work items in this QPTPool.
* If it is not specified, then the destructor is just free().
*/
void (*destructor)(void *);
};

/* struct to pass into pthread_create */
Expand Down Expand Up @@ -551,6 +594,11 @@
return 0;
}

/*
 * Install the function used to dispose of work items that are still
 * sitting in the pool's queues when they are cleared out.
 *
 * ctx        - the pool to configure
 * destructor - free()-style callback; must not be NULL
 *
 * Passing NULL is a programming error and trips the assert in debug
 * builds. When asserts are compiled out (NDEBUG), NULL falls back to
 * free() -- the pool's default -- so that a NULL function pointer is
 * never stored and later handed to sll_destroy().
 */
void QPTPool_set_destructor(QPTPool_t *ctx, void (*destructor)(void *)) {
    assert(destructor != NULL);
    ctx->destructor = destructor ? destructor : free;
}

QPTPool_t *QPTPool_init_with_props(const size_t nthreads, void *args,
QPTPoolNextFunc_t next_func, void *next_args,
const uint64_t queue_limit, const char *swap_prefix,
Expand All @@ -559,6 +607,8 @@
return NULL;
}

init_open_file_limit(nthreads);

QPTPool_t *ctx = malloc(sizeof(QPTPool_t));
ctx->nthreads = nthreads;
ctx->queue_limit = queue_limit;
Expand All @@ -574,6 +624,8 @@
ctx->incomplete = 0;
ctx->swapped = 0;

ctx->destructor = free;

/* this can fail since nthreads is user input */
ctx->data = calloc(nthreads, sizeof(QPTPoolThreadData_t));
if (!ctx->data) {
Expand Down Expand Up @@ -1071,11 +1123,12 @@
*
* If QPTPool_start was not called, queues might not be empty since
* enqueuing work without starting the worker threads is allowed,
* so free() is called to clear out any unprocessed work items
* so free(), or a user-defined destructor, is called to clear out
* any unprocessed work items
*/
sll_destroy(&data->waiting, free);
sll_destroy(&data->waiting, ctx->destructor);
pthread_mutex_destroy(&data->claimed_mutex);
sll_destroy(&data->claimed, free);
sll_destroy(&data->claimed, ctx->destructor);
}

pthread_cond_destroy(&ctx->cv);
Expand Down
130 changes: 130 additions & 0 deletions src/bf.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,15 @@ OF SUCH DAMAGE.



#include <errno.h>
#include <fcntl.h>
#include <pwd.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/resource.h>

#include "bf.h"
#include "dbutils.h"
Expand Down Expand Up @@ -589,6 +592,127 @@ INSTALL_NUMBER(INT, int, "%d")
INSTALL_NUMBER(SIZE, size_t, "%zu")
INSTALL_NUMBER(UINT64, uint64_t, "%" PRIu64)

/*
 * Maximum number of files that may be held open by dir_rc objects.
 * Assigned by init_open_file_limit().
 */
rlim_t MAX_OPEN_FILES;
/*
 * Current number of open files held by dir_rc objects. Incremented in
 * open_dir_rc() and decremented in dir_dec(), both atomically.
 */
uint64_t CUR_OPEN_FILES;

/*
 * For understanding performance, track how many directory opens went
 * through openat() relative to a parent handle (n_openats) vs. opendir()
 * on the full path (n_opens). Both are bumped in open_dir_rc() before the
 * open is attempted, so failed attempts are counted too.
 */
static uint64_t n_openats = 0;
static uint64_t n_opens = 0;

#if defined(DEBUG) && 0
/*
 * Report, at process teardown, what share of directory opens went through
 * openat(). Prints nothing when no directory was opened. The trailing
 * "&& 0" in the guard keeps this compiled out even in DEBUG builds.
 */
__attribute__((destructor)) static void print_openat_stats(void) {
    const uint64_t total = n_openats + n_opens;
    if (total == 0) {
        return;
    }

    const uint64_t percent = n_openats * 100 / total;
    printf("%" PRIu64 "/%" PRIu64 " (%" PRIu64 "%%) of directories used openat() over open()\n",
           n_openats, total, percent);
}
#endif

/*
 * Take one more reference on a dir_rc by atomically bumping its count.
 *
 * XXX: open question from the original author -- would __ATOMIC_RELAXED
 * be sufficient for the increment instead of __ATOMIC_ACQ_REL?
 */
static void dir_inc(struct dir_rc *dir) {
    (void) __atomic_add_fetch(&dir->rc, 1, __ATOMIC_ACQ_REL);
}

/*
 * Create a new reference-counted DIR object for the given `struct work`.
 *
 * If w->parent_dir is a non-NULL dir_rc, then openat() the new DIR relative
 * to it, using the trailing w->basename_len bytes of w->name as the entry
 * name. This assumes that w->basename_len is correctly initialized!
 *
 * Otherwise, just opendir() the full path in w->name.
 *
 * Returns a handle whose refcount is 1, or NULL on failure. On failure no
 * file descriptor or DIR stream is left open, and errno holds the error of
 * the call that failed. A warning is printed when the failure was EMFILE.
 *
 * If we are hitting the limit on the maximum number of open files available,
 * then designate the new dir_rc as "non-clonable" so that it is closed as
 * soon as processing the current directory is complete.
 */
struct dir_rc *open_dir_rc(struct work *w) {
    DIR *dir = NULL;
    int saved_errno = 0;

    if (w->parent_dir) {
        __atomic_add_fetch(&n_openats, 1, __ATOMIC_RELAXED);

        const int parent_fd = get_dir_fd(w->parent_dir);
        const char *child_name = w->name + w->name_len - w->basename_len;
        const int fd = openat(parent_fd, child_name, O_RDONLY | O_DIRECTORY);
        if (fd < 0) {
            saved_errno = errno;
            goto err;
        }

        dir = fdopendir(fd);
        if (!dir) {
            /* fdopendir() only takes ownership of fd on success */
            saved_errno = errno;
            close(fd);
            goto err;
        }
    } else {
        __atomic_add_fetch(&n_opens, 1, __ATOMIC_RELAXED);

        dir = opendir(w->name);
        if (!dir) {
            saved_errno = errno;
            goto err;
        }
    }

    struct dir_rc *new_rc = calloc(1, sizeof(*new_rc));
    if (!new_rc) {
        /* do not leak the stream (and its descriptor) on allocation failure */
        saved_errno = errno;
        closedir(dir);
        goto err;
    }

    new_rc->dir = dir;
    dir_inc(new_rc);

    /* the handle is not shared yet, so a plain store to dont_clone is fine */
    const uint64_t cur_open_files = __atomic_add_fetch(&CUR_OPEN_FILES, 1, __ATOMIC_ACQ_REL);
    if (cur_open_files >= MAX_OPEN_FILES) {
        new_rc->dont_clone = 1;
    }

    return new_rc;

err:
    if (saved_errno == EMFILE) {
        fprintf(stderr, "Warning: too many open files, index may not be complete!\n");
    }

    /* cleanup and fprintf() may have changed errno; hand the real cause back */
    errno = saved_errno;
    return NULL;
}

/*
 * Return the file descriptor behind a dir_rc's directory stream, as
 * reported by gufi_dirfd(). The handle is not modified.
 */
int get_dir_fd(struct dir_rc *dir) {
    DIR *stream = dir->dir;
    return gufi_dirfd(stream);
}

/*
 * Attempt to clone a `dir_rc`.
 *
 * On success the refcount is incremented and the same dir_rc pointer is
 * handed back to the caller.
 *
 * If the handle has been marked dont_clone, nothing is changed and NULL
 * is returned.
 *
 * The caller MUST be prepared for cloning to fail!
 */
struct dir_rc *dir_clone(struct dir_rc *dir) {
    if (!dir->dont_clone) {
        dir_inc(dir);
        return dir;
    }

    return NULL;
}

/*
 * Drop one reference on a dir_rc. Whoever drops the last reference closes
 * the directory stream, gives the slot back to CUR_OPEN_FILES, and frees
 * the handle. A NULL dir is ignored.
 */
void dir_dec(struct dir_rc *dir) {
    if (!dir) {
        return;
    }

    const uint64_t remaining = __atomic_sub_fetch(&dir->rc, 1, __ATOMIC_ACQ_REL);
    if (remaining != 0) {
        /* other holders still exist */
        return;
    }

    closedir(dir->dir);
    __atomic_sub_fetch(&CUR_OPEN_FILES, 1, __ATOMIC_ACQ_REL);
    free(dir);
}

/*
* Returns size of a dynamically sized struct work_packed.
*
Expand Down Expand Up @@ -627,3 +751,9 @@ struct work *new_work_with_name(const char *prefix, const size_t prefix_len,

return w;
}

/*
 * Destructor for a struct work.
 *
 * p - pointer to a struct work, or NULL
 *
 * Releases the item's reference on its parent directory handle (dir_dec()
 * ignores a NULL parent_dir) and then frees the item. Like free(), which
 * this function stands in for, a NULL argument is a no-op.
 */
void free_work(void *p) {
    if (!p) {
        return;
    }

    struct work *w = p;
    dir_dec(w->parent_dir);
    free(w);
}
11 changes: 7 additions & 4 deletions src/descend.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,15 @@ static int work_serialize_and_free(const int fd, QPTPool_f func, void *work, siz
*/
int descend(QPTPool_t *ctx, const size_t id, void *args,
struct input *in, struct work *work, ino_t inode,
DIR *dir, const int skip_db,
struct dir_rc *d_rc, const int skip_db,
QPTPool_f processdir, process_nondir_f processnondir, void *nondir_args,
struct descend_counters *counters) {
if (!work) {
return 1;
}

DIR *dir = d_rc->dir;

trie_t *skip_names = in->skip;

struct descend_counters ctrs;
Expand Down Expand Up @@ -125,6 +127,7 @@ int descend(QPTPool_t *ctx, const size_t id, void *args,
}

struct work *child = new_work_with_name(work->name, work->name_len, dir_child->d_name, len);
child->parent_dir = dir_clone(d_rc);

struct entry_data child_ed;
memset(&child_ed, 0, sizeof(child_ed));
Expand Down Expand Up @@ -199,7 +202,7 @@ int descend(QPTPool_t *ctx, const size_t id, void *args,
}
else {
/* skip enqueuing and just free */
free(child);
free_work(child);
}
continue;
}
Expand All @@ -215,7 +218,7 @@ int descend(QPTPool_t *ctx, const size_t id, void *args,
}
else {
/* other types are not stored */
free(child);
free_work(child);
continue;
}

Expand All @@ -239,7 +242,7 @@ int descend(QPTPool_t *ctx, const size_t id, void *args,
}
}

free(child);
free_work(child);
}
}

Expand Down
Loading
Loading