@@ -213,8 +213,7 @@ class ThreadSafeFunction {
213
213
resource,
214
214
*v8::String::Utf8Value (env_->isolate, name)),
215
215
thread_count(thread_count_),
216
- is_closing(false ),
217
- is_closed(false ),
216
+ state(OPEN),
218
217
dispatch_state(kDispatchIdle ),
219
218
context(context_),
220
219
max_queue_size(max_queue_size_),
@@ -237,14 +236,14 @@ class ThreadSafeFunction {
237
236
node::Mutex::ScopedLock lock (this ->mutex );
238
237
239
238
while (queue.size () >= max_queue_size && max_queue_size > 0 &&
240
- !is_closing ) {
239
+ state == OPEN ) {
241
240
if (mode == napi_tsfn_nonblocking) {
242
241
return napi_queue_full;
243
242
}
244
243
cond->Wait (lock);
245
244
}
246
245
247
- if (!is_closing ) {
246
+ if (state == OPEN ) {
248
247
queue.push (data);
249
248
Send ();
250
249
return napi_ok;
@@ -253,7 +252,7 @@ class ThreadSafeFunction {
253
252
return napi_invalid_arg;
254
253
}
255
254
thread_count--;
256
- if (!is_closed || thread_count > 0 ) {
255
+ if (!(state == CLOSED && thread_count == 0 ) ) {
257
256
return napi_closing;
258
257
}
259
258
}
@@ -265,13 +264,13 @@ class ThreadSafeFunction {
265
264
napi_status Acquire () {
266
265
node::Mutex::ScopedLock lock (this ->mutex );
267
266
268
- if (is_closing) {
269
- return napi_closing;
270
- }
267
+ if (state == OPEN) {
268
+ thread_count++;
271
269
272
- thread_count++;
270
+ return napi_ok;
271
+ }
273
272
274
- return napi_ok ;
273
+ return napi_closing ;
275
274
}
276
275
277
276
napi_status Release (napi_threadsafe_function_release_mode mode) {
@@ -285,16 +284,18 @@ class ThreadSafeFunction {
285
284
thread_count--;
286
285
287
286
if (thread_count == 0 || mode == napi_tsfn_abort) {
288
- if (!is_closing) {
289
- is_closing = (mode == napi_tsfn_abort);
290
- if (is_closing && max_queue_size > 0 ) {
287
+ if (state == OPEN) {
288
+ if (mode == napi_tsfn_abort) {
289
+ state = CLOSING;
290
+ }
291
+ if (state == CLOSING && max_queue_size > 0 ) {
291
292
cond->Signal (lock);
292
293
}
293
294
Send ();
294
295
}
295
296
}
296
297
297
- if (!is_closed || thread_count > 0 ) {
298
+ if (!(state == CLOSED && thread_count == 0 ) ) {
298
299
return napi_ok;
299
300
}
300
301
}
@@ -372,8 +373,8 @@ class ThreadSafeFunction {
372
373
373
374
protected:
374
375
void ReleaseResources () {
375
- if (!is_closed ) {
376
- is_closed = true ;
376
+ if (state != CLOSED ) {
377
+ state = CLOSED ;
377
378
ref.Reset ();
378
379
node::RemoveEnvironmentCleanupHook (env->isolate , Cleanup, this );
379
380
env->Unref ();
@@ -409,9 +410,7 @@ class ThreadSafeFunction {
409
410
410
411
{
411
412
node::Mutex::ScopedLock lock (this ->mutex );
412
- if (is_closing) {
413
- CloseHandlesAndMaybeDelete ();
414
- } else {
413
+ if (state == OPEN) {
415
414
size_t size = queue.size ();
416
415
if (size > 0 ) {
417
416
data = queue.front ();
@@ -425,7 +424,7 @@ class ThreadSafeFunction {
425
424
426
425
if (size == 0 ) {
427
426
if (thread_count == 0 ) {
428
- is_closing = true ;
427
+ state = CLOSING ;
429
428
if (max_queue_size > 0 ) {
430
429
cond->Signal (lock);
431
430
}
@@ -434,6 +433,8 @@ class ThreadSafeFunction {
434
433
} else {
435
434
has_more = true ;
436
435
}
436
+ } else {
437
+ CloseHandlesAndMaybeDelete ();
437
438
}
438
439
}
439
440
@@ -466,7 +467,7 @@ class ThreadSafeFunction {
466
467
v8::HandleScope scope (env->isolate );
467
468
if (set_closing) {
468
469
node::Mutex::ScopedLock lock (this ->mutex );
469
- is_closing = true ;
470
+ state = CLOSING ;
470
471
if (max_queue_size > 0 ) {
471
472
cond->Signal (lock);
472
473
}
@@ -538,6 +539,8 @@ class ThreadSafeFunction {
538
539
using node::AsyncResource::CallbackScope;
539
540
};
540
541
542
+ enum State : unsigned char { OPEN, CLOSING, CLOSED };
543
+
541
544
static const unsigned char kDispatchIdle = 0 ;
542
545
static const unsigned char kDispatchRunning = 1 << 0 ;
543
546
static const unsigned char kDispatchPending = 1 << 1 ;
@@ -552,8 +555,7 @@ class ThreadSafeFunction {
552
555
std::queue<void *> queue;
553
556
uv_async_t async;
554
557
size_t thread_count;
555
- bool is_closing;
556
- bool is_closed;
558
+ State state;
557
559
std::atomic_uchar dispatch_state;
558
560
559
561
// These are variables set once, upon creation, and then never again, which
0 commit comments