Skip to content

Commit 3378a3d

Browse files
committed
add ability to set destructor in QueuePerThreadPool
this is needed when the queues hold items that need additional work to free above just calling free(). so that they can clean up everything on exit properly.
1 parent eefa227 commit 3378a3d

File tree

5 files changed

+32
-10
lines changed

5 files changed

+32
-10
lines changed

include/QueuePerThreadPool.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ QPTPool_t *QPTPool_init_with_props(const size_t nthreads, void *args,
115115
const uint64_t queue_limit, const char *swap_prefix,
116116
const uint64_t steal_num, const uint64_t steal_denom);
117117

118+
void QPTPool_set_destructor(QPTPool_t *pool, void (*destructor)(void *));
118119
/*
119120
* QPTPool_init only allocates memory - call this to start threads
120121
*

include/bf.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ struct work {
360360
size_t struct_work_size(struct work *w);
361361
struct work *new_work_with_name(const char *prefix, const size_t prefix_len,
362362
const char *basename, const size_t basename_len);
363-
void free_work(struct work *w);
363+
void free_work(void *p);
364364

365365
/*
366366
* A reference-counted directory handle.

src/QueuePerThreadPool.c

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,12 @@ struct QPTPool {
119119
uint64_t swapped;
120120

121121
QPTPoolThreadData_t *data;
122+
123+
/*
124+
* An optional pointer to a destructor for the work items in this QPTPool.
125+
* If it is not specified, then the destructor is just free().
126+
*/
127+
void (*destructor)(void *);
122128
};
123129

124130
/* struct to pass into pthread_create */
@@ -551,6 +557,10 @@ int QPTPool_get_steal(QPTPool_t *ctx, uint64_t *num, uint64_t *denom) {
551557
return 0;
552558
}
553559

560+
void QPTPool_set_destructor(QPTPool_t *pool, void (*destructor)(void *)) {
561+
pool->destructor = destructor;
562+
}
563+
554564
QPTPool_t *QPTPool_init_with_props(const size_t nthreads, void *args,
555565
QPTPoolNextFunc_t next_func, void *next_args,
556566
const uint64_t queue_limit, const char *swap_prefix,
@@ -574,6 +584,8 @@ QPTPool_t *QPTPool_init_with_props(const size_t nthreads, void *args,
574584
ctx->incomplete = 0;
575585
ctx->swapped = 0;
576586

587+
ctx->destructor = NULL;
588+
577589
/* this can fail since nthreads is user input */
578590
ctx->data = calloc(nthreads, sizeof(QPTPoolThreadData_t));
579591
if (!ctx->data) {
@@ -1057,6 +1069,13 @@ void QPTPool_destroy(QPTPool_t *ctx) {
10571069
return;
10581070
}
10591071

1072+
void (*destructor)(void *);
1073+
if (ctx->destructor) {
1074+
destructor = ctx->destructor;
1075+
} else {
1076+
destructor = free;
1077+
}
1078+
10601079
for(size_t i = 0; i < ctx->nthreads; i++) {
10611080
QPTPoolThreadData_t *data = &ctx->data[i];
10621081
data->threads_successful = 0;
@@ -1073,9 +1092,9 @@ void QPTPool_destroy(QPTPool_t *ctx) {
10731092
* enqueuing work without starting the worker threads is allowed,
10741093
* so free() is called to clear out any unprocessed work items
10751094
*/
1076-
sll_destroy(&data->waiting, free);
1095+
sll_destroy(&data->waiting, destructor);
10771096
pthread_mutex_destroy(&data->claimed_mutex);
1078-
sll_destroy(&data->claimed, free);
1097+
sll_destroy(&data->claimed, destructor);
10791098
}
10801099

10811100
pthread_cond_destroy(&ctx->cv);

src/bf.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -695,7 +695,8 @@ struct work *new_work_with_name(const char *prefix, const size_t prefix_len,
695695
return w;
696696
}
697697

698-
void free_work(struct work *w) {
698+
void free_work(void *p) {
699+
struct work *w = (struct work *) p;
699700
dir_dec(w->parent_dir);
700701
free(w);
701702
}

test/unit/googletest/descend.cpp.in

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ TEST(descend, builddir) {
100100
QPTPool_t *pool = QPTPool_init(1, nullptr);
101101
ASSERT_NE(pool, nullptr);
102102
EXPECT_EQ(QPTPool_start(pool), 0);
103+
QPTPool_set_destructor(pool, (void (*)(void *)) free_work);
103104

104105
struct descend_counters ctrs;
105106

@@ -130,7 +131,7 @@ TEST(descend, builddir) {
130131
work->level = 0;
131132
EXPECT_EQ(descend(pool, 0, nullptr, &in, work, 0, dir_rc, 0,
132133
[](QPTPool_t *, const size_t, void *data, void *) -> int {
133-
free(data);
134+
free_work(data);
134135
return 0;
135136
},
136137
nullptr, nullptr,
@@ -148,7 +149,7 @@ TEST(descend, builddir) {
148149
rewinddir(dir);
149150
EXPECT_EQ(descend(pool, 0, nullptr, &in, work, 0, dir_rc, 1,
150151
[](QPTPool_t *, const size_t, void *data, void *) -> int {
151-
free(data);
152+
free_work(data);
152153
return 0;
153154
},
154155
nullptr, nullptr,
@@ -166,7 +167,7 @@ TEST(descend, builddir) {
166167
in.subdir_limit = 1;
167168
EXPECT_EQ(descend(pool, 0, nullptr, &in, work, 0, dir_rc, 0,
168169
[](QPTPool_t *, const size_t, void *data, void *) -> int {
169-
free(data);
170+
free_work(data);
170171
return 0;
171172
},
172173
nullptr, nullptr,
@@ -181,9 +182,9 @@ TEST(descend, builddir) {
181182
QPTPool_stop(pool);
182183
QPTPool_destroy(pool);
183184

184-
EXPECT_EQ(closedir(dir), 0);
185185
free_work(work);
186186
dir_dec(dir_rc);
187+
187188
EXPECT_EQ(unlink(pipename), 0);
188189
EXPECT_EQ(unlink(linkname), 0);
189190
}
@@ -210,6 +211,7 @@ TEST(descend, swap) {
210211
ASSERT_NE(pool, nullptr);
211212
EXPECT_EQ(QPTPool_set_queue_limit(pool, 1), 0); // if the WAIT queue already has 1 item, swap out the new item
212213
EXPECT_EQ(QPTPool_start(pool), 0);
214+
QPTPool_set_destructor(pool, (void (*)(void *)) free_work);
213215

214216
// enqueue stuck thread
215217
pthread_mutex_lock(&args.mutex);
@@ -258,7 +260,7 @@ TEST(descend, swap) {
258260

259261
EXPECT_EQ(descend(pool, 0, nullptr, &in, work, 0, dir_rc, 0,
260262
[](QPTPool_t *, const size_t, void *data, void *) -> int {
261-
free(data);
263+
free_work(data);
262264
return 0;
263265
},
264266
nullptr, nullptr,
@@ -273,7 +275,6 @@ TEST(descend, swap) {
273275
QPTPool_stop(pool);
274276
QPTPool_destroy(pool);
275277

276-
EXPECT_EQ(closedir(dir), 0);
277278
free_work(work);
278279
dir_dec(dir_rc);
279280

0 commit comments

Comments
 (0)