Skip to content

Commit 534ab3e

Browse files
committed
ENH: First pass free thread compatible fsenvents
1 parent 157479f commit 534ab3e

File tree

1 file changed

+58
-9
lines changed

1 file changed

+58
-9
lines changed

src/watchdog_fsevents.c

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <stdlib.h>
1616
#include <signal.h>
1717

18+
#include "pythoncapi_compat.h"
1819

1920
/* Compatibility; since fsevents won't set these on earlier macOS versions the properties will always be False */
2021
#if MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_13
@@ -258,6 +259,14 @@ PyObject *thread_to_run_loop = NULL;
258259
*/
259260
PyObject *watch_to_stream = NULL;
260261

262+
#ifdef Py_GIL_DISABLED
263+
static PyMutex g_watchdog_module_data_lock = {0};
264+
#define LOCK_GDATA_MUTEX PyMutex_Lock(&g_watchdog_module_data_lock)
265+
#define UNLOCK_GDATA_MUTEX PyMutex_Unlock(&g_watchdog_module_data_lock)
266+
#else
267+
#define LOCK_GDATA_MUTEX ((void)0)
268+
#define UNLOCK_GDATA_MUTEX ((void)0)
269+
#endif
261270

262271
/**
263272
* PyCapsule destructor.
@@ -520,9 +529,10 @@ watchdog_CFMutableArrayRef_from_PyStringList(PyObject *py_string_list)
520529
* CFString array list. */
521530
for (i = 0; i < string_list_size; ++i)
522531
{
523-
py_string = PyList_GetItem(py_string_list, i);
532+
py_string = PyList_GetItemRef(py_string_list, i);
524533
G_RETURN_NULL_IF_NULL(py_string);
525534
cf_string = PyString_AsUTF8EncodedCFStringRef(py_string);
535+
Py_DECREF(py_string);
526536
G_RETURN_NULL_IF_NULL(cf_string);
527537
CFArraySetValueAtIndex(array_of_cf_string, i, cf_string);
528538
CFRelease(cf_string);
@@ -622,16 +632,21 @@ watchdog_add_watch(PyObject *self, PyObject *args)
622632
&emitter_thread, &watch,
623633
&python_callback, &paths_to_watch));
624634

635+
/* Ensure only one thread tries to add the watch */
636+
LOCK_GDATA_MUTEX;
637+
625638
/* Watch must not already be scheduled. */
626639
if(PyDict_Contains(watch_to_stream, watch) == 1) {
627640
PyErr_Format(PyExc_RuntimeError, "Cannot add watch %S - it is already scheduled", watch);
641+
UNLOCK_GDATA_MUTEX;
628642
return NULL;
629643
}
630644

631645
/* Create an instance of the callback information structure. */
632646
stream_callback_info_ref = PyMem_New(StreamCallbackInfo, 1);
633647
if(stream_callback_info_ref == NULL) {
634648
PyErr_SetString(PyExc_SystemError, "Failed allocating stream callback info");
649+
UNLOCK_GDATA_MUTEX;
635650
return NULL;
636651
}
637652

@@ -643,13 +658,15 @@ watchdog_add_watch(PyObject *self, PyObject *args)
643658
if (!stream_ref) {
644659
PyMem_Free(stream_callback_info_ref);
645660
PyErr_SetString(PyExc_RuntimeError, "Failed creating fsevent stream");
661+
UNLOCK_GDATA_MUTEX;
646662
return NULL;
647663
}
648664
value = PyCapsule_New(stream_ref, NULL, watchdog_pycapsule_destructor);
649665
if (!value || !PyCapsule_IsValid(value, NULL)) {
650666
PyMem_Free(stream_callback_info_ref);
651667
FSEventStreamInvalidate(stream_ref);
652668
FSEventStreamRelease(stream_ref);
669+
UNLOCK_GDATA_MUTEX;
653670
return NULL;
654671
}
655672
PyDict_SetItem(watch_to_stream, watch, value);
@@ -666,6 +683,9 @@ watchdog_add_watch(PyObject *self, PyObject *args)
666683
run_loop_ref = PyCapsule_GetPointer(value, NULL);
667684
}
668685

686+
/* Finished working with global dictionaries */
687+
UNLOCK_GDATA_MUTEX;
688+
669689
/* Schedule the stream with the obtained runloop. */
670690
FSEventStreamScheduleWithRunLoop(stream_ref, run_loop_ref, kCFRunLoopDefaultMode);
671691

@@ -707,6 +727,7 @@ watchdog_read_events(PyObject *self, PyObject *args)
707727
CFRunLoopRef run_loop_ref = NULL;
708728
PyObject *emitter_thread = NULL;
709729
PyObject *value = NULL;
730+
int val_ind = 0;
710731

711732
G_RETURN_NULL_IF_NOT(PyArg_ParseTuple(args, "O:loop", &emitter_thread));
712733

@@ -716,27 +737,42 @@ watchdog_read_events(PyObject *self, PyObject *args)
716737
PyEval_InitThreads();
717738
#endif
718739

740+
LOCK_GDATA_MUTEX;
719741
/* Allocate information and store thread state. */
720-
value = PyDict_GetItem(thread_to_run_loop, emitter_thread);
742+
val_ind = PyDict_GetItemRef(thread_to_run_loop, emitter_thread, &value);
721743
if (G_IS_NULL(value))
722744
{
745+
if (PyErr_Occurred()) { UNLOCK_GDATA_MUTEX; return NULL; }
723746
run_loop_ref = CFRunLoopGetCurrent();
724747
value = PyCapsule_New(run_loop_ref, NULL, watchdog_pycapsule_destructor);
725-
PyDict_SetItem(thread_to_run_loop, emitter_thread, value);
748+
if (!value) { UNLOCK_GDATA_MUTEX; return NULL; }
749+
if (PyDict_SetItem(thread_to_run_loop, emitter_thread, value) < 0) {
750+
Py_DECREF(value);
751+
UNLOCK_GDATA_MUTEX;
752+
return NULL;
753+
}
726754
Py_INCREF(emitter_thread);
727-
Py_INCREF(value);
728755
}
756+
UNLOCK_GDATA_MUTEX;
729757

730758
/* No timeout, block until events. */
731759
Py_BEGIN_ALLOW_THREADS;
732760
CFRunLoopRun();
733761
Py_END_ALLOW_THREADS;
734762

735763
/* Clean up state information. */
736-
if (PyDict_DelItem(thread_to_run_loop, emitter_thread) == 0)
764+
LOCK_GDATA_MUTEX;
765+
int del_result = PyDict_DelItem(thread_to_run_loop, emitter_thread);
766+
UNLOCK_GDATA_MUTEX;
767+
if (del_result == 0)
737768
{
738769
Py_DECREF(emitter_thread);
739-
Py_INCREF(value);
770+
Py_DECREF(value);
771+
} else if (del_result < 0 && PyErr_Occurred()){
772+
Py_DECREF(value);
773+
return NULL;
774+
} else {
775+
Py_DECREF(value);
740776
}
741777

742778
G_RETURN_NULL_IF(PyErr_Occurred());
@@ -754,11 +790,15 @@ static PyObject *
754790
watchdog_flush_events(PyObject *self, PyObject *watch)
755791
{
756792
UNUSED(self);
757-
PyObject *value = PyDict_GetItem(watch_to_stream, watch);
793+
PyObject *value = NULL;
794+
LOCK_GDATA_MUTEX;
795+
PyDict_GetItemRef(watch_to_stream, watch, &value);
758796

759797
FSEventStreamRef stream_ref = PyCapsule_GetPointer(value, NULL);
760798

761799
FSEventStreamFlushSync(stream_ref);
800+
UNLOCK_GDATA_MUTEX;
801+
Py_DECREF(value);
762802

763803
Py_INCREF(Py_None);
764804
return Py_None;
@@ -773,19 +813,23 @@ static PyObject *
773813
watchdog_remove_watch(PyObject *self, PyObject *watch)
774814
{
775815
UNUSED(self);
776-
PyObject *streamref_capsule = PyDict_GetItem(watch_to_stream, watch);
816+
PyObject *streamref_capsule = NULL;
817+
LOCK_GDATA_MUTEX;
818+
PyDict_GetItemRef(watch_to_stream, watch, &streamref_capsule);
777819
if (!streamref_capsule) {
778820
// A watch might have been removed explicitly before, in which case we can simply early out.
779821
Py_RETURN_NONE;
780822
}
781823
PyDict_DelItem(watch_to_stream, watch);
824+
UNLOCK_GDATA_MUTEX;
782825

783826
FSEventStreamRef stream_ref = PyCapsule_GetPointer(streamref_capsule, NULL);
784827

785828
FSEventStreamStop(stream_ref);
786829
FSEventStreamInvalidate(stream_ref);
787830
FSEventStreamRelease(stream_ref);
788831

832+
Py_DECREF(streamref_capsule);
789833
Py_RETURN_NONE;
790834
}
791835

@@ -798,7 +842,11 @@ static PyObject *
798842
watchdog_stop(PyObject *self, PyObject *emitter_thread)
799843
{
800844
UNUSED(self);
801-
PyObject *value = PyDict_GetItem(thread_to_run_loop, emitter_thread);
845+
PyObject *value = NULL;
846+
int val_ind = 0;
847+
LOCK_GDATA_MUTEX;
848+
val_ind = PyDict_GetItemRef(thread_to_run_loop, emitter_thread, &value);
849+
UNLOCK_GDATA_MUTEX;
802850
if (G_IS_NULL(value)) {
803851
goto success;
804852
}
@@ -813,6 +861,7 @@ watchdog_stop(PyObject *self, PyObject *emitter_thread)
813861
}
814862

815863
success:
864+
Py_DECREF(value);
816865
Py_INCREF(Py_None);
817866
return Py_None;
818867
}

0 commit comments

Comments
 (0)